从源码中分析Hadoop的RPC机制
RPC是Remote Procedure Call(远程过程调用)的简称,这一机制都要面对两个问题
对象调用方式;
序列/反序列化机制
在此之前,我们有必要了解什么是架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了。Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口,如下图
Hadoop的RPC组件,依赖于Hadoop Writable接口类型的支持,要求每个实现类都要确保将本类的对象正确序列化与反序列化。因此RPC使用Java动态代理与反射实现对象调用方式,客户端到服务器数据的序列化与反序列化由Hadoop框架或用户自己来实现,也就是数据组装时定制的。RPC架构图如下
VersionedProtocol接口
VersionedProtocol是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()
HDFS相关
ClientDatanodeProtocol:一个客户端和datanode之间的协议接口,用于数据块恢复。
ClientProtocol:client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
- DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol:SecondaryNode与Namenode交互的接口。
Mapreduce相关
InterDatanodeProtocol:Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol:TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
JobSubmissionProtocol:JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
TaskUmbilicalProtocol:Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。
RPC实现流程
简单来说,Hadoop RPC=动态代理+定制的二进制流。分布式对象一般都会要求根据接口生成存根和框架。如 CORBA,可以通过 IDL,生成存根和框架。在ipc.RPC类中有一些内部类,下边简单介绍下
Invocation:用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache:用于存储client对象,用socket factory作为hash key,存储结构为
hashMap <SocketFactory, Client>
。Invoker:是动态代理中的调用实现类,继承了InvocationHandler.
Server:是ipc.Server的实现类。我们就需要这样的步骤了。
上类图
从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。为了分析 Invoker,我们需要介绍一些 Java 反射实现 Dynamic Proxy 的背景。
Dynamic Proxy 是由两个 class 实现的:java.lang.reflect.Proxy
和 java.lang.reflect.InvocationHandler
,后者是一个接口。
所谓 Dynamic Proxy 是这样一种 class:它是在运行时生成的 class,在生成它时你必须提供一组 interface 给它,然后该 class就宣称它实现了这些 interface。
这个 Dynamic Proxy 其实就是一个典型的 Proxy 模式,它丌会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。
这个 handler,在 Hadoop 的 RPC 中,就是 Invoker 对象。
我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的
InvocationHandler 实现中。
在 Hadoop 的 RPC 中,Invoker 实现了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的唯一方法)。 Invoker 会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的 Client,通过 socket 传递到服务器端。就是说,你在 proxy 类上的任何调用,都通过 Client 发送到远方的服务器上。
Invoker 使用 Invocation。 Invocation 封装了一个过程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和 parameters,调用方法参数。注意,它实现了 Writable 接口,可以串行化。
RPC.Server 实现了 org.apache.hadoop.ipc.Server
,你可以把一个对象,通过 RPC,升级成为一个服务器。服务器接收到的请求(通过 Invocation),解串行化以后,就发成了方法名,方法参数列表和参数列表。调用 Java 反射,我们就可以调用对应的对象的方法。调用的结果再通过 socket,迒回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy 的使用者了。
我们接下来去研究的就是RPC.Invoker类中的invoke()方法了,代码如下
public Object invoke(Object proxy, Method method, Object[] args) |
一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有。其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:
ObjectWritable value = (ObjectWritable) |
Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。
ipc.Client源码
接下来分析一下ipc.Client源码,在此之前我们得明确下我们的目标,总结出了以下几个问题
客户端和服务端的连接是怎样建立的?
客户端是怎样给服务端发送数据的?
客户端是怎样获取服务端的返回数据的?
基于这三个问题,我们开始分析ipc.Client源码,主要包含以下几个类
- Call:用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据。
- Connection:用以处理远程连接对象。继承了Thread
- ConnectionId:唯一确定一个连接
Question1:客户端和服务端的连接是怎样建立的?
Client类中的cal()方法如下
public Writable call(Writable param, ConnectionId remoteId) |
具体代码的作用我已做了注释,所以这里不再赘述。分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:
Connection connection = getConnection(remoteId, call); //获得一个连接 |
先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。
private Connection getConnection(ConnectionId remoteId, |
Client.Connection类中的setupIOstreams()方法如下:
private synchronized void setupIOstreams() throws InterruptedException |
再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法
private synchronized void setupConnection() throws IOException { |
不难看出客户端的连接是创建一个普通的socket进行通信的。
Question2:客户端是怎样给服务端发送数据的?
Client.Connection类的sendParam()方法如下
public void sendParam(Call call) { |
Question3:客户端是怎样获取服务端的返回数据的?
Client.Connection类和Client.Call类中的相关方法如下
Method1:
public void run() {
……
while (waitForWork()) {
receiveResponse(); //具体的处理方法
}
close();
……
}
Method2:
private void receiveResponse() { |
Method3:public synchronized void setValue(Writable value) {
this.value = value;
callComplete(); //具体实现
}
protected synchronized void callComplete() {
this.done = true;
notify(); // 唤醒client等待线程
}
启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据。客户端的源码分析暂时到这,下面我们来分析Server端的源码
ipc.Server源码分析
内部类如下
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。
初始化Server
hadoop是怎样初始化RPC的Server端的呢?
Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码
private void initialize(Configuration conf) throws IOException { |
RPC的server对象是通过ipc.RPC类的getServer()方法获得的。ipc.RPC类中的getServer()源码如下
public static Server getServer(final Object instance, final String bindAddress, final int port, |
getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。
运行Server
初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码
/** 启动服务 */ |
Server处理请求
- 分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数
public Listener() throws IOException { |
在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法
public void run() |
Server.Listener类中doAccept ()方法中的关键源码如下:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { |
当reader被唤醒,reader接着执行doRead()方法。
- 接收请求
Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码如下:
Method1: void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment(); //获得connection对象
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess(); // 接受并处理请求
} catch (InterruptedException ieo) {
……
}
……
}
Method2:
public int readAndProcess() throws IOException, InterruptedException { |
- 获得call对象
Method1: private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
Method2:private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // 尝试读取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
param.readFields(dis);
Call call = new Call(id, param, this); //封装成call
callQueue.put(call); // 将call存入callQueue
incRpcCount(); // 增加rpc请求的计数
}
- 处理call对象
对call对象的处理是Server类中的Handler内部类来处理的。Server.Handler类中run()方法中的关键代码如下:
while (running) { |
- 返回请求
Server.Responder类中的doRespond()方法源码如下:
void doRespond(Call call) throws IOException |