-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathEvent.py
139 lines (112 loc) · 4.21 KB
/
Event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import uuid
from datetime import datetime
from struct import unpack
from CommonEnums import MessageType, SourceType, DataBit, Method
from EventItem import EventItem
class Event:
def __init__(self):
self.id = None
self.parentId = None
self.componentId = None
self.timestamp = None
self.method = Method.miAddUser
self.sourceType = SourceType.stErrRestore
self.subSystem = 0
self.dataBits = DataBit.dbUndefined
self.messageType = MessageType.mtEvent
self.hardware = 0
self.items = []
def fromByteArray(value: bytes):
result = Event()
# skip 36 bytes envelope
offset = 36
result.id = uuid.UUID(bytes_le=bytes(value[offset:offset + 16]))
offset += 16
result.parentId = uuid.UUID(bytes_le=bytes(value[offset:offset + 16]))
offset += 16
result.componentId = uuid.UUID(bytes_le=bytes(value[offset:offset + 16]))
offset += 16
time, \
code, \
itemsCount = unpack("QII", value[offset:offset + 16])
offset += 16
result.timestamp = datetime.fromtimestamp(time)
result.method = Method(code & 0xff)
result.sourceType = SourceType((code >> 8) & 0x1f)
result.subSystem = (code >> 13) & 0x7
result.dataBits = DataBit((code >> 16) & 0xff)
result.messageType = MessageType((code >> 24) & 0x1f)
result.hardware = (code >> 29) & 0x7
for index in range(itemsCount):
result.items.append(EventItem.fromByteArray(value[offset:offset + EventItem.binarySize]))
offset += EventItem.binarySize
return result
def toByteArray(self):
result = self.id.bytes_le
result += self.parentId.bytes_le
result += self.componentId.bytes_le
result += int(self.timestamp()).to_bytes(length=4, byteorder="little")
code = 0
code |= (self.hardware & 0x7) << 29
code |= (self.messageType & 0x1f) << 24
code |= (self.dataBits & 0xff) << 16
code |= (self.subSystem & 0x7) << 13
code |= (self.sourceType & 0x1f) << 8
code |= self.method & 0xff
result += code.to_bytes(4, "little")
result += len(self.items).to_bytes(4, "little")
for item in self.items:
result += item.toByteArray()
return result
self.id = None
self.parentId = None
self.componentId = None
self.timestamp = None
self.method = Method.miAddUser
self.sourceType = SourceType.stErrRestore
self.subSystem = 0
self.dataBits = DataBit.dbUndefined
self.messageType = MessageType.mtEvent
self.hardware = 0
self.items = []
def json(self):
return {
"id": self.id,
"parentId": self.parentId,
"componentId": self.componentId,
"timestamp": self.timestamp,
"method": self.method,
"sourceType": self.sourceType,
"subSystem": self.subSystem,
"dataBits": self.dataBits,
"messageType": self.messageType,
"hardware": self.hardware,
"items": map(lambda x: x.json(), self.items)}
"""
def create(id : mId, parentId : mParentId, componentId : mComponentId, timestamp: mDate, \
method: mOperationCode, sourceType: mSourceType, subSystem: mSubSystem, \
dataBits: mDataBits, messageType: mMessageType, hardware: mHardware, value=None):
result = Event()
result.id = id
result.parentId = parentId
result.componentId = componentId
result.timestamp = timestamp
result.method = method
result.sourceType = sourceType
result.subSystem = subSystem
result.dataBits = dataBits
result.messageType = messageType
result.hardware = hardware
if value:
result.items = value
return result
def json(self):
return {
"operationCode": self.operationCode
"sourceType": self.sourceType
"subSystem": self.subSystem
"dataBits": self.dataBits
"messageType": self.messageType
"hardware": self.hardware
}
"""