-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathgasnet_address_space.cc
255 lines (201 loc) · 6.72 KB
/
gasnet_address_space.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#include "address_space.h"
#include <cassert>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <sstream>
#include <vector>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <gasnet.h>
#include <gasnet_tools.h>
#include "common.h"
class GASNetNodeImpl : public Node {
public:
GASNetNodeImpl(gasnet_node_t node, void *base, uintptr_t size) :
node_(node), base_((char*)base), size_(size)
{}
size_t size() {
return size_;
}
void read(void *dst, void *src, size_t len) {
char *abs_src = base_ + (uintptr_t)src;
assert((abs_src + len - 1) < (base_ + size_));
gasnet_get_bulk(dst, node_, abs_src, len);
}
void write(void *dst, void *src, size_t len) {
char *abs_dst = base_ + (uintptr_t)dst;
assert((abs_dst + len - 1) < (base_ + size_));
gasnet_put_bulk(node_, abs_dst, src, len);
}
void aio_read(group_io_handle_t handle, void *dst, void *src, size_t len) {
char *abs_src = base_ + (uintptr_t)src;
assert((abs_src + len - 1) < (base_ + size_));
gasnet_handle_t h = gasnet_get_nb_bulk(dst, node_, abs_src, len);
auto handles = static_cast<std::vector<gasnet_handle_t>*>(handle);
handles->push_back(h);
}
void aio_write(group_io_handle_t handle, void *dst, void *src, size_t len) {
char *abs_dst = base_ + (uintptr_t)dst;
assert((abs_dst + len - 1) < (base_ + size_));
gasnet_handle_t h = gasnet_put_nb_bulk(node_, abs_dst, src, len);
auto handles = static_cast<std::vector<gasnet_handle_t>*>(handle);
handles->push_back(h);
}
private:
gasnet_node_t node_;
char *base_;
uintptr_t size_;
};
/*
 * Open a new batch of non-blocking transfers.
 *
 * The returned opaque handle is a heap-allocated vector of GASNet handles;
 * group_io_wait() is the function in this file that deletes it.
 */
Node::group_io_handle_t GASNetAddressSpace::group_io_start()
{
  std::vector<gasnet_handle_t> *batch = new std::vector<gasnet_handle_t>();
  // pre-size for a handful of transfers to avoid early reallocations
  batch->reserve(4);
  return batch;
}
/*
 * Wait for every transfer recorded in the batch, then free the batch.
 *
 * handle must be a value obtained from group_io_start(); it is deleted here
 * and must not be used afterwards.
 */
void GASNetAddressSpace::group_io_wait(Node::group_io_handle_t handle)
{
  std::vector<gasnet_handle_t> *batch =
    static_cast<std::vector<gasnet_handle_t>*>(handle);

  // nothing to synchronize on when no transfer was ever issued
  if (!batch->empty())
    gasnet_wait_syncnb_all(batch->data(), batch->size());

  delete batch;
}
#ifdef GASNET_SEGMENT_EVERYTHING
// State shared between init() and the AM_seginfo() handler while ranks report
// their memory contributions.

// Table of per-rank contributions, indexed by the sending rank. init() points
// it at its own table; AM_seginfo() writes into it.
static gasnet_seginfo_t *segments_ptr;
// Number of contributions AM_seginfo() has recorded so far. init() polls this
// value until it equals gasnet_nodes().
static int seginfo_done;
// Held by AM_seginfo() while it updates the table and the counter, and by
// init() while it reads the table.
static gasnet_hsl_t seginfo_lock;
// Active message handler index under which AM_seginfo() is registered.
// NOTE(review): 128 is presumably the first index of GASNet's client handler
// range -- confirm against the GASNet specification.
#define AM_SEGINFO 128
/*
 * Active message handler: record the memory contribution reported by the
 * sending rank.
 *
 * token:  identifies the incoming message; used to look up the sender
 * buf:    payload, expected to hold exactly one gasnet_seginfo_t
 * nbytes: payload length in bytes
 */
static void AM_seginfo(gasnet_token_t token, void *buf, size_t nbytes)
{
  gasnet_node_t sender;
  GASNET_SAFE(gasnet_AMGetMsgSource(token, &sender));

  // copy the payload out of the message buffer into a local struct
  gasnet_seginfo_t reported;
  assert(nbytes == sizeof(reported));
  memcpy(&reported, buf, sizeof(reported));

  // write barrier: init() polls seginfo_done without holding seginfo_lock
  gasnett_local_wmb();

  gasnet_hsl_lock(&seginfo_lock);
  segments_ptr[sender] = reported;
  ++seginfo_done;
  gasnet_hsl_unlock(&seginfo_lock);
}
int GASNetAddressSpace::init(int *argc, char ***argv,
struct gassyfs_opts *opts)
{
std::cout << "gasnet segment = everything" << std::endl;
// handler for sending segment info to rank 0
gasnet_handlerentry_t handlers[1];
handlers[0].index = AM_SEGINFO;
handlers[0].fnptr = (void(*)())AM_seginfo;
// segment info ignored for gasnet-everything
GASNET_SAFE(gasnet_attach(handlers, 1, 0, 0));
gasnet_seginfo_t segments[gasnet_nodes()];
GASNET_SAFE(gasnet_getSegmentInfo(segments, gasnet_nodes()));
gasnet_hsl_init(&seginfo_lock);
segments_ptr = segments;
seginfo_done = 0;
// synchronized everyone before sending AMs so rank 0 initialization of
// segments structure above doesn't race. could also hold lock above...
gasnet_barrier_notify(0, GASNET_BARRIERFLAG_ANONYMOUS);
gasnet_barrier_wait(0, GASNET_BARRIERFLAG_ANONYMOUS);
/*
* currently rank 0 also contributes memory. We want to avoid this in the
* future because rank 0 should have a large kernel cache for the FUSE
* mount. But we over ride the decision if there is only one node in the
* cluster.
*/
if (!opts->rank0_alloc) {
if (gasnet_nodes() == 1)
opts->rank0_alloc = 1;
}
// heap contribution
gasnet_seginfo_t contrib;
contrib.addr = 0;
contrib.size = 0;
// if not rank 0 or alloc on rank 0 is OK
if (gasnet_mynode() || opts->rank0_alloc) {
const size_t size = opts->heap_size << 20;
void *data = mmap(NULL, size, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
assert(data != MAP_FAILED);
contrib.addr = data;
contrib.size = size;
}
// notify rank 0 of contribution (rank 0 sends to self)
GASNET_SAFE(gasnet_AMRequestMedium0(0, AM_SEGINFO,
&contrib, sizeof(contrib)));
// everyone but rank 0 snoozes now
if (gasnet_mynode()) {
gasnet_barrier_notify(0, GASNET_BARRIERFLAG_ANONYMOUS);
gasnet_barrier_wait(0, GASNET_BARRIERFLAG_ANONYMOUS);
gasnet_exit(0);
assert(0);
return 0;
}
// wait for nodes to report on contribution
GASNET_BLOCKUNTIL((seginfo_done == gasnet_nodes()));
gasnet_hsl_lock(&seginfo_lock);
assert(seginfo_done == gasnet_nodes());
for (int i = 0; i < gasnet_nodes(); i++) {
gasnet_seginfo_t *s = &segments[i];
if (s->size == 0)
continue;
GASNetNodeImpl *node = new GASNetNodeImpl(i,
s->addr, s->size);
nodes_.push_back(node);
}
gasnet_hsl_unlock(&seginfo_lock);
assert(gasnet_mynode() == 0);
return 0;
}
#else
/*
* When GASNet is configured with segment-[fast|large] then GASNet will
* allocate and register memory segments automatically.
*/
int GASNetAddressSpace::init(int *argc, char ***argv,
struct gassyfs_opts *opts)
{
// how much this rank will try to allocate
size_t segsz = gasnet_getMaxLocalSegmentSize();
/*
* Rank 0 allocation can be disabled, but that configuration is overridden
* when there is only one node in the GASNet cluster.
*/
if (!opts->rank0_alloc) {
if (gasnet_nodes() == 1)
opts->rank0_alloc = 1;
else if (gasnet_mynode() == 0)
segsz = 0;
}
GASNET_SAFE(gasnet_attach(NULL, 0, segsz, 0));
gasnet_seginfo_t segments[gasnet_nodes()];
GASNET_SAFE(gasnet_getSegmentInfo(segments, gasnet_nodes()));
//std::cout << "before service loop: " << gasnet_mynode() << std::endl;
// all nodes except rank 0 can start serving memory
if (gasnet_mynode()) {
//std::cout << "enter service loop: " << gasnet_mynode() << std::endl;
gasnet_barrier_notify(0, GASNET_BARRIERFLAG_ANONYMOUS);
gasnet_barrier_wait(0, GASNET_BARRIERFLAG_ANONYMOUS);
//std::cout << "exit service loop: " << gasnet_mynode() << std::endl;
gasnet_exit(0);
assert(0);
return 0;
}
//std::cout << "after service loop: " << gasnet_mynode() << std::endl;
size_t total = 0;
size_t num_nodes = 0;
for (int i = 0; i < gasnet_nodes(); i++) {
gasnet_seginfo_t *s = &segments[i];
if (s->size == 0)
continue;
GASNetNodeImpl *node = new GASNetNodeImpl(i,
s->addr, s->size);
nodes_.push_back(node);
total += node->size();
num_nodes++;
}
opts->heap_size = (total >> 20) / num_nodes;
assert(gasnet_mynode() == 0);
return 0;
}
#endif