FixedLengthFrameDecoder案例

FixedLengthFrameDecoder案例教程

上一章,我们了解了 FixedLengthFrameDecoder 的理论知识。它是 Netty 中的固定长度解码器,有着很重要的作用。用户可以在服务端和客户端分别设置自定义的长度,解码器就会按照该长度对接收到的数据进行定长解码。

案例

我们新建一个 Maven 项目,然后在根 pom 文件中引入 Netty 相关 jar 包。代码如下:

<!-- Netty all-in-one artifact required by the examples below -->
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.20.Final</version>
    </dependency>
</dependencies>

服务端

FixedLengthServer

package net.haicoder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

import java.nio.charset.StandardCharsets;

/**
 * Demo server for {@link FixedLengthFrameDecoder}.
 *
 * <p>Every accepted connection gets a pipeline that cuts the inbound byte stream into
 * frames of {@link #FRAME_LENGTH} bytes, decodes each frame to a {@code String} and hands
 * it to {@link FixedServerHandler}.
 */
public class FixedLengthServer {

    /** Port the demo server listens on. */
    private static final int DEFAULT_PORT = 9899;

    /** Number of bytes the decoder collects before emitting one frame. */
    private static final int FRAME_LENGTH = 150;

    /** Value passed to the {@code SO_BACKLOG} option of the listening socket. */
    private static final int BACKLOG = 64;

    public static void main(String[] args) {
        new FixedLengthServer().bind(DEFAULT_PORT);
    }

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     * Both event loop groups are shut down before the method returns.
     *
     * @param port TCP port to listen on
     */
    public void bind(Integer port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, BACKLOG)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(FRAME_LENGTH));
                            // The client encodes its requests as UTF-8, so decode with the same
                            // charset instead of relying on the platform default.
                            ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new FixedServerHandler());
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(port).sync();
            // Block here until the server channel is closed; only then does main return
            // and the program end.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

FixedServerHandler

package net.haicoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Server-side handler of the fixed-length decoding demo: prints every decoded frame
 * and echoes it back to the sender.
 */
public class FixedServerHandler extends ChannelInboundHandlerAdapter {

    /** Counts the frames received by all instances of this handler. */
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * Prints the received frame (trimmed, for display only) and writes the untrimmed
     * text back to the peer.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the inbound message; cast to {@code String}, so a string decoder is
     *            expected earlier in the pipeline
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        String receiveMsg = body.trim();
        System.out.println(atomicInteger.incrementAndGet() + "----->" + Thread.currentThread().getName()
                + ",The server receive order : " + receiveMsg);

        // Encode explicitly as UTF-8 (the charset the client uses for its requests)
        // rather than the platform default charset.
        ByteBuf respByteBuf = Unpooled.copiedBuffer(body.getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(respByteBuf);
    }

    /**
     * Reports the failure and closes the connection so the resources associated with
     * it are released.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("-----客户端关闭:" + ctx.channel().remoteAddress());
        // Do not drop the cause: without it the reason for the close is invisible.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

FixedLengthClient

package net.haicoder;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

import java.nio.charset.StandardCharsets;

/**
 * Demo client for {@link FixedLengthFrameDecoder}: starts two threads, each of which
 * opens its own connection to the demo server.
 */
public class FixedLengthClient {

    /** Number of bytes the decoder collects before emitting one frame. */
    private static final int FRAME_LENGTH = 64;

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new MyThread()).start();
        }
    }

    /** Task that connects to the demo server on localhost:9899. */
    static class MyThread implements Runnable {

        @Override
        public void run() {
            connect("localhost", 9899);
        }

        /**
         * Connects to {@code host:port} and blocks until the connection is closed.
         * The event loop group is shut down before the method returns.
         *
         * @param host server host name
         * @param port server TCP port
         */
        public void connect(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new FixedLengthFrameDecoder(FRAME_LENGTH));
                                // Requests are sent as UTF-8; decode replies with the same
                                // charset instead of the platform default.
                                ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
                                ch.pipeline().addLast(new FixedClientHandler());
                            }
                        });

                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                // Wait for the channel to close. The previous closeFuture().channel() call
                // returned immediately, so the finally block shut the event loop down while
                // the exchange with the server was still in progress.
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                // Connection failures surface here when sync() rethrows them.
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
}

FixedClientHandler

package net.haicoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client-side handler of the fixed-length decoding demo: sends ten requests as soon
 * as the connection is active and prints every decoded reply.
 */
public class FixedClientHandler extends ChannelInboundHandlerAdapter {

    /** Counts the replies received by all instances of this handler. */
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * Sends ten UTF-8 encoded messages, flushing after each one.
     *
     * @param ctx context of this handler in the channel pipeline
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            String reqMsg = (i + 1) + "-我是客户端" + Thread.currentThread().getName();
            byte[] reqMsgByte = reqMsg.getBytes(StandardCharsets.UTF_8);
            ByteBuf reqByteBuf = Unpooled.copiedBuffer(reqMsgByte);
            // Flush on every write instead of batching the messages.
            ctx.writeAndFlush(reqByteBuf);
        }
    }

    /**
     * Prints a reply from the server.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the inbound message; cast to {@code String}, so a string decoder is
     *            expected earlier in the pipeline and no manual ByteBuf handling is
     *            needed here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println(atomicInteger.incrementAndGet() + "---" + Thread.currentThread().getName()
                + ",Server return Message:" + body);
    }

    /**
     * Prints the exception and closes the connection to release the client resources.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Previously the cause was dropped silently, hiding every failure.
        cause.printStackTrace();
        ctx.close();
    }
}

我们以客户端解码长度为 64(服务端解码长度为 150)为例子,运行结果如下:

03 定长解析码运行结果.png

FixedLengthFrameDecoder案例总结

FixedLengthFrameDecoder 是一个定长解码器,它不会识别你输入的语句是否已经结束,只要接收到的数据达到设定的长度,就会按该长度截取并进行解码。