Skip to content

Commit

Permalink
allow preconfigured redis clients
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-oleshkevich committed Nov 8, 2024
1 parent a422d8a commit c3caadd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
15 changes: 11 additions & 4 deletions broadcaster/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@


class RedisBackend(BroadcastBackend):
def __init__(self, url: str):
self._conn = redis.Redis.from_url(url)
_conn: redis.Redis

def __init__(self, url: str | None = None, *, conn: redis.Redis | None = None):
if url is None:
assert conn is not None, "conn must be provided if url is not"
self._conn = conn
else:
self._conn = redis.Redis.from_url(url)

self._pubsub = self._conn.pubsub()
self._ready = asyncio.Event()
self._queue: asyncio.Queue[Event] = asyncio.Queue()
self._listener: asyncio.Task[None] | None = None

async def connect(self) -> None:
self._listener = asyncio.create_task(self._pubsub_listener())
await self._pubsub.connect()
await self._pubsub.connect() # type: ignore[no-untyped-call]

async def disconnect(self) -> None:
await self._pubsub.aclose()
await self._pubsub.aclose() # type: ignore[no-untyped-call]
await self._conn.aclose()
if self._listener is not None:
self._listener.cancel()
Expand Down
19 changes: 19 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import typing

import pytest
from redis import asyncio as redis

from broadcaster import Broadcast, BroadcastBackend, Event
from broadcaster.backends.kafka import KafkaBackend
from broadcaster.backends.redis import RedisBackend


class CustomBackend(BroadcastBackend):
Expand Down Expand Up @@ -56,6 +58,23 @@ async def test_redis():
assert event.message == "hello"


@pytest.mark.asyncio
async def test_redis_configured_client():
backend = RedisBackend(conn=redis.Redis.from_url("redis://localhost:6379"))
async with Broadcast(backend=backend) as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"


@pytest.mark.asyncio
async def test_redis_requires_url_or_connection():
with pytest.raises(AssertionError, match="conn must be provided if url is not"):
RedisBackend()


@pytest.mark.asyncio
async def test_redis_stream():
async with Broadcast("redis-stream://localhost:6379") as broadcast:
Expand Down

0 comments on commit c3caadd

Please sign in to comment.