(七)socket·socket 实现 服务端广播 -不限制次序,不限制交互次数

本文介绍了一种基于Socket的服务端向多个客户端广播消息的实现方案。通过维护客户端连接列表,并利用多线程技术处理客户端的读写操作,实现了服务端与客户端间的高效通信。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

前言

在之前的test中,我们主要围绕客户端和服务端之间的自由通信。
不过作为服务端,其功能应该包含两种,第一种是广播,即服务端向所有客户端发送消息。
第二种是支持客户端之间的交互。
本文实现服务端对所有客户端的广播。

总体思路

服务端启动后,等待客户端的接入。
将接入的客户单维护到一个Map,这样做是为了知道谁连接了服务器,以及对方的socketClient。

服务端读取客户端的消息暂时不需要调整。
服务端从控制台读取消息之后,遍历所有的客户端,然后进行消息发送。

目标

多客户端可以随时向服务端发送消息。
服务端可以随时以广播的方式向客户端发送消息。

代码

服务器端

import com.google.common.collect.Maps;
import org.springframework.util.CollectionUtils;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Objects;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Author: jie.wu
 * @Version: v1.0
 * @Updatedby:
 */
public class TransferServer {
    //线程池
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    private final static String charset = "UTF8";
    private static volatile Map<String, Socket> clientIpSocketMap = Maps.newHashMap();

    public static void main(String[] args) {
        //服务端
        ServerSocket serverSocket;
        try {
            serverSocket = new ServerSocket(10005);
            System.out.println("服务端已启动,您本地的字符格式为" + Charset.defaultCharset().name() + ",等待客户端访问");
            if (CollectionUtils.isEmpty(clientIpSocketMap.values())) {
                //开启一个广播写线程,用于服务端向所有客户端发送消息
                executorService.submit(new TransferServer.BroadCastWriteThread());
            }
            //等待客户端的访问
            for (; ; ) {
                //客户端请求
                Socket clientSocket;
                try {
                    clientSocket = serverSocket.accept();
                    executorService.submit(new TransferServer.Server(clientSocket));
                } catch (IOException e) {
                    System.out.println("报错了2" + e);
                }
            }
        } catch (IOException e) {
            System.out.println("报错了1" + e);
        }
    }

    /**
     * 获取客户端信息
     *
     * @param clientSocket
     * @return
     * @throws SocketException
     */
    public static String getClientInfo(Socket clientSocket) {
        if (Objects.isNull(clientSocket)) {
            return "";
        }
        return clientSocket.getInetAddress().getHostAddress() + ":" + clientSocket.getPort();
    }

    //服务器处理客户端请求的线程
    public static class Server implements Runnable {
        private Socket clientSocket;

        public Server(Socket clientSocket) {
            this.clientSocket = clientSocket;
            String clientInfo = getClientInfo(clientSocket);
            clientIpSocketMap.put(clientInfo, clientSocket);
        }

        @Override
        public void run() {

            //开启一个读线程
            executorService.submit(new TransferServer.ReadThread(clientSocket));
        }
    }

    //接收信息的线程
    public static class ReadThread implements Runnable {
        private Socket clientSocket;

        private InputStream inputStream;

        private BufferedReader bufferedReader;

        public ReadThread(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            //如果客户端没有关闭写,则持续读取
            while (true) {
                try {
                    inputStream = clientSocket.getInputStream();
                    bufferedReader = new BufferedReader(new InputStreamReader(inputStream, charset));

                    String str;
                    for (; (str = bufferedReader.readLine()) != null; ) {
                        System.out.println("客户端【"+getClientInfo(clientSocket) + "】 : " + str);
                    }

                } catch (Exception e) {
                    System.out.println("报错了3,可能是客户端由于某种原因断开了连接" + e + ";" + Thread.currentThread().getName());
                    break;//结束本次对客户端的循环
                } finally {
                    try {
                        if (inputStream != null) {
                            inputStream.close();
                        }
                        if (bufferedReader != null) {
                            bufferedReader.close();
                        }
                        if (clientSocket != null) {
                            clientSocket.close();
                        }
                    } catch (Exception e) {
                        System.out.println("报错了" + 5);
                    }
                }
            }
        }
    }

    //发送信息的线程
    public static class BroadCastWriteThread implements Runnable {
        private OutputStream outputStream;
        private PrintWriter printWriter;

        public BroadCastWriteThread() {
        }

        @Override
        public void run() {
            Scanner s = new Scanner(System.in);
            try {
                String str = "";
                while (s.hasNext() && ((str = s.nextLine()) != null)) {
                    final String finalStr = str;
                    //System.out.println("服务端:" + str);
                    clientIpSocketMap.values().stream()
                            .filter(Objects::nonNull)
                            .forEach(clientSocket -> {
                                try {
                                    printWriter = doWriter(clientSocket, outputStream, printWriter);
                                    printWriter.println(finalStr);
                                } catch (IOException e) {
                                    System.out.println("报错6 发送出现异常 " + e.getStackTrace());

                                }
                            });

                }

            } catch (Exception e) {
                System.out.println("报错了4,可能是客户端由于某种原因断开了连接" + e);
            } finally {
                try {
                    if (printWriter != null) {
                        printWriter.close();
                    }
                    if (outputStream != null) {
                        outputStream.close();
                    }
                } catch (Exception e) {
                    System.out.println("报错了" + 5);
                }
            }


        }
    }

    private static PrintWriter doWriter(Socket clientSocket,
                                        OutputStream outputStream,
                                        PrintWriter printWriter) throws IOException {
        outputStream = clientSocket.getOutputStream();
        printWriter = new PrintWriter(new OutputStreamWriter(outputStream, charset), true);
        return printWriter;
    }
}

客户端

为了模拟多客户端,本地复制俩一模一样的文件,当然也可以用两台电脑

客户端1
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * socket 客户端
 * 可以和服务器端进行自由交互
 * @Author: jie.wu
 */
public class FreedomClient {
    //线程池
    private static ExecutorService executorService=Executors.newFixedThreadPool(10);

    private final static String charset="UTF8";

    public static void main(String [] args){
        try{
            Socket socket=new Socket();
            socket.connect(new InetSocketAddress("192.168.1.6", 10005));
            System.out.println("客户端已启动,您本地的字符格式为"+Charset.defaultCharset().name()+",请输入信息:");
            executorService.submit(new ReadThread(socket));
            executorService.submit(new WriteThread(socket));
            executorService.shutdown();//在子线程运行结束后关闭线程池,否则主进程不会结束,感觉测试不完整

//由于客户端只有两个进程,所有可以不使用线程池
//            Thread t1=new Thread(new ReadThread(socket));
//            Thread t2=new Thread(new WriteThread(socket));
//            t1.start();
//            t2.start();
//            t1.join();
//            t2.join();
//            System.out.println("结束了--");

        }catch(Exception e){
            System.out.println("报错了1"+e);
        }finally {

        }
    }

    //发送消息的线程
    public static class WriteThread implements Runnable{
        private Socket clientSocket;
        private OutputStream outputStream;
        private PrintWriter printWriter;
        public WriteThread(Socket clientSocket){
            this.clientSocket=clientSocket;
        }
        @Override
        public void run() {
            Scanner s=new Scanner(System.in);
            while(true){
                try{
                    outputStream =clientSocket.getOutputStream();
                    printWriter=new PrintWriter(new OutputStreamWriter(outputStream,charset),true);
                    printWriter.println(s.nextLine());

                }catch(Exception e){
                    System.out.println("报错了4,可能是服务端由于某种原因断开了连接,不再向服务器发送信息"+e);

                    try{
                        if(printWriter!=null){ printWriter.close();}
                        if(outputStream!=null){ outputStream.close();}
                        if(clientSocket!=null){ clientSocket.close();}
                        break;//结束本次对客户端的循环
                    }catch(Exception e2){
                        System.out.println("报错了5"+e2);
                    }

                }
            }
        }
    }

    //接收消息的线程
    public static class ReadThread implements Runnable{
        private Socket clientSocket;

        private InputStream inputStream;

        private BufferedReader bufferedReader;

        public ReadThread(Socket clientSocket){
            this.clientSocket=clientSocket;
        }
        @Override
        public void run() {
            //如果客户端没有关闭写,则持续读取
            while(true){
                try{
                    inputStream=clientSocket.getInputStream();
                    bufferedReader=new BufferedReader(new InputStreamReader(inputStream,charset));

                    String str;
                    for(;(str=bufferedReader.readLine())!=null;){
                        System.out.println("服务端:"+str);
                    }

                }catch(Exception e) {
                    System.out.println("报错了3,可能是服务端由于某种原因断开了连接,停止接收服务器信息,请输入任意字符结束访问" + e + ";" + Thread.currentThread().getName());
                    break;//结束本次对客户端的循环
                }finally{
                    System.out.println("ReadThread finally");
                    try{
                        if(inputStream!=null){ inputStream.close();}
                        if(bufferedReader!=null){ bufferedReader.close();}
                        if(clientSocket!=null){ clientSocket.close();}
                    }catch(Exception e){
                        System.out.println("报错了"+5);
                    }
                }
            }
        }
    }
}

客户端2
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * socket 客户端
 * 可以和服务器端进行自由交互
 * @Author: jie.wu
 */
public class FreedomClient2 {
    //线程池
    private static ExecutorService executorService=Executors.newFixedThreadPool(10);

    private final static String charset="UTF8";

    public static void main(String [] args){
        try{
            Socket socket=new Socket();
            socket.connect(new InetSocketAddress("192.168.1.6", 10005));
            System.out.println("客户端已启动,您本地的字符格式为"+Charset.defaultCharset().name()+",请输入信息:");
            executorService.submit(new ReadThread(socket));
            executorService.submit(new WriteThread(socket));
            executorService.shutdown();//在子线程运行结束后关闭线程池,否则主进程不会结束,感觉测试不完整

//由于客户端只有两个进程,所有可以不使用线程池
//            Thread t1=new Thread(new ReadThread(socket));
//            Thread t2=new Thread(new WriteThread(socket));
//            t1.start();
//            t2.start();
//            t1.join();
//            t2.join();
//            System.out.println("结束了--");

        }catch(Exception e){
            System.out.println("报错了1"+e);
        }finally {

        }
    }

    //发送消息的线程
    public static class WriteThread implements Runnable{
        private Socket clientSocket;
        private OutputStream outputStream;
        private PrintWriter printWriter;
        public WriteThread(Socket clientSocket){
            this.clientSocket=clientSocket;
        }
        @Override
        public void run() {
            Scanner s=new Scanner(System.in);
            while(true){
                try{
                    outputStream =clientSocket.getOutputStream();
                    printWriter=new PrintWriter(new OutputStreamWriter(outputStream,charset),true);
                    printWriter.println(s.nextLine());

                }catch(Exception e){
                    System.out.println("报错了4,可能是服务端由于某种原因断开了连接,不再向服务器发送信息"+e);

                    try{
                        if(printWriter!=null){ printWriter.close();}
                        if(outputStream!=null){ outputStream.close();}
                        if(clientSocket!=null){ clientSocket.close();}
                        break;//结束本次对客户端的循环
                    }catch(Exception e2){
                        System.out.println("报错了5"+e2);
                    }

                }
            }
        }
    }

    //接收消息的线程
    public static class ReadThread implements Runnable{
        private Socket clientSocket;

        private InputStream inputStream;

        private BufferedReader bufferedReader;

        public ReadThread(Socket clientSocket){
            this.clientSocket=clientSocket;
        }
        @Override
        public void run() {
            //如果客户端没有关闭写,则持续读取
            while(true){
                try{
                    inputStream=clientSocket.getInputStream();
                    bufferedReader=new BufferedReader(new InputStreamReader(inputStream,charset));

                    String str;
                    for(;(str=bufferedReader.readLine())!=null;){
                        System.out.println("服务端:"+str);
                    }

                }catch(Exception e) {
                    System.out.println("报错了3,可能是服务端由于某种原因断开了连接,停止接收服务器信息,请输入任意字符结束访问" + e + ";" + Thread.currentThread().getName());
                    break;//结束本次对客户端的循环
                }finally{
                    System.out.println("ReadThread finally");
                    try{
                        if(inputStream!=null){ inputStream.close();}
                        if(bufferedReader!=null){ bufferedReader.close();}
                        if(clientSocket!=null){ clientSocket.close();}
                    }catch(Exception e){
                        System.out.println("报错了"+5);
                    }
                }
            }
        }
    }
}

效果

服务端

服务端已启动,您本地的字符格式为UTF-8,等待客户端访问
1
客户端【192.168.1.6:55565】 : 1
客户端【192.168.1.6:55578】 : 1
aaa

客户端1

客户端已启动,您本地的字符格式为UTF-8,请输入信息:
1
服务端:aaa

客户端2

客户端已启动,您本地的字符格式为UTF-8,请输入信息:
1
服务端:aaa

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值