上一章,我们了解了 FixedLengthFrameDecoder 理论知识,它在 Netty 中是一个固定长度解码器,有着很重要的作用。用户可以在服务端和客户端设置自定义的长度。该解码器就会以该长度定长解码。
我们新建一个 Maven 项目,然后在根 pom 文件中引入 Netty 相关 jar 包。代码如下:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
</dependencies>
package net.haicoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class FixedLengthServer {
public static void main(String[] args) {
Integer port = 9899;
new FixedLengthServer().bind(port);
}
public void bind(Integer port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 64)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(150));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new FixedServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
//进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package net.haicoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
public class FixedServerHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger atomicInteger = new AtomicInteger();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
String receiveMsg = body.trim();
System.out.println(atomicInteger.addAndGet(1) + "----->" + Thread.currentThread().getName() + ",The server receive order : " + receiveMsg);
ByteBuf respByteBuf = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(respByteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("-----客户端关闭:" + ctx.channel().remoteAddress());
/**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */
ctx.close();
}
}
package net.haicoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class FixedLengthClient {
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new MyThread()).start();
}
}
static class MyThread implements Runnable {
public void run() {
connect("localhost", 9899);
}
public void connect(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(64));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new FixedClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.channel().closeFuture().channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
}
package net.haicoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
public class FixedClientHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger atomicInteger = new AtomicInteger();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
String reqMsg = (i + 1) + "-我是客户端" + Thread.currentThread().getName();
byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
reqByteBuf.writeBytes(reqMsgByte);
/** 每次发送的同时进行刷新*/
ctx.writeAndFlush(reqByteBuf);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**与服务器同理
* 这个 msg 已经是解码成功的消息,所以不再需要像以前一样使用 ByteBuf 进行编码
* 直接转为 string 字符串即可*/
String body = (String) msg;
System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + body);
}
/**
* 当发生异常时,打印异常 日志,释放客户端资源
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
/**释放资源*/
ctx.close();
}
}
我们以长度为 64 为例子,运行结果如下:
FixedLengthFrameDecoder 它是一个定长解码器,它不会识别到你的输入的语句是否已经结束,只要语句满足定长度就会获取解析。