-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathrecorder_ring.c
288 lines (252 loc) · 10.3 KB
/
recorder_ring.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// *****************************************************************************
// recorder_ring.c Recorder project
// *****************************************************************************
//
// File description:
//
// Implement common ring functionality
//
//
//
//
//
//
//
//
// *****************************************************************************
// This software is licensed under the GNU Lesser General Public License v2+
// (C) 2017-2020, Christophe de Dinechin <[email protected]>
// *****************************************************************************
// This file is part of Recorder
//
// Recorder is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2 of the License, or
// (at your option) any later version.
//
// Recorder is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Recorder, in a file named COPYING.
// If not, see <https://www.gnu.org/licenses/>.
// *****************************************************************************
#include "recorder_ring.h"
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
// Signed, pointer-sized integer type. Presumably meant for signed differences
// between ring indices; it is not referenced in the visible part of this file.
typedef intptr_t ringdiff_t;
recorder_ring_p recorder_ring_init(recorder_ring_p ring,
                                   size_t size, size_t item_size)
// ----------------------------------------------------------------------------
//   Set up the header of a ring whose storage is provided by the caller
// ----------------------------------------------------------------------------
//   Records the geometry ('size' items of 'item_size' bytes each) and resets
//   the reader, writer, commit and overflow counters to zero.
//   The item storage itself is not touched. Returns 'ring'.
{
    // All positions and the overflow count start from zero
    ring->overflow = 0;
    ring->commit = 0;
    ring->writer = 0;
    ring->reader = 0;

    // Geometry of the ring
    ring->item_size = item_size;
    ring->size = size;

    return ring;
}
#ifndef RECORDER_STANDALONE
recorder_ring_p recorder_ring_new(size_t size, size_t item_size)
// ----------------------------------------------------------------------------
//   Allocate and initialize a ring of 'size' items of 'item_size' bytes each
// ----------------------------------------------------------------------------
//   The ring header and the item storage are allocated as one malloc block,
//   with the items placed right after the header. Release the ring with
//   recorder_ring_delete().
//   Returns NULL if the total byte count would overflow size_t or if the
//   allocation fails.
{
    // Reject geometries where header + size * item_size would wrap around
    const size_t header_size = sizeof(recorder_ring_t);
    if (item_size != 0 && size > (SIZE_MAX - header_size) / item_size)
        return NULL;

    recorder_ring_p ring = malloc(header_size + size * item_size);
    if (!ring)
        return NULL;            // Out of memory: do not touch the header

    return recorder_ring_init(ring, size, item_size);
}
void recorder_ring_delete(recorder_ring_p ring)
// ----------------------------------------------------------------------------
//   Release the memory of a ring
// ----------------------------------------------------------------------------
//   The pointer is passed straight to free(), so this is only appropriate
//   for rings whose memory came from malloc, as in recorder_ring_new().
{
    free(ring);
}
#endif
extern size_t recorder_ring_readable(recorder_ring_p ring, ringidx_t *reader)
// ----------------------------------------------------------------------------
//   Count the committed items that a reader has not consumed yet
// ----------------------------------------------------------------------------
//   When 'reader' is NULL, the ring's own reader index is used.
//   The result is the distance from the reader index to the commit index,
//   capped at the ring size.
{
    const ringidx_t from = reader ? *reader : ring->reader;
    const size_t pending = ring->commit - from;

    return pending > ring->size ? ring->size : pending;
}
extern size_t recorder_ring_writable(recorder_ring_p ring)
// ----------------------------------------------------------------------------
//   Count the items that can still be written to the ring
// ----------------------------------------------------------------------------
//   With 'used' being the distance from the reader index to the writer index,
//   the result is size - used - 1, or zero once 'used' reaches size - 1.
{
    const ringidx_t capacity = ring->size;
    const ringidx_t tail = ring->reader;
    const ringidx_t head = ring->writer;
    const ringidx_t used = head - tail;

    // Ring is full (or the writer already ran past the reader)
    if (used >= capacity - 1)
        return 0;

    return capacity - used - 1;
}
void *recorder_ring_peek(recorder_ring_p ring)
// ----------------------------------------------------------------------------
//   Return a pointer to the next item to be read, or NULL if there is none
// ----------------------------------------------------------------------------
//   The reader index is not advanced past the returned item. It is only moved
//   forward when it lags the commit index by at least the ring size: in that
//   case the reader is moved up to commit - size + 1 and the number of
//   skipped items is added to the ring's overflow counter.
{
    const size_t capacity = ring->size;
    const size_t stride = ring->item_size;
    char *const items = (char *) (ring + 1);    // Items follow the header
    ringidx_t tail = ring->reader;
    const ringidx_t committed = ring->commit;
    size_t pending = committed - tail;

    if (pending >= capacity)
    {
        // Reader fell too far behind: skip ahead and account for the loss
        const ringidx_t oldest = committed - capacity + 1;
        const ringidx_t lost = oldest - tail;
        recorder_ring_add_fetch(ring->overflow, lost);
        tail = recorder_ring_add_fetch(ring->reader, lost);
        pending = committed - tail;
    }

    if (!pending)
        return NULL;
    return items + tail % capacity * stride;
}
ringidx_t recorder_ring_read(recorder_ring_p ring,
                             void *destination,
                             size_t count,
                             ringidx_t *reader_ptr,
                             recorder_ring_block_fn read_block,
                             recorder_ring_block_fn read_overflow)
// ----------------------------------------------------------------------------
//   Read up to 'count' elements, return number of elements read
// ----------------------------------------------------------------------------
//   If enough data is available in the ring buffer, the elements read are
//   guaranteed to be contiguous.
//
//   ring          : ring to read from
//   destination   : buffer receiving up to 'count' items of ring->item_size
//   count         : maximum number of items to read
//   reader_ptr    : reader index to use and update; NULL selects ring->reader
//   read_block    : called as read_block(ring, reader, reader + count) when
//                   'count' exceeds the committed items. If it is NULL or
//                   returns zero, the read is truncated to what is available;
//                   if it returns non-zero, the full 'count' is kept.
//   read_overflow : called as read_overflow(ring, reader, first_valid) when
//                   the writer is at least 'size' items ahead of the reader.
//                   If it is NULL or returns zero, the reader is moved up to
//                   first_valid and the skipped items are added to
//                   ring->overflow.
//
//   Returns the number of items copied by the attempt that successfully
//   updated the reader index. This can be less than 'count'.
{
    const size_t size = ring->size;
    const size_t item_size = ring->item_size;
    char *data = (char *) (ring + 1);   // Items are stored after the header
    char *ptr;
    ringidx_t reader, writer, commit, available, to_copy;
    ringidx_t first_reader, next_reader;
    ringidx_t idx, to_end;
    ringidx_t copied;
    size_t this_round, byte_count;

    if (!reader_ptr)
        reader_ptr = &ring->reader;

    // Copy first, then try to publish the new reader index.
    // If another reader moved the index in the meantime, start over.
    do
    {
        reader = *reader_ptr;
        commit = ring->commit;
        writer = ring->writer;
        available = commit - reader;
        to_copy = count;

        // Check if we want to copy more than available
        if (to_copy > available)
        {
            if (!read_block || !read_block(ring, reader, reader + to_copy))
                to_copy = available;
        }

        // Check if write may have overwritten beyond our read point
        if (writer - reader >= size)
        {
            // If so, catch up
            ringidx_t first_valid = writer - size + 1;
            if (!read_overflow || !read_overflow(ring, reader, first_valid))
            {
                ringidx_t skip = first_valid - reader;
                recorder_ring_add_fetch(ring->overflow, skip);
                recorder_ring_add_fetch(*reader_ptr, skip);
                reader = first_valid;
                // NOTE(review): 'to_copy' was derived from the reader index
                // before this skip and is not recomputed here - confirm that
                // this is intended.
            }
        }

        // Then copy data in contiguous memcpy chunks (normally at most two)
        ptr = destination;
        first_reader = reader;
        next_reader = first_reader + to_copy;

        // Remember how much this attempt copies: the loop below runs
        // 'to_copy' down to zero, so it cannot be used for the return value.
        copied = to_copy;

        while (to_copy)
        {
            // Compute how much we can copy in one memcpy
            idx = reader % size;
            to_end = size - idx;
            this_round = to_copy < to_end ? to_copy : to_end;
            byte_count = this_round * item_size;

            // Copy data from the ring into destination
            memcpy(ptr, data + idx * item_size, byte_count);
            ptr += byte_count;
            to_copy -= this_round;
            reader += this_round;
        }
    } while (!recorder_ring_compare_exchange(*reader_ptr,
                                             first_reader, next_reader));

    // Return number of items effectively read
    return copied;
}
ringidx_t recorder_ring_write(recorder_ring_p ring,
                              const void *source,
                              size_t count,
                              recorder_ring_block_fn write_block,
                              recorder_ring_block_fn commit_block,
                              ringidx_t *writer_ptr)
// ----------------------------------------------------------------------------
//   Write up to 'count' elements from 'source' into 'ring'
// ----------------------------------------------------------------------------
//   ring         : ring to write to
//   source       : buffer holding 'count' items of ring->item_size bytes
//   count        : number of items to write
//   write_block  : called as write_block(ring, writer, writer + count) when
//                  'count' exceeds size + reader - writer. If it returns
//                  zero, the write is truncated to that amount. If it is NULL
//                  or returns non-zero, the full 'count' is written.
//   commit_block : called as commit_block(ring, ring->commit, first_writer)
//                  when the commit index is not yet at our first item. If it
//                  is NULL or returns zero, the commit index is advanced by
//                  the amount written without waiting; otherwise the commit
//                  is retried.
//   writer_ptr   : if not NULL, receives the index of the first item written
//
//   Returns the number of items effectively written. This can be less than
//   'count' when write_block declined the overflowing write.
{
    const size_t size = ring->size;
    const size_t item_size = ring->item_size;
    const char *ptr = source;
    char *data = (char *) (ring + 1);   // Items are stored after the header
    size_t to_copy = count;
    ringidx_t reader, writer, idx, available, to_end, first_writer;
    size_t this_round, byte_count;

    // First commit to writing a given amount of contiguous data
    do
    {
        reader = ring->reader;
        writer = ring->writer;
        available = size + reader - writer;
        to_copy = count;

        // Check if we want to copy more than can be written
        if (to_copy > available)
        {
            if (write_block && !write_block(ring, writer, writer + to_copy))
                to_copy = available;
        }
    } while (!recorder_ring_compare_exchange(ring->writer,
                                             writer, writer + to_copy));

    // Remember how much was reserved: the copy loop below runs 'to_copy'
    // down to zero, so it cannot be used for the return value.
    const size_t written = to_copy;

    // Record first writer, to see if we will be the one committing
    first_writer = writer;
    if (writer_ptr)
        *writer_ptr = writer;

    // Then copy data in contiguous memcpy chunks (normally at most two)
    while (to_copy)
    {
        // Compute how much we can copy in one memcpy
        idx = writer % size;
        to_end = size - idx;
        this_round = to_copy < to_end ? to_copy : to_end;
        byte_count = this_round * item_size;

        // Copy data from source into the ring
        memcpy(data + idx * item_size, ptr, byte_count);
        ptr += byte_count;
        to_copy -= this_round;
        writer += this_round;
    }

    // Commit buffer change, but only if commit is first_writer.
    // Otherwise, some other write is still copying its data, we must spin.
    ringidx_t expected = first_writer;
    while (!recorder_ring_compare_exchange(ring->commit, expected, writer))
    {
        if (!commit_block || !commit_block(ring, ring->commit, first_writer))
        {
            // Skip forward
            recorder_ring_fetch_add(ring->commit, writer - first_writer);
            break;
        }

        // Reset the expected value before retrying the commit
        expected = first_writer;
    }

    // Return number of items effectively written
    return written;
}