添加protocol

This commit is contained in:
zhuyijun 2021-10-18 19:09:43 +08:00
parent 6312555395
commit 3634fe2ab8
9 changed files with 27 additions and 23 deletions

View File

@ -19,7 +19,10 @@ public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol
*/ */
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("客户端接收到的消息:");
System.out.println("长度="+msg.getLen());
System.out.println("内容="+new String(msg.getContent(),CharsetUtil.UTF_8));
System.out.println("客户端接收消息包数量="+(++this.count));
} }
@Override @Override

View File

@ -10,7 +10,10 @@ public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline() ch.pipeline()
//编码器
.addLast(new MyMessageEncoder()) .addLast(new MyMessageEncoder())
//解码器
.addLast(new MyMessageDecoder())
.addLast(new MyClientHandler()); .addLast(new MyClientHandler());
} }
} }

View File

@ -13,7 +13,7 @@ public class MyMessageDecoder extends ReplayingDecoder<MessageProtocol> {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder方法被调用 解码"); System.out.println("\nMyMessageDecoder方法被调用 解码");
//需要将得到的二进制字节码 转成 MessageProtocol //需要将得到的二进制字节码 转成 MessageProtocol
int length = in.readInt(); int length = in.readInt();
byte[] content = new byte[length]; byte[] content = new byte[length];

View File

@ -9,7 +9,7 @@ public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override @Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder方法被调用 编码"); System.out.println("\nMyMessageEncoder方法被调用 编码");
out.writeInt(msg.getLen()); out.writeInt(msg.getLen());
out.writeBytes(msg.getContent()); out.writeBytes(msg.getContent());
} }

View File

@ -7,6 +7,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
import java.util.UUID; import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
@ -20,9 +21,19 @@ public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收数据并处理 //接收数据并处理
System.out.println("\n服务端接收信息:"); System.out.println("服务端接收信息:");
System.out.println("长度="+msg.getLen()); System.out.println("长度="+msg.getLen());
System.out.println("内容="+new String(msg.getContent(),CharsetUtil.UTF_8)); System.out.println("内容="+new String(msg.getContent(),CharsetUtil.UTF_8));
System.out.println("服务器接收消息包数量="+(++this.count)); System.out.println("服务器接收消息包数量="+(++this.count));
//回复消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes(CharsetUtil.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent.getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(messageProtocol);
} }
} }

View File

@ -11,7 +11,10 @@ public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline() ch.pipeline()
//解码器
.addLast(new MyMessageDecoder()) .addLast(new MyMessageDecoder())
//编码器
.addLast(new MyMessageEncoder())
.addLast(new MyServerHandler()); .addLast(new MyServerHandler());
} }
} }

View File

@ -16,21 +16,13 @@
package cn.zyjblogs.netty.source.echo; package cn.zyjblogs.netty.source.echo;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
/** /**
* Sends one message when a connection is open and echoes back any received * Sends one message when a connection is open and echoes back any received
@ -46,7 +38,6 @@ public final class EchoClient {
static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
EventExecutorGroup eventExecutors = new DefaultEventExecutorGroup(4);
// Configure SSL.git // Configure SSL.git
final SslContext sslCtx; final SslContext sslCtx;
if (SSL) { if (SSL) {
@ -71,8 +62,7 @@ public final class EchoClient {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
} }
//p.addLast(new LoggingHandler(LogLevel.INFO)); //p.addLast(new LoggingHandler(LogLevel.INFO));
//如果在handler前面添加指定EventExecutorGroupname该handler优先加入到该线程池 p.addLast(new EchoClientHandler());
p.addLast(eventExecutors,new EchoClientHandler());
} }
}); });

View File

@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/** /**
* Handler implementation for the echo client. It initiates the ping-pong * Handler implementation for the echo client. It initiates the ping-pong
@ -43,7 +42,6 @@ public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) { public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage); ctx.writeAndFlush(firstMessage);
ctx.writeAndFlush(Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8));
} }
@Override @Override

View File

@ -16,11 +16,7 @@
package cn.zyjblogs.netty.source.echo; package cn.zyjblogs.netty.source.echo;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;