C#实现Socket异步通信,及完整源码库

C#实现Socket异步通信,及完整源码库

背景

工控上位机系统开发过程中不可避免的会用到socket通信技术,但是在支持多客户端并发连接时,常规方法效率很低。提高通信效率的一种途径就是使用Socket的异步通信,最开始从网上查了些资料,并进行整理。如果服务端使用异步通信,客户端使用常规同步通信(SocketTool.exe进行测试),没有什么问题。这个测试版本大概是在7年前写的,而且放到现场已经用了很多年。最近把Socket客户端也改为了异步通信,却发现最初写的服务端代码还是有问题。今天反复测试,最终修复bug。现把最新整理的完整通讯类库源码共享给大家。

关键代码

因为类库依赖了插件库ICSharp.Core.dll,请把代码中进行日志输出的部分,改为Console.WriteLine()输出,
实际把ICSharpCode.Core.LoggingService.Error换成Console.WriteLine
TcpService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Configuration;
using System.Net.Sockets;

namespace Mesnac.Communication
{
   
    public class TcpService
    {
   
        private static TcpService instance = null;
        private SocketListener listener = null;
        private int maxConnection = 100;            //允许的最大连接数
        private int bufferSize = 1024 * 1024;       //接收缓冲区的大小
        private int port = 3666;                    //监听的端口号
        //private string ip = "127.0.0.1";            //监听的IP地址
        private TcpService()
        {
   
            try
            {
   
                object oMaxConnection = ConfigurationManager.AppSettings["MaxConnection"];
                if (oMaxConnection != null)
                {
   
                    int.TryParse(oMaxConnection.ToString(), out this.maxConnection);
                }
                object oPort = ConfigurationManager.AppSettings["Port"];
                if (oPort != null)
                {
   
                    int.TryParse(oPort.ToString(), out this.port);
                }
                
                ICSharpCode.Core.LoggingService<TcpService>.Debug("最大连接数:" + this.maxConnection);
                ICSharpCode.Core.LoggingService<TcpService>.Debug("侦听的端口号:" + this.port);
                this.listener = new SocketListener(this.maxConnection, this.bufferSize, this.GetIDByIP);
                this.listener.StartListenThread += new SocketListener.StartListenHandler(listener_StartListenThread);
                this.listener.ProcessAcceptComplete += new EventHandler<SocketAsyncEventArgs>(listener_ProcessAcceptComplete);
                this.listener.OnSended += new SocketListener.SendCompletedHandler(listener_OnSended);
                this.listener.OnMsgReceived += new SocketListener.ReceiveMsgHandler(listener_OnMsgReceived);
                this.listener.OnClientClose += new SocketListener.ClientClose(listener_OnClientClose);
                this.listener.Init();
                try
                {
   
                    this.listener.Start(this.port);
                }
                catch(System.Net.Sockets.SocketException se)
                {
   
                    if (se.SocketErrorCode == System.Net.Sockets.SocketError.AddressAlreadyInUse)
                    {
   
                        ICSharpCode.Core.LoggingService<TcpService>.Error(String.Format("端口号{0}已被占用!", this.port));
                    }
                }
            }
            catch (Exception ex)
            {
   
                ICSharpCode.Core.LoggingService<TcpService>.Error(ex.Message);
                ICSharpCode.Core.LoggingService<TcpService>.Error(ex.StackTrace);
            }
        }

        /// <summary>
        /// 处理客户端连接完成事件
        /// </summary>
        /// <param name="sender">客户端ip</param>
        /// <param name="e"></param>
        private void listener_ProcessAcceptComplete(object sender, SocketAsyncEventArgs e)
        {
   
            //触发处理客户端连接完成事件
            if (this.ProcessAcceptComplete != null)
            {
   
                this.ProcessAcceptComplete(sender, e);
            }
        }

        private void listener_OnMsgReceived(string uid, string info)
        {
   
            if (this.ReceiveMsgHandler != null)
            {
   
                this.ReceiveMsgHandler(uid, info);
            }
            ICSharpCode.Core.LoggingService<TcpService>.Debug(String.Format("接收到{0}发送的消息:{1}", uid, info));
        }

        private void listener_OnSended(string uid, string exception)
        {
   
            if (exception == "100")
            {
   
                ICSharpCode.Core.LoggingService<TcpService>.Debug(String.Format("已发送消息到{0}", uid));
            }
            else
            {
   
                //ICSharpCode.Core.LoggingService.Error(String.Format("消息发送错误:{0}", exception));
                ICSharpCode.Core.LoggingService<TcpService>.Warn("消息发送错误:客户端已经断开连接!");
            }
        }

        /// <summary>
        /// 客户端断开连接的事件处理程序
        /// </summary>
        /// <param name="uid"></param>
        private void listener_OnClientClose(string uid)
        {
   
            ICSharpCode.Core.LoggingService<TcpService>.Debug(String.Format("{0}:断开了连接...{1:yyyy-MM-dd HH:mm:ss}", uid, DateTime.Now));
        }

        private void listener_StartListenThread()
        {
   
            ICSharpCode.Core.LoggingService<TcpService>.Debug("开始侦听...");
            ThreadPool.QueueUserWorkItem(new WaitCallback(this.StartListenCallBack), true);
        }

        private void StartListenCallBack(object listenFlag)
        {
   
            this.listener.ListenFlag = Convert.ToBoolean(listenFlag);
            this.listener.Listen();
        }
        /// <summary>
        /// 获取侦听的服务器IP地址
        /// </summary>
        /// <param name="ip"></param>
        /// <returns></returns>
        private string GetIDByIP(string ip)
        {
   
            return ip;
        }

        /// <summary>
        /// TCP通信服务实例
        /// </summary>
        public static TcpService Instance
        {
   
            get
            {
   
                if (instance == null)
                {
   
                    instance = new TcpService();
                }
                return instance;
            }
        }
        /// <summary>
        /// 启动服务
        /// </summary>
        public static void StartService()
        {
   
            if (instance == null)
            {
   
                instance = new TcpService();
            }
        }
        /// <summary>
        /// 重启服务
        /// </summary>
        public static void ReStartService()
        {
   
            StopService();
            StartService();
        }
        /// <summary>
        /// 停止服务
        /// </summary>
        public static void StopService()
        {
   
            if (instance != null)
            {
   
                try
                {
   
                    instance.listener.Stop();
                }
                catch (Exception ex)
                {
   
                    ICSharpCode.Core.LoggingService<TcpService>.Error("停止Socket侦听失败:" + ex.Message);
                }
                finally
                {
   
                    instance = null;
                }
            }
        }
        /// <summary>
        /// 发送消息至所有连接至本服务器的客户端计算机
        /// </summary>
        /// <param name="msg">消息内容</param>
        public void NetSendMsg(string msg)
        {
   
            foreach (string uid in this.listener.OnlineUID)
            {
   
                this.listener.Send(uid, msg);
            }
        }
        /// <summary>
        /// 发送消息至指定计算机
        /// </summary>
        /// <param name="uid">计算机IP地址</param>
        /// <param name="msg">消息内容</param>
        public void NetSendMsg(string uid, string msg)
        {
   
            this.listener.Send(uid, msg);
        }

        /// <summary>
        /// 接收到信息时触发的事件
        /// </summary>
        public event SocketListener.ReceiveMsgHandler ReceiveMsgHandler;
        /// <summary>
        /// 处理接收客户端连接完成事件
        /// </summary>
        public event EventHandler<SocketAsyncEventArgs> ProcessAcceptComplete;
    }
}

SocketListener.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Configuration;

namespace Mesnac.Communication
{
   
    public sealed class SocketListener : IDisposable
    {
   
        /// <summary>
        /// 缓冲区
        /// </summary>
        private BufferManager bufferManager;
        /// <summary>
        /// 服务器端Socket
        /// </summary>
        private Socket listenSocket;
        /// <summary>
        /// 服务同步锁
        /// </summary>
        private Mutex mutex;
        /// <summary>
        /// 当前连接数
        /// </summary>
        private Int32 numConnections;
        /// <summary>
        /// 最大并发量
        /// </summary>
        private Int32 numConcurrence;
        /// <summary>
        /// 服务器状态
        /// </summary>
        private ServerState serverstate;
        /// <summary>
        /// 读取写入字节
        /// </summary>
        private const Int32 opsToPreAlloc = 1;
        /// <summary>
        /// Socket连接池
        /// </summary>
        private SocketAsyncEventArgsPool readWritePool;
        /// <summary>
        /// 并发控制信号量
        /// </summary>
        private Semaphore semaphoreAcceptedClients;
        /// <summary>
        /// 通信协议
        /// </summary>
        private RequestHandler handler;
        /// <summary>
        /// 回调委托
        /// </summary>
        /// <param name="IP"></param>
        /// <returns></returns>
        public delegate string GetIDByIPFun(string IP);
        /// <summary>
        /// 回调方法实例
        /// </summary>
        private GetIDByIPFun GetIDByIP;
        /// <summary>
        /// 接收到信息时的事件委托
        /// </summary>
        /// <param name="info"></param>
        public delegate void ReceiveMsgHandler(string uid, string info);
        /// <summary>
        /// 接收到信息时的事件
        /// </summary>
        public event ReceiveMsgHandler OnMsgReceived;
        /// <summary>
        /// 客户端关闭连接时的委托
        /// </summary>
        /// <param name="uid"></param>
        public delegate void ClientClose(string uid);
        /// <summary>
        /// 客户端关闭连接时的事件
        /// </summary>
        public event ClientClose OnClientClose;
        /// <summary>
        /// 开始监听数据的委托
        /// </summary>
        public delegate void StartListenHandler()
评论 13
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值