Netty
是JBOSS提供的一款Java的开源工具,是基于 NIO
的客户端/服务端的编码框架,同时 Netty
也具有高性能,高扩展,异步事件驱动等特性受到各类应用的深切拥戴
基于 Netty
,可以快速开发网络服务器和客户端的应用程序
Netty
应用广泛,已经有成百上千的分布式中间件,各种开源项目以及各类商业项目的应用。如: Kafka
、 ElasticSearch
、 Dubbo
等都在使用 Netty
,这广泛的使用率对于它的巨大优点是密不可分的,大致总结如下:
这里的 异步事件驱动 其实可以分为:
表示为非阻塞,标准IO操作都是阻塞模式的
Future-Listener机制,方便主动获取获通过通知机制获得IO操作结果
学习 Netty
前,我们需要做一些准备工作,准备工作做得好,学习 Netty
不烦恼
在 JDK1.4之前
,Java的IO操作都是阻塞的,为了弥补不足,引入了新异步IO库: javaNewIO
,简称 NIO
Reactor是高性能网络编程在设计和架构层面的基础模式,了解了 Reactor
反应器模式,才能轻松的学习和掌握 Netty
,而且 Netty
的整体架构就是 Reactor
反应器模式
掌握以上两大知识点,我们再开始我们的 Netty
学习旅程,那么接下来我们先从 javaNIO
开始说起
javaNIO是一个基于缓冲区的,基于通道的I/O操作方法,不同于 标准I/O 操作方法, 标准I/O操作 的读写都是阻塞模式,而 NIO 的读写为非阻塞模式,且 javaNIO 的效率远高于 标准I/O
那么, NIO 是如何做到非阻塞的呢,我们先看下它的IO模型
在Java中,socket连接模式默认是阻塞模式,但是在Linux下,可以通过设置将socket变成非阻塞模式,使用非阻塞模式的IO读写,叫做 同步非阻塞IO ,出现以下两种情况:
这种情况下,如果为了读取到最终的数据,用户线程需要不断轮询,直到出现存在数据的情况,这种方式的缺点很明显:
所以为了避免同步非阻塞IO中轮询等待的问题,引出了 IO多路复用模型
在 IO多路复用模型 中,引入了一个新的系统调用: 查询IO的就绪状态 ,通过该系统调用,一个进程可以监视多个文件描述符,一旦某个文件描述符就绪,内核能够将就绪的状态返回给应用程序,随后应用程序通过就绪的状态,进行相应的IO系统操作
下面进入到真正的 NIO
的学习中,
NIO
是由以下三个核心组件组成
缓冲区,应用程序和 Channel
的主要交互操作区域
通道,类似于输入输出流的合体
选择器,负责IO事件的查询器,查询 Channel
的IO事件是否就绪, 和通道属于监控和被监控的关系
下来我们先来看了解 Buffer
缓冲区,本质是一块内存块,既可以写入数据,也可以从中读取数据
表示Buffer内部容量的大小,如果写入的数据量超过 capacity
,那么将不再写入并且会抛出异常: java.nio.BufferOverflowException
表示当前的位置, position
和缓冲区读写模式有关,
在写入模式下:
在读模式下:
关于读写模式如何切换,下面讲,这里涉及到position和limit的变化
表示读写的最大上限,在刚进入到写模式时,读写的最大上限=capacity容器大小
在进入读模式下, limit=写模式下的position
Buffer类是一个非线程安全类
Buffer类是一个抽象类,位于 java.nio
中,其子类对应Java中的主要数据类型,内部是由对应子类类型的数组构成,下面看验证过程:
DoubleBuffer buffer = DoubleBuffer.allocate(100); //建立一个内部容量大小为100的Buffer 复制代码
跟踪其源码:
public static DoubleBuffer allocate(intcapacity){
if(capacity<0)
throw new IllegalArgumentException();
return new HeapDoubleBuffer(capacity,capacity);
}
//-------------HeapDoubleBuffer----------------
HeapDoubleBuffer(intcap,intlim){
super(-1,0,lim,cap,newdouble[cap],0);
}
//---------------DoubleBuffer---------------
DoubleBuffer(int mark,int pos,int lim,int cap,
double[] hb,int offset)
{
super(mark,pos,lim,cap);
this.hb=hb;
this.offset=offset;
}
复制代码
专门用来内存映射的类型
所有Buffer的创建过程都是一样的,不再一一举例,下面说几个重要概念
我猜猜,肯定有很多人会想到 StringBuffer
,O(∩_∩)O哈哈~,不一样的
创建Buffer对象,并分配内存空间,并且默认情况下,该Buffer处于 写模式 下,不信我们来看结果:
这里我就采用 ByteBuffer
ByteBuffer buffer=ByteBuffer.allocate(100);
private static void show(ByteBuffer buffer){
System.out.print("position:"+buffer.position());
System.out.print("/t");
System.out.print("capacity:"+buffer.capacity());
System.out.print("/t");
System.out.println("limit:"+buffer.limit());
System.out.println("-----------------");
}
//position:0 capacity:100 limit:100
复制代码
position为0,limit和初识容量大小相等,说明是写入模式
将数据写入到Buffer中
buffer.put("helloworld".getBytes());
//继续调用show方法,查看position的变化
//position:11 capacity:100 limit:100
复制代码
position变成11,其余不变
翻转,将写模式转变成读模式
buffer.flip(); //继续调用show方法,查看position的变化 //position:0 capacity:100 limit:11 复制代码
对于翻转前和翻转后,limit变成翻转前的position值,position重置为0,当position>=limit时,就没有数据可以读取
那么,如何再转为写模式呢?
这两个方法都可以将读模式转变成写模式,
clear:清空
compact:压缩
将模式转成读模式后,可以开始从缓冲区读取数据,每读一个数据,position+1
buffer.get(); 复制代码
如果需要读取到整个数组,调用
buffer.array() 复制代码
倒带,就是如果已经读完的数据,需要再读一次,就可以调用rewind()方法
allocate put flip get clear/compact
Buffer的重点操作在于对position和limit的变化,大家可以多看看对应方法的源码
上面说到, JavaNIO
是一个基于缓冲区的,基于通道的I/O操作方法,在 NIO
中,可以将连接想象成通道,一个连接就是一个通道
作为 NIO
的核心组件之一,根据不同的传输协议有不同类型的通道实现
本质上 NIO
的I/O操作方法就是在操作 Buffer
下面我们一个个来学习
文件通道,文件通道是一个专门用来操作文件的通道,既可以从文件读取数据,也可以将数据写入到文件中,
FileChannel
是一个阻塞类型的通道,不可以设置为非阻塞模式
//得到读取通道
FileChannel fisChannel=new FileInputStream("").getChannel();
//得到输出通道
FileChannel fosChannel=new FileOutputStream("").getChannel();
//通过文件随机访问类得到通道
FileChannel AccChannel=new RandowAccessFile("","rw").getChannel();
复制代码
不同类型的流得到不同意义上的通道,
本质上 NIO
的I/O操作方法就是在操作 Buffer
,一定要注意这句话,意思是:读取数据,就是将数据写入到 Buffer
中,故而这里的 Buffer
模式是写模式
//创建一个ByteBuffer,容量大小为1024 ByteBuffer buffer=ByteBuffer.allocate(1024); //因为刚创建出来的Buffer的模式就是写模式,所以我们不需要进行转换 //调用读取通道的read()读取,返回读取到的数据量 intlen=fisChannel.read(buffer); 复制代码
读取到数据后,我们想将数据写入到指定的文件中,我们可以这样做:
//切换buffer的模式:读模式 buffer.flip(); //写入到指定的文件中,返回写入成功的 int len=fosChannel.write(buffer); //切换buffer的模式:写模式 buffer.clear(); 复制代码
这里为什么需要切换 Buffer
的模式?
输出通道如果想将数据写入到文件中的流程:
Buffer
所以就需要将模式切换,同理,调用 clear()
也是一样的道理。
在将缓冲区写入通道时,是由操作系统来完成的,处于性能问题,不可能每次都实时写入,所以为了保证数据最终都真正的写入磁盘,所以需要调用通道的强制刷新来完成
fosChannel.force(true); 复制代码
和使用流方式是一样的,通道也需要关闭
fisChannel.close(); fosChannel.close(); 复制代码
下面我们来使用 FileChannel
来做一个完整的案例:
public class CopyFileByFileChannel {
static ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void main(String[] args) {
copy_file();
}
private static void copy_file() {
FileChannel fisChannel = null;
FileChannel fosChannel = null;
FileInputStream fis = null;
FileOutputStream fos = null;
try {
fis = new FileInputStream("D://work//web//study-netty//src//main//java//top//zopx//study//nio//CopyFileByFileChannel.java");
fos = new FileOutputStream("D://work//web//study-netty//src//main//java//top//zopx//study//nio//CopyFileByFileChannel.txt");
fisChannel = fis.getChannel();
fosChannel = fos.getChannel();
while (fisChannel.read(buffer) != -1) {
buffer.flip();
// int outLen = 0;
// while ((outLen = fosChannel.write(buffer)) != 0) {
// System.out.println("outLen:"+ outLen);
// }
fosChannel.write(buffer);
buffer.clear();
}
fosChannel.force(true);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != fis) {
fis.close();
}
if (null != fos) {
fos.close();
}
if (null != fisChannel) {
fisChannel.close();
}
if (null != fosChannel) {
fosChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
事实上,针对文件复制的部分, NIO
也为我们提供了非常友好的一个方法,这里给出关键代码
long size = fisChannel.size();
long pos = 0;
long count = 0;
while (pos < size) {
count = size - pos > 1024 ? 1024 : size - pos;
pos += fosChannel.transferFrom(fisChannel, pos, count);
}
复制代码
避免了我们在创建 Buffer
后的模式切换问题
套接字通道,基于TCP面向连接的,用于客户端网络的通道,负责网络连接数据传输, SocketChannel
分为阻塞和非阻塞模式,可以通过以下配置来设置
socketChannel.configureBlocking(false); 复制代码
阻塞模式下的执行方式和效率和标准IO下的 Socket
是一样的,所以不设置为阻塞模式
那么,接下来我们来看看如何得到 SocketChannel
的实例
socketChannel=SocketChannel.open(); //非阻塞模式 socketChannel.configureBlocking(false); 复制代码
public static final String HOST="127.0.0.1";
public static final int PORT=36589;
socketChannel.connect(new InetSocketAddress(HOST,PORT));
while(!socketChannel.finishConnect()){}
复制代码
连接到服务端很简单,通过 connect()
方法就可以,但是在非阻塞模式下,客户端连接到服务端,会立即返回连接结果,不管连接是否成功,所以需要通过 自旋
的方式,判断 socketChannel
是否真正的连接到了服务端
连接到服务端后,就很简单了, 操作数据
的过程其实就是在操作 Buffer
的过程,这里就不再累述,随后通过完整的例子来操作
在关闭 SocketChannel
前,建议先给服务端发送一个结束标志,然后再关闭
socketChannel.shutdownOutput(); socketChannel.close(); 复制代码
服务端通道,面向连接,用于服务端网络的通道,负责连接监听,和 SocketChannel
一样,分为阻塞和非阻塞模式,配置方式都是一样的
server.configureBlocking(false); 复制代码
server=ServerSocketChannel.open(); server.configureBlocking(false); 复制代码
server.bind(new InetSocketAddress(36589)); 复制代码
server.close(); 复制代码
其他的操作数据等过程和 SocketChannel
是一样的,而且想真正的实现一个 ServerSocketChannel
的完整小demo,还需要和 Selector
配合使用
是基于 UDP 无连接的传输协议的数据报通道,分为阻塞和非阻塞模式,配置方式
open.configureBlocking(false); 复制代码
open=DatagramChannel.open(); open.configureBlocking(false); 复制代码
不多说了,标准写法
open.bind(new InetSocketAddress(52485)); 复制代码
这里的读取数据和之前不同,不再是通过 read()
方法来读取:
open.receive(buffer); 复制代码
发送数据也不再使用 write()
方式,而是:
open.send(buffer,newInetSocketAddress()) 复制代码
第二个参数:你想要发送给的客户端
以一个小例子让大家理解下 DatagramChannel
的使用方法
public class DatagramOpenChannel {
public static void main(String[] args) {
int port = getPort();
datagram_open_channel(port);
}
private static int getPort() {
System.out.println("请输入你要绑定的端口号:");
Scanner scanner = new Scanner(System.in);
return scanner.nextInt();
}
private static void datagram_open_channel(int port) {
DatagramChannel open = null;
try {
open = DatagramChannel.open();
open.configureBlocking(false);
open.bind(new InetSocketAddress(port));
read(open);
send(open);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != open) {
try {
open.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static void send(DatagramChannel open) throws IOException {
System.out.println("输入的内容格式:port@msg");
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (scanner.hasNext()) {
String next = scanner.next();
if (next.contains("@")) {
String[] split = next.split("@");
int port = Integer.parseInt(split[0]);
String msg = split[1];
buffer.put(msg.getBytes());
buffer.flip();
open.send(buffer, new InetSocketAddress("127.0.0.1", port));
buffer.clear();
}
}
}
private static void read(DatagramChannel open) throws IOException {
new Thread(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
try {
SocketAddress receive = open.receive(buffer);
if (null != receive) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
复制代码
测试方式:同时开启两个客户端,按照指定格式发送数据
只是一个小案例,很多地方没有做判断,大家可以在此基础上完善
选择器:是 NIO
组件中的重要角色,那么什么是选择器?
前面我们说到, NIO
的模式是 IO多路复用模型
,而选择器(Selector)就是为了完成IO的多路复用,选择器在其中就是起到查询IO的就绪状态的作用,通过选择器可以同时监控多个通道的IO状态,
需要注意的是,选择器只适用于非阻塞通道的情况下,所以 FileChannel
是不适用的
通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件,也正符合了 IO多路复用模型 的条件
有数据可读的通道,处于 读就绪 状态
一个等待写入数据的通道,处于 写就绪 状态
某个通道,完成了和对端的握手连接,处于 连接就绪 状态
某个服务端通道,监听到一个新连接的到来,处于 接收就绪 状态
selector=Selector.open(); 复制代码
//注册选择器并绑定接收就绪状态 serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); 复制代码
我们需要注意,
注册选择器的通道必须是非阻塞模式
不是所有的通道都支持四种IO事件,比如:
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
System.out.println("存在新链接进来");
} else if (key.isConnectable()) {
System.out.println("连接就绪");
} else if (key.isReadable()) {
System.out.println("可读");
} else if (key.isWritable()) {
System.out.println("可写");
}
}
}
复制代码
重点:
我们改造下那个例子:
Selector selector=Selector.open(); //DatagramChannel是无连接的,所以我直接绑定的读就绪 open.register(selector,SelectionKey.OP_READ); 复制代码
read()
方法 private static void read(Selector selector) throws IOException {
new Thread(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
while (selector.select() > 0) {
Set<SelectionKey> keys =
selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
DatagramChannel open = (DatagramChannel) key.channel();
SocketAddress receive = open.receive(buffer);
if (null != receive) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
复制代码
上面讲解 SocketChannel
和 ServerSocketChannel
时,没有小栗子展示,接下来我们重点来对这两者进行介绍
该栗子比较复杂,请好好消化
首先,我们先来讲解下需求:
客户端选择文件上传到服务端,保存到服务端指定的文件夹下
class NioSocket {
static final String HOST = "127.0.0.1";
static final int PORT = 23356;
static final int BUFFER_CAPACITY = 1024;
static final Charset CHARSET = StandardCharsets.UTF_8;
}
// 为了简单
class ReceiverFile {
public String fileName;
public long length;
public FileChannel outChannel;
}
复制代码
class SocketDemo {
private static String UPLOAD_FILE = "";
public static void main(String[] args) {
send_file();
}
private static void send_file() {
changeUploadFile();
File file = new File(UPLOAD_FILE);
if (!file.exists()) {
System.out.println("文件不存在");
return;
}
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(
NioSocket.HOST,
NioSocket.PORT
));
while (!socketChannel.finishConnect()) {
// 异步模式, 自旋验证是否已经成功连接到服务器端
// 这里也可以做其他事情
}
System.out.println("成功连接到服务器端");
ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);
ByteBuffer encode = NioSocket.CHARSET.encode(file.getName());
// 发送文件名称长度
// 这里如果直接使用 encode.capacity() 的话, 会多两个字节的长度
buffer.putInt(file.getName().trim().length());
// buffer.flip();
// socketChannel.write(buffer);
// buffer.clear();
System.out.printf("文件名称长度发送:%s /n" , encode.capacity());
// 发送文件名称
buffer.put(encode);
// socketChannel.write(encode);
System.out.printf("文件名称发送:%s /n", file.getName());
// 发送文件大小
buffer.putLong(file.length());
// buffer.flip();
// socketChannel.write(buffer);
// buffer.clear();
System.out.printf("发送文件长度:%s /n", file.length());
// 发送文件
int len = 0;
long progess = 0;
FileChannel fileChannel = new FileInputStream(file).getChannel();
while ((len = fileChannel.read(buffer)) > 0) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
progess += len;
System.out.println("上传文件进度:" + (progess / file.length() * 100) + "%");
}
// 发送完成, 常规关闭操作
if (len == -1) {
// 发送完成, 关闭操作
fileChannel.close();
socketChannel.shutdownOutput();
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void changeUploadFile() {
System.out.println("请输入想要上传文件的完整路径");
Scanner scanner = new Scanner(System.in);
UPLOAD_FILE = scanner.next();
}
}
复制代码
关于我注释掉的地方, 是我在测试过程中遇到的问题:
如果把信息分开发送的话, 那么服务端接收可能会出现如下问题
Exception in thread "main" java.nio.BufferUnderflowException 复制代码
class ServerSocketDemo {
private static String UPLOAD_SAVE_PATH = "D://works//111";
private static final Map<SelectableChannel, ReceiverFile> MAP = new ConcurrentHashMap<>();
public static void main(String[] args) {
receive_file();
}
private static void receive_file() {
getUploadSavePath();
// 服务器端编写
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(
new InetSocketAddress(
NioSocket.PORT
)
);
// 绑定选择器
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮训
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 判断事件
if (key.isAcceptable()) {
accept(key, selector);
} else if (key.isReadable()) {
processData(key);
}
}
}
selector.close();
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void processData(SelectionKey key) throws IOException {
ReceiverFile receiverFile = MAP.get(key.channel());
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(NioSocket.BUFFER_CAPACITY);
int len = 0;
while ((len = socketChannel.read(buffer)) > 0) {
buffer.flip();
if (receiverFile.fileName == null) {
// 处理文件名称
if (buffer.capacity() < 4) {
continue;
}
int fileNameLength = buffer.getInt();
byte[] fileNameArr = new byte[fileNameLength];
buffer.get(fileNameArr);
String fileName = new String(fileNameArr, NioSocket.CHARSET);
System.out.println("文件名称:" + fileName);
receiverFile.fileName = fileName;
// 处理存储文件
File dir = new File(UPLOAD_SAVE_PATH);
if (!dir.exists()) {
dir.mkdir();
}
File file = new File((UPLOAD_SAVE_PATH + File.separator + fileName).trim());
if (!file.exists()) {
file.createNewFile();
}
receiverFile.outChannel = new FileOutputStream(file).getChannel();
// 长度
if (buffer.capacity() < 8) {
continue;
}
long fileLength = buffer.getLong();
System.out.println("文件大小:" + fileLength);
receiverFile.length = fileLength;
// 文件内容
if (buffer.capacity() < 0) {
continue;
}
receiverFile.outChannel.write(buffer);
} else {
// 文件内容
receiverFile.outChannel.write(buffer);
}
buffer.clear();
}
if (len == -1) {
receiverFile.outChannel.close();
}
}
private static void accept(SelectionKey key, Selector selector) throws IOException {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
// 通道和File进行匹配
ReceiverFile receiverFile = new ReceiverFile();
MAP.put(accept, receiverFile);
}
private static void getUploadSavePath() {
System.out.println("请输入想要保存文件的路劲:");
Scanner scanner = new Scanner(System.in);
UPLOAD_SAVE_PATH = scanner.next();
}
}
复制代码
好, 到此NIO的知识点就完结了, 可以看到知识点虽然很多, 但其实没有很复杂, 根据上面的知识点一点一点的学习下来, 很容易就能掌握