网络编程学习

网络编程学习

NIO基础

在JavaSE的学习中,了解到如何使用IO进行数据传输,Java IO是阻塞的,如果在一次读写数据调用时数据还没有准备好,或者目前不可写,那么读写操作就会被阻塞直到数据准备好或目标可写为止。Java NIO则是非阻塞的,每一次数据读写调用都会立即返回,并将目前可读(或可写)的内容写入缓冲区或者从缓冲区中输出,即使当前没有可用数据,调用仍然会立即返回并且不对缓冲区做任何操作。

NIO框架是在JDK1.4推出的,它的出现就是为了解决传统IO的不足。

缓冲区

一切的一切还要从缓冲区开始讲起,包括源码在内,其实这个不是很难,只是需要理清思路。

Buffer类及其实现

Buffer类是缓冲区的实现,类似于Java中的数组,也是用于存放和获取数据的。但是Buffer相比Java中的数组,功能就非常强大了,它包含一系列对于数组的快捷操作。

Buffer是一个抽象类,它的核心内容:

1
2
3
4
5
6
7
8
9
10
public abstract class Buffer {
// 这四个变量的关系: mark <= position <= limit <= capacity
// 这些变量就是Buffer操作的核心了,之后我们学习的过程中可以看源码是如何操作这些变量的
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

// 直接缓冲区实现子类的数据内存地址(之后会讲解)
long address;

我们来看看Buffer类的子类,包括我们认识到的所有基本类型(除了boolean类型之外):

  • IntBuffer - int类型的缓冲区。
  • ShortBuffer - short类型的缓冲区。
  • LongBuffer - long类型的缓冲区。
  • FloatBuffer - float类型的缓冲区。
  • DoubleBuffer - double类型的缓冲区。
  • ByteBuffer - byte类型的缓冲区。
  • CharBuffer - char类型的缓冲区。

(注意我们之前在JavaSE中学习过的StringBuffer虽然也是这种命名方式,但是不属于Buffer体系,这里不会进行介绍)

这里我们以IntBuffer为例,我们来看看如何创建一个Buffer类:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
//创建一个缓冲区不能直接new,而是需要使用静态方法去生成,有两种方式:
//1. 申请一个容量为10的int缓冲区
IntBuffer buffer = IntBuffer.allocate(10);
//2. 可以将现有的数组直接转换为缓冲区(包括数组中的数据)
int[] arr = new int[]{1, 2, 3, 4, 5, 6};
IntBuffer buffer = IntBuffer.wrap(arr);
}

那么它的内部是本质上如何进行操作的呢?我们来看看它的源码:

1
2
3
4
5
6
public static IntBuffer allocate(int capacity) {
if (capacity < 0) //如果申请的容量小于0,那还有啥意思
throw new IllegalArgumentException();
return new HeapIntBuffer(capacity, capacity); //可以看到这里会直接创建一个新的IntBuffer实现类
//HeapIntBuffer是在堆内存中存放数据,本质上就数组,一会我们可以在深入看一下
}
1
2
3
4
5
6
7
8
9
10
11
12
public static IntBuffer wrap(int[] array, int offset, int length) {
try {
//可以看到这个也是创建了一个新的HeapIntBuffer对象,并且给了初始数组以及截取的起始位置和长度
return new HeapIntBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}

public static IntBuffer wrap(int[] array) {
return wrap(array, 0, array.length); //调用的是上面的wrap方法
}

那么这个HeapIntBuffer又是如何实现的呢,我们接着来看:

1
2
3
4
HeapIntBuffer(int[] buf, int off, int len) { // 注意这个构造方法不是public,是默认的访问权限
super(-1, off, off + len, buf.length, buf, 0); //你会发现这怎么又去调父类的构造方法了,绕来绕去
//mark是标记,off是当前起始下标位置,off+len是最大下标位置,buf.length是底层维护的数组真正长度,buf就是数组,最后一个0是起始偏移位置
}

我们又来看看IntBuffer中的构造方法是如何定义的:

1
2
3
4
5
6
7
8
9
10
11
final int[] hb;                  // 只有在堆缓冲区实现时才会使用
final int offset;
boolean isReadOnly; // 只有在堆缓冲区实现时才会使用

IntBuffer(int mark, int pos, int lim, int cap, // 注意这个构造方法不是public,是默认的访问权限
int[] hb, int offset)
{
super(mark, pos, lim, cap); //调用Buffer类的构造方法
this.hb = hb; //hb就是真正我们要存放数据的数组,堆缓冲区底层其实就是这么一个数组
this.offset = offset; //起始偏移位置
}

最后我们来看看Buffer中的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
Buffer(int mark, int pos, int lim, int cap) {       // 注意这个构造方法不是public,是默认的访问权限
if (cap < 0) //容量不能小于0,小于0还玩个锤子
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap; //设定缓冲区容量
limit(lim); //设定最大position位置
position(pos); //设定起始位置
if (mark >= 0) { //如果起始标记大于等于0
if (mark > pos) //并且标记位置大于起始位置,那么就抛异常(至于为啥不能大于我们后面再说)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark; //否则设定mark位置(mark默认为-1)
}
}

通过对源码的观察,我们大致可以得到以下结构了:

image-20220424093805677

现在我们来总结一下上面这些结构的各自职责划分:

  • Buffer:缓冲区的一些基本变量定义,比如当前的位置(position)、容量 (capacity)、最大限制 (limit)、标记 (mark)等,你肯定会疑惑这些变量有啥用,别着急,这些变量会在后面的操作中用到,我们逐步讲解。
  • IntBuffer等子类:定义了存放数据的数组(只有堆缓冲区实现子类才会用到)、是否只读等,也就是说数据的存放位置、以及对于底层数组的相关操作都在这里已经定义好了,并且已经实现了Comparable接口。
  • HeapIntBuffer堆缓冲区实现子类:数据存放在堆中,实际上就是用的父类的数组在保存数据,并且将父类定义的所有底层操作全部实现了。

这样,我们对于Buffer类的基本结构就有了一个大致的认识。

缓冲区写操作

前面我们了解了Buffer类的基本操作,现在我们来看一下如何向缓冲区中存放数据以及获取数据,数据的存放包括以下四个方法:

  • public abstract IntBuffer put(int i); - 在当前position位置插入数据,由具体子类实现
  • public abstract IntBuffer put(int index, int i); - 在指定位置存放数据,也是由具体子类实现
  • public final IntBuffer put(int[] src); - 直接存放所有数组中的内容(数组长度不能超出缓冲区大小)
  • public IntBuffer put(int[] src, int offset, int length); - 直接存放数组中的内容,同上,但是可以指定存放一段范围
  • public IntBuffer put(IntBuffer src); - 直接存放另一个缓冲区中的内容

我们从最简的开始看,是在当前位置插入一个数据,那么这个当前位置是怎么定义的呢,我们来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public IntBuffer put(int x) {
hb[ix(nextPutIndex())] = x; //这个ix和nextPutIndex()很灵性,我们来看看具体实现
return this;
}

protected int ix(int i) {
return i + offset; //将i的值加上我们之前设定的offset偏移量值,但是默认是0(非0的情况后面会介绍)
}

final int nextPutIndex() {
int p = position; //获取Buffer类中的position位置(一开始也是0)
if (p >= limit) //位置肯定不能超过底层数组最大长度,否则越界
throw new BufferOverflowException();
position = p + 1; //获取之后会使得Buffer类中的position+1
return p; //返回当前的位置
}

所以put操作实际上是将底层数组hb在position位置上的数据进行设定。

image-20220424113417640

设定完成后,position自动后移:

image-20220424113440765

我们可以编写代码来看看:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.allocate(10);
buffer
.put(1)
.put(2)
.put(3); //我们依次存放三个数据试试看
System.out.println(buffer);
}

通过断点调试,我们来看看实际的操作情况:

image-20220424105031549

可以看到我们不断地put操作,position会一直向后移动,当然如果超出最大长度,那么会直接抛出异常:

image-20220424105131279

接着我们来看看第二个put操作是如何进行,它能够在指定位置插入数据:

1
2
3
4
5
6
7
8
9
10
public IntBuffer put(int i, int x) {
hb[ix(checkIndex(i))] = x; //这里依然会使用ix,但是会检查位置是否合法
return this;
}

final int checkIndex(int i) { // package-private
if ((i < 0) || (i >= limit)) //插入的位置不能小于0并且不能大于等于底层数组最大长度
throw new IndexOutOfBoundsException();
return i; //没有问题就把i返回
}

实际上这个比我们之前的要好理解一些,注意全程不会操作position的值,这里需要注意一下。

我们接着来看第三个put操作,它是直接在IntBuffer中实现的,是基于前两个put方法的子类实现来完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public IntBuffer put(int[] src, int offset, int length) {
checkBounds(offset, length, src.length); //检查截取范围是否合法,给offset、调用者指定长度、数组实际长度
if (length > remaining()) //接着判断要插入的数据量在缓冲区是否容得下,装不下也不行
throw new BufferOverflowException();
int end = offset + length; //计算出最终读取位置,下面开始for
for (int i = offset; i < end; i++)
this.put(src[i]); //注意是直接从postion位置开始插入,直到指定范围结束
return this; //ojbk
}

public final IntBuffer put(int[] src) {
return put(src, 0, src.length); //因为不需要指定范围,所以直接0和length,然后调上面的,多捞哦
}

public final int remaining() { //计算并获取当前缓冲区的剩余空间
int rem = limit - position; //最大容量减去当前位置,就是剩余空间
return rem > 0 ? rem : 0; //没容量就返回0
}
1
2
3
4
5
static void checkBounds(int off, int len, int size) { // package-private
if ((off | len | (off + len) | (size - (off + len))) < 0) //让我猜猜,看不懂了是吧
throw new IndexOutOfBoundsException();
//实际上就是看给定的数组能不能截取出指定的这段数据,如果都不够了那肯定不行啊
}

大致流程如下,首先来了一个数组要取一段数据全部丢进缓冲区:

image-20220424113337189

在检查没有什么问题并且缓冲区有容量时,就可以开始插入了:

Img

最后我们通过代码来看看:

1
2
3
4
5
6
7
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.allocate(10);
int[] arr = new int[]{1,2,3,4,5,6,7,8,9};
buffer.put(arr, 3, 4); //从下标3开始,截取4个元素

System.out.println(Arrays.toString(buffer.array())); //array方法可以直接获取到数组
}

可以看到最后结果为:

image-20220424113040485

当然我们也可以将一个缓冲区的内容保存到另一个缓冲区:

1
2
3
4
5
6
7
8
9
10
11
12
public IntBuffer put(IntBuffer src) {
if (src == this) //不会吧不会吧,不会有人保存自己吧
throw new IllegalArgumentException();
if (isReadOnly()) //如果是只读的话,那么也是不允许插入操作的(我猜你们肯定会问为啥就这里会判断只读,前面四个呢)
throw new ReadOnlyBufferException();
int n = src.remaining(); //给进来的src看看容量(注意这里不remaining的结果不是剩余容量,是转换后的,之后会说)
if (n > remaining()) //这里判断当前剩余容量是否小于src容量
throw new BufferOverflowException();
for (int i = 0; i < n; i++) //也是从position位置开始继续写入
put(src.get()); //通过get方法一个一个读取数据出来,具体过程后面讲解
return this;
}

我们来看看效果:

1
2
3
4
5
6
public static void main(String[] args) {
IntBuffer src = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
IntBuffer buffer = IntBuffer.allocate(10);
buffer.put(src);
System.out.println(Arrays.toString(buffer.array()));
}

但是如果是这样的话,会出现问题:

1
2
3
4
5
6
7
public static void main(String[] args) {
IntBuffer src = IntBuffer.allocate(5);
for (int i = 0; i < 5; i++) src.put(i); //手动插入数据
IntBuffer buffer = IntBuffer.allocate(10);
buffer.put(src);
System.out.println(Arrays.toString(buffer.array()));
}

我们发现,结果和上面的不一样,并没有成功地将数据填到下面的IntBuffer中,这是为什么呢?实际上就是因为remaining()的计算问题,因为这个方法是直接计算postion的位置,但是由于我们在写操作完成之后,position跑到后面去了,也就导致remaining()结果最后算出来为0。

因为这里不是写操作,是接下来需要从头开始进行读操作,所以我们得想个办法把position给退回到一开始的位置,这样才可以从头开始读取,那么怎么做呢?一般我们在写入完成后需要进行读操作时(后面都是这样,不只是这里),会使用flip()方法进行翻转:

1
2
3
4
5
6
public final Buffer flip() {
limit = position; //修改limit值,当前写到哪里,下次读的最终位置就是这里,limit的作用开始慢慢体现了
position = 0; //position归零
mark = -1; //标记还原为-1,但是现在我们还没用到
return this;
}

这样,再次计算remaining()的结果就是我们需要读取的数量了,这也是为什么put方法中要用remaining()来计算的原因,我们再来测试一下:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
IntBuffer src = IntBuffer.allocate(5);
for (int i = 0; i < 5; i++) src.put(i);
IntBuffer buffer = IntBuffer.allocate(10);

src.flip(); //我们可以通过flip来翻转缓冲区
buffer.put(src);
System.out.println(Arrays.toString(buffer.array()));
}

翻转之后再次进行转移,就正常了。

缓冲区读操作

前面我们看完了写操作,现在我们接着来看看读操作。读操作有四个方法:

  • public abstract int get(); - 直接获取当前position位置的数据,由子类实现
  • public abstract int get(int index); - 获取指定位置的数据,也是子类实现
  • public IntBuffer get(int[] dst) - 将数据读取到给定的数组中
  • public IntBuffer get(int[] dst, int offset, int length) - 同上,加了个范围

我们还是从最简单的开始看,第一个get方法的实现在IntBuffer类中:

1
2
3
4
5
6
7
8
9
10
11
public int get() {
return hb[ix(nextGetIndex())]; //直接从数组中取就完事
}

final int nextGetIndex() { // 好家伙,这不跟前面那个一模一样吗
int p = position;
if (p >= limit)
throw new BufferUnderflowException();
position = p + 1;
return p;
}

可以看到每次读取操作之后,也会将postion+1,直到最后一个位置,如果还要继续读,那么就直接抛出异常。

image-20220424123743020

我们来看看第二个:

1
2
3
public int get(int i) {
return hb[ix(checkIndex(i))]; //这里依然是使用checkIndex来检查位置是否非法
}

我们来看看第三个和第四个:

1
2
3
4
5
6
7
8
9
10
11
12
13
public IntBuffer get(int[] dst, int offset, int length) {
checkBounds(offset, length, dst.length); //跟put操作一样,也是需要检查是否越界
if (length > remaining()) //如果读取的长度比可以读的长度大,那肯定是不行的
throw new BufferUnderflowException();
int end = offset + length; //计算出最终读取位置
for (int i = offset; i < end; i++)
dst[i] = get(); //开始从position把数据读到数组中,注意是在数组的offset位置开始
return this;
}

public IntBuffer get(int[] dst) {
return get(dst, 0, dst.length); //不指定范围的话,那就直接用上面的
}

我们来看看效果:

1
2
3
4
5
6
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
int[] arr = new int[10];
buffer.get(arr, 2, 5);
System.out.println(Arrays.toString(arr));
}

image-20220424125203822

可以看到成功地将数据读取到了数组中。

当然如果我们需要直接获取数组,也可以使用array()方法来拿到:

1
2
3
4
5
6
7
public final int[] array() {
if (hb == null) //为空那说明底层不是数组实现的,肯定就没法转换了
throw new UnsupportedOperationException();
if (isReadOnly) //只读也是不让直接取出的,因为一旦取出去岂不是就能被修改了
throw new ReadOnlyBufferException();
return hb; //直接返回hb
}

我们来试试看:

1
2
3
4
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
System.out.println(Arrays.toString(buffer.array()));
}

当然,既然都已经拿到了底层的hb了,我们来看看如果直接修改之后是不是读取到的就是我们的修改之后的结果了:

1
2
3
4
5
6
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
int[] arr = buffer.array();
arr[0] = 99999; //拿到数组对象直接改
System.out.println(buffer.get());
}

可以看到这种方式由于是直接拿到的底层数组,所有修改会直接生效在缓冲区中。

当然除了常规的读取方式之外,我们也可以通过mark()来实现跳转读取,这里需要介绍一下几个操作:

  • public final Buffer mark() - 标记当前位置
  • public final Buffer reset() - 让当前的position位置跳转到mark当时标记的位置

我们首先来看标记方法:

1
2
3
4
public final Buffer mark() {
mark = position; //直接标记到当前位置,mark变量终于派上用场了,当然这里仅仅是标记
return this;
}

我们再来看看重置方法:

1
2
3
4
5
6
7
public final Buffer reset() {
int m = mark; //存一下当前的mark位置
if (m < 0) //因为mark默认是-1,要是没有进行过任何标记操作,那reset个毛
throw new InvalidMarkException();
position = m; //直接让position变成mark位置
return this;
}

那比如我们在读取到1号位置时进行标记:

image-20220424135842228

接着我们使用reset方法就可以直接回退回去了:

image-20220424135925501

现在我们来测试一下:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
buffer.get(); //读取一位,那么position就变成1了
buffer.mark(); //这时标记,那么mark = 1
buffer.get(); //又读取一位,那么position就变成2了
buffer.reset(); //直接将position = mark,也就是变回1
System.out.println(buffer.get());
}

可以看到,读取的位置根据我们的操作进行了变化,有关缓冲区的读操作,就暂时讲到这里。

缓冲区其他操作

前面我们大致了解了一下缓冲区的读写操作,那么我们接着来看看,除了常规的读写操作之外,还有哪些其他的操作:

  • public abstract IntBuffer compact() - 压缩缓冲区,由具体实现类实现
  • public IntBuffer duplicate() - 复制缓冲区,会直接创建一个新的数据相同的缓冲区
  • public abstract IntBuffer slice() - 划分缓冲区,会将原本的容量大小的缓冲区划分为更小的出来进行操作
  • public final Buffer rewind() - 重绕缓冲区,其实就是把position归零,然后mark变回-1
  • public final Buffer clear() - 将缓冲区清空,所有的变量变回最初的状态

我们先从压缩缓冲区开始看起,它会将整个缓冲区的大小和数据内容变成position位置到limit之间的数据,并移动到数组头部:

1
2
3
4
5
6
7
8
9
10
11
public IntBuffer compact() {
int pos = position(); //获取当前位置
int lim = limit(); //获取当前最大position位置
assert (pos <= lim); //断言表达式,position必须小于最大位置,肯定的
int rem = (pos <= lim ? lim - pos : 0); //计算pos距离最大位置的长度
System.arraycopy(hb, ix(pos), hb, ix(0), rem); //直接将hb数组当前position位置的数据拷贝到头部去,然后长度改成刚刚计算出来的空间
position(rem); //直接将position移动到rem位置
limit(capacity()); //pos最大位置修改为最大容量
discardMark(); //mark变回-1
return this;
}

比如现在的状态是:

image-20220424140040711

那么我们在执行 compact()方法之后,会进行截取,此时limit - position = 6,那么就会截取第4、5、6、7、8、9这6个数据然后丢到最前面,接着position跑到7表示这是下一个继续的位置:

image-20220424140326373

现在我们通过代码来检验一下:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0});
for (int i = 0; i < 4; i++) buffer.get(); //先正常读4个
buffer.compact(); //压缩缓冲区

System.out.println("压缩之后的情况:"+Arrays.toString(buffer.array()));
System.out.println("当前position位置:"+buffer.position());
System.out.println("当前limit位置:"+buffer.limit());
}

可以看到最后的结果没有问题:

image-20220424141916082

我们接着来看第二个方法,那么如果我们现在需要复制一个内容一模一样的的缓冲区,该怎么做?直接使用duplicate()方法就可以复制了:

1
2
3
4
5
6
7
8
public IntBuffer duplicate() {   //直接new一个新的,但是是吧hb给丢进去了,而不是拷贝一个新的
return new HeapIntBuffer(hb,
this.markValue(),
this.position(),
this.limit(),
this.capacity(),
offset);
}

那么各位猜想一下,如果通过这种方式创了一个新的IntBuffer,那么下面的例子会出现什么结果:

1
2
3
4
5
6
7
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
IntBuffer duplicate = buffer.duplicate();

System.out.println(buffer == duplicate);
System.out.println(buffer.array() == duplicate.array());
}

由于buffer是重新new的,所以第一个为false,而底层的数组由于在构造的时候没有进行任何的拷贝而是直接传递,因此实际上两个缓冲区的底层数组是同一个对象。所以,一个发生修改,那么另一个就跟着变了:

1
2
3
4
5
6
7
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5});
IntBuffer duplicate = buffer.duplicate();

buffer.put(0, 66666);
System.out.println(duplicate.get());
}

现在我们接着来看下一个方法,slice()方法会将缓冲区进行划分:

1
2
3
4
5
6
7
8
9
10
11
public IntBuffer slice() {
int pos = this.position(); //获取当前position
int lim = this.limit(); //获取position最大位置
int rem = (pos <= lim ? lim - pos : 0); //求得剩余空间
return new HeapIntBuffer(hb, //返回一个新的划分出的缓冲区,但是底层的数组用的还是同一个
-1,
0,
rem, //新的容量变成了剩余空间的大小
rem,
pos + offset); //可以看到offset的地址不再是0了,而是当前的position加上原有的offset值
}

虽然现在底层依然使用的是之前的数组,但是由于设定了offset值,我们之前的操作似乎变得不太一样了:

image-20220424142642088

回顾前面我们所讲解的内容,在读取和存放时,会被ix方法进行调整:

1
2
3
4
5
6
7
protected int ix(int i) {
return i + offset; //现在offset为4,那么也就是说逻辑上的i是0但是得到真实位置却是4
}

public int get() {
return hb[ix(nextGetIndex())]; //最后会经过ix方法转换为真正在数组中的位置
}

当然,在逻辑上我们可以认为是这样的:

image-20220424143002885

现在我们来测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0});
for (int i = 0; i < 4; i++) buffer.get();
IntBuffer slice = buffer.slice();

System.out.println("划分之后的情况:"+Arrays.toString(slice.array()));
System.out.println("划分之后的偏移地址:"+slice.arrayOffset());
System.out.println("当前position位置:"+slice.position());
System.out.println("当前limit位置:"+slice.limit());

while (slice.hasRemaining()) { //将所有的数据全部挨着打印出来
System.out.print(slice.get()+", ");
}
}

可以看到,最终结果:

image-20220424143036449

最后两个方法就比较简单了,我们先来看rewind(),它相当于是对position和mark进行了一次重置:

1
2
3
4
5
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

接着是clear(),它相当于是将整个缓冲区回归到最初的状态了:

1
2
3
4
5
6
public final Buffer clear() {
position = 0; //同上
limit = capacity; //limit变回capacity
mark = -1;
return this;
}

到这里,关于缓冲区的一些其他操作,我们就讲解到此。

缓冲区比较

缓冲区之间是可以进行比较的,我们可以看到equals方法和compareTo方法都是被重写了的,我们首先来看看equals方法,注意,它是判断两个缓冲区剩余的内容是否一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean equals(Object ob) {
if (this == ob) //要是两个缓冲区是同一个对象,肯定一样
return true;
if (!(ob instanceof IntBuffer)) //类型不是IntBuffer那也不用比了
return false;
IntBuffer that = (IntBuffer)ob; //转换为IntBuffer
int thisPos = this.position(); //获取当前缓冲区的相关信息
int thisLim = this.limit();
int thatPos = that.position(); //获取另一个缓冲区的相关信息
int thatLim = that.limit();
int thisRem = thisLim - thisPos;
int thatRem = thatLim - thatPos;
if (thisRem < 0 || thisRem != thatRem) //如果剩余容量小于0或是两个缓冲区的剩余容量不一样,也不行
return false;
//注意比较的是剩余的内容
for (int i = thisLim - 1, j = thatLim - 1; i >= thisPos; i--, j--) //从最后一个开始倒着往回比剩余的区域
if (!equals(this.get(i), that.get(j)))
return false; //只要发现不一样的就不用继续了,直接false
return true; //上面的比较都没问题,那么就true
}

private static boolean equals(int x, int y) {
return x == y;
}

那么我们按照它的思路来验证一下:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
IntBuffer buffer1 = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0});
IntBuffer buffer2 = IntBuffer.wrap(new int[]{6, 5, 4, 3, 2, 1, 7, 8, 9, 0});
System.out.println(buffer1.equals(buffer2)); //直接比较

buffer1.position(6);
buffer2.position(6);
System.out.println(buffer1.equals(buffer2)); //比较从下标6开始的剩余内容
}

可以看到结果就是我们所想的那样:

image-20220424145009464

那么我们接着来看比较,compareTo方法,它实际上是Comparable接口提供的方法,它实际上比较的也是pos开始剩余的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public int compareTo(IntBuffer that) {
int thisPos = this.position(); //获取并计算两个缓冲区的pos和remain
int thisRem = this.limit() - thisPos;
int thatPos = that.position();
int thatRem = that.limit() - thatPos;
int length = Math.min(thisRem, thatRem); //选取一个剩余空间最小的出来
if (length < 0) //如果最小的小于0,那就返回-1
return -1;
int n = thisPos + Math.min(thisRem, thatRem); //计算n的值当前的pos加上剩余的最小空间
for (int i = thisPos, j = thatPos; i < n; i++, j++) { //从两个缓冲区的当前位置开始,一直到n结束
int cmp = compare(this.get(i), that.get(j)); //比较
if (cmp != 0)
return cmp; //只要出现不相同的,那么就返回比较出来的值
}
return thisRem - thatRem; //如果没比出来个所以然,那么就比长度
}

private static int compare(int x, int y) {
return Integer.compare(x, y);
}

这里我们就不多做介绍了。

只读缓冲区

接着我们来看看只读缓冲区,只读缓冲区就像其名称一样,它只能进行读操作,而不允许进行写操作。

那么我们怎么创建只读缓冲区呢?

  • public abstract IntBuffer asReadOnlyBuffer(); - 基于当前缓冲区生成一个只读的缓冲区。

我们来看看此方法的具体实现:

1
2
3
4
5
6
7
8
public IntBuffer asReadOnlyBuffer() {
return new HeapIntBufferR(hb, //注意这里并不是直接创建了HeapIntBuffer,而是HeapIntBufferR,并且直接复制的hb数组
this.markValue(),
this.position(),
this.limit(),
this.capacity(),
offset);
}

那么这个HeapIntBufferR类跟我们普通的HeapIntBuffer有什么不同之处呢?

image-20220424150625847

可以看到它是继承自HeapIntBuffer的,那么我们来看看它的实现有什么不同:

1
2
3
4
5
6
7
protected HeapIntBufferR(int[] buf,
int mark, int pos, int lim, int cap,
int off)
{
super(buf, mark, pos, lim, cap, off);
this.isReadOnly = true;
}

可以看到在其构造方法中,除了直接调用父类的构造方法外,还会将isReadOnly标记修改为true,我们接着来看put操作有什么不同之处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean isReadOnly() {
return true;
}

public IntBuffer put(int x) {
throw new ReadOnlyBufferException();
}

public IntBuffer put(int i, int x) {
throw new ReadOnlyBufferException();
}

public IntBuffer put(int[] src, int offset, int length) {
throw new ReadOnlyBufferException();
}

public IntBuffer put(IntBuffer src) {
throw new ReadOnlyBufferException();
}

可以看到所有的put方法全部凉凉,只要调用就会直接抛出ReadOnlyBufferException异常。但是其他get方法依然没有进行重写,也就是说get操作还是可以正常使用的,但是只要是写操作就都不行:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
IntBuffer buffer = IntBuffer.wrap(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0});
IntBuffer readBuffer = buffer.asReadOnlyBuffer();

System.out.println(readBuffer.isReadOnly());
System.out.println(readBuffer.get());
readBuffer.put(0, 666);
}

可以看到结果为:

image-20220424151322831

这就是只读状态下的缓冲区。

ByteBuffer和CharBuffer

通过前面的学习,我们基本上已经了解了缓冲区的使用,但是都是基于IntBuffer进行讲解,现在我们来看看另外两种基本类型的缓冲区ByteBuffer和CharBuffer,因为ByteBuffer底层存放的是很多单个byte字节,所以会有更多的玩法,同样CharBuffer是一系列字节,所以也有很多便捷操作。

我们先来看看ByteBuffer,我们可以直接点进去看:

1
2
3
4
5
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> {
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
....

可以看到如果也是使用堆缓冲区子类实现,那么依然是一个byte[]的形式保存数据。我们来尝试使用一下:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
//除了直接丢byte进去之外,我们也可以丢其他的基本类型(注意容量消耗)
buffer.putInt(Integer.MAX_VALUE); //丢个int的最大值进去,注意一个int占4字节
System.out.println("当前缓冲区剩余字节数:"+buffer.remaining()); //只剩6个字节了

//我们来尝试读取一下,记得先翻转
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println(buffer.get()); //一共四个字节
}
}

最后的结果为:

image-20220424153520843

可以看到第一个byte为127、然后三个都是-1,我们来分析一下:

  • 127 转换为二进制补码形式就是 01111111,而-1转换为二进制补码形式为11111111

那也就是说,第一个字节是01111111,而后续字节就是11111111,把它们拼接在一起:

  • 二进制补码表示01111111 11111111 11111111 11111111 转换为十进制就是2147483647,也就是int的最大值。

那么根据我们上面的推导,各位能否计算得到下面的结果呢?

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0);
buffer.put((byte) 0);
buffer.put((byte) 1);
buffer.put((byte) -1);

buffer.flip(); //翻转一下
System.out.println(buffer.getInt()); //以int形式获取,那么就是一次性获取4个字节
}

经过上面的计算,得到的结果就是:

  • 上面的数据以二进制补码的形式表示为:00000000 00000000 00000001 11111111
  • 将其转换为十进制那么就是:256 + 255 = 511

好吧,再来个魔鬼问题,把第一个换成1呢:10000000 00000000 00000001 11111111,自己算。

我们接着来看看CharBuffer,这种缓冲区实际上也是保存一大堆char类型的数据:

1
2
3
4
5
public static void main(String[] args) {
CharBuffer buffer = CharBuffer.allocate(10);
buffer.put("lbwnb"); //除了可以直接丢char之外,字符串也可以一次性丢进入
System.out.println(Arrays.toString(buffer.array()));
}

但是正是得益于char数组,它包含了很多的字符串操作,可以一次性存放一整个字符串。我们甚至还可以将其当做一个String来进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
CharBuffer buffer = CharBuffer.allocate(10);
buffer.put("lbwnb");
buffer.append("!"); //可以像StringBuilder一样使用append来继续添加数据

System.out.println("剩余容量:"+buffer.remaining()); //已经用了6个字符了

buffer.flip();
System.out.println("整个字符串为:"+buffer); //直接将内容转换为字符串
System.out.println("第3个字符是:"+buffer.charAt(2)); //直接像String一样charAt

buffer //也可以转换为IntStream进行操作
.chars()
.filter(i -> i < 'l')
.forEach(i -> System.out.print((char) i));
}

当然除了一些常规操作之外,我们还可以直接将一个字符串作为参数创建:

1
2
3
4
5
6
7
public static void main(String[] args) {
//可以直接使用wrap包装一个字符串,但是注意,包装出来之后是只读的
CharBuffer buffer = CharBuffer.wrap("收藏等于学会~");
System.out.println(buffer);

buffer.put("111"); //这里尝试进行一下写操作
}

可以看到结果也是我们预料中的:

image-20220424161925938

对于这两个比较特殊的缓冲区,我们就暂时讲解到这里。

直接缓冲区

注意:推荐学习完成JVM篇再来学习这一部分。

最后我们来看一下直接缓冲区,我们前面一直使用的都是堆缓冲区,也就是说实际上数据是保存在一个数组中的,如果你已经完成了JVM篇的学习,一定知道实际上占用的是堆内存,而我们也可以创建一个直接缓冲区,也就是申请堆外内存进行数据保存,采用操作系统本地的IO,相比堆缓冲区会快一些。

那么怎么使用直接缓冲区呢?我们可以通过allocateDirect方法来创建:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
//这里我们申请一个直接缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(10);
//使用方式基本和之前是一样的
buffer.put((byte) 66);
buffer.flip();
System.out.println(buffer.get());
}

我们来看看这个allocateDirect方法是如何创建一个直接缓冲区的:

1
2
3
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

这个方法直接创建了一个新的DirectByteBuffer对象,那么这个类又是怎么进行创建的呢?

image-20220424163028578

可以看到它并不是直接继承自ByteBuffer,而是MappedByteBuffer,并且实现了接口DirectBuffer,我们先来看看这个接口:

1
2
3
4
5
public interface DirectBuffer {
public long address(); //获取内存地址
public Object attachment(); //附加对象,这是为了保证某些情况下内存不被释放,我们后面细谈
public Cleaner cleaner(); //内存清理类
}
1
2
3
4
5
6
public abstract class MappedByteBuffer extends ByteBuffer {
//这三个方法目前暂时用不到,后面文件再说
public final MappedByteBuffer load();
public final boolean isLoaded();
public final MappedByteBuffer force();
}

接着我们来看看DirectByteBuffer类的成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 把Unsafe类取出来
protected static final Unsafe unsafe = Bits.unsafe();

// 在内存中直接创建的内存空间地址
private static final long arrayBaseOffset = (long)unsafe.arrayBaseOffset(byte[].class);

// 是否具有非对齐访问能力,根据CPU架构而定,intel、AMD、AppleSilicon 都是支持的
protected static final boolean unaligned = Bits.unaligned();

// 直接缓冲区的内存地址,为了提升速度就放到Buffer类中去了
// protected long address;

// 附加对象,一会有大作用
private final Object att;

接着我们来看看构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
DirectByteBuffer(int cap) {                   // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned(); //是否直接内存分页对齐,需要额外计算
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0)); //计算出最终需要申请的大小
//判断堆外内存是否足够,够的话就作为保留内存
Bits.reserveMemory(size, cap);

long base = 0;
try {
//通过Unsafe申请内存空间,并得到内存地址
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
//申请失败就取消一开始的保留内存
Bits.unreserveMemory(size, cap);
throw x;
}
//批量将申请到的这一段内存每个字节都设定为0
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
//将address变量(在Buffer中定义)设定为base的地址
address = base;
}
//创建一个针对于此缓冲区的Cleaner,由于是堆外内存,所以现在由它来进行内存清理
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}

可以看到在构造方法中,是直接通过Unsafe类来申请足够的堆外内存保存数据,那么当我们不使用此缓冲区时,内存会被如何清理呢?我们来看看这个Cleaner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Cleaner extends PhantomReference<Object>{ //继承自鬼引用,也就是说此对象会存放一个没有任何引用的对象

//引用队列,PhantomReference构造方法需要
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue<>();

//执行清理的具体流程
private final Runnable thunk;

static private Cleaner first = null; //Cleaner双向链表,每创建一个Cleaner对象都会添加一个结点

private Cleaner
next = null,
prev = null;

private static synchronized Cleaner add(Cleaner cl) { //添加操作会让新来的变成新的头结点
if (first != null) {
cl.next = first;
first.prev = cl;
}
first = cl;
return cl;
}

//可以看到创建鬼引用的对象就是传进的缓冲区对象
private Cleaner(Object referent, Runnable thunk) {
super(referent, dummyQueue);
//清理流程实际上是外面的Deallocator
this.thunk = thunk;
}

//通过此方法创建一个新的Cleaner
public static Cleaner create(Object ob, Runnable thunk) {
if (thunk == null)
return null;
return add(new Cleaner(ob, thunk)); //调用add方法将Cleaner添加到队列
}

//清理操作
public void clean() {
if (!remove(this))
return; //进行清理操作时会从双向队列中移除当前Cleaner,false说明已经移除过了,直接return
try {
thunk.run(); //这里就是直接执行具体清理流程
} catch (final Throwable x) {
...
}
}

那么我们先来看看具体的清理程序在做些什么,Deallocator是在直接缓冲区中声明的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static class Deallocator implements Runnable {

private static Unsafe unsafe = Unsafe.getUnsafe();

private long address; //内存地址
private long size; //大小
private int capacity; //申请的容量

private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}

public void run() { //具体的清理操作
if (address == 0) {
// Paranoia
return;
}
unsafe.freeMemory(address); //这里是直接调用了Unsafe进行内存释放操作
address = 0; //内存地址改为0,NULL
Bits.unreserveMemory(size, capacity); //取消一开始的保留内存
}
}

好了,现在我们可以明确在清理的时候实际上也是调用Unsafe类进行内存释放操作,那么,这个清理操作具体是在什么时候进行的呢?首先我们要明确,如果是普通的堆缓冲区,由于使用的数组,那么一旦此对象没有任何引用时,就随时都会被GC给回收掉,但是现在是堆外内存,只能我们手动进行内存回收,那么当DirectByteBuffer也失去引用时,会不会触发内存回收呢?

答案是可以的,还记得我们刚刚看到Cleaner是PhantomReference的子类吗,而DirectByteBuffer是被鬼引用的对象,而具体的清理操作是Cleaner类的clean方法,莫非这两者有什么联系吗?

你别说,还真有,我们直接看到PhantomReference的父类Reference,我们会发现这样一个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class ReferenceHandler extends Thread {
...
static {
// 预加载并初始化 InterruptedException 和 Cleaner 类
// 以避免出现在循环运行过程中时由于内存不足而无法加载
ensureClassInitialized(InterruptedException.class);
ensureClassInitialized(Cleaner.class);
}

public void run() {
while (true) {
tryHandlePending(true); //这里是一个无限循环调用tryHandlePending方法
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
private T referent;         /* 会被GC回收的对象,也就是我们给过来被引用的对象 */

volatile ReferenceQueue<? super T> queue; //引用队列,可以和下面的next搭配使用,形成链表
//Reference对象也是一个一个连起来的节点,这样才能放到ReferenceQueue中形成链表
volatile Reference next;

//即将被GC的引用链表
transient private Reference<T> discovered; /* 由虚拟机操作 */

//pending与discovered一起构成了一个pending单向链表,标记为static类所有,pending为链表的头节点,discovered为链表当前
//Reference节点指向下一个节点的引用,这个队列是由JVM构建的,当对象除了被reference引用之外没有其它强引用了,JVM就会将指向
//需要回收的对象的Reference对象都放入到这个队列里面,这个队列会由下面的 Reference Hander 线程来处理。
private static Reference<Object> pending = null;
1
2
3
4
5
6
7
8
9
10
11
12
static {    //Reference类的静态代码块
ThreadGroup tg = Thread.currentThread().getThreadGroup();
for (ThreadGroup tgn = tg;
tgn != null;
tg = tgn, tgn = tg.getParent());
Thread handler = new ReferenceHandler(tg, "Reference Handler"); //在一开始的时候就会创建
handler.setPriority(Thread.MAX_PRIORITY); //以最高优先级启动
handler.setDaemon(true); //此线程直接作为一个守护线程
handler.start(); //也就是说在一开始的时候这个守护线程就会启动

...
}

那么也就是说Reference Handler线程是在一开始就启动了,那么我们的关注点可以放在tryHandlePending方法上,看看这玩意到底在做个啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static boolean tryHandlePending(boolean waitForNotify) {
Reference<Object> r;
Cleaner c;
try {
synchronized (lock) { //加锁办事
//当Cleaner引用的DirectByteBuffer对象即将被回收时,pending会变成此Cleaner对象
//这里判断到pending不为null时就需要处理一下对象销毁了
if (pending != null) {
r = pending;
// 'instanceof' 有时会导致内存溢出,所以将r从链表中移除之前就进行类型判断
// 如果是Cleaner类型就给到c
c = r instanceof Cleaner ? (Cleaner) r : null;
// 将pending更新为链表下一个待回收元素
pending = r.discovered;
r.discovered = null; //r不再引用下一个节点
} else {
//否则就进入等待
if (waitForNotify) {
lock.wait();
}
return waitForNotify;
}
}
} catch (OutOfMemoryError x) {
Thread.yield();
return true;
} catch (InterruptedException x) {
return true;
}

// 如果元素是Cleaner类型,c在上面就会被赋值,这里就会执行其clean方法(破案了)
if (c != null) {
c.clean();
return true;
}

ReferenceQueue<? super Object> q = r.queue;
if (q != ReferenceQueue.NULL) q.enqueue(r); //这个是引用队列,实际上就是我们之前在JVM篇中讲解的入队机制
return true;
}

通过对源码的解读,我们就了解了直接缓冲区的内存加载释放整个流程。和堆缓冲区一样,当直接缓冲区没有任何强引用时,就有机会被GC正常回收掉并自动释放申请的内存。

我们接着来看看直接缓冲区的读写操作是如何进行的:

1
2
3
public byte get() {
return ((unsafe.getByte(ix(nextGetIndex())))); //直接通过Unsafe类读取对应地址上的byte数据
}
1
2
3
private long ix(int i) {
return address + ((long)i << 0); //ix现在是内存地址再加上i
}

我们接着来看看写操作:

1
2
3
4
public ByteBuffer put(byte x) {
unsafe.putByte(ix(nextPutIndex()), ((x)));
return this;
}

可以看到无论是读取还是写入操作都是通过Unsafe类操作对应的内存地址完成的。

那么它的复制操作是如何实现的呢?

1
2
3
4
5
6
7
8
public ByteBuffer duplicate() {
return new DirectByteBuffer(this,
this.markValue(),
this.position(),
this.limit(),
this.capacity(),
0);
}
1
2
3
4
5
6
7
8
DirectByteBuffer(DirectBuffer db,         // 这里给的db是进行复制操作的DirectByteBuffer对象
int mark, int pos, int lim, int cap,
int off) {
super(mark, pos, lim, cap);
address = db.address() + off; //直接继续使用之前申请的内存空间
cleaner = null; //因为用的是之前的内存空间,已经有对应的Cleaner了,这里不需要再搞一个
att = db; //将att设定为此对象
}

可以看到,如果是进行复制操作,那么会直接会继续使用执行复制操作的DirectByteBuffer申请的内存空间。不知道各位是否能够马上联想到一个问题,我们知道,如果执行复制操作的DirectByteBuffer对象失去了强引用被回收,那么就会触发Cleaner并进行内存释放,但是有个问题就是,这段内存空间可能复制出来的DirectByteBuffer对象还需要继续使用,这时肯定是不能进行回收的,所以说这里使用了att变量将之前的DirectByteBuffer对象进行引用,以防止其失去强引用被垃圾回收,所以只要不是原来的DirectByteBuffer对象和复制出来的DirectByteBuffer对象都失去强引用时,就不会导致这段内存空间被回收。

这样,我们之前的未解之谜为啥有个att也就得到答案了,有关直接缓冲区的介绍,就到这里为止。


通道

前面我们学习了NIO的基石——缓冲区,那么缓冲区具体用在什么地方呢,在本板块我们学习通道之后,相信各位就能知道了。那么,什么是通道呢?

在传统IO中,我们都是通过流进行传输,数据会源源不断从流中传出;而在NIO中,数据是放在缓冲区中进行管理,再使用通道将缓冲区中的数据传输到目的地。

通道接口层次

通道的根基接口是Channel,所以的派生接口和类都是从这里开始的,我们来看看它定义了哪些基本功能:

1
2
3
4
5
6
7
public interface Channel extends Closeable {
//通道是否处于开启状态
public boolean isOpen();

//因为通道开启也需要关闭,所以实现了Closeable接口,所以这个方法懂的都懂
public void close() throws IOException;
}

我们接着来看看它的一些子接口,首先是最基本的读写操作:

1
2
3
4
public interface ReadableByteChannel extends Channel {
//将通道中的数据读取到给定的缓冲区中
public int read(ByteBuffer dst) throws IOException;
}
1
2
3
4
public interface WritableByteChannel extends Channel {
//将给定缓冲区中的数据写入到通道中
public int write(ByteBuffer src) throws IOException;
}

有了读写功能后,最后整合为了一个ByteChannel接口:

1
2
3
public interface ByteChannel extends ReadableByteChannel, WritableByteChannel{

}

image-20220425092355354

在ByteChannel之下,还有更多的派生接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//允许保留position和更改position的通道,以及对通道连接实体的相关操作
public interface SeekableByteChannel extends ByteChannel {
...

//获取当前的position
long position() throws IOException;

//修改当前的position
SeekableByteChannel position(long newPosition) throws IOException;

//返回此通道连接到的实体(比如文件)的当前大小
long size() throws IOException;

//将此通道连接到的实体截断(比如文件,截断之后,文件后面一半就没了)为给定大小
SeekableByteChannel truncate(long size) throws IOException;
}

接着我们来看,除了读写之外,Channel还可以具有响应中断的能力:

1
2
3
4
public interface InterruptibleChannel extends Channel {
//当其他线程调用此方法时,在此通道上处于阻塞状态的线程会直接抛出 AsynchronousCloseException 异常
public void close() throws IOException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//这是InterruptibleChannel的抽象实现,完成了一部分功能
public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel {
//加锁关闭操作用到
private final Object closeLock = new Object();
//当前Channel的开启状态
private volatile boolean open = true;

protected AbstractInterruptibleChannel() { }

//关闭操作实现
public final void close() throws IOException {
synchronized (closeLock) { //同时只能有一个线程进行此操作,加锁
if (!open) //如果已经关闭了,那么就不用继续了
return;
open = false; //开启状态变成false
implCloseChannel(); //开始关闭通道
}
}

//该方法由 close 方法调用,以执行关闭通道的具体操作,仅当通道尚未关闭时才调用此方法,不会多次调用。
protected abstract void implCloseChannel() throws IOException;

public final boolean isOpen() {
return open;
}

//开始阻塞(有可能一直阻塞下去)操作之前,需要调用此方法进行标记,
protected final void begin() {
...
}

//阻塞操作结束之后,也需要需要调用此方法,为了防止异常情况导致此方法没有被调用,建议放在finally中
protected final void end(boolean completed)
...
}

...
}

而之后的一些实现类,都是基于这些接口定义的方法去进行实现的,比如FileChannel:

image-20220426090845530

这样,我们就大致了解了一下通道相关的接口定义,那么我来看看具体是如何如何使用的。

比如现在我们要实现从输入流中读取数据然后打印出来,那么之前传统IO的写法:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
//数组创建好,一会用来存放从流中读取到的数据
byte[] data = new byte[10];
//直接使用输入流
InputStream in = System.in;
while (true) {
int len;
while ((len = in.read(data)) >= 0) { //将输入流中的数据一次性读取到数组中
System.out.print("读取到一批数据:"+new String(data, 0, len)); //读取了多少打印多少
}
}
}

而现在我们使用通道之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws IOException {
//缓冲区创建好,一会就靠它来传输数据
ByteBuffer buffer = ByteBuffer.allocate(10);
//将System.in作为输入源,一会Channel就可以从这里读取数据,然后通过缓冲区装载一次性传递数据
ReadableByteChannel readChannel = Channels.newChannel(System.in);
while (true) {
//将通道中的数据写到缓冲区中,缓冲区最多一次装10个
readChannel.read(buffer);
//写入操作结束之后,需要进行翻转,以便接下来的读取操作
buffer.flip();
//最后转换成String打印出来康康
System.out.println("读取到一批数据:"+new String(buffer.array(), 0, buffer.remaining()));
//回到最开始的状态
buffer.clear();
}
}

乍一看,好像感觉也没啥区别,不就是把数组换成缓冲区了吗,效果都是一样的,数据也是从Channel中读取得到,并且通过缓冲区进行数据装载然后得到结果,但是,Channel不像流那样是单向的,它就像它的名字一样,一个通道可以从一端走到另一端,也可以从另一端走到这一端,我们后面进行介绍。

文件传输FileChannel

前面我们介绍了通道的基本情况,这里我们就来尝试实现一下文件的读取和写入,在传统IO中,文件的写入和输出都是依靠FileOutputStream和FileInputStream来完成的:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
try(FileOutputStream out = new FileOutputStream("test.txt");
FileInputStream in = new FileInputStream("test.txt")){
String data = "伞兵一号卢本伟准备就绪!";
out.write(data.getBytes()); //向文件的输出流中写入数据,也就是把数据写到文件中
out.flush();

byte[] bytes = new byte[in.available()];
in.read(bytes); //从文件的输入流中读取文件的信息
System.out.println(new String(bytes));
}
}

而现在,我们只需要通过一个FileChannel就可以完成这两者的操作,获取文件通道的方式有以下几种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws IOException {
//1. 直接通过输入或输出流获取对应的通道
FileInputStream in = new FileInputStream("test.txt");
//但是这里的通道只支持读取或是写入操作
FileChannel channel = in.getChannel();
//创建一个容量为128的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(128);
//从通道中将数据读取到缓冲区中
channel.read(buffer);
//翻转一下,接下来要读取了
buffer.flip();

System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}

可以看到通过输入流获取的文件通道读取是没有任何问题的,但是写入操作:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws IOException {
//1. 直接通过输入或输出流获取对应的通道
FileInputStream in = new FileInputStream("test.txt");
//但是这里的通道只支持读取或是写入操作
FileChannel channel = in.getChannel();
//尝试写入一下
channel.write(ByteBuffer.wrap("伞兵一号卢本伟准备就绪!".getBytes()));
}

image-20220426103818019

直接报错,说明只支持读取操作,那么输出流呢?

1
2
3
4
5
6
7
8
public static void main(String[] args) throws IOException {
//1. 直接通过输入或输出流获取对应的通道
FileOutputStream out = new FileOutputStream("test.txt");
//但是这里的通道只支持读取或是写入操作
FileChannel channel = out.getChannel();
//尝试写入一下
channel.write(ByteBuffer.wrap("伞兵一号卢本伟准备就绪!".getBytes()));
}

可以看到能够正常进行写入,但是读取呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws IOException {
//1. 直接通过输入或输出流获取对应的通道
FileOutputStream out = new FileOutputStream("test.txt");
//但是这里的通道只支持读取或是写入操作
FileChannel channel = out.getChannel();

ByteBuffer buffer = ByteBuffer.allocate(128);
//从通道中将数据读取到缓冲区中
channel.read(buffer);
//翻转一下,接下来要读取了
buffer.flip();

System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}

image-20220426104016649

可以看到输出流生成的Channel又不支持读取,所以说本质上还是保持着输入输出流的特性,但是之前不是说Channel又可以输入又可以输出吗?这里我们来看看第二种方式:

1
2
//RandomAccessFile能够支持文件的随机访问,并且实现了数据流
public class RandomAccessFile implements DataOutput, DataInput, Closeable {

我们可以通过RandomAccessFile来创建通道:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
/*
通过RandomAccessFile进行创建,注意后面的mode有几种:
r 以只读的方式使用
rw 读操作和写操作都可以
rws 每当进行写操作,同步的刷新到磁盘,刷新内容和元数据
rwd 每当进行写操作,同步的刷新到磁盘,刷新内容
*/
try(RandomAccessFile f = new RandomAccessFile("test.txt", "")){

}
}

现在我们来测试一下它的读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws IOException {
/*
通过RandomAccessFile进行创建,注意后面的mode有几种:
r 以只读的方式使用
rw 读操作和写操作都可以
rws 每当进行写操作,同步的刷新到磁盘,刷新内容和元数据
rwd 每当进行写操作,同步的刷新到磁盘,刷新内容
*/
try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw"); //这里设定为支持读写,这样创建的通道才能具有这些功能
FileChannel channel = f.getChannel()){ //通过RandomAccessFile创建一个通道
channel.write(ByteBuffer.wrap("伞兵二号马飞飞准备就绪!".getBytes()));

System.out.println("写操作完成之后文件访问位置:"+channel.position()); //注意读取也是从现在的位置开始
channel.position(0); //需要将位置变回到最前面,这样下面才能从文件的最开始进行读取

ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();

System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}
}

可以看到,一个FileChannel既可以完成文件读取,也可以完成文件的写入。

除了基本的读写操作,我们也可以直接对文件进行截断:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel()){
//截断文件,只留前20个字节
channel.truncate(20);

ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}
}

可以看到文件的内容直接被截断了,文件内容就只剩一半了。

当然,如果我们要进行文件的拷贝,也是很方便的,只需要使用通道就可以,比如我们现在需要将一个通道的数据写入到另一个通道,就可以直接使用transferTo方法:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws IOException {
try(FileOutputStream out = new FileOutputStream("test2.txt");
FileInputStream in = new FileInputStream("test.txt")){

FileChannel inChannel = in.getChannel(); //获取到test文件的通道
inChannel.transferTo(0, inChannel.size(), out.getChannel()); //直接将test文件通道中的数据转到test2文件的通道中
}
}

可以看到执行后,文件的内容全部被复制到另一个文件了。

当然,反向操作也是可以的:

1
2
3
4
5
6
7
8
public static void main(String[] args) throws IOException {
try(FileOutputStream out = new FileOutputStream("test2.txt");
FileInputStream in = new FileInputStream("test.txt")){

FileChannel inChannel = in.getChannel(); //获取到test文件的通道
out.getChannel().transferFrom(inChannel, 0, inChannel.size()); //直接将从test文件通道中传来的数据转给test2文件的通道
}
}

当我们要编辑某个文件时,通过使用MappedByteBuffer类,可以将其映射到内存中进行编辑,编辑的内容会同步更新到文件中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//注意一定要是可写的,不然无法进行修改操作
try(RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel()){

//通过map方法映射文件的某一段内容,创建MappedByteBuffer对象
//比如这里就是从第四个字节开始,映射10字节内容到内存中
//注意这里需要使用MapMode.READ_WRITE模式,其他模式无法保存数据到文件
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 4, 10);

//我们可以直接对在内存中的数据进行编辑,也就是编辑Buffer中的内容
//注意这里写入也是从pos位置开始的,默认是从0开始,相对于文件就是从第四个字节开始写
//注意我们只映射了10个字节,也就是写的内容不能超出10字节了
buffer.put("yyds".getBytes());

//编辑完成后,通过force方法将数据写回文件的映射区域
buffer.force();
}

可以看到,文件的某一个区域已经被我们修改了,并且这里实际上使用的就是DirectByteBuffer直接缓冲区,效率还是很高的。

文件锁FileLock

我们可以创建一个跨进程文件锁来防止多个进程之间的文件争抢操作(注意这里是进程,不是线程)FileLock是文件锁,它能保证同一时间只有一个进程(程序)能够修改它,或者都只可以读,这样就解决了多进程间的同步文件,保证了安全性。但是需要注意的是,它进程级别的,不是线程级别的,他可以解决多个进程并发访问同一个文件的问题,但是它不适用于控制同一个进程中多个线程对一个文件的访问。

那么我们来看看如何使用文件锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws IOException, InterruptedException {
//创建RandomAccessFile对象,并拿到Channel
RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel();
System.out.println(new Date() + " 正在尝试获取文件锁...");
//接着我们直接使用lock方法进行加锁操作(如果其他进程已经加锁,那么会一直阻塞在这里)
//加锁操作支持对文件的某一段进行加锁,比如这里就是从0开始后的6个字节加锁,false代表这是一把独占锁
//范围锁甚至可以提前加到一个还未写入的位置上
FileLock lock = channel.lock(0, 6, false);
System.out.println(new Date() + " 已获取到文件锁!");
Thread.sleep(5000); //假设要处理5秒钟
System.out.println(new Date() + " 操作完毕,释放文件锁!");

//操作完成之后使用release方法进行锁释放
lock.release();
}

有关共享锁和独占锁:

  • 进程对文件加独占锁后,当前进程对文件可读可写,独占此文件,其它进程是不能读该文件进行读写操作的。
  • 进程对文件加共享锁后,进程可以对文件进行读操作,但是无法进行写操作,共享锁可以被多个进程添加,但是只要存在共享锁,就不能添加独占锁。

现在我们来启动两个进程试试看,我们需要在IDEA中配置一下两个启动项:

image-20220426153541728

现在我们依次启动它们:

image-20220426153636218

image-20220426153648363

可以看到确实是两个进程同一时间只能有一个进行访问,而另一个需要等待锁释放。

那么如果我们申请的是文件的不同部分呢?

1
2
3
4
//其中一个进程锁 0 - 5
FileLock lock = channel.lock(0, 6, false);
//另一个进程锁 6 - 11
FileLock lock = channel.lock(6, 6, false);

可以看到,两个进程这时就可以同时进行加锁操作了,因为它们锁的是不同的段落。

那么要是交叉呢?

1
2
3
4
//其中一个进程锁 0 - 5
FileLock lock = channel.lock(0, 6, false);
//另一个进程锁 3 - 8
FileLock lock = channel.lock(3, 6, false);

可以看到交叉的情况下也是会出现阻塞的。

接着我们来看看共享锁,共享锁允许多个进程同时加锁,但是不能进行写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws IOException, InterruptedException {
RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel();
System.out.println(new Date() + " 正在尝试获取文件锁...");
//现在使用共享锁
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
System.out.println(new Date() + " 已获取到文件锁!");
//进行写操作
channel.write(ByteBuffer.wrap(new Date().toString().getBytes()));

System.out.println(new Date() + " 操作完毕,释放文件锁!");
//操作完成之后使用release方法进行锁释放
lock.release();
}

当我们进行写操作时:

image-20220426223636761

可以看到直接抛出异常,说另一个程序已锁定文件的一部分,进程无法访问(某些系统或是环境实测无效,比如UP主的arm架构MacOS就不生效,这个异常是在Windows环境下运行得到的)

当然,我们也可以测试一下多个进行同时加共享锁:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException, InterruptedException {
RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel();
System.out.println(new Date() + " 正在尝试获取文件锁...");

FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
System.out.println(new Date() + " 已获取到文件锁!");
Thread.sleep(5000); //假设要处理5秒钟
System.out.println(new Date() + " 操作完毕,释放文件锁!");

lock.release();
}

可以看到结果是多个进程都能加共享锁:

image-20220426224938834

image-20220426224954291

当然,除了直接使用lock()方法进行加锁之外,我们也可以使用tryLock()方法以非阻塞方式获取文件锁,但是如果获取锁失败会得到null:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws IOException, InterruptedException {
RandomAccessFile f = new RandomAccessFile("test.txt", "rw");
FileChannel channel = f.getChannel();
System.out.println(new Date() + " 正在尝试获取文件锁...");

FileLock lock = channel.tryLock(0, Long.MAX_VALUE, false);
System.out.println(lock);
Thread.sleep(5000); //假设要处理5秒钟

lock.release();
}

可以看到,两个进程都去尝试获取独占锁:

image-20220426230102206

image-20220426230117926

第一个成功加锁的进程获得了对应的锁对象,而第二个进程直接得到的是null

到这里,有关文件锁的相关内容就差不多了。


多路复用网络通信

前面我们已经介绍了NIO框架的两大核心:Buffer和Channel,我们接着来看看最后一个内容。

传统阻塞I/O网络通信

说起网络通信,相信各位并不陌生,正是因为网络的存在我们才能走进现代化的社会,在JavaWeb阶段,我们学习了如何使用Socket建立TCP连接进行网络通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
try(ServerSocket server = new ServerSocket(8080)){ //将服务端创建在端口8080上
System.out.println("正在等待客户端连接...");
Socket socket = server.accept();
System.out.println("客户端已连接,IP地址为:"+socket.getInetAddress().getHostAddress());
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //通过
System.out.print("接收到客户端数据:");
System.out.println(reader.readLine());
OutputStreamWriter writer = new OutputStreamWriter(socket.getOutputStream());
writer.write("已收到!");
writer.flush();
}catch (IOException e){
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080);
Scanner scanner = new Scanner(System.in)){
System.out.println("已连接到服务端!");
OutputStream stream = socket.getOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream); //通过转换流来帮助我们快速写入内容
System.out.println("请输入要发送给服务端的内容:");
String text = scanner.nextLine();
writer.write(text+'\n'); //因为对方是readLine()这里加个换行符
writer.flush();
System.out.println("数据已发送:"+text);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println("收到服务器返回:"+reader.readLine());
}catch (IOException e){
System.out.println("服务端连接失败!");
e.printStackTrace();
}finally {
System.out.println("客户端断开连接!");
}
}

当然,我们也可以使用前面讲解的通道来进行通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
//创建一个新的ServerSocketChannel,一会直接使用SocketChannel进行网络IO操作
try (ServerSocketChannel serverChannel = ServerSocketChannel.open()){
//依然是将其绑定到8080端口
serverChannel.bind(new InetSocketAddress(8080));
//同样是调用accept()方法,阻塞等待新的连接到来
SocketChannel socket = serverChannel.accept();
//因为是通道,两端的信息都是可以明确的,这里获取远端地址,当然也可以获取本地地址
System.out.println("客户端已连接,IP地址为:"+socket.getRemoteAddress());

//使用缓冲区进行数据接收
ByteBuffer buffer = ByteBuffer.allocate(128);
socket.read(buffer); //SocketChannel同时实现了读写通道接口,所以可以直接进行双向操作
buffer.flip();
System.out.print("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));

//直接向通道中写入数据就行
socket.write(ByteBuffer.wrap("已收到!".getBytes()));

//记得关
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
//创建一个新的SocketChannel,一会通过通道进行通信
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
Scanner scanner = new Scanner(System.in)){
System.out.println("已连接到服务端!");
System.out.println("请输入要发送给服务端的内容:");
String text = scanner.nextLine();
//直接向通道中写入数据,真舒服
channel.write(ByteBuffer.wrap(text.getBytes()));

ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer); //直接从通道中读取数据
buffer.flip();
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

虽然可以通过传统的Socket进行网络通信,但是我们发现,如果要进行IO操作,我们需要单独创建一个线程来进行处理,比如现在有很多个客户端,服务端需要同时进行处理,那么如果我们要处理这些客户端的请求,那么我们就只能单独为其创建一个线程来进行处理:

image-20220427165019293

虽然这样看起来比较合理,但是随着客户端数量的增加,如果要保持持续通信,那么就不能摧毁这些线程,而是需要一直保留(但是实际上很多时候只是保持连接,一直在阻塞等待客户端的读写操作,IO操作的频率很低,这样就白白占用了一条线程,很多时候都是站着茅坑不拉屎),但是我们的线程不可能无限制的进行创建,总有一天会耗尽服务端的资源,那么现在怎么办呢,关键是现在又有很多客户端源源不断地连接并进行操作,这时,我们就可以利用NIO为我们提供的多路复用编程模型。

我们来看看NIO为我们提供的模型:

image-20220427170247004

服务端不再是一个单纯通过accept()方法来创建连接的机制了,而是根据客户端不同的状态,Selector会不断轮询,只有客户端在对应的状态时,比如真正开始读写操作时,才会创建线程或进行处理(这样就不会一直阻塞等待某个客户端的IO操作了),而不是创建之后需要一直保持连接,即使没有任何的读写操作。这样就不会因为占着茅坑不拉屎导致线程无限制地创建下去了。

通过这种方式,甚至单线程都能做到高效的复用,最典型的例子就是Redis了,因为内存的速度非常快,多线程上下文的开销就会显得有些拖后腿,还不如直接单线程简单高效,这也是为什么Redis单线程也能这么快的原因。

因此,我们就从NIO框架的第三个核心内容:Selector,开始讲起。

选择器与I/O多路复用

前面我们大概了解了一下选择器,我们知道,选择器是当具体有某一个状态(比如读、写、请求)已经就绪时,才会进行处理,而不是让我们的程序主动地进行等待。

既然我们现在需要实现IO多路复用,那么我们来看看常见的IO多路复用模型,也就是Selector的实现方案,比如现在有很多个用户连接到我们的服务器:

  • select:当这些连接出现具体的某个状态时,只是知道已经就绪了,但是不知道详具体是哪一个连接已经就绪,每次调用都进行线性遍历所有连接,时间复杂度为O(n),并且存在最大连接数限制。
  • poll:同上,但是由于底层采用链表,所以没有最大连接数限制。
  • epoll:采用事件通知方式,当某个连接就绪,能够直接进行精准通知(这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的,只要就绪会会直接回调callback函数,实现精准通知,但是只有Linux支持这种方式),时间复杂度O(1),Java在Linux环境下正是采用的这种模式进行实现的。

好了,既然多路复用模型了解完毕了,那么我们就来看看如何让我们的网络通信实现多路复用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static void main(String[] args) {
try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
Selector selector = Selector.open()){ //开启一个新的Selector,这玩意也是要关闭释放资源的
serverChannel.bind(new InetSocketAddress(8080));
//要使用选择器进行操作,必须使用非阻塞的方式,这样才不会像阻塞IO那样卡在accept(),而是直接通过,让选择器去进行下一步操作
serverChannel.configureBlocking(false);
//将选择器注册到ServerSocketChannel中,后面是选择需要监听的时间,只有发生对应事件时才会进行选择,多个事件用 | 连接,注意,并不是所有的Channel都支持以下全部四个事件,可能只支持部分
//因为是ServerSocketChannel这里我们就监听accept就可以了,等待客户端连接
//SelectionKey.OP_CONNECT --- 连接就绪事件,表示客户端与服务器的连接已经建立成功
//SelectionKey.OP_ACCEPT --- 接收连接事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
//SelectionKey.OP_READ --- 读 就绪事件,表示通道中已经有了可读的数据,可以执行读操作了
//SelectionKey.OP_WRITE --- 写 就绪事件,表示已经可以向通道写数据了(这玩意比较特殊,一般情况下因为都是可以写入的,所以可能会无限循环)
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) { //无限循环等待新的用户网络操作
//每次选择都可能会选出多个已经就绪的网络操作,没有操作时会暂时阻塞
int count = selector.select();
System.out.println("监听到 "+count+" 个事件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//根据不同的事件类型,执行不同的操作即可
if(key.isAcceptable()) { //如果当前ServerSocketChannel已经做好准备处理Accept
SocketChannel channel = serverChannel.accept();
System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress());
//现在连接就建立好了,接着我们需要将连接也注册选择器,比如我们需要当这个连接有内容可读时就进行处理
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
//这样就在连接建立时完成了注册
} else if(key.isReadable()) { //如果当前连接有可读的数据并且可以写,那么就开始处理
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));

//直接向通道中写入数据就行
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
//别关,说不定用户还要继续通信呢
}
//处理完成后,一定记得移出迭代器,不然下次还有
iterator.remove();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

接着我们来编写一下客户客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
//创建一个新的SocketChannel,一会通过通道进行通信
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
Scanner scanner = new Scanner(System.in)){
System.out.println("已连接到服务端!");
while (true) { //咱给它套个无限循环,这样就能一直发消息了
System.out.println("请输入要发送给服务端的内容:");
String text = scanner.nextLine();
//直接向通道中写入数据,真舒服
channel.write(ByteBuffer.wrap(text.getBytes()));
System.out.println("已发送!");
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer); //直接从通道中读取数据
buffer.flip();
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

我们来看看效果:

image-20220504155104437

image-20220504155116276

可以看到成功实现了,当然各位也可以跟自己的室友一起开客户端进行测试,现在,我们只用了一个线程,就能够同时处理多个请求,可见多路复用是多么重要。

实现Reactor模式

前面我们简单实现了多路复用网络通信,我们接着来了解一下Reactor模式,对我们的服务端进行优化。

现在我们来看看如何进行优化,我们首先抽象出两个组件,Reactor线程和Handler处理器:

  • Reactor线程:负责响应IO事件,并分发到Handler处理器。新的事件包含连接建立就绪、读就绪、写就绪等。
  • Handler处理器:执行非阻塞的操作。

实际上我们之前编写的算是一种单线程Reactor的朴素模型(面向过程的写法),我们来看看标准的写法:

image-20220504163417826

客户端还是按照我们上面的方式连接到Reactor,并通过选择器走到Acceptor或是Handler,Acceptor主要负责客户端连接的建立,Handler负责读写操作,代码如下,首先是Handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Handler implements Runnable{

private final SocketChannel channel;

public Handler(SocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer);
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
}catch (IOException e){
e.printStackTrace();
}
}
}

接着是Acceptor,实际上就是把上面的业务代码搬个位置罢了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Acceptor主要用于处理连接操作
*/
public class Acceptor implements Runnable{

private final ServerSocketChannel serverChannel;
private final Selector selector;

public Acceptor(ServerSocketChannel serverChannel, Selector selector) {
this.serverChannel = serverChannel;
this.selector = selector;
}

@Override
public void run() {
try{
SocketChannel channel = serverChannel.accept();
System.out.println("客户端已连接,IP地址为:"+channel.getRemoteAddress());
channel.configureBlocking(false);
//这里在注册时,创建好对应的Handler,这样在Reactor中分发的时候就可以直接调用Handler了
channel.register(selector, SelectionKey.OP_READ, new Handler(channel));
}catch (IOException e){
e.printStackTrace();
}
}
}

这里我们在注册时丢了一个附加对象进去,这个附加对象会在选择器选择到此通道上时,可以通过attachment()方法进行获取,对于我们简化代码有大作用,一会展示,我们接着来看看Reactor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Reactor implements Closeable, Runnable{

private final ServerSocketChannel serverChannel;
private final Selector selector;
public Reactor() throws IOException{
serverChannel = ServerSocketChannel.open();
selector = Selector.open();
}

@Override
public void run() {
try {
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.configureBlocking(false);
//注册时,将Acceptor作为附加对象存放,当选择器选择后也可以获取到
serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector));
while (true) {
int count = selector.select();
System.out.println("监听到 "+count+" 个事件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
this.dispatch(iterator.next()); //通过dispatch方法进行分发
iterator.remove();
}
}
}catch (IOException e) {
e.printStackTrace();
}
}

//通过此方法进行分发
private void dispatch(SelectionKey key){
Object att = key.attachment(); //获取attachment,ServerSocketChannel和对应的客户端Channel都添加了的
if(att instanceof Runnable) {
((Runnable) att).run(); //由于Handler和Acceptor都实现自Runnable接口,这里就统一调用一下
} //这样就实现了对应的时候调用对应的Handler或是Acceptor了
}

//用了记得关,保持好习惯,就像看完视频要三连一样
@Override
public void close() throws IOException {
serverChannel.close();
selector.close();
}
}

最后我们编写一下主类:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
//创建Reactor对象,启动,完事
try (Reactor reactor = new Reactor()){
reactor.run();
}catch (IOException e) {
e.printStackTrace();
}
}

这样,我们就实现了单线程Reactor模式,注意全程使用到的都只是一个线程,没有创建新的线程来处理任何事情。

但是单线程始终没办法应对大量的请求,如果请求量上去了,单线程还是很不够用,接着我们来看看多线程Reactor模式,它创建了多个线程处理,我们可以将数据读取完成之后的操作交给线程池来执行:

image-20220504171307721

其实我们只需要稍微修改一下Handler就行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Handler implements Runnable{
//把线程池给安排了,10个线程
private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
private final SocketChannel channel;
public Handler(SocketChannel channel) {
this.channel = channel;
}

@Override
public void run() {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
POOL.submit(() -> {
try {
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
}catch (IOException e){
e.printStackTrace();
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

这样,在数据读出之后,就可以将数据处理交给线程池执行。

但是这样感觉还是划分的不够,一个Reactor需要同时处理来自客户端的所有操作请求,显得有些乏力,那么不妨我们将Reactor做成一主多从的模式,让主Reactor只负责Accept操作,而其他的Reactor进行各自的其他操作:

image-20220505131410997

现在我们来重新设计一下我们的代码,Reactor类就作为主节点,不进行任何修改,我们来修改一下其他的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//SubReactor作为从Reactor
public class SubReactor implements Runnable, Closeable {
//每个从Reactor也有一个Selector
private final Selector selector;

//创建一个4线程的线程池,也就是四个从Reactor工作
private static final ExecutorService POOL = Executors.newFixedThreadPool(4);
private static final SubReactor[] reactors = new SubReactor[4];
private static int selectedIndex = 0; //采用轮询机制,每接受一个新的连接,就轮询分配给四个从Reactor
static { //在一开始的时候就让4个从Reactor跑起来
for (int i = 0; i < 4; i++) {
try {
reactors[i] = new SubReactor();
POOL.submit(reactors[i]);
} catch (IOException e) {
e.printStackTrace();
}
}
}
//轮询获取下一个Selector(Acceptor用)
public static Selector nextSelector(){
Selector selector = reactors[selectedIndex].selector;
selectedIndex = (selectedIndex + 1) % 4;
return selector;
}

private SubReactor() throws IOException {
selector = Selector.open();
}

@Override
public void run() {
try { //启动后直接等待selector监听到对应的事件即可,其他的操作逻辑和Reactor一致
while (true) {
int count = selector.select();
System.out.println(Thread.currentThread().getName()+" >> 监听到 "+count+" 个事件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
this.dispatch(iterator.next());
iterator.remove();
}
}
}catch (IOException e) {
e.printStackTrace();
}
}

private void dispatch(SelectionKey key){
Object att = key.attachment();
if(att instanceof Runnable) {
((Runnable) att).run();
}
}

@Override
public void close() throws IOException {
selector.close();
}
}

我们接着来修改一下Acceptor类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Acceptor implements Runnable{

private final ServerSocketChannel serverChannel; //只需要一个ServerSocketChannel就行了

public Acceptor(ServerSocketChannel serverChannel) {
this.serverChannel = serverChannel;
}

@Override
public void run() {
try{
SocketChannel channel = serverChannel.accept(); //还是正常进行Accept操作,得到SocketChannel
System.out.println(Thread.currentThread().getName()+" >> 客户端已连接,IP地址为:"+channel.getRemoteAddress());
channel.configureBlocking(false);
Selector selector = SubReactor.nextSelector(); //选取下一个从Reactor的Selector
selector.wakeup(); //在注册之前唤醒一下防止卡死
channel.register(selector, SelectionKey.OP_READ, new Handler(channel)); //注意现在注册的是从Reactor的Selector
}catch (IOException e){
e.printStackTrace();
}
}
}

现在,SocketChannel相关的操作就由从Reactor进行处理了,而不是一律交给主Reactor进行操作。

至此,我们已经了解了NIO的三大组件:Buffer、Channel、Selector,有关NIO基础相关的内容,就讲解到这里。下一章我们将继续讲解基于NIO实现的高性能网络通信框架Netty。

Netty框架

前面我们学习了Java为我们提供的NIO框架,提供使用NIO提供的三大组件,我们就可以编写更加高性能的客户端/服务端网络程序了,甚至还可以自行规定一种通信协议进行通信。

NIO框架存在的问题

但是之前我们在使用NIO框架的时候,还是发现了一些问题,我们先来盘点一下。

客户端关闭导致服务端空轮询

可能在之前的实验中,你发现了这样一个问题:

image-20220506214320210

当我们的客户端主动与服务端断开连接时,会导致READ事件一直被触发,也就是说selector.select()会直接通过,并且是可读的状态,但是我们发现实际上读到是数据是一个空的(上面的图中在空轮询两次后抛出异常了,也有可能是无限的循环下去)所以这里我们得稍微处理一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
} else if(key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
//这里我们需要判断一下,如果read操作得到的结果是-1,那么说明服务端已经断开连接了
if(channel.read(buffer) < 0) {
System.out.println("客户端已经断开连接了:"+channel.getRemoteAddress());
channel.close(); //直接关闭此通道
continue; //继续进行选择
}
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
channel.write(ByteBuffer.wrap("已收到!".getBytes()));
}

这样,我们就可以在客户端主动断开时关闭连接了:

image-20220506222006550

当然,除了这种情况可能会导致空轮询之外,实际上还有一种可能,这种情况是NIO框架本身的BUG:

1
2
3
4
5
while (true) {
int count = selector.select(); //由于底层epoll机制的问题,导致select方法可能会一直返回0,造成无限循环的情况。
System.out.println("监听到 "+count+" 个事件");
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

详细请看JDK官方BUG反馈:

  1. JDK-6670302 : (se) NIO selector wakes up with 0 selected keys infinitely
  2. JDK-6403933 : (se) Selector doesn’t block on Selector.select(timeout) (lnx)

本质原因也是因为客户端的主动断开导致:

This is an issue with poll (and epoll) on Linux. If a file descriptor for a connected socket is polled with a request event mask of 0, and if the connection is abruptly terminated (RST) then the poll wakes up with the POLLHUP (and maybe POLLERR) bit set in the returned event set. The implication of this behaviour is that Selector will wakeup and as the interest set for the SocketChannel is 0 it means there aren’t any selected events and the select method returns 0.

这个问题本质是与操作系统有关的,所以JDK一直都认为是操作系统的问题,不应该由自己来处理,所以这个问题在当时的好几个JDK版本都是存在的,这是一个很严重的空转问题,无限制地进行空转操作会导致CPU资源被疯狂消耗。

不过,这个问题,却被Netty框架巧妙解决了,我们后面再说。

粘包/拆包问题

除了上面的问题之外,我们接着来看下一个问题。

我们在计算机网络这门课程中学习过,操作系统通过TCP协议发送数据的时候,也会先将数据存放在缓冲区中,而至于什么时候真正地发出这些数据,是由TCP协议来决定的,这是我们无法控制的事情。

image-20220506224926169

也就是说,比如现在我们要发送两个数据包(P1/P2),理想情况下,这两个包应该是依次到达服务端,并由服务端正确读取两次数据出来,但是由于上面的机制,可能会出现下面的情况:

  1. 可能P1和P2被合在一起发送给了服务端(粘包现象)
  2. 可能P1和P2的前半部分合在一起发送给了服务端(拆包现象)
  3. 可能P1的前半部分就被单独作为一个部分发给了服务端,后面的和P2一起发给服务端(也是拆包现象)

image-20220506224258538

当然,对于这种问题,也有一些比较常见的解决方案:

  1. 消息定长,发送方和接收方规定固定大小的消息长度,例如每个数据包大小固定为200字节,如果不够,空位补空格,只有接收了200个字节之后,作为一个完整的数据包进行处理。
  2. 在每个包的末尾使用固定的分隔符,比如每个数据包末尾都是\r\n,这样就一定需要读取到这样的分隔符才能将前面所有的数据作为一个完整的数据包进行处理。
  3. 将消息分为头部和本体,在头部中保存有当前整个数据包的长度,只有在读到足够长度之后才算是读到了一个完整的数据包。

这里我们就来演示一下第一种解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
Selector selector = Selector.open()){
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

//一个数据包要求必须塞满30个字节
ByteBuffer buffer = ByteBuffer.allocate(30);

while (true) {
int count = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
...
if(buffer.remaining() == 0) {
buffer.flip();
System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
buffer.clear();
}
channel.write(ByteBuffer.wrap(("已收到 "+size+" 字节的数据!").getBytes()));
}
...

现在,当我们的客户端发送消息时,如果没有达到30个字节,那么会暂时存储起来,等有30个之后再一次性得到,当然如果数据量超过了30,那么最多也只会读取30个字节,其他的放在下一批:

image-20220507102955570

image-20220507103009255

这样就可以在一定程度上解决粘包/拆包问题了。


走进Netty框架

前面我们盘点了一下NIO存在的一些问题,而在Netty框架中,这些问题都被巧妙的解决了。

Netty是由JBOSS提供的一个开源的java网络编程框架,主要是对java的nio包进行了再次封装。Netty比java原生的nio包提供了更加强大、稳定的功能和易于使用的api。 netty的作者是Trustin Lee,这是一个韩国人,他还开发了另外一个著名的网络编程框架,mina。二者在很多方面都十分相似,它们的线程模型也是基本一致 。不过netty社区的活跃程度要mina高得多。

Netty实际上应用场景非常多,比如我们的Minecraft游戏服务器:

image-20220507110120090

Java版本的Minecraft服务器就是使用Netty框架作为网络通信的基础,正是得益于Netty框架的高性能,我们才能愉快地和其他的小伙伴一起在服务器里面炸服。

学习了Netty框架后,说不定你也可以摸索到部分Minecraft插件/模组开发的底层细节(太折磨了,UP主高中搞了大半年这玩意)

当然除了游戏服务器之外,我们微服务之间的远程调用也可以使用Netty来完成,比如Dubbo的RPC框架,包括最新的SpringWebFlux框架,也抛弃了内嵌Tomcat而使用Netty作为通信框架。既然Netty这么强大,那么现在我们就开始Netty的学习吧!

导包先:

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
</dependencies>

ByteBuf介绍

Netty并没有使用NIO中提供的ByteBuffer来进行数据装载,而是自行定义了一个ByteBuf类。

那么这个类相比NIO中的ByteBuffer有什么不同之处呢?

  • 写操作完成后无需进行flip()翻转。
  • 具有比ByteBuffer更快的响应速度。
  • 动态扩容。

首先我们来看看它的内部结构:

1
2
3
4
5
6
7
public abstract class AbstractByteBuf extends ByteBuf {
...
int readerIndex; //index被分为了读和写,是两个指针在同时工作
int writerIndex;
private int markedReaderIndex; //mark操作也分两种
private int markedWriterIndex;
private int maxCapacity; //最大容量,没错,这玩意能动态扩容

可以看到,读操作和写操作分别由两个指针在进行维护,每写入一次,writerIndex向后移动一位,每读取一次,也是readerIndex向后移动一位,当然readerIndex不能大于writerIndex,这样就不会像NIO中的ByteBuffer那样还需要进行翻转了。

image-20220507160235552

其中readerIndexwriterIndex之间的部分就是是可读的内容,而writerIndex之后到capacity都是可写的部分。

我们来实际使用一下看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
//创建一个初始容量为10的ByteBuf缓冲区,这里的Unpooled是用于快速生成ByteBuf的工具类
//至于为啥叫Unpooled是池化的意思,ByteBuf有池化和非池化两种,区别在于对内存的复用,我们之后再讨论
ByteBuf buf = Unpooled.buffer(10);
System.out.println("初始状态:"+Arrays.toString(buf.array()));
buf.writeInt(-888888888); //写入一个Int数据
System.out.println("写入Int后:"+Arrays.toString(buf.array()));
buf.readShort(); //无需翻转,直接读取一个short数据出来
System.out.println("读取Short后:"+Arrays.toString(buf.array()));
buf.discardReadBytes(); //丢弃操作,会将当前的可读部分内容丢到最前面,并且读写指针向前移动丢弃的距离
System.out.println("丢弃之后:"+Arrays.toString(buf.array()));
buf.clear(); //清空操作,清空之后读写指针都归零
System.out.println("清空之后:"+Arrays.toString(buf.array()));
}

通过结合断点调试,我们可以观察读写指针的移动情况,更加清楚的认识一下ByteBuf的底层操作。

我们再来看看划分操作是不是和之前一样的:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
//我们也可以将一个byte[]直接包装进缓冲区(和NIO是一样的)不过写指针的值一开始就跑到最后去了,但是这玩意是不是只读的
ByteBuf buf = Unpooled.wrappedBuffer("abcdefg".getBytes());
//除了包装,也可以复制数据,copiedBuffer()会完完整整将数据拷贝到一个新的缓冲区中
buf.readByte(); //读取一个字节
ByteBuf slice = buf.slice(); //现在读指针位于1,然后进行划分

System.out.println(slice.arrayOffset()); //得到划分出来的ByteBuf的偏移地址
System.out.println(Arrays.toString(slice.array()));
}

可以看到,划分也是根据当前读取的位置来进行的。

我们继续来看看它的另一个特性,动态扩容,比如我们申请一个容量为10的缓冲区:

1
2
3
4
5
6
7
public static void main(String[] args) {
ByteBuf buf = Unpooled.buffer(10); //容量只有10字节
System.out.println(buf.capacity());
//直接写一个字符串
buf.writeCharSequence("卢本伟牛逼!", StandardCharsets.UTF_8); //很明显这么多字已经超过10字节了
System.out.println(buf.capacity());
}

通过结果我们发现,在写入一个超出当前容量的数据时,会进行动态扩容,扩容会从64开始,之后每次触发扩容都会x2,当然如果我们不希望它扩容,可以指定最大容量:

1
2
3
4
5
6
7
public static void main(String[] args) {
//在生成时指定maxCapacity也为10
ByteBuf buf = Unpooled.buffer(10, 10);
System.out.println(buf.capacity());
buf.writeCharSequence("卢本伟牛逼!", StandardCharsets.UTF_8);
System.out.println(buf.capacity());
}

可以看到现在无法再动态扩容了:

image-20220507165153381

我们接着来看一下缓冲区的三种实现模式:堆缓冲区模式、直接缓冲区模式、复合缓冲区模式。

堆缓冲区(数组实现)和直接缓冲区(堆外内存实现)不用多说,前面我们在NIO中已经了解过了,我们要创建一个直接缓冲区也很简单,直接调用:

1
2
3
4
public static void main(String[] args) {
ByteBuf buf = Unpooled.directBuffer(10);
System.out.println(Arrays.toString(buf.array()));
}

同样的不能直接拿到数组,因为底层压根不是数组实现的:

image-20220507163253662

我们来看看复合模式,复合模式可以任意地拼凑组合其他缓冲区,比如我们可以:

image-20220507171216323

这样,如果我们想要对两个缓冲区组合的内容进行操作,我们就不用再单独创建一个新的缓冲区了,而是直接将其进行拼接操作,相当于是作为多个缓冲区组合的视图。

1
2
3
4
5
6
7
8
//创建一个复合缓冲区
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.copiedBuffer("abc".getBytes()));
buf.addComponent(Unpooled.copiedBuffer("def".getBytes()));

for (int i = 0; i < buf.capacity(); i++) {
System.out.println((char) buf.getByte(i));
}

可以看到我们也可以正常操作组合后的缓冲区。

最后我们来看看,池化缓冲区和非池化缓冲区的区别。

我们研究一下Unpooled工具类中具体是如何创建buffer的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final class Unpooled {
private static final ByteBufAllocator ALLOC; //实际上内部是有一个ByteBufAllocator对象的
public static final ByteOrder BIG_ENDIAN;
public static final ByteOrder LITTLE_ENDIAN;
public static final ByteBuf EMPTY_BUFFER;

public static ByteBuf buffer() {
return ALLOC.heapBuffer(); //缓冲区的创建操作实际上是依靠ByteBufAllocator来进行的
}

...

static { //ALLOC在静态代码块中进行指定,实际上真正的实现类是UnpooledByteBufAllocator
ALLOC = UnpooledByteBufAllocator.DEFAULT;
BIG_ENDIAN = ByteOrder.BIG_ENDIAN;
LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN;
EMPTY_BUFFER = ALLOC.buffer(0, 0); //空缓冲区容量和最大容量都是0

assert EMPTY_BUFFER instanceof EmptyByteBuf : "EMPTY_BUFFER must be an EmptyByteBuf.";

}
}

那么我们来看看,这个ByteBufAllocator又是个啥,顾名思义,其实就是负责分配缓冲区的。

它有两个具体实现类:UnpooledByteBufAllocatorPooledByteBufAllocator,一个是非池化缓冲区生成器,还有一个是池化缓冲区生成器,那么池化和非池化有啥区别呢?

实际上池化缓冲区利用了池化思想,将缓冲区通过设置内存池来进行内存块复用,这样就不用频繁地进行内存的申请,尤其是在使用堆外内存的时候,避免多次重复通过底层malloc()函数系统调用申请内存造成的性能损失。Netty的内存管理机制主要是借鉴Jemalloc内存分配策略,感兴趣的小伙伴可以深入了解一下。

所以,由于是复用内存空间,我们来看个例子:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = allocator.directBuffer(10); //申请一个容量为10的直接缓冲区
buf.writeChar('T'); //随便操作操作
System.out.println(buf.readChar());
buf.release(); //释放此缓冲区

ByteBuf buf2 = allocator.directBuffer(10); //重新再申请一个同样大小的直接缓冲区
System.out.println(buf2 == buf);
}

可以看到,在我们使用完一个缓冲区之后,我们将其进行资源释放,当我们再次申请一个同样大小的缓冲区时,会直接得到之前已经申请好的缓冲区,所以,PooledByteBufAllocator实际上是将ByteBuf实例放入池中在进行复用。

零拷贝简介

注意:此小节作为选学内容,需要掌握操作系统计算机组成原理才能学习。

零拷贝是一种I/O操作优化技术,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,首先第一个问题,什么是内核空间,什么又是用户空间呢?

其实早期操作系统是不区分内核空间和用户空间的,但是应用程序能访问任意内存空间,程序很容易不稳定,常常把系统搞崩溃,比如清除操作系统的内存数据。实际上让应用程序随便访问内存真的太危险了,于是就按照CPU 指令的重要程度对指令进行了分级,指令分为四个级别:Ring0 ~ Ring3,Linux 下只使用了 Ring0 和 Ring3 两个运行级别,进程运行在 Ring3 级别时运行在用户态,指令只访问用户空间,而运行在 Ring0 级别时被称为运行在内核态,可以访问任意内存空间。

image-20220512122211805

比如我们Java中创建一个新的线程,实际上最终是要交给操作系统来为我们进行分配的,而需要操作系统帮助我们完成任务则需要进行系统调用,是内核在进行处理,不是我们自己的程序在处理,这时就相当于我们的程序处于了内核态,而当操作系统底层分配完成,最后到我们Java代码中返回得到线程对象时,又继续由我们的程序进行操作,所以从内核态转换回了用户态。

而我们的文件操作也是这样,我们实际上也是需要让操作系统帮助我们从磁盘上读取文件数据或是向网络发送数据,比如使用传统IO的情况下,我们要从磁盘上读取文件然后发送到网络上,就会经历以下流程:

image-20220512123113806

可以看到整个过程中是经历了2次CPU拷贝+2次DMA拷贝,一共四次拷贝,虽然逻辑比较清晰,但是数据老是这样来回进行复制,是不是太浪费时间了点?所以我们就需要寻找一种更好的方式,来实现零拷贝。

实现零拷贝我们这里演示三种方案:

  1. 使用虚拟内存

    现在的操作系统基本都是支持虚拟内存的,我们可以让内核空间和用户空间的虚拟地址指向同一个物理地址,这样就相当于是直接共用了这一块区域,也就谈不上拷贝操作了:

    image-20220512124512936

  2. 使用mmap/write内存映射

    实际上这种方式就是将内核空间中的缓存直接映射到用户空间缓存,比如我们之前在学习NIO中使用的MappedByteBuffer,就是直接作为映射存在,当我们需要将数据发送到Socket缓冲区时,直接在内核空间中进行操作就行了:

    image-20220512124732995

    不过这样还是会出现用户态和内核态的切换,我们得再优化优化。

  3. 使用sendfile方式

    在Linux2.1开始,引入了sendfile方式来简化操作,我们可以直接告诉内核要把哪个文件数据拷贝拷贝到Socket上,直接在内核空间中一步到位:

    image-20220512124950007

    比如我们之前在NIO中使用的transferTo()方法,就是利用了这种机制来实现零拷贝的。

Netty工作模型

前面我们了解了Netty为我们提供的更高级的缓冲区类,我们接着来看看Netty是如何工作的,上一章我们介绍了Reactor模式,而Netty正是以主从Reactor多线程模型为基础,构建出了一套高效的工作模型。

大致工作模型图如下:

image-20220509215109408

可以看到,和我们之前介绍的主从Reactor多线程模型非常类似:

image-20220505131410997

所有的客户端需要连接到主Reactor完成Accept操作后,其他的操作由从Reactor去完成,这里也是差不多的思想,但是它进行了一些改进,我们来看一下它的设计:

  • Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像我们前面说的主从Reactor一样。
  • 无论是BossGroup还是WorkerGroup,都是使用EventLoop(事件循环,很多系统都采用了事件循环机制,比如前端框架Node.js,事件循环顾名思义,就是一个循环,不断地进行事件通知)来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像我们之前写NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
  • 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。

前面我们大致了解了一下Netty的工作模型,接着我们来尝试创建一个Netty服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
//这里我们使用NioEventLoopGroup实现类即可,创建BossGroup和WorkerGroup
//当然还有EpollEventLoopGroup,但是仅支持Linux,这是Netty基于Linux底层Epoll单独编写的一套本地实现,没有使用NIO那套
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();

//创建服务端启动引导类
ServerBootstrap bootstrap = new ServerBootstrap();
//可链式,就很棒
bootstrap
.group(bossGroup, workerGroup) //指定事件循环组
.channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
//获取流水线,当我们需要处理客户端的数据时,实际上是像流水线一样在处理,这个流水线上可以有很多Handler
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //添加一个Handler,这里使用ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { //ctx是上下文,msg是收到的消息,默认以ByteBuf形式(也可以是其他形式,后面再说)
ByteBuf buf = (ByteBuf) msg; //类型转换一下
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
//最后绑定端口,启动
bootstrap.bind(8080);
}

可以看到上面写了很多东西,但是你一定会懵逼,这些新来的东西,都是什么跟什么啊,怎么一个也没看明白?没关系,我们可以暂时先将代码写在这里,具体的各个部分,还请听后面细细道来。

我们接着编写一个客户端,客户端可以直接使用我们之前的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
//创建一个新的SocketChannel,一会通过通道进行通信
try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
Scanner scanner = new Scanner(System.in)){
System.out.println("已连接到服务端!");
while (true) { //咱给它套个无限循环,这样就能一直发消息了
System.out.println("请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
//直接向通道中写入数据,真舒服
channel.write(ByteBuffer.wrap(text.getBytes()));
System.out.println("已发送!");
ByteBuffer buffer = ByteBuffer.allocate(128);
channel.read(buffer); //直接从通道中读取数据
buffer.flip();
System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

通过通道正常收发数据即可,这样我们就成功搭建好了一个Netty服务器。

Channel详解

在学习NIO时,我们就已经接触到Channel了,我们可以通过通道来进行数据的传输,并且通道支持双向传输。

而在Netty中,也有对应的Channel类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelId id(); //通道ID
EventLoop eventLoop(); //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
Channel parent(); //Channel是具有层级关系的,这里是返回父Channel
ChannelConfig config();
boolean isOpen(); //通道当前的相关状态
boolean isRegistered();
boolean isActive();
ChannelMetadata metadata(); //通道相关信息
SocketAddress localAddress();
SocketAddress remoteAddress();
ChannelFuture closeFuture(); //关闭通道,但是会用到ChannelFuture,后面说
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Unsafe unsafe();
ChannelPipeline pipeline(); //流水线,之后也会说
ByteBufAllocator alloc(); //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
Channel read();
Channel flush(); //刷新,基操
}

可以看到,Netty中的Channel相比NIO功能就多得多了。Netty中的Channel主要特点如下:

  • 所有的IO操作都是异步的,并不是在当前线程同步运行,方法调用之后就直接返回了,那怎么获取操作的结果呢?还记得我们在前面JUC篇教程中学习的Future吗,没错,这里的ChannelFuture也是干这事的。

我们可以来看一下Channel接口的父接口ChannelOutboundInvoker接口,这里面定义了大量的I/O操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface ChannelOutboundInvoker {   //通道出站调用(包含大量的网络出站操作,比如写)
ChannelFuture bind(SocketAddress var1); //Socket绑定、连接、断开、关闭等操作
ChannelFuture connect(SocketAddress var1);
ChannelFuture connect(SocketAddress var1, SocketAddress var2);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress var1, ChannelPromise var2); //下面这一系列还有附带ChannelPromise的,ChannelPromise我们后面再说,其实就是ChannelFuture的增强版
ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
ChannelFuture disconnect(ChannelPromise var1);
ChannelFuture close(ChannelPromise var1);
ChannelFuture deregister(ChannelPromise var1);
ChannelOutboundInvoker read();

ChannelFuture write(Object var1); //可以看到这些常见的写操作,都是返回的ChannelFuture,而不是直接给结果
ChannelFuture write(Object var1, ChannelPromise var2);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
ChannelFuture writeAndFlush(Object var1);

ChannelPromise newPromise(); //其他的暂时不提
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable var1);
ChannelPromise voidPromise();
}

当然它还实现了AttributeMap接口,其实有点类似于Session那种感觉,我们可以添加一些属性之类的:

1
2
3
4
5
public interface AttributeMap {
<T> Attribute<T> attr(AttributeKey<T> var1);

<T> boolean hasAttr(AttributeKey<T> var1);
}

我们了解了Netty底层的Channel之后,我们接着来看ChannelHandler,既然现在有了通道,那么怎么进行操作呢?我们可以将需要处理的事情放在ChannelHandler中,ChannelHandler充当了所有入站和出站数据的应用程序逻辑的容器,实际上就是我们之前Reactor模式中的Handler,全靠它来处理读写操作。

不过这里不仅仅是一个简单的ChannelHandler在进行处理,而是一整套流水线,我们之后会介绍ChannelPipeline。

比如我们上面就是使用了ChannelInboundHandlerAdapter抽象类,它是ChannelInboundHandler接口的实现,用于处理入站数据,可以看到我们实际上就是通过重写对应的方法来进行处理,这些方法会在合适的时间被调用:

1
2
3
4
5
6
7
8
9
10
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//ctx是上下文,msg是收到的消息,以ByteBuf形式
ByteBuf buf = (ByteBuf) msg; //类型转换一下
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});

我们先从顶层接口开始看起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface ChannelHandler {
//当ChannelHandler被添加到流水线中时调用
void handlerAdded(ChannelHandlerContext var1) throws Exception;
//当ChannelHandler从流水线中移除时调用
void handlerRemoved(ChannelHandlerContext var1) throws Exception;

/** @deprecated 已过时那咱就不管了 */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}

顶层接口的定义比较简单,就只有一些流水线相关的回调方法,我们接着来看下一级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//ChannelInboundHandler用于处理入站相关事件
public interface ChannelInboundHandler extends ChannelHandler {
//当Channel已经注册到自己的EventLoop上时调用,前面我们说了,一个Channel只会注册到一个EventLoop上,注册到EventLoop后,这样才会在发生对应事件时被通知。
void channelRegistered(ChannelHandlerContext var1) throws Exception;
//从EventLoop上取消注册时
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
//当Channel已经处于活跃状态时被调用,此时Channel已经连接/绑定,并且已经就绪
void channelActive(ChannelHandlerContext var1) throws Exception;
//跟上面相反,不再活跃了,并且不在连接它的远程节点
void channelInactive(ChannelHandlerContext var1) throws Exception;
//当从Channel读取数据时被调用,可以看到数据被自动包装成了一个Object(默认是ByteBuf)
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
//上一个读取操作完成后调用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
//暂时不介绍
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
//当Channel的可写状态发生改变时被调用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
//出现异常时被调用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

而我们上面用到的ChannelInboundHandlerAdapter实际上就是对这些方法实现的抽象类,相比直接用接口,我们可以只重写我们需要的方法,没有重写的方法会默认向流水线下一个ChannelHandler发送。

我们来测试一下吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TestChannelHandler extends ChannelInboundHandlerAdapter {

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered");
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//这次我们就直接使用ctx.alloc()来生成缓冲区
ByteBuf back = ctx.alloc().buffer();
back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
ctx.writeAndFlush(back);
System.out.println("channelRead");
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered");
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged");
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught"+cause);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
//将我们自定义的ChannelHandler添加到流水线
channel.pipeline().addLast(new TestChannelHandler());
}
});
bootstrap.bind(8080);
}

现在我们启动服务器,让客户端来连接并发送一下数据试试看:

image-20220510092703319

可以看到ChannelInboundHandler的整个生命周期,首先是Channel注册成功,然后才会变成可用状态,接着就差不多可以等待客户端来数据了,当客户端主动断开连接时,会再次触发一次channelReadComplete,然后不可用,最后取消注册。

我们来测试一下出现异常的情况呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ByteBuf back = ctx.alloc().buffer();
back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
ctx.writeAndFlush(back);
System.out.println("channelRead");
throw new RuntimeException("我是自定义异常1"); //弄点异常上去
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
throw new RuntimeException("我是自定义异常2"); //弄点异常上去
}

...

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught"+cause);
}

可以看到发生异常时,会接着调用exceptionCaught方法:

image-20220510094007913

与ChannelInboundHandler对应的还有ChannelOutboundHandler用于处理出站相关的操作,这里就不进行演示了。

我们接着来看看ChannelPipeline,每一个Channel都对应一个ChannelPipeline(在Channel初始化时就被创建了)

image-20220511152035030

它就像是一条流水线一样,整条流水线上可能会有很多个Handler(包括入站和出站),整条流水线上的两端还有两个默认的处理器(用于一些预置操作和后续操作,比如释放资源等),我们只需要关心如何安排这些自定义的Handler即可,比如我们现在希望创建两个入站ChannelHandler,一个用于接收请求并处理,还有一个用于处理当前接收请求过程中出现的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler,注意顺序
.addLast(new ChannelInboundHandlerAdapter(){ //第一个用于处理消息接收
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
throw new RuntimeException("我是异常");
}
})
.addLast(new ChannelInboundHandlerAdapter(){ //第二个用于处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("我是异常处理:"+cause);
}
});
}
});

那么它是如何运作的呢?实际上如果我们不在ChannelInboundHandlerAdapter中重写对应的方法,它会默认传播到流水线的下一个ChannelInboundHandlerAdapter进行处理,比如:

1
2
3
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause); //通过ChannelHandlerContext来向下传递,ChannelHandlerContext是在Handler添加进Pipeline中时就被自动创建的
}

比如我们现在需要将一个消息在两个Handler中进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg); //通过ChannelHandlerContext
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
}
});
}

我们接着来看看出站相关操作,我们可以使用ChannelOutboundHandlerAdapter来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelOutboundHandlerAdapter(){
//注意出栈站操作应该在入站操作的前面,当我们使用ChannelHandlerContext的write方法时,是从流水线的当前位置倒着往前找下一个ChannelOutboundHandlerAdapter,而我们之前使用的ChannelInboundHandlerAdapter是从前往后找下一个,如果我们使用的是Channel的write方法,那么会从整个流水线的最后开始倒着往前找ChannelOutboundHandlerAdapter,一定要注意顺序。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //当执行write操作时,会
System.out.println(msg); //write的是啥,这里就是是啥
//我们将其转换为ByteBuf,这样才能发送回客户端
ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.toString().getBytes()));
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.writeAndFlush("不会吧不会吧,不会还有人都看到这里了还没三连吧"); //这里可以write任何对象
//ctx.channel().writeAndFlush("啊对对对"); 或是通过Channel进行write也可以
}
});
}

现在我们来试试看,搞两个出站的Handler,验证一下是不是上面的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush("伞兵一号卢本伟"); //这里我们使用channel的write
}
})
.addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("1号出站:"+msg);
}
})
.addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("2号出站:"+msg);
ctx.write(msg); //继续write给其他的出站Handler,不然到这里就断了
}
});
}

所以,出站操作在流水线上是反着来的,整个流水线操作大概流程如下:

image-20220510130021714

有关Channel及其处理相关操作,就先讲到这里。

EventLoop和任务调度

前面我们讲解了Channel,那么在EventLoop中具体是如何进行调度的呢?实际上我们之前在编写NIO的时候,就是一个while循环在源源不断地等待新的事件,而EventLoop也正是这种思想,它本质就是一个事件等待/处理线程。

image-20220510133359757

我们上面使用的就是EventLoopGroup,包含很多个EventLoop,我们每创建一个连接,就需要绑定到一个EventLoop上,之后EventLoop就会开始监听这个连接(只要连接不关闭,一直都是这个EventLoop负责此Channel),而一个EventLoop可以同时监听很多个Channel,实际上就是我们之前学习的Selector罢了。

当然,EventLoop并不只是用于网络操作的,我们前面所说的EventLoop其实都是NioEventLoop,它是专用于网络通信的,除了网络通信之外,我们也可以使用普通的EventLoop来处理一些其他的事件。

比如我们现在编写的服务端,虽然结构上和主从Reactor多线程模型差不多,但是我们发现,Handler似乎是和读写操作在一起进行的,而我们之前所说的模型中,Handler是在读写之外的单独线程中进行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup) //指定事件循环组
.channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
Thread.sleep(10000); //这里我们直接卡10秒假装在处理任务
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}

可以看到,如果在这里卡住了,那么就没办法处理EventLoop绑定的其他Channel了,所以我们这里就创建一个普通的EventLoop来专门处理读写之外的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
handlerGroup.submit(() -> {
//由于继承自ScheduledExecutorService,我们直接提交任务就行了,是不是感觉贼方便
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
});
}
});
}
});
bootstrap.bind(8080);
}

当然我们也可以写成一条流水线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
}).addLast(handlerGroup, new ChannelInboundHandlerAdapter(){ //在添加时,可以直接指定使用哪个EventLoopGroup
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}

这样,我们就进一步地将EventLoop利用起来了。

按照前面服务端的方式,我们来把Netty版本的客户端也给写了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap(); //客户端也是使用Bootstrap来启动
bootstrap
.group(new NioEventLoopGroup()) //客户端就没那么麻烦了,直接一个EventLoop就行,用于处理发回来的数据
.channel(NioSocketChannel.class) //客户端肯定就是使用SocketChannel了
.handler(new ChannelInitializer<SocketChannel>() { //这里的数据处理方式和服务端是一样的
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(">> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).channel(); //连接后拿到对应的Channel对象
//注意上面连接操作是异步的,调用之后会继续往下走,下面我们就正式编写客户端的数据发送代码了
try(Scanner scanner = new Scanner(System.in)){ //还是和之前一样,扫了就发
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes())); //通过Channel对象发送数据
}
}
}

我们来测试一下吧:

image-20220510144513303

Future和Promise

我们接着来看ChannelFuture,前面我们提到,Netty中Channel的相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果,如果需要得到结果,那么我们就必须要利用到Future。

我们先来看看ChannelFutuer接口怎么定义的:

1
2
3
4
5
6
7
8
9
10
11
12
public interface ChannelFuture extends Future<Void> {
Channel channel(); //我们可以直接获取此任务的Channel
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); //当任务完成时,会直接执行GenericFutureListener的任务,注意执行的位置也是在EventLoop中
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture sync() throws InterruptedException; //在当前线程同步等待异步任务完成,任务失败会抛出异常
ChannelFuture syncUninterruptibly(); //同上,但是无法响应中断
ChannelFuture await() throws InterruptedException; //同上,但是任务中断不会抛出异常,需要手动判断
ChannelFuture awaitUninterruptibly(); //不用我说了吧?
boolean isVoid(); //返回类型是否为void
}

此接口是继承自Netty中的Future接口的(不是JDK的那个):

1
2
3
4
5
6
7
8
9
10
public interface Future<V> extends java.util.concurrent.Future<V> {   //再往上才是JDK的Future
boolean isSuccess(); //用于判断任务是否执行成功的
boolean isCancellable();
Throwable cause(); //获取导致任务失败的异常

...

V getNow(); //立即获取结果,如果还未产生结果,得到null,不过ChannelFuture定义V为Void,就算完成了获取也是null
boolean cancel(boolean var1); //取消任务
}

Channel的很多操作都是异步完成的,直接返回一个ChannelFuture,比如Channel的write操作,返回的就是一个ChannelFuture对象:

1
2
3
4
5
6
7
8
9
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ChannelFuture future = ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
System.out.println("任务完成状态:"+future.isDone()); //通过ChannelFuture来获取相关信息
}
});

包括我们的服务端启动也是返回的ChannelFuture:

1
2
3
4
5
6
7
...
}
});
ChannelFuture future = bootstrap.bind(8080);
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}

可以看到,服务端的启动就比较慢了,所以在一开始直接获取状态会返回false,但是这个时候我们又需要等到服务端启动完成之后做一些事情,这个时候该怎么办呢?现在我们就有两种方案了:

1
2
3
4
5
6
7
                }
});
ChannelFuture future = bootstrap.bind(8080);
future.sync(); //让当前线程同步等待任务完成
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}

第一种方案是直接让当前线程同步等待异步任务完成,我们可以使用sync()方法,这样当前线程会一直阻塞直到任务结束。第二种方案是添加一个监听器,等待任务完成时通知:

1
2
3
4
5
6
                }
});
ChannelFuture future = bootstrap.bind(8080);
//直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程
future.addListener(f -> System.out.println("我是服务端启动完成之后要做的事情!"));
}

包括客户端的关闭,也是异步进行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try(Scanner scanner = new Scanner(System.in)){
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
if(text.equals("exit")) { //输入exit就退出
ChannelFuture future = channel.close();
future.sync(); //等待Channel完全关闭
break;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes()));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully(); //优雅退出EventLoop,其实就是把还没发送的数据之类的事情做完,当然也可以shutdownNow立即关闭
}

我们接着来看看Promise接口,它支持手动设定成功和失败的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//此接口也是继承自Netty中的Future接口
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V var1); //手动设定成功
boolean trySuccess(V var1);
Promise<V> setFailure(Throwable var1); //手动设定失败
boolean tryFailure(Throwable var1);
boolean setUncancellable();
//这些就和之前的Future是一样的了
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> await() throws InterruptedException;
Promise<V> awaitUninterruptibly();
Promise<V> sync() throws InterruptedException;
Promise<V> syncUninterruptibly();
}

比如我们来测试一下:

1
2
3
4
5
6
7
public static void main(String[] args) throws ExecutionException, InterruptedException {
Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop());
System.out.println(promise.isSuccess()); //在一开始肯定不是成功的
promise.setSuccess("lbwnb"); //设定成功
System.out.println(promise.isSuccess()); //再次获取,可以发现确实成功了
System.out.println(promise.get()); //获取结果,就是我们刚刚给进去的
}

可以看到我们可以手动指定成功状态,包括ChannelOutboundInvoker中的一些基本操作,都是支持ChannelPromise的:

1
2
3
4
5
6
7
8
9
10
11
12
13
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String text = buf.toString(StandardCharsets.UTF_8);
System.out.println("接收到客户端发送的数据:"+text);
ChannelPromise promise = new DefaultChannelPromise(channel);
System.out.println(promise.isSuccess());
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()), promise);
promise.sync(); //同步等待一下
System.out.println(promise.isSuccess());
}
});

最后结果就是我们想要的了,当然我们也可以像Future那样添加监听器,当成功时自动通知:

1
2
3
4
5
6
7
public static void main(String[] args) throws ExecutionException, InterruptedException {
Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop());
promise.addListener(f -> System.out.println(promise.get())); //注意是在上面的DefaultEventLoop执行的
System.out.println(promise.isSuccess());
promise.setSuccess("lbwnb");
System.out.println(promise.isSuccess());
}

有关Future和Promise就暂时讲解到这里。

编码器和解码器

前面我们已经了解了Netty的大部分基础内容,我们接着来看看Netty内置的一些编码器和解码器。

在前面的学习中,我们的数据发送和接收都是需要以ByteBuf形式传输,但是这样是不是有点太不方便了,咱们能不能参考一下JavaWeb那种搞个Filter,在我们开始处理数据之前,过过滤一次,并在过滤的途中将数据转换成我们想要的类型,也可以将发出的数据进行转换,这就要用到编码解码器了。

我们先来看看最简的,字符串,如果我们要直接在客户端或是服务端处理字符串,可以直接添加一个字符串解码器到我们的流水线中:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
//解码器本质上也算是一种ChannelInboundHandlerAdapter,用于处理入站请求
.addLast(new StringDecoder()) //当客户端发送来的数据只是简单的字符串转换的ByteBuf时,我们直接使用内置的StringDecoder即可转换
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//经过StringDecoder转换后,msg直接就是一个字符串,所以打印就行了
System.out.println(msg);
}
});
}

可以看到,使用起来还是非常方便的,我们只需要将其添加到流水线即可,实际上器本质就是一个ChannelInboundHandlerAdapter:

image-20220511123807650

我们看到它是继承自MessageToMessageDecoder,用于将传入的Message转换为另一种类型,我们也可以自行编写一个实现:

1
2
3
4
5
6
7
8
9
10
11
/**
* 我们也来搞一个自定义的
*/
public class TestDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> list) throws Exception {
System.out.println("数据已收到,正在进行解码...");
String text = buf.toString(StandardCharsets.UTF_8); //直接转换为UTF8字符串
list.add(text); //解码后需要将解析后的数据丢进List中,如果丢进去多个数据,相当于数据被分成了多个,后面的Handler就需要每个都处理一次
}
}

运行,可以看到:

image-20220511124755974

当然如果我们在List里面丢很多个数据的话:

1
2
3
4
5
6
7
8
9
10
public class TestDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> list) throws Exception {
System.out.println("数据已收到,正在进行解码...");
String text = buf.toString(StandardCharsets.UTF_8); //直接转换为UTF8字符串
list.add(text);
list.add(text+"2");
list.add(text+'3'); //一条消息被解码成三条消息
}
}

image-20220511124933026

可以看到,后面的Handler会依次对三条数据都进行处理,当然,除了MessageToMessageDecoder之外,还有其他类型的解码器,比如ByteToMessageDecoder等,这里就不一一介绍了,Netty内置了很多的解码器实现来方便我们开发,比如HTTP(下一节介绍),SMTP、MQTT等,以及我们常用的Redis、Memcached、JSON等数据包。

当然,有了解码器处理发来的数据,那发出去的数据肯定也是需要被处理的,所以编码器就出现了:

1
2
3
4
5
6
7
8
9
10
11
channel.pipeline()
//解码器本质上也算是一种ChannelInboundHandlerAdapter,用于处理入站请求
.addLast(new StringDecoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的数据:"+msg);
ctx.channel().writeAndFlush("可以,不跟你多BB"); //直接发字符串回去
}
})
.addLast(new StringEncoder()); //使用内置的StringEncoder可以直接将出站的字符串数据编码成ByteBuf

和上面的StringDecoder一样,StringEncoder本质上就是一个ChannelOutboundHandlerAdapter:

image-20220511130100984

是不是感觉前面学习的Handler和Pipeline突然就变得有用了,直接一条线把数据处理安排得明明白白啊。

现在我们把客户端也改成使用编码、解码器的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new StringDecoder()) //解码器安排
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(">> 接收到客户端发送的数据:" + msg); //直接接收字符串
}
})
.addLast(new StringEncoder()); //编码器安排
}
});
Channel channel = bootstrap.connect("localhost", 8080).channel();
try(Scanner scanner = new Scanner(System.in)){
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
channel.writeAndFlush(text); //直接发送字符串就行
}
}
}

这样我们的代码量又蹭蹭的减少了很多:

image-20220511130605337

当然,除了编码器和解码器之外,还有编解码器。??缝合怪??

image-20220511130937624

可以看到它是既继承了ChannelInboundHandlerAdapter也实现了ChannelOutboundHandler接口,又能处理出站也能处理入站请求,实际上就是将之前的给组合到一起了,比如我们也可以实现一个缝合在一起的StringCodec类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//需要指定两个泛型,第一个是入站的消息类型,还有一个是出站的消息类型,出站是String类型,我们要转成ByteBuf
public class StringCodec extends MessageToMessageCodec<ByteBuf, String> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String buf, List<Object> list) throws Exception {
System.out.println("正在处理出站数据...");
list.add(Unpooled.wrappedBuffer(buf.getBytes())); //同样的,添加的数量就是出站的消息数量
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> list) throws Exception {
System.out.println("正在处理入站数据...");
list.add(buf.toString(StandardCharsets.UTF_8)); //和之前一样,直接一行解决
}
}

可以看到实际上就是需要我们同时去实现编码和解码方法,继承MessageToMessageCodec类即可。

当然,如果整条流水线上有很多个解码器或是编码器,那么也可以多次进行编码或是解码,比如:

1
2
3
4
5
6
7
8
public class StringToStringEncoder extends MessageToMessageEncoder<String> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
System.out.println("我是预处理编码器,就要皮这一下。");
list.add("[已处理] "+s);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
channel.pipeline()
//解码器本质上也算是一种ChannelInboundHandlerAdapter,用于处理入站请求
.addLast(new StringDecoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的数据:"+msg);
ctx.channel().writeAndFlush("可以,不跟你多BB"); //直接发字符串回去
}
})
.addLast(new StringEncoder()) //最后再转成ByteBuf
.addLast(new StringToStringEncoder()); //先从我们自定义的开始

可以看到,数据在流水线上一层一层处理最后再回到的客户端:

image-20220511133025492

我们在一开始提到的粘包/拆包问题,也可以使用一个解码器解决:

1
2
3
4
channel.pipeline()
.addLast(new FixedLengthFrameDecoder(10))
//第一种解决方案,使用定长数据包,每个数据包都要是指定长度
...
1
2
3
4
channel.pipeline()
.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer("!".getBytes())))
//第二种,就是指定一个特定的分隔符,比如我们这里以感叹号为分隔符
//在收到分隔符之前的所有数据,都作为同一个数据包的内容
1
2
3
4
5
channel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4))
//第三种方案,就是在头部添加长度信息,来确定当前发送的数据包具体长度是多少
//offset是从哪里开始,length是长度信息占多少字节,这里是从0开始读4个字节表示数据包长度
.addLast(new StringDecoder())
1
2
3
4
5
6
7
8
9
10
channel.pipeline()
.addLast(new StringDecoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(">> 接收到客户端发送的数据:" + msg);
}
})
.addLast(new LengthFieldPrepender(4)) //客户端在发送时也需要将长度拼到前面去
.addLast(new StringEncoder());

有关编码器和解码器的内容就先介绍到这里。

实现HTTP协议通信

前面我们介绍了Netty为我们提供的编码器和解码器,这里我们就来使用一下支持HTTP协议的编码器和解码器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
channel.pipeline()
.addLast(new HttpRequestDecoder()) //Http请求解码器
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的数据:"+msg.getClass()); //看看是个啥类型
//收到浏览器请求后,我们需要给一个响应回去
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //HTTP版本为1.1,状态码就OK(200)即可
//直接向响应内容中写入数据
response.content().writeCharSequence("Hello World!", StandardCharsets.UTF_8);
ctx.channel().writeAndFlush(response); //发送响应
ctx.channel().close(); //HTTP请求是一次性的,所以记得关闭
}
})
.addLast(new HttpResponseEncoder()); //响应记得也要编码后发送哦

现在我们用浏览器访问一下我们的服务器吧:

image-20220511142040941

可以看到浏览器成功接收到服务器响应,然后控制台打印了以下类型:

image-20220511142121619

可以看到一次请求是一个DefaultHttpRequest+LastHttpContent$1,这里有两组是因为浏览器请求了一个地址之后紧接着请求了我们网站的favicon图标。

这样把数据分开处理肯定是不行的,要是直接整合成一个多好,安排:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
channel.pipeline()
.addLast(new HttpRequestDecoder()) //Http请求解码器
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)) //搞一个聚合器,将内容聚合为一个FullHttpRequest,参数是最大内容长度
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
System.out.println("浏览器请求路径:"+request.uri()); //直接获取请求相关信息
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.content().writeCharSequence("Hello World!", StandardCharsets.UTF_8);
ctx.channel().writeAndFlush(response);
ctx.channel().close();
}
})
.addLast(new HttpResponseEncoder());

再次访问,我们发现可以正常读取请求路径了:

image-20220511143318500

我们来试试看搞个静态页面代理玩玩,拿出我们的陈年老模板:

image-20220511144020424

全部放进Resource文件夹,一会根据浏览器的请求路径,我们就可以返回对应的页面了,先安排一个解析器,用于解析路径然后将静态页面的内容返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PageResolver {
//直接单例模式
private static final PageResolver INSTANCE = new PageResolver();
private PageResolver(){}
public static PageResolver getInstance(){
return INSTANCE;
}

//请求路径给进来,接着我们需要将页面拿到,然后转换成响应数据包发回去
public FullHttpResponse resolveResource(String path){
if(path.startsWith("/")) { //判断一下是不是正常的路径请求
path = path.equals("/") ? "index.html" : path.substring(1); //如果是直接请求根路径,那就默认返回index页面,否则就该返回什么路径的文件就返回什么
try(InputStream stream = this.getClass().getClassLoader().getResourceAsStream(path)) {
if(stream != null) { //拿到文件输入流之后,才可以返回页面
byte[] bytes = new byte[stream.available()];
stream.read(bytes);
return this.packet(HttpResponseStatus.OK, bytes); //数据先读出来,然后交给下面的方法打包
}
} catch (IOException e){
e.printStackTrace();
}
}
//其他情况一律返回404
return this.packet(HttpResponseStatus.NOT_FOUND, "404 Not Found!".getBytes());
}

//包装成FullHttpResponse,把状态码和数据写进去
private FullHttpResponse packet(HttpResponseStatus status, byte[] data){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
response.content().writeBytes(data);
return response;
}
}

现在我们的静态资源解析就写好了,接着:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
channel.pipeline()
.addLast(new HttpRequestDecoder()) //Http请求解码器
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)) //搞一个聚合器,将内容聚合为一个FullHttpRequest,参数是最大内容长度
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
//请求进来了直接走解析
PageResolver resolver = PageResolver.getInstance();
ctx.channel().writeAndFlush(resolver.resolveResource(request.uri()));
ctx.channel().close();
}
})
.addLast(new HttpResponseEncoder());

现在我们启动服务器来试试看吧:

image-20220511150714100

可以看到页面可以正常展示了,是不是有Tomcat哪味了。

其他内置Handler介绍

Netty也为我们内置了一些其他比较好用的Handler,比如我们要打印日志:

1
2
3
4
5
channel.pipeline()
.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast(new LoggingHandler(LogLevel.INFO)) //添加一个日志Handler,在请求到来时会自动打印相关日志
...

日志级别我们选择INFO,现在我们用浏览器访问一下:

image-20220512125851248

可以看到每次请求的内容和详细信息都会在日志中出现,包括详细的数据包解析过程,请求头信息都是完整地打印在控制台上的。

我们也可以使用Handler对IP地址进行过滤,比如我们不希望某些IP地址连接我们的服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
channel.pipeline()
.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast(new RuleBasedIpFilter(new IpFilterRule() {
@Override
public boolean matches(InetSocketAddress inetSocketAddress) {
return !inetSocketAddress.getHostName().equals("127.0.0.1");
//进行匹配,返回false表示匹配失败
//如果匹配失败,那么会根据下面的类型决定该干什么,比如我们这里判断是不是本地访问的,如果是那就拒绝
}

@Override
public IpFilterRuleType ruleType() {
return IpFilterRuleType.REJECT; //类型,REJECT表示拒绝连接,ACCEPT表示允许连接
}
}))

现在我们浏览器访问一下看看:

image-20220512130926968

我们也可以对那些长期处于空闲的进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
channel.pipeline()
.addLast(new StringDecoder())
.addLast(new IdleStateHandler(10, 10, 0)) //IdleStateHandler能够侦测连接空闲状态
//第一个参数表示连接多少秒没有读操作时触发事件,第二个是写操作,第三个是读写操作都算,0表示禁用
//事件需要在ChannelInboundHandlerAdapter中进行监听处理
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端数据:"+msg);
ctx.channel().writeAndFlush("已收到!");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//没想到吧,这个方法原来是在这个时候用的
if(evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if(event.state() == IdleState.WRITER_IDLE) {
System.out.println("好久都没写了,看视频的你真的有认真在跟着敲吗");
} else if(event.state() == IdleState.READER_IDLE) {
System.out.println("已经很久很久没有读事件发生了,好寂寞");
}
}
}
})
.addLast(new StringEncoder());

可以看到,当我们超过一段时间不发送数据时,就会这样:

image-20220512131845296

通过这种机制,我们就可以直接关掉那些占着茅坑不拉屎的连接。

启动流程源码解读

前面我们完成了对Netty基本功能的讲解,我们最后就来看一下,Netty到底是如何启动以及进行数据处理的。

首先我们知道,整个服务端是在bind之后启动的,那么我们就从这里开始下手,不多BB直接上源码:

1
2
3
public ChannelFuture bind(int inetPort) {
return this.bind(new InetSocketAddress(inetPort)); //转换成InetSocketAddress对象
}

进来之后发现是调用的其他绑定方法,继续:

1
2
3
4
public ChannelFuture bind(SocketAddress localAddress) {
this.validate(); //再次验证一下,看看EventLoopGroup和Channel指定了没
return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

我们继续往下看:

1
2
3
4
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister(); //上来第一句初始化然后注册
...
}

我们看看是怎么注册的:

1
2
3
4
5
6
7
8
9
10
11
12
final ChannelFuture initAndRegister() {
Channel channel = null;

try {
channel = this.channelFactory.newChannel(); //通过channelFactory创建新的Channel,实际上就是我们在一开始设定的NioServerSocketChannel
this.init(channel); //接着对创建好的NioServerSocketChannel进行初始化
...

ChannelFuture regFuture = this.config().group().register(channel); //将通道注册到bossGroup中的一个EventLoop中
...
return regFuture;
}

我们来看看是如何对创建好的ServerSocketChannel进行初始化的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void init(Channel channel) {
setChannelOptions(channel, this.newOptionsArray(), logger);
setAttributes(channel, this.newAttributesArray());
ChannelPipeline p = channel.pipeline();
...
//在流水线上添加一个Handler,在Handler初始化的时候向EventLoop中提交一个任务,将ServerBootstrapAcceptor添加到流水线上
//这样我们的ServerSocketChannel在客户端连接时就能Accept了
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = ServerBootstrap.this.config.handler();
if (handler != null) {
pipeline.addLast(new ChannelHandler[]{handler});
}

ch.eventLoop().execute(new Runnable() {
public void run() {
//这里提交一个任务,将ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline中
pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
}
});
}
}});
}

我们来看一下,ServerBootstrapAcceptor怎么处理的,直接看到它的channelRead方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//当底层NIO的ServerSocketChannel的Selector有OP_ACCEPT事件到达时,NioEventLoop会接收客户端连接,创建SocketChannel,并触发channelRead回调
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//此时msg就是Accept连接创建之后的Channel对象
final Channel child = (Channel)msg;
//这里直接将我们之前编写的childHandler添加到新创建的客户端连接的流水线中(是不是感觉突然就通了)
child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
AbstractBootstrap.setChannelOptions(child, this.childOptions, ServerBootstrap.logger);
AbstractBootstrap.setAttributes(child, this.childAttrs);

try {
//直接向workGroup中的一个EventLoop注册新创建好的客户端连接Channel,等待读写事件
this.childGroup.register(child).addListener(new ChannelFutureListener() {
//异步操作完成后,如果没有注册成功,就强制关闭这个Channel
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
...

所以,实际上就是我们之前讲解的主从Reactor多线程模型,只要前面理解了,这里其实很好推断。

初始化完成之后,我们来看看注册,在之前NIO阶段我们也是需要将Channel注册到对应的Selector才可以开始选择:

1
2
3
4
5
6
7
8
9
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); //转换成ChannelPromise继续
}

public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise); //调用Channel的Unsafe接口实现进行注册
return promise;
}

继续向下:

1
2
3
4
5
6
7
8
9
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
this.register0(promise); //这里是继续调用register0方法在进行注册
}
...
}
}

继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void register0(ChannelPromise promise) {
try {
...

boolean firstRegistration = this.neverRegistered;
AbstractChannel.this.doRegister(); //这里开始执行AbstractNioChannel中的doRegister方法进行注册
AbstractChannel.this.registered = true;
AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
this.safeSetSuccess(promise);
if (AbstractChannel.this.isActive()) {
if (firstRegistration) {
AbstractChannel.this.pipeline.fireChannelActive(); //这里是关键
} else if (AbstractChannel.this.config().isAutoRead()) {
this.beginRead();
}
}
...
}

来到最后一级:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected void doRegister() throws Exception {
boolean selected = false;

while(true) {
try {
//可以看到在这里终于是真正的进行了注册,javaChannel()得到NIO的Channel对象,然后调用register方法
//这里就和我们之前NIO一样了,将Channel注册到Selector中,可以看到Selector也是EventLoop中的
//但是注意,这里的ops参数是0,也就是不监听任何事件
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
...
}
}

我们回到上一级,在doRegister完成之后,会拿到selectionKey,但是注意这时还没有监听任何事件,我们接着看到下面的fireChannelActive方法:

1
2
3
4
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(this.head); //传的是流水线上的默认头结点
return this;
}
1
2
3
4
5
6
7
8
9
10
11
12
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive(); //继续向下
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelActive();
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
private void invokeChannelActive() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelActive(this); //依然是调用的头结点的channelActive方法进行处理
} catch (Throwable var2) {
this.invokeExceptionCaught(var2);
}
} else {
this.fireChannelActive();
}
}
1
2
3
4
public void channelActive(ChannelHandlerContext ctx) {   //这里是头结点的
ctx.fireChannelActive();
this.readIfIsAutoRead(); //继续向下
}
1
2
3
4
5
private void readIfIsAutoRead() {
if (DefaultChannelPipeline.this.channel.config().isAutoRead()) {
DefaultChannelPipeline.this.channel.read(); //继续不断向下
}
}
1
2
3
public void read(ChannelHandlerContext ctx) {
this.unsafe.beginRead(); //最后这里会调用beginRead方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void beginRead() {
this.assertEventLoop();

try {
AbstractChannel.this.doBeginRead(); //这里就是调用AbstractNioChannel的doBeginRead方法了
} catch (final Exception var2) {
this.invokeLater(new Runnable() {
public void run() {
AbstractChannel.this.pipeline.fireExceptionCaught(var2);
}
});
this.close(this.voidPromise());
}

}
1
2
3
4
5
6
7
8
9
10
protected void doBeginRead() throws Exception {
SelectionKey selectionKey = this.selectionKey; //先拿到之前注册好的selectionKey
if (selectionKey.isValid()) {
this.readPending = true;
int interestOps = selectionKey.interestOps(); //把监听的操作取出来
if ((interestOps & this.readInterestOp) == 0) { //如果没有监听任何操作
selectionKey.interestOps(interestOps | this.readInterestOp); //那就把readInterestOp事件进行监听,这里的readInterestOp实际上就是OP_ACCEPT
}
}
}

这样,Channel在初始化完成之后也完成了底层的注册,已经可以开始等待事件了。

我们现在回到之前的doBind方法的注册位置,现在注册完成之后,基本上整个主从Reactor结构就已经出来了,我们来看看还要做些什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister(); //目前初始化和注册都已经成功了
final Channel channel = regFuture.channel(); //由于是异步操作,我们通过ChannelFuture拿到对应的ServerSocketChannel对象
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) { //如果说初始化已经完成了
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise); //直接开始进行进一步的绑定
return promise;
} else {
//如果还没搞完,那就创Promis继续等待任务完成
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}

}
});
return promise;
}
}

可以看到最后都会走到doBind0方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
//最后会向Channel已经注册到的EventLoop中提交一个新的任务
channel.eventLoop().execute(new Runnable() {
public void run() {
if (regFuture.isSuccess()) {
//这里才是真正调用Channel底层进行绑定操作
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

至此,服务端的启动流程结束。我们前面还提到了NIO的空轮询问题,这里我们来看看Netty是如何解决的,我们直接定位到NioEventLoop中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//由于代码太多,这里省略大部分代码
while(true) {
boolean var34;
try {
...
try {
if (!this.hasTasks()) {
strategy = this.select(curDeadlineNanos); //首先会在这里进行Selector.select()操作,跟NIO是一样的
}
...

++selectCnt; //每次唤醒都会让selectCnt自增
this.cancelledKeys = 0;

...

if (!ranTasks && strategy <= 0) {
if (this.unexpectedSelectorWakeup(selectCnt)) { //这里会进行判断是否出现空轮询BUG
...

我们来看看是怎么进行判断的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}

return true;
//如果selectCnt大于等于SELECTOR_AUTO_REBUILD_THRESHOLD(默认为512)那么会直接重建Selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, this.selector);
this.rebuildSelector(); //当前的Selector出现BUG了,得重建一个Selector
return true;
} else {
return false;
}
}

实际上,当每次空轮询发生时会有专门的计数器+1,如果空轮询的次数超过了512次,就认为其触发了空轮询bug,触发bug后,Netty直接重建一个Selector,将原来的Channel重新注册到新的 Selector上,将旧的 Selector关掉,这样就防止了无限循环。