添加简单仿dubbo rpc

This commit is contained in:
朱毅骏 2021-10-18 18:01:23 +08:00
parent 29e022c4c1
commit 6312555395
14 changed files with 563 additions and 5 deletions

View File

@ -0,0 +1,6 @@
package cn.zyjblogs.netty.dubborpc.constant;
public class CommonConstant {
public static String FRE = "HelloService#hello#";
}

View File

@ -0,0 +1,21 @@
package cn.zyjblogs.netty.dubborpc.customer;
import cn.zyjblogs.netty.dubborpc.constant.CommonConstant;
import cn.zyjblogs.netty.dubborpc.netty.NettyClient;
import cn.zyjblogs.netty.dubborpc.publicinterface.HelloService;
import java.util.Scanner;
public class ClientBootStrap {
public static void main(String[] args) throws InterruptedException {
NettyClient customer = new NettyClient();
//创建代理对象
HelloService service = (HelloService) customer.getBean(HelloService.class, CommonConstant.FRE);
//通过代理对象调用服务提供者的方法服务
while (true) {
Thread.sleep(4 * 1000);
String res = service.hello("你好 ~~~~");
System.out.println(res);
}
}
}

View File

@ -0,0 +1,56 @@
package cn.zyjblogs.netty.dubborpc.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
public Object getBean(final Class<?> serviceClass,final String prefix){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args)->{
//{}部分代码客户端每次调用都会执行
if (client == null){
initClient();
}
//设置要发给服务器端的信息参数
client.setParam(prefix+args[0]);
return executor.submit(client).get();
});
}
//初始化客户端
private static void initClient() {
client = new NettyClientHandler();
//创建EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(client);
}
});
try {
b.connect("localhost",7000).sync();
} catch (InterruptedException e) {
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,63 @@
package cn.zyjblogs.netty.dubborpc.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> {
/**
* 上下文
*/
private ChannelHandlerContext context;
/**
* 返回结果
*/
private String result;
/**
* 客户端调用方法时 传入的结果
*/
private String param;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//会在其他方法使用上下文
context = ctx;
}
/**
* 接收到服务器的数据后调用方法
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
//唤醒等待的线程
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
/**
* 被代理对象调用发送数据给服务器等待被唤醒channelRead -> 返回结果
* @return
* @throws Exception
*/
@Override
public synchronized String call() throws Exception {
context.writeAndFlush(param);
//进行wait
wait();
return result;
}
public void setParam(String param) {
this.param = param;
}
}

View File

@ -0,0 +1,67 @@
package cn.zyjblogs.netty.dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
*
*/
public class NettyServer {
//编写一个方法完成对Netty Server的初始化和启动
private static void startServer0(String hostname,int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器端的启动对象配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new StringDecoder())
.addLast(new StringEncoder())
//业务处理器
.addLast(new NettyServerHandler());
}
});
try {
//绑定一个端口并且同步生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(hostname,port).sync();
System.out.println("服务提供方开始提供");
cf.addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.err.println("监听端口 6668 失败");
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("启动报错了" + e.getMessage());
}finally {
//优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void startServer(String hostname,int port){
startServer0(hostname,port);
}
}

View File

@ -0,0 +1,26 @@
package cn.zyjblogs.netty.dubborpc.netty;
import cn.zyjblogs.netty.dubborpc.constant.CommonConstant;
import cn.zyjblogs.netty.dubborpc.provider.HelloServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
//服务器这边handler比较简单
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送的消息并调用服务
System.out.println("msg: "+msg);
//客户端在调用服务器的api 我们需要定义一个协议
//要求 每次发消息时都必须以某个字符串开头 HelloService#hello#
if (msg.toString().startsWith(CommonConstant.FRE)){
String hello = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(hello);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

View File

@ -0,0 +1,17 @@
package cn.zyjblogs.netty.dubborpc.provider;
import cn.zyjblogs.netty.dubborpc.publicinterface.HelloService;
import io.netty.util.internal.StringUtil;
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
System.out.println("收到客户端消息: "+msg);
//更加mes 返回不同的结果
if (msg != null && msg != ""){
return "你好客户端,我已经收到你的消息 ["+msg+"]";
}
return "你好客户端,我已经收到你的消息";
}
}

View File

@ -0,0 +1,10 @@
package cn.zyjblogs.netty.dubborpc.provider;
import cn.zyjblogs.netty.dubborpc.netty.NettyServer;
//启动一个服务提供者 Netty Server
public class ServerBootStrap {
public static void main(String[] args) {
NettyServer.startServer("localhost",7000);
}
}

View File

@ -0,0 +1,10 @@
package cn.zyjblogs.netty.dubborpc.publicinterface;
/**
* 公共接口
* @author zhuyijun
*/
@FunctionalInterface
public interface HelloService {
String hello(String msg);
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cn.zyjblogs.netty.source.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
EventExecutorGroup eventExecutors = new DefaultEventExecutorGroup(4);
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
//如果在handler前面添加指定EventExecutorGroupname该handler优先加入到该线程池
p.addLast(eventExecutors,new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cn.zyjblogs.netty.source.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
ctx.writeAndFlush(Unpooled.copiedBuffer("你好", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cn.zyjblogs.netty.source.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package cn.zyjblogs.netty.source.echo;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

10
pom.xml
View File

@ -29,10 +29,10 @@
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
<version>3.17.3</version> <version>3.17.3</version>
</dependency> </dependency>
<dependency> <!-- <dependency>-->
<groupId>log4j</groupId> <!-- <groupId>log4j</groupId>-->
<artifactId>log4j</artifactId> <!-- <artifactId>log4j</artifactId>-->
<version>1.2.17</version> <!-- <version>1.2.17</version>-->
</dependency> <!-- </dependency>-->
</dependencies> </dependencies>
</project> </project>