Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(utils): add binary reader and recorder for encoded states #152

Merged
merged 9 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Release Versions:
- fix(controllers): remove duplicate function (#140)
- chore: format repository (#142)
- docs: update schema path in component descriptions (#154)
- feat(utils): add binary reader and recorder for encoded states (#152)

## 4.2.2

Expand Down
119 changes: 119 additions & 0 deletions source/modulo_utils/modulo_utils/binary_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
import struct
import time
from datetime import datetime
from io import BufferedWriter
from typing import List

import clproto
from modulo_interfaces.msg import EncodedState


class BinaryRecorder():
domire8 marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, filepath: str):
"""
Construct the BinaryRecorder. By calling first open() and then write(msg) successively, the BinaryRecorder will
write the provided message as binary data to a file under the desired path.

:param filepath: The full path of the recording file
"""
self._filepath = filepath
if not self._filepath.endswith(".bin"):
self._filepath += ".bin"
self._file: BufferedWriter

def open(self):
"""
Open the file for writing.
"""
os.makedirs(os.path.dirname(self._filepath), exist_ok=True)
self._file = open(self._filepath, 'wb')

def write(self, msg: EncodedState):
"""
Write an EncodedState to the file.
"""
self._file.write(clproto.pack_fields([struct.pack('d', time.time()), bytes(msg.data)]))

def close(self):
"""
Close the file after writing.
"""
self._file.close()
domire8 marked this conversation as resolved.
Show resolved Hide resolved


def read_binary_file(filepath: str):
domire8 marked this conversation as resolved.
Show resolved Hide resolved
"""
Decode the binary data of a file created by a BinaryRecorder and return its content.

:param filepath: The full path of the recorded file
"""
if not filepath.endswith(".bin"):
filepath += ".bin"
data = []
block_size = 4
with open(filepath, 'rb') as f:
field_bytes = f.read(block_size)
while field_bytes:
package = field_bytes
# read the size of the timestamp
timestamp_size_bytes = f.read(block_size)
package += timestamp_size_bytes
timestamp_size = int.from_bytes(timestamp_size_bytes, byteorder='little', signed=False)
# read the size of the state
state_size_bytes = f.read(block_size)
package += state_size_bytes
state_size = int.from_bytes(state_size_bytes, byteorder='little', signed=False)
# read the whole package
package += f.read(timestamp_size)
package += f.read(state_size)
# decode the package with clproto
fields = clproto.unpack_fields(package)
timestamp = datetime.fromtimestamp(struct.unpack('d', fields[0])[0])
state = clproto.decode(fields[1])
data.append({'timestamp': timestamp, 'state': state})
# read again the first 4 bytes
field_bytes = f.read(block_size)
return data


def read_directory(directory, filenames: List[str] = None) -> dict:
domire8 marked this conversation as resolved.
Show resolved Hide resolved
"""
Read a directory of recorded files.

:param directory: The path to recording directory
:param filenames: If provided, only read the given files in the recording directory
:return: The content of the files as a dict with {name: data}
"""
if not os.path.isdir(directory):
return {}
data = {}
if filenames is None:
filenames = next(os.walk(directory))[2]
for file in filenames:
if file.endswith(".bin"):
file = file[:-4]
data[file] = read_binary_file(os.path.join(directory, file))
return data


def read_directories(directory: str, recording_directories: List[str] = None, filenames: List[str] = None) -> dict:
"""
Read a directory tree of recorded files.

:param directory: The path to the directry containing one or several recording directories
:param recording_directories: If provided, only read the files in the given recording directories
:param filenames: If provided, only read the given files in each recording directory
:return: The dataset of recorded states per recording and filename as a dict of dict {recording: {name: data}}
"""
if not os.path.isdir(directory):
return {}
data = {}
if recording_directories is None:
recording_directories = next(os.walk(directory))[1]
for recording_dir in recording_directories:
recording_data = read_directory(os.path.join(directory, recording_dir), filenames)
if not recording_data:
continue
data[recording_dir] = recording_data
return data
26 changes: 26 additions & 0 deletions source/modulo_utils/test/python/test_binary_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import time

import clproto
import state_representation as sr
from modulo_interfaces.msg import EncodedState
from modulo_utils.binary_io import BinaryRecorder, read_binary_file, read_directory


def test_binary_io():
current_time = time.time()
random_state = sr.CartesianState().Random("test")
recorder = BinaryRecorder(f"/tmp/{current_time}")
recorder.open()
msg = EncodedState()
msg.data = clproto.encode(random_state, clproto.MessageType.CARTESIAN_STATE_MESSAGE)
recorder.write(msg)
recorder.close()

data = read_binary_file(f"/tmp/{current_time}")
assert data
assert len(data) == 1
assert data[0]["state"].get_name() == random_state.get_name()

full_data = read_directory("/tmp")
assert len(full_data) == 1
assert full_data[f"{current_time}"]
Loading