-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhpc_ds_dsclient.py
213 lines (177 loc) · 6.17 KB
/
hpc_ds_dsclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import Enum
from hpc_ds_types import Point3D, Block6D, DatastoreAccess, VOXEL_TYPES, \
MAX_URL_LEN
from time import time
import struct
import requests
try:
    # Prefer the project-wide exception type when hpc_ds_types provides one,
    # so callers that already catch it keep working.
    from hpc_ds_types import DataStoreAccessException
except ImportError:
    class DataStoreAccessException(Exception):
        """Raised when the dataset server mode forbids the requested access."""


class DatasetServerClient(object):
    """HTTP client for one running dataset server instance.

    The constructor fetches the server info document from ``base_url`` and
    keeps it in ``self.info``; the remaining methods read and write binary
    blocks below that URL.  ``base_url`` is concatenated directly with
    relative paths (``"datatype/..."``, ``"stop"``), so it is expected to
    end with ``/``.
    """

    # struct format of the block header: three network-order signed longs
    # holding the block sizes in x, y and z.
    header = "!lll"
    binary_headers = {"Content-Type": "application/octet-stream"}

    def __init__(self, base_url, regs_client):
        """Store connection data and fetch the server info.

        :type base_url: str
        :param base_url: URL of the dataset server
        :param regs_client: Client that opened this dataset server; its
            ``expires`` attribute and ``to_url()`` method are used here
        """
        #TODO: Use credentials from regs_client
        self.base_url = base_url
        self.regs_client = regs_client
        self.info = self.fetch_info()
        # Both are resolved lazily by init_block_fmt()
        self.voxel_type = None
        self.block_fmt = None

    @staticmethod
    def _is_success(result):
        """Return True when ``result`` is a response with a 2xx status."""
        return result is not None and 200 <= result.status_code < 300

    @staticmethod
    def _unpack_blocks(payload, count, voxel_code):
        """Decode ``count`` consecutive blocks from a binary payload.

        Each block is a header of three sizes followed by ``x * y * z``
        voxels.  A block whose size product is -1 carries no voxel data.

        :type payload: bytes
        :param payload: Raw response body
        :type count: int
        :param count: Number of blocks expected in ``payload``
        :type voxel_code: str
        :param voxel_code: ``struct`` format character of a single voxel
        :return: Generator yielding ``((x, y, z), values)`` for each block
            with data and ``None`` for each block without data
        """
        header = struct.Struct(DatasetServerClient.header)
        offset = 0
        for _ in range(count):
            x, y, z = header.unpack_from(payload, offset)
            offset += header.size
            total_size = x * y * z
            if total_size == -1:
                # Header only, no voxels follow for this block
                yield None
                continue
            body = struct.Struct("!%u%s" % (total_size, voxel_code))
            yield (x, y, z), body.unpack_from(payload, offset)
            offset += body.size

    def fetch_info(self):
        """Download the server info document.

        :return: Decoded JSON of the response, or ``None`` when the server
            did not answer with a 2xx status
        """
        result = requests.get(self.base_url)
        if self._is_success(result):
            return result.json()
        return None

    def is_running(self):
        """Check if dataset server is still running or it has timed out"""
        return (self.info['serverTimeout'] < 0 or
                time() < self.regs_client.expires)

    @property
    def can_read(self):
        """Check if dataset server info allows reading of data"""
        return DatastoreAccess.READ.name in self.info['mode']

    @property
    def can_write(self):
        """Check if dataset server info allows writing of data"""
        return DatastoreAccess.WRITE.name in self.info['mode']

    @property
    def datatype(self):
        """Return default datatype for time=0, channel=0 and angle=0"""
        return self.get_datatype()

    def get_datatype(self, time=0, channel=0, angle=0):
        """Get datatype for given time, channel and angle combination

        :type time: int
        :param time: Time dimension of data
        :type channel: int
        :param channel: Used channel (e.g. red) of data
        :type angle: int
        :param angle: Angle of data for rotating slices in 3D
        :rtype: str
        :return: Name of datatype sizes mappable with VOXEL_TYPES, or
            ``None`` when the request did not succeed
        """
        url = self.base_url + ("datatype/%i/%i/%i" % (time, channel, angle))
        result = requests.get(url)
        if self._is_success(result):
            return result.text
        return None

    def init_block_fmt(self):
        """Resolve the voxel type (if unset) and build the block format.

        The resulting ``block_fmt`` still contains a ``%u`` placeholder for
        the number of voxels, filled in when a block is packed.
        """
        if self.voxel_type is None:
            self.voxel_type = self.datatype
        self.block_fmt = (DatasetServerClient.header + "%u"
                          + VOXEL_TYPES[self.voxel_type])

    def read_block(self, block_coords):
        """Request a block from dataset server

        :type block_coords: Block6D
        :param block_coords: Tuple representing 6D coordinates of
            (x, y, z, time, channel, angle)
        :rtype: tuple
        :return: A tuple of ``Point3D`` with the block sizes and a tuple
            with the block data, or ``None`` when the request did not
            succeed or the block carries no data
        :raises DataStoreAccessException: when the server is not readable
        """
        if not self.can_read:
            raise DataStoreAccessException(
                "Collection opened from %s is not readable"
                % self.regs_client.to_url())
        if self.block_fmt is None:
            self.init_block_fmt()
        url = self.base_url + Block6D.to_ds_url_part(block_coords)
        result = requests.get(url)
        if self._is_success(result):
            data = result.content
            x, y, z = struct.unpack(DatasetServerClient.header, data[0:12])
            total_size = x * y * z
            # A size product of -1 means no data is stored for this block
            if total_size != -1:
                return (Point3D(x, y, z),
                        struct.unpack("!12x%u%s" % (
                            total_size, VOXEL_TYPES[self.voxel_type]), data))
        return None

    def _fetch_batch(self, url, ids, results):
        """Fetch one batch of blocks and store the decoded ones.

        :param url: Batch URL; ends with the ``/`` appended after the last
            block part, which is stripped before the request
        :param ids: Block coordinates in the same order as in ``url``
        :param results: Dictionary updated in place with
            ``coords -> (Point3D, data)`` for every block carrying data
        """
        result = requests.get(url[:-1])
        if not self._is_success(result):
            return
        blocks = self._unpack_blocks(result.content, len(ids),
                                     VOXEL_TYPES[self.voxel_type])
        for item, block in zip(ids, blocks):
            if block is not None:
                sizes, values = block
                results[item] = (Point3D(*sizes), values)

    def read_blocks(self, block_coords_array):
        """Request several blocks from dataset server

        Blocks are requested in batches: URL parts are joined with ``/``
        for as long as the URL stays shorter than ``MAX_URL_LEN``; a block
        that would not fit starts a new request.

        :type block_coords_array: list
        :param block_coords_array: array or list of tuples representing
            6D coordinates of (x, y, z, time, channel, angle); the tuples
            are used as dictionary keys of the result
        :rtype: dict
        :return: Dictionary mapping block coordinates to a tuple of
            ``Point3D`` with the block sizes and a tuple with the block
            data; blocks without data or from failed requests are omitted
        :raises DataStoreAccessException: when the server is not readable
        """
        if not self.can_read:
            raise DataStoreAccessException(
                "Collection opened from %s is not readable"
                % self.regs_client.to_url())
        if self.block_fmt is None:
            self.init_block_fmt()
        results = {}
        url = self.base_url
        ids = []
        for item in block_coords_array:
            part = Block6D.to_ds_url_part(item) + '/'
            # Flush before the URL gets too long.  A batch always holds at
            # least one block, so a single over-long part is still sent.
            if ids and len(url) + len(part) >= MAX_URL_LEN:
                self._fetch_batch(url, ids, results)
                url = self.base_url
                ids = []
            url += part
            ids.append(item)
        if ids:
            self._fetch_batch(url, ids, results)
        return results

    def write_block(self, block_coords, data, block_sizes):
        """Send a block to dataset server

        :type block_coords: ``Block6D``
        :param block_coords: Tuple representing 6D coordinates of
            (x, y, z, time, channel, angle)
        :type data: array
        :param data: Array of datatype entries representing block data;
            its length has to equal the product of ``block_sizes``
        :type block_sizes: ``Point3D``
        :param block_sizes: Tuple representing data sizes in x, y and z
        :rtype: bool
        :return: True when the server answered with a 2xx status
        :raises DataStoreAccessException: when the server is not writable
        """
        if not self.can_write:
            raise DataStoreAccessException(
                "Collection opened from %s is not writable"
                % self.regs_client.to_url())
        if self.block_fmt is None:
            self.init_block_fmt()
        size_x, size_y, size_z = block_sizes[0], block_sizes[1], block_sizes[2]
        total_size = size_x * size_y * size_z
        # Kept as an assert so the exception type seen by callers is unchanged
        assert(len(data) == total_size)
        url = self.base_url + Block6D.to_ds_url_part(block_coords)
        post_data = struct.pack(self.block_fmt % total_size,
                                size_x, size_y, size_z, *data)
        result = requests.post(url, data=post_data,
                               headers=self.binary_headers)
        return self._is_success(result)

    def stop(self):
        """Stop the dataset server instance"""
        if self.is_running():
            requests.post(self.base_url + 'stop', data="")
        # Mark the server as expired locally so is_running() turns False
        self.info['serverTimeout'] = 0
        self.regs_client.expires = 0