public class NIO_Demo2 {
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 3333), 1000);
serverSocketChannel.configureBlocking(false);
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < selectors.length; i++) {
final Selector selector = Selector.open();
selectors[i] = selector;
new Thread(new ClientProcessor(selector)).start();
}
AtomicInteger id = new AtomicInteger();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
iterator.next();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
iterator.remove();
}
}
}
/**
* 客户端消息处理器
*/
static class ClientProcessor implements Runnable {
private Selector selector;
public ClientProcessor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
if (!key.isValid()) {
continue;
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuff = (ByteBuffer) key.attachment();
int read = socketChannel.read(readBuff);
if (read == -1) {
//通道连接关闭,可以取消这个注册键,后续不在触发。
key.cancel();
socketChannel.close();
} else {
//翻转buffer,从写入状态切换到读取状态
readBuff.flip();
int position = readBuff.position();
int limit = readBuff.limit();
List<ByteBuffer> buffers = new ArrayList<>();
// 按照协议从流中分割出消息
/**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
for (int i = position; i < limit; i++) {
//读取到消息结束符
if (readBuff.get() == '/r') {
ByteBuffer message = ByteBuffer.allocate(i - readBuff.position());
readBuff.limit(i);
message.put(readBuff);
readBuff.limit(limit);
message.flip();
buffers.add(message);
}
}
/**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
/**将所有得到的消息发送出去**/
for (ByteBuffer buffer : buffers) {
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}
/**将所有得到的消息发送出去**/
// 压缩readBuffer,压缩完毕后进入写入状态。并且由于长度是256,压缩之后必然有足够的空间可以写入一条消息
readBuff.compact();
}
}
}
selectionKeys.clear();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
必须设置通道为 非阻塞,才能向 Selector 注册。
在发生错误的语句前添加:
socketChannel.configureBlocking(false);
注意参数值,false 为 非阻塞,true 为 阻塞。
点击量:0