From 9e23d75850155bf2e8a196be4b52f6a1c1fe60e7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Mon, 24 Jul 2023 17:02:27 +0200 Subject: [PATCH 01/34] chg: [serializer] add support for phoenix version 2 serialization --- realtime/channel.py | 6 +++++- realtime/connection.py | 31 +++++++++++++++++++++---------- realtime/message.py | 3 ++- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index e4ac908..ba59a71 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -3,6 +3,7 @@ import asyncio import json from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple +from realtime.message import * from realtime.types import Callback @@ -51,8 +52,11 @@ async def _join(self) -> None: Coroutine that attempts to join Phoenix Realtime server via a certain topic :return: None """ - join_req = dict(topic=self.topic, event="phx_join", + if self.socket.version == 1: + join_req = dict(topic=self.topic, event="phx_join", payload={}, ref=None) + elif self.socket.version == 2: + join_req = [None, None, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) diff --git a/realtime/connection.py b/realtime/connection.py index cc017a3..37f9b04 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -31,14 +31,15 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: class Socket: - def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any] = {}, hb_interval: int = 5) -> None: + def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any] = {}, hb_interval: int = 30, version: int = 2) -> None: """ `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`. Socket-Channel has a 1-many relationship. Socket-Topic has a 1-many relationship. :param url: Websocket URL of the Realtime server. starts with `ws://` or `wss://` :param params: Optional parameters for connection. - :param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 5. + :param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 30. + :param version: phoenix JSON serializer version. """ self.url = url self.channels = defaultdict(list) @@ -48,6 +49,7 @@ def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any self.ws_connection: websockets.client.WebSocketClientProtocol self.kept_alive = False self.auto_reconnect = auto_reconnect + self.version = version self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) @@ -70,7 +72,11 @@ async def _listen(self) -> None: while True: try: msg = await self.ws_connection.recv() - msg = Message(**json.loads(msg)) + if self.version == 1 : + msg = Message(**json.loads(msg)) + elif self.version == 2: + msg_array = json.loads(msg) + msg = Message(chanid=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) if msg.event == ChannelEvents.reply: continue @@ -115,12 +121,17 @@ async def _keep_alive(self) -> None: """ while True: try: - data = dict( - topic=PHOENIX_CHANNEL, - event=ChannelEvents.heartbeat, - payload=HEARTBEAT_PAYLOAD, - ref=None, - ) + if self.version == 1 : + data = dict( + topic=PHOENIX_CHANNEL, + event=ChannelEvents.heartbeat, + payload=HEARTBEAT_PAYLOAD, + ref=None, + ) + elif self.version == 2 : + # [null,"4","phoenix","heartbeat",{}] + data = [None, None, PHOENIX_CHANNEL, ChannelEvents.heartbeat, HEARTBEAT_PAYLOAD] + await self.ws_connection.send(json.dumps(data)) await asyncio.sleep(self.hb_interval) except websockets.exceptions.ConnectionClosed: @@ -150,4 +161,4 @@ def summary(self) -> None: for topic, chans in self.channels.items(): for chan in chans: print( - f"Topic: {topic} | Events: {[e for e, _ in chan.callbacks]}]") + f"Topic: {topic} | Events: {[e for e, _ in chan.listeners]}]") diff --git a/realtime/message.py b/realtime/message.py index 9909d4d..df890b6 100644 --- a/realtime/message.py +++ b/realtime/message.py @@ -11,6 +11,7 @@ class Message: event: str payload: Dict[str, Any] ref: Any + chanid: Any topic: str def __hash__(self): @@ -32,4 +33,4 @@ class ChannelEvents(str, Enum): PHOENIX_CHANNEL = "phoenix" -HEARTBEAT_PAYLOAD = {"msg": "ping"} +HEARTBEAT_PAYLOAD = {} From e5bd3330cd06bb8ca922279cfc8948a4c4b94cf5 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 4 Oct 2023 12:44:39 +0200 Subject: [PATCH 02/34] add: [send] sending to the channel poc --- realtime/channel.py | 61 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/realtime/channel.py b/realtime/channel.py index ba59a71..eb5d9c1 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -53,7 +53,7 @@ async def _join(self) -> None: :return: None """ if self.socket.version == 1: - join_req = dict(topic=self.topic, event="phx_join", + join_req = dict(topic=self.topic, event=ChannelEvents.join, payload={}, ref=None) elif self.socket.version == 2: join_req = [None, None, self.topic, ChannelEvents.join, {}] @@ -64,6 +64,33 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return + def leave(self) -> Channel: + """ + Wrapper for async def _leave() to expose a non-async interface + Essentially gets the only event loop and attempt leaving a topic + :return: Channel + """ + loop = asyncio.get_event_loop() # TODO: replace with get_running_loop + loop.run_until_complete(self._join()) + return self + + async def _leave(self) -> None: + """ + Coroutine that attempts to join Phoenix Realtime server via a certain topic + :return: None + """ + if self.socket.version == 1: + join_req = dict(topic=self.topic, event=ChannelEvents.leave, + payload={}, ref=None) + elif self.socket.version == 2: + join_req = [None, None, self.topic, ChannelEvents.leave, {}] + + try: + await self.socket.ws_connection.send(json.dumps(join_req)) + except Exception as e: + print(str(e)) # TODO: better error propagation + return + def on(self, event: str, callback: Callback) -> Channel: """ :param event: A specific event will have a specific callback @@ -81,3 +108,35 @@ def off(self, event: str) -> None: """ self.listeners = [ callback for callback in self.listeners if callback.event != event] + + def send(self, event_name: str, payload:str) -> None: + """ + Wrapper for async def _send() to expose a non-async interface + Essentially gets the only event loop and attempt sending a payload + to a topic + :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. + :param payload: The payload to be sent to the phoenix server + :return: None + """ + loop = asyncio.get_event_loop() # TODO: replace with get_running_loop + loop.run_until_complete(self._send(event_name, payload)) + return self + + async def _send(self, event_name: str, payload:str) -> None: + """ + Coroutine that attempts to join Phoenix Realtime server via a certain topic + :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. + :param payload: The payload to be sent to the phoenix server + :return: None + """ + if self.socket.version == 1: + msg = dict(topic=self.topic, event=event_name, + payload=payload, ref=None) + elif self.socket.version == 2: + msg = [3, 3, self.topic, event_name, payload] + + try: + await self.socket.ws_connection.send(json.dumps(msg)) + except Exception as e: + print(str(e)) # TODO: better error propagation + return From 93bf4987c853b8e72a5e57821d8f2b2d7f8f00f7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 4 Oct 2023 16:57:51 +0200 Subject: [PATCH 03/34] add: [send] listener on a msg ref --- poetry.lock | 14 ++++++++++++-- pyproject.toml | 1 + realtime/channel.py | 32 +++++++++++++++++++++----------- realtime/connection.py | 17 +++++++++++++---- realtime/message.py | 2 +- 5 files changed, 48 insertions(+), 18 deletions(-) diff --git a/poetry.lock b/poetry.lock index a74d67a..029988b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "colorama" @@ -131,6 +131,16 @@ files = [ {file = "typing_extensions-4.7.1.tar.gz", hash = "sha256:b75ddc264f0ba5615db7ba217daeb99701ad295353c45f9e95963337ceeeffb2"}, ] +[[package]] +name = "uuid" +version = "1.30" +description = "UUID object and generation functions (Python 2.3 or higher)" +optional = false +python-versions = "*" +files = [ + {file = "uuid-1.30.tar.gz", hash = "sha256:1f87cc004ac5120466f36c5beae48b4c48cc411968eed0eaecd3da82aa96193f"}, +] + [[package]] name = "websockets" version = "11.0.3" @@ -213,4 +223,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "542d5628e562e1d06aba9216b8b4308a7d5723a64a146f5311aae7b18a9e69e5" +content-hash = "d5bdcceb9e4ab6423b4c727ea2a0b4cde830fa00fa0a9f9d72f4e0d9fad77f9e" diff --git a/pyproject.toml b/pyproject.toml index 59adaef..0fc4d8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ python = "^3.8" websockets = "^11.0" python-dateutil = "^2.8.1" typing-extensions = "^4.2.0" +uuid = "^1.30" [tool.poetry.dev-dependencies] pytest = "^7.2.0" diff --git a/realtime/channel.py b/realtime/channel.py index eb5d9c1..3413d34 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -2,6 +2,7 @@ import asyncio import json +import uuid from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple from realtime.message import * @@ -14,6 +15,7 @@ class CallbackListener(NamedTuple): """A tuple with `event` and `callback` """ event: str + ref: str callback: Callback @@ -21,6 +23,7 @@ class Channel: """ `Channel` is an abstraction for a topic listener for an existing socket connection. Each Channel has its own topic and a list of event-callbacks that responds to messages. + A client can also send messages to a channel and register callback when expecting replies. Should only be instantiated through `connection.Socket().set_channel(topic)` Topic-Channel has a 1-many relationship. """ @@ -36,6 +39,8 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.topic = topic self.listeners: List[CallbackListener] = [] self.joined = False + self.join_ref = str(uuid.uuid4()) + self.join_msg_ref = str(uuid.uuid4()) def join(self) -> Channel: """ @@ -56,7 +61,8 @@ async def _join(self) -> None: join_req = dict(topic=self.topic, event=ChannelEvents.join, payload={}, ref=None) elif self.socket.version == 2: - join_req = [None, None, self.topic, ChannelEvents.join, {}] + #[join_reference, message_reference, topic_name, event_name, payload] + join_req = [self.join_ref, self.join_msg_ref, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -83,7 +89,7 @@ async def _leave(self) -> None: join_req = dict(topic=self.topic, event=ChannelEvents.leave, payload={}, ref=None) elif self.socket.version == 2: - join_req = [None, None, self.topic, ChannelEvents.leave, {}] + join_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -91,52 +97,56 @@ async def _leave(self) -> None: print(str(e)) # TODO: better error propagation return - def on(self, event: str, callback: Callback) -> Channel: + def on(self, event: str, ref: str, callback: Callback) -> Channel: """ :param event: A specific event will have a specific callback + :param ref: A specific reference that will have a specific callback :param callback: Callback that takes msg payload as its first argument :return: Channel """ - cl = CallbackListener(event=event, callback=callback) + cl = CallbackListener(event=event, ref=ref, callback=callback) self.listeners.append(cl) return self - def off(self, event: str) -> None: + def off(self, event: str, ref: str) -> None: """ :param event: Stop responding to a certain event + :param event: Stop responding to a certain reference :return: None """ self.listeners = [ - callback for callback in self.listeners if callback.event != event] + callback for callback in self.listeners if (callback.event != event and callback.ref != ref)] - def send(self, event_name: str, payload:str) -> None: + def send(self, event_name: str, payload:str, ref: uuid = str(uuid.uuid4())) -> None: """ Wrapper for async def _send() to expose a non-async interface Essentially gets the only event loop and attempt sending a payload to a topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. :param payload: The payload to be sent to the phoenix server + :param ref: The message reference that the server will use for replying - if none is set, generates the string repr of a uuidv4 :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._send(event_name, payload)) + loop.run_until_complete(self._send(event_name, payload, ref)) return self - async def _send(self, event_name: str, payload:str) -> None: + async def _send(self, event_name: str, payload: str, ref: str) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. :param payload: The payload to be sent to the phoenix server + :param ref: The message reference that the server will use for replying :return: None """ if self.socket.version == 1: msg = dict(topic=self.topic, event=event_name, payload=payload, ref=None) elif self.socket.version == 2: - msg = [3, 3, self.topic, event_name, payload] + msg = [None, ref, self.topic, event_name, payload] try: await self.socket.ws_connection.send(json.dumps(msg)) except Exception as e: print(str(e)) # TODO: better error propagation - return + return \ No newline at end of file diff --git a/realtime/connection.py b/realtime/connection.py index 37f9b04..2f101f4 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import pdb from collections import defaultdict from functools import wraps from typing import Any, Callable, List, Dict, TypeVar, DefaultDict @@ -76,15 +77,23 @@ async def _listen(self) -> None: msg = Message(**json.loads(msg)) elif self.version == 2: msg_array = json.loads(msg) - msg = Message(chanid=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) + msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) if msg.event == ChannelEvents.reply: - continue + for channel in self.channels.get(msg.topic, []): + if msg.ref == channel.join_msg_ref : + logging.info(f"Successfully joined {msg.topic}") + continue + else: + for cl in channel.listeners: + if cl.ref in ["*", msg.ref]: + cl.callback(msg.payload) for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: cl.callback(msg.payload) + except websockets.exceptions.ConnectionClosed: if self.auto_reconnect: logging.info("Connection with server closed, trying to reconnect...") @@ -155,10 +164,10 @@ def set_channel(self, topic: str) -> Channel: def summary(self) -> None: """ - Prints a list of topics and event the socket is listening to + Prints a list of topics and event, and reference that the socket is listening to :return: None """ for topic, chans in self.channels.items(): for chan in chans: print( - f"Topic: {topic} | Events: {[e for e, _ in chan.listeners]}]") + f"Topic: {topic} | Events: {[e for e, _, _ in chan.listeners]} | References: {[r for _, r, _ in chan.listeners]}]") diff --git a/realtime/message.py b/realtime/message.py index df890b6..87da6e0 100644 --- a/realtime/message.py +++ b/realtime/message.py @@ -11,7 +11,7 @@ class Message: event: str payload: Dict[str, Any] ref: Any - chanid: Any + join_ref: Any topic: str def __hash__(self): From 4b3215bd67df1ecd165c87126b24f6a537fb96ea Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Thu, 5 Oct 2023 11:27:02 +0200 Subject: [PATCH 04/34] add: [leave] leaving a channel --- realtime/channel.py | 17 +++++++++-------- realtime/connection.py | 8 +++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index 3413d34..8fd673e 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -40,7 +40,7 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.listeners: List[CallbackListener] = [] self.joined = False self.join_ref = str(uuid.uuid4()) - self.join_msg_ref = str(uuid.uuid4()) + self.control_msg_ref = "" def join(self) -> Channel: """ @@ -62,7 +62,8 @@ async def _join(self) -> None: payload={}, ref=None) elif self.socket.version == 2: #[join_reference, message_reference, topic_name, event_name, payload] - join_req = [self.join_ref, self.join_msg_ref, self.topic, ChannelEvents.join, {}] + self.control_msg_ref = str(uuid.uuid4()) + join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, {}] try: await self.socket.ws_connection.send(json.dumps(join_req)) @@ -70,14 +71,14 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return - def leave(self) -> Channel: + def leave(self) -> None: """ Wrapper for async def _leave() to expose a non-async interface Essentially gets the only event loop and attempt leaving a topic - :return: Channel + :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._join()) + loop.run_until_complete(self._leave()) return self async def _leave(self) -> None: @@ -86,13 +87,13 @@ async def _leave(self) -> None: :return: None """ if self.socket.version == 1: - join_req = dict(topic=self.topic, event=ChannelEvents.leave, + leave_req = dict(topic=self.topic, event=ChannelEvents.leave, payload={}, ref=None) elif self.socket.version == 2: - join_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] + leave_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] try: - await self.socket.ws_connection.send(json.dumps(join_req)) + await self.socket.ws_connection.send(json.dumps(leave_req)) except Exception as e: print(str(e)) # TODO: better error propagation return diff --git a/realtime/connection.py b/realtime/connection.py index 2f101f4..3518749 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -81,13 +81,19 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): - if msg.ref == channel.join_msg_ref : + if msg.ref == channel.control_msg_ref : logging.info(f"Successfully joined {msg.topic}") continue else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: cl.callback(msg.payload) + + if msg.event == ChannelEvents.close: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref == channel.join_ref : + logging.info(f"Successfully left {msg.topic}") + continue for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: From ca06e8ed518732a623b8fa950fc130f4e8fe1b81 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 17 Oct 2023 10:54:55 +0200 Subject: [PATCH 05/34] chg: [logging] handling control msg --- realtime/connection.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 3518749..ff8c038 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -82,8 +82,15 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : - logging.info(f"Successfully joined {msg.topic}") - continue + if msg.payload == "{'status': 'error', 'response': {'reason': 'sink not found'}}": + logging.info(f"{msg.topic} not found") + break + elif msg.payload == "{'status': 'error', 'response': {'reason': 'unauthorized'}}": + logging.info(f"{msg.topic} unauthorized") + break + elif msg.payload == "{'status': 'ok', 'response': {}}": + logging.info(f"Successfully joined {msg.topic}") + continue else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: From 172f9c1732f3d97d2548d7152ee327555cc7eb40 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 17 Oct 2023 11:24:31 +0200 Subject: [PATCH 06/34] chg: [logging] handling control msg - again --- realtime/connection.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index ff8c038..e270ba8 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -82,13 +82,10 @@ async def _listen(self) -> None: if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : - if msg.payload == "{'status': 'error', 'response': {'reason': 'sink not found'}}": - logging.info(f"{msg.topic} not found") + if msg.payload["status"] == "error": + logging.info(f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}") break - elif msg.payload == "{'status': 'error', 'response': {'reason': 'unauthorized'}}": - logging.info(f"{msg.topic} unauthorized") - break - elif msg.payload == "{'status': 'ok', 'response': {}}": + elif msg.payload["status"] == "ok": logging.info(f"Successfully joined {msg.topic}") continue else: From ccc9e7873a9c341b8dbbbdb4aebb1cb4b80ac73d Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 18 Oct 2023 11:30:18 +0200 Subject: [PATCH 07/34] chg: [connection] leave on kb interrupt --- realtime/channel.py | 2 +- realtime/connection.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index 8fd673e..360ce01 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -83,7 +83,7 @@ def leave(self) -> None: async def _leave(self) -> None: """ - Coroutine that attempts to join Phoenix Realtime server via a certain topic + Coroutine that attempts to leave Phoenix Realtime server via a certain topic :return: None """ if self.socket.version == 1: diff --git a/realtime/connection.py b/realtime/connection.py index e270ba8..3b5b2aa 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -62,8 +62,16 @@ def listen(self) -> None: :return: None """ loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(asyncio.gather( - self._listen(), self._keep_alive())) + loop.create_task(self._listen()) + loop.create_task(self._keep_alive()) + try: + loop.run_forever() + except KeyboardInterrupt: + # we leave all channels properly + for channel in self.channels: + for chan in self.channels.get(channel, []): + chan.leave() + async def _listen(self) -> None: """ From 3e4424db461573f2cc9df5db36338bdc7b8fce60 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Fri, 20 Oct 2023 23:18:18 +0200 Subject: [PATCH 08/34] chg: [async] remove non-async API --- async-usage.py | 55 ++++++++++++++++++++++++++++++++++++++++++ realtime/channel.py | 42 +++----------------------------- realtime/connection.py | 49 ++++++++++++++----------------------- 3 files changed, 77 insertions(+), 69 deletions(-) create mode 100644 async-usage.py diff --git a/async-usage.py b/async-usage.py new file mode 100644 index 0000000..880ddd0 --- /dev/null +++ b/async-usage.py @@ -0,0 +1,55 @@ +from realtime.connection import Socket +import asyncio +import uuid + +def callback1(payload): + print(f"c1: {payload}") + +def callback2(payload): + print(f"c2: {payload}") + + +async def main(): + + TOKEN = "" + URLsink = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" + + client = Socket(URLsink) + + await client.connect() + + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) + + channel_s = client.set_channel("yourchannel") + await channel_s.join() + channel_s.on("test_event", None, callback1) + + # non sense elixir handler, we would not have an event on a reply + #def handle_in("request_ping", payload, socket) do + # push(socket, "test_event", %{body: payload}) + # {:noreply, socket} + #end + + await channel_s.send("request_ping", "this is my payload 1", None) + await channel_s.send("request_ping", "this is my payload 2", None) + await channel_s.send("request_ping", "this is my payload 3", None) + + # proper relpy elixir handler + #def handle_in("ping", payload, socket) do + # {:reply, {:ok, payload}, socket} + #end + + ref = str(uuid.uuid4()) + channel_s.on(None, ref, callback2) + await channel_s.send("ping", "this is my ping payload", ref) + + # we give it some time to complete + await asyncio.sleep(15) + + # proper shut down + listen_task.cancel() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/realtime/channel.py b/realtime/channel.py index 360ce01..a8bbed2 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -1,6 +1,6 @@ from __future__ import annotations -import asyncio +import logging import json import uuid from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple @@ -42,17 +42,7 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.join_ref = str(uuid.uuid4()) self.control_msg_ref = "" - def join(self) -> Channel: - """ - Wrapper for async def _join() to expose a non-async interface - Essentially gets the only event loop and attempt joining a topic - :return: Channel - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._join()) - return self - - async def _join(self) -> None: + async def join(self) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :return: None @@ -71,17 +61,7 @@ async def _join(self) -> None: print(str(e)) # TODO: better error propagation return - def leave(self) -> None: - """ - Wrapper for async def _leave() to expose a non-async interface - Essentially gets the only event loop and attempt leaving a topic - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._leave()) - return self - - async def _leave(self) -> None: + async def leave(self) -> None: """ Coroutine that attempts to leave Phoenix Realtime server via a certain topic :return: None @@ -118,21 +98,7 @@ def off(self, event: str, ref: str) -> None: self.listeners = [ callback for callback in self.listeners if (callback.event != event and callback.ref != ref)] - def send(self, event_name: str, payload:str, ref: uuid = str(uuid.uuid4())) -> None: - """ - Wrapper for async def _send() to expose a non-async interface - Essentially gets the only event loop and attempt sending a payload - to a topic - :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. - :param payload: The payload to be sent to the phoenix server - :param ref: The message reference that the server will use for replying - if none is set, generates the string repr of a uuidv4 - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.run_until_complete(self._send(event_name, payload, ref)) - return self - - async def _send(self, event_name: str, payload: str, ref: str) -> None: + async def send(self, event_name: str, payload: str, ref: str) -> None: """ Coroutine that attempts to join Phoenix Realtime server via a certain topic :param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module. diff --git a/realtime/connection.py b/realtime/connection.py index 3b5b2aa..25b3ed8 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -48,36 +48,20 @@ def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any self.params = params self.hb_interval = hb_interval self.ws_connection: websockets.client.WebSocketClientProtocol - self.kept_alive = False + self.kept_alive = set() self.auto_reconnect = auto_reconnect self.version = version self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) @ensure_connection - def listen(self) -> None: - """ - Wrapper for async def _listen() to expose a non-async interface - In most cases, this should be the last method executed as it starts an infinite listening loop. - :return: None - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running_loop - loop.create_task(self._listen()) - loop.create_task(self._keep_alive()) - try: - loop.run_forever() - except KeyboardInterrupt: - # we leave all channels properly - for channel in self.channels: - for chan in self.channels.get(channel, []): - chan.leave() - - - async def _listen(self) -> None: + async def listen(self) -> None: """ An infinite loop that keeps listening. :return: None """ + self.kept_alive.add(asyncio.ensure_future(self.keep_alive())) + while True: try: msg = await self.ws_connection.recv() @@ -86,7 +70,6 @@ async def _listen(self) -> None: elif self.version == 2: msg_array = json.loads(msg) msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) - if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref : @@ -123,25 +106,29 @@ async def _listen(self) -> None: logging.exception("Connection with the server closed.") break - def connect(self) -> None: - """ - Wrapper for async def _connect() to expose a non-async interface - """ - loop = asyncio.get_event_loop() # TODO: replace with get_running - loop.run_until_complete(self._connect()) - self.connected = True + except asyncio.CancelledError: + logging.info("Listen task was cancelled.") + await self.leave_all() + + except Exception as e: + logging.error(f"Unexpected error in listen: {e}") - async def _connect(self) -> None: + async def connect(self) -> None: ws_connection = await websockets.connect(self.url) if ws_connection.open: - logging.info("Connection was successful") self.ws_connection = ws_connection self.connected = True + logging.info("Connection was successful") else: raise Exception("Connection Failed") + + async def leave_all(self) -> None: + for channel in self.channels: + for chan in self.channels.get(channel, []): + await chan.leave() - async def _keep_alive(self) -> None: + async def keep_alive(self) -> None: """ Sending heartbeat to server every 5 seconds Ping - pong messages to verify connection is alive From f92d75551347d9d8b001dc955e71e6fbf8ecec3c Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 25 Oct 2023 14:04:23 +0200 Subject: [PATCH 09/34] chg: [async] {} -> self.params --- realtime/channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/channel.py b/realtime/channel.py index a8bbed2..89a26c1 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -53,7 +53,7 @@ async def join(self) -> None: elif self.socket.version == 2: #[join_reference, message_reference, topic_name, event_name, payload] self.control_msg_ref = str(uuid.uuid4()) - join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, {}] + join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, self.params] try: await self.socket.ws_connection.send(json.dumps(join_req)) From 5009fcfd450ff8b9d86fb9592e0028fab4f03445 Mon Sep 17 00:00:00 2001 From: Max Baluev Date: Tue, 31 Oct 2023 12:07:57 +0100 Subject: [PATCH 10/34] Fix reconnection --- realtime/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 25b3ed8..ce75557 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -98,7 +98,7 @@ async def listen(self) -> None: except websockets.exceptions.ConnectionClosed: if self.auto_reconnect: logging.info("Connection with server closed, trying to reconnect...") - await self._connect() + await self.connect() for topic, channels in self.channels.items(): for channel in channels: await channel._join() @@ -151,7 +151,7 @@ async def keep_alive(self) -> None: except websockets.exceptions.ConnectionClosed: if self.auto_reconnect: logging.info("Connection with server closed, trying to reconnect...") - await self._connect() + await self.connect() else: logging.exception("Connection with the server closed.") break From 86d930624aac769dfc16193903ad8a3b22ee5c1a Mon Sep 17 00:00:00 2001 From: Max Baluev Date: Tue, 31 Oct 2023 12:46:20 +0100 Subject: [PATCH 11/34] Update connection.py, fix join async --- realtime/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/connection.py b/realtime/connection.py index ce75557..90a703b 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -101,7 +101,7 @@ async def listen(self) -> None: await self.connect() for topic, channels in self.channels.items(): for channel in channels: - await channel._join() + await channel.join() else: logging.exception("Connection with the server closed.") break From d010483d8f405cbae299005e2786ec53aa4aa0fd Mon Sep 17 00:00:00 2001 From: Max Baluev Date: Thu, 2 Nov 2023 00:18:29 +0100 Subject: [PATCH 12/34] Improve error handling --- realtime/connection.py | 114 ++++++++++++++++++++++++++++------------- 1 file changed, 77 insertions(+), 37 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 90a703b..45dd1f6 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -1,12 +1,17 @@ import asyncio import json import logging -import pdb from collections import defaultdict from functools import wraps from typing import Any, Callable, List, Dict, TypeVar, DefaultDict import websockets +from websockets.exceptions import ( + ConnectionClosed, + InvalidHandshake, + InvalidMessage, + ConnectionClosedOK, +) from typing_extensions import ParamSpec from realtime.channel import Channel @@ -17,8 +22,8 @@ T_Retval = TypeVar("T_Retval") T_ParamSpec = ParamSpec("T_ParamSpec") -logging.basicConfig( - format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO) +logging.basicConfig(format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO) + def ensure_connection(func: Callable[T_ParamSpec, T_Retval]): @wraps(func) @@ -32,7 +37,14 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: class Socket: - def __init__(self, url: str, auto_reconnect: bool = False, params: Dict[str, Any] = {}, hb_interval: int = 30, version: int = 2) -> None: + def __init__( + self, + url: str, + auto_reconnect: bool = False, + params: Dict[str, Any] = {}, + hb_interval: int = 30, + version: int = 2, + ) -> None: """ `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`. Socket-Channel has a 1-many relationship. @@ -65,14 +77,20 @@ async def listen(self) -> None: while True: try: msg = await self.ws_connection.recv() - if self.version == 1 : + if self.version == 1: msg = Message(**json.loads(msg)) elif self.version == 2: msg_array = json.loads(msg) - msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4]) + msg = Message( + join_ref=msg_array[0], + ref=msg_array[1], + topic=msg_array[2], + event=msg_array[3], + payload=msg_array[4], + ) if msg.event == ChannelEvents.reply: for channel in self.channels.get(msg.topic, []): - if msg.ref == channel.control_msg_ref : + if msg.ref == channel.control_msg_ref: if msg.payload["status"] == "error": logging.info(f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}") break @@ -83,10 +101,10 @@ async def listen(self) -> None: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: cl.callback(msg.payload) - + if msg.event == ChannelEvents.close: for channel in self.channels.get(msg.topic, []): - if msg.join_ref == channel.join_ref : + if msg.join_ref == channel.join_ref: logging.info(f"Successfully left {msg.topic}") continue @@ -95,23 +113,28 @@ async def listen(self) -> None: if cl.event in ["*", msg.event]: cl.callback(msg.payload) - except websockets.exceptions.ConnectionClosed: - if self.auto_reconnect: - logging.info("Connection with server closed, trying to reconnect...") - await self.connect() - for topic, channels in self.channels.items(): - for channel in channels: - await channel.join() - else: - logging.exception("Connection with the server closed.") - break + except ConnectionClosedOK: + logging.info("Connection was closed normally.") + await self.leave_all() + break + + except InvalidMessage: + logging.error("Received an invalid message. Check message format and content.") + + except ConnectionClosed as e: + logging.error(f"Connection closed unexpectedly: {e}") + self._handle_reconnection() + + except InvalidHandshake: + logging.error("Invalid handshake while connecting. Ensure your client and server configurations match.") except asyncio.CancelledError: logging.info("Listen task was cancelled.") await self.leave_all() - except Exception as e: + except Exception as e: # A general exception handler should be the last resort logging.error(f"Unexpected error in listen: {e}") + self._handle_reconnection() async def connect(self) -> None: ws_connection = await websockets.connect(self.url) @@ -122,7 +145,17 @@ async def connect(self) -> None: logging.info("Connection was successful") else: raise Exception("Connection Failed") - + + async def _handle_reconnection(self) -> None: + if self.auto_reconnect: + logging.info("Connection with server closed, trying to reconnect...") + await self.connect() + for topic, channels in self.channels.items(): + for channel in channels: + await channel.join() + else: + logging.exception("Connection with the server closed.") + async def leave_all(self) -> None: for channel in self.channels: for chan in self.channels.get(channel, []): @@ -135,26 +168,32 @@ async def keep_alive(self) -> None: """ while True: try: - if self.version == 1 : + if self.version == 1: data = dict( - topic=PHOENIX_CHANNEL, - event=ChannelEvents.heartbeat, - payload=HEARTBEAT_PAYLOAD, - ref=None, + topic=PHOENIX_CHANNEL, + event=ChannelEvents.heartbeat, + payload=HEARTBEAT_PAYLOAD, + ref=None, ) - elif self.version == 2 : + elif self.version == 2: # [null,"4","phoenix","heartbeat",{}] - data = [None, None, PHOENIX_CHANNEL, ChannelEvents.heartbeat, HEARTBEAT_PAYLOAD] - + data = [ + None, + None, + PHOENIX_CHANNEL, + ChannelEvents.heartbeat, + HEARTBEAT_PAYLOAD, + ] + await self.ws_connection.send(json.dumps(data)) await asyncio.sleep(self.hb_interval) - except websockets.exceptions.ConnectionClosed: - if self.auto_reconnect: - logging.info("Connection with server closed, trying to reconnect...") - await self.connect() - else: - logging.exception("Connection with the server closed.") - break + + except ConnectionClosed: + logging.error("Connection closed unexpectedly during heartbeat. Ensure the server is alive and responsive.") + self._handle_reconnection() + + except Exception as e: # A general exception handler should be the last resort + logging.error(f"Unexpected error in keep_alive: {e}") @ensure_connection def set_channel(self, topic: str) -> Channel: @@ -175,4 +214,5 @@ def summary(self) -> None: for topic, chans in self.channels.items(): for chan in chans: print( - f"Topic: {topic} | Events: {[e for e, _, _ in chan.listeners]} | References: {[r for _, r, _ in chan.listeners]}]") + f"Topic: {topic} | Events: {[e for e, _, _ in chan.listeners]} | References: {[r for _, r, _ in chan.listeners]}]" + ) From e9b4d66f15d7f5a1ee3d2592580c9a6b260dcdd2 Mon Sep 17 00:00:00 2001 From: Max Baluev Date: Thu, 2 Nov 2023 00:32:14 +0100 Subject: [PATCH 13/34] fix _handle_reconnection --- realtime/connection.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 45dd1f6..09c00ad 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -22,7 +22,9 @@ T_Retval = TypeVar("T_Retval") T_ParamSpec = ParamSpec("T_ParamSpec") -logging.basicConfig(format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO) +logging.basicConfig( + format="%(asctime)s:%(levelname)s - %(message)s", level=logging.INFO +) def ensure_connection(func: Callable[T_ParamSpec, T_Retval]): @@ -92,7 +94,9 @@ async def listen(self) -> None: for channel in self.channels.get(msg.topic, []): if msg.ref == channel.control_msg_ref: if msg.payload["status"] == "error": - logging.info(f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}") + logging.info( + f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}" + ) break elif msg.payload["status"] == "ok": logging.info(f"Successfully joined {msg.topic}") @@ -119,22 +123,28 @@ async def listen(self) -> None: break except InvalidMessage: - logging.error("Received an invalid message. Check message format and content.") + logging.error( + "Received an invalid message. Check message format and content." + ) except ConnectionClosed as e: logging.error(f"Connection closed unexpectedly: {e}") - self._handle_reconnection() + await self._handle_reconnection() except InvalidHandshake: - logging.error("Invalid handshake while connecting. Ensure your client and server configurations match.") + logging.error( + "Invalid handshake while connecting. Ensure your client and server configurations match." + ) except asyncio.CancelledError: logging.info("Listen task was cancelled.") await self.leave_all() - except Exception as e: # A general exception handler should be the last resort + except ( + Exception + ) as e: # A general exception handler should be the last resort logging.error(f"Unexpected error in listen: {e}") - self._handle_reconnection() + await self._handle_reconnection() async def connect(self) -> None: ws_connection = await websockets.connect(self.url) @@ -189,10 +199,14 @@ async def keep_alive(self) -> None: await asyncio.sleep(self.hb_interval) except ConnectionClosed: - logging.error("Connection closed unexpectedly during heartbeat. Ensure the server is alive and responsive.") - self._handle_reconnection() + logging.error( + "Connection closed unexpectedly during heartbeat. Ensure the server is alive and responsive." + ) + await self._handle_reconnection() - except Exception as e: # A general exception handler should be the last resort + except ( + Exception + ) as e: # A general exception handler should be the last resort logging.error(f"Unexpected error in keep_alive: {e}") @ensure_connection From e9e44d8407d74de622744226e2fd8b8961b570f5 Mon Sep 17 00:00:00 2001 From: seva Date: Mon, 6 Nov 2023 14:51:26 +0100 Subject: [PATCH 14/34] test improvements --- realtime/connection.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 09c00ad..6f978c8 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -74,7 +74,7 @@ async def listen(self) -> None: An infinite loop that keeps listening. :return: None """ - self.kept_alive.add(asyncio.ensure_future(self.keep_alive())) + self.kept_alive.add(asyncio.create_task(self.keep_alive())) while True: try: @@ -115,7 +115,11 @@ async def listen(self) -> None: for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: - cl.callback(msg.payload) + if asyncio.iscoroutinefunction(cl.callback): + asyncio.create_task(cl.callback(msg.payload)) + else: + cl.callback(msg.payload) + except ConnectionClosedOK: logging.info("Connection was closed normally.") From f27185169c85ad69b9e54c96c4fc9ba1d329e99e Mon Sep 17 00:00:00 2001 From: seva Date: Mon, 6 Nov 2023 15:05:24 +0100 Subject: [PATCH 15/34] test improvements --- realtime/connection.py | 104 +++++++++++++++++++++-------------------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 6f978c8..48d8d8b 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -18,7 +18,6 @@ from realtime.exceptions import NotConnectedError from realtime.message import HEARTBEAT_PAYLOAD, PHOENIX_CHANNEL, ChannelEvents, Message - T_Retval = TypeVar("T_Retval") T_ParamSpec = ParamSpec("T_ParamSpec") @@ -40,12 +39,12 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: class Socket: def __init__( - self, - url: str, - auto_reconnect: bool = False, - params: Dict[str, Any] = {}, - hb_interval: int = 30, - version: int = 2, + self, + url: str, + auto_reconnect: bool = False, + params: Dict[str, Any] = {}, + hb_interval: int = 30, + version: int = 2, ) -> None: """ `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`. @@ -68,6 +67,51 @@ def __init__( self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) + async def _process_message(self, msg: str): + try: + if self.version == 1: + msg = Message(**json.loads(msg)) + elif self.version == 2: + msg_array = json.loads(msg) + msg = Message( + join_ref=msg_array[0], + ref=msg_array[1], + topic=msg_array[2], + event=msg_array[3], + payload=msg_array[4], + ) + if msg.event == ChannelEvents.reply: + for channel in self.channels.get(msg.topic, []): + if msg.ref == channel.control_msg_ref: + if msg.payload["status"] == "error": + logging.info( + f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}" + ) + break + elif msg.payload["status"] == "ok": + logging.info(f"Successfully joined {msg.topic}") + continue + else: + for cl in channel.listeners: + if cl.ref in ["*", msg.ref]: + cl.callback(msg.payload) + + if msg.event == ChannelEvents.close: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref == channel.join_ref: + logging.info(f"Successfully left {msg.topic}") + continue + + for channel in self.channels.get(msg.topic, []): + for cl in channel.listeners: + if cl.event in ["*", msg.event]: + if asyncio.iscoroutinefunction(cl.callback): + await cl.callback(msg.payload) + else: + cl.callback(msg.payload) + except Exception as e: + logging.error(f"Error processing message: {e}", exc_info=True) + @ensure_connection async def listen(self) -> None: """ @@ -79,47 +123,7 @@ async def listen(self) -> None: while True: try: msg = await self.ws_connection.recv() - if self.version == 1: - msg = Message(**json.loads(msg)) - elif self.version == 2: - msg_array = json.loads(msg) - msg = Message( - join_ref=msg_array[0], - ref=msg_array[1], - topic=msg_array[2], - event=msg_array[3], - payload=msg_array[4], - ) - if msg.event == ChannelEvents.reply: - for channel in self.channels.get(msg.topic, []): - if msg.ref == channel.control_msg_ref: - if msg.payload["status"] == "error": - logging.info( - f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}" - ) - break - elif msg.payload["status"] == "ok": - logging.info(f"Successfully joined {msg.topic}") - continue - else: - for cl in channel.listeners: - if cl.ref in ["*", msg.ref]: - cl.callback(msg.payload) - - if msg.event == ChannelEvents.close: - for channel in self.channels.get(msg.topic, []): - if msg.join_ref == channel.join_ref: - logging.info(f"Successfully left {msg.topic}") - continue - - for channel in self.channels.get(msg.topic, []): - for cl in channel.listeners: - if cl.event in ["*", msg.event]: - if asyncio.iscoroutinefunction(cl.callback): - asyncio.create_task(cl.callback(msg.payload)) - else: - cl.callback(msg.payload) - + asyncio.create_task(self._process_message(msg)) except ConnectionClosedOK: logging.info("Connection was closed normally.") @@ -145,7 +149,7 @@ async def listen(self) -> None: await self.leave_all() except ( - Exception + Exception ) as e: # A general exception handler should be the last resort logging.error(f"Unexpected error in listen: {e}") await self._handle_reconnection() @@ -209,7 +213,7 @@ async def keep_alive(self) -> None: await self._handle_reconnection() except ( - Exception + Exception ) as e: # A general exception handler should be the last resort logging.error(f"Unexpected error in keep_alive: {e}") From fcdc3de91cbd96961203a29a10f81bb4dfa84069 Mon Sep 17 00:00:00 2001 From: seva Date: Mon, 6 Nov 2023 15:12:49 +0100 Subject: [PATCH 16/34] test improvements --- realtime/connection.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 48d8d8b..1834688 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -102,13 +102,14 @@ async def _process_message(self, msg: str): logging.info(f"Successfully left {msg.topic}") continue + loop = asyncio.get_running_loop() for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: if asyncio.iscoroutinefunction(cl.callback): - await cl.callback(msg.payload) + loop.create_task(cl.callback(msg.payload)) else: - cl.callback(msg.payload) + loop.run_in_executor(None, cl.callback, msg.payload) except Exception as e: logging.error(f"Error processing message: {e}", exc_info=True) From 1251851c0febc8a30da316554acb0a690a74fad5 Mon Sep 17 00:00:00 2001 From: seva Date: Mon, 6 Nov 2023 15:17:31 +0100 Subject: [PATCH 17/34] test improvements --- realtime/connection.py | 93 ++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 1834688..db7d16e 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -6,13 +6,13 @@ from typing import Any, Callable, List, Dict, TypeVar, DefaultDict import websockets +from typing_extensions import ParamSpec from websockets.exceptions import ( ConnectionClosed, InvalidHandshake, InvalidMessage, ConnectionClosedOK, ) -from typing_extensions import ParamSpec from realtime.channel import Channel from realtime.exceptions import NotConnectedError @@ -67,52 +67,6 @@ def __init__( self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) - async def _process_message(self, msg: str): - try: - if self.version == 1: - msg = Message(**json.loads(msg)) - elif self.version == 2: - msg_array = json.loads(msg) - msg = Message( - join_ref=msg_array[0], - ref=msg_array[1], - topic=msg_array[2], - event=msg_array[3], - payload=msg_array[4], - ) - if msg.event == ChannelEvents.reply: - for channel in self.channels.get(msg.topic, []): - if msg.ref == channel.control_msg_ref: - if msg.payload["status"] == "error": - logging.info( - f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}" - ) - break - elif msg.payload["status"] == "ok": - logging.info(f"Successfully joined {msg.topic}") - continue - else: - for cl in channel.listeners: - if cl.ref in ["*", msg.ref]: - cl.callback(msg.payload) - - if msg.event == ChannelEvents.close: - for channel in self.channels.get(msg.topic, []): - if msg.join_ref == channel.join_ref: - logging.info(f"Successfully left {msg.topic}") - continue - - loop = asyncio.get_running_loop() - for channel in self.channels.get(msg.topic, []): - for cl in channel.listeners: - if cl.event in ["*", msg.event]: - if asyncio.iscoroutinefunction(cl.callback): - loop.create_task(cl.callback(msg.payload)) - else: - loop.run_in_executor(None, cl.callback, msg.payload) - except Exception as e: - logging.error(f"Error processing message: {e}", exc_info=True) - @ensure_connection async def listen(self) -> None: """ @@ -122,9 +76,52 @@ async def listen(self) -> None: self.kept_alive.add(asyncio.create_task(self.keep_alive())) while True: + + await asyncio.sleep(0) + try: msg = await self.ws_connection.recv() - asyncio.create_task(self._process_message(msg)) + if self.version == 1: + msg = Message(**json.loads(msg)) + elif self.version == 2: + msg_array = json.loads(msg) + msg = Message( + join_ref=msg_array[0], + ref=msg_array[1], + topic=msg_array[2], + event=msg_array[3], + payload=msg_array[4], + ) + if msg.event == ChannelEvents.reply: + for channel in self.channels.get(msg.topic, []): + if msg.ref == channel.control_msg_ref: + if msg.payload["status"] == "error": + logging.info( + f"Error joining channel: {msg.topic} - {msg.payload['response']['reason']}" + ) + break + elif msg.payload["status"] == "ok": + logging.info(f"Successfully joined {msg.topic}") + continue + else: + for cl in channel.listeners: + if cl.ref in ["*", msg.ref]: + cl.callback(msg.payload) + + if msg.event == ChannelEvents.close: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref == channel.join_ref: + logging.info(f"Successfully left {msg.topic}") + continue + + for channel in self.channels.get(msg.topic, []): + for cl in channel.listeners: + if cl.event in ["*", msg.event]: + if asyncio.iscoroutinefunction(cl.callback): + asyncio.create_task(cl.callback(msg.payload)) + else: + cl.callback(msg.payload) + except ConnectionClosedOK: logging.info("Connection was closed normally.") From 6b32e15d9a560ce3c1a739c2445c7efcc4a2569c Mon Sep 17 00:00:00 2001 From: seva Date: Tue, 7 Nov 2023 10:37:57 +0100 Subject: [PATCH 18/34] test improvements --- realtime/connection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index db7d16e..71f1e12 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -106,7 +106,11 @@ async def listen(self) -> None: else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: - cl.callback(msg.payload) + if asyncio.iscoroutinefunction(cl.callback): + asyncio.create_task(cl.callback(msg.payload)) + else: + loop = asyncio.get_event_loop() + loop.run_in_executor(None, cl.callback(msg.payload)) if msg.event == ChannelEvents.close: for channel in self.channels.get(msg.topic, []): @@ -120,7 +124,8 @@ async def listen(self) -> None: if asyncio.iscoroutinefunction(cl.callback): asyncio.create_task(cl.callback(msg.payload)) else: - cl.callback(msg.payload) + loop = asyncio.get_event_loop() + loop.run_in_executor(None, cl.callback(msg.payload)) except ConnectionClosedOK: From 8debb8e753ef92ba2a1751f131fdc50c6869a0e4 Mon Sep 17 00:00:00 2001 From: seva Date: Tue, 7 Nov 2023 11:30:24 +0100 Subject: [PATCH 19/34] break on terminate --- realtime/connection.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 71f1e12..137ce8d 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -76,11 +76,11 @@ async def listen(self) -> None: self.kept_alive.add(asyncio.create_task(self.keep_alive())) while True: - - await asyncio.sleep(0) - try: + await asyncio.sleep(0) + msg = await self.ws_connection.recv() + if self.version == 1: msg = Message(**json.loads(msg)) elif self.version == 2: @@ -150,6 +150,7 @@ async def listen(self) -> None: except asyncio.CancelledError: logging.info("Listen task was cancelled.") await self.leave_all() + break except ( Exception @@ -209,6 +210,9 @@ async def keep_alive(self) -> None: await self.ws_connection.send(json.dumps(data)) await asyncio.sleep(self.hb_interval) + except asyncio.CancelledError: + logging.info("Keep alive task was cancelled.") + break except ConnectionClosed: logging.error( "Connection closed unexpectedly during heartbeat. Ensure the server is alive and responsive." From f227115c7c3483fd2b1922838b8ba743338f4c62 Mon Sep 17 00:00:00 2001 From: seva Date: Tue, 7 Nov 2023 19:46:44 +0100 Subject: [PATCH 20/34] fix message --- realtime/message.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/realtime/message.py b/realtime/message.py index 87da6e0..532a145 100644 --- a/realtime/message.py +++ b/realtime/message.py @@ -11,9 +11,10 @@ class Message: event: str payload: Dict[str, Any] ref: Any - join_ref: Any topic: str + join_ref: Any = None # V2 + def __hash__(self): return hash((self.event, tuple(list(self.payload.values())), self.ref, self.topic)) From fdc34400271dd1fd80bf8f9ccad79b5113f1f7d8 Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 09:59:32 +0100 Subject: [PATCH 21/34] fix message --- realtime/connection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/realtime/connection.py b/realtime/connection.py index 137ce8d..35b72ad 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -73,7 +73,8 @@ async def listen(self) -> None: An infinite loop that keeps listening. :return: None """ - self.kept_alive.add(asyncio.create_task(self.keep_alive())) + if self.hb_interval >= 0: + self.kept_alive.add(asyncio.create_task(self.keep_alive())) while True: try: From d00acfba30b4f12039736a2d4c6dd6e326662649 Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 10:27:04 +0100 Subject: [PATCH 22/34] fix message --- realtime/connection.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/realtime/connection.py b/realtime/connection.py index 35b72ad..6d33709 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -45,6 +45,7 @@ def __init__( params: Dict[str, Any] = {}, hb_interval: int = 30, version: int = 2, + ping_timeout: int = 20, ) -> None: """ `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`. @@ -64,6 +65,7 @@ def __init__( self.kept_alive = set() self.auto_reconnect = auto_reconnect self.version = version + self.ping_timeout = ping_timeout self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) @@ -160,7 +162,7 @@ async def listen(self) -> None: await self._handle_reconnection() async def connect(self) -> None: - ws_connection = await websockets.connect(self.url) + ws_connection = await websockets.connect(self.url, ping_timeout=self.ping_timeout) if ws_connection.open: self.ws_connection = ws_connection From 001eb5ac976b1a6012bfde1b50b8e94a39453ef2 Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 12:34:51 +0100 Subject: [PATCH 23/34] fix message --- realtime/channel.py | 6 +++++- realtime/connection.py | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/realtime/channel.py b/realtime/channel.py index 89a26c1..2774281 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -41,6 +41,7 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.joined = False self.join_ref = str(uuid.uuid4()) self.control_msg_ref = "" + self.members_count = 0 async def join(self) -> None: """ @@ -116,4 +117,7 @@ async def send(self, event_name: str, payload: str, ref: str) -> None: await self.socket.ws_connection.send(json.dumps(msg)) except Exception as e: print(str(e)) # TODO: better error propagation - return \ No newline at end of file + return + + async def has_members(self) -> bool: + return self.members_count > 0 diff --git a/realtime/connection.py b/realtime/connection.py index 6d33709..75c72ab 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -121,6 +121,20 @@ async def listen(self) -> None: logging.info(f"Successfully left {msg.topic}") continue + if msg.event == ChannelEvents.join: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref != channel.join_ref: + channel.members_count += 1 + logging.info(f"New member joined to {msg.topic}") + continue + + if msg.event == ChannelEvents.leave: + for channel in self.channels.get(msg.topic, []): + if msg.join_ref != channel.join_ref: + channel.members_count -= 1 + logging.info(f"Member left from {msg.topic}") + continue + for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: From a6be91543451a1ba26b468904780216a4508abca Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 12:47:15 +0100 Subject: [PATCH 24/34] fix message --- realtime/channel.py | 4 ---- realtime/connection.py | 14 -------------- 2 files changed, 18 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index 2774281..defbdea 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -41,7 +41,6 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N self.joined = False self.join_ref = str(uuid.uuid4()) self.control_msg_ref = "" - self.members_count = 0 async def join(self) -> None: """ @@ -118,6 +117,3 @@ async def send(self, event_name: str, payload: str, ref: str) -> None: except Exception as e: print(str(e)) # TODO: better error propagation return - - async def has_members(self) -> bool: - return self.members_count > 0 diff --git a/realtime/connection.py b/realtime/connection.py index 75c72ab..6d33709 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -121,20 +121,6 @@ async def listen(self) -> None: logging.info(f"Successfully left {msg.topic}") continue - if msg.event == ChannelEvents.join: - for channel in self.channels.get(msg.topic, []): - if msg.join_ref != channel.join_ref: - channel.members_count += 1 - logging.info(f"New member joined to {msg.topic}") - continue - - if msg.event == ChannelEvents.leave: - for channel in self.channels.get(msg.topic, []): - if msg.join_ref != channel.join_ref: - channel.members_count -= 1 - logging.info(f"Member left from {msg.topic}") - continue - for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: From 52af8ff8ade2efe011dccda5419c95f8b5fadccb Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 12:57:41 +0100 Subject: [PATCH 25/34] fix message --- realtime/connection.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 6d33709..ac3ac5a 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -112,8 +112,7 @@ async def listen(self) -> None: if asyncio.iscoroutinefunction(cl.callback): asyncio.create_task(cl.callback(msg.payload)) else: - loop = asyncio.get_event_loop() - loop.run_in_executor(None, cl.callback(msg.payload)) + cl.callback(msg.payload) if msg.event == ChannelEvents.close: for channel in self.channels.get(msg.topic, []): @@ -127,8 +126,7 @@ async def listen(self) -> None: if asyncio.iscoroutinefunction(cl.callback): asyncio.create_task(cl.callback(msg.payload)) else: - loop = asyncio.get_event_loop() - loop.run_in_executor(None, cl.callback(msg.payload)) + cl.callback(msg.payload) except ConnectionClosedOK: From 7a5474df87fa3f529203329ff8647e67e65e0a5e Mon Sep 17 00:00:00 2001 From: seva Date: Wed, 8 Nov 2023 13:43:42 +0100 Subject: [PATCH 26/34] fix message --- realtime/connection.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/realtime/connection.py b/realtime/connection.py index ac3ac5a..fa02bab 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -236,6 +236,13 @@ def set_channel(self, topic: str) -> Channel: return chan + def remove_channel(self, topic: str) -> None: + """ + :param topic: Removes a channel from the socket + :return: None + """ + self.channels.pop(topic, None) + def summary(self) -> None: """ Prints a list of topics and event, and reference that the socket is listening to From cb1b69531f13d51e1d8aa9c69951c9e14be9f4f0 Mon Sep 17 00:00:00 2001 From: seva Date: Fri, 10 Nov 2023 17:10:06 +0100 Subject: [PATCH 27/34] raise exceptions --- realtime/channel.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index defbdea..afdad6c 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -1,11 +1,11 @@ from __future__ import annotations -import logging import json +import logging import uuid -from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple -from realtime.message import * +from typing import List, TYPE_CHECKING, NamedTuple, Dict, Any +from realtime.message import ChannelEvents from realtime.types import Callback if TYPE_CHECKING: @@ -49,17 +49,17 @@ async def join(self) -> None: """ if self.socket.version == 1: join_req = dict(topic=self.topic, event=ChannelEvents.join, - payload={}, ref=None) + payload={}, ref=None) elif self.socket.version == 2: - #[join_reference, message_reference, topic_name, event_name, payload] + # [join_reference, message_reference, topic_name, event_name, payload] self.control_msg_ref = str(uuid.uuid4()) join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, self.params] try: await self.socket.ws_connection.send(json.dumps(join_req)) except Exception as e: - print(str(e)) # TODO: better error propagation - return + logging.error(f"Error while joining channel: {str(e)}", exc_info=True) + raise async def leave(self) -> None: """ @@ -68,15 +68,15 @@ async def leave(self) -> None: """ if self.socket.version == 1: leave_req = dict(topic=self.topic, event=ChannelEvents.leave, - payload={}, ref=None) + payload={}, ref=None) elif self.socket.version == 2: leave_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}] try: await self.socket.ws_connection.send(json.dumps(leave_req)) except Exception as e: - print(str(e)) # TODO: better error propagation - return + logging.error(f"Error while leaving channel: {str(e)}", exc_info=True) + raise def on(self, event: str, ref: str, callback: Callback) -> Channel: """ @@ -108,12 +108,12 @@ async def send(self, event_name: str, payload: str, ref: str) -> None: """ if self.socket.version == 1: msg = dict(topic=self.topic, event=event_name, - payload=payload, ref=None) + payload=payload, ref=None) elif self.socket.version == 2: msg = [None, ref, self.topic, event_name, payload] try: await self.socket.ws_connection.send(json.dumps(msg)) except Exception as e: - print(str(e)) # TODO: better error propagation - return + logging.error(f"Error while sending message: {str(e)}", exc_info=True) + raise From 6f8d9c8da8e65d5b6383b792ac518e8872ca64b3 Mon Sep 17 00:00:00 2001 From: Vsevolod Karvetskiy <56288164+karvetskiy@users.noreply.github.com> Date: Sat, 11 Nov 2023 20:05:31 +0300 Subject: [PATCH 28/34] Test callback improvements (#1) * safe callback --------- Co-authored-by: seva --- realtime/connection.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index fa02bab..0f3124e 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -17,6 +17,7 @@ from realtime.channel import Channel from realtime.exceptions import NotConnectedError from realtime.message import HEARTBEAT_PAYLOAD, PHOENIX_CHANNEL, ChannelEvents, Message +from realtime.types import Callback T_Retval = TypeVar("T_Retval") T_ParamSpec = ParamSpec("T_ParamSpec") @@ -37,6 +38,10 @@ def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: return wrapper +class CallbackError(Exception): + pass + + class Socket: def __init__( self, @@ -69,6 +74,15 @@ def __init__( self.channels: DefaultDict[str, List[Channel]] = defaultdict(list) + async def _run_callback_safe(self, callback: Callback, payload: Dict) -> None: + try: + if asyncio.iscoroutinefunction(callback): + asyncio.create_task(callback(payload)) + else: + callback(payload) + except Exception as e: + raise CallbackError("Error in callback") from e + @ensure_connection async def listen(self) -> None: """ @@ -109,10 +123,7 @@ async def listen(self) -> None: else: for cl in channel.listeners: if cl.ref in ["*", msg.ref]: - if asyncio.iscoroutinefunction(cl.callback): - asyncio.create_task(cl.callback(msg.payload)) - else: - cl.callback(msg.payload) + await self._run_callback_safe(cl.callback, msg.payload) if msg.event == ChannelEvents.close: for channel in self.channels.get(msg.topic, []): @@ -123,11 +134,7 @@ async def listen(self) -> None: for channel in self.channels.get(msg.topic, []): for cl in channel.listeners: if cl.event in ["*", msg.event]: - if asyncio.iscoroutinefunction(cl.callback): - asyncio.create_task(cl.callback(msg.payload)) - else: - cl.callback(msg.payload) - + await self._run_callback_safe(cl.callback, msg.payload) except ConnectionClosedOK: logging.info("Connection was closed normally.") @@ -153,6 +160,9 @@ async def listen(self) -> None: await self.leave_all() break + except CallbackError: + logging.info("Error in callback") + except ( Exception ) as e: # A general exception handler should be the last resort From 47907426ab47c5b69d3c5da99d318786dd2ca750 Mon Sep 17 00:00:00 2001 From: seva Date: Mon, 13 Nov 2023 10:00:44 +0100 Subject: [PATCH 29/34] do not raise exceptions on disconnect --- realtime/channel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/realtime/channel.py b/realtime/channel.py index afdad6c..8aaf7d3 100644 --- a/realtime/channel.py +++ b/realtime/channel.py @@ -59,7 +59,7 @@ async def join(self) -> None: await self.socket.ws_connection.send(json.dumps(join_req)) except Exception as e: logging.error(f"Error while joining channel: {str(e)}", exc_info=True) - raise + return async def leave(self) -> None: """ @@ -76,7 +76,7 @@ async def leave(self) -> None: await self.socket.ws_connection.send(json.dumps(leave_req)) except Exception as e: logging.error(f"Error while leaving channel: {str(e)}", exc_info=True) - raise + return def on(self, event: str, ref: str, callback: Callback) -> Channel: """ From 373383a916e68cf078624d64eacf0c2a1a2344e8 Mon Sep 17 00:00:00 2001 From: seva Date: Thu, 23 Nov 2023 12:12:13 +0100 Subject: [PATCH 30/34] retry connection to socket --- realtime/connection.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/realtime/connection.py b/realtime/connection.py index 0f3124e..e13eaf3 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -170,14 +170,22 @@ async def listen(self) -> None: await self._handle_reconnection() async def connect(self) -> None: - ws_connection = await websockets.connect(self.url, ping_timeout=self.ping_timeout) + while True: + try: + ws_connection = await websockets.connect(self.url, ping_timeout=self.ping_timeout) - if ws_connection.open: - self.ws_connection = ws_connection - self.connected = True - logging.info("Connection was successful") - else: - raise Exception("Connection Failed") + self.ws_connection = ws_connection + self.connected = True + logging.info("Connection was successful") + break + except OSError: + logging.error( + "Connection failed. Retrying in 3 seconds. Ensure the server is alive and responsive." + ) + await asyncio.sleep(3) + except asyncio.CancelledError: + logging.info("Connect task was cancelled.") + break async def _handle_reconnection(self) -> None: if self.auto_reconnect: From 080419ec037c9a1f5d34829426570fe851b1ac60 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Thu, 8 Feb 2024 10:27:45 +0100 Subject: [PATCH 31/34] chg: [connection] put sync callback in threads --- realtime/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/connection.py b/realtime/connection.py index e13eaf3..df2c3d2 100644 --- a/realtime/connection.py +++ b/realtime/connection.py @@ -79,7 +79,7 @@ async def _run_callback_safe(self, callback: Callback, payload: Dict) -> None: if asyncio.iscoroutinefunction(callback): asyncio.create_task(callback(payload)) else: - callback(payload) + asyncio.create_task(asyncio.to_thread(callback, payload)) except Exception as e: raise CallbackError("Error in callback") from e From a1c3b1882e8913f73dae03b9f6eba255114049fa Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 13 Feb 2024 11:21:09 +0100 Subject: [PATCH 32/34] chg: [doc] change the readme, adds examples --- README.md | 92 +++++++++++++++++++++++++++----------- async-usage.py | 55 ----------------------- fd-usage.py | 65 +++++++++++++++++++++++++++ sending-receiving-usage.py | 69 ++++++++++++++++++++++++++++ usage.py | 45 +++++++++++++------ 5 files changed, 233 insertions(+), 93 deletions(-) delete mode 100644 async-usage.py create mode 100644 fd-usage.py create mode 100644 sending-receiving-usage.py diff --git a/README.md b/README.md index 39d11cc..5c9b2e3 100644 --- a/README.md +++ b/README.md @@ -11,63 +11,105 @@ pip3 install realtime==1.0.2 ## Installation from source ```bash -pip3 install -r requirements.txt +poetry install python3 usage.py - ``` ## Quick Start ```python from realtime.connection import Socket +import asyncio -def callback1(payload): - print("Callback 1: ", payload) +async def callback1(payload): + print(f"Got message: {payload}") -def callback2(payload): - print("Callback 2: ", payload) +async def main(): -if __name__ == "__main__": - URL = "ws://localhost:4000/socket/websocket" - s = Socket(URL) - s.connect() + # your phoenix server token + TOKEN = "" + # your phoenix server URL + URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" + + client = Socket(URL) + + # connect to the server + await client.connect() + + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) + + # join the channel + channel = client.set_channel("this:is:my:topic") + await channel.join() + + # by using a partial function + channel.on("your_event_name", None, callback1) + + # we give it some time to complete + await asyncio.sleep(10) - channel_1 = s.set_channel("realtime:public:todos") - channel_1.join().on("UPDATE", callback1) + # proper shut down + listen_task.cancel() - channel_2 = s.set_channel("realtime:public:users") - channel_2.join().on("*", callback2) +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) - s.listen() + except KeyboardInterrupt: + loop.stop() + exit(0) ``` +## Sending and Receiving data +Sending data to phoenix channels using `send`: +```python +await channel.send("your_handler", "this is my payload", None) +``` +One can also use references for queries/answers: +```python +ref = 1 +channel.on(None, ref, callback1) +await channel.send("your_handler", "this is my payload", ref) +# remove the callback when your are done +# |-> exercise left to the reader ;) +channel.off(None, ref, callback1) +``` ## Sample usage with Supabase -Here's how you could connect to your realtime endpoint using Supabase endpoint. Correct as of 5th June 2021. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet. +Here's how you could connect to your realtime endpoint using Supabase endpoint. Should be correct as of 13th Feb 2024. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet. ```python from realtime.connection import Socket +import asyncio SUPABASE_ID = "dlzlllxhaakqdmaapvji" API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlhdCI6MT" -def callback1(payload): +async def callback1(payload): print("Callback 1: ", payload) -if __name__ == "__main__": +async def main(): URL = f"wss://{SUPABASE_ID}.supabase.co/realtime/v1/websocket?apikey={API_KEY}&vsn=1.0.0" s = Socket(URL) - s.connect() + await s.connect() + listen_task = asyncio.ensure_future(s.listen()) channel_1 = s.set_channel("realtime:*") - channel_1.join().on("UPDATE", callback1) - s.listen() - -``` - -Then, go to the Supabase interface and toggle a row in a table. You should see a corresponding payload show up in your console/terminal. + await channel_1.join() + channel_1.on("UPDATE", callback1) +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + loop.stop() + exit(0) +``` +Then, go to the Supabase interface and toggle a row in a table. You should see a corresponding payload show up in your console/terminal. \ No newline at end of file diff --git a/async-usage.py b/async-usage.py deleted file mode 100644 index 880ddd0..0000000 --- a/async-usage.py +++ /dev/null @@ -1,55 +0,0 @@ -from realtime.connection import Socket -import asyncio -import uuid - -def callback1(payload): - print(f"c1: {payload}") - -def callback2(payload): - print(f"c2: {payload}") - - -async def main(): - - TOKEN = "" - URLsink = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" - - client = Socket(URLsink) - - await client.connect() - - # fire and forget the listening routine - listen_task = asyncio.ensure_future(client.listen()) - - channel_s = client.set_channel("yourchannel") - await channel_s.join() - channel_s.on("test_event", None, callback1) - - # non sense elixir handler, we would not have an event on a reply - #def handle_in("request_ping", payload, socket) do - # push(socket, "test_event", %{body: payload}) - # {:noreply, socket} - #end - - await channel_s.send("request_ping", "this is my payload 1", None) - await channel_s.send("request_ping", "this is my payload 2", None) - await channel_s.send("request_ping", "this is my payload 3", None) - - # proper relpy elixir handler - #def handle_in("ping", payload, socket) do - # {:reply, {:ok, payload}, socket} - #end - - ref = str(uuid.uuid4()) - channel_s.on(None, ref, callback2) - await channel_s.send("ping", "this is my ping payload", ref) - - # we give it some time to complete - await asyncio.sleep(15) - - # proper shut down - listen_task.cancel() - -if __name__ == '__main__': - loop = asyncio.get_event_loop() - loop.run_until_complete(main()) diff --git a/fd-usage.py b/fd-usage.py new file mode 100644 index 0000000..3e9e82b --- /dev/null +++ b/fd-usage.py @@ -0,0 +1,65 @@ +from realtime.connection import Socket +import asyncio +import uuid +import json +# We will use a partial function to pass the file descriptor to the callback +from functools import partial + +# notice that the callback has two arguments +# and that it is not an async function +# it will be executed in a different thread +def callback(fd, payload): + fd.write(json.dumps(payload)) + print(f"Callback with reference c2: {payload}") + +async def main(): + + # your phoenix server token + TOKEN = "" + # your phoenix server URL + URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" + + # We create a file descriptor to write the received messages + fd = create_file_and_return_fd() + + client = Socket(URL) + + # connect to the server + await client.connect() + + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) + + # join the channel + channel = client.set_channel("this:is:my:topic") + await channel.join() + + # we can also use reference for the callback + # with a proper reply elixir handler: + #def handle_in("ping", payload, socket) do + # {:reply, {:ok, payload}, socket} + # Here we use uuid, use whatever you want + ref = str(uuid.uuid4()) + # Pass the file descriptor to the callback through a partial function + channel.on(None, ref, partial(callback, fd)) + await channel.send("ping", "this is the ping payload that shall appear in myfile.txt", ref) + + # we give it some time to complete + await asyncio.sleep(10) + + # proper shut down + listen_task.cancel() + fd.close() + +def create_file_and_return_fd(): + fd = open("myfile.txt", "w") + return fd + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + + except KeyboardInterrupt: + loop.stop() + exit(0) \ No newline at end of file diff --git a/sending-receiving-usage.py b/sending-receiving-usage.py new file mode 100644 index 0000000..f39a2e7 --- /dev/null +++ b/sending-receiving-usage.py @@ -0,0 +1,69 @@ +from realtime.connection import Socket +import asyncio +import uuid + +async def callback1(payload): + print(f"c1: {payload}") + +def callback2(payload): + print(f"c2: {payload}") + +async def main(): + + # your phoenix server token + # TOKEN = "" + TOKEN = "SFMyNTY.g2gDYRRuBgBK_9yHjQFiAAFRgA.Fs-vgUBiBiSBtKq9qlKar8ny7SjO0ykXfaLUk2a1TMM" + # your phoenix server URL + # URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" + URL = f"ws://127.0.0.1:4000/feedsocket/websocket?token={TOKEN}&vsn=2.0.0" + + client = Socket(URL) + + # connect to the server + await client.connect() + + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) + + # join the channel + # channel = client.set_channel("this:is:my:topic") + channel = client.set_channel("feed:44") + await channel.join() + + channel.on("test_event", None, callback1) + + # here is an example corresponding elixir handler for the sake of the example: + #def handle_in("request_ping", payload, socket) do + # push(socket, "test_event", %{body: payload}) + # {:noreply, socket} + #end + + await channel.send("request_ping", "this is my payload 1", None) + await channel.send("request_ping", "this is my payload 2", None) + await channel.send("request_ping", "this is my payload 3", None) + + # we can also use reference for the callback + # with a proper reply elixir handler: + #def handle_in("ping", payload, socket) do + # {:reply, {:ok, payload}, socket} + #end + + # Here we use uuid, use whatever you want + ref = str(uuid.uuid4()) + channel.on(None, ref, callback2) + await channel.send("ping", "this is my ping payload", ref) + + # we give it some time to complete + await asyncio.sleep(10) + + # proper shut down + listen_task.cancel() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + + except KeyboardInterrupt: + loop.stop() + exit(0) \ No newline at end of file diff --git a/usage.py b/usage.py index 56339ee..5085e95 100644 --- a/usage.py +++ b/usage.py @@ -1,23 +1,42 @@ from realtime.connection import Socket +import asyncio +async def callback1(payload): + print(f"Got message: {payload}") -def callback1(payload): - print("Callback 1: ", payload) +async def main(): + # your phoenix server token + TOKEN = "" + # your phoenix server URL + URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" -def callback2(payload): - print("Callback 2: ", payload) + client = Socket(URL) + # connect to the server + await client.connect() -if __name__ == "__main__": - URL = "ws://localhost:4000/socket/websocket" - s = Socket(URL) - s.connect() + # fire and forget the listening routine + listen_task = asyncio.ensure_future(client.listen()) - channel_1 = s.set_channel("realtime:public:todos") - channel_1.join().on("UPDATE", callback1) + # join the channel + channel = client.set_channel("this:is:my:topic") + await channel.join() - channel_2 = s.set_channel("realtime:public:users") - channel_2.join().on("*", callback2) + # by using a partial function + channel.on("your_event_name", None, callback1) - s.listen() + # we give it some time to complete + await asyncio.sleep(10) + + # proper shut down + listen_task.cancel() + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(main()) + + except KeyboardInterrupt: + loop.stop() + exit(0) \ No newline at end of file From efded7fbb5e4dcbaf361d2c013f326c25add74fe Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 13 Feb 2024 11:22:42 +0100 Subject: [PATCH 33/34] chg: [python] bumps python to 3.9 for better functools --- poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index cc13076..a168c32 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5,7 +5,7 @@ name = "annotated-types" version = "0.6.0" description = "Reusable constraint types to use with typing.Annotated" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ {file = "annotated_types-0.6.0-py3-none-any.whl", hash = "sha256:0641064de18ba7a25dee8f96403ebc39113d0cb953a01429249d5c7564666a43"}, {file = "annotated_types-0.6.0.tar.gz", hash = "sha256:563339e807e53ffd9c267e99fc6d9ea23eb8443c08f112651963e24e22f84a5d"}, From 46725e7faa8801ea606dcfe08011fcd1365a7f6f Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 13 Feb 2024 11:32:26 +0100 Subject: [PATCH 34/34] chg: [docs] typos, links to examples --- README.md | 3 +++ sending-receiving-usage.py | 9 +++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5c9b2e3..aeda292 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,9 @@ await channel.send("your_handler", "this is my payload", ref) channel.off(None, ref, callback1) ``` +## Examples +see `usage.py`, `sending-receiving-usage.py`, and `fd-usage.py`. + ## Sample usage with Supabase Here's how you could connect to your realtime endpoint using Supabase endpoint. Should be correct as of 13th Feb 2024. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet. diff --git a/sending-receiving-usage.py b/sending-receiving-usage.py index f39a2e7..597e24b 100644 --- a/sending-receiving-usage.py +++ b/sending-receiving-usage.py @@ -11,11 +11,9 @@ def callback2(payload): async def main(): # your phoenix server token - # TOKEN = "" - TOKEN = "SFMyNTY.g2gDYRRuBgBK_9yHjQFiAAFRgA.Fs-vgUBiBiSBtKq9qlKar8ny7SjO0ykXfaLUk2a1TMM" + TOKEN = "" # your phoenix server URL - # URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" - URL = f"ws://127.0.0.1:4000/feedsocket/websocket?token={TOKEN}&vsn=2.0.0" + URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0" client = Socket(URL) @@ -26,8 +24,7 @@ async def main(): listen_task = asyncio.ensure_future(client.listen()) # join the channel - # channel = client.set_channel("this:is:my:topic") - channel = client.set_channel("feed:44") + channel = client.set_channel("this:is:my:topic") await channel.join() channel.on("test_event", None, callback1)