MessagePack编解码

MessagePack编解码教程

MessagePack 是一个类似于 JSON,但比 JSON 更快、更小的高效二进制序列化框架。与 JSON 一样,MessagePack 也支持跨语言,可以在不同编程语言之间交换数据。

MessagePack案例

在 Netty 中可以很方便地集成第三方序列化框架。Netty 已经预集成了几种常用的编解码框架,本章节中,我们通过一个案例来集成 MessagePack 框架。

我们需要新建一个 Maven 项目,并在 pom.xml 文件中引入 Netty 和 MessagePack 的相关依赖,代码如下:

<!-- Netty (all-in-one artifact) -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>
<!-- MessagePack for Java -->
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>

使用 MessagePack 编码和解码

MsgPackEncoder编码

package net.haicoder.messagepack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

/**
 * Outbound handler that serializes each written message with MessagePack and
 * appends the resulting bytes to the outgoing {@link ByteBuf}.
 */
public class MsgPackEncoder extends MessageToByteEncoder<Object> {

    /**
     * Reused for every message handled by this encoder instead of being
     * re-created per call, so the serializer's setup cost is paid once.
     * The pipelines in this example create a fresh encoder per channel.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Encodes {@code msg} into a byte array and writes it to {@code out}.
     *
     * @param ctx the context of this handler
     * @param msg the object to serialize
     * @param out the buffer that receives the serialized bytes
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        byte[] raw = messagePack.write(msg);
        out.writeBytes(raw);
    }
}

MsgpackDecoder解码

package net.haicoder.messagepack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;

import java.util.List;

/**
 * Inbound handler that turns the readable bytes of a {@link ByteBuf} into a
 * MessagePack value and passes it on to the next handler.
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Reused for every message handled by this decoder instead of being
     * re-created per call, so the deserializer's setup cost is paid once.
     * The pipelines in this example create a fresh decoder per channel.
     */
    private final MessagePack messagePack = new MessagePack();

    /**
     * Copies the readable bytes of {@code msg} into an array, deserializes
     * them with MessagePack and adds the result to {@code out}.
     *
     * @param ctx the context of this handler
     * @param msg the buffer holding one serialized message
     * @param out the list that receives the decoded object
     * @throws Exception if deserialization fails
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        // Absolute read: copies the bytes without moving the reader index.
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(messagePack.read(array));
    }
}

服务端

NettyServer

package net.haicoder.messagepack; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; public class NettyServer { public static void main(String[] args) { Integer port = 8080; new NettyServer().bind(port); } public void bind(Integer port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder()); ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder", new MsgPackEncoder()); ch.pipeline().addLast(new NettyServerHandler()); } }); ChannelFuture future = serverBootstrap.bind(port).sync(); //进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

NettyServerHandler

package net.haicoder.messagepack;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Server-side handler that prints every received message together with a
 * running counter and the current thread name, then writes the message back
 * to the sender.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of messages received so far, shared by all handler instances. */
    private static final AtomicInteger RECEIVED = new AtomicInteger();

    /**
     * Logs the incoming message and echoes it back.
     *
     * @param ctx the context of this handler
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String line = RECEIVED.incrementAndGet()
                + "----->"
                + Thread.currentThread().getName()
                + ",The server receive msg : "
                + msg;
        System.out.println(line);
        ctx.writeAndFlush(msg);
    }

    /**
     * Prints the stack trace of {@code cause} and closes the channel.
     *
     * @param ctx   the context of this handler
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // Close the connection once an exception has occurred.
        ctx.close();
    }
}

客户端

NettyClient

package net.haicoder.messagepack; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; public class NettyClient { public void connector(int port, String host, final int sendNumber) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder()); ch.pipeline().addLast("frameEncoder)", new LengthFieldPrepender(2)); ch.pipeline().addLast("msg encoder", new MsgPackEncoder()); ch.pipeline().addLast(new NettyClientHandler(sendNumber)); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyClient().connector(8080, "localhost", 10); } }

NettyClientHandler

package net.haicoder.messagepack;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Client-side handler that sends a fixed number of greeting messages once the
 * channel becomes active and prints every message it receives.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Number of messages received so far, shared by all handler instances. */
    private static final AtomicInteger atomicInteger = new AtomicInteger();

    /** Number of messages to send when the channel becomes active. */
    private final int sendNumber;

    /**
     * @param sendNumber the number of messages to send when the channel becomes active
     */
    public NettyClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    /**
     * Queues {@code sendNumber} messages and flushes them in one go.
     *
     * @param ctx the context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < sendNumber; i++) {
            ctx.write("hello,haicoder。I am " + i);
        }
        ctx.flush();
    }

    /**
     * Prints the received message with a running counter and the current thread name.
     *
     * @param ctx the context of this handler
     * @param msg the received message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(atomicInteger.incrementAndGet() + "---" + Thread.currentThread().getName()
                + ",Server return Message:" + msg);
    }

    /**
     * Prints the stack trace of {@code cause} and closes the channel, matching
     * the error handling of the server-side handler.
     *
     * @param ctx   the context of this handler
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // Close the connection once an exception has occurred.
        ctx.close();
    }
}

运行服务端结果如下:

03 messagepack 服务端运行结果.png

客户端如下:

04 messagepack客户端运行结果.png

MessagePack编解码总结

我们通过自己实现基于 MessagePack 的编码器和解码器,将其很好地接入到 Netty 中,实现了客户端和服务端之间的交互。