Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chg: [serializer] add support for phoenix version 2 serialization #68

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9e23d75
chg: [serializer] add support for phoenix version 2 serialization
gallypette Jul 24, 2023
52ece71
Merge branch 'master' of https://github.com/supabase-community/realti…
gallypette Oct 4, 2023
e5bd333
add: [send] sending to the channel poc
gallypette Oct 4, 2023
93bf498
add: [send] listener on a msg ref
gallypette Oct 4, 2023
4b3215b
add: [leave] leaving a channel
gallypette Oct 5, 2023
ca06e8e
chg: [logging] handling control msg
gallypette Oct 17, 2023
172f9c1
chg: [logging] handling control msg - again
gallypette Oct 17, 2023
ccc9e78
chg: [connection] leave on kb interrupt
gallypette Oct 18, 2023
3e4424d
chg: [async] remove non-async API
gallypette Oct 20, 2023
f92d755
chg: [async] {} -> self.params
gallypette Oct 25, 2023
5009fcf
Fix reconnection
maxbaluev Oct 31, 2023
86d9306
Update connection.py, fix join async
maxbaluev Oct 31, 2023
d010483
Improve error handling
maxbaluev Nov 1, 2023
e9b4d66
fix _handle_reconnection
maxbaluev Nov 1, 2023
e9e44d8
test improvements
Nov 6, 2023
f271851
test improvements
Nov 6, 2023
fcdc3de
test improvements
Nov 6, 2023
1251851
test improvements
Nov 6, 2023
6b32e15
test improvements
Nov 7, 2023
8debb8e
break on terminate
Nov 7, 2023
f227115
fix message
Nov 7, 2023
fdc3440
fix message
Nov 8, 2023
d00acfb
fix message
Nov 8, 2023
001eb5a
fix message
Nov 8, 2023
a6be915
fix message
Nov 8, 2023
52af8ff
fix message
Nov 8, 2023
7a5474d
fix message
Nov 8, 2023
cb1b695
raise exceptions
Nov 10, 2023
6f8d9c8
Test callback improvements (#1)
sevkar Nov 11, 2023
4790742
do not raise exceptions on disconnect
Nov 13, 2023
fac7bfe
Merge remote-tracking branch 'origin/master'
Nov 13, 2023
373383a
retry connection to socket
Nov 23, 2023
080419e
chg: [connection] put sync callback in threads
gallypette Feb 8, 2024
7c70dd5
merge: supabase/master 30699c9
gallypette Feb 8, 2024
a1c3b18
chg: [doc] change the readme, adds examples
gallypette Feb 13, 2024
efded7f
chg: [python] bumps python to 3.9 for better functools
gallypette Feb 13, 2024
46725e7
chg: [docs] typos, links to examples
gallypette Feb 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 70 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,63 +11,108 @@ pip3 install realtime==1.0.2

## Installation from source
```bash
pip3 install -r requirements.txt
poetry install
python3 usage.py

```

## Quick Start
```python
from realtime.connection import Socket
import asyncio

def callback1(payload):
print("Callback 1: ", payload)
async def callback1(payload):
print(f"Got message: {payload}")

def callback2(payload):
print("Callback 2: ", payload)
async def main():

if __name__ == "__main__":
URL = "ws://localhost:4000/socket/websocket"
s = Socket(URL)
s.connect()
# your phoenix server token
TOKEN = ""
# your phoenix server URL
URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0"

client = Socket(URL)

# connect to the server
await client.connect()

# fire and forget the listening routine
listen_task = asyncio.ensure_future(client.listen())

# join the channel
channel = client.set_channel("this:is:my:topic")
await channel.join()

# by using a partial function
channel.on("your_event_name", None, callback1)

# we give it some time to complete
await asyncio.sleep(10)

channel_1 = s.set_channel("realtime:public:todos")
channel_1.join().on("UPDATE", callback1)
# proper shut down
listen_task.cancel()

channel_2 = s.set_channel("realtime:public:users")
channel_2.join().on("*", callback2)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())

s.listen()
except KeyboardInterrupt:
loop.stop()
exit(0)
```

## Sending and Receiving data
Sending data to phoenix channels using `send`:
```python
await channel.send("your_handler", "this is my payload", None)
```
One can also use references for queries/answers:
```python

ref = 1
channel.on(None, ref, callback1)
await channel.send("your_handler", "this is my payload", ref)
# remove the callback when your are done
# |-> exercise left to the reader ;)
channel.off(None, ref, callback1)
```

## Examples
see `usage.py`, `sending-receiving-usage.py`, and `fd-usage.py`.

## Sample usage with Supabase

Here's how you could connect to your realtime endpoint using Supabase endpoint. Correct as of 5th June 2021. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet.
Here's how you could connect to your realtime endpoint using Supabase endpoint. Should be correct as of 13th Feb 2024. Please replace `SUPABASE_ID` and `API_KEY` with your own `SUPABASE_ID` and `API_KEY`. The variables shown below are fake and they will not work if you try to run the snippet.

```python
from realtime.connection import Socket
import asyncio

SUPABASE_ID = "dlzlllxhaakqdmaapvji"
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoiYW5vbiIsImlhdCI6MT"


def callback1(payload):
async def callback1(payload):
print("Callback 1: ", payload)

if __name__ == "__main__":
async def main():
URL = f"wss://{SUPABASE_ID}.supabase.co/realtime/v1/websocket?apikey={API_KEY}&vsn=1.0.0"
s = Socket(URL)
s.connect()
await s.connect()
listen_task = asyncio.ensure_future(s.listen())

channel_1 = s.set_channel("realtime:*")
channel_1.join().on("UPDATE", callback1)
s.listen()

```

Then, go to the Supabase interface and toggle a row in a table. You should see a corresponding payload show up in your console/terminal.
await channel_1.join()
channel_1.on("UPDATE", callback1)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())

except KeyboardInterrupt:
loop.stop()
exit(0)
```

Then, go to the Supabase interface and toggle a row in a table. You should see a corresponding payload show up in your console/terminal.
65 changes: 65 additions & 0 deletions fd-usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from realtime.connection import Socket
import asyncio
import uuid
import json
# We will use a partial function to pass the file descriptor to the callback
from functools import partial

# notice that the callback has two arguments
# and that it is not an async function
# it will be executed in a different thread
def callback(fd, payload):
fd.write(json.dumps(payload))
print(f"Callback with reference c2: {payload}")

async def main():

# your phoenix server token
TOKEN = ""
# your phoenix server URL
URL = f"ws://127.0.0.1:4000/socket/websocket?token={TOKEN}&vsn=2.0.0"

# We create a file descriptor to write the received messages
fd = create_file_and_return_fd()

client = Socket(URL)

# connect to the server
await client.connect()

# fire and forget the listening routine
listen_task = asyncio.ensure_future(client.listen())

# join the channel
channel = client.set_channel("this:is:my:topic")
await channel.join()

# we can also use reference for the callback
# with a proper reply elixir handler:
#def handle_in("ping", payload, socket) do
# {:reply, {:ok, payload}, socket}
# Here we use uuid, use whatever you want
ref = str(uuid.uuid4())
# Pass the file descriptor to the callback through a partial function
channel.on(None, ref, partial(callback, fd))
await channel.send("ping", "this is the ping payload that shall appear in myfile.txt", ref)

# we give it some time to complete
await asyncio.sleep(10)

# proper shut down
listen_task.cancel()
fd.close()

def create_file_and_return_fd():
fd = open("myfile.txt", "w")
return fd

if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())

except KeyboardInterrupt:
loop.stop()
exit(0)
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ python = "^3.8"
websockets = "^11.0"
python-dateutil = "^2.8.1"
typing-extensions = "^4.2.0"
uuid = "^1.30"

[tool.poetry.dev-dependencies]
pytest = "^7.2.0"
Expand Down
78 changes: 59 additions & 19 deletions realtime/channel.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import asyncio
import json
from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple
import logging
import uuid
from typing import List, TYPE_CHECKING, NamedTuple, Dict, Any

from realtime.message import ChannelEvents
from realtime.types import Callback

if TYPE_CHECKING:
Expand All @@ -13,13 +15,15 @@
class CallbackListener(NamedTuple):
"""A tuple with `event` and `callback` """
event: str
ref: str
callback: Callback


class Channel:
"""
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
A client can also send messages to a channel and register callback when expecting replies.
Should only be instantiated through `connection.Socket().set_channel(topic)`
Topic-Channel has a 1-many relationship.
"""
Expand All @@ -35,45 +39,81 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N
self.topic = topic
self.listeners: List[CallbackListener] = []
self.joined = False
self.join_ref = str(uuid.uuid4())
self.control_msg_ref = ""

def join(self) -> Channel:
async def join(self) -> None:
"""
Wrapper for async def _join() to expose a non-async interface
Essentially gets the only event loop and attempt joining a topic
:return: Channel
Coroutine that attempts to join Phoenix Realtime server via a certain topic
:return: None
"""
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
loop.run_until_complete(self._join())
return self
if self.socket.version == 1:
join_req = dict(topic=self.topic, event=ChannelEvents.join,
payload={}, ref=None)
elif self.socket.version == 2:
# [join_reference, message_reference, topic_name, event_name, payload]
self.control_msg_ref = str(uuid.uuid4())
join_req = [self.join_ref, self.control_msg_ref, self.topic, ChannelEvents.join, self.params]

try:
await self.socket.ws_connection.send(json.dumps(join_req))
except Exception as e:
logging.error(f"Error while joining channel: {str(e)}", exc_info=True)
return

async def _join(self) -> None:
async def leave(self) -> None:
"""
Coroutine that attempts to join Phoenix Realtime server via a certain topic
Coroutine that attempts to leave Phoenix Realtime server via a certain topic
:return: None
"""
join_req = dict(topic=self.topic, event="phx_join",
payload={}, ref=None)
if self.socket.version == 1:
leave_req = dict(topic=self.topic, event=ChannelEvents.leave,
payload={}, ref=None)
elif self.socket.version == 2:
leave_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}]

try:
await self.socket.ws_connection.send(json.dumps(join_req))
await self.socket.ws_connection.send(json.dumps(leave_req))
except Exception as e:
print(str(e)) # TODO: better error propagation
logging.error(f"Error while leaving channel: {str(e)}", exc_info=True)
return

def on(self, event: str, callback: Callback) -> Channel:
def on(self, event: str, ref: str, callback: Callback) -> Channel:
"""
:param event: A specific event will have a specific callback
:param ref: A specific reference that will have a specific callback
:param callback: Callback that takes msg payload as its first argument
:return: Channel
"""
cl = CallbackListener(event=event, callback=callback)
cl = CallbackListener(event=event, ref=ref, callback=callback)
self.listeners.append(cl)
return self

def off(self, event: str) -> None:
def off(self, event: str, ref: str) -> None:
"""
:param event: Stop responding to a certain event
:param event: Stop responding to a certain reference
:return: None
"""
self.listeners = [
callback for callback in self.listeners if callback.event != event]
callback for callback in self.listeners if (callback.event != event and callback.ref != ref)]

async def send(self, event_name: str, payload: str, ref: str) -> None:
"""
Coroutine that attempts to join Phoenix Realtime server via a certain topic
:param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module.
:param payload: The payload to be sent to the phoenix server
:param ref: The message reference that the server will use for replying
:return: None
"""
if self.socket.version == 1:
msg = dict(topic=self.topic, event=event_name,
payload=payload, ref=None)
elif self.socket.version == 2:
msg = [None, ref, self.topic, event_name, payload]

try:
await self.socket.ws_connection.send(json.dumps(msg))
except Exception as e:
logging.error(f"Error while sending message: {str(e)}", exc_info=True)
raise
Loading