From d28c450fa86f0aa0c0f572144deeaee940d61ab5 Mon Sep 17 00:00:00 2001 From: hnewey7 Date: Tue, 6 Aug 2024 00:24:34 +0100 Subject: [PATCH 1/3] feat: add duration param to run method for DataStream --- alpaca/data/live/websocket.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/alpaca/data/live/websocket.py b/alpaca/data/live/websocket.py index c8966abe..d640edb9 100644 --- a/alpaca/data/live/websocket.py +++ b/alpaca/data/live/websocket.py @@ -3,6 +3,7 @@ import queue from collections import defaultdict from typing import Callable, Dict, List, Optional, Tuple, Union +from datetime import timedelta import msgpack import websockets @@ -359,10 +360,19 @@ async def _run_forever(self) -> None: finally: await asyncio.sleep(0) - def run(self) -> None: - """Starts up the websocket connection's event loop""" + def run(self, duration: timedelta = None) -> None: + """Starts up the websocket connection's event loop + + Parameters: + ----------- + duration: timedelta, default 'None' + Duration of event loop before timeout.""" try: - asyncio.run(self._run_forever()) + timeout_seconds = duration.total_seconds() if duration != None else 0 + asyncio.run(asyncio.wait([self._run_forever()], timeout=timeout_seconds)) + except TypeError: + print("invalid duration type entered") + pass except KeyboardInterrupt: print("keyboard interrupt, bye") pass From f3406df534313d916487be3322a834d9c787fb22 Mon Sep 17 00:00:00 2001 From: hnewey7 Date: Tue, 6 Aug 2024 22:14:30 +0100 Subject: [PATCH 2/3] fix: create task for _run_forever and add timeout duration --- alpaca/data/live/websocket.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/alpaca/data/live/websocket.py b/alpaca/data/live/websocket.py index d640edb9..ca34ddac 100644 --- a/alpaca/data/live/websocket.py +++ b/alpaca/data/live/websocket.py @@ -360,6 +360,11 @@ async def _run_forever(self) -> None: finally: await asyncio.sleep(0) + async def _run(self, duration: timedelta = None) -> None: + timeout_seconds = duration.total_seconds() if duration != None else None + run_task = asyncio.create_task(self._run_forever()) + await asyncio.wait([run_task], timeout=timeout_seconds) + def run(self, duration: timedelta = None) -> None: """Starts up the websocket connection's event loop @@ -368,9 +373,9 @@ def run(self, duration: timedelta = None) -> None: duration: timedelta, default 'None' Duration of event loop before timeout.""" try: - timeout_seconds = duration.total_seconds() if duration != None else 0 - asyncio.run(asyncio.wait([self._run_forever()], timeout=timeout_seconds)) - except TypeError: + asyncio.run(self._run(duration)) + except TypeError as e: + print(e) print("invalid duration type entered") pass except KeyboardInterrupt: From 8cf8c3c71d36ace67e5da53db5ad8b2058db309b Mon Sep 17 00:00:00 2001 From: hnewey7 Date: Tue, 6 Aug 2024 22:16:17 +0100 Subject: [PATCH 3/3] test: add test for running DataStream with duration --- tests/data/test_websockets.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/data/test_websockets.py b/tests/data/test_websockets.py index 40e196a7..7dce5ff5 100644 --- a/tests/data/test_websockets.py +++ b/tests/data/test_websockets.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta import pytest from msgpack.ext import Timestamp @@ -232,3 +232,18 @@ async def handler_star(d): assert len(articles_b) == 1 assert len(articles_star) == 2 assert articles_star[1].headline == "c" + + +@pytest.mark.parametrize("duration_seconds", [(1), (2), (3)]) +def test_run(ws_client: DataStream, duration_seconds: int) -> None: + """Testing for different durations. + + Parameters: + ----------- + ws_client (DataStream): DataStream to test. + duration_seconds (int): Duration to test. + """ + start_time = datetime.now() + duration = timedelta(seconds=duration_seconds) + ws_client.run(duration) + assert round((datetime.now() - start_time).total_seconds()) >= duration_seconds