Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Actuator Board package with USB to CAN (Until new protocol package is created with electrical team) #9

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
16 changes: 16 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"python.autoComplete.extraPaths": [
"/home/daniel/mil2/install/sub_actuator_board/lib/python3.12/site-packages",
"/home/daniel/mil2/install/mil_usb_to_can/lib/python3.12/site-packages",
"/home/daniel/mil2/install/mil_tools/lib/python3.12/site-packages",
"/home/daniel/mil2/install/electrical_protocol/lib/python3.12/site-packages",
"/opt/ros/jazzy/lib/python3.12/site-packages"
],
"python.analysis.extraPaths": [
"/home/daniel/mil2/install/sub_actuator_board/lib/python3.12/site-packages",
"/home/daniel/mil2/install/mil_usb_to_can/lib/python3.12/site-packages",
"/home/daniel/mil2/install/mil_tools/lib/python3.12/site-packages",
"/home/daniel/mil2/install/electrical_protocol/lib/python3.12/site-packages",
"/opt/ros/jazzy/lib/python3.12/site-packages"
]
}
43 changes: 43 additions & 0 deletions src/mil_common/drivers/mil_usb_to_can/launch/example.launch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, SetLaunchConfiguration
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
# Declare the 'simulated' argument
declare_simulated_arg = DeclareLaunchArgument(
'simulated',
default_value='False',
description='Set to True if running in simulation'
)

# Set 'is_simulation' parameter based on the 'simulated' argument
set_is_simulation_param = SetLaunchConfiguration(
name='is_simulation',
value=LaunchConfiguration('simulated')
)

# Define the node
usb_to_can_driver_node = Node(
package='mil_usb_to_can',
executable='sub9_driver',
name='usb_to_can_driver',
parameters=[{
'is_simulation': LaunchConfiguration('simulated'),
'port': '/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A900KV22-if00-port0',
'baudrate': 115200,
'can_id': 0,
'device_handles': {
"3": "mil_usb_to_can.ExampleEchoDeviceHandle"
},
'simulated_devices': {
"3": "mil_usb_to_can.ExampleSimulatedEchoDevice"
}
}]
)

return LaunchDescription([
declare_simulated_arg,
set_is_simulation_param,
usb_to_can_driver_node
])
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .device import CANDeviceHandle, SimulatedCANDeviceHandle
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
import electrical_protocol

from .sub9_driver import SimulatedUSBtoCANStream, USBtoCANDriver


class SimulatedCANDeviceHandle:
"""
Simulates a CAN device, with functions to be overridden to handle data requests
and sends from motherboard.

Child classes can inherit from this class to implement a simulated CAN device.

Attributes:
inbound_packets (type[electrical_protocol.Packet]): The types of packets listened to by this device. Packets of this type will be routed to the :meth:`~.on_data` method of the device handle.
"""

def __init__(
self,
sim_board: SimulatedUSBtoCANStream,
inbound_packets: list[type[electrical_protocol.Packet]],
):
self._sim_board = sim_board
self.inbound_packets = inbound_packets

def send_data(self, data: bytes):
"""
Sends data over the serial connection.
"""
self._sim_board.send_to_bus(data)

def on_data(self, packet: electrical_protocol.Packet):
"""
Called when an relevant incoming packet is received over the serial
connection. Relevant packets are those listed in :attr:`~.inbound_packets`.

Partial data (ie, incomplete packets) are not sent through this event.

Args:
packet (electrical_protocol.Packet): The incoming packet.
"""
del packet


class CANDeviceHandle:
"""
Base class to allow developers to write handles for communication with a
particular CAN device. The two methods of the handle allow the handle to listen
to incoming data, as well as send data.
"""

def __init__(self, driver: USBtoCANDriver):
"""
Args:
driver (USBtoCANBoard): Driver that is used to communicate with the board.
device_id (int): The CAN ID of the device this class will handle. Not currently used.
"""
self._driver = driver

def on_data(self, data: electrical_protocol.Packet):
"""
Called when a return packet is sent over the serial connection. In the
USB to CAN protocol, it is assumed that packets will be returned to the
motherboard in the order they are sent out. Therefore, the type of packet
sent through this event is not guaranteed, and is only determined by the
message and subclass ID sent by the individual board.

Partial data (ie, incomplete packets) are not sent through this event.

Args:
packet (electrical_protocol.Packet): The incoming packet.
"""
del data

def send_data(self, data: electrical_protocol.Packet):
"""
Sends a packet over the serial connection.

Args:
data (electrical_protocol.Packet): The packet to send.
"""
return self._driver.send_data(self, data)
158 changes: 158 additions & 0 deletions src/mil_common/drivers/mil_usb_to_can/mil_usb_to_can/sub9/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import random
import string
from dataclasses import dataclass

import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.duration import Duration
from std_srvs.srv import Trigger
from example_interfaces.srv import AddTwoInts # ROS 2 equivalent for AddTwoInts
from electrical_protocol import Packet

from .device import CANDeviceHandle, SimulatedCANDeviceHandle


@dataclass
class ExampleEchoRequestPacket(
Packet,
class_id=0x99,
subclass_id=0x00,
payload_format="<10s",
):
my_special_string: bytes


@dataclass
class ExampleEchoResponsePacket(
Packet,
class_id=0x99,
subclass_id=0x01,
payload_format="<10s",
):
my_special_string: bytes


@dataclass
class ExampleAdderRequestPacket(
Packet,
class_id=0x99,
subclass_id=0x02,
payload_format="<BB",
):
num_one: int
num_two: int


@dataclass
class ExampleAdderResponsePacket(
Packet,
class_id=0x99,
subclass_id=0x03,
payload_format="<B",
):
response: int


class ExampleEchoDeviceHandle(CANDeviceHandle, Node):
"""
An example implementation of a CANDeviceHandle which will handle
a device that echoes back any data sent to it.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Node.__init__(self, "echo_device")

self.last_sent = None
self.count = 0
self._srv = self.create_service(Trigger, "start_echo", self.srv_req)

def srv_req(self, req, response):
while self.count < 10:
if not self.send_new_string(10):
response.success = False
response.message = "Unable to send string of length ten."
return response
response.success = True
response.message = "Complete!"
return response

def on_data(self, data: ExampleEchoRequestPacket):
response = data.my_special_string.decode()
if self.last_sent is None:
raise RuntimeError(f"Received {data} but have not yet sent anything")
elif response != self.last_sent[0]:
raise ValueError(
f"ERROR! Received {response} but last sent {self.last_sent}",
)
else:
self.count += 1

def send_new_string(self, length: int = 10):
# Generate a test string
test = "".join(random.choice(string.ascii_letters) for _ in range(length))
self.last_sent = (test, self.get_clock().now())
self.send_data(ExampleEchoRequestPacket(test.encode()))
start = self.get_clock().now()
count_now = self.count
while self.count == count_now:
if self.get_clock().now() - start > Duration(seconds=1):
return False
return True


class ExampleAdderDeviceHandle(CANDeviceHandle, Node):
"""
An example implementation of a CANDeviceHandle which will handle
a device that processes addition requests.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Node.__init__(self, "adder_device")
self.response_received = None
self._srv = self.create_service(AddTwoInts, "add_two_ints", self.on_service_req)

def on_service_req(self, req, response):
self.response_received = None
self.send_data(ExampleAdderRequestPacket(req.a, req.b))
start = self.get_clock().now()
while self.response_received is None:
if self.get_clock().now() - start > Duration(seconds=1):
response.sum = -1
return response
response.sum = self.response_received.response
return response

def on_data(self, data: ExampleAdderResponsePacket):
self.response_received = data


class ExampleSimulatedEchoDevice(SimulatedCANDeviceHandle):
"""
Example implementation of a SimulatedCANDevice.
On sends, stores the transmitted data in a buffer.
When data is requested, it echoes this data back.
"""

def __init__(self, handle, inbound_packets):
super().__init__(handle, inbound_packets)

def on_data(self, data: ExampleEchoRequestPacket):
# Echo data received back onto the bus
self.send_data(bytes(ExampleEchoResponsePacket(data.my_special_string)))


class ExampleSimulatedAdderDevice(SimulatedCANDeviceHandle):
"""
Example implementation of a SimulatedCANDevice.
On sends, stores the transmitted data in a buffer.
When data is requested, it echoes this data back.
"""

def __init__(self, handle, inbound_packets):
super().__init__(handle, inbound_packets)

def on_data(self, data: ExampleAdderRequestPacket):
self.send_data(bytes(ExampleAdderResponsePacket(data.num_one + data.num_two)))
Loading
Loading