-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathshmemarray.py
104 lines (83 loc) · 3.46 KB
/
shmemarray.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#
# Based on multiprocessing.sharedctypes.RawArray
#
# Uses posix_ipc (http://semanchuk.com/philip/posix_ipc/) to allow shared ctypes arrays
# among unrelated processors
#
# Usage Notes:
# * The first two args (typecode_or_type and size_or_initializer) should work the same as with RawArray.
# * The shared array is accessible by any process, as long as tag matches.
# * The shared memory segment is unlinked when the origin array (that returned
# by ShmemRawArray(..., create=True)) is deleted/gc'ed
# * Creating an shared array using a tag that currently exists will raise an ExistentialError
# * Accessing a shared array using a tag that doesn't exist (or one that has been unlinked) will also
# raise an ExistentialError
#
# Author: Shawn Chin (http://shawnchin.github.com)
#
# Edited for python 3 by: Adam Stooke
#
import numpy as np
# import os
import sys
import mmap
import ctypes
import posix_ipc
# from _multiprocessing import address_of_buffer # (not in python 3)
from string import ascii_letters, digits
valid_chars = frozenset("/-_. %s%s" % (ascii_letters, digits))
typecode_to_type = {
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
}
def address_of_buffer(buf): # (python 3)
return ctypes.addressof(ctypes.c_char.from_buffer(buf))
class ShmemBufferWrapper(object):
def __init__(self, tag, size, create=True):
# default vals so __del__ doesn't fail if __init__ fails to complete
self._mem = None
self._map = None
self._owner = create
self.size = size
assert 0 <= size < sys.maxsize # sys.maxint (python 3)
flag = (0, posix_ipc.O_CREX)[create]
self._mem = posix_ipc.SharedMemory(tag, flags=flag, size=size)
self._map = mmap.mmap(self._mem.fd, self._mem.size)
self._mem.close_fd()
def get_address(self):
# addr, size = address_of_buffer(self._map)
# assert size == self.size
assert self._map.size() == self.size # (changed for python 3)
addr = address_of_buffer(self._map)
return addr
def __del__(self):
if self._map is not None:
self._map.close()
if self._mem is not None and self._owner:
self._mem.unlink()
def ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True):
assert frozenset(tag).issubset(valid_chars)
if tag[0] != "/":
tag = "/%s" % (tag,)
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
if isinstance(size_or_initializer, int):
type_ = type_ * size_or_initializer
else:
type_ = type_ * len(size_or_initializer)
buffer = ShmemBufferWrapper(tag, ctypes.sizeof(type_), create=create)
obj = type_.from_address(buffer.get_address())
obj._buffer = buffer
if not isinstance(size_or_initializer, int):
obj.__init__(*size_or_initializer)
return obj
###############################################################################
# New Additions (by Adam) #
def NpShmemArray(dtype, shape, tag, create=True):
size = int(np.prod(shape))
nbytes = size * np.dtype(dtype).itemsize
shmem = ShmemRawArray(ctypes.c_char, nbytes, tag, create)
return np.frombuffer(shmem, dtype=dtype, count=size).reshape(shape)