转载

Java如何实现简单的RPC框架

一、RPC简介

RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议。它允许像调用本地服务一样调用远程服务。它可以有不同的实现方式。如RMI(远程方法调用)、Hessian、Http invoker等。另外,RPC是与语言无关的。

RPC示意图

如上图所示,假设Computer1在调用sayHi()方法,对于Computer1而言调用sayHi()方法就像调用本地方法一样,调用 –>返回。但从后续调用可以看出Computer1调用的是Computer2中的sayHi()方法,RPC屏蔽了底层的实现细节,让调用者无需关注网络通信,数据传输等细节。

二、RPC框架的实现

上面介绍了RPC的核心原理:RPC能够让本地应用简单、高效地调用服务器中的过程(服务)。它主要应用在分布式系统。如Hadoop中的IPC组件。但怎样实现一个RPC框架呢?

从下面几个方面思考,仅供参考:

1.通信模型:假设通信的为A机器与B机器,A与B之间有通信模型,在Java中一般基于BIO或NIO;。

2.过程(服务)定位:使用给定的通信方式,与确定IP与端口及方法名称确定具体的过程或方法;

3.远程代理对象:本地调用的方法(服务)其实是远程方法的本地代理,因此可能需要一个远程代理对象,对于Java而言,远程代理对象可以使用Java的动态对象实现,封装了调用远程方法调用;

4.序列化,将对象名称、方法名称、参数等对象信息进行网络传输需要转换成二进制传输,这里可能需要不同的序列化技术方案。如:protobuf,Arvo等。

三、Java实现RPC框架

1、实现技术方案

下面使用比较原始的方案实现RPC框架,采用Socket通信、动态代理与反射与Java原生的序列化。

2、RPC框架架构

RPC架构分为三部分:

1)服务提供者,运行在服务器端,提供服务接口定义与服务实现类。

2)服务中心,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。

3)服务消费者,运行在客户端,通过远程代理对象调用远程服务。

3、 具体实现

服务提供者接口定义与实现,代码如下:

public interface HelloService {

  String sayHi(String name);

}

HelloServices接口实现类:

public class HelloServiceImpl implements HelloService {

  public String sayHi(String name) {
    return "Hi, " + name;
  }

}

服务中心代码实现,代码如下:

public interface Server {
  public void stop();

  public void start() throws IOException;

  public void register(Class serviceInterface, Class impl);

  public boolean isRunning();

  public int getPort();
}

服务中心实现类:

public class ServiceCenter implements Server {
  private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  private static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();

  private static boolean isRunning = false;

  private static int port;

  public ServiceCenter(int port) {
    this.port = port;
  }

  public void stop() {
    isRunning = false;
    executor.shutdown();
  }

  public void start() throws IOException {
    ServerSocket server = new ServerSocket();
    server.bind(new InetSocketAddress(port));
    System.out.println("start server");
    try {
      while (true) {
        // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
        executor.execute(new ServiceTask(server.accept()));
      }
    } finally {
      server.close();
    }
  }

  public void register(Class serviceInterface, Class impl) {
    serviceRegistry.put(serviceInterface.getName(), impl);
  }

  public boolean isRunning() {
    return isRunning;
  }

  public int getPort() {
    return port;
  }

  private static class ServiceTask implements Runnable {
    Socket clent = null;

    public ServiceTask(Socket client) {
      this.clent = client;
    }

    public void run() {
      ObjectInputStream input = null;
      ObjectOutputStream output = null;
      try {
        // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
        input = new ObjectInputStream(clent.getInputStream());
        String serviceName = input.readUTF();
        String methodName = input.readUTF();
        Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
        Object[] arguments = (Object[]) input.readObject();
        Class serviceClass = serviceRegistry.get(serviceName);
        if (serviceClass == null) {
          throw new ClassNotFoundException(serviceName + " not found");
        }
        Method method = serviceClass.getMethod(methodName, parameterTypes);
        Object result = method.invoke(serviceClass.newInstance(), arguments);

        // 3.将执行结果反序列化,通过socket发送给客户端
        output = new ObjectOutputStream(clent.getOutputStream());
        output.writeObject(result);
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        if (output != null) {
          try {
            output.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
        if (input != null) {
          try {
            input.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
        if (clent != null) {
          try {
            clent.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      }

    }
  }
}

客户端的远程代理对象:

public class RPCClient<T> {
  public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress addr) {
    // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
    return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
        new InvocationHandler() {
          public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Socket socket = null;
            ObjectOutputStream output = null;
            ObjectInputStream input = null;
            try {
              // 2.创建Socket客户端,根据指定地址连接远程服务提供者
              socket = new Socket();
              socket.connect(addr);

              // 3.将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
              output = new ObjectOutputStream(socket.getOutputStream());
              output.writeUTF(serviceInterface.getName());
              output.writeUTF(method.getName());
              output.writeObject(method.getParameterTypes());
              output.writeObject(args);

              // 4.同步阻塞等待服务器返回应答,获取应答后返回
              input = new ObjectInputStream(socket.getInputStream());
              return input.readObject();
            } finally {
              if (socket != null) socket.close();
              if (output != null) output.close();
              if (input != null) input.close();
            }
          }
        });
  }
}

最后为测试类:

public class RPCTest {

  public static void main(String[] args) throws IOException {
    new Thread(new Runnable() {
      public void run() {
        try {
          Server serviceServer = new ServiceCenter(8088);
          serviceServer.register(HelloService.class, HelloServiceImpl.class);
          serviceServer.start();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }).start();
    HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8088));
    System.out.println(service.sayHi("test"));
  }
}

运行结果:

regeist service HelloService  start server  Hi, test

四、总结

RPC本质为消息处理模型,RPC屏蔽了底层不同主机间的通信细节,让进程调用远程的服务就像是本地的服务一样。

五、可以改进的地方

这里实现的简单RPC框架是使用Java语言开发,与Java语言高度耦合,并且通信方式采用的Socket是基于BIO实现的,IO效率不高,还有Java原生的序列化机制占内存太多,运行效率也不高。可以考虑从下面几种方法改进。

  1. 可以采用基于JSON数据传输的RPC框架;
  2. 可以使用NIO或直接使用Netty替代BIO实现;
  3. 使用开源的序列化机制,如Hadoop Avro与Google protobuf等;
  4. 服务注册可以使用Zookeeper进行管理,能够让应用更加稳定。

以上就是Java如何实现简单的RPC框架的详细内容,更多关于Java实现RPC框架的资料请关注我们其它相关文章!

时间:2020-07-09

Java RPC框架过滤器机制原理解析

过滤器 字面义上理解的过滤器类似下图,从一堆物品中筛选出符合条件的留下,不符合的丢弃. GOF 职责链 GOF中有一种设计模式叫职责链,或者叫责任链,常规的UML图如下: 正统的职责链是将一个请求发给第一个接收者,接收者判断是否属于自己能处理的,如果能处理则执行操作并中止请求下发,流程到此为止.如果不能处理则将请求下发给下一个接收者一直到最后一个接收者. 变体职责链 上面提到正统的职责链有一个特点:当找到符合条件的执行者后流程中止并不会将请求继续下发给后续的执行者.这类场景比较适合比如审核操作,

分析JAVA中几种常用的RPC框架

RPC是远程过程调用的简称,广泛应用在大规模分布式应用中,作用是有助于系统的垂直拆分,使系统更易拓展.Java中的RPC框架比较多,各有特色,广泛使用的有RMI.Hessian.Dubbo等.RPC还有一个特点就是能够跨语言,本文只以JAVA语言里的RPC为例. 对于RPC有一个逻辑关系图,以RMI为例: 其他的框架结构也类似,区别在于对象的序列化方法,传输对象的通讯协议,以及注册中心的管理与failover设计(利用zookeeper). 客户端和服务端可以运行在不同的JVM中,Client只

Java RPC框架熔断降级机制原理解析

熔断与降级 为什么在RPC环节中有熔断以及降级的需求,详细的原因这里不多解释,从网上搜索一张图做示意. 熔断 我理解熔段主要解决如下几个问题: 当所依赖的对象不稳定时,能够起到快速失败的目的快速失败后,能够根据一定的算法动态试探所依赖对象是否恢复 比如产品详细页获取产品的好评总数时,由于后端服务异常导致客户端每次都需要等到超时.如果短时间内服务不能恢复,那么这段时间内的所有请求时间都将是最大的超时时间,这类消费时间又得不到正确结果的现象是不能容忍的.所以遇到这类情况,就需要根据一定的算法判定服务

Java利用Sping框架编写RPC远程过程调用服务的教程

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样. RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC.会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化. 众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下

Java RPC框架如何实现客户端限流配置

关键资源 关键资源总是有限的,也就意味着处理能力也有限,所以当面对大量业务时,为了保障自己能够有序的提供服务最经济的做法就是限制同一时间处理的事务数.比如银行的工作人员,一个工作人员同时只能为一个客户服务,来多了根本处理不了,不光是一种浪费而且有可以造成混乱的局面导致工作人员无法工作. 网络请求漏斗 越上层的服务器处理的事务越轻,应付请求的能力也越强,也就意味着同一请求越上层处理时间越短.为了有效的保护下层服务器,就需要对发送给下层的请求量做限流,在下层服务器可接受的范围内.否则就可能会出现下层

详解Nginx限流配置

本文以示例的形式,由浅入深讲解Nginx限流相关配置,是对简略的官方文档的积极补充. Nginx限流使用的是leaky bucket算法,如对算法感兴趣,可移步维基百科先行阅读.不过不了解此算法,不影响阅读本文. 空桶 我们从最简单的限流配置开始: limit_req_zone $binary_remote_addr zone=ip_limit:10m rate=10r/s; server { location /login/ { limit_req zone=ip_limit; proxy_p

Nginx抢购限流配置实现解析

因业务需求经常会有抢购业务,因此需要在负载均衡前端进行限流错误.本文同样也适用于防止CC. limit_req_zone $server_name zone=sname:10m rate=1r/s; #限制服务器每秒只能有一次访问成功 #limit_req_zone $binary_remote_addr zone=one:3m rate=1r/s; #限制IP,每秒只能访问一次 #limit_req_zone $binary_remote_addr $uri zone=two:3m rate=

Nginx对网段内ip的连接数限流配置详解

Nginx中的所谓连接数限制,其实是tcp连接,也就是请求方通过三次握手后成功建立的连接状态.Nginx一般为我们提供了 ngx_http_limit_conn_module 模块来提供限制连接功能.该模块可以根据定义的键来限制每个键值的连接数,如同一个IP来源的连接数. ngx_http_limit_conn_module指令解释 Syntax: limit_conn zone number; Default: - Context: http, server, location 该指令描述会话

Nginx源码研究之nginx限流模块详解

高并发系统有三把利器:缓存.降级和限流: 限流的目的是通过对并发访问/请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页).排队等待(秒杀).降级(返回兜底数据或默认数据): 高并发系统常见的限流有:限制总并发数(数据库连接池).限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数).限制时间窗口内的平均速率(nginx的limit_req模块,用来限制每秒的平均速率): 另外还可以根据网络连接数.网络流量.CPU或内存负载等来限流. 1.限流算法 最

使用nginx实现分布式限流的方法

1.前言 一般对外暴露的系统,在促销或者黑客攻击时会涌来大量的请求,为了保护系统不被瞬间到来的高并发流量给打垮, 就需要限流 . 本文主要阐述如何用nginx 来实现限流. 听说 Hystrix 也可以, 各位有兴趣可以去研究哈 . 2.首先部署一个对外暴露接口的程序 我这里部署的是一个spring boot 项目 里面暴露了如下接口, 很简单 暴露了一个 get 请求返回 hello world 的restful 接口. 将此程序部署到 linux 服务器上. 部署步奏不再赘述, 自行百度 s

详解Spring Cloud Gateway 限流操作

Java如何实现简单的RPC框架

开发高并发系统时有三把利器用来保护系统:缓存.降级和限流. API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性. 常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等. 在Zuul中我们可以自己去实现限流的功能 (Zuul中如何限流在我的书 <Spring Cloud微服务-全栈技术与案例解析>  中有详细讲解) ,Spring Cloud Gateway的出现本身就是用来替代Zuul的. 要想替代那肯定得有强大的功能,除了性能上的优势之外,Spr

Java实现简单的RPC框架的示例代码

一.RPC简介 RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议.它允许像调用本地服务一样调用远程服务.它可以有不同的实现方式.如RMI(远程方法调用).Hessian.Http invoker等.另外,RPC是与语言无关的. rpc框架做的最重要的一件事情就是封装,调用者和被调用者的通讯细节,客户端代理负责向调用方法的方法名参数返回值包等信息根据通信协议组织成报文发送给服务端,服务端解析报文,根据客户端传递的信息执行对应的方法,然后将返回值安装协

原文  https://www.zhangshengrong.com/p/w4N7DljjXr/
正文到此结束
Loading...