hadoop-rpc使用实例

Hadoop RPC主要对外提供2种接口

  1. 构造一个客户端代理对象,向服务器发送RPC请求

    1
    public static ProtocolProxy getProxy/waitForProxy
  2. 为某个协议实例构造一个服务器对象,用于处理客户端发送请求。

    1
    public static Server RPC.Builder(Configuration).build()

    注意: 在Hadoop以前的版本中,这个接口为

    1
    public static Server getServer()

Hadoop RPC的使用方法可以分为4个步骤

  1. 定义RPC协议。RPC协议是客户端和服务器之间的通信接口,它定义了服务器对外提供的服务接口(方法),Hadoop中所有自定义的RPC接口都需继承 VersionedProtocol 正如Java RMI需要实现 Remote 接口一样
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.hadoop.ipc.VersionedProtocol;

    public interface DemoProtocol extends VersionedProtocol {
    //注意接口中必须有一个名为 versionID 的字段,
    //因为客户端调用时,服务器会反射获取这个字段,以保证调用的服务接口版本统一,
    //不然会抛出 java.lang.NoSuchFieldException: versionID 异常
    public static final long versionID = 0;

    String echo(String name);
    }
  1. 实现RPC协议。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import org.apache.hadoop.ipc.ProtocolSignature;

    import java.io.IOException;

    public class DemoProtocolImpl implements DemoProtocol {
    public String echo(String name) {
    return "Hadoop Server echo: " + name;
    }

    public long getProtocolVersion(String s, long l) throws IOException {
    return DemoProtocol.versionID;
    }

    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
    return new ProtocolSignature(DemoProtocol.versionID, null);
    }
    }
  2. 构造RPC Server

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.io.IOException;

    public class RpcServer {

    public static void main(String[] args) throws IOException {
    RPC.Server server = new RPC.Builder(new Configuration()).setProtocol(DemoProtocol.class)
    .setInstance(new DemoProtocolImpl()).setBindAddress("127.0.0.1")
    .setPort(9000).setNumHandlers(5).build();

    server.start();
    }
    }
  3. 构造RPC Client 并发送RPC请求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    public class RpcClient {

    public static void main(String[] args) throws IOException {
    DemoProtocol proxy = RPC.getProxy(DemoProtocol.class, DemoProtocol.versionID, new InetSocketAddress("127.0.0.1", 9000), new Configuration());

    String result = proxy.echo("123");

    System.out.println(result);
    }
    }

调用结果 成功输出 Hadoop Server echo: 123

代码需要的依赖 Maven

1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>