添加NIO聊天系统
This commit is contained in:
parent
2c1e62b911
commit
a9299d605b
@ -0,0 +1,97 @@
|
||||
package cn.zyjblogs.netty.nio.groupchat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.Scanner;
|
||||
import java.util.Set;
|
||||
|
||||
public class GroupChatClient {
|
||||
//定义相关属性
|
||||
private final String HOST = "127.0.0.1";
|
||||
private static final int PORT = 6667;
|
||||
//定义属性
|
||||
private Selector selector;
|
||||
|
||||
private SocketChannel socketChannel;
|
||||
private String username;
|
||||
//构造器
|
||||
public GroupChatClient(){
|
||||
try {
|
||||
selector = Selector.open();
|
||||
//连接服务器
|
||||
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
|
||||
socketChannel.configureBlocking(false);
|
||||
socketChannel.register(selector, SelectionKey.OP_READ);
|
||||
//获得username
|
||||
username = socketChannel.getLocalAddress().toString().substring(1);
|
||||
System.out.println(username + " is ok .....");
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
//向服务器发送消息
|
||||
public void sendInfo(String info){
|
||||
info = username +" 说: "+info;
|
||||
try {
|
||||
socketChannel.write(ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8)));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
//读取服务端回复的消息
|
||||
public void readInfo(){
|
||||
try {
|
||||
int readChannels = selector.select();
|
||||
if (readChannels > 0){
|
||||
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
|
||||
while (iterator.hasNext()){
|
||||
SelectionKey selectionKey = iterator.next();
|
||||
if (selectionKey.isReadable()){
|
||||
SocketChannel channel = (SocketChannel)selectionKey.channel();
|
||||
//得到一个Buffer
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
//读取数据到缓冲区
|
||||
channel.read(buffer);
|
||||
String msg = new String(buffer.array());
|
||||
System.out.println(msg.trim());
|
||||
}
|
||||
}
|
||||
//删除当前的selectionKey 防止重复操作
|
||||
iterator.remove();
|
||||
}else {
|
||||
//System.out.println("没有可用的通道....");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
//启动客户端
|
||||
GroupChatClient chatClient = new GroupChatClient();
|
||||
//启动一个线程 每3秒 读取从服务器发送的数据
|
||||
new Thread(()->{
|
||||
while (true){
|
||||
chatClient.readInfo();
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
//发送数据给服务器端
|
||||
Scanner scanner = new Scanner(System.in);
|
||||
while (scanner.hasNext()){
|
||||
String s = scanner.nextLine();
|
||||
chatClient.sendInfo(s);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,128 @@
|
||||
package cn.zyjblogs.netty.nio.groupchat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
public class GroupChatServer {
|
||||
//定义属性
|
||||
private Selector selector;
|
||||
|
||||
private ServerSocketChannel listenChannel;
|
||||
private static final int PORT = 6667;
|
||||
|
||||
//构造器
|
||||
public GroupChatServer() {
|
||||
try {
|
||||
//得到选择器
|
||||
selector = Selector.open();
|
||||
//ServerSocketChannel
|
||||
listenChannel = ServerSocketChannel.open();
|
||||
//绑定端口
|
||||
listenChannel.socket().bind(new InetSocketAddress(PORT));
|
||||
//设置非阻塞模式
|
||||
listenChannel.configureBlocking(false);
|
||||
//将该listenChannel 注册到selector
|
||||
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public void listen() {
|
||||
try {
|
||||
while (true) {
|
||||
int count = selector.select(2000);
|
||||
//有事件处理
|
||||
if (count > 0) {
|
||||
//遍历得到selectionKey 集合
|
||||
Set<SelectionKey> selectionKeys = selector.selectedKeys();
|
||||
Iterator<SelectionKey> iterator = selectionKeys.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
//取出selectionKey
|
||||
SelectionKey key = iterator.next();
|
||||
//监听到accept
|
||||
if (key.isAcceptable()) {
|
||||
SocketChannel sc = listenChannel.accept();
|
||||
//设置非阻塞
|
||||
sc.configureBlocking(false);
|
||||
//将sc注册到selector上
|
||||
sc.register(selector, SelectionKey.OP_READ);
|
||||
//提示上线
|
||||
System.out.println(sc.getRemoteAddress() + "上线 ");
|
||||
}
|
||||
//通道发送read事件,即通道是可读状态
|
||||
if (key.isReadable()) {
|
||||
readData(key);
|
||||
}
|
||||
//手动从集合中移动当前的selectionKey,防止重复操作
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
//异常处理
|
||||
}
|
||||
}
|
||||
|
||||
//读取客户端消息
|
||||
private void readData(SelectionKey key) {
|
||||
//定义一个SocketChannel
|
||||
SocketChannel channel = null;
|
||||
try {
|
||||
channel = (SocketChannel) key.channel();
|
||||
//创建缓冲buffer
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
int count = channel.read(buffer);
|
||||
//根据count做处理 >0 代表读到消息了
|
||||
if (count > 0) {
|
||||
//吧缓冲区的数据转成字符串
|
||||
String msg = new String(buffer.array());
|
||||
//输出消息
|
||||
System.out.println("form 客户端: " + msg.trim());
|
||||
//向其他客户端转发消息
|
||||
sendInfoToOtherClients(msg, channel);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
System.out.println(channel.getRemoteAddress() +" 离线...");
|
||||
// 取消注册
|
||||
key.channel();
|
||||
//关闭通道
|
||||
channel.close();
|
||||
} catch (IOException ioException) {
|
||||
ioException.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//转发消息给其他客户发送(通道)
|
||||
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
|
||||
System.out.println("服务器转发消息中....");
|
||||
//遍历 所有注册到selector 上的SocketChannel,并排除自己
|
||||
for (SelectionKey key : selector.keys()) {
|
||||
Channel targetChannel = key.channel();
|
||||
//排除自己
|
||||
if (targetChannel instanceof SocketChannel && !targetChannel.equals(self)) {
|
||||
//类型转换
|
||||
SocketChannel dest = (SocketChannel) targetChannel;
|
||||
//将msg存储到buffer
|
||||
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
|
||||
//将buffer数据写入通道中
|
||||
dest.write(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
//创建一个服务器对象
|
||||
GroupChatServer groupChatServer = new GroupChatServer();
|
||||
groupChatServer.listen();
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package cn.zyjblogs.netty.nio.selector;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class NIOClient {
|
||||
public static void main(String[] args) {
|
||||
//得到一个网路通道
|
||||
try (SocketChannel socketChannel = SocketChannel.open()){
|
||||
//设置非阻塞
|
||||
socketChannel.configureBlocking(false);
|
||||
//提供服务端的ip 和 端口
|
||||
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
|
||||
//链接服务器
|
||||
if (!socketChannel.connect(inetSocketAddress)){
|
||||
while (!socketChannel.finishConnect()){
|
||||
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
|
||||
}
|
||||
}
|
||||
//如果连接成功 就发送数据
|
||||
String str = "逝水无痕客户端数据: 你好呀!";
|
||||
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
|
||||
socketChannel.write(buffer);
|
||||
System.in.read();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
package cn.zyjblogs.netty.nio.selector;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
public class NIOServer {
|
||||
public static void main(String[] args) {
|
||||
//创建ServerSocketChannel -> ServerSocket
|
||||
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
|
||||
//得到一个Selector
|
||||
Selector selector = Selector.open()) {
|
||||
//绑定一个端口6666,在服务端监听
|
||||
final int port = 6666;
|
||||
System.out.println("启动监听,监听端口: "+port);
|
||||
serverSocketChannel.socket().bind(new InetSocketAddress(port));
|
||||
//设置为非阻塞
|
||||
serverSocketChannel.configureBlocking(false);
|
||||
//把serverSocketChannel 注册到selector 关心 事件为OP_ACCEPT
|
||||
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
//1
|
||||
System.out.println("注册后的selectionKey 数量 = "+selector.keys().size());
|
||||
//循环等待客户端链接
|
||||
System.out.println("等待客户端链接");
|
||||
while (true){
|
||||
//等待1秒,如果没有事件发生,就继续
|
||||
if (selector.select(1000) == 0){
|
||||
System.out.println("服务器等待了1秒,无连接");
|
||||
continue;
|
||||
}
|
||||
//如果返回的>0,获取到相关的selectionKey集合
|
||||
//1.如果返回的大于0, 表示已经获取到关注的事件的集合
|
||||
//2. selector.selectedKeys()返回关注事件的集合
|
||||
// 通过selectionKeys反向获取通道
|
||||
Set<SelectionKey> selectionKeys = selector.selectedKeys();
|
||||
//selectionKeys.size() 和 selector.keys().size() 不一样
|
||||
//selectionKeys 代表有事件发生的数量
|
||||
System.out.println("selectionKeys 数量 = "+selectionKeys.size());
|
||||
//遍历Set<SelectionKey>,使用迭代器便利
|
||||
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
||||
while (keyIterator.hasNext()){
|
||||
//获取到selectionKey
|
||||
SelectionKey key = keyIterator.next();
|
||||
//根据key 对应的通道发生的事件做相应处理
|
||||
//如果是OP_ACCEPT,有客户端链接
|
||||
if (key.isAcceptable()){
|
||||
//给该客户端生成一个SocketChannel
|
||||
SocketChannel socketChannel = serverSocketChannel.accept();
|
||||
//设置为非阻塞
|
||||
socketChannel.configureBlocking(false);
|
||||
System.out.println("客户端连接成功 生成了一个SocketChannel"+socketChannel.hashCode());
|
||||
//将当前socketChannel 注册到selector,关注事件为OP_READ 同时给socketChannel关联一个buffer
|
||||
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
|
||||
System.out.println("客户端连接后, 注册后的selectionKey 数量 = "+selector.keys().size());
|
||||
}
|
||||
//发生OP_READ事件
|
||||
if (key.isReadable()){
|
||||
//通过key反向获取到对应的channel
|
||||
SocketChannel channel = (SocketChannel)key.channel();
|
||||
//获取到该channel关联buffer
|
||||
ByteBuffer buffer = (ByteBuffer)key.attachment();
|
||||
//将buffer读取到channel中
|
||||
channel.read(buffer);
|
||||
System.out.println("form 客户端 "+new String(buffer.array()).trim());
|
||||
}
|
||||
//手动从集合中移动当前的selectionKey,防止重复操作
|
||||
keyIterator.remove();
|
||||
}
|
||||
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user