前言
在之前的test中,我们主要围绕客户端和服务端之间的自由通信。
不过作为服务端,其功能应该包含两种,第一种是广播,即服务端向所有客户端发送消息。
第二种是支持客户端之间的交互。
本文实现服务端对所有客户端的广播。
总体思路
服务端启动后,等待客户端的接入。
将接入的客户单维护到一个Map,这样做是为了知道谁连接了服务器,以及对方的socketClient。
服务端读取客户端的消息暂时不需要调整。
服务端从控制台读取消息之后,遍历所有的客户端,然后进行消息发送。
目标
多客户端可以随时向服务端发送消息。
服务端可以随时以广播的方式向客户端发送消息。
代码
服务器端
import com.google.common.collect.Maps;
import org.springframework.util.CollectionUtils;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Objects;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Author: jie.wu
* @Version: v1.0
* @Updatedby:
*/
public class TransferServer {
//线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private final static String charset = "UTF8";
private static volatile Map<String, Socket> clientIpSocketMap = Maps.newHashMap();
public static void main(String[] args) {
//服务端
ServerSocket serverSocket;
try {
serverSocket = new ServerSocket(10005);
System.out.println("服务端已启动,您本地的字符格式为" + Charset.defaultCharset().name() + ",等待客户端访问");
if (CollectionUtils.isEmpty(clientIpSocketMap.values())) {
//开启一个广播写线程,用于服务端向所有客户端发送消息
executorService.submit(new TransferServer.BroadCastWriteThread());
}
//等待客户端的访问
for (; ; ) {
//客户端请求
Socket clientSocket;
try {
clientSocket = serverSocket.accept();
executorService.submit(new TransferServer.Server(clientSocket));
} catch (IOException e) {
System.out.println("报错了2" + e);
}
}
} catch (IOException e) {
System.out.println("报错了1" + e);
}
}
/**
* 获取客户端信息
*
* @param clientSocket
* @return
* @throws SocketException
*/
public static String getClientInfo(Socket clientSocket) {
if (Objects.isNull(clientSocket)) {
return "";
}
return clientSocket.getInetAddress().getHostAddress() + ":" + clientSocket.getPort();
}
//服务器处理客户端请求的线程
public static class Server implements Runnable {
private Socket clientSocket;
public Server(Socket clientSocket) {
this.clientSocket = clientSocket;
String clientInfo = getClientInfo(clientSocket);
clientIpSocketMap.put(clientInfo, clientSocket);
}
@Override
public void run() {
//开启一个读线程
executorService.submit(new TransferServer.ReadThread(clientSocket));
}
}
//接收信息的线程
public static class ReadThread implements Runnable {
private Socket clientSocket;
private InputStream inputStream;
private BufferedReader bufferedReader;
public ReadThread(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
//如果客户端没有关闭写,则持续读取
while (true) {
try {
inputStream = clientSocket.getInputStream();
bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));
String str;
for (; (str = bufferedReader.readLine()) != null; ) {
System.out.println("客户端【"+getClientInfo(clientSocket) + "】 : " + str);
}
} catch (Exception e) {
System.out.println("报错了3,可能是客户端由于某种原因断开了连接" + e + ";" + Thread.currentThread().getName());
break;//结束本次对客户端的循环
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
if (bufferedReader != null) {
bufferedReader.close();
}
if (clientSocket != null) {
clientSocket.close();
}
} catch (Exception e) {
System.out.println("报错了" + 5);
}
}
}
}
}
//发送信息的线程
public static class BroadCastWriteThread implements Runnable {
private OutputStream outputStream;
private PrintWriter printWriter;
public BroadCastWriteThread() {
}
@Override
public void run() {
Scanner s = new Scanner(System.in);
try {
String str = "";
while (s.hasNext() && ((str = s.nextLine()) != null)) {
final String finalStr = str;
//System.out.println("服务端:" + str);
clientIpSocketMap.values().stream()
.filter(Objects::nonNull)
.forEach(clientSocket -> {
try {
printWriter = doWriter(clientSocket, outputStream, printWriter);
printWriter.println(finalStr);
} catch (IOException e) {
System.out.println("报错6 发送出现异常 " + e.getStackTrace());
}
});
}
} catch (Exception e) {
System.out.println("报错了4,可能是客户端由于某种原因断开了连接" + e);
} finally {
try {
if (printWriter != null) {
printWriter.close();
}
if (outputStream != null) {
outputStream.close();
}
} catch (Exception e) {
System.out.println("报错了" + 5);
}
}
}
}
private static PrintWriter doWriter(Socket clientSocket,
OutputStream outputStream,
PrintWriter printWriter) throws IOException {
outputStream = clientSocket.getOutputStream();
printWriter = new PrintWriter(new OutputStreamWriter(outputStream, charset), true);
return printWriter;
}
}
客户端
为了模拟多客户端,本地复制俩一模一样的文件,当然也可以用两台电脑
客户端1
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* socket 客户端
* 可以和服务器端进行自由交互
* @Author: jie.wu
*/
public class FreedomClient {
//线程池
private static ExecutorService executorService=Executors.newFixedThreadPool(10);
private final static String charset="UTF8";
public static void main(String [] args){
try{
Socket socket=new Socket();
socket.connect(new InetSocketAddress("192.168.1.6", 10005));
System.out.println("客户端已启动,您本地的字符格式为"+Charset.defaultCharset().name()+",请输入信息:");
executorService.submit(new ReadThread(socket));
executorService.submit(new WriteThread(socket));
executorService.shutdown();//在子线程运行结束后关闭线程池,否则主进程不会结束,感觉测试不完整
//由于客户端只有两个进程,所有可以不使用线程池
// Thread t1=new Thread(new ReadThread(socket));
// Thread t2=new Thread(new WriteThread(socket));
// t1.start();
// t2.start();
// t1.join();
// t2.join();
// System.out.println("结束了--");
}catch(Exception e){
System.out.println("报错了1"+e);
}finally {
}
}
//发送消息的线程
public static class WriteThread implements Runnable{
private Socket clientSocket;
private OutputStream outputStream;
private PrintWriter printWriter;
public WriteThread(Socket clientSocket){
this.clientSocket=clientSocket;
}
@Override
public void run() {
Scanner s=new Scanner(System.in);
while(true){
try{
outputStream =clientSocket.getOutputStream();
printWriter=new PrintWriter(new OutputStreamWriter(outputStream,charset),true);
printWriter.println(s.nextLine());
}catch(Exception e){
System.out.println("报错了4,可能是服务端由于某种原因断开了连接,不再向服务器发送信息"+e);
try{
if(printWriter!=null){ printWriter.close();}
if(outputStream!=null){ outputStream.close();}
if(clientSocket!=null){ clientSocket.close();}
break;//结束本次对客户端的循环
}catch(Exception e2){
System.out.println("报错了5"+e2);
}
}
}
}
}
//接收消息的线程
public static class ReadThread implements Runnable{
private Socket clientSocket;
private InputStream inputStream;
private BufferedReader bufferedReader;
public ReadThread(Socket clientSocket){
this.clientSocket=clientSocket;
}
@Override
public void run() {
//如果客户端没有关闭写,则持续读取
while(true){
try{
inputStream=clientSocket.getInputStream();
bufferedReader=new BufferedReader(new InputStreamReader(inputStream,charset));
String str;
for(;(str=bufferedReader.readLine())!=null;){
System.out.println("服务端:"+str);
}
}catch(Exception e) {
System.out.println("报错了3,可能是服务端由于某种原因断开了连接,停止接收服务器信息,请输入任意字符结束访问" + e + ";" + Thread.currentThread().getName());
break;//结束本次对客户端的循环
}finally{
System.out.println("ReadThread finally");
try{
if(inputStream!=null){ inputStream.close();}
if(bufferedReader!=null){ bufferedReader.close();}
if(clientSocket!=null){ clientSocket.close();}
}catch(Exception e){
System.out.println("报错了"+5);
}
}
}
}
}
}
客户端2
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* socket 客户端
* 可以和服务器端进行自由交互
* @Author: jie.wu
*/
public class FreedomClient2 {
//线程池
private static ExecutorService executorService=Executors.newFixedThreadPool(10);
private final static String charset="UTF8";
public static void main(String [] args){
try{
Socket socket=new Socket();
socket.connect(new InetSocketAddress("192.168.1.6", 10005));
System.out.println("客户端已启动,您本地的字符格式为"+Charset.defaultCharset().name()+",请输入信息:");
executorService.submit(new ReadThread(socket));
executorService.submit(new WriteThread(socket));
executorService.shutdown();//在子线程运行结束后关闭线程池,否则主进程不会结束,感觉测试不完整
//由于客户端只有两个进程,所有可以不使用线程池
// Thread t1=new Thread(new ReadThread(socket));
// Thread t2=new Thread(new WriteThread(socket));
// t1.start();
// t2.start();
// t1.join();
// t2.join();
// System.out.println("结束了--");
}catch(Exception e){
System.out.println("报错了1"+e);
}finally {
}
}
//发送消息的线程
public static class WriteThread implements Runnable{
private Socket clientSocket;
private OutputStream outputStream;
private PrintWriter printWriter;
public WriteThread(Socket clientSocket){
this.clientSocket=clientSocket;
}
@Override
public void run() {
Scanner s=new Scanner(System.in);
while(true){
try{
outputStream =clientSocket.getOutputStream();
printWriter=new PrintWriter(new OutputStreamWriter(outputStream,charset),true);
printWriter.println(s.nextLine());
}catch(Exception e){
System.out.println("报错了4,可能是服务端由于某种原因断开了连接,不再向服务器发送信息"+e);
try{
if(printWriter!=null){ printWriter.close();}
if(outputStream!=null){ outputStream.close();}
if(clientSocket!=null){ clientSocket.close();}
break;//结束本次对客户端的循环
}catch(Exception e2){
System.out.println("报错了5"+e2);
}
}
}
}
}
//接收消息的线程
public static class ReadThread implements Runnable{
private Socket clientSocket;
private InputStream inputStream;
private BufferedReader bufferedReader;
public ReadThread(Socket clientSocket){
this.clientSocket=clientSocket;
}
@Override
public void run() {
//如果客户端没有关闭写,则持续读取
while(true){
try{
inputStream=clientSocket.getInputStream();
bufferedReader=new BufferedReader(new InputStreamReader(inputStream,charset));
String str;
for(;(str=bufferedReader.readLine())!=null;){
System.out.println("服务端:"+str);
}
}catch(Exception e) {
System.out.println("报错了3,可能是服务端由于某种原因断开了连接,停止接收服务器信息,请输入任意字符结束访问" + e + ";" + Thread.currentThread().getName());
break;//结束本次对客户端的循环
}finally{
System.out.println("ReadThread finally");
try{
if(inputStream!=null){ inputStream.close();}
if(bufferedReader!=null){ bufferedReader.close();}
if(clientSocket!=null){ clientSocket.close();}
}catch(Exception e){
System.out.println("报错了"+5);
}
}
}
}
}
}
效果
服务端
服务端已启动,您本地的字符格式为UTF-8,等待客户端访问
1
客户端【192.168.1.6:55565】 : 1
客户端【192.168.1.6:55578】 : 1
aaa
客户端1
客户端已启动,您本地的字符格式为UTF-8,请输入信息:
1
服务端:aaa
客户端2
客户端已启动,您本地的字符格式为UTF-8,请输入信息:
1
服务端:aaa