corefx 源码学习:NetworkStream.ReadAsync 是如何从 Socket 异步读取数据的

摘要:
NetworkStream继承自System.IO.Stream。System.IO.Sream.ReadAsync方法签名为publicTask<int>ReadAsync;实际调用是publicvirtualTask<int>ReadAsync。ReadAsync上面的方法由NetworkStream重写。调用是Socket_streamSocket.ReceiveAsync.AsTask()的ReceiveAsync方法返回;Socket的方法签名。ReceiveAsync internalValueTask<int>ReceiveAsync主要实现代码AwaitableSocketAsyncEventArgssaea=LazyInitializer.EnsureInitialized;如果{saea.SetBuffer;saea.SocketFlags=SocketFlags;saea.WrapExceptionsInIOExceptions=fromNetworkStream;varresult=saea.ReceiveAsync;returnresult;}否则{//由于套接字上的当前接收操作,我们无法获取元数据,所以我们只从saea.ReceiveAsync往下看。saea.Receive Async调用Socket.Receive异步方法,后者调用SocketAsyncEventArgs.DoOperationReceive。Linux上DoOperationReceive的实现是在SocketAsync EventArgs.Unix.cs中实现的。主要代码如下:internalunsafe SocketErrorDoOperationReceive{//…if{errorCode=handle.AsyncContext.ReceiveAsync;}else{errorCode=handle.异步上下文.Receive异步;}if(errorCode!

最近遇到 NetworkStream.ReadAsync 在 Linux 上高并发读取数据的问题,由此激发了阅读 corefx 中 System.Net.Sockets 实现源码(基于 corefx 2.2)的兴趣。

这篇随笔是阅读 NetworkStream.ReadAsync 相关源码的简单笔记,基于在 Linux 上运行的场景。 

NetworkStream 继承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法签名是

public Task<int> ReadAsync(byte[] buffer, int offset, int count);

实际调用的是

public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)

上面的的方法被 NetworkStream 重写(override),调用的是 Socket 的 ReceiveAsync 方法

return _streamSocket.ReceiveAsync(
    new Memory<byte>(buffer, offset, size),
    SocketFlags.None,
    fromNetworkStream: true,
    cancellationToken).AsTask();

Socket.ReceiveAsync 的方法签名

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)

主要实现代码

AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive);
if (saea.Reserve())
{
    saea.SetBuffer(buffer);
    saea.SocketFlags = socketFlags;
    saea.WrapExceptionsInIOExceptions = fromNetworkStream;
    var result = saea.ReceiveAsync(this);
    return result;
}
else
{
    // We couldn't get a cached instance, due to a concurrent receive operation on the socket.
    // Fall back to wrapping APM.
    return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags));
}

通常情况下都会使用 AwaitableSocketAsyncEventArgs 异步读取数据,所以我们这里只从 saea.ReceiveAsync 往下看。

saea.ReceiveAsync 调用的是 Socket.ReceiveAsync(SocketAsyncEventArgs e)  方法,而后者调用的是 SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle) 。

在 Linux 上 DoOperationReceive 的实现在 SocketAsyncEventArgs.Unix.cs 中,主要代码如下

internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle)
{
    //...
    if (_bufferList == null)
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }
    else
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }

    if (errorCode != SocketError.IOPending)
    {
        CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode);
        FinishOperationSync(errorCode, bytesReceived, flags);
    }

    return errorCode;
}

handle.AsyncContext.ReceiveAsync 对应的 Linux 实现在 SocketAsyncContext.Unix.cs 中,调用的是 SocketAsyncContext 的 ReceiveFrom 方法,ReceiveFrom 的主要实现代码如下

public SocketError ReceiveFromAsync(Memory<byte> buffer,  SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback)
{
    SetNonBlocking();

    SocketError errorCode;
    int observedSequenceNumber;
    if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
        SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode))
    {
        return errorCode;
    }

    BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation();
    operation.Callback = callback;
    operation.Buffer = buffer;
    operation.Flags = flags;
    operation.SocketAddress = socketAddress;
    operation.SocketAddressLen = socketAddressLen;

    if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber))
    {
        receivedFlags = operation.ReceivedFlags;
        bytesReceived = operation.BytesTransferred;
        errorCode = operation.ErrorCode;

        ReturnOperation(operation);
        return errorCode;
    }

    bytesReceived = 0;
    receivedFlags = SocketFlags.None;
    return SocketError.IOPending;
}

SocketPal.TryCompleteReceiveFrom 的实现代码在 SocketPal.Unix.cs 中,所调用的另一个 TryCompleteReceiveFrom 方法的签名是

public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)

该方法调用的是 Receive 方法

private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)

在 Receive 方法中调用了 

errno = Interop.Sys.ReceiveMessage(
    socket.DangerousGetHandle(), 
    &messageHeader,
    flags,
    &received);

Interop.Sys.ReceiveMessage 对应的是 Linux 本地库中的方法

internal static partial class Sys
{
    [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")]
    internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received);
}

Libraries.SystemNative 对应的是哪个库呢?

它就是 System.Native.so 

$ find /usr/share/dotnet/ -name System.Native.so
/usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so

接下来根据 SocketError.IOPending 的情况阅读源码。

SocketAsyncEventArgs 在 DoOperationReceive 方法中调用 SocketAsyncContext.ReceiveFrom 方法时(handle.AsyncContext.ReceiveAsync)传递了 TransferCompletionCallback 参数值,在异步操作时是通过这个 callback 读取 socket 数据的,对应的方法是 TransferCompletionCallbackCore 。

private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError)
{
    CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError);

    CompletionCallback(bytesTransferred, receivedFlags, socketError);
}

TransferCompletionCallbackCore 中进一步调用 CompletionCallback 

private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError)
{
    if (socketError == SocketError.Success)
    {
        FinishOperationAsyncSuccess(bytesTransferred, flags);
    }
    else
    {
        if (_currentSocket.CleanedUp)
        {
            socketError = SocketError.OperationAborted;
        }

        FinishOperationAsyncFailure(socketError, bytesTransferred, flags);
    }
}

在 CompletionCallback 中当 SocketError.Success 时进一步调用 FinishOperationAsyncSuccess 

internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags)
{
    FinishOperationSyncSuccess(bytesTransferred, flags);

    // Raise completion event.
    if (_context == null)
    {
        OnCompleted(this);
    }
    else
    {
        ExecutionContext.Run(_context, s_executionCallback, this);
    }
}

从上面的代码可以看出实际调用的也是 FinishOperationSyncSuccess ,异步与同步读取数据最终调用的是同一个方法。

免责声明:文章转载自《corefx 源码学习:NetworkStream.ReadAsync 是如何从 Socket 异步读取数据的》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇biztalk中使用.net class类型的消息(一) 相关知识介绍【转】ESRI系列产品报价表(ArcGIS 9.2系列)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

cjson库的使用以及源码阅读

cjson是一个用c语言开发的json解析库,免费开源只有一个c文件和一个h文件。json和xml功能相似,可以用来传输数据,存储数据以及表达程序当前的状态。 1、下载cjson的源码         https://github.com/DaveGamble/cJSON 2、阅读readme文件可以大概的了解一下cjson的介绍以及使用方法,我尝试着把...

chromium浏览器开发系列第三篇:chromium源码目录结构

上两篇介绍了下载源码和编译源码,这次主要介绍chromium的源码目录结构,我也是通过源码和官网结合来跟大家说,如果有说的不准确的,欢迎交流。 另外,官网的不一定准确,他们其实也很懒,所以最主要还是靠自己。官网只能作为一个参考。 Chromium结构相对两年前变化很大。目录结构依然很清晰,主要有三个部分(不包括其他的库):浏览器,渲染器,webkit。浏览...

ubuntu 16.04 安装nodejs

Ubuntu 上安装 Node.js Node.js 源码安装 以下部分我们将介绍在Ubuntu Linux下安装 Node.js 。 其他的Linux系统,如Centos等类似如下安装步骤。 在 Github 上获取 Node.js 源码: $ sudo git clone https://github.com/nodejs/node.git Clon...

KBEngine源码:组件方案

相对于skynet,KBEngine提供了完整的组件方案。 Loginapp 登录验证、注册、Client的接入口。 Baseapp 通过Loginapp分配过来的Client会与Baseapp保持连接,完成客户端与服务端的交互。 定时把Entity的数据保存进数据库。 Baseapp之间会进行互相备份,保证数据的安全。 灾难恢复-当Baseapp发生问...

Spring Boot源码分析-配置文件加载原理

在Spring Boot源码分析-启动过程中我们进行了启动源码的分析,大致了解了整个Spring Boot的启动过程,具体细节这里不再赘述,感兴趣的同学可以自行阅读。今天让我们继续阅读源码,了解配置文件加载原理。 基于Spring Boot 2.1.0.RELEASE 在开始阅读源码之前,首先准备三个问题。 什么时候开始加载配置文件? 如何读取相关配置文...

mysql-5.6.27源码安装及错误解决办法

环境:centos6.5.x86_64 wgethttp://mirrors.sohu.com/mysql/MySQL-5.6/mysql-5.6.27.tar.gz yum install -y cmake 当然也可以自己下载源码包安装,为方便就Yum安装了 useradd -s /sbin/nologin mysql tar zxvf mysql-...