diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/constant/CommonConstant.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/constant/CommonConstant.java new file mode 100644 index 0000000..2574d0b --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/constant/CommonConstant.java @@ -0,0 +1,6 @@ +package cn.zyjblogs.netty.dubborpc.constant; + + +public class CommonConstant { + public static String FRE = "HelloService#hello#"; +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/customer/ClientBootStrap.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/customer/ClientBootStrap.java new file mode 100644 index 0000000..05e97f0 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/customer/ClientBootStrap.java @@ -0,0 +1,21 @@ +package cn.zyjblogs.netty.dubborpc.customer; + +import cn.zyjblogs.netty.dubborpc.constant.CommonConstant; +import cn.zyjblogs.netty.dubborpc.netty.NettyClient; +import cn.zyjblogs.netty.dubborpc.publicinterface.HelloService; + +import java.util.Scanner; + +public class ClientBootStrap { + public static void main(String[] args) throws InterruptedException { + NettyClient customer = new NettyClient(); + //创建代理对象 + HelloService service = (HelloService) customer.getBean(HelloService.class, CommonConstant.FRE); + //通过代理对象调用服务提供者的方法(服务) + while (true) { + Thread.sleep(4 * 1000); + String res = service.hello("你好 ~~~~"); + System.out.println(res); + } + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClient.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClient.java new file mode 100644 index 0000000..ded01b1 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClient.java @@ -0,0 +1,56 @@ +package cn.zyjblogs.netty.dubborpc.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.lang.reflect.Proxy; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class NettyClient { + private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + private static NettyClientHandler client; + + public Object getBean(final Class serviceClass,final String prefix){ + return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class[]{serviceClass},(proxy,method,args)->{ + //{}部分代码,客户端每次调用都会执行 + if (client == null){ + initClient(); + } + //设置要发给服务器端的信息参数 + client.setParam(prefix+args[0]); + return executor.submit(client).get(); + }); + } + //初始化客户端 + private static void initClient() { + client = new NettyClientHandler(); + //创建EventLoopGroup + NioEventLoopGroup group = new NioEventLoopGroup(); + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY,true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new StringDecoder()) + .addLast(new StringEncoder()) + .addLast(client); + } + }); + try { + b.connect("localhost",7000).sync(); + } catch (InterruptedException e) { + group.shutdownGracefully(); + } + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClientHandler.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClientHandler.java new file mode 100644 index 0000000..7eaa814 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyClientHandler.java @@ -0,0 +1,63 @@ +package cn.zyjblogs.netty.dubborpc.netty; + + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.concurrent.Callable; + +public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { + /** + * 上下文 + */ + private ChannelHandlerContext context; + /** + * 返回结果 + */ + private String result; + /** + * 客户端调用方法时 传入的结果 + */ + private String param; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + //会在其他方法使用上下文 + context = ctx; + } + + /** + * 接收到服务器的数据后,调用方法 + * @param ctx + * @param msg + * @throws Exception + */ + @Override + public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + result = msg.toString(); + //唤醒等待的线程 + notify(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + + /** + * 被代理对象调用,发送数据给服务器,等待被唤醒(channelRead) -> 返回结果 + * @return + * @throws Exception + */ + @Override + public synchronized String call() throws Exception { + context.writeAndFlush(param); + //进行wait + wait(); + return result; + } + + public void setParam(String param) { + this.param = param; + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServer.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServer.java new file mode 100644 index 0000000..43fdcb4 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServer.java @@ -0,0 +1,67 @@ +package cn.zyjblogs.netty.dubborpc.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +/** + * + */ +public class NettyServer { + //编写一个方法,完成对Netty Server的初始化和启动 + private static void startServer0(String hostname,int port) { + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + //创建服务器端的启动对象,配置参数 + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + //p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(new StringDecoder()) + .addLast(new StringEncoder()) + //业务处理器 + .addLast(new NettyServerHandler()); + } + }); + try { + //绑定一个端口并且同步,生成了一个ChannelFuture对象 + ChannelFuture cf = bootstrap.bind(hostname,port).sync(); + System.out.println("服务提供方开始提供"); + cf.addListener((ChannelFuture channelFuture) -> { + if (channelFuture.isSuccess()) { + System.out.println("监听端口 6668 成功"); + } else { + System.err.println("监听端口 6668 失败"); + } + }); + //对关闭通道进行监听 + cf.channel().closeFuture().sync(); + } catch (Exception e) { + System.out.println("启动报错了" + e.getMessage()); + }finally { + //优雅关闭 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + public static void startServer(String hostname,int port){ + startServer0(hostname,port); + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServerHandler.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServerHandler.java new file mode 100644 index 0000000..5a2b7ef --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/netty/NettyServerHandler.java @@ -0,0 +1,26 @@ +package cn.zyjblogs.netty.dubborpc.netty; + +import cn.zyjblogs.netty.dubborpc.constant.CommonConstant; +import cn.zyjblogs.netty.dubborpc.provider.HelloServiceImpl; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +//服务器这边handler比较简单 +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //获取客户端发送的消息并调用服务 + System.out.println("msg: "+msg); + //客户端在调用服务器的api 时,我们需要定义一个协议 + //要求 每次发消息时都必须以某个字符串开头 “HelloService#hello#” + if (msg.toString().startsWith(CommonConstant.FRE)){ + String hello = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); + ctx.writeAndFlush(hello); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/HelloServiceImpl.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/HelloServiceImpl.java new file mode 100644 index 0000000..b1ba523 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/HelloServiceImpl.java @@ -0,0 +1,17 @@ +package cn.zyjblogs.netty.dubborpc.provider; + + +import cn.zyjblogs.netty.dubborpc.publicinterface.HelloService; +import io.netty.util.internal.StringUtil; + +public class HelloServiceImpl implements HelloService { + @Override + public String hello(String msg) { + System.out.println("收到客户端消息: "+msg); + //更加mes 返回不同的结果 + if (msg != null && msg != ""){ + return "你好客户端,我已经收到你的消息 ["+msg+"]"; + } + return "你好客户端,我已经收到你的消息"; + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/ServerBootStrap.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/ServerBootStrap.java new file mode 100644 index 0000000..2e6fff8 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/provider/ServerBootStrap.java @@ -0,0 +1,10 @@ +package cn.zyjblogs.netty.dubborpc.provider; + +import cn.zyjblogs.netty.dubborpc.netty.NettyServer; + +//启动一个服务提供者 Netty Server +public class ServerBootStrap { + public static void main(String[] args) { + NettyServer.startServer("localhost",7000); + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/publicinterface/HelloService.java b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/publicinterface/HelloService.java new file mode 100644 index 0000000..b0c73ee --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/dubborpc/publicinterface/HelloService.java @@ -0,0 +1,10 @@ +package cn.zyjblogs.netty.dubborpc.publicinterface; + +/** + * 公共接口 + * @author zhuyijun + */ +@FunctionalInterface +public interface HelloService { + String hello(String msg); +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClient.java b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClient.java new file mode 100644 index 0000000..cce208a --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClient.java @@ -0,0 +1,89 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package cn.zyjblogs.netty.source.echo; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; + +/** + * Sends one message when a connection is open and echoes back any received + * data to the server. Simply put, the echo client initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public final class EchoClient { + + static final boolean SSL = System.getProperty("ssl") != null; + static final String HOST = System.getProperty("host", "127.0.0.1"); + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); + + public static void main(String[] args) throws Exception { + EventExecutorGroup eventExecutors = new DefaultEventExecutorGroup(4); + // Configure SSL.git + final SslContext sslCtx; + if (SSL) { + sslCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } else { + sslCtx = null; + } + + // Configure the client. + EventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); + } + //p.addLast(new LoggingHandler(LogLevel.INFO)); + //如果在handler前面添加指定EventExecutorGroup,name该handler优先加入到该线程池 + p.addLast(eventExecutors,new EchoClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(HOST, PORT).sync(); + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down the event loop to terminate all threads. + group.shutdownGracefully(); + } + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClientHandler.java b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClientHandler.java new file mode 100644 index 0000000..36f7a8f --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoClientHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package cn.zyjblogs.netty.source.echo; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.CharsetUtil; + +/** + * Handler implementation for the echo client. It initiates the ping-pong + * traffic between the echo client and server by sending the first message to + * the server. + */ +public class EchoClientHandler extends ChannelInboundHandlerAdapter { + + private final ByteBuf firstMessage; + + /** + * Creates a client-side handler. + */ + public EchoClientHandler() { + firstMessage = Unpooled.buffer(EchoClient.SIZE); + for (int i = 0; i < firstMessage.capacity(); i ++) { + firstMessage.writeByte((byte) i); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + ctx.writeAndFlush(firstMessage); + ctx.writeAndFlush(Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8)); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServer.java b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServer.java new file mode 100644 index 0000000..ef34009 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServer.java @@ -0,0 +1,84 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package cn.zyjblogs.netty.source.echo; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.SelfSignedCertificate; + +/** + * Echoes back any received data from a client. + */ +public final class EchoServer { + + static final boolean SSL = System.getProperty("ssl") != null; + static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); + + public static void main(String[] args) throws Exception { + // Configure SSL. + final SslContext sslCtx; + if (SSL) { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); + } else { + sslCtx = null; + } + + // Configure the server. + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + final EchoServerHandler serverHandler = new EchoServerHandler(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc())); + } + //p.addLast(new LoggingHandler(LogLevel.INFO)); + p.addLast(serverHandler); + } + }); + + // Start the server. + ChannelFuture f = b.bind(PORT).sync(); + + // Wait until the server socket is closed. + f.channel().closeFuture().sync(); + } finally { + // Shut down all event loops to terminate all threads. + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServerHandler.java b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServerHandler.java new file mode 100644 index 0000000..a4a30f3 --- /dev/null +++ b/nettyPro/src/main/java/cn/zyjblogs/netty/source/echo/EchoServerHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package cn.zyjblogs.netty.source.echo; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Handler implementation for the echo server. + */ +@Sharable +public class EchoServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/pom.xml b/pom.xml index 3eb0d05..77a83fc 100644 --- a/pom.xml +++ b/pom.xml @@ -29,10 +29,10 @@ protobuf-java 3.17.3 - - log4j - log4j - 1.2.17 - + + + + + \ No newline at end of file