main.py
import argparse  # noqa: D100
import csv
import os
import struct
import sys

from pyubx2 import UBXReader
from pyulog.core import ULog

from ubx_interface import CLIDPAIR_INV, MSGATTR, SYNC1, SYNC2, UbxParserState
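

# The parser below relies on the standard u-blox UBX frame layout, which is also
# what main() assumes when it slices each frame for pyubx2 (SYNC1/SYNC2 come from
# ubx_interface; in the u-blox protocol they are 0xB5 and 0x62):
#
#   byte 0        SYNC1
#   byte 1        SYNC2
#   byte 2        message class
#   byte 3        message id
#   bytes 4-5     payload length (little-endian uint16)
#   bytes 6..     payload
#   last 2 bytes  checksum (CK_A, CK_B)
#
# main() unpacks bytes 2-5 with struct.unpack("<BBH", ...) and reads each frame as
# 6 header bytes + payload + 2 checksum bytes.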
def extract_gps_dump(ulog_filename):
    """Extract raw u-blox frames from the gps_dump topic of a ULog file."""
    # only the gps_dump topic is needed
    msg_filter = ["gps_dump"]
    ulog = ULog(ulog_filename, msg_filter, disable_str_exceptions=False)
    data = ulog.data_list

    if len(data) == 0:
        print(f"File {ulog_filename} does not have the necessary messages")
        sys.exit(0)
    if len(ulog.dropouts) > 0:
        print(f"File has {len(ulog.dropouts)} dropouts")
    for d in data:
        print(f"Found {len(d.field_data)} fields in {d.name}")

    gps_dump_data = data[0]
    field_names = [f.field_name for f in gps_dump_data.field_data]
    if "len" not in field_names or "data[0]" not in field_names:
        print("gps_dump: messages do not have the expected fields")
        sys.exit(-1)

    msg_lens = gps_dump_data.data["len"]
    print(f"gps_dump contains {len(msg_lens)} samples")

    parser_state = UbxParserState.IDLE
    synced_msgs = {}
    msg_buffer = []
    for i in range(len(gps_dump_data.data["timestamp"])):
        # each gps_dump sample carries up to 79 raw bytes in data[0]..data[78];
        # frames are split heuristically on the next SYNC1/SYNC2 pair, so a single
        # frame can span several samples
        for d in range(79):
            data_byte = gps_dump_data.data[f"data[{d}]"][i]
            if parser_state == UbxParserState.IDLE:
                msg_buffer = []
                if data_byte == SYNC1:
                    parser_state = UbxParserState.SYNC
            elif parser_state == UbxParserState.SYNC:
                msg_buffer = []
                if data_byte == SYNC2:
                    msg_buffer.append(SYNC1)
                    msg_buffer.append(SYNC2)
                    parser_state = UbxParserState.READING
                elif data_byte != SYNC1:
                    # not a valid sync sequence, go back to searching for SYNC1
                    parser_state = UbxParserState.IDLE
            elif parser_state == UbxParserState.READING:
                if data_byte == SYNC1 and d + 1 < 79:
                    # a new sync pair inside this sample ends the current frame
                    next_data_byte = gps_dump_data.data[f"data[{d + 1}]"][i]
                    if next_data_byte == SYNC2:
                        synced_msgs[i] = (
                            msg_buffer,
                            gps_dump_data.data["timestamp"][i],
                        )
                        parser_state = UbxParserState.SYNC
                    else:
                        msg_buffer.append(data_byte)
                elif (
                    data_byte == SYNC1
                    and d == 78
                    and i + 1 < len(gps_dump_data.data["timestamp"])
                ):
                    # last byte of this sample: peek at the first byte of the next one
                    next_data_byte = gps_dump_data.data["data[0]"][i + 1]
                    if next_data_byte == SYNC2:
                        synced_msgs[i] = (
                            msg_buffer,
                            gps_dump_data.data["timestamp"][i],
                        )
                        parser_state = UbxParserState.SYNC
                    else:
                        msg_buffer.append(data_byte)
                else:
                    msg_buffer.append(data_byte)
    return synced_msgs
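

# Illustrative, stand-alone use of extract_gps_dump() (the log file name below is
# hypothetical; the function is normally only called from main()):
#
#   frames = extract_gps_dump("flight.ulg")
#   for sample_idx, (raw_bytes, timestamp) in frames.items():
#       # raw_bytes: list of ints holding one UBX frame (sync bytes, header,
#       # payload, checksum); timestamp: timestamp of the gps_dump sample in
#       # which the frame completed
#       print(sample_idx, timestamp, len(raw_bytes))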


def main():
    """Extract UBX messages from a PX4 ULog gps_dump topic and write them to CSVs."""
    parser = argparse.ArgumentParser(
        description="PX4 u-blox gps_dump extractor"
    )
    parser.add_argument(
        "-i",
        "--input_file",
        action="store",
        required=True,
        help="path to the input file (including the file name)",
    )
    parser.add_argument(
        "-o",
        "--output_dir",
        action="store",
        help="path to the output directory",
    )
    args = parser.parse_args()

    if not os.path.isfile(args.input_file):
        raise FileNotFoundError(args.input_file)

    # if no output path is specified, write next to the input file
    if not args.output_dir:
        output_dir = os.path.dirname(args.input_file)
    else:
        output_dir = args.output_dir

    output_file_prefix = os.path.basename(args.input_file)
    # strip the ".ulg" extension
    if output_file_prefix.lower().endswith(".ulg"):
        output_file_prefix = output_file_prefix[:-4]

    raw_msgs = extract_gps_dump(args.input_file)

    count_msgs = {}
    valid_msgs = {}
    for _, msg in raw_msgs.items():
        data, timestamp = msg
        if len(data) == 2:
            # buffer somehow only contains the two preamble bytes
            continue
        if len(data) < 6:
            # not enough data to unpack the class, id and payload length
            continue
        if data[0] == SYNC1 and data[1] == SYNC2:
            try:
                (msg_class, msg_id, payload_len) = struct.unpack(
                    "<BBH", bytes(data[2:6])
                )
                clid = CLIDPAIR_INV.get((msg_class, msg_id))
            except Exception as e:
                # ignore unexpected data
                print(e)
                continue
            if clid:
                try:
                    # frame = 6 header bytes (sync1, sync2, class, id, len1, len2)
                    #         + payload + 2 checksum bytes
                    parsed_msg = UBXReader.parse(bytes(data[: 6 + payload_len + 2]))
                except Exception as e:
                    print(e)
                    continue

                count_msgs[clid] = count_msgs.get(clid, 0) + 1

                entries = []
                header = []
                try:
                    for item in MSGATTR[clid]:
                        if isinstance(item, dict) and "rgroup" in item:
                            # repeated group: pyubx2 names the attributes of the
                            # r-th repetition "<name>_01", "<name>_02", ...
                            rep_attr, rep_entries = item["rgroup"]
                            n_rep = getattr(parsed_msg, rep_attr)
                            for r in range(int(n_rep)):
                                for r_entry in rep_entries:
                                    attr_name = r_entry + f"_{r + 1:02}"
                                    entries.append(getattr(parsed_msg, attr_name))
                                    header.append(attr_name)
                        else:
                            entries.append(getattr(parsed_msg, item))
                            header.append(item)
                    if clid not in valid_msgs:
                        # the header row is stored under the special "timestamp" key;
                        # data rows are keyed by their gps_dump timestamp
                        valid_msgs[clid] = dict(timestamp=header)
                    valid_msgs[clid][timestamp] = entries
                except Exception:
                    continue

    print(f"gps_dump contains the following messages:\n{count_msgs}")

    for k, v in valid_msgs.items():
        output_file = os.path.join(
            output_dir, output_file_prefix + f"_{k}.csv"
        )
        csv_fields = ["timestamp"] + v["timestamp"]
        with open(output_file, mode="w", newline="") as csv_entry:
            csv_writer = csv.writer(csv_entry, delimiter=",")
            csv_writer.writerow(csv_fields)
            for t, entry in v.items():
                if t == "timestamp":
                    continue
                csv_writer.writerow([t] + list(entry))


if __name__ == "__main__":
    main()
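
# Example invocation (file and directory names here are hypothetical):
#
#   python main.py -i ./logs/flight.ulg -o ./extracted
#
# For every recognised UBX message type this writes one CSV into the output
# directory, named "<log name>_<clid>.csv" with the clid names defined in
# CLIDPAIR_INV. Each row holds the gps_dump timestamp followed by the message
# fields listed in MSGATTR for that type.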