Netty手册
1. 简介
- Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。
- Netty 是一个异步的、基于事件驱动(客户端的行为、读写事件)的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
要透彻理解Netty , 需要先学习 NIO , 这样我们才能阅读 Netty 的源码。
Netty所属网络框架位置
2. 应用场景
- 互联网行业
互联网行业:在分布式系统中,各个 节点之间需要远程服务调用,高性能 的 RPC 框架必不可少,Netty 作为异步 高性能的通信框架,往往作为基础通 信组件被这些 RPC 框架使用。
典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进 行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各 进程节点之间的内部通信

- 游戏行业
- 无论是手游服务端还是大型的网络游戏, Java 语言得到了越来越广泛的应用
- Netty 作为高性能的基础通信组件,提 供了 TCP/UDP 和 HTTP 协议栈,方便定 制和开发私有协议栈,账号登录服务器
- 地图服务器之间可以方便的通过 Netty 进行高性能的通信
- 大数据领域
- 经典的 Hadoop 的高性能通信和 序列化组件 Avro(实现数据文件共享) 的 RPC 框架, 默认采用 Netty 进行跨界点通信
- 它的 Netty Service 基于 Netty 框 架二次封装实现。

- 其它开源项目使用到Netty
网址: https://netty.io/wiki/related-projects.html
3. 学习资料

4. IO模型
1.1 I/O 模型基本说明
I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程 序通信的性能
Java共支持3种网络编程模型/IO模式:BIO、NIO(学习重点)、AIO
Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端 有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成 不必要的线程开销

- Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理

- Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程
1.2 BIO、NIO、AIO适用场景分析
- BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高, 并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
- NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕 系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
- AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分 调用OS参与并发操作,编程比较复杂,JDK7开始支持。
2. BIO
2.1 基本介绍
- Java BIO 就是传统的java io 编程,其相关的类和接口在 java.io
- BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连 接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造 成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。 【后有 应用实例】
- BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高, 并发局限于应用中,JDK1.4以前的唯一选择,程序简单易理解
2.2 工作机制
BIO编程简单流程
- 服务器端启动一个 ServerSocket
- 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯
- 客户端发出请求后, 先咨询服务器 是否有线程响应,如果没有则会等待,或者被拒绝
- 如果有响应,客户端线程会等待请 求结束后,在继续执行
2.3 应用实例
要求:
- 使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,就启 动一个线程与之通讯。
- 要求使用线程池机制改善,可以连接多个客户端.
- 服务器端可以接收客户端发送的数据(telnet 方式即可)。
编写代码
public class BIOServer {
public static void main(String[] args) throws IOException {
// 1. 创建一个线程池
ExecutorService threadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
// 监听“6666” 端口,接收客户连接请求,并生成与客户端连接的Socket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true){
// 监听,等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
// 2. 如果有客户端连接,就创建一个线程,与之通信
threadPool.execute(()->{
handler(socket);
});
}
}
/**
* 和客户端通信的方法
* 循环的读取客户端的数据,然后输出
*/
public static void handler(Socket socket){
// 打印线程信息
System.out.println("线程信息:{id:"+Thread.currentThread().getId()+", " +
"name: "+Thread.currentThread().getName());
// 用于接收数据
byte[] bytes = new byte[1024];
// 通过 socket 获取输入流
try {
InputStream inputStream = socket.getInputStream();
// 循环的读取客户端发送的数据
while (true){
System.out.println("进行通信线程信息:{id:"+Thread.currentThread().getId()+", " +
"name: "+Thread.currentThread().getName());
int read = inputStream.read(bytes);
if (read != -1){
// 说明还可以读
// 输出客户端发送的数据
System.out.println(new String(bytes,0, read));
}else {
// 读取完毕
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("关闭连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
连接服务,测试
打开 CMD,连接 6666 端口 telnet 127.0.0.1 6666
碰到这个问题需要设置一下

控制板面-程序-启用或关闭....

输入 Ctrl + ],传递数据 send ko!
查看控制台

小结
- 从上面的结果可以发现,处理请求的线程 和 服务端客户端之间连接的线程是同一个
- 通过 Debug 方式运行可以发现,当连接上服务端之后,不进行任何操作,改线程只会阻塞在 int read = inputStream.read(bytes);
2.4 问题分析
每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。
当并发数较大时,需要创建大量线程来处理连接,系统资源占 用较大。
连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞 在 Read 操作上,造成线程资源浪费
3. NIO
3.1 基本介绍
- Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出 的新特性,被统称为 NIO(即 New IO),是同步非阻塞的
- NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。
- NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
- NIO是 面向缓冲区 ,或者面向块编程的。数据读取到一个 它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就 增加了处理过程中的灵活性,使用它可以提供非阻塞式的高 伸缩性网络
- Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这 个线程同时可以去做别的事情。
- 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来, 根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。
- HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求 的数量比HTTP1.1大了好几个数量级
3.2 NIO 和 BIO 的比较
- BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
- BIO 是阻塞的,NIO 则是非阻塞的
- BIO基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进 行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。 Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
BIO | NIO | |
---|---|---|
处理数据方式 | 流(字节流、字符流) | 数据块(Buffer) |
流向 | 单向 | 双向 |
阻塞方式 | 阻塞 | 非阻塞 |
复用方式 | 一个连接一个线程 | 多路复用(Selector) |
3.3 NIO 三大核心
- Selector 、 Channel 和 Buffer 的简单关系图

- 关系图的说明:
- 每个 Channel 都会对应一个 Buffer
- Selector 对应一个线程, 一个 Selector 对应多个 Channel(连接)
- 该图反应了有三个 Channel 注册到该 selector
- 程序切换到哪个 Channel 是由事件决定的, Event 就是一个重要的概念
- Selector 会根据不同的事件,在各个 Channel(通道)上切换
- Buffer 就是一个内存块 , 底层是有一个数组
- 数据的读取写入是通过 Buffer, 这个和BIO , BIO 中要么是输入流,或者是 输出流, 不能双向,但是NIO的 Buffer 是可以读也可以写, 需要 flip 方法切换
- Channel 是双向的, 可以返回底层操作系统的情况, 比如 Linux , 底层的操作系统 通道就是双向的
3.4 缓冲区(Buffer)
1. 基本介绍
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个 容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、 网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如图:

2. Buffer 类及其子类
- 在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类, 类的层级关系图

常用Buffer子类一览
ByteBuffer,存储字节数据到缓冲区
ShortBuffer,存储字符串数据到缓冲区
CharBuffer,存储字符数据到缓冲区
IntBuffer,存储整数数据到缓冲区
LongBuffer,存储长整型数据到缓冲区
DoubleBuffer,存储小数到缓冲区
FloatBuffer,存储小数到缓冲区
- Buffer 的简单案例
public class BasicBuffer {
public static void main(String[] args) {
// 创建一个 Buffer, 一个可以存放 5 个整数的 Buffer
IntBuffer intBuffer = IntBuffer.allocate(5);
// 向 Buffer 中存放数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i);
}
// 从 Buffer 读取数据
// 对 Buffer 进行读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
}
}
- Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素 的信息:

- 这几个属性的大小关系 :mark <= position <= limit <= capacity
- Buffer相关方法

- 从前面可以看出对于 Java 中的基本数据类型(boolean除外),都有一个 Buffer 类型与之 相对应,最常用的自然是ByteBuffer 类(二进制数据)—— 网络传输的底层都是字节传输形式,该类的主要方法如下:
public abstract class ByteBuffer {
//缓冲区创建相关api
public static ByteBuffer allocateDirect(int capacity)// ★ 创建直接缓冲区
public static ByteBuffer allocate(int capacity)// ★ 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array)//把一个数组放到缓冲区中使用
//构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)
//缓存区存取相关API
public abstract byte get( );// ★ 从当前位置position上get,get之后,position会自动+1
public abstract byte get (int index);// ★ 从绝对位置get ,position 不会变化
public abstract ByteBuffer put (byte b);// ★ 从当前位置上添加,put之后,position会自动+1
public abstract ByteBuffer put (int index, byte b);// ★ 从绝对位置上put ,position 不会变化
}
3.5 通道(Channel)
1. 基本介绍
NIO的通道类似于流,但有些区别如下:
• 通道可以同时进行读写,而流只能读或者只能写
• 通道可以实现异步读写数据
• 通道可以从缓冲读数据,也可以写数据到缓冲:

BIO 中的 stream 是单向的,例如 FileInputStream 对 象只能进行读取数据的操作,而 NIO 中的通道 (Channel)是双向的,可以读操作,也可以写操作。
Channel在NIO中是一个接口
常用的 Channel 类有:FileChannel、 DatagramChannel、ServerSocketChannel 和 SocketChannel。

- 在 Server 服务器端中有一个 ServerSocketChannel,每当有 Client 客户端 来连接的时候,先访问 Server 的 ServerSocketChannel,然后由 ServerSocketChannel 为每一个连接创建一个 SocketChannel
- FileChannel 用于文件的数据读写
- DatagramChannel 用于 UDP 的数据读写
- ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。
2. FileChannel 类
FileChannel主要用来对本地文件进行 IO 操作,常见的方法有
- public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
- public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
- public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道 中复制数据到当前通道
- public long transferTo(long position, long count, WritableByteChannel target),把数据从当 前通道复制给目标通道
3. 应用实例1-本地文件写数据
- 要求:
- 使用前面的ByteBuffer(缓冲) 和 FileChannel(通道), 将 “Hello,NIO_Demo01” 写入 到file01.txt 中
- 文件不存在就创建
- 流程图

- 代码实现
public class FileChannel01 {
public static void main(String[] args) throws Exception {
// 1. 得到数据
String str = "Hello";
// 2. 把数据写入 Buffer
// 创建一个输出流 , channel
FileOutputStream fileOutputStream = new FileOutputStream("D:\\file01.txt");
// 通过输出流,获取对应的 FileChannel
// fileChannel 真实类型是 fileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
// 创建一个缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 把数据放入到 byteBuffer
byteBuffer.put(str.getBytes());
// 3. 把 Buffer 的数据传入输出流
// 4. 通过 输出流 中的 fileChannel 对象把数据写入
// 反转 Buffer
byteBuffer.flip();
// 把 Buffer 的数据写入 fileChannel
fileChannel.write(byteBuffer);
// 关闭流
fileOutputStream.close();
}
}
4. 应用实例2-本地文件读数据
- 要求:
- 使用的ByteBuffer(缓冲) 和 FileChannel(通道), 将 file01.txt 中的数据读入到程序,并显示在控制台屏幕
- 假定文件已经存在
public class FileChannel02 {
public static void main(String[] args) throws Exception {
// 1. 通过 输入流 中的 fileChannel 对象把数据读出
// 创建输入流
File file = new File("D:\\file01.txt");
FileInputStream inputStream = new FileInputStream(file);
// 通过 输入流 获得对应的 FileChannel
FileChannel fileChannel = inputStream.getChannel();
// 2. 把 输入流 的数据传入 Buffer
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate((int) file.length());
// 把数据从 fileChannel 读入到缓冲区
fileChannel.read(buffer);
// 3. 把数据从 Buffer 中取出
// 将缓冲区的字节转换成字符串
String s = new String(buffer.array());
// 4. 显示数据
System.out.println(s);
}
}
5. 应用实例3-使用一个Buffer完成文件读写(复制)
- 要求:
- 使用 FileChannel(通道) 和 方法 read , write,完成文件的拷贝
- 拷贝一个文本文件 file01.txt , 放在同级目录下 —— file02.txt
public class FileChannel03 {
public static void main(String[] args) throws Exception {
// 创建输入流对象,并获取对应的 Channel
File file = new File("D:\\file01.txt");
FileInputStream inputStream = new FileInputStream(file);
FileChannel inputStreamChannel = inputStream.getChannel();
// 创建输出流对象,并获取对应的 Channel
FileOutputStream outputStream = new FileOutputStream("D:\\file02.txt");
FileChannel outputStreamChannel = outputStream.getChannel();
// 创建 Buffer
ByteBuffer buffer = ByteBuffer.allocate(512);
int read = 0;
while (read != -1){
// 重置 Buffer 中的标志位,以免上一轮循环中 Buffer 中的信息,影响本轮操作
buffer.clear();
// 循环的 从 输入流 读取数据并写入到 输出流
read = inputStreamChannel.read(buffer);
buffer.flip();
outputStreamChannel.write(buffer);
}
// 关闭输入、输出流
inputStream.close();
outputStream.close();
}
}
6. 应用实例4-拷贝文件transferFrom 方法
- 要求:
- 使用 FileChannel(通道) 和 方法 transferFrom ,完成文件的拷贝
- 拷贝一张图片
public class FileChannel04 {
public static void main(String[] args) throws Exception {
// 创建输入流对象,并获取对应的 Channel
FileInputStream inputStream = new FileInputStream("D:\\Demo.png");
FileChannel inputStreamChannel = inputStream.getChannel();
// 创建输出流对象,并获取对应的 Channel
FileOutputStream outputStream = new FileOutputStream("D:\\Demo-2.png");
FileChannel outputStreamChannel = outputStream.getChannel();
// 使用 transferForm 完成拷贝
// 参数 : 被复制的流的Channel ; 起始位置 ; 结束位置
outputStreamChannel.transferFrom(inputStreamChannel,0,inputStreamChannel.size());
// 关闭输入、输出流
inputStream.close();
outputStream.close();
}
}
7. 关于Buffer 和 Channel的注意事项和细节
ByteBuffer 支持类型化的 put 和 get, put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出(取出的顺序也要和存入的顺序一致),否则可能有 BufferUnderflowException 异常。
可以将一个普通Buffer 转成只读Buffer,如果对一个只读类型的 Buffer 进行写操作会报错 ReadOnlyBufferException
ByteBuffer buffer = ByteBuffer.allocate(3);
ByteBuffer byteBuffer = buffer.asReadOnlyBuffer();
System.out.println(buffer);
System.out.println(byteBuffer);

- NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由NIO 来完成.
/* 说明
1. MappedByteBuffer 可以让文件直接在内存中修改,这样操作系统并不需要拷贝一次
2. MappedByteBuffer 实际类型是 DirectByteBuffer
*/
public class Mappedbytebuffer {
public static void main(String[] args) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\file01.txt", "rw");
//获取对应的文件通道
FileChannel channel = randomAccessFile.getChannel();
// 参数 :使用 只读/只写/读写 模式 ; 可以修改的起始位置 ; 映射到内存的大小,即可以将文件的多少个字节映射到内存
// 这里就表示,可以对 file01.txt 文件中 [0,5) 的字节进行 读写操作
MappedByteBuffer map = channel.map(MapMode.READ_WRITE, 0, 5);
// 进行修改操作
map.put(0,(byte) 'A');
map.put(3,(byte) '3');
//关闭通道
channel.close();
}
}
- 前面我们讲的读写操作,都是通过一个Buffer 完成的,NIO 还支持 通过多个 Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering ,遵循 依次写入,依次读取。
代码演示
package ke.cloud.netty.demo.nio.buffer;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
/*
Scattering : 将数据写入到 Buffer 时,可以采用 Buffer 数组,依次写入【分散】
Gathering : 从 Buffer 读取数据,可以采用 Buffer 数组,依次读取
*/
public class ScatteringAndGatheringTest {
public static void main(String[] args) throws Exception{
// 使用ServerSocketChannel 和 inetSocketAddress
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(6666);
// 绑定端口到socket并启动
serverSocketChannel.socket().bind(inetSocketAddress);
// 创建buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
// 等待客户端连接(telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
// 循环的读取数据
while (true) {
// 表示累计读取的字节数
int byteRead = 0;
// 假设从客户端最多接受8个字节
while (byteRead < 8) {
// 自动把数据分配到 byteBuffer-0 byteBuffer-1
long read = socketChannel.read(byteBuffers);
byteRead += read;
// 使用流打印,查看当前的buffer 的position和 limit
Arrays.asList(byteBuffers).stream().
map(byteBuffer -> "{position:"+ byteBuffer.position() + ",limit:"+byteBuffer.limit()+"}")
.forEach(System.out::println);
}
// 将所有的buffer进行翻转,为后面的其他操作做准备
Arrays.asList(byteBuffers).forEach(Buffer::flip);
// 将数据读出。显示到客户端
int byteWrite = 0;
while (byteWrite < 8) {
long write = socketChannel.write(byteBuffers);
byteWrite += write;
}
// 将所有的buffer清空,为后面的其他操作做准备
Arrays.asList(byteBuffers).forEach(Buffer::clear);
// 打印处理的字节数
System.out.println("{byteRead: "+byteRead+", byteWrite: "+byteWrite+"}");
}
}
}
启动程序,打开cmd连接测试
测试-1 —— 刚好发送 8 个字节
测试-2 —— 发送不足 8 个字节
测试-3 —— 发送超过 8 个字节


3.6 Selector(选择器)
1. 基本介绍
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器)。
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然 后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少 了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
- 避免了多线程之间的上下文切换导致的开销。
2. Selector(选择器)
- Selector示意图

- 说明:
- Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器, 也叫多路复用器),可以同时并发处理成百上千个客 户端连接。
- 当线程从某客户端 Socket 通道进行读写数据时,若没 有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上 执行 IO 操作,所以单独的线程可以管理多个输入和 输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程 挂起。
- 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线 程模型,架构的性能、弹性伸缩能力和可靠性都得到 了极大的提升
3. Selector类相关方法
- Selector 类是一个抽象类, 常用方法和说明如下
public abstract class Selector implements Closeable {
public static Selector open();//得到一个选择器对象
public int select(long timeout);//监控所有注册的通道,当其 中有 IO 操作可以进行时,将 对应的 SelectionKey 加入到内部集合中并返回,参数用来 设置超时时间
public Set<SelectionKey> selectedKeys();//从内部集合中得 到所有的 SelectionKey
}
4. 注意事项
- NIO中的 ServerSocketChannel功能类似ServerSocket,SocketChannel功能类 似Socket
- selector 相关方法说明
selector.select()//阻塞
selector.select(1000);//阻塞1000毫秒,在1000毫秒后返回
selector.wakeup();//唤醒
selector selector.selectNow();//不阻塞,立马返还
3.7 NIO 非阻塞 网络编程原理分析
- NIO 非阻塞 网络编程相关的(Selector、SelectionKey、 ServerScoketChannel和SocketChannel) 关系梳理图

- 说明
- 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事 件发生的通道的个数.
- 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个 selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该 Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发 生)
- 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
3.8 NIO 非阻塞 网络编程快速入门
- 案例要求:
编写一个 NIO 入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 服务端
package ke.cloud.netty.demo.nio.buffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 服务端
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
// 创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 得到一个Selector
Selector selector = Selector.open();
// 绑定端口,在服务端进行监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 把ServerSocketChannel 注册到 Selector 关心事件为OP_ACCEPT(连接)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 循环等待客户端连接
while (true) {
// 等待一秒,如果没有发生事件, 就继续
if (selector.select(2000) == 0) {
System.out.println("服务器等了一秒,无连接");
continue;
}
// 如果有事件发生,获取到发生事件的SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 通过selectionKeys 反向获取对应的通道
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 获取iterator
SelectionKey key = iterator.next();
// 根据key发生的事件,做出相应的处理
// 如果是连接事件
if (key.isAcceptable()) {
// 通过serverSocketChannel 给该客户端生成一个SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
// 设为非阻塞
socketChannel.configureBlocking(false);
System.out.println("客户端连接成功,生成了一个socketChannel" + socketChannel.hashCode());
// 将当前的socketChannel 注册到Selector ,关心事件为OP_READ(读)
socketChannel.register(selector, SelectionKey.OP_READ,
ByteBuffer.allocate(1024));
}
// 如果是 读事件
if (key.isReadable()) {
// 通过key 反向获取对应的通道
SocketChannel channel = (SocketChannel) key.channel();
// 获取该 SocketChannel 关联的Buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// Channel 中的数据读到buffer
channel.read(buffer);
System.out.println("from 客户端——" + new String(buffer.array()));
buffer.clear();
}
// 处理完毕后要手动删除当前的 SelectionKey,避免多线程重复操作
iterator.remove();
}
}
}
}
- 客户端
package ke.cloud.netty.demo.nio.buffer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NIOClient {
public static void main(String[] args) throws IOException {
// 得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞
socketChannel.configureBlocking(false);
// 提供服务器的ip,端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
// 连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
// 如果没有完成
while (!socketChannel.finishConnect()) {
System.out.println("连接中……因为连接需要时间,客户端不会阻塞,可以做其他工作");
}
}
//如果连接成功 就发送数据
String str="hello";
//wrap通过参数中的字节数组的大小,直接生成对应的Buffer,并把字节数组存入
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(buffer);
System.in.read();
}
}
- 启动测试
先启动 服务端,再启动客户端

3.9 SelectionKey
- SelectionKey,表示 Selector 和网络通道的注册关系(OPS), 共四种:
int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ:代表读操作,值为 1
int OP_WRITE:代表写操作,值为 4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
2.SelectionKey相关方法
public abstract class SelectionKey {
public abstract Selector selector();//得到与之关联的 Selector 对象
public abstract SelectableChannel channel();//得到与之关 联的通道
public final Object attachment();//得到与之关联的共享数 据
public abstract SelectionKey interestOps(int ops);//设置或改 变监听事件
public final boolean isAcceptable();//是否可以 accept
public final boolean isReadable();//是否可以读
public final boolean isWritable();//是否可以写
}
3.10 ServerSocketChannel
- ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
- 相关方法如下:
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{
public static ServerSocketChannel open()//得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local)//设置服务器端端口 号
public final SelectableChannel configureBlocking(boolean block)//设置阻塞或非 阻塞模式,取值 false 表示采用非阻塞模式
public SocketChannel accept()//接受一个连接,返回代表这个连接的通道对象
public final SelectionKey register(Selector sel, int ops)//注册一个选择器并设置 监听事件
}
3.11 SocketChannel
- SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通 道,或者把通道里的数据读到缓冲区。
- 相关方法如下
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
public static SocketChannel open();//得到一个 SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block);//设置阻塞或非阻塞 模式,取值 false 表示采用非阻塞模式
public boolean connect(SocketAddress remote);//连接服务器
public boolean finishConnect();//如果上面的方法连接失败,接下来就要通过该方法 完成连接操作
public int write(ByteBuffer src);//往通道里写数据
public int read(ByteBuffer dst);//从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att);//注册一个选择器并 设置监听事件,最后一个参数可以设置共享数据
public final void close();//关闭通道
}
3.12 NIO 网络编程应用实例-群聊系统
- 要求:
- 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多人群聊
- 服务器端:可以监测用户上线,离线, 并实现消息转发功能
- 客户端:通过channel 可以无阻塞发送 消息给其它所有用户,同时可以接受 其它用户发送的消息(有服务器转发得到)
- 目的:进一步理解NIO非阻塞网络编程 机制
- 代码实现
服务端
package ke.cloud.netty.demo.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server {
// 定义属性
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private static final int PORT = 6666;
// 构造器 服务器端监听连接建立
public Server() {
try {
// 得到选择器
selector = Selector.open();
// 得到 ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
// 设置非zus
serverSocketChannel.configureBlocking(false);
// 把ServerSocketChannel 注册到Selector中 关注连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
// 监听,处理客户端连接
public void listen() {
try {
// 循环处理
while (true) {
// selector开启监听
int select = selector.select();
if (select > 0) { //表示有事件要处理
// 遍历得到selectedKeys 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 取出SelectionKey
SelectionKey key = iterator.next();
if (key.isAcceptable()) { // 处理连接事件
// 通过 ServerSocketChannel 获得 socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置非阻塞
socketChannel.configureBlocking(false);
// 将socketChannel注册到selector中
socketChannel.register(selector, SelectionKey.OP_READ);
// 给出提示
System.out.println(socketChannel.getRemoteAddress() + "上线了~");
}
// 处理读事件
if (key.isReadable()) {
// 处理读的方法
read(key);
}
iterator.remove();
}
} else {
System.out.println("等待中……");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 读取客户端的消息
public void read(SelectionKey key) {
// 定义一个SocketChannel
SocketChannel socketChannel = null;
try {
// 通过SelectionKey 得到关联的Channel
socketChannel = (SocketChannel) key.channel();
// 创建缓冲区ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 从Channel 中 把数据读取到缓冲区buffer
int read = socketChannel.read(buffer);
// 根据 read 的值,做出对应的处理
if (read > 0) { //有读取到数据
String s = new String(buffer.array());
System.out.println("【服务端】收到客户端消息:" + s);
// 向其他客户端转发消息,需要排除自己
sendMessageToOther(s, socketChannel);
}
} catch (Exception e) {
// 如果在读取数据时候,发生异常,则表示离线了
try {
System.out.println(socketChannel.getRemoteAddress() + "离线了~");
// 取消注册
key.channel();
// 关闭通道
socketChannel.close();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
// 转发消息给其他客户端
private void sendMessageToOther(String msg, SocketChannel socketChannel) throws IOException {
System.out.println("服务器转发消息中……");
// 遍历所有的selectedKeys 排除自己
// 遍历所有注册到 Selector 上的 socketChannel ,并排除自己
for (SelectionKey key : selector.keys()) {
// 通过 key 取出对应的 SocketChannel
Channel channel = key.channel();
// 排除自己, channel 必须是一个 SocketChannel 类型的 并且 channel 不等于自己
if (channel instanceof SocketChannel && channel != socketChannel) {
// 转换 channel 类型
SocketChannel dest = (SocketChannel) channel;
// 将msg 存储到 buffer中
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
// 将buffer 的数据写入通道
dest.write(buffer);
}
}
}
public static void main(String[] args) {
// 创建一个服务器对象
Server server = new Server();
server.listen();
}
}
客户端
package ke.cloud.netty.demo.nio.chat;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class Client {
// 定义相关属性
// 服务器的IP
private final String HOST = "127.0.0.1";
// 服务器的端口
private final int PORT = 6666;
private Selector selector;
private SocketChannel socketChannel;
private String username;
// 构造器 服务器端监听连接建立
public Client() {
try {
// 得到选择器
selector = Selector.open();
// 得到 ServerSocketChannel
socketChannel = SocketChannel.open(new InetSocketAddress(HOST,PORT));
// 设置非zus
socketChannel.configureBlocking(false);
// 把ServerSocketChannel 注册到Selector中 关注读事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 得到username·
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + "is OK!");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
// 向服务器发送消息
public void sendMsg(String msg) {
msg = username + "说:" + msg;
try {
// 把msg 写入buffer
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
// 读取从服务器端回复的消息
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
public void readMsg() {
try {
int select = selector.select();
if (select > 0) {
// 有事件发生的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) { //读事件
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
} else {
System.out.println("没有可用的通道");
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 启动客户端
Client client = new Client();
// 启动一个线程每三秒钟从服务器端读取数据
new Thread(() -> {
while (true) {
client.readMsg();
try {
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
// 发送数据给服务端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
client.sendMsg(s);
}
}
}
- 启动服务端,和三个客户端,进行测试
- 任意客户端- 发送消息

- 客户端退出

3.13 NIO与零拷贝
1. 基本介绍
- 零拷贝是网络编程的关键,很多性能优化都离不开。
- 零拷贝不是指不拷贝,而是没有 CPU 拷贝。
- 在 Java 程序中,常用的零拷贝有 mmap(内存映射) 和 sendFile。那么,他们在 OS 里,到底是怎么样的一个的设计?我们分析 mmap 和 sendFile 这两个零拷贝
2. 传统IO数据读写
- Java 传统 IO 和 网络编程的一段代码
File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
byte[] arr = new byte[(int) file.length()];
raf.read(arr);
Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);
- 传统 IO 读写的流程图

- 由上图可得:传统的 IO 进行了 4次拷贝, 3次状态切换
3. mmap 优化
mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户控件的拷贝次数。如下图

- MMAP 进行了 3次拷贝,3次转换
- 比传统的优化了一些,少了 1次拷贝,但是还没有做到 零拷贝
4. sendFile 优化
- Linux 2.1 版本 提供了 sendFile 函数,其基本 原理如下:数据根本不 经过用户态,直接从内 核缓冲区进入到 Socket Buffer,同时,由于和用 户态完全无关,就减少 了一次上下文切换

- Linux 在 2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socket buffer 的操作,直接拷贝到 协议栈,从而再一次减少 了数据拷贝

- 到这里,已经优化到了 2次拷贝(忽略 内核->Socket 的拷贝),2次状态切换
5. 再次理解
- 我们说零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是 重复的(只有 kernel buffer 有一份数据)。
- 零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下 文切换,更少的 CPU 缓存伪共享以及无 CPU 校验和计算。
6. mmap 和 sendFile 的区别
- mmap 适合小数据量读写,sendFile 适合大文件传输。
- mmap 需要 3 次上下文切换,3 次数据拷贝;sendFile 需要 2 次上下文切换,最 少 2 次数据拷贝。
- sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝 到 Socket 缓冲区)。
7. NIO 零拷贝案例
- 要求
- 使用传统的IO 方法传递一个大文件
- 使用NIO 零拷贝方式传递(transferTo)一个大文件
- 看看两种传递方式耗时时间分别是多少
- 传统方式
服务端
package ke.cloud.netty.demo.nio;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class OldIOServer {
public static void main(String[] args) throws IOException {
// 监听 7001,端口
ServerSocket serverSocket = new ServerSocket(7001);
while (true) {
// 监事连接
Socket socket = serverSocket.accept();
// 通过serverSocket 获取获取输入流
DataInputStream dataInputStream = new DataInputStream(socket.getInputStream());
try {
byte[] bytes = new byte[4096];
while (true) {
int read = dataInputStream.read(bytes, 0, bytes.length);
if (-1 == read) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
客户端
package ke.cloud.netty.demo.nio;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
public class OldIOClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("localhost", 7001);
// 需要拷贝大文件
String fileName = "E:\\BaiduNetdiskDownload\\spring-cloud-alibaba-practice-master.zip";
InputStream inputStream = new FileInputStream(fileName);
DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
byte[] buffer = new byte[4096];
long readCount;
long total = 0;
long startTime = System.currentTimeMillis();
while ((readCount = inputStream.read(buffer)) >= 0) {
total += readCount;
dataOutputStream.write(buffer);
}
System.out.println("发送总字节数: " + total + ", 耗时: " + (System.currentTimeMillis() - startTime));
dataOutputStream.close();
socket.close();
inputStream.close();
}
}
启动测试

- 零拷贝(transferTo)
服务端
package ke.cloud.netty.demo.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(7001));
// 创建buffer
ByteBuffer buffer = ByteBuffer.allocate(4096);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int read = 0;
while (read != -1) {
try {
read = socketChannel.read(buffer);
} catch (Exception e) {
e.printStackTrace();
}
// 倒带 : Position = 0,Mark 作废
buffer.rewind();
}
}
}
}
客户端
package ke.cloud.netty.demo.nio;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class IOClient {
public static void main(String[] args) throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String fileName = "E:\\BaiduNetdiskDownload\\spring-cloud-alibaba-practice-master.zip";
FileChannel channel = new FileInputStream(fileName).getChannel();
long startTime = System.currentTimeMillis();
// 开始传输
// transferTo 方法底层使用 零拷贝
//在 Linux 下,一个 transferTo 方法,就可以完成传输
//在 Windows 下,transferTo 一次调用只能发送 8M 文件,所以就需要分段传输,而且要注意传输时的位置
long transfer = channel.transferTo(0, channel.size(), socketChannel);
System.out.println("发送总的字节数 :"+ transfer + " ,总耗时:"+(System.currentTimeMillis() - startTime));
}
}
启动测试

- 因为文件不算很大,所以差别也没很大,文件越大优化效果越明显。
4. Java AIO 基本介绍
- JDK 7 引入了 Asynchronous I/O,即 AIO。在进行 I/O 编程中,常用到两种模式: Reactor和 Proactor。Java 的 NIO 就是 Reactor,当有事件触发时,服务器端得 到通知,进行相应的处理。
- AIO 即 NIO2.0,叫做异步不阻塞的 IO。AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
- 目前 AIO 还没有广泛应用,Netty 也是基于NIO, 而不是AIO, 因此我们就不详解 AIO了,参考资料 《Java新一代网络编程模型AIO原理及Linux系统 AIO介绍》 http://www.52im.net/thread-306-1-1.html。
5. BIO、NIO、AIO对比

- 举例说明
- 同步阻塞(BIO):到理发店理发,就一直等理发师,直到轮到自己理发,且在等的过程是什么也不干。
- 同步非阻塞(NIO):到理发店理发,发现前面有其它人理发,给理发师说 下,先干其他事情,一会过来看是否轮到自己.
- 异步非阻塞(AIO):给理发师打电话,让理发师上门服务,自己干其它事情,理发师自己来家给你理发
5. Netty 概述
1. 原生NIO存在的问题
- 为什么有了 NIO ,还会出现 Netty,因为 NIO 有如下问题
- NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、 SocketChannel、ByteBuffer 等。
- 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
- JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询(死循环),最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决
2. Netty官网说明
- 官网地址 :https://netty.io/
- Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能的服务器和客户端。
3. Netty官网说明
- Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。
- Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的 开发过程。
- Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采 用了 Netty。
4. Netty的优点
Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。
- 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 —— 单线程,一个或多个 线程池。
- 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项 (JDK 5 -> Netty 3.x ; 6 -> Netty 4.x 就可以支持了)。
- 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的 SSL/TLS 和 StartTLS 支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复, 同时,更多的新功能会被加入
5. Netty版本说明
- netty版本分为 netty3.x 和 netty4.x、netty5.x。
- 因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本。
- 目前在官网可下载的版本 netty3.x netty4.0.x 和 netty4.1.x。
- 这里使用的是 Netty4.1.x 版本。
- netty 下载地址: https://bintray.com/netty/downloads/netty/
6. 线程模型基本介绍
1. 概述
- 不同的线程模式,对程序的性能有很大影响,为了搞清Netty 线程模式,来了解下各个线程模式,最后看看Netty 线程模型有什么优越性.
- 目前存在的线程模型有:
- 传统阻塞 I/O 服务模型
- Reactor 模式(反应器模式)
- 根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
- 单 Reactor 单线程;
- 单 Reactor 多线程;
- 主从 Reactor 多线程
- Netty 线程模式(Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor)
2. 传统阻塞 I/O 服务模型
- 工作原理图

- 采用阻塞IO模式获取输入的数据
- 每个连接都需要独立的线程完成数据的输入(read),业务处理, 数据返回(send)
- 问题分析
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read 操作,造成线程资源浪费
3. Reactor 模式
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
- 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
- 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
- Reactor 对应的叫法:
- 反应器模式
- 分发者模式(Dispatcher)
- 通知者模式(notifier)
- I/O 复用结合线程池,就是 Reactor 模式基本设计思想, 如图:

- 说明:
- Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 (基于事件驱动)。
- 服务器端程序处理传入的多个请求, 并将它们同步分派到相应的处理线程, 因此Reactor(反应器)模式也叫 Dispatcher(分发者) 模式。
- Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。
- 核心组成:
- Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处 理程序来对 IO 事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并 将线路转移到适当的联系人;
- Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作
- Reactor 模式分类:
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
3.1 单 Reactor 单线程
- 工作原理示意图

- 说明:
- Select 是前面 I/O 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求。
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发。
- 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理。
- 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应
- Handler 会完成 Read→业务处理→Send 的完整业务流程 结合实例:服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写 等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的 NIO 案例就属于这种模型。
- 优缺点:
优点
- 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。
缺点
- 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某 个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
- 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不 可用,不能接收和处理外部消息,造成节点故障。
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复 杂度 O(1) 的情况
多线程
3.2 单Reactor- 工作原理示意图:

- 说明
- Reactor 对象通过 select 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发。
- 如果建立连接请求, 则由 Acceptor 通过 accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件。
- 如果不是连接请求,则由 Reactor 分发调用连接对 应的Handler 来处理。
- Handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的 Worker 线程池的某个线程处理业务。
- Worker 线程池会分配独立线程完成真正的业务, 并将结果返回给 Handler。
- Handler 收到响应后,通过 send 将结果返回给 Client。
- 优缺点
优点:
- 可以充分的利用多核cpu 的处理能力。
缺点:
- 多线程数据共享和访问比较复杂。
- 单 Reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈。
3.3 主从 Reactor 多线程
- 工作原理示意图

- 说明
- Reactor 主线程 MainReactor 对象通过 select 监听连接事件, 收到事件后,通过 Acceptor 处理连接事件(主 Reactor 只处理连接事件)
- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
- SubReactor 将连接加入到连接队列进行监听,并创建 Handler 进行各种事件处理
- 当有新事件发生时, SubReactor 就会调用对应的 Handler处理
- Handler 通过 read 读取数据,分发给后面的 (Worker 线程池)处理
- (Worker 线程池)分配独立的 (Worker 线程)进行业务处理,并返 回结果
- Handler 收到响应的结果后,再通过 send 将结果返回给 Client
一个 MainReactor 可以关联多个 SubReactor
《Scalable IO in Java》 书中对 Multiple Reactors 的原理图解

- 优缺点
优点:
- 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点:
- 编程复杂度较高
结合实例:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型, Memcached 主从多线程,Netty 主从多线程模型的支持
3.4 Reactor 模式小结
- 3 种模式用生活案例来理解
- 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服。
- 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待。
- 主从 Reactor 多线程,多个前台接待员,多个服务生。
- Reactor 模式具有如下的优点:
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的。
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程 的切换开销。
- 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源。
- 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性。
7. Netty模型
1. Netty 工作原理示意图
- Netty 主要基于主从 Reactors 多线程模型做了一定的改进,其中主从 Reactor 多 线程模型有多个 Reactor
1.1 简单版

- 说明 :
- BossGroup 线程维护Selector , 只关注Accecpt
- 当接收到Accept事件,获取到对应的 SocketChannel, 封装成 NIOScoketChannel并注册到 Worker 线程(事件循环), 并进行维护
- 当Worker线程监听到 Selector 中通道发生自己感 兴趣的事件后,就进行处理(就由 Handler 处理), 注意 Handler 已经加入到通道
1.2 进阶版

1.3 详细版

- 说明 :
Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 Selector , 用于监听绑定在其上的 Socket 的网络通讯
NioEventLoopGroup(BossGroup、WorkerGroup) 可以有多个线程, 即可以含有多个 NioEventLoop
每个Boss 的 NioEventLoop 循环执行的步骤有3步
轮询accept 事件
处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到 Worker 的 NIOEventLoop 上的 Selector
处理任务队列的任务 , 即 runAllTasks
每个 Worker 的 NIOEventLoop 循环执行的步骤
轮询read, write 事件
处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
处理任务队列的任务 , 即 runAllTasks
每个Worker NIOEventLoop 处理业务时,会使用 Pipeline(管道), Pipeline 中包含了 Channel , 即通过 Pipeline 可以获取到对应通道, 管道中维护了很多的处理器。管道可以使用 Netty 提供的,也可以自定义。
2. Netty快速入门实例-TCP服务
- 要求
- Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
- 服务器可以回复消息给客户端 “hello, 客户端~”
- 代码实现
- 引入依赖
<!--netty依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.78.Final</version>
</dependency>
- 编写 Netty 的服务端 :NettyServer
package ke.cloud.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
// 创建 BossGroup 和 WorkerGroup
/*
说明
1. 创建两个线程组 BossGroup 和 WorkerGroup
2. BossGroup 只处理连接请求
3. WorkerGroup 处理真正客户端的业务
4. 运行时,这两个都是无限循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建 服务端 启动对象,并配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程进行配置参数
bootstrap.group(bossGroup, workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) // NioServerSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true)// 设置连接保持活动连接状态
.childHandler(
new ChannelInitializer<SocketChannel>() { // 给 workerGroup 的 NioEventLoop 对应的管道(Pipeline)设置处理器
// 创建一个通道初始化对象
/**
* 向 workerGroup 对应的 管道(Pipeline) 设置处理器
*
* @param socketChannel
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()// 获得 这个 socketChannel 对应的 Pipeline
.addLast(new NettyServerHandler()); // 把自定义的 Handler 添加到 管道
}
});
System.out.println("服务器准备好了……");
// 绑定一个端口,并且同步。生成了一个 ChannelFuture 对象
// 这里就已经启动了服务器
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
// 对 关闭通道 进行监听
// 这里只是监听,只有关闭通道时才进行处理,这句话不是直接关闭了通道
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 编写 自定义的 Netty 服务端处理器 :NettyServerHandler
package ke.cloud.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 1. 自定义一个 Handler 需要继承 Netty 规定好的某个 处理器适配器
* 2. 这时自定义的 Handler ,才能称为一个 Handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据的事件(可以读取客户端发送的消息)
*
* @param ctx 上下文对象,包含 管道、通道、地址
* @param msg 客户端发送的消息,默认是 Object 类型
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("【Server】: ctx" + ctx);
// 将 msg 转换成 ByteBuffer
/*
说明 :
1. 注意这个是 ByteBuf ,是 io.netty.buffer 包下的,不是 NIO 下的 Buffer
2. ByteBuf 比 Buffer 的性能更高一点
*/
ByteBuf buf = (ByteBuf) msg;
// 把 buf 转成 UTF8 格式的字符串
System.out.println("客户端发送的 msg :" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址 :" + ctx.channel().remoteAddress());
}
/**
* 数据读取完毕后,返回消息给客户端
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 把数据写入缓冲区,并刷新缓冲区 write+Flush 写入和清除
// 一般来说,需要对这个发送的消息进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端",CharsetUtil.UTF_8));
}
/**
* 处理异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭通道
ctx.close();
}
}
- 编写 Netty 的客户端 :NettyClient
package ke.cloud.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端启动对象 —— Bootstrap ,不是 服务端的 ServerBootstrap
// 并且是 io.netty.bootstrap 包下的
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(group)// 设置线程组
.channel(NioSocketChannel.class)// 设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {// 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());// 加入自己的处理器
}
});
System.out.println("客户端准备好了……");
// 启动客户端连接服务器端
// 这里涉及到一个 Netty 的异步模型,后面详述
ChannelFuture channelFuture = bootstrap.connect("localhost", 6668).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
- 编写 自定义的 Netty 客户端处理器 :NettyClientHandler
package ke.cloud.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪时,就会触发该方法,就可以发信息了
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("【Client】:ctx" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,server", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时 ,会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器发送的 msg :" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址 :"+ ctx.channel().remoteAddress());
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}
- 启动测试
先启动 NettyServer,在启动 NettyClient
NettyServer 的控制台:
NettyClient 的控制台:

3. 对‘Netty快速入门实例’的分析
- 通过分析 ‘Netty快速入门实例’ 的执行流程,来熟悉并加深对 Netty模型 的理解
3.1 BossGroup 和 WorkGroup 怎么确定自己有多少个 NIOEventLoop
- 点进 NioEventLoopGroup


- 一直点进 this,一直进入父类


- 找到这个值的初始化代码

- 所以 BossGroup 和 WorkerGroup 含有的子线程数(NioEventLoop)默认为 CPU 核数*2
- 在此处打一个断点,进行 Debug


- 由源码中的构造方法可知 —— 想要设置线程数只要在参数中输入即可
3.2 WorkerGroup 是如何分配这些进程的
- 设置 BossGroup 进程数为 1 ; WorkerGroup 进程数为 4 ; Client 数位 8
- 设置进程数量

- 输出线程信息,在 NettyServerHandler 中添加

- 启动服务端、和4个客户端

- 启动第五个客户端会分配到哪个线程?

- 又回到了第一个线程,在默认情况下,WorkerGroup 分配的逻辑就是按顺序循环分配的
3.3 BossGroup 和 WorkerGroup 中的 Selector 和 TaskQueue
- 打断点进行 Debug


- 每个子线程都具有自己的 Selector、TaskQueue……
3.4 CTX 上下文、Channel、Pipeline 之间关系
- 修改 NettyServerHandler ,并添加端点

- 先看 CTX 上下文中的信息

上下文中还有很多其他信息,就不一样列举了
- Pipeline

- Channel

- CTX 上下文、Channel、Pipeline 三者关系示意图

4. TaskQueue 任务队列
4.1 概述
- 如果在某个 Handler 中有个长时间的操作,那势必会造成 Pipeline管道 的阻塞,那么这种情况就需要把这些任务提交到 TaskQueue 进行异步执行
- TaskQueue 和 Channel 有绑定的关系
- 任务队列中的 Task 有 3 种典型使用场景
- 用户程序自定义的普通任务
- 用户自定义定时任务
- 非当前 Reactor 线程调用 Channel 的各种方法
4.2 体验任务的阻塞
- 编写一个 服务端的 Handler :NettyServerHandlerTaskQ
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandlerTaskQ extends ChannelInboundHandlerAdapter {
/**
* 读取数据的事件(可以读取客户端发送的消息)
*
* @param ctx 上下文对象,包含 管道、通道、地址
* @param msg 客户端发送的消息,默认是 Object 类型
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("【Server】: ctx" + ctx);
// 比如这里有一个非常耗时的任务,希望可以异步执行
// 把该任务提交到 Channel 对应的 NIOEventLoop 的 TaskQueue 中
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端,这是一个执行耗时长的任务", CharsetUtil.UTF_8));
System.out.println("耗时长的任务执行完毕,继续");
}
/**
* 数据读取完毕后,返回消息给客户端
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 把数据写入缓冲区,并刷新缓冲区 write+Flush 写入和清除
// 一般来说,需要对这个发送的消息进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端",CharsetUtil.UTF_8));
}
/**
* 处理异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭通道
ctx.channel().close();
}
}
- 修改 NettyServer 中添加到 Pipeline 的 Handler

- 先这样执行一个查看效果(没有添加到任务队列)





4.3 TaskQueue 使用场景-1
- 用户程序自定义的普通任务
- 上面自定义了一个执行时间长的任务,该任务会阻塞这个 EventLoop 线程,可以使用该方法来解决耗时长的任务的阻塞问题
- 修改 NettyServerHandlerTaskQ 中的 channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("【Server】: ctx" + ctx);
// 解决方案-1:用户程序自定义的普通任务
ctx.channel().eventLoop().execute(() ->{
try {
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端,这是一个执行耗时长的任务", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("耗时长的任务执行完毕,继续");
}
erHandlerTaskQ
- 运行测试



- 这也就能看出任务队列的异步执行了
- 下断点,查看 TaskQueue

查看 ctx -> pipeline -> channel -> eventloop -> taskqueue

可以看到这里面有刚刚的 Handler
4.4 TaskQueue 使用场景-2
- 用户自定义定时任务
- 修改 NettyServerHandlerTaskQ 中的 channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("【Server】: ctx" + ctx);
// 解决方案-2:用户自定义定时任务
// 把任务提交到 scheduledTaskQueue
// 在和服务端连接成功后 5s 开始异步执行 run 方法
ctx.channel().eventLoop().schedule(() ->{
try {
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端,这是一个执行耗时长的任务", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 5, TimeUnit.SECONDS);
System.out.println("耗时长的任务执行完毕,继续");
}
- 运行测试


- 下断点,查看 scheduledTaskQueue 中是否多了任务

- 查看 ctx -> pipeline -> channel -> eventloop -> scheduledTaskQueue

4.5 TaskQueue 使用场景-3
- 非当前 Reactor 线程调用 Channel 的各种方法
- 例如:在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费
- 该场景就不演示了,只简单说一下思路
- 在用户连接到服务端的时候,可以获取到该客户对应的 SocketChannel 的 HashCode
img - 可以在服务端维护一个 集合 ,存放所有连接的 SocketChannel
- 在有消息需要推送的时候,就遍历这个 SocketChannel 集合,通过 HashCode 找到 Channel
- 通过 Channel 找到 EventLoop
- 再往后就是上面提到的任务的处理了
5. 总结
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
- NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责。
• NioEventLoopGroup 下包含多个 NioEventLoop • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
• 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
• 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
8. 异步模型和HTTP示例
1. 基本介绍
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
- Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
- 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获 取或者通过通知机制获得 IO 操作结果。
- etty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回 显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)
2. Future 说明
- 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等。
- ChannelFuture 是一个接口

我们可以添加监听器,当监听的事件发生时,就会通知到监听器.
3. 工作原理示意图

- 说明:
- 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用 future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
- Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来。
4. Future-Listener 机制
- 当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
- 常见有如下操作
• 通过 isDone 方法来判断当前操作是否完成;
• 通过 isSuccess 方法来判断已完成的当前操作是否成功;
• 通过 getCause 方法来获取已完成的当前操作失败的原因;
• 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
• 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知 指定的监听器;如果 Future 对象已完成,则通知指定的监听器
- 代码示例
给一个 ChannelFuture 注册监听器,来监控我们关系的事件
// 这里就已经启动了服务器
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()){
System.out.println("监听端口 6668 成功");
}else {
System.out.println("监听端口 6668 失败");
}
});
5. 小结
- 相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成。
- 异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
6. 快速入门实例-HTTP服务
- 要求:
- Netty 服务器在 6668 端口监听,浏览器发出请求 "http://localhost:6668/ "
- 服务器可以回复消息给客户端 "Hello! 我是服务器 5 " , 并 对特定请求资源进行过滤.
- 目的:
Netty 可以做Http服务开发,并且理解Handler实例 和客户端及其请求的关系

- 编写代码 —— 服务端代码
- 编写 服务端 :HttpServer
package ke.cloud.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class HttpServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 使用自己写的 ServerInitializer 完成初始化
.childHandler(new HttpServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(6660).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 编写 服务初始化器 :HttpServerInitializer
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个 Netty 提供的 httpServerCodec (CoDec => Coder + Decoder => 编解码器)
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
// 增加一个自己的 Handler
pipeline.addLast("MyServerHandler", new HttpServerHandler());
}
}
- 编写 服务处理器 :HttpServerHandler
/*
1. SimpleChannelInboundHandler 是之前使用的 ChannelInboundHandlerAdapter 的子类
2. HttpObject 这个类型表示, 客户端、服务端 相互通信的数据需要被封装成什么类型
*/
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据
*
* @param ctx 上下文
* @param msg 传递过来的消息
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
System.out.println("httpObject 的类型 :"+ msg.getClass());
System.out.println("客户端的地址 : "+ ctx.channel().remoteAddress());
// 回复信息给浏览器,需要把数据封装成 HttpObject 类型
// 创建一个 ButeBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello, 我是服务器", CharsetUtil.UTF_8);
// 构建一个 Http 的响应,即 httpResponse ; 后面的三个参数 :(Http 协议的版本, Http 的状态码, 需要传输的内容)
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
// 设置文本的类型,及字符编码
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
// 文本的长度
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
// 将构建好的 response 返回
ctx.writeAndFlush(response);
}
}
}
- 启动测试
- 打开 连接 http://localhost:6660/

- 出现了两次请求


上面的服务端启动后,在页面上不止接收到了文本,还接收到了一个网页的图标
现在把它过滤掉
修改 HttpServerHandler
// 获取请求的 URI
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
// 判断请求路径为 /favicon.ico,就不做处理
if ("/favicon.ico".equals(uri.getPath())){
System.out.println("请求了 图标 资源,不做响应");
return;
}

- 启动测试

9. Netty 核心模块
Bootstrap 和 ServerBootstrap
1.- Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap类是客户端程序的启动引导类, ServerBootstrap是服务端启动引导类
- 常见的方法有
• public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端,用来设置两个 EventLoop
• public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
• public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
• public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
• public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
• public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器 端
• public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
• public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务处理类 (自定义的 handler)
2. Future 和 ChannelFuture
- Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
- 常见的方法有
• Channel channel(),返回当前正在进行 IO 操作的通道 • ChannelFuture sync(),等待异步操作执行完毕
3. Channel
- Netty 网络通信的组件,能够用于执行网络 I/O 操作。
- 通过Channel 可获得当前网络连接的通道的状态。
- 通过Channel 可获得网络连接的配置参数 (例如接收缓冲区大小)。
- Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
- 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
- 支持关联 I/O 操作与对应的处理程序。
- 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。
- 常用的 Channel 类型
• NioSocketChannel,异步的客户端 TCP Socket 连接。
• NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
• NioDatagramChannel,异步的 UDP 连接。
• NioSctpChannel,异步的客户端 Sctp 连接。
• NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
Selector
4.- Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
- 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询 (Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接 完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channe。
5. ChannelHandler 及其实现类
- ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
- ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方 便使用期间,可以继承它的子类
- ChannelHandler 及其实现类一览图。

- 我们经常需要自定义一 个 Handler 类去继承 ChannelInboundHandlerA dapter,然后通过重写相应方法实现业务逻辑
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
// 通道注册事件
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
// 通道注销事件
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
// 通道就绪事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
// 通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
// 通道读取数据完毕事件
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
// 通道发生异常事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
6. Pipeline 和 ChannelPipeline
- ChannelPipeline 是一个重点。
- ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound(入站) 或者 outbound(出战) 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解: ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站 和出站 事件 / 操作)
- ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
- 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下

说明 :
• 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
• 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler, 出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
- 常用方法
• ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler) 添加到链中的第一个位置。
• ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler) 添加到链中的最后一个位置。
7. ChannelHandlerContext
- 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
- 即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同 时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便 对 ChannelHandler进行调用。
- 常用方法
• ChannelFuture close(),关闭通道
• ChannelOutboundInvoker flush(),刷新
• ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前
• ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
8. ChannelOption
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。
ChannelOption 参数如下:
ChannelOption.SO_BACKLOG :
对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定了队列的大小。
- ChannelOption.SO_KEEPALIVE :
一直保持连接活动状态
9. EventLoopGroup 和其实现类 NioEventLoopGroup
- EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源, 一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
- EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源, 一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
- 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示

说明:
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来
通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup
WorkerEventLoopGroup 会由 next 选择 其中一个 EventLoop来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理 ,一个 EventLoop 可以处理多个 Channel
常用方法
public NioEventLoopGroup(),构造方法。
public Future<?> shutdownGracefully(),断开连接,关闭线程。
10. Unpooled 类
- Netty 提供一个专门用来操作缓冲区(即Netty的数据容器)的工具类
- 常用方法如下所示
public static ByteBuf copiedBuffer(CharSequence string, Charset charset) 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 但有区别)
- 举例说明Unpooled 获取 Netty的数据容器ByteBuf 的基本使用

- 代码示例-1
体会以上三个属性值
public class ByteBuf01 {
public static void main(String[] args) {
// 创建一个 byteBuf
/*
说明
1. 创建一个对象,该对象包含一个数组,是一个 byte[10]
2. Netty 的 Buf 存取数据,不需要像 NIO 一样使用 Filp 切换
Netty 底层维护了一个 ReaderIndex(下一个读的位置) 和 WriterIndex(下一个写的位置)
*/
ByteBuf buffer = Unpooled.buffer(10);
// 向 buf 存数据
for (int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("写完数据后 {ReaderIndex: "+buffer.readerIndex()+", WriterIndex: "+buffer.writerIndex()+"}");
System.out.println("buf 的长度 - capacity :"+ buffer.capacity());
// 输出
for (int i = 0; i < buffer.capacity(); i++) {
// 读数据的方式-1 :直接 get 第几个 byte
//System.out.println(buffer.getByte(i));
// 读数据的方式-2 :通过移动 ReaderIndex 遍历
System.out.print(buffer.readByte() + " ");
}
System.out.println();
System.out.println("读完数据后 {ReaderIndex: "+buffer.readerIndex()+", WriterIndex: "+buffer.writerIndex()+"}");
}
}

- 代码示例-2
Netty - Buf 的常用 API
public class ByteBuf02 {
public static void main(String[] args) {
// 用其他方式创建 Buf ,参数 :(存入 Buf 的文本 , 字符编码)
ByteBuf byteBuf = Unpooled.copiedBuffer("【呵呵】:Hello,Buf", CharsetUtil.UTF_8);
// 使用相关的 API
if (byteBuf.hasArray()){ // 如果有内容
// 获得 buf 中的数据
byte[] bytes = byteBuf.array();
// 转成 String 输出
System.out.println(new String(bytes, CharsetUtil.UTF_8));
// 查看 ByteBuf 中真正存的是什么
System.out.println("ByteBuf : "+ byteBuf);
// 数组的偏移量
System.out.println("偏移量 :"+ byteBuf.arrayOffset());
System.out.println("WriterIndex: "+byteBuf.writerIndex());
byteBuf.getByte(0);
System.out.println("getByte 后 :ReaderIndex: "+byteBuf.readerIndex()+",可读取的字节数 :" + byteBuf.readableBytes());
byteBuf.readByte();
System.out.println("readByte 后 :ReaderIndex: "+byteBuf.readerIndex()+",可读取的字节数 :" + byteBuf.readableBytes());
// 读取某一段,参数:(起点,终点,字符集编码)
System.out.println(byteBuf.getCharSequence(9, 24, CharsetUtil.UTF_8));
}
}
}

11. Netty应用实例-群聊系统
- 要求:
- 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多人群聊
- 服务器端:可以监测用户上线,离线,并实现消息转发功能
- 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用 户发送的消息(有服务器转发得到)
- 目的:
进一步理解Netty非阻塞网络编程机制
代码实现
服务端 : ChatServer
public class ChatServer {
// 端口
private int port;
/**
* 构造器
*/
public ChatServer(int port) {
this.port = port;
}
/**
* 处理客户端的请求
*/
public void run() throws InterruptedException {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 获取 Pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 通过 Pipeline 添加编、解码器(Netty 自带)
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
// 加入自己的 Handler
pipeline.addLast(new ChatServerHandler());
}
});
System.out.println("服务端准备完毕");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new ChatServer(8000).run();
}
}
服务端的处理器 :ChatServerHandler
public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个 Channel 线程组,管理所有的 Channel, 参数 执行器
* GlobalEventExecutor => 全局事件执行器
* INSTANCE => 表示是单例的
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//定义一个时间的输出格式
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 当连接建立之后,第一个被执行
* 一连接成功,就把当前的 Channel 加入到 ChannelGroup,并将上线消息推送给其他客户
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取当前 Channel
Channel channel = ctx.channel();
// 将该客户上线的信息,推送给其他在线的 客户端
// 该方法,会将 ChannelGroup 中所有的 Channel 遍历,并发送消息
Date date = new Date(System.currentTimeMillis());
channelGroup.writeAndFlush("[客户端] ["+dateFormat.format(date)+"] "+channel.remoteAddress()+" 加入群聊~\n");
// 将当前 Channel 加入 ChannelGroup
channelGroup.add(channel);
}
/**
* 当断开连接激活,将 XXX 退出群聊消息推送给当前在线的客户
* 当某个 Channel 执行到这个方法,会自动从 ChannelGroup 中移除
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Date date = new Date(System.currentTimeMillis());
channelGroup.writeAndFlush("[客户端] ["+dateFormat.format(date)+"] "+ctx.channel().remoteAddress() + " 退出群聊~\n");
// 输出 ChannelGroup 的大小
System.out.println("==== ChannelGroup-Size : " + channelGroup.size());
}
/**
* 当 Channel 处于一个活动的状态激活,可以提示 XXX 上线
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Date date = new Date(System.currentTimeMillis());
System.out.println("["+dateFormat.format(date)+"] "+ctx.channel().remoteAddress() + " 已上线~\n");
}
/**
* 当 Channel 处于不活动的状态激活,提示 XXX 离线
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Date date = new Date(System.currentTimeMillis());
System.out.println("["+dateFormat.format(date)+"] "+ctx.channel().remoteAddress() + " 已下线~\n");
}
/**
* 读取数据,并把读取到的数据转发给所有 客户
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
// 获取到 当前 Channel
Channel channel = channelHandlerContext.channel();
Date date = new Date(System.currentTimeMillis());
//遍历 ChannelGroup 根据不同的情况,推送不同的消息
channelGroup.forEach(ch -> {
if (ch != channel){//遍历到的当前的 ch 不是发消息的 Channel
ch.writeAndFlush("[客户端] ["+dateFormat.format(date)+"] "+channel.remoteAddress()+" 发送了消息 :"+s+"\n");
}else {// 当前 ch 就是发消息的那个客户
ch.writeAndFlush("[自己] ["+dateFormat.format(date)+"] "+s+" | 发送成功~\n");
}
});
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭该通道
ctx.close();
}
}
客户端 :ChatClient
public class ChatClient {
// 主机地址
private final String host;
// 端口号
private final int port;
public ChatClient(String HOST, int PORT) {
this.host = HOST;
this.port = PORT;
}
public void run() throws InterruptedException {
//创建线程组 boosGroup
EventLoopGroup boosGroup = new NioEventLoopGroup();
try {
// 创建启动对象,设置参数
Bootstrap bootstrap = new Bootstrap();
// 设置两个线程组boosGroup\workerGroup
bootstrap.group(boosGroup)
// 设置服务通道实现类型
.channel(NioSocketChannel.class)
// 使用匿名内部类的形式初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new ChatClientHandler());
}
}); //给workerGroup的EventLoop对应的管道设置处理器
System.out.println("客户端准备完毕");
// 绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
Channel channel = channelFuture.channel();
System.out.println("------ "+ channel.localAddress()+" ------");
// 因为客户端需要输入信息,所以需要扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String s = scanner.nextLine();
// 通过 Channel 发送到 服务端
channel.writeAndFlush(s+"\r\n");
}
channelFuture.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new ChatClient("localhost",8000).run();
}
}
客户端的处理器 :ChatClientHandler
public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
// 直接输出从服务端获得的信息
System.out.println(msg.trim());
}
}
- 启动测试
启动服务端,三个客户端
服务端的控制台输出

三个客户端的输出



客户端-1 发送消息——“呵呵”



三个客户端下线

12. Netty心跳检测机制案例
- 要求:
- 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲
- 当服务器超过5秒没有写操作时,就提示写空闲
- 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲
代码实现
服务端 :Server
public class Server {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))// 在 bossGroup 增加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
/*
说明:
1. IdleStateHandler 是 Netty 提供的 空闲状态处理器
2. 四个参数:
readerIdleTime : 表示多久没有 读 事件后,就会发送一个心跳检测包,检测是否还是连接状态
writerIdleTime : 表示多久没有 写 事件后,……
allIdleTime : 表示多久 既没读也没写 后,……
TimeUnit : 时间单位
3. 当 Channel 一段时间内没有执行 读 / 写 / 读写 事件后,就会触发一个 IdleStateEvent 空闲状态事件
4. 当 IdleStateEvent 触发后,就会传递给 Pipeline 中的下一个 Handler 去处理,
通过回调下一个 Handler 的 userEventTriggered 方法,在该方法中处理 IdleStateEvent
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 对 空闲检测 进一步处理的 自定义的 Handler
pipeline.addLast(new ServerHandler());
}
});
System.out.println("服务器准备好了");
ChannelFuture channelFuture = bootstrap.bind(8000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端的处理器(空闲事件处理):
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 对 空闲事件 的处理
* @param ctx 上下文
* @param evt 传递过来的事件
* @throws Exception
*/
private int list[] = new int[3];
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断这个事件是否是 IdleStateEvent 空闲事件
if (evt instanceof IdleStateEvent){
// 将 event 向下转型 => IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
int index = -1;
// 判断具体是哪一个空闲事件
switch (event.state()){
// 读空闲
case READER_IDLE:
eventType = "读空闲";
index = 0;
break;
case WRITER_IDLE:
eventType = "写空闲";
index = 1;
break;
case ALL_IDLE:
eventType = "读写空闲";
index = 2;
break;
}
list[index] ++;
System.out.println("[超时事件] "+ctx.channel().remoteAddress()+" 发生了 "+eventType+"---第"+list[index]+"次");
System.out.println("服务器进行相应处理");
if (list[index] >= 3){
ctx.channel().close();
System.out.println("关闭该通道");
}
}
}
}
- 启动测试
启动服务端,再启动上一个群聊案例的客户端,进行测试


13. Netty 通过WebSocket编程实现服务器和客户端长连接
- 要求:
- 实现基于webSocket的长连接 的全双工的交互
- 改变Http协议多次请求的约束,实 现长连接了, 服务器可以发送消息 给浏览器
- 客户端浏览器和服务器端会相互感 知,比如服务器关闭了,浏览器会 感知,同样浏览器关闭了,服务器 会感知
代码实现
服务端 :WebServer
public class WebServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 因为基于 HTTP 协议,所以需要使用 HTTP 的编解码器
pipeline.addLast(new HttpServerCodec());
// 添加块处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明:
1. 因为 HTTP 数据传输时是分段的,HttpObjectAggregator 可以将多个端聚合
2. 这就是为什么浏览器发送大量数据时,就会发出多次 HTTP 请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
说明:
1. 对于 WebSocket 是以 帧 的形式传递的
2. 后面的参数表示 :请求的 URL
3. WebSocketServerProtocolHandler 将 HTTP 协议升级为 WebSocket 协议,即保持长连接
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义的 Handler
pipeline.addLast(new WebServerHandler());
}
});
System.out.println("服务器准备好了");
ChannelFuture channelFuture = bootstrap.bind(8000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端的处理器 :WebServerHandler
public class WebServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// TextWebSocketFrame 类型是 WebSocket 的一个子类,表示一个文本帧
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器端收到消息:" + msg.text());
// 回复浏览器
channelHandlerContext.channel().writeAndFlush(
new TextWebSocketFrame("【服务器】"+ LocalDateTime.now()+" | "+msg.text()));
}
// web 连接后触发
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id 表示标识,asLongText 输出的是唯一的,asShortText 不一定是唯一的
System.out.println("handlerAdded 被调用-- "+ctx.channel().id().asLongText()+" (LongText)");
System.out.println("handlerAdded 被调用-- "+ctx.channel().id().asShortText()+" (ShortText)");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// id 表示标识,asLongText 输出的是唯一的,asShortText 不一定是唯一的
System.out.println("handlerRemoved 被调用-- "+ctx.channel().id().asLongText()+" (LongText)");
System.out.println("handlerRemoved 被调用-- "+ctx.channel().id().asShortText()+" (ShortText)");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("【异常】 " + cause.getMessage());
ctx.close();
}
}
网页客户端:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form onsubmit="return false">
<p>输入文本</p>
<textarea id="message" name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<p>回复文本</p>
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
<script>
var socket;
// 判断当前浏览器是否支持 WebSocket
if (window.WebSocket){
socket = new WebSocket("ws://localhost:8000/hello");
// 相当于 channelRead0 方法,ev 收到服务器端回送的消息
socket.onmessage = function (ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相当于连接开启,感知到连接开启
socket.onopen = function (){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接开启……";
}
// 感知连接关闭
socket.onclose = function (){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭……";
}
}else {
alert("不支持 WebSocket");
}
// 发送消息到服务器
function send(message){
// 判断 WebSocket 是否创建好了
if (!window.socket){
return ;
}
// 判断 WebSocket 是否开启
if (socket.readyState == WebSocket.OPEN){
// 通过 Socket 发送消息
socket.send(message);
}else {
alert("连接未开启");
}
}
</script>
</html>
- 启动测试





10. Google Protobu
1. 编码和解码
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送 数据时就需要编码,接收数据时就需要解码 [示意图]。
- codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成 业务数据。

2. Netty本身编解码器机制和问题
编码器 | 解码器 |
---|---|
StringEncoder:对字符串数据进行编码 ObjectEncoder:对 Java 对象进行编码 ······· | StringDecoder:对字符串数据进行解码 ObjectDecoder:对 Java 对象进行解 ······· |
**机制:**Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高
存在问题:
- 无法跨语言
- 序列化后的体积太大,是二进制编码的 5 倍多。
- 序列化性能太低
3. Protobuf 介绍

- Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高 效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做 数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式 。 目前很多公司 http+json → tcp+protobuf
- 参考文档 : https://developers.google.com/protocol-buffers/docs/proto 语言指南
- Protobuf 是以 message 的方式来管理数据的.
- 支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝 大多数语言,例如 C++、C#、Java、python 等)
- 高性能,高可靠性
- 使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描 述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可 以让语法高亮。
- 然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件
4. 案例演示
[1] 传输单一固定类型
需求:
- 客户端可以发送一个Student PoJo 对象到服 务器 (通过 Protobuf 编码)
- 服务端能接收Student PoJo 对象,并显示信 息(通过 Protobuf 解码)
步骤:
- 编写 .proto 文件
- 通过 protoc.exe 编译器根据 .proto 自动生成 .java 文件
- 将 .java 文件复制到项目中
- 编写业务代码
① Student.proto
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}
② 生成 .java 文件


执行命令
# 格式
protoc.exe --java_out=. 文件名
# 举例
protoc.exe --java_out=. Student.proto

③ 文件直接生成在项目中

④ 服务端
项目中加入protobuf依赖,版本必须与protoc.exe版本一致
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.22.0</version>
</dependency>
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
⑤ 服务端 Handler
/*
说明
1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
2. 这时我们自定义一个Handler , 才能称为一个handler
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
//读取从客户端发送的StudentPojo.Student
System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
⑥ 客户端
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
⑦ 客户端 Handler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student 对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
//Teacher , Member ,Message
ctx.writeAndFlush(student);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
启动测试

[2] 随机传输多种类型
- 客户端可以随机发送Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)
- 服务端能接收Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)
① MyDataInfo.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="netty.codec2"; //指定生成到哪个包下
option java_outer_classname="MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3 要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个, 节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2; //
}
message Worker {
string name=1;
int32 age=2;
}
② 生成 .java 文件

③ 基于上个例子修改
server
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
// pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
serverHandler
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
// public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//读取从客户端发送的StudentPojo.Student
// System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
//根据dataType 来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName());
} else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
} else {
System.out.println("传输的类型不正确");
}
}
}
clientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// //发生一个Student 对象到服务器
//
// StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
// //Teacher , Member ,Message
// ctx.writeAndFlush(student);
//随机的发送Student 或者 Workder 对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == random) { //发送Student 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else { // 发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
}
启动测试
11. Netty编解码机制
1. 说明(以解码器为例)
详细的使用请看 【Handler 调用机制】中的案例

- 编解码器本质是一个 Handler
- 以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后, 它将调用由解码器所提供的 decode() 方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 Handler (链式调用),链式调用—见 Handler 机制
- 由于不可能知道远程节点是否会一次性发送一个完整的信息, TCP 有可能出现粘包拆包的问题, 这个类会对入站数据进行缓冲, 直到它准备好被处理,见粘包拆包章节
2. 代码演示(以解码器为例)
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
**文字说明:**这个例子,每次入站从 ByteBuf 中读取 4 字节(因为 int 占 4 个字节),将其解码为一个 int,然后将它添加到下一个 List中。 当没有更多元素可以被添加到该 List 中时,它的内容将会被发送给下一个 ChannelInboundHandler 。 int在被添加到 List 中时,会被自动装箱为 Integer。在调用 readInt() 方法前必须验证所输入的 ByteBuf 是否具有足够的数据

3. ReplayingDecoder
说明:ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使用这个类,我们不必调用 readableBytes() 方法。参数 S 指定了用户状态管理的类型,其中 Void 代表不需要状态管理(状态管理:解码的类型,如 int、long)
代码演示(对 5.2 的改进):
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readInt());
}
}
4. 其它解码器
类型 | 说明 |
---|---|
LineBasedFrameDecoder | 这个类在Netty内 部也有使用,它使用行尾控制字符(\n或者\r\n) 作为分隔符来解析数据 |
DelimiterBasedFrameDecoder | 使用自定义 的特殊字符作为消息的分隔符 |
HttpObjectDecoder | 一个HTTP数据的解码器 |
LengthFieldBasedFrameDecoder | 通过指定 长度来标识整包消息,这样就可以自动的处理 黏包和半包消息 |
5. Handler 调用机制
Handler 采用链式调用执行的机制。举例:接力赛

需求:
- 客户端发送 long 型数据 ----> 服务器
- 服务端发送 long 型数据 ----> 客户端
***代码见:*inoutboundhandler
1. 服务器端
- Server
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- ServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();//一会下断点
//入站的handler进行解码 MyByteToLongDecoder
//pipeline.addLast(new MyByteToLongDecoder());
pipeline.addLast(new MyByteToLongDecoder2());
//出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler 处理业务逻辑
pipeline.addLast(new MyServerHandler());
System.out.println("xx");
}
}
- ServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端" + ctx.channel().remoteAddress() + " 读取到long " + msg);
//给客户端发送一个long
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2. 客户端
- Client
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
- ClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行一个编码
pipeline.addLast(new MyLongToByteEncoder());
//这时一个入站的解码器(入站handler )
//pipeline.addLast(new MyByteToLongDecoder());
pipeline.addLast(new MyByteToLongDecoder2());
//加入一个自定义的handler , 处理业务
pipeline.addLast(new MyClientHandler());
}
}
- ClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
System.out.println("收到服务器消息=" + msg);
}
//重写channelActive 发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//ctx.writeAndFlush(Unpooled.copiedBuffer(""))
ctx.writeAndFlush(123456L); //发送的是一个long
}
}
3. 编解码器
- 编码器
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
- 解码器
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
4. 启动测试
先启动服务端什么都没有。

启动客户端。



12. TCP粘包和拆包及解决方案
**简而言之:**假设我说了两句话 “吕弘”,“刘璐”,两句话间隔很短【粘包】;听的人可能就听成了 “吕”,“弘刘璐”
- TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端) 都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发 给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。
- 由于TCP无消息保护边界, 需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题
- TCP粘包、拆包图解:

1. 代码演示粘包拆包现象
*关键代码:******客户端*处理器中 channelActive()** 循环发送的数据
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;// 标记接收次数
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 hello,server 编号
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("客户端接收到消息=" + message);
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2. 将上个例子拷贝删掉编解码器


并修改MyServerHandler,并注意将传输的对象修改成****ByteBuf,本例子的传输的对象用的是ByteBuf。
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
private int count;// 标记接收次数
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer转成字符串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("服务器接收到数据 " + message);
System.out.println("服务器接收到消息量=" + (++this.count));
//服务器回送数据给客户端, 回送一个随机id ,
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
ctx.writeAndFlush(responseByteBuf);
}
}
3. 启动测试
启动一个服务端,两个客户端。出现现象

4. 解决方案
解决方案:自定义协议 + 编解码器。解码时,在解码器中获取数据长度,只读取该数据长度的数据
需求(对 上个例子进行改进):
- 要求客户端发送 5 个 Message 对象, 客户端每次发送一个 Message 对象
- 服务器端每次接收一个 Message, 分 5 次进行解码, 每读取到 一个 Message , 会回 复一个 Message 对象 给客户端
1. 服务器端
- Server
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- ServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder());//解码器
pipeline.addLast(new MyMessageEncoder());//编码器
pipeline.addLast(new MyServerHandler());
}
}
- ServerHandler
//处理业务的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到数据,并处理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println();
System.out.println();
System.out.println("服务器接收到信息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到消息包数量=" + (++this.count));
//回复消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes("utf-8").length;
byte[] responseContent2 = responseContent.getBytes("utf-8");
//构建一个协议包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}
2. 客户端
- Client
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
- ClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder()); //加入编码器
pipeline.addLast(new MyMessageDecoder()); //加入解码器
pipeline.addLast(new MyClientHandler());
}
}
- ClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
for(int i = 0; i< 5; i++) {
String mes = "今天天气冷,吃火锅";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("客户端接收到消息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常消息=" + cause.getMessage());
ctx.close();
}
}
3. 编解码器
- 数据包
//协议包
public class MessageProtocol {
private int len; //关键:数据长度
private byte[] content; // 要发送的数据
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
- 编码器
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
- 解码器
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println();
System.out.println();
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MessageProtocol 数据包(对象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
//放入out传给下一个hanlder进行处理
out.add(messageProtocol);
}
}