Skip to content

Commit

Permalink
typing improvement for ComInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Feb 3, 2025
1 parent 6192ccf commit a2e33fd
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 14 deletions.
10 changes: 5 additions & 5 deletions src/tmtccmd/com/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"""Communication module. Provides generic abstraction for communication and commonly used
concrete implementations."""

from abc import abstractmethod, ABC
from typing import Any, List, Optional
from __future__ import annotations

from tmtccmd.tmtc.common import TelemetryListT
from abc import abstractmethod, ABC
from typing import Any, Optional


class ReceptionDecodeError(Exception):
Expand Down Expand Up @@ -61,14 +61,14 @@ def close(self, args: Any = 0):
"""

@abstractmethod
def send(self, data: bytes):
def send(self, data: bytes | bytearray):
"""Send raw data.
:raises SendError: Sending failed for some reason.
"""

@abstractmethod
def receive(self, parameters: Any = 0) -> List[bytes]:
def receive(self, parameters: Any = 0) -> list[bytes]:
"""Returns a list of received packets. The child class can use a separate thread to poll for
the packets or use some other mechanism and container like a deque to store packets
to be returned here.
Expand Down
8 changes: 5 additions & 3 deletions src/tmtccmd/com/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
external hardware or an extra socket
"""

from typing import List, Optional
from __future__ import annotations

from typing import Optional

from deprecated.sphinx import deprecated
from spacepackets.ccsds.time import CdsShortTimestamp
Expand Down Expand Up @@ -133,9 +135,9 @@ def data_available(self, timeout: float = 0, parameters: any = 0):
return True
return False

def receive(self, parameters: any = 0) -> List[bytes]:
def receive(self, parameters: any = 0) -> list[bytes]:
return self.dummy_handler.receive_reply_package()

def send(self, data: bytes):
def send(self, data: bytes | bytearray):
if data is not None:
self.dummy_handler.insert_telecommand(data)
3 changes: 2 additions & 1 deletion src/tmtccmd/com/serial_cobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import collections
import logging
import threading
Expand Down Expand Up @@ -49,7 +50,7 @@ def close(self, args: any = None) -> None:
self.__reception_thread.join(0.4)
super().close_port()

def send(self, data: bytes):
def send(self, data: bytes | bytearray):
encoded = bytearray([0])
encoded.extend(cobs.encode(data))
encoded.append(0)
Expand Down
3 changes: 2 additions & 1 deletion src/tmtccmd/com/serial_dle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import dataclasses
import logging
import threading
Expand Down Expand Up @@ -87,7 +88,7 @@ def close(self, args: any = None) -> None:
self.__reception_thread.join(0.4)
super().close_port()

def send(self, data: bytes):
def send(self, data: bytes | bytearray):
encoded_data = self.__encoder.encode(source_packet=data, add_stx_etx=True)
self.serial.write(encoded_data)

Expand Down
8 changes: 5 additions & 3 deletions src/tmtccmd/com/tcp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""TCP communication interface"""

from __future__ import annotations

import logging
import queue
import socket
Expand All @@ -8,7 +10,7 @@
import threading
import select
from collections import deque
from typing import Any, List, Optional, Sequence
from typing import Any, Optional, Sequence

from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId

Expand Down Expand Up @@ -123,10 +125,10 @@ def close(self, args: any = None) -> None:
self.__connected = False
self.__tcp_socket = None

def send(self, data: bytes):
def send(self, data: bytes | bytearray):
self.__tc_queue.put(data)

def receive(self, poll_timeout: float = 0) -> List[bytes]:
def receive(self, poll_timeout: float = 0) -> list[bytes]:
self.__tm_queue_to_packet_list()
tm_packet_list = self.tm_packet_list
self.tm_packet_list = []
Expand Down
4 changes: 3 additions & 1 deletion src/tmtccmd/com/udp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""UDP Communication Interface"""

from __future__ import annotations

import logging
import select
import socket
Expand Down Expand Up @@ -64,7 +66,7 @@ def close(self, args: any = None) -> None:
if self.udp_socket is not None:
self.udp_socket.close()

def send(self, data: bytes):
def send(self, data: bytes | bytearray):
if self.udp_socket is None:
return
bytes_sent = self.udp_socket.sendto(data, self.send_address.to_tuple)
Expand Down

0 comments on commit a2e33fd

Please sign in to comment.