Skip to content

Commit

Permalink
Implement support for generic receive offload
Browse files Browse the repository at this point in the history
GRO allows a socket to opt in to having the kernel group together
packets with the same header information (source, destination, size etc)
into one jumbo packet. The original packet size is provided as ancillary
data to allow userspace to reconstruct the original packets. This can
provide around a 20% speedup.
  • Loading branch information
bmerry committed Feb 15, 2024
1 parent 83fcc26 commit 703f4c5
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 21 deletions.
7 changes: 7 additions & 0 deletions .ci/all-builds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ for python in true false; do
meson compile
done
done
# Similar recvmmsg and gro don't interact with either of the above
for recvmmsg in auto disabled; do
for gro in auto disabled; do
meson configure -Dpython=$python -Drecvmmsg=$recvmmsg -Dgro=$gro
meson compile
done
done
done
5 changes: 5 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

.. rubric:: Development version

- Speed up receiving UDP with the Linux kernel network stack by using
generic receive offload (GRO).

.. rubric:: 4.3.1

- Switch from netifaces to netifaces2 for testing.
Expand Down
3 changes: 2 additions & 1 deletion include/spead2/common_features.h.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2015, 2017, 2019-2020, 2023 National Research Foundation (SARAO)
/* Copyright 2015, 2017, 2019-2020, 2023-2024 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
Expand Down Expand Up @@ -34,6 +34,7 @@
#define SPEAD2_USE_RECVMMSG @SPEAD2_USE_RECVMMSG@
#define SPEAD2_USE_SENDMMSG @SPEAD2_USE_SENDMMSG@
#define SPEAD2_USE_GSO @SPEAD2_USE_GSO@
#define SPEAD2_USE_GRO @SPEAD2_USE_GRO@
#define SPEAD2_USE_EVENTFD @SPEAD2_USE_EVENTFD@
#define SPEAD2_USE_PTHREAD_SETAFFINITY_NP @SPEAD2_USE_PTHREAD_SETAFFINITY_NP@
#define SPEAD2_USE_FMV @SPEAD2_USE_FMV@
Expand Down
25 changes: 19 additions & 6 deletions include/spead2/recv_udp.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2015, 2020, 2023 National Research Foundation (SARAO)
/* Copyright 2015, 2020, 2023-2024 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
Expand Down Expand Up @@ -43,8 +43,22 @@ namespace spead2::recv
class udp_reader : public udp_reader_base
{
private:
#if SPEAD2_USE_RECVMMSG
struct msg_buffer
{
/// Buffer for asynchronous receive, of size @a max_size + 1 (or larger if GRO is enabled).
std::unique_ptr<std::uint8_t[]> data;
/// Scatter-gather array
iovec iov[1];
#if SPEAD2_USE_GRO
/// Ancillary data for GRO (segment size)
alignas(cmsghdr) char control[CMSG_SPACE(sizeof(int))];
#endif
};
#endif

/* Note: declaration order is import for correct destruction
* (the stream must be closed before we start destroying buffers).
* (the socket must be closed before we start destroying buffers).
*/

/// Endpoint to bind during @ref start()
Expand All @@ -54,12 +68,11 @@ class udp_reader : public udp_reader_base
/// Maximum packet size we will accept
std::size_t max_size;
#if SPEAD2_USE_RECVMMSG
/// Buffer for asynchronous receive, of size @a max_size + 1.
std::vector<std::unique_ptr<std::uint8_t[]>> buffer;
/// Scatter-gather array for each buffer
std::vector<iovec> iov;
std::vector<msg_buffer> buffers;
/// recvmmsg control structures
std::vector<mmsghdr> msgvec;
/// If true, generic receive offload is enabled on the socket
bool use_gro;
#else
/// Buffer for asynchronous receive, of size @a max_size + 1.
std::unique_ptr<std::uint8_t[]> buffer;
Expand Down
10 changes: 9 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 National Research Foundation (SARAO)
# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
Expand Down Expand Up @@ -167,6 +167,13 @@ use_gso = get_option('gso').require(
prefix : '#include <netinet/udp.h>'
) != ''
).allowed()
use_gro = get_option('gro').require(
compiler.get_define(
'UDP_GRO',
args : '-D_GNU_SOURCE',
prefix : '#include <netinet/udp.h>'
) != ''
).allowed()
use_eventfd = get_option('eventfd').require(
compiler.has_function(
'eventfd',
Expand Down Expand Up @@ -268,6 +275,7 @@ conf.set10('SPEAD2_USE_MLX5DV', mlx5_dep.found())
conf.set10('SPEAD2_USE_RECVMMSG', use_recvmmsg)
conf.set10('SPEAD2_USE_SENDMMSG', use_sendmmsg)
conf.set10('SPEAD2_USE_GSO', use_gso)
conf.set10('SPEAD2_USE_GRO', use_gro)
conf.set10('SPEAD2_USE_EVENTFD', use_eventfd)
conf.set10('SPEAD2_USE_POSIX_SEMAPHORES', use_posix_semaphores)
conf.set10('SPEAD2_USE_PTHREAD_SETAFFINITY_NP', use_pthread_setaffinity_np)
Expand Down
3 changes: 2 additions & 1 deletion meson.options
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 National Research Foundation (SARAO)
# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
Expand All @@ -22,6 +22,7 @@ option('cap', type : 'feature', description : 'Use libcap')
option('recvmmsg', type : 'feature', description : 'Use recvmmsg system call')
option('sendmmsg', type : 'feature', description : 'Use sendmmsg system call')
option('gso', type : 'feature', description : 'Use generic segmentation offload')
option('gro', type : 'feature', description : 'Use generic receive offload')
option('eventfd', type : 'feature', description : 'Use eventfd system call for semaphores')
option('posix_semaphores', type : 'feature', description : 'Use POSIX semaphores')
option('pthread_setaffinity_np', type : 'feature', description : 'Use pthread_setaffinity_np to set thread affinity')
Expand Down
80 changes: 68 additions & 12 deletions src/recv_udp.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2015, 2019-2020, 2023 National Research Foundation (SARAO)
/* Copyright 2015, 2019-2020, 2023-2024 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
Expand Down Expand Up @@ -26,6 +26,7 @@
# include <sys/socket.h>
# include <sys/types.h>
# include <unistd.h>
# include <netinet/udp.h>
#endif
#include <system_error>
#include <cstdint>
Expand All @@ -51,23 +52,38 @@ udp_reader::udp_reader(
std::size_t max_size)
: udp_reader_base(owner), max_size(max_size),
#if SPEAD2_USE_RECVMMSG
buffer(mmsg_count), iov(mmsg_count), msgvec(mmsg_count),
buffers(mmsg_count), msgvec(mmsg_count), use_gro(false),
#else
buffer(new std::uint8_t[max_size + 1]),
#endif
socket(std::move(socket))
{
assert(socket_uses_io_service(this->socket, get_io_service()));
#if SPEAD2_USE_RECVMMSG
// Allocate one extra byte so that overflow can be detected.
size_t buffer_size = max_size + 1;
#if SPEAD2_USE_GRO
{
int enable = 1;
if (setsockopt(this->socket.native_handle(), IPPROTO_UDP, UDP_GRO,
&enable, sizeof(enable)) == 0)
{
use_gro = true;
buffer_size = 65536;
}
}
#endif
for (std::size_t i = 0; i < mmsg_count; i++)
{
// Allocate one extra byte so that overflow can be detected
buffer[i].reset(new std::uint8_t[max_size + 1]);
iov[i].iov_base = (void *) buffer[i].get();
iov[i].iov_len = max_size + 1;
buffers[i].data.reset(new std::uint8_t[buffer_size]);
buffers[i].iov[0].iov_base = (void *) buffers[i].data.get();
buffers[i].iov[0].iov_len = buffer_size;
std::memset(&msgvec[i], 0, sizeof(msgvec[i]));
msgvec[i].msg_hdr.msg_iov = &iov[i];
msgvec[i].msg_hdr.msg_iov = buffers[i].iov;
msgvec[i].msg_hdr.msg_iovlen = 1;
/* msg_control and msg_controllen are initialised just before each
* recvmmsg call, since they're modified by CMSG_NXTHDR.
*/
}
#endif
}
Expand Down Expand Up @@ -192,6 +208,16 @@ void udp_reader::packet_handler(
if (!error)
{
#if SPEAD2_USE_RECVMMSG
#if SPEAD2_USE_GRO
if (use_gro)
{
for (std::size_t i = 0; i < msgvec.size(); i++)
{
msgvec[i].msg_hdr.msg_control = &buffers[i].control;
msgvec[i].msg_hdr.msg_controllen = sizeof(buffers[i].control);
}
}
#endif
int received = recvmmsg(socket.native_handle(), msgvec.data(), msgvec.size(),
MSG_DONTWAIT, nullptr);
log_debug("recvmmsg returned %1%", received);
Expand All @@ -200,12 +226,42 @@ void udp_reader::packet_handler(
std::error_code code(errno, std::system_category());
log_warning("recvmmsg failed: %1% (%2%)", code.value(), code.message());
}
for (int i = 0; i < received; i++)
bool stopped = false;
for (int i = 0; i < received && !stopped; i++)
{
bool stopped = process_one_packet(state,
buffer[i].get(), msgvec[i].msg_len, max_size);
if (stopped)
break;
#if SPEAD2_USE_GRO
if (use_gro)
{
int seg_size = -1;
for (cmsghdr *cmsg = CMSG_FIRSTHDR(&msgvec[i].msg_hdr);
cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msgvec[i].msg_hdr, cmsg))
{
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO)
{
std::memcpy(&seg_size, CMSG_DATA(cmsg), sizeof(seg_size));
break;
}
}
if (seg_size > 0)
{
for (unsigned int offset = 0;
offset < msgvec[i].msg_len && !stopped;
offset += seg_size)
{
unsigned int msg_len = std::min((unsigned int) seg_size,
msgvec[i].msg_len - offset);
stopped = process_one_packet(state,
buffers[i].data.get() + offset,
msg_len,
max_size);
}
continue; // Skip the non-GRO code below
}
}
#endif // SPEAD2_USE_GRO
stopped = process_one_packet(state,
buffers[i].data.get(), msgvec[i].msg_len, max_size);
}
#else
process_one_packet(state, buffer.get(), bytes_transferred, max_size);
Expand Down

0 comments on commit 703f4c5

Please sign in to comment.