13.jpg
具体参考example包下的案例
新建一个普通的SpringBoot项目ServerA, 然后导入服务注册依赖
<!-- 服务注册 -->
<dependency>
<groupId>com.burukeyou.BoomRpc</groupId>
<artifactId>BoomRpc.register</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
复制代码
然后编写application.properties文件,配置服务发布的信息
#发布服务的端口 server.port=8090 # 发布服务的应用名 spring.application.name=ServerA # 服务注册中心的地址,即Zookeeper的连接地址 boomRpc.register.address=192.168.1.19:2181 复制代码
在SpringBoot程序入口类添加@EnableServerRegister注解开启服务注册
@EnableServerRegister
@SpringBootApplication
public class ServerA_Application {
public static void main(String[] args) {
SpringApplication.run(ServerA_Application.class,args);
}
}
复制代码
编写需要公开被调用的服务并用@BoomService注解标注, 并填写服务名与接口ProductService保持一致
public interface ProductService {
List<String> getAllProductByUserId(String id);
void buyOne(Integer productId);
}
@BoomService("ProductService")
public class ProductServiceImpl implements ProductService {
@Override
public List<String> getAllProductByUserId(String id) {
return Arrays.asList("苹果","西瓜","饮料");
}
@Override
public void buyOne(Integer productId) {
System.out.println("购买商品:" + productId);
}
}
复制代码
连续启动3个此项目, 依次修改application.properties的server.port为8090,8091,8092 启动项目,服务发布成功,在Zookeer可观察到, 服务应用ServerA下有三个服务器节点提供者
新建一个普通SpringBoot项目Client, 导入依赖
<!-- 服务调用 -->
<dependency>
<groupId>com.burukeyou.BoomRpc</groupId>
<artifactId>BoomRpc.rpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
复制代码
编写application.properties配置文件
server.port=8080 spring.application.name=client # 配置负载均衡策略: 轮训 boomRpc.rpc.loadblacne=roundRobin # 服务注册中心地址 boomRpc.register.address=192.168.1.19:2181 复制代码
复制需要远端调用的服务接口,比如ServerA项目中编写的ProductService接口类, 并用
@BoomRpc(name = "ServerA",callback = ProductServiceCallback.class)//
public interface ProductService {
List<String> getAllProductByUserId(String id);
void buyOne(Integer productId);
}
// 容错处理类
public class ProductServiceCallback extends Callback implements ProductService {
@Override
public List<String> getAllProductByUserId(String id) {
Throwable throwable = getThrowable(); // 获得失败异常信息
System.err.println("getAllProductByUserId服务调用失败: "+throwable.getMessage());
return Arrays.asList("获取商品失败");
}
@Override
public void buyOne(Integer productId) {
Throwable throwable = getThrowable();
System.err.println("buyOne服务调用失败: "+throwable.getMessage());
}
}
复制代码
在Cleint应用程序启动类添加开启服务调用功能注解@EnableBoomRpc, 而provider属性填写需要远端调用的接口的所在的包的位置 (即之前编写的ProductService所在包的位置)
@EnableBoomRpc(provider = {"burukeyou.client.rpc"})
@SpringBootApplication
public class Client01Application {
public static void main(String[] args) {
SpringApplication.run(Client01Application.class,args);
}
}
复制代码
编写Controller测试远程调用效果
@RestController
public class TestController {
@Autowired
private ProductService productService;
@RequestMapping("/b")
public void testProductServiceBuyOne(){
productService.buyOne(47289384);
}
@RequestMapping("/c")
public List<String> testProductServiceGetAll(){
return productService.getAllProductByUserId("5324534");
}
}
复制代码
启动Client程序,并调用接口http://localhost:8080/c ,发现与期待结果一致远程调用了ServerA的实现类
此次请求被负载均衡到了 ServerA的 8090节点
连续发送三个 http://localhost:8080/c 请求, 由于使用轮训策略, 服务器节点依次被调用
此时若再启动一个ServerA项目端口为8093, 测Client应用能动态感知到更新本地服务提供者缓存,保证数据一致性.
这时再发请求发现请求被负载到了新的服务器节点
同里把8093节点停掉也能动态感知到, 之后如何请求也不会被负载到8093上
如果把所有服务提供者Server停掉, 则Client远程调用将会失败,但配置了降级策略会回调到callback类进行处理, 如果不配置callbakc属性默认会抛出异常
再请求http://localhost:8080/c接口返回,发现代码输出与ProductServiceCallback类被回调了
为了模拟请求超时,把ServerRequesthandler类的睡眠代码 Thread.sleep(30000);解开注释
之后再次发送http://localhost:8080/c请求, 会请求超时被降级处理类回调
每次请求发送的超时时间默认是2秒,默认会重试3次发送, 在第4次发现重试次数用完就会抛出异常被降级处理类ProductServiceCallback处理回调显示
抛开所有不谈,只要能远程调用其他服务上的方法就算是实现了RPC,再说白就是服务于服务之间能进行通讯,所以只能我们之间能进行通讯我就能进行远程调用
private Object handleRequest(RpcRequest request) {
//获得要调用哪个类
String className = request.getClassName();
className = className.substring(className.lastIndexOf(".")+1);
// 获得要调用的这个类的哪个方法
String methodName = request.getMethodName();
// 获得要调用的这个类的这个方法的参数类型
Class<?>[] parameterTypes = request.getParameterTypes();
// 获得要调用的这个类的这个方法的参数类型的具体值
Object[] parameters = request.getParameters();
// 获得这个类的具体实现类的class对象
Object obj = ServerRegisterBoot.impClassMap.get(className);
Class<?> clazz = obj.getClass();
// 根据这个类的具体实现类的class对象获取到方法对象
Method method = clazz.getMethod(methodName, parameterTypes);
// 执行方法对象的到结果
return method.invoke(obj, parameters);
}
复制代码
另一个关键就是如何像调用本地方法一样调用远程方法, 不可能说你每次要远程调用就重新写一大堆的代码,什么建立通讯连接,发送请求数据, 处理请求数据,把请求数据返回吧
public interface ProductService {
List<String> get(String id);
}
@Autowired
private ProductService productService; // 2
productService.getAllProductByUserId(id); // 3
复制代码
动态代理的远程调用核心实现如下:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1 - 封装请求对象,用于发送到另一段
//就是要告诉另一端你要调用哪个类的哪个方法
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
// 2- 获得服务serverName的服务提供者列表
List<String> providerList = RpcCacheHolder.SERVER_PROVIDERS.get(serverName);
if (providerList == null || providerList.size() < 1 ){
// 另一段压根没提供服务的机器存在,肯定无法远程调用直接降级处理
return Callbacker.Builder(rpcRequest)
.IfNotCallback(() -> {throw new RuntimeException(serverName+"服务不存在,调用失败");})
.orElseSet(new RuntimeException(serverName+"服务不存在,调用失败"));
}
// 4. 负载均衡,具体要把请求发送到哪个机器处理
LoadBalanceContext loadBalanceContext = RpcCacheHolder.APPLICATION_CONTEXT.getBean(LoadBalanceContext.class);
String serverIp = loadBalanceContext.executeLoadBalance(providerList);
System.out.println("负载均衡: 调用服务" + serverName +"的" + serverIp + " 服务器节点");
//
String[] host = serverIp.split(":");
RpcClient rpcClient = new RpcClient(host[0].trim(), Integer.parseInt(host[1].trim()),serverName);
RpcResponse rpcResponse = null;
try {
// 5 - 最后把请求消息通过Netty发送到另一端即可, 等待接受另一端的响应
rpcResponse = rpcClient.sendRequest(rpcRequest);
} catch (Exception e) {
// 如果因为某些原因发送失败,直接降级处理
return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e);
}
// 最后把实际远程调用的那个方法的返回值返回即可
return rpcResponse != null ? rpcResponse.getResult() : null;
}
复制代码
public String register(String nodePath, String nodeData) {
isConnenct();
String path = null;
try {
path = zkClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL) //设置为临时节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //设置权限
.forPath(nodePath, nodeData.getBytes());
}catch (KeeperException.NodeExistsException e){
logger.error("NodeExistsException ----服务注册失败,该服务器节点 {} 已经注册,请修改",e.getPath());
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return path;
}
复制代码
Reflections reflections = new Reflections("@BoomRpc注解所在的包");
Set<Class<?>> rpcClazz = reflections.getTypesAnnotatedWith(BoomRpc.class, true);
for (Class<?> e : rpcClazz) {
BoomRpc annotation = e.getAnnotation(BoomRpc.class);
String serverName = "".equals(annotation.name()) ? annotation.value() : annotation.name();
//把要订阅的服务serverName添加到Set集合即可
RpcCacheHolder.SUBSCRIBE_SERVICE.add(serverName);
}
复制代码
然后就可以用curator的api去订阅这些服务.代码实现如下
public static CuratorFramework zkClient = null;
private static ReentrantLock updateProviderLock = new ReentrantLock();
// 服务发现
public List<String> discover(String serverName){
isConnenct();
List<String> serverList = null;
try {
PathChildrenCache childrenCache = new PathChildrenCache(zkClient, "/" + serverName,true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// 订阅服务
addListener(childrenCache,serverName);
} catch (Exception e) {
e.printStackTrace();
}
return serverList;
}
private void addListener(PathChildrenCache childrenCache,String serverName){
childrenCache.getListenable().addListener((curatorFramework, event) -> {
// 创建子节点
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
String host = path.substring(path.lastIndexOf("/") + 1, path.length());
System.out.println("服务器上线:" + path);
try {
// 更新本地服务提供者缓存
//SERVER_PROVIDERS就是一个 Map<String, List<String>>集合
updateProviderLock.lock();
List<String> list = RpcCacheHolder.SERVER_PROVIDERS.getOrDefault(serverName,new ArrayList<>());
list.add(host);
RpcCacheHolder.SERVER_PROVIDERS.put(serverName,list);
} finally {
updateProviderLock.unlock();
}
}
// 删除子节点
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
String path = event.getData().getPath();
String host = path.substring(path.lastIndexOf("/") + 1, path.length());
System.out.println("服务器下线:" + event.getData().getPath());
try {
updateProviderLock.lock();
List<String> list = RpcCacheHolder.SERVER_PROVIDERS.get(serverName);
list.remove(host);
RpcCacheHolder.SERVER_PROVIDERS.put(serverName,list);
} finally {
updateProviderLock.unlock();
}
}
});
}
复制代码
public class RpcRequest extends RpcProtocol {
private String requestId;// 请求id
private String className; //调用类名
private String methodName; //调用方法名
private Class<?>[] parameterTypes; // 方法参数类型
private Object[] parameters;//方法参数
}
复制代码
public class RpcResponse extends RpcProtocol {
private String requestId; //对应的请求id, 这个响应对象对应哪个请求
private Exception exception; //请求异常信息
private Object result; // 实际远程方法调用返回的数据
复制代码
一般都可以在配置文件中配置使用哪种负载策略,然后用Spring读取动态生成对应的负载均衡实现类然后设置到LoadBalanceContext上下文种, 最后再把LoadBalanceContext注入到Spring容器中即可. 之后通过LoadBalanceContext执行负载策略.代码实现
@Configuration
public class RpcBeanConfiguration {
private final BoomRpcProperties properties;
public RpcBeanConfiguration(BoomRpcProperties properties) {
this.properties = properties;
}
// Spring启动时执行这段代码,把LoadBalanceContext注入到容器中
@Bean
public LoadBalanceContext loadBalanceContext(){
LoadBalanceContext loadBalanceContext = new LoadBalanceContext();
if ("roundRobin".equalsIgnoreCase(properties.getLoadblacne().trim())){
loadBalanceContext.setLoadBalanceStrategy(new RoundRobinStrategy());
}else if("random".equalsIgnoreCase(properties.getLoadblacne().trim())){
loadBalanceContext.setLoadBalanceStrategy(new RandomStrategy());
}else if("hash".equalsIgnoreCase(properties.getLoadblacne().trim())){
// todo
}else {
loadBalanceContext.setLoadBalanceStrategy(new RandomStrategy());
}
return loadBalanceContext;
}
}
复制代码
一般是否需要处理回调只有一个逻辑即是否配置了callback类,没有则直接抛出异常,伪代码如下:
//假设已经拿到@BoomRpc注解的callback属性的值
if(callback != null) {
// 调用callback的类对象进行回调处理
}else {
// 直接抛出异常
}
复制代码
为了方便调用写了一个链式调用链Callbacker处理: 调用方式如下:
RpcResponse rpcResponse = null;
try {
// 这是远程服务调用返回的响应对象
rpcResponse = rpcClient.sendRequest(rpcRequest);
//根据响应对象是否包含异常信息判断是否远程调用异常
if (rpcResponse != null && rpcResponse.getException() != null){
Exception e = rpcResponse.getException();
//如果存在异常则交给容错处理类Callbacker进行处理
return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e);
}
} catch (Exception e) {
// 捕获异常,容错处理
return Callbacker.Builder(rpcRequest).IfNotCallback(e::printStackTrace).orElseSet(e);
}
复制代码
具体容错处理类Callbacker实现:
public class Callbacker {
private RpcRequest rpcRequest;
private Class<?> callBackClass;
private Class<?> callback;
public static Callbacker Builder(RpcRequest rpcRequest){
return new Callbacker(rpcRequest);
}
// 创建Callbacker的同时拿到 BoomRpc注解配置的callback类
private Callbacker(RpcRequest rpcRequest) {
this.rpcRequest = rpcRequest;
try {
System.out.println(rpcRequest);
callBackClass = Class.forName(rpcRequest.getClassName());
callback = callBackClass.getAnnotation(BoomRpc.class).callback();
} catch (Exception e) {
//e.printStackTrace();
System.out.println("1");
}
}
// 如果不需要回调处理直接执行自定义的处理方法Process,一般都会直接传抛出异常
public Callbacker IfNotCallback(Process process){
if (!shouldCallback())
process.doSomething();
return this;
}
// 如果要进行回调处理, 直接把异常信息传入去处理即可
public Object orElseSet(Throwable throwable) throws Exception {
if (shouldCallback()){
// 创建callback的实例对象
Object obj = callback.newInstance();
//从callback上获得此次请求的方法的方法对象
Method method = callback.getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
// 由于callback类都需要继承Callback抽象类拿到异常信息,这里就可以动态注入异常信息
Method setThrowable = callback.getMethod("setThrowable", Throwable.class);
setThrowable.invoke(obj,throwable);
// 之后执行此次请求的方法的方法对象即可
return method.invoke(obj,rpcRequest.getParameters());
}else
return null;
}
// 判断callback是否为默认值void.class来进行回调处理
private boolean shouldCallback(){
return callback != void.class;
}
public interface Process {
void doSomething();
}
}
复制代码