转载

阿里开源分布式事务组件 seata :seata server 通信层解析

RPC ?

seata client 和 seata server 间是需要通过网络通信来传递信息的,client 发送请求消息给 server,server 根据实际的处理逻辑,可能会给 client 发送相应的响应消息,或者不响应任何消息。在 seata 中,客户端和服务端的通信实现,被抽象成来公共的模块,它的 package 位于 io.seata.core.rpc 中。

这个包名叫 rpc,这个包下的很多类名也有 rpc 相关的字眼,而实际上在我看来,这个通信框架并不是一个常规意义的 rpc 框架,如果硬要揪书本知识,那么 rpc 的解释如下:

远程过程调用(英语:Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

在以 dubbo 为代表的微服务时代下,dubbo 常规意义上我们都称之为 rpc 框架,rpc 的理论原则是:程序员无需额外地为这个交互作用编程。那么对于像 dubbo 这样的 rpc 实现,它能让 client 像调用本地代码 api 一样,来调用远程 server 上的某个 method。

在 client 这一层直接面向 interface 编程,通过动态代理的方式,对上层屏蔽掉通信细节,在底层,将方法调用,通过序列化方式,封装成一个二进制数据串发送给 server,server 层解析该消息,通过反射的方式,将 interface 对应的 implemention 执行起来,将执行结果,扁平化成一个二进制数据串,回送给 client,client 收到数据后,拼装成 interface api 所定义的返回值类型的一个实例,作为方法调用的返回值。整个底层的细节,应用层面并不需要了解,应用层只需要以 interface.method 的方式,就像代码在本地执行一样,就能把远端 interface_implemention.method 给调用起来。

阿里开源分布式事务组件 seata :seata server 通信层解析

而 seata 的 rpc 框架上,实际上仅仅是一个普通的基于 netty 的网络通信框架,client 与 server 之间通过发送 request 和 response 来达到相互通信的目的,在 seata 中的每个 request 和 response 类,都实现了如何把自己序列化的逻辑。

各种消息类型,都实现了 io.seata.core.protocol.MessageCodec 接口

public interface MessageCodec {
    /**
     * Gets type code.
     *
     * @return the type code
     */
    short getTypeCode();

    /**
     * Encode byte [ ].
     *
     * @return the byte [ ]
     */
    byte[] encode();

    /**
     * Decode boolean.
     *
     * @param in the in
     * @return the boolean
     */
    boolean decode(ByteBuf in);
}

阿里开源分布式事务组件 seata :seata server 通信层解析

io.seata.core.protocol.GlobalBeginRequest 为例,它都 decode 和 encode 实现如下所示:

@Override
public byte[] encode() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(256);
    byteBuffer.putInt(timeout);

    if (this.transactionName != null) {
        byte[] bs = transactionName.getBytes(UTF8);
        byteBuffer.putShort((short)bs.length);
        if (bs.length > 0) {
            byteBuffer.put(bs);
        }
    } else {
        byteBuffer.putShort((short)0);
    }

    byteBuffer.flip();
    byte[] content = new byte[byteBuffer.limit()];
    byteBuffer.get(content);
    return content;
}

@Override
public void decode(ByteBuffer byteBuffer) {
    this.timeout = byteBuffer.getInt();

    short len = byteBuffer.getShort();
    if (len > 0) {
        byte[] bs = new byte[len];
        byteBuffer.get(bs);
        this.setTransactionName(new String(bs, UTF8));
    }
}

这意味着,发送方先对 message 做 encode 动作形成字节数组,将字节数组发往接收方,接收方收到字节数组后,对字节数组先判断 message type,再用对应的 message 类型对字节数组做 decode 动作。

类的组织形式

从 seata server 的入口类 io.seata.server.Server 分析,main 方法如下所示:

/**
 * The entry point of application.
 *
 * @param args the input arguments
 * @throws IOException the io exception
 */
public static void main(String[] args) throws IOException {
    RpcServer rpcServer = new RpcServer(WORKING_THREADS);

    int port = SERVER_DEFAULT_PORT;
    //server port
    if (args.length > 0) {
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Usage: sh services-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA");
            System.exit(0);
        }
    }
    rpcServer.setListenPort(port);

    //log store mode : file、db
    String storeMode = null;
    if (args.length > 1) {
        storeMode = args[1];
    }

    UUIDGenerator.init(1);
    SessionHolder.init(storeMode);

    DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
    coordinator.init();
    rpcServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);

    if (args.length > 2) {
        XID.setIpAddress(args[2]);
    } else {
        XID.setIpAddress(NetUtil.getLocalIp());
    }
    XID.setPort(rpcServer.getListenPort());

    rpcServer.init();

    System.exit(0);
}

可以看到 seata server 使用一个 RpcServer 类来启动它的服务监听端口,这个端口用来接收 seata client 的消息,RpcServer 这个类是通信层的实现分析的入口。

在这里,SessionHolder 用来做全局事务树的管理,DefaultCoordinator 用来处理事务执行逻辑,而 RpcServer 是这两者可以正常运行的基础,这篇文章的重点在于剖析 RpcServer 的实现,进而延伸到 seata 整个通信框架的细节。

如果先从 RpcServer 的类继承图看的话,那么我们能发现一些与常规思维不太一样的地方,类继承图如下:

阿里开源分布式事务组件 seata :seata server 通信层解析

褐色部分是 netty 的类,灰色部分是 seata 的类。

在一般常规的思维中,依赖 netty 做一个 server,大致的思路是:

  1. 定义一个 xxx server 类
  2. 在这个类中设置初始化 netty bootstrap,eventloop,以及设置相应的 ChannelHandler

在这种思维下,很容易想到,server 与 ChannelHandler 之间的关系应该是一个“组合”的关系,即在我们构建 server 的过程中,应该把 ChannelHandler 当作参数传递给 server,成为 server 类的成员变量。

没错,这是我们一般情况下的思维。不过 seata 在这方面却不那么“常规”,从上面的类继承图中可以看到,从 RpcServer 这个类开始向上追溯,发现它其实是 ChannelDuplexHandler 的一个子类或者实例。这种逻辑让人一时很困惑,一个问题在我脑海里浮现:“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”

异步转同步的 Future 机制

首先分析 AbstractRpcRemoting 这个类,它直接继承自 ChannelDuplexHandler 类,而 ChannelDuplexHandler 是 netty 中 inbound handler 和 outbound handler 的结合体。

AbstractRpcRemoting 的 init 方法里,仅仅通过 Java 中的定时任务执行线程池启动了一个定时执行的任务:

/**
 * Init.
 */
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

            for (MessageFuture future : futures.values()) {
                if (future.isTimeout()) {
                    timeoutMessageFutures.add(future);
                }
            }

            for (MessageFuture messageFuture : timeoutMessageFutures) {
                futures.remove(messageFuture.getRequestMessage().getId());
                messageFuture.setResultMessage(null);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

这个定时任务的逻辑也比较简单:扫描 ConcurrentHashMap<Long, MessageFuture> futures 这个成员变量里的 MessageFuture,如果这个 Future 超时了,就将 Future 的结果设置为 null。逻辑虽然简单,但这个功能涉及到了异步通信里一个很常见的功能,即 异步转同步 的功能。

在 netty 这种基于 NIO 的通信方式中,数据的发送,接收,全部是非阻塞的,因此判断一个动作完成与否,并不能像传统的 Java 同步代码一样,代码执行完了就认为相应的动作也真正完成了,例如,在 netty 中,如果通过 channel.write(); 方法往对端发送一个数据,这个方法执行完了,并不代表数据发送出去了,channel.write() 方法会返回一个 future,应用代码应该利用这个 future ,通过这个 future 可以知道数据到底发送出去了没有,也可以为这个 future 添加动作完成后的回调逻辑,也可以阻塞等待这个 future 所关联的动作执行完毕。

在 seata 中,存在着 发送一个请求,并等待相应 这样的使用场景,上层的 api 可能是这么定义的:

public Response request(Request request) {}

而基于 nio 的底层数据发送逻辑却是这样的:

1. send request message
2. 为业务的请求构建一个业务层面的 future 实例
3. 阻塞等待在这个 future 上
4. 当收到对应的 response message 后,唤醒上面的 future,阻塞等待在这个 future 上的线程继续执行
5. 拿到结果,request 方法结束

AbstractRpcRemoting 定义了几个数据发送相关的方法,分别是:

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg) throws TimeoutException;

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @param timeout the timeout
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
    TimeoutException;

/**
 * Send async request without response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithoutResponse(String address, Channel channel, Object msg) throws
    TimeoutException;

这几个方法就符合上面说到的 发送一个请求,并等待相应 这样的使用场景,上面这三个方法,其实都委托给了 sendAsyncRequest 来实现,这个方法的代码是这样子的:

private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
    throws TimeoutException {
    if (channel == null) {
        LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
        return null;
    }
    final RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setId(RpcMessage.getNextMessageId());
    rpcMessage.setAsync(false);
    rpcMessage.setHeartbeat(false);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);

    final MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeout);
    futures.put(rpcMessage.getId(), messageFuture);

    if (address != null) {
        ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
        BlockingQueue<RpcMessage> basket = map.get(address);
        if (basket == null) {
            map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
            basket = map.get(address);
        }
        basket.offer(rpcMessage);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: " + rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }
    } else {
        ChannelFuture future;
        channelWriteableCheck(channel, msg);
        future = channel.writeAndFlush(rpcMessage);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                    if (messageFuture != null) {
                        messageFuture.setResultMessage(future.cause());
                    }
                    destroyChannel(future.channel());
                }
            }
        });
    }
    if (timeout > 0) {
        try {
            return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        return null;
    }
}

先抛开方法的其它细节,比如说同步写还是异步写,以及发送频率控制。我们可以发现,这个方法其实从大角度来划分,就是如下的步骤:

  1. 构造请求 message
  2. 为这个请求 message 构造一个 message future
  3. 发送数据
  4. 阻塞等待在 message future

不过 AbstractRpcRemoting 也定义了方法用于 仅发送消息,不接收响应 的使用场景,如下所示:

/**
 * Send request.
 *
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendRequest(Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);
    rpcMessage.setId(RpcMessage.getNextMessageId());
    if (msg instanceof MergeMessage) {
        mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
    }
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
            + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }
    channel.writeAndFlush(rpcMessage);
}

/**
 * Send response.
 *
 * @param msgId   the msg id
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendResponse(long msgId, Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(false);
    rpcMessage.setBody(msg);
    rpcMessage.setId(msgId);
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("send response:" + rpcMessage.getBody() + ",channel:" + channel);
    }
    channel.writeAndFlush(rpcMessage);
}

这样的场景就不需要引入 future 机制,直接调用 netty 的 api 把数据发送出去就完事了。

分析思路回到有 future 的场景,发送数据后,要在 future 上进行阻塞等待,即调用 get 方法,那 get 方法什么返回呢,我们上面说到 future 被唤醒的时候,我们先不讨论 future 的实现细节,一个 future 什么时候被唤醒呢,在这种 请求-响应 的模式下,显然是收到了响应的时候。所以我们需要查看一下 AbstractRpcRemoting 的 channelRead 方法

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof RpcMessage) {
        final RpcMessage rpcMessage = (RpcMessage)msg;
        if (rpcMessage.isRequest()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            try {
                AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                    "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                if (allowDumpStack) {
                    String name = ManagementFactory.getRuntimeMXBean().getName();
                    String pid = name.split("@")[0];
                    int idx = new Random().nextInt(100);
                    try {
                        Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                    } catch (IOException exx) {
                        LOGGER.error(exx.getMessage());
                    }
                    allowDumpStack = false;
                }
            }
        } else {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String
                    .format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture,
                        rpcMessage.getBody()));
            }
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                try {
                    AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                }
            }
        }
    }
}

可以看到调用了 messageFuture 当 setResultMessage() 方法,设置 future 的结果,也就是说,唤醒了 future,那么阻塞在 future 的 get 方法上的线程就被唤醒了,得到结果,继续往下执行。

接下来我们讨论 MessageFuture 的实现细节,其实 seata 里面有很多种 future 相关的类,实现方式也不太一样,不过都大同小异,有的是基于 CompletableFuture 实现,有的是基于 CountDownLatch 实现。比如说,MessageFuture 就是基于 CompletableFuture 实现的,先看看它的成员变量:

private RpcMessage requestMessage;
private long timeout;
private long start = System.currentTimeMillis();
private transient CompletableFuture origin = new CompletableFuture();

CompletableFuture 是它的一个成员变量,它被利用来阻塞当前线程。MessageFuture 的 get 方法,依赖于 CompletableFuture 的 get 方法,来实现有一定时间限制的等待,直到另一个线程唤醒 CompletableFuture。如下所示:

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws TimeoutException the timeout exception
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) throws TimeoutException,
    InterruptedException {
    Object result = null;
    try {
        result = origin.get(timeout, unit);
    } catch (ExecutionException e) {
        throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
    } catch (TimeoutException e) {
        throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");
    }

    if (result instanceof RuntimeException) {
        throw (RuntimeException)result;
    } else if (result instanceof Throwable) {
        throw new RuntimeException((Throwable)result);
    }

    return result;
}

/**
 * Sets result message.
 *
 * @param obj the obj
 */
public void setResultMessage(Object obj) {
    origin.complete(obj);
}

既然说到了 future 机制,这里也顺便把 io.seata.config.ConfigFuture 提一下,它就是上面提到的基于 CountDownLatch 实现的一种 future 机制,虽然实现方式两者不一样,但完成的功能和作用是一样的。

private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) {
    this.timeoutMills = unit.toMillis(timeout);
    try {
        boolean success = latch.await(timeout, unit);
        if (!success) {
            LOGGER.error(
                "config operation timeout,cost:" + (System.currentTimeMillis() - start) + " ms,op:" + operation
                    .name()
                    + ",dataId:" + dataId);
            return getFailResult();
        }
    } catch (InterruptedException exx) {
        LOGGER.error("config operate interrupted,error:" + exx.getMessage());
        return getFailResult();
    }
    if (operation == ConfigOperation.GET) {
        return result == null ? content : result;
    } else {
        return result == null ? Boolean.FALSE : result;
    }
}

/**
 * Sets result.
 *
 * @param result the result
 */
public void setResult(Object result) {
    this.result = result;
    latch.countDown();
}

阻塞操作调用了 CountDownLatch 的 await 方法,而唤醒操作则调用 countDown 方法,核心在于需要把 CountDownLatch 的 latch 值设置为 1。

实际上,Java 语言本身已经提供了 java.util.concurrent.Future 这个类来提供 Future 机制,但 Java 原生的 Future 机制功能过于单一,比如说不能主动设置 future 的结果,也不能为它添加 listener,所有有许多像 seata 这样的软件,会选择去重新实现一种 future 机制来满足异步转同步的需求。也有像 netty 这样的软件,它不会借助类似于 countdownlatch 来实现,而是直接扩展 java.util.concurrent.Future,在它的基础上添加功能。

防洪机制

在 AbstractRpcRemoting 中,往外发数据的时候,它都会先进行一个检查,即检查当前的 channel 是否可写。

private void channelWriteableCheck(Channel channel, Object msg) {
    int tryTimes = 0;
    synchronized (lock) {
        while (!channel.isWritable()) {
            try {
                tryTimes++;
                if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                    destroyChannel(channel);
                    throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()),
                        FrameworkErrorCode.ChannelIsNotWritable);
                }
                lock.wait(NOT_WRITEABLE_CHECK_MILLS);
            } catch (InterruptedException exx) {
                LOGGER.error(exx.getMessage());
            }
        }
    }
}

这要从 netty 的内部机制说起,当调用 ChannelHandlerContext 或者 Channel 的 write 方法时,netty 只是把要写的数据放入了自身的一个环形队列里面,再由后台线程真正往链路上发。如果接受方的处理速度慢,也就是说,接收的速度慢,那么根据 tcpip 协议的滑动窗口机制,它也会导致发送方发送得慢。

我们可以把 netty 的环形队列想像成一个水池,调用 write 方法往池子里加水,netty 通过后台线程,慢慢把池子的水流走。这就有可能出现一种情况,由于池子水流走的速度远远慢于往池子里加水的速度,这样会导致池子的总水量随着时间的推移越来越多。所以往池子里加水时应该考虑当前池子里的水量,否则最终会导致应用的内存溢出。

netty 对于水池提供了两个设置,一个是 高水位 ,一个是 低水位 ,当池子里的水高于高水位时,这个时候 channel.isWritable() 返回 false,并且直到水位慢慢降回到低水位时,这个方法才会返回 true。

阿里开源分布式事务组件 seata :seata server 通信层解析

上述的 channelWriteableCheck 方法,发现channel 不可写的时候,进入循环等待,等待的目的是让池子的水位下降到 low water mark,如果等待超过最大允许等待的时间,那么将会抛出异常并关闭连接。

消息队列

在 AbstractRpcRemoting 中,发送数据有两种方式,一种是直接调用 channel 往外写,另一种是先把数据放进“数据篮子”里,它实际上是一个 map, key 为远端地址,value为一个消息队列。数据放队列后,再由其它线程往外发。下面是 sendAsycRequest 方法的一部分代码,显示了这种机制:

ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
    map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
    basket = map.get(address);
}
basket.offer(rpcMessage);
if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("offer message: " + rpcMessage.getBody());
}
if (!isSending) {
    synchronized (mergeLock) {
        mergeLock.notifyAll();
    }
}

但我们在 AbstractRpcRemoting 里面没有看有任何额外的线程在晴空这个 basketMap。回顾一下上面的 RpcServer 的类继承体系,接下来我们要分析一下,AbstractRpcRemotingServer 这个类。

阿里开源分布式事务组件 seata :seata server 通信层解析

AbstractRpcRemotingServer 这个类主要定义了于netty 启动一个 server bootstrap 相关的类,可见真正启动服务监听端口的是在这个类,先看一下它的start方法

@Override
public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new MessageCodecHandler());
                if (null != channelHandlers) {
                    addChannelPipelineLast(ch, channelHandlers);
                }

            }
        });

    if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) {
        this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR);
    }

    try {
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
        LOGGER.info("Server started ... ");
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

这个类很常规,就是遵循 netty 的使用规范,用合适的配置启动一个 server,并调用注册中心 api 把自己作为一个服务发布出去。

我们可以看到,配置中确实也出现了我们上文中提到过的上下水位的配置。

另外,channelpipeline 中,除了添加一个保持链路有效性探测的 IdleStateHandler,和一个 MessageCodec,处理事务逻辑相关的 Handler 还需要由参数传入。

接下来我们看 RpcServer 这个类,从它的 init 方法里,我们可以看到,它把自己做为一个 ChannelHandler,加入到了 channel pipeline 中

/**
 * Init.
 */
@Override
public void init() {
    super.init();
    setChannelHandlers(RpcServer.this);
    DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(
        transactionMessageHandler);
    defaultServerMessageListenerImpl.init();
    defaultServerMessageListenerImpl.setServerMessageSender(this);
    this.setServerMessageListener(defaultServerMessageListenerImpl);
    super.start();

}

RpcServer 自身也实现了 channelRead 方法,但它只处理心跳相关的信息和注册相关的信息,其它的业务消息,它交给父类处理,而先前我们也已经看到,父类的channelRead

方法里,反过来会调用 dispatch 这个抽象方法去做消息的分发,而 RpcServer 类实现了这个抽象方法,在接收到不同的消息类型是,采取不同的处理流程。

关于事务的处理流程的细节,本篇文章暂不涉及,后续文章再慢慢分析。

行文至此,回想我们先前提到的一个疑惑:

“当我启动一个 RpcServer 的时候,我是真的在启动一个 server 吗?看起来我好像在启动一个 ChannelHandler,可是 ChannelHandler 怎么谈得上‘启动’呢?”

是的,我们既在启动一个 server,这个 server 也实现了事务处理逻辑,它同时也是个 ChannelHandler。

没有一定的事实标准去衡量这样写的代码是好是坏,我们也没必要去争论 Effective Java 提到的什么时候该用组合,什么时候该用继承。

本文到此结束。

阿里开源分布式事务组件 seata :seata server 通信层解析

原文  https://segmentfault.com/a/1190000019249870
正文到此结束
Loading...