// One pass of the reactor: call select() on the registered handlers, then
// invoke handle_event() on each handler in `handlers`.
// NOTE(review): this is pseudocode (`for(h in handlers)` is not valid C++);
// whether select() blocks or filters `handlers` down to the ready ones is not
// visible here — confirm against the real demultiplexer.
void Reactor::handle_events(){
// Listen for network events via the select() method provided by the
// synchronous event demultiplexer
select(handlers);
// Handle the network events
for(h in handlers){
h.handle_event();
}
}
// Start the event loop in the main program: call handle_events() over and
// over; the loop has no exit condition.
while (true) {
handle_events();
}
/**
 * Entry point of a minimal Netty echo server.
 *
 * <p>Binds a {@code NioServerSocketChannel} to {@link #PORT} and installs an
 * {@code EchoServerHandler} on every accepted connection.
 */
public class Echo {
    /** TCP port the server binds to. */
    private static final int PORT = 9090;

    /**
     * Starts the server and blocks until the server channel is closed or the
     * calling thread is interrupted; both event loop groups are shut down on
     * the way out.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Boss thread group (passed as the parent group to the bootstrap)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker thread group (passed as the child group to the bootstrap)
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Use a fresh handler per connection: EchoServerHandler is not
                            // marked @ChannelHandler.Sharable, and Netty refuses to add one
                            // non-sharable handler instance to more than one pipeline.
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind to complete
            ChannelFuture future = bootstrap.bind(PORT).sync();
            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Shut down the worker thread group
            workerGroup.shutdownGracefully();
            // Shut down the boss thread group
            bossGroup.shutdownGracefully();
        }
    }
}
/**
 * Socket connection handler: writes every inbound message straight back to
 * the context it arrived on.
 */
class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Read event: hands the received message to {@code ctx.write} unchanged.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        ctx.write(msg);
    }

    /**
     * Read-complete event: flushes the context.
     *
     * @param ctx context of the channel whose read completed
     */
    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Exception event: prints the stack trace, then closes the context.
     *
     * @param ctx   context of the channel on which the error surfaced
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}