MessagePack 是一个类似于 JSON,但是比 JSON 更快,更小的高效的二进制序列化框架。我们都知道 JSON 是跨语言的,所以 MessagePack 也是支持跨语言的高效的序列化框架。
在 Netty 中可以很方便的集成第三方序列化框架,Netty 已经预集成了几种常用的编码框架,本章节中,我们可以写一个案例用来集成 MessagePack 框架。
我们需要新建一个 pom 项目文件,引入相关的 netty 和 messagePack 的包,代码如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
package net.haicoder.messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
public class MsgPackEncoder extends MessageToByteEncoder<Object> {
/**
* 将 Object 类型的 POJO 对象编码为 byte 数组,写到 ByteBuffer 中
*
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(msg);
out.writeBytes(raw);
}
}
package net.haicoder.messagepack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
/**
* 解码
*
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
final byte[] array;
final int length = msg.readableBytes();
array = new byte[length];
msg.getBytes(msg.readerIndex(), array, 0, length);
MessagePack messagePack = new MessagePack();
out.add(messagePack.read(array));
}
}
package net.haicoder.messagepack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class NettyServer {
public static void main(String[] args) {
Integer port = 8080;
new NettyServer().bind(port);
}
public void bind(Integer port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder", new MsgPackEncoder());
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
//进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package net.haicoder.messagepack;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger atomicInteger = new AtomicInteger();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(atomicInteger.addAndGet(1) + "----->" + Thread.currentThread().getName() + ",The server receive msg : " + msg);
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close(); // 发生异常关闭链路
}
}
package net.haicoder.messagepack;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class NettyClient {
public void connector(int port, String host, final int sendNumber) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
ch.pipeline().addLast("frameEncoder)", new LengthFieldPrepender(2));
ch.pipeline().addLast("msg encoder", new MsgPackEncoder());
ch.pipeline().addLast(new NettyClientHandler(sendNumber));
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyClient().connector(8080, "localhost", 10);
}
}
package net.haicoder.messagepack;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static AtomicInteger atomicInteger = new AtomicInteger();
private final int sendNumber;
public NettyClientHandler(int sendNumber) {
this.sendNumber = sendNumber;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < sendNumber; i++) {
ctx.write("hello,haicoder。I am " + i);
}
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + msg);
}
}
运行服务端结果如下:
客户端如下:
我们通过自己实现 MessagePack 的编码和解码功能,在 netty 中很好的接入进去。让客户端和服务端进行交互。