Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(utils): add binary reader and recorder for encoded states #152

Merged
merged 9 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Release Versions:
- fix(controllers): remove duplicate function (#140)
- chore: format repository (#142)
- docs: update schema path in component descriptions (#154)
- feat(utils): add binary reader and recorder for encoded states (#152)

## 4.2.2

Expand Down
126 changes: 126 additions & 0 deletions source/modulo_utils/modulo_utils/encoded_state_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import os
import struct
import time
from datetime import datetime
from io import BufferedWriter
from typing import List

import clproto
from modulo_interfaces.msg import EncodedState


class EncodedStateRecorder():
def __init__(self, filepath: str):
"""
Construct the EncodedStateRecorder. By calling first open() and then write(msg) successively, the
EncodedStateRecorder will write the provided message as binary data to a file under the desired path.

:param filepath: The full path of the recording file
"""
self._filepath = filepath
if not self._filepath.endswith(".bin"):
self._filepath += ".bin"
self._file: BufferedWriter

def __enter__(self):
self.open()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def open(self):
"""
Open the file for writing.
"""
os.makedirs(os.path.dirname(self._filepath), exist_ok=True)
self._file = open(self._filepath, 'wb')

def write(self, msg: EncodedState):
"""
Write an EncodedState to the file.
"""
self._file.write(clproto.pack_fields([struct.pack('d', time.time()), bytes(msg.data)]))

def close(self):
"""
Close the file after writing.
"""
self._file.close()


def read_encoded_state_recording(filepath: str):
"""
Decode the binary data of a file created by a BinaryRecorder and return its content.
domire8 marked this conversation as resolved.
Show resolved Hide resolved

:param filepath: The full path of the recorded file
"""
if not filepath.endswith(".bin"):
filepath += ".bin"
data = []
block_size = 4
with open(filepath, 'rb') as f:
field_bytes = f.read(block_size)
while field_bytes:
package = field_bytes
# read the size of the timestamp
timestamp_size_bytes = f.read(block_size)
package += timestamp_size_bytes
timestamp_size = int.from_bytes(timestamp_size_bytes, byteorder='little', signed=False)
# read the size of the state
state_size_bytes = f.read(block_size)
package += state_size_bytes
state_size = int.from_bytes(state_size_bytes, byteorder='little', signed=False)
# read the whole package
package += f.read(timestamp_size)
package += f.read(state_size)
# decode the package with clproto
fields = clproto.unpack_fields(package)
timestamp = datetime.fromtimestamp(struct.unpack('d', fields[0])[0])
state = clproto.decode(fields[1])
data.append({'timestamp': timestamp, 'state': state})
# read again the first 4 bytes
field_bytes = f.read(block_size)
return data


def read_recording_directory(directory, filenames: List[str] = None) -> dict:
"""
Read a directory of recorded files.

:param directory: The path to recording directory
:param filenames: If provided, only read the given files in the recording directory
:return: The content of the files as a dict with {name: data}
"""
if not os.path.isdir(directory):
return {}
data = {}
if filenames is None:
filenames = next(os.walk(directory))[2]
for file in filenames:
if file.endswith(".bin"):
file = file[:-4]
data[file] = read_encoded_state_recording(os.path.join(directory, file))
return data


def read_recording_directories(directory: str, recording_directories: List[str] = None, filenames: List[str] = None) -> dict:
"""
Read a directory tree of recorded files.

:param directory: The path to the directry containing one or several recording directories
:param recording_directories: If provided, only read the files in the given recording directories
:param filenames: If provided, only read the given files in each recording directory
:return: The dataset of recorded states per recording and filename as a dict of dict {recording: {name: data}}
"""
if not os.path.isdir(directory):
return {}
data = {}
if recording_directories is None:
recording_directories = next(os.walk(directory))[1]
for recording_dir in recording_directories:
recording_data = read_recording_directory(os.path.join(directory, recording_dir), filenames)
if not recording_data:
continue
data[recording_dir] = recording_data
return data
26 changes: 26 additions & 0 deletions source/modulo_utils/test/python/test_encoded_state_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import time

import clproto
import state_representation as sr
from modulo_interfaces.msg import EncodedState
from modulo_utils.encoded_state_recorder import (EncodedStateRecorder,
read_encoded_state_recording,
read_recording_directory)


def test_encoded_state_recorder():
current_time = time.time()
random_state = sr.CartesianState().Random("test")
msg = EncodedState()
msg.data = clproto.encode(random_state, clproto.MessageType.CARTESIAN_STATE_MESSAGE)
with EncodedStateRecorder(f"/tmp/{current_time}") as rec:
rec.write(msg)

data = read_encoded_state_recording(f"/tmp/{current_time}")
assert data
assert len(data) == 1
assert data[0]["state"].get_name() == random_state.get_name()

full_data = read_recording_directory("/tmp")
assert len(full_data) == 1
assert full_data[f"{current_time}"]
Loading