Netty的ChannelPipeline和ChannelHndler

Netty的ChannelPipeline和ChannelHndler教程

Netty 的 ChannelPipeline 我们可以简单的理解为一个 Channel 数据管道,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件的连接器 ChannelHandler 的链表。它是 ChannelHandler 的容器,负责 ChannelHandler 的管理和事件拦截与调度。在Channel创建的时候,会同时创建ChannelPipeline。

ChannelHandler 像一个拦截器,它在 ChannelPipeline 中,比如有一串数据,在 ChannelPipeline 流通,那么 ChannelHandler 就会对这部分数据进行一一拦截处理,如果符合自己的处理规则,就对数据进行处理,如果不符合就让数据流通。ChannelHandler 有系统自带的 handler 用户也可以自定义 Handler 来实现自定义逻辑。

Netty的ChannelPipeline事件处理

ChannelPipeline 的处理流程如下图:

24 ChannelPipeline事件处理.png

从图中我们可以看到有种类型的事件,一个是 inbound 事件,另外一个就是 outbound 事件。inbound 事件通常由 I/O 线程触发,例如 TCP 链路建立事件,链路关闭事件,读事件,异常通知事件等等。outbound 事件通常是由用户主动发起的网络 I/O 操作,例如用户发起的连接操作、绑定操作、消息发送等操作。

上图的事件处理流程大概:

  1. 底层的 SocketChannel read() 方法读取 ByteBuf,触发 ChannelRead 事件,由 I/O 线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg) 方法,将消息 (ByteBuf) 传输到 ChannelPipeline 中。
  2. 读取消息依次调用 HeadHandler、InboundHandler 1、InboundHandler 2 … TailHandler 拦截和处理,在这个过程中,任何的 InboundHandler 都可以中断当前的流程,结束消息传递。
  3. 调用 handler 的 write 方法发送消息的时候,和读取消息顺序相反,从 TailHandler 开始,从 OutboundHandler n 到 OutboundHandler 1、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在这个过程中也可以终端消息的传递。

inbound相关事件

  • ChannelHandlerContext.fireChannelRegistered():Channel 注册事件
  • ChannelHandlerContext.fireChannelActive():TCP 链路建立成功,Channel 激活事件
  • ChannelHandlerContext.fireChannelRead(Object):读事件
  • ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件
  • ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件
  • ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件
  • ChannelHandlerContext.fireChannelWritabilityChanged():Channel 的可写状态变化通知事件
  • ChannelHandlerContext.fireChannelInactive():TCP 连接关闭,链路不可用通知事件
  • ChannelHandlerContext.fireChannelUnregistered():Channel 没有注册成功通知事件

当发送某个 I/O 事件的时候,例如链路建立,链路关闭,读取操作完成等,都会产生一个事件,事件在 pipeline 中得到传播和处理,它是事件处理总入口。由于网络 I/O 相关的事件有限,所以 Netty 对这些事件进行了统一的抽象。pipeline 里面以 fireXXX 命名的方法都是从 I/O 线程流向用户业务 Handler 的 inbound 事件。

outbound相关事件

  • ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件
  • ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件
  • ChannelHandlerContext.write(Object, ChannelPromise):发送事件
  • ChannelHandlerContext.flush():刷新数据事件
  • ChannelHandlerContext.read():读事件
  • ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件
  • ChannelHandlerContext.close(ChannelPromise):关闭当前 Channel 事件
  • ChannelHandlerContext.deregister(ChannelPromise):注销事件

由用户线程或者代码发起 I/O 操作被称为 outbound 事件,事实上 inbound 和 outbound 是 netty 自身根据事件在 pipeline 中的流向抽象出来的术语。

ChannelHandler

Netty 中有自带的 ChannelHandler 实现,用户也可以按照自己的诉求,自定义 Handler。因为 ChannelHandler 是一个接口,所以任何实现该接口的都需要实现里面所有的方法,因此就有了 ChannelHandlerAdapter 类。用户可以只实现自己感兴趣的事件。Netty 框架提供了 ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter 和 ChannelDuplexHandler 三种适配类。我们在使用的时候,只需要实现我们关注的方法就可以了。

ChannelHandlerContext

每个 ChannelHandler 通过 add 方法加入到 ChannelPipeline 中去的时候,会创建一个对应的 ChannelHandlerContext,并且绑定,ChannelPipeline 实际维护的是 ChannelHandlerContext 的关系。在 DefaultChannelPipeline 源码中可以看到会保存第一个 ChannelHandlerContext 以及最后一个 ChannelHandlerContext 的引用。

Nettty的ChannelPipeline和ChannelHndler总结

ChannelPipeline 是 ChannelHandler 的管理容器,负责 ChannelHandler 的查询、添加、替换和删除。每个 Channel 都会有一个 ChannelPipeline , 它里面会管理 ChannelHandler ,用来处理 inbound 和 outbound 相关事件。inbound 和 outbound 是自身根据事件在 pipeline 中的流向来定义的,用户线程写入的数据叫 outbound,从底层读取数据就是 inbound。