C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

摘要:
或者,线程可以等待IAsyncResult.AsyncWaitHandle。当调用回调或发出等待信号时,将调用End方法以获得异步操作的结果。但是,您必须意识到,大量异步套接字操作需要付出代价。对于每个操作,必须创建IAsyncResult对象,并且不能重复使用。为了解决这个问题,新版本提供了另一种在套接字上使用异步I/O的方法模式。这种新模式不要求为每个套接字操作分配操作上下文对象。
网址:http://blog.csdn.net/zhujunxxxxx/article/details/43573879
引言
我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面
微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。
但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。
 
代码下载:http://download.csdn.net/detail/zhujunxxxxx/8431289 这里的代码优化了的
目标
在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,
 
效果如下
C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)第1张
 
 
代码
 
首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全
 
[csharp] view plain copy
 
  1. using System;  
  2. using System.Collections.Generic;  
  3. using System.Linq;  
  4. using System.Text;  
  5. using System.Net.Sockets;  
  6. using System.Net;  
  7. using System.Threading;  
  8.   
  9. namespace ServerTest  
  10. {  
  11.     /// <summary>  
  12.     /// IOCP SOCKET服务器  
  13.     /// </summary>  
  14.     public class IOCPServer : IDisposable  
  15.     {  
  16.         const int opsToPreAlloc = 2;  
  17.         #region Fields  
  18.         /// <summary>  
  19.         /// 服务器程序允许的最大客户端连接数  
  20.         /// </summary>  
  21.         private int _maxClient;  
  22.   
  23.         /// <summary>  
  24.         /// 监听Socket,用于接受客户端的连接请求  
  25.         /// </summary>  
  26.         private Socket _serverSock;  
  27.   
  28.         /// <summary>  
  29.         /// 当前的连接的客户端数  
  30.         /// </summary>  
  31.         private int _clientCount;  
  32.   
  33.         /// <summary>  
  34.         /// 用于每个I/O Socket操作的缓冲区大小  
  35.         /// </summary>  
  36.         private int _bufferSize = 1024;  
  37.   
  38.         /// <summary>  
  39.         /// 信号量  
  40.         /// </summary>  
  41.         Semaphore _maxAcceptedClients;  
  42.   
  43.         /// <summary>  
  44.         /// 缓冲区管理  
  45.         /// </summary>  
  46.         BufferManager _bufferManager;  
  47.   
  48.         /// <summary>  
  49.         /// 对象池  
  50.         /// </summary>  
  51.         SocketAsyncEventArgsPool _objectPool;  
  52.   
  53.         private bool disposed = false;  
  54.  
  55.         #endregion  
  56.  
  57.         #region Properties  
  58.   
  59.         /// <summary>  
  60.         /// 服务器是否正在运行  
  61.         /// </summary>  
  62.         public bool IsRunning { get; private set; }  
  63.         /// <summary>  
  64.         /// 监听的IP地址  
  65.         /// </summary>  
  66.         public IPAddress Address { get; private set; }  
  67.         /// <summary>  
  68.         /// 监听的端口  
  69.         /// </summary>  
  70.         public int Port { get; private set; }  
  71.         /// <summary>  
  72.         /// 通信使用的编码  
  73.         /// </summary>  
  74.         public Encoding Encoding { get; set; }  
  75.  
  76.         #endregion  
  77.  
  78.         #region Ctors  
  79.   
  80.         /// <summary>  
  81.         /// 异步IOCP SOCKET服务器  
  82.         /// </summary>  
  83.         /// <param name="listenPort">监听的端口</param>  
  84.         /// <param name="maxClient">最大的客户端数量</param>  
  85.         public IOCPServer(int listenPort,int maxClient)  
  86.             : this(IPAddress.Any, listenPort, maxClient)  
  87.         {  
  88.         }  
  89.   
  90.         /// <summary>  
  91.         /// 异步Socket TCP服务器  
  92.         /// </summary>  
  93.         /// <param name="localEP">监听的终结点</param>  
  94.         /// <param name="maxClient">最大客户端数量</param>  
  95.         public IOCPServer(IPEndPoint localEP, int maxClient)  
  96.             : this(localEP.Address, localEP.Port,maxClient)  
  97.         {  
  98.         }  
  99.   
  100.         /// <summary>  
  101.         /// 异步Socket TCP服务器  
  102.         /// </summary>  
  103.         /// <param name="localIPAddress">监听的IP地址</param>  
  104.         /// <param name="listenPort">监听的端口</param>  
  105.         /// <param name="maxClient">最大客户端数量</param>  
  106.         public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)  
  107.         {  
  108.             this.Address = localIPAddress;  
  109.             this.Port = listenPort;  
  110.             this.Encoding = Encoding.Default;  
  111.   
  112.             _maxClient = maxClient;  
  113.   
  114.             _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
  115.   
  116.             _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);  
  117.   
  118.             _objectPool = new SocketAsyncEventArgsPool(_maxClient);  
  119.   
  120.             _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);   
  121.         }  
  122.  
  123.         #endregion  
  124.  
  125.  
  126.         #region 初始化  
  127.   
  128.         /// <summary>  
  129.         /// 初始化函数  
  130.         /// </summary>  
  131.         public void Init()  
  132.         {  
  133.             // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   
  134.             // against memory fragmentation  
  135.             _bufferManager.InitBuffer();  
  136.   
  137.             // preallocate pool of SocketAsyncEventArgs objects  
  138.             SocketAsyncEventArgs readWriteEventArg;  
  139.   
  140.             for (int i = 0; i < _maxClient; i++)  
  141.             {  
  142.                 //Pre-allocate a set of reusable SocketAsyncEventArgs  
  143.                 readWriteEventArg = new SocketAsyncEventArgs();  
  144.                 readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);  
  145.                 readWriteEventArg.UserToken = null;  
  146.   
  147.                 // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  
  148.                 _bufferManager.SetBuffer(readWriteEventArg);  
  149.   
  150.                 // add SocketAsyncEventArg to the pool  
  151.                 _objectPool.Push(readWriteEventArg);  
  152.             }  
  153.   
  154.         }  
  155.  
  156.         #endregion  
  157.  
  158.         #region Start  
  159.         /// <summary>  
  160.         /// 启动  
  161.         /// </summary>  
  162.         public void Start()  
  163.         {  
  164.             if (!IsRunning)  
  165.             {  
  166.                 Init();  
  167.                 IsRunning = true;  
  168.                 IPEndPoint localEndPoint = new IPEndPoint(Address, Port);  
  169.                 // 创建监听socket  
  170.                 _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
  171.                 //_serverSock.ReceiveBufferSize = _bufferSize;  
  172.                 //_serverSock.SendBufferSize = _bufferSize;  
  173.                 if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)  
  174.                 {  
  175.                     // 配置监听socket为 dual-mode (IPv4 & IPv6)   
  176.                     // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,  
  177.                     _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);  
  178.                     _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));  
  179.                 }  
  180.                 else  
  181.                 {  
  182.                     _serverSock.Bind(localEndPoint);  
  183.                 }  
  184.                 // 开始监听  
  185.                 _serverSock.Listen(this._maxClient);  
  186.                 // 在监听Socket上投递一个接受请求。  
  187.                 StartAccept(null);  
  188.             }  
  189.         }  
  190.         #endregion  
  191.  
  192.         #region Stop  
  193.   
  194.         /// <summary>  
  195.         /// 停止服务  
  196.         /// </summary>  
  197.         public void Stop()  
  198.         {  
  199.             if (IsRunning)  
  200.             {  
  201.                 IsRunning = false;  
  202.                 _serverSock.Close();  
  203.                 //TODO 关闭对所有客户端的连接  
  204.   
  205.             }  
  206.         }  
  207.  
  208.         #endregion  
  209.  
  210.  
  211.         #region Accept  
  212.   
  213.         /// <summary>  
  214.         /// 从客户端开始接受一个连接操作  
  215.         /// </summary>  
  216.         private void StartAccept(SocketAsyncEventArgs asyniar)  
  217.         {  
  218.             if (asyniar == null)  
  219.             {  
  220.                 asyniar = new SocketAsyncEventArgs();  
  221.                 asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);  
  222.             }  
  223.             else  
  224.             {  
  225.                 //socket must be cleared since the context object is being reused  
  226.                 asyniar.AcceptSocket = null;  
  227.             }  
  228.             _maxAcceptedClients.WaitOne();  
  229.             if (!_serverSock.AcceptAsync(asyniar))  
  230.             {  
  231.                 ProcessAccept(asyniar);  
  232.                 //如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件  
  233.                 //此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法  
  234.             }  
  235.         }  
  236.   
  237.         /// <summary>  
  238.         /// accept 操作完成时回调函数  
  239.         /// </summary>  
  240.         /// <param name="sender">Object who raised the event.</param>  
  241.         /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>  
  242.         private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)  
  243.         {  
  244.             ProcessAccept(e);  
  245.         }  
  246.   
  247.         /// <summary>  
  248.         /// 监听Socket接受处理  
  249.         /// </summary>  
  250.         /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>  
  251.         private void ProcessAccept(SocketAsyncEventArgs e)  
  252.         {  
  253.             if (e.SocketError == SocketError.Success)  
  254.             {  
  255.                 Socket s = e.AcceptSocket;//和客户端关联的socket  
  256.                 if (s.Connected)  
  257.                 {  
  258.                     try  
  259.                     {  
  260.                           
  261.                         Interlocked.Increment(ref _clientCount);//原子操作加1  
  262.                         SocketAsyncEventArgs asyniar = _objectPool.Pop();  
  263.                         asyniar.UserToken = s;  
  264.   
  265.                         Log4Debug(String.Format("客户 {0} 连入, 共有 {1} 个连接。", s.RemoteEndPoint.ToString(), _clientCount));  
  266.                           
  267.                         if (!s.ReceiveAsync(asyniar))//投递接收请求  
  268.                         {  
  269.                             ProcessReceive(asyniar);  
  270.                         }  
  271.                     }  
  272.                     catch (SocketException ex)  
  273.                     {  
  274.                         Log4Debug(String.Format("接收客户 {0} 数据出错, 异常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));  
  275.                         //TODO 异常处理  
  276.                     }  
  277.                     //投递下一个接受请求  
  278.                     StartAccept(e);  
  279.                 }  
  280.             }  
  281.         }  
  282.  
  283.         #endregion  
  284.  
  285.         #region 发送数据  
  286.   
  287.         /// <summary>  
  288.         /// 异步的发送数据  
  289.         /// </summary>  
  290.         /// <param name="e"></param>  
  291.         /// <param name="data"></param>  
  292.         public void Send(SocketAsyncEventArgs e, byte[] data)  
  293.         {  
  294.             if (e.SocketError == SocketError.Success)  
  295.             {  
  296.                 Socket s = e.AcceptSocket;//和客户端关联的socket  
  297.                 if (s.Connected)  
  298.                 {  
  299.                     Array.Copy(data, 0, e.Buffer, 0, data.Length);//设置发送数据  
  300.   
  301.                     //e.SetBuffer(data, 0, data.Length); //设置发送数据  
  302.                     if (!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
  303.                     {  
  304.                         // 同步发送时处理发送完成事件  
  305.                         ProcessSend(e);  
  306.                     }  
  307.                     else  
  308.                     {  
  309.                         CloseClientSocket(e);  
  310.                     }  
  311.                 }  
  312.             }  
  313.         }  
  314.   
  315.         /// <summary>  
  316.         /// 同步的使用socket发送数据  
  317.         /// </summary>  
  318.         /// <param name="socket"></param>  
  319.         /// <param name="buffer"></param>  
  320.         /// <param name="offset"></param>  
  321.         /// <param name="size"></param>  
  322.         /// <param name="timeout"></param>  
  323.         public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)  
  324.         {  
  325.             socket.SendTimeout = 0;  
  326.             int startTickCount = Environment.TickCount;  
  327.             int sent = 0; // how many bytes is already sent  
  328.             do  
  329.             {  
  330.                 if (Environment.TickCount > startTickCount + timeout)  
  331.                 {  
  332.                     //throw new Exception("Timeout.");  
  333.                 }  
  334.                 try  
  335.                 {  
  336.                     sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);  
  337.                 }  
  338.                 catch (SocketException ex)  
  339.                 {  
  340.                     if (ex.SocketErrorCode == SocketError.WouldBlock ||  
  341.                     ex.SocketErrorCode == SocketError.IOPending ||  
  342.                     ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)  
  343.                     {  
  344.                         // socket buffer is probably full, wait and try again  
  345.                         Thread.Sleep(30);  
  346.                     }  
  347.                     else  
  348.                     {  
  349.                         throw ex; // any serious error occurr  
  350.                     }  
  351.                 }  
  352.             } while (sent < size);  
  353.         }  
  354.   
  355.   
  356.         /// <summary>  
  357.         /// 发送完成时处理函数  
  358.         /// </summary>  
  359.         /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>  
  360.         private void ProcessSend(SocketAsyncEventArgs e)  
  361.         {  
  362.             if (e.SocketError == SocketError.Success)  
  363.             {  
  364.                 Socket s = (Socket)e.UserToken;  
  365.   
  366.                 //TODO  
  367.             }  
  368.             else  
  369.             {  
  370.                 CloseClientSocket(e);  
  371.             }  
  372.         }  
  373.  
  374.         #endregion  
  375.  
  376.         #region 接收数据  
  377.   
  378.   
  379.         /// <summary>  
  380.         ///接收完成时处理函数  
  381.         /// </summary>  
  382.         /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>  
  383.         private void ProcessReceive(SocketAsyncEventArgs e)  
  384.         {  
  385.             if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  
  386.             {  
  387.                 // 检查远程主机是否关闭连接  
  388.                 if (e.BytesTransferred > 0)  
  389.                 {  
  390.                     Socket s = (Socket)e.UserToken;  
  391.                     //判断所有需接收的数据是否已经完成  
  392.                     if (s.Available == 0)  
  393.                     {  
  394.                         //从侦听者获取接收到的消息。   
  395.                         //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);  
  396.                         //echo the data received back to the client  
  397.                         //e.SetBuffer(e.Offset, e.BytesTransferred);  
  398.   
  399.                         byte[] data = new byte[e.BytesTransferred];  
  400.                         Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用  
  401.   
  402.                         string info=Encoding.Default.GetString(data);  
  403.                         Log4Debug(String.Format("收到 {0} 数据为 {1}",s.RemoteEndPoint.ToString(),info));  
  404.                         //TODO 处理数据  
  405.   
  406.                         //增加服务器接收的总字节数。  
  407.                     }  
  408.   
  409.                     if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
  410.                     {  
  411.                         //同步接收时处理接收完成事件  
  412.                         ProcessReceive(e);  
  413.                     }  
  414.                 }  
  415.             }  
  416.             else  
  417.             {  
  418.                 CloseClientSocket(e);  
  419.             }  
  420.         }  
  421.  
  422.         #endregion  
  423.  
  424.         #region 回调函数  
  425.   
  426.         /// <summary>  
  427.         /// 当Socket上的发送或接收请求被完成时,调用此函数  
  428.         /// </summary>  
  429.         /// <param name="sender">激发事件的对象</param>  
  430.         /// <param name="e">与发送或接收完成操作相关联的SocketAsyncEventArg对象</param>  
  431.         private void OnIOCompleted(object sender, SocketAsyncEventArgs e)  
  432.         {  
  433.             // Determine which type of operation just completed and call the associated handler.  
  434.             switch (e.LastOperation)  
  435.             {  
  436.                 case SocketAsyncOperation.Accept:  
  437.                     ProcessAccept(e);  
  438.                     break;  
  439.                 case SocketAsyncOperation.Receive:  
  440.                     ProcessReceive(e);  
  441.                     break;  
  442.                 default:  
  443.                     throw new ArgumentException("The last operation completed on the socket was not a receive or send");  
  444.             }  
  445.         }  
  446.  
  447.         #endregion  
  448.  
  449.         #region Close  
  450.         /// <summary>  
  451.         /// 关闭socket连接  
  452.         /// </summary>  
  453.         /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param>  
  454.         private void CloseClientSocket(SocketAsyncEventArgs e)  
  455.         {  
  456.             Log4Debug(String.Format("客户 {0} 断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));  
  457.             Socket s = e.UserToken as Socket;  
  458.             CloseClientSocket(s, e);  
  459.         }  
  460.   
  461.         /// <summary>  
  462.         /// 关闭socket连接  
  463.         /// </summary>  
  464.         /// <param name="s"></param>  
  465.         /// <param name="e"></param>  
  466.         private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)  
  467.         {  
  468.             try  
  469.             {  
  470.                 s.Shutdown(SocketShutdown.Send);  
  471.             }  
  472.             catch (Exception)  
  473.             {  
  474.                 // Throw if client has closed, so it is not necessary to catch.  
  475.             }  
  476.             finally  
  477.             {  
  478.                 s.Close();  
  479.             }  
  480.             Interlocked.Decrement(ref _clientCount);  
  481.             _maxAcceptedClients.Release();  
  482.             _objectPool.Push(e);//SocketAsyncEventArg 对象被释放,压入可重用队列。  
  483.         }  
  484.         #endregion  
  485.  
  486.         #region Dispose  
  487.         /// <summary>  
  488.         /// Performs application-defined tasks associated with freeing,   
  489.         /// releasing, or resetting unmanaged resources.  
  490.         /// </summary>  
  491.         public void Dispose()  
  492.         {  
  493.             Dispose(true);  
  494.             GC.SuppressFinalize(this);  
  495.         }  
  496.   
  497.         /// <summary>  
  498.         /// Releases unmanaged and - optionally - managed resources  
  499.         /// </summary>  
  500.         /// <param name="disposing"><c>true</c> to release   
  501.         /// both managed and unmanaged resources; <c>false</c>   
  502.         /// to release only unmanaged resources.</param>  
  503.         protected virtual void Dispose(bool disposing)  
  504.         {  
  505.             if (!this.disposed)  
  506.             {  
  507.                 if (disposing)  
  508.                 {  
  509.                     try  
  510.                     {  
  511.                         Stop();  
  512.                         if (_serverSock != null)  
  513.                         {  
  514.                             _serverSock = null;  
  515.                         }  
  516.                     }  
  517.                     catch (SocketException ex)  
  518.                     {  
  519.                         //TODO 事件  
  520.                     }  
  521.                 }  
  522.                 disposed = true;  
  523.             }  
  524.         }  
  525.         #endregion  
  526.   
  527.         public void Log4Debug(string msg)  
  528.         {  
  529.             Console.WriteLine("notice:"+msg);  
  530.         }  
  531.   
  532.     }  
  533. }  

BufferManager.cs 这个类是缓存管理类,是采用MSDN上面的例子一样的 地址: https://msdn.microsoft.com/zh-cn/library/bb517542.aspx
 
SocketAsyncEventArgsPool.cs 这个类也是来自MSDN的 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
 
需要的话自己到MSDN网站上去取,我就不贴出来了
 
服务器端
[csharp] view plain copy
 
  1. static void Main(string[] args)  
  2.         {  
  3.   
  4.             IOCPServer server = new IOCPServer(8088, 1024);  
  5.             server.Start();  
  6.             Console.WriteLine("服务器已启动....");  
  7.             System.Console.ReadLine();  
  8.         }  


客户端
 
客户端代码也是很简单
[csharp] view plain copy
 
  1. static void Main(string[] args)  
  2.         {  
  3.             IPAddress remote=IPAddress.Parse("192.168.3.4");  
  4.             client c = new client(8088,remote);  
  5.   
  6.             c.connect();  
  7.             Console.WriteLine("服务器连接成功!");  
  8.             while (true)  
  9.             {  
  10.                 Console.Write("send>");  
  11.                 string msg=Console.ReadLine();  
  12.                 if (msg == "exit")  
  13.                     break;  
  14.                 c.send(msg);  
  15.             }  
  16.             c.disconnect();  
  17.             Console.ReadLine();  
  18.         }  
 
client.cs
[csharp] view plain copy
 
  1. public class client  
  2.     {  
  3.   
  4.         public TcpClient _client;  
  5.   
  6.         public int port;  
  7.   
  8.         public IPAddress remote;  
  9.   
  10.         public client(int port,IPAddress remote)  
  11.         {  
  12.   
  13.             this.port = port;  
  14.             this.remote = remote;  
  15.         }  
  16.   
  17.         public void connect()  
  18.         {  
  19.             this._client=new TcpClient();  
  20.             _client.Connect(remote, port);  
  21.         }  
  22.         public void disconnect()  
  23.         {  
  24.             _client.Close();  
  25.         }  
  26.         public void send(string msg)  
  27.         {  
  28.             byte[] data=Encoding.Default.GetBytes(msg);  
  29.             _client.GetStream().Write(data, 0, data.Length);  
  30.         }  
  31.     }  


IOCPClient类,使用SocketAsyncEventArgs类建立一个Socket客户端。虽然MSDN说这个类特别设计给网络服务器应用,但也没有限制在客户端代码中使用APM。下面给出了IOCPClient类的样例代码:
[csharp] view plain copy
 
  1. public class IOCPClient  
  2.    {  
  3.        /// <summary>  
  4.        /// 连接服务器的socket  
  5.        /// </summary>  
  6.        private Socket _clientSock;  
  7.   
  8.        /// <summary>  
  9.        /// 用于服务器执行的互斥同步对象  
  10.        /// </summary>  
  11.        private static Mutex mutex = new Mutex();  
  12.        /// <summary>  
  13.        /// Socket连接标志  
  14.        /// </summary>  
  15.        private Boolean _connected = false;  
  16.   
  17.        private const int ReceiveOperation = 1, SendOperation = 0;  
  18.   
  19.        private static AutoResetEvent[]  
  20.                 autoSendReceiveEvents = new AutoResetEvent[]  
  21.         {  
  22.             new AutoResetEvent(false),  
  23.             new AutoResetEvent(false)  
  24.         };  
  25.   
  26.        /// <summary>  
  27.        /// 服务器监听端点  
  28.        /// </summary>  
  29.        private IPEndPoint _remoteEndPoint;  
  30.   
  31.        public IOCPClient(IPEndPoint local,IPEndPoint remote)  
  32.        {  
  33.            _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp);  
  34.            _remoteEndPoint = remote;  
  35.        }  
  36.  
  37.        #region 连接服务器  
  38.   
  39.        /// <summary>  
  40.        /// 连接远程服务器  
  41.        /// </summary>  
  42.        public void Connect()  
  43.        {  
  44.            SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();  
  45.   
  46.            connectArgs.UserToken = _clientSock;  
  47.            connectArgs.RemoteEndPoint = _remoteEndPoint;  
  48.            connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected);  
  49.            mutex.WaitOne();  
  50.            if (!_clientSock.ConnectAsync(connectArgs))//异步连接  
  51.            {  
  52.                ProcessConnected(connectArgs);  
  53.            }  
  54.              
  55.        }  
  56.        /// <summary>  
  57.        /// 连接上的事件  
  58.        /// </summary>  
  59.        /// <param name="sender"></param>  
  60.        /// <param name="e"></param>  
  61.        void OnConnected(object sender, SocketAsyncEventArgs e)  
  62.        {  
  63.            mutex.ReleaseMutex();  
  64.            //设置Socket已连接标志。   
  65.            _connected = (e.SocketError == SocketError.Success);  
  66.        }  
  67.        /// <summary>  
  68.        /// 处理连接服务器  
  69.        /// </summary>  
  70.        /// <param name="e"></param>  
  71.        private void ProcessConnected(SocketAsyncEventArgs e)  
  72.        {  
  73.            //TODO  
  74.        }  
  75.  
  76.        #endregion  
  77.  
  78.        #region 发送消息  
  79.        /// <summary>  
  80.        /// 向服务器发送消息  
  81.        /// </summary>  
  82.        /// <param name="data"></param>  
  83.        public void Send(byte[] data)  
  84.        {  
  85.            SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs();  
  86.            asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete);  
  87.            asyniar.SetBuffer(data, 0, data.Length);  
  88.            asyniar.UserToken = _clientSock;  
  89.            asyniar.RemoteEndPoint = _remoteEndPoint;  
  90.            autoSendReceiveEvents[SendOperation].WaitOne();  
  91.            if (!_clientSock.SendAsync(asyniar))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
  92.            {  
  93.                // 同步发送时处理发送完成事件  
  94.                ProcessSend(asyniar);  
  95.            }  
  96.        }  
  97.   
  98.        /// <summary>  
  99.        /// 发送操作的回调方法  
  100.        /// </summary>  
  101.        /// <param name="sender"></param>  
  102.        /// <param name="e"></param>  
  103.        private void OnSendComplete(object sender, SocketAsyncEventArgs e)  
  104.        {  
  105.            //发出发送完成信号。   
  106.            autoSendReceiveEvents[SendOperation].Set();  
  107.            ProcessSend(e);  
  108.        }  
  109.   
  110.        /// <summary>  
  111.        /// 发送完成时处理函数  
  112.        /// </summary>  
  113.        /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>  
  114.        private void ProcessSend(SocketAsyncEventArgs e)  
  115.        {  
  116.            //TODO  
  117.        }  
  118.        #endregion  
  119.  
  120.        #region 接收消息  
  121.        /// <summary>  
  122.        /// 开始监听服务端数据  
  123.        /// </summary>  
  124.        /// <param name="e"></param>  
  125.        public void StartRecive(SocketAsyncEventArgs e)  
  126.        {  
  127.            //准备接收。   
  128.            Socket s = e.UserToken as Socket;  
  129.            byte[] receiveBuffer = new byte[255];  
  130.            e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);  
  131.            e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete);  
  132.            autoSendReceiveEvents[ReceiveOperation].WaitOne();  
  133.            if (!s.ReceiveAsync(e))  
  134.            {  
  135.                ProcessReceive(e);  
  136.            }  
  137.        }  
  138.   
  139.        /// <summary>  
  140.        /// 接收操作的回调方法  
  141.        /// </summary>  
  142.        /// <param name="sender"></param>  
  143.        /// <param name="e"></param>  
  144.        private void OnReceiveComplete(object sender, SocketAsyncEventArgs e)  
  145.        {  
  146.            //发出接收完成信号。   
  147.            autoSendReceiveEvents[ReceiveOperation].Set();  
  148.            ProcessReceive(e);  
  149.        }  
  150.   
  151.        /// <summary>  
  152.        ///接收完成时处理函数  
  153.        /// </summary>  
  154.        /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>  
  155.        private void ProcessReceive(SocketAsyncEventArgs e)  
  156.        {  
  157.            if (e.SocketError == SocketError.Success)  
  158.            {  
  159.                // 检查远程主机是否关闭连接  
  160.                if (e.BytesTransferred > 0)  
  161.                {  
  162.                    Socket s = (Socket)e.UserToken;  
  163.                    //判断所有需接收的数据是否已经完成  
  164.                    if (s.Available == 0)  
  165.                    {  
  166.                        byte[] data = new byte[e.BytesTransferred];  
  167.                        Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用  
  168.   
  169.                        //TODO 处理数据  
  170.                    }  
  171.   
  172.                    if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
  173.                    {  
  174.                        //同步接收时处理接收完成事件  
  175.                        ProcessReceive(e);  
  176.                    }  
  177.                }  
  178.            }  
  179.        }  
  180.  
  181.        #endregion  
  182.   
  183.   
  184.        public void Close()  
  185.        {  
  186.            _clientSock.Disconnect(false);  
  187.        }  
  188.   
  189.        /// <summary>  
  190.        /// 失败时关闭Socket,根据SocketError抛出异常。  
  191.        /// </summary>  
  192.        /// <param name="e"></param>  
  193.   
  194.        private void ProcessError(SocketAsyncEventArgs e)  
  195.        {  
  196.            Socket s = e.UserToken as Socket;  
  197.            if (s.Connected)  
  198.            {  
  199.                //关闭与客户端关联的Socket  
  200.                try  
  201.                {  
  202.                    s.Shutdown(SocketShutdown.Both);  
  203.                }  
  204.                catch (Exception)  
  205.                {  
  206.                    //如果客户端处理已经关闭,抛出异常   
  207.                }  
  208.                finally  
  209.                {  
  210.                    if (s.Connected)  
  211.                    {  
  212.                        s.Close();  
  213.                    }  
  214.                }  
  215.            }  
  216.            //抛出SocketException   
  217.            throw new SocketException((Int32)e.SocketError);  
  218.        }  
  219.   
  220.   
  221.        /// <summary>  
  222.        /// 释放SocketClient实例  
  223.        /// </summary>  
  224.        public void Dispose()  
  225.        {  
  226.            mutex.Close();  
  227.            autoSendReceiveEvents[SendOperation].Close();  
  228.            autoSendReceiveEvents[ReceiveOperation].Close();  
  229.            if (_clientSock.Connected)  
  230.            {  
  231.                _clientSock.Close();  
  232.            }  
  233.        }  
  234.   
  235.    }  
这个类我没有测试,但是理论上是没问题的。

免责声明:文章转载自《C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SqlServer nvarchar中的中文字符匹配,更改SqlServer实例和数据库排序规则的办法完美的外出上网解决方案随身随地享用你的专有WIFI网络(3G无线路由器+sim卡卡托+3G资费卡)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

cookie详解

背景 在HTTP协议的定义中,采用了一种机制来记录客户端和服务器端交互的信息,这种机制被称为cookie,cookie规范定义了服务器和客户端交互信息的格式、生存期、使用范围、安全性。 在JavaScript中可以通过 document.cookie 来读取或设置这些信息。由于 cookie 多用在客户端和服务端之间进行通信,所以除了JavaScript以...

详解tomcat的连接数与线程池

前言 在使用tomcat时,经常会遇到连接数、线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector)。 在前面的文章详解Tomcat配置文件server.xml中写到过:Connector的主要功能,是接收连接请求,创建Request和Response对象用于和请求端交换数据;然后分配线程让Engine(也就是Ser...

加密通讯协议SSL编程周立发

Linux网络编程:加密通讯协议SSL编程<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com:office:office" /> 服务器端源代码如下: #include <stdio.h>#include <stdlib.h>#include &...

socket.io建立长连接

   socket.io是基于node.js,在命令行里输入npm socket.io下载模块,用node.js搭建后台 示例代码,客户端 1 <!DOCTYPE html> 2 <html lang="zh-CN"> 3 <head> 4 <meta charset="UTF-8"> 5...

【转】TCP/UDP简易通信框架源码,支持轻松管理多个TCP服务端(客户端)、UDP客户端

【转】TCP/UDP简易通信框架源码,支持轻松管理多个TCP服务端(客户端)、UDP客户端 目录 说明 TCP/UDP通信主要结构 管理多个Socket的解决方案 框架中TCP部分的使用 框架中UDP部分的使用 框架源码结构 补充说明 源码地址 说明 之前有好几篇博客在讲TCP/UDP通信方面的内容,也有做过一些Demo(包括整理出来的、可供学习使用的...

《篡权的ss》linux命令五分钟系列之三十一

原文链接 本原创文章属于《Linux大棚》博客。 博客地址为http://roclinux.cn。 文章作者为roc。 === 上篇文章《和netstat说再见》中说到netstat已经被抛弃,取而代之的是ss命令。一些朋友在问“netstat为什么会被抛弃呢?ss又是什么命令呢?” 这篇文章,我们就来揭晓答案,重点说一说“篡权的ss”。 【作者粗心大意?...