From df4cc6250ae7f1549ae974dabc50e1095c1c12af Mon Sep 17 00:00:00 2001 From: Yi Huang Date: Wed, 25 Oct 2023 13:44:35 -0700 Subject: [PATCH] Fix epoll TCP implementation (#3940) --- src/platform/datapath_epoll.c | 171 ++++++++++++++++--------------- src/platform/platform_internal.h | 5 + 2 files changed, 91 insertions(+), 85 deletions(-) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index d86a1d6fb5..94c8965964 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -186,6 +186,11 @@ typedef struct CXPLAT_SEND_DATA { // uint8_t Buffer[CXPLAT_LARGE_IO_BUFFER_SIZE]; + // + // The total number of bytes buffer sent (only used for TCP). + // + uint32_t TotalBytesSent; + // // IO vectors used for sends on the socket. // @@ -1299,7 +1304,6 @@ CxPlatSocketCreateTcpInternal( { QUIC_STATUS Status; uint16_t PartitionIndex; - BOOLEAN IsServerSocket = RemoteAddress == NULL; CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); @@ -1334,6 +1338,8 @@ CxPlatSocketCreateTcpInternal( } else { Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; } + + Binding->RecvBufLen = Datapath->RecvBlockSize - Datapath->RecvBlockBufferOffset; PartitionIndex = RemoteAddress ? ((uint16_t)(CxPlatProcCurrentNumber() % Datapath->PartitionCount)) : 0; @@ -1360,16 +1366,6 @@ CxPlatSocketCreateTcpInternal( goto Exit; } - if (IsServerSocket) { - // - // The return value is being ignored here, as if a system does not support - // bpf we still want the server to work. If this happens, the sockets will - // round robin, but each flow will be sent to the same socket, just not - // based on RSS. - // - (void)CxPlatSocketConfigureRss(SocketContext, 1); - } - if (Type == CXPLAT_SOCKET_TCP_SERVER) { *NewBinding = Binding; Binding = NULL; @@ -1960,78 +1956,83 @@ CxPlatSocketReceiveMessages( } void -CxPlatSocketTcpRecvComplete( +CxPlatSocketReceiveTcpData( _In_ CXPLAT_SOCKET_CONTEXT* SocketContext ) { CXPLAT_DATAPATH_PARTITION* DatapathPartition = SocketContext->DatapathPartition; DATAPATH_RX_IO_BLOCK* IoBlock = NULL; - uint32_t RetryCount = 0; + // + // Read in a loop until the blocking error is encountered unless EOF or other failures + // are met. + // do { - IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool); - } while (IoBlock == NULL && ++RetryCount < 10); - if (IoBlock == NULL) { - QuicTraceEvent( - AllocFailure, - "Allocation of '%s' failed. (%llu bytes)", - "DATAPATH_RX_IO_BLOCK", - 0); - goto Exit; - } + uint32_t RetryCount = 0; + do { + IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool); + } while (IoBlock == NULL && ++RetryCount < 10); + if (IoBlock == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "DATAPATH_RX_IO_BLOCK", + 0); + goto Exit; + } - IoBlock->OwningPool = &DatapathPartition->RecvBlockPool; - IoBlock->Route.State = RouteResolved; - IoBlock->Route.Queue = SocketContext; - IoBlock->RefCount = 0; + IoBlock->OwningPool = &DatapathPartition->RecvBlockPool; + IoBlock->Route.State = RouteResolved; + IoBlock->Route.Queue = SocketContext; + IoBlock->RefCount = 0; - uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset; - int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, CXPLAT_LARGE_IO_BUFFER_SIZE); + uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset; + int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, SocketContext->Binding->RecvBufLen); - if (NumberOfBytesTransferred == 0) { - if (!SocketContext->Binding->DisconnectIndicated) { - SocketContext->Binding->DisconnectIndicated = TRUE; - SocketContext->Binding->Datapath->TcpHandlers.Connect( + if (NumberOfBytesTransferred == 0) { + if (!SocketContext->Binding->DisconnectIndicated) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + FALSE); + } + goto Exit; + } else if (NumberOfBytesTransferred < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + errno, + "read failed"); + } + goto Exit; + } else { + DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1); + Datagram->IoBlock = IoBlock; + CXPLAT_RECV_DATA* Data = &Datagram->Data; + + Data->Next = NULL; + Data->Buffer = Buffer; + Data->BufferLength = NumberOfBytesTransferred; + Data->Route = &IoBlock->Route; + Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex; + Data->TypeOfService = 0; + Data->Allocated = TRUE; + Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER; + Data->QueuedOnConnection = FALSE; + IoBlock->RefCount++; + IoBlock = NULL; + + SocketContext->Binding->Datapath->TcpHandlers.Receive( SocketContext->Binding, SocketContext->Binding->ClientContext, - FALSE); + Data); } - goto Exit; - } else if (NumberOfBytesTransferred < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - SocketContext->Binding, - errno, - "read failed"); - } - goto Exit; - } else { - DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1); - Datagram->IoBlock = IoBlock; - CXPLAT_RECV_DATA* Data = &Datagram->Data; - - Data->Next = NULL; - Data->Buffer = Buffer; - Data->BufferLength = NumberOfBytesTransferred; - Data->Route = &IoBlock->Route; - Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex; - Data->TypeOfService = 0; - Data->Allocated = TRUE; - Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER; - Data->QueuedOnConnection = FALSE; - IoBlock->RefCount++; - IoBlock = NULL; - - SocketContext->Binding->Datapath->TcpHandlers.Receive( - SocketContext->Binding, - SocketContext->Binding->ClientContext, - Data); - } + } while (TRUE); Exit: - if (IoBlock) { CxPlatPoolFree(&DatapathPartition->RecvBlockPool, IoBlock); } @@ -2049,7 +2050,7 @@ CxPlatSocketReceive( CxPlatSocketReceiveMessages(SocketContext); } } else { - CxPlatSocketTcpRecvComplete(SocketContext); + CxPlatSocketReceiveTcpData(SocketContext); } } @@ -2096,6 +2097,7 @@ SendDataAlloc( SendData->ClientBuffer.Buffer = SendData->Buffer; SendData->ClientBuffer.Length = 0; SendData->TotalSize = 0; + SendData->TotalBytesSent = 0; SendData->SegmentSize = (Socket->Type != CXPLAT_SOCKET_UDP || Socket->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION) @@ -2459,24 +2461,17 @@ CxPlatSendDataSendTcp( _In_ CXPLAT_SEND_DATA* SendData ) { - uint32_t TotalSize = SendData->TotalSize; - uint8_t *Buffer = SendData->Buffer; - while (TotalSize > 0) { + while (SendData->TotalSize > SendData->TotalBytesSent) { int BytesSent = send( SendData->SocketContext->SocketFd, - Buffer, - TotalSize, + SendData->Buffer + SendData->TotalBytesSent, + SendData->TotalSize - SendData->TotalBytesSent, 0); if (BytesSent < 0) { - // forcibly send inline - if (errno == EAGAIN || errno == EWOULDBLOCK) { - continue; - } return FALSE; } - Buffer += BytesSent; - TotalSize -= BytesSent; + SendData->TotalBytesSent += BytesSent; } return TRUE; @@ -2493,14 +2488,20 @@ CxPlatSendDataSend( QUIC_STATUS Status = QUIC_STATUS_SUCCESS; CXPLAT_SOCKET_CONTEXT* SocketContext = SendData->SocketContext; - BOOLEAN Success = - SocketType == CXPLAT_SOCKET_UDP ? + BOOLEAN Success; + + if (SocketType == CXPLAT_SOCKET_UDP) { + Success = #ifdef UDP_SEGMENT SendData->SegmentationSupported ? - CxPlatSendDataSendSegmented(SendData) : + CxPlatSendDataSendSegmented(SendData) : CxPlatSendDataSendMessages(SendData); +#else + CxPlatSendDataSendMessages(SendData); #endif - CxPlatSendDataSendMessages(SendData) : - CxPlatSendDataSendTcp(SendData); + } else { + Success = CxPlatSendDataSendTcp(SendData); + } + if (!Success) { if (errno == EAGAIN || errno == EWOULDBLOCK) { Status = QUIC_STATUS_PENDING; diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h index 4d26f842ec..87d0c5ec5f 100644 --- a/src/platform/platform_internal.h +++ b/src/platform/platform_internal.h @@ -781,6 +781,11 @@ typedef struct CXPLAT_SOCKET { // uint16_t Mtu; + // + // The size of a receive buffer's payload. + // + uint32_t RecvBufLen; + // // Indicates the binding connected to a remote IP address. //