Update sender for memory pool tutorial
This should probably be a tutorial on its own.
bmerry committed Nov 30, 2023
1 parent 7b68631 commit 18c0443
Showing 5 changed files with 419 additions and 0 deletions.
193 changes: 193 additions & 0 deletions doc/tut-7-recv-memory-pool.rst
@@ -0,0 +1,193 @@
Memory pools
============
To demonstrate more features of spead2, we'll need to experiment with different
heap sizes. Instead of editing the hard-coded value, let's introduce more
command-line options. The modified sender code can be found in
:file:`examples/tut_7_send.py` and :file:`examples/tut_7_send.cpp` in the
spead2 repository. Don't forget to delete the original definitions of
``heap_size`` and ``n_heaps``.

.. tab-set-code::

   .. code-block:: python

      async def main():
          parser = argparse.ArgumentParser()
          parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
          parser.add_argument("-n", "--heaps", type=int, default=10000)
          parser.add_argument("host", type=str)
          parser.add_argument("port", type=int)
          args = parser.parse_args()
          heap_size = args.heap_size
          n_heaps = args.heaps

   .. code-block:: c++

      #include <unistd.h>
      ...
      int main(int argc, char * const argv[])
      {
          int opt;
          std::int64_t heap_size = 1024 * 1024;
          int n_heaps = 10000;
          while ((opt = getopt(argc, argv, "H:n:")) != -1)
          {
              switch (opt)
              {
              case 'H':
                  heap_size = std::stoll(optarg);
                  break;
              case 'n':
                  n_heaps = std::stoi(optarg);
                  break;
              default:
                  std::cerr << "Usage: " << argv[0] << " [-H heap-size] [-n heaps] host port\n";
                  return 2;
              }
          }
          if (argc - optind != 2)
          {
              std::cerr << "Usage: " << argv[0] << " [-H heap-size] [-n heaps] host port\n";
              return 2;
          }
          ...
          boost::asio::ip::udp::endpoint endpoint(
              boost::asio::ip::address::from_string(argv[optind]),
              std::atoi(argv[optind + 1])
          );

As with previous versions of the sender, the command-line parsing in C++ is
not very robust to user mistakes.

For the rest of this section we'll pass the options ``-n 100 -H 67108864`` to
use 64 MiB heaps (and reduce the number of heaps to speed things up). First
let us see what impact this has on the sender in isolation (this assumes you
have set up the dummy network interface as in :doc:`tut-5-send-pktsize`).

.. code-block:: sh

   tut_7_send -n 100 -H 67108864 192.168.31.2 8888

The performance is worse: significantly so in the C++ case (I get around 1900
MB/s for C++ and 3100 MB/s for Python). This is somewhat surprising, because
bigger heaps should mean that per-heap overheads are reduced, just as
increasing the packet size reduced the per-packet overheads. There are (at
least) two things going on here:

1. Caching. My CPU has a 1.5 MiB L2 cache (per core) and a 12 MiB L3 cache.
The heap no longer fits into either of them, and so cache misses are
substantially increased. In Python, the original command (1 MiB heaps)
missed on 0.42% of L3 cache loads, while this new command misses on 6.4% of
L3 cache loads.

2. Memory allocation. When the application allocates memory to hold the data
for a heap, the underlying library can do it in one of two ways: it can
either hand out some memory that it has previously requested from the
operating system but which isn't in use, or it can request new memory from
the operating system. In the latter case, Linux will provide a virtual
memory address, but it won't actually allocate the physical memory.
Instead, the first time each page is accessed, a page fault will occur, and
the kernel will allocate a page of physical memory and zero it out. Page
faults are expensive, so paying that cost afresh for every heap adds
significant overhead.

In Glibc (the standard C library on most Linux distributions) the memory
allocator uses heuristics to try to avoid this. However, for allocations
bigger than 32 MiB (at the time of writing) it will always request memory
directly from the operating system, and return it directly to the operating
system when it is freed. That is why we see such poor performance with our
64 MiB heaps.

In numpy the situation is slightly different: it is also obtaining the
memory from the operating system, but it uses a hint to request that the
memory be backed by "huge pages" (2 MiB pages on x86_64, compared to the
default of 4 kiB pages). Since it takes far fewer pages to provide the
physical memory, there are fewer page faults, and performance suffers less
as a result.
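
The allocation effect is easy to reproduce without spead2 or the network at
all. The following standalone sketch (not part of the tutorial code; the
buffer size, loop count and tiny checksum are purely illustrative) contrasts
allocating a fresh 64 MiB buffer for every heap with filling one preallocated
buffer in place:

```python
import time

import numpy as np

HEAP_SIZE = 64 * 1024 * 1024  # 64 MiB, as with -H 67108864


def send_fresh(n_heaps: int) -> int:
    """Allocate a brand-new buffer per heap: allocations this large come
    straight from the operating system, so every heap pays page faults."""
    total = 0
    for i in range(n_heaps):
        adc_samples = np.full(HEAP_SIZE, i % 128, np.int8)
        total += int(adc_samples[0]) + int(adc_samples[-1])
    return total


def send_reuse(n_heaps: int) -> int:
    """Fill one preallocated buffer in place: its pages are faulted in only
    once, on the first pass."""
    adc_samples = np.empty(HEAP_SIZE, np.int8)
    total = 0
    for i in range(n_heaps):
        adc_samples.fill(i % 128)
        total += int(adc_samples[0]) + int(adc_samples[-1])
    return total


if __name__ == "__main__":
    # Both compute the same checksum; only the allocation pattern differs.
    for fn in (send_fresh, send_reuse):
        start = time.perf_counter()
        fn(10)
        print(f"{fn.__name__}: {time.perf_counter() - start:.3f} s")
```

On my machine the reusing variant is noticeably faster; the exact ratio
depends on the allocator and kernel settings.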

We can't do anything about the caching problem [#cache-size-heaps]_, but we can
rewrite our code to avoid doing memory allocation on every iteration. We'll do
that by re-using our state class, but instead of creating a new one each
iteration, we'll keep a pool of two of them and alternate between them
(so-called "double buffering").

In general when we start to fill in the data for a heap we need to make sure
that previous asynchronous use of that heap has completed (by waiting for a
corresponding future), but the first time each heap gets used is special. To
avoid having to deal with special cases, we can set things up with a future
that is already complete.

.. tab-set-code::

   .. code-block:: python

      @dataclass
      class State:
          adc_samples: np.ndarray
          future: asyncio.Future[int] = field(default_factory=asyncio.Future)

          def __post_init__(self):
              # Make it safe to wait on the future immediately
              self.future.set_result(0)

   .. code-block:: c++

      struct state
      {
          ...

          state()
          {
              // Make it safe to wait on the future immediately
              std::promise<spead2::item_pointer_t> promise;
              promise.set_value(0);
              future = promise.get_future();
          }
      };
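
The trick can also be seen in isolation with plain asyncio (no spead2
involved). This little sketch, with a hypothetical
``demo_precompleted_future`` helper, shows that the very first ``await`` on a
pre-completed future returns immediately:

```python
import asyncio


def demo_precompleted_future() -> int:
    """Awaiting an already-completed future returns at once, so the send
    loop needs no special case for the first use of each state."""
    async def main() -> int:
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        future.set_result(0)  # pre-complete, as in State.__post_init__
        return await future   # completes immediately; nothing to wait for
    return asyncio.run(main())
```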

Now we can get rid of ``old_state`` and ``new_state``, and instead use an
array of states.

.. tab-set-code::

   .. code-block:: python
      :dedent: 0

          states = [State(adc_samples=np.ones(heap_size, np.int8)) for _ in range(2)]
          for i in range(n_heaps):
              state = states[i % len(states)]
              await state.future  # Wait for any previous use of this state to complete
              state.adc_samples.fill(i)
              item_group["timestamp"].value = i * heap_size
              item_group["adc_samples"].value = state.adc_samples
              heap = item_group.get_heap()
              state.future = stream.async_send_heap(heap)
          for state in states:
              await state.future

   .. code-block:: c++
      :dedent: 0

          std::array<state, 2> states;
          for (auto &state : states)
              state.adc_samples.resize(heap_size);
          for (int i = 0; i < n_heaps; i++)
          {
              auto &state = states[i % states.size()];
              // Wait for any previous use of this state to complete
              state.future.wait();
              auto &heap = state.heap;
              auto &adc_samples = state.adc_samples;

              heap = spead2::send::heap(); // reset to default state
              // Fill with the heap number
              std::fill(adc_samples.begin(), adc_samples.end(), i);
              // Add descriptors to the first heap
              ...
              state.future = stream.async_send_heap(heap, boost::asio::use_future);
          }
          for (const auto &state : states)
              state.future.wait();
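
The structure of this loop can be modelled without spead2 at all. In the
sketch below the names ``send`` and ``run_double_buffered`` are made up for
illustration (the real code calls ``stream.async_send_heap``); a dummy
coroutine stands in for the network send, and one future per slot gates
re-use of that slot:

```python
import asyncio


async def run_double_buffered(n_heaps: int, n_states: int = 2) -> list[int]:
    sent: list[int] = []

    async def send(heap_id: int) -> int:
        # Stand-in for async_send_heap: yield to the event loop, then "finish"
        await asyncio.sleep(0)
        sent.append(heap_id)
        return heap_id

    loop = asyncio.get_running_loop()
    futures = []
    for _ in range(n_states):
        fut = loop.create_future()
        fut.set_result(0)  # pre-completed, as in State.__post_init__
        futures.append(fut)
    for i in range(n_heaps):
        slot = i % n_states
        await futures[slot]  # wait for the previous use of this slot
        futures[slot] = asyncio.ensure_future(send(i))
    for fut in futures:
        await fut  # drain the last n_states in-flight sends
    return sent


def demo(n_heaps: int) -> list[int]:
    return asyncio.run(run_double_buffered(n_heaps))
```

At most ``n_states`` sends are ever in flight, and a buffer slot is never
refilled until its previous send has completed.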

.. [#cache-size-heaps] For this reason, it's generally a good idea to design
your applications around a heap size that's small enough to fit into the L2
cache.
1 change: 1 addition & 0 deletions doc/tutorial.rst
@@ -24,3 +24,4 @@ behaviour that's pointed out will be different.
tut-4-send-pipeline
tut-5-send-pktsize
tut-6-recv-power
tut-7-recv-memory-pool
1 change: 1 addition & 0 deletions examples/meson.build
@@ -23,6 +23,7 @@ foreach name : [
'tut_4_send_pipeline',
'tut_5_send_pktsize',
'tut_6_recv_power',
'tut_7_send',
]
executable(name, name + '.cpp', dependencies : [st_dep])
endforeach
138 changes: 138 additions & 0 deletions examples/tut_7_send.cpp
@@ -0,0 +1,138 @@
/* Copyright 2023 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include <cstdint>
#include <cstdlib>
#include <string>
#include <vector>
#include <array>
#include <utility>
#include <algorithm>
#include <chrono>
#include <future>
#include <memory>
#include <iostream>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>

struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<std::int8_t> adc_samples;
    spead2::send::heap heap;

    state()
    {
        // Make it safe to wait on the future immediately
        std::promise<spead2::item_pointer_t> promise;
        promise.set_value(0);
        future = promise.get_future();
    }
};

int main(int argc, char * const argv[])
{
    int opt;
    std::int64_t heap_size = 1024 * 1024;
    int n_heaps = 10000;
    while ((opt = getopt(argc, argv, "H:n:")) != -1)
    {
        switch (opt)
        {
        case 'H':
            heap_size = std::stoll(optarg);
            break;
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        default:
            std::cerr << "Usage: " << argv[0] << " [-H heap-size] [-n heaps] host port\n";
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        std::cerr << "Usage: " << argv[0] << " [-H heap-size] [-n heaps] host port\n";
        return 2;
    }

    spead2::thread_pool thread_pool;
    spead2::send::stream_config config;
    config.set_rate(0.0);
    config.set_max_heaps(2);
    config.set_max_packet_size(9000);
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::address::from_string(argv[optind]),
        std::atoi(argv[optind + 1])
    );
    spead2::send::udp_stream stream(thread_pool, {endpoint}, config);

    spead2::descriptor timestamp_desc;
    timestamp_desc.id = 0x1600;
    timestamp_desc.name = "timestamp";
    timestamp_desc.description = "Index of the first sample";
    timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
    spead2::descriptor adc_samples_desc;
    adc_samples_desc.id = 0x3300;
    adc_samples_desc.name = "adc_samples";
    adc_samples_desc.description = "ADC converter output";
    adc_samples_desc.numpy_header =
        "{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";

    auto start = std::chrono::high_resolution_clock::now();
    std::array<state, 2> states;
    for (auto &state : states)
        state.adc_samples.resize(heap_size);
    for (int i = 0; i < n_heaps; i++)
    {
        auto &state = states[i % states.size()];
        // Wait for any previous use of this state to complete
        state.future.wait();
        auto &heap = state.heap;
        auto &adc_samples = state.adc_samples;

        heap = spead2::send::heap(); // reset to default state
        // Fill with the heap number
        std::fill(adc_samples.begin(), adc_samples.end(), i);
        // Add descriptors to the first heap
        if (i == 0)
        {
            heap.add_descriptor(timestamp_desc);
            heap.add_descriptor(adc_samples_desc);
        }
        // Add the data and timestamp to the heap
        heap.add_item(timestamp_desc.id, i * heap_size);
        heap.add_item(
            adc_samples_desc.id,
            adc_samples.data(),
            adc_samples.size() * sizeof(adc_samples[0]),
            true
        );
        state.future = stream.async_send_heap(heap, boost::asio::use_future);
    }
    for (const auto &state : states)
        state.future.wait();
    auto elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
        std::chrono::high_resolution_clock::now() - start);
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

    // Send an end-of-stream control item
    spead2::send::heap heap;
    heap.add_end();
    stream.async_send_heap(heap, boost::asio::use_future).get();
    return 0;
}