Skip to content

Commit

Permalink
Add topic management (#77)
Browse files Browse the repository at this point in the history
feat: Add topic management to the client

* Add topic management request handler
* Add topic management tests
* Add FcmOptions attr to Message dataclass
* Fix typing
* Precommit done
* Black review updates
* Fix tests
* Rename topic management error info class
  • Loading branch information
eighthcolor22 authored Mar 20, 2024
1 parent e2a9040 commit 5296976
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 20 deletions.
81 changes: 64 additions & 17 deletions async_firebase/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@
from google.oauth2 import service_account # type: ignore

from async_firebase._config import DEFAULT_REQUEST_LIMITS, DEFAULT_REQUEST_TIMEOUT, RequestLimits, RequestTimeout
from async_firebase.messages import FCMBatchResponse, FCMResponse
from async_firebase.utils import FCMBatchResponseHandler, FCMResponseHandler, join_url, serialize_mime_message
from async_firebase.messages import FCMBatchResponse, FCMResponse, TopicManagementResponse
from async_firebase.utils import (
FCMBatchResponseHandler,
FCMResponseHandler,
TopicManagementResponseHandler,
join_url,
serialize_mime_message,
)


class AsyncClientBase:
Expand All @@ -32,6 +38,10 @@ class AsyncClientBase:
SCOPES: t.List[str] = [
"https://www.googleapis.com/auth/cloud-platform",
]
IID_URL = "https://iid.googleapis.com"
IID_HEADERS = {"access_token_auth": "true"}
TOPIC_ADD_ACTION = "iid/v1:batchAdd"
TOPIC_REMOVE_ACTION = "iid/v1:batchRemove"

def __init__(
self,
Expand Down Expand Up @@ -162,25 +172,14 @@ async def prepare_headers(self) -> t.Dict[str, str]:
"X-FIREBASE-CLIENT": "async-firebase/{0}".format(version("async-firebase")),
}

async def send_request(
async def _send_request(
self,
uri: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler],
url: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler, TopicManagementResponseHandler],
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> t.Union[FCMResponse, FCMBatchResponse]:
"""
Sends an HTTP call using the ``httpx`` library.
:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.BASE_URL, uri)
) -> t.Union[FCMResponse, FCMBatchResponse, TopicManagementResponse]:
logging.debug(
"Requesting POST %s, payload: %s, content: %s, headers: %s",
url,
Expand All @@ -207,3 +206,51 @@ async def send_request(
response = response_handler.handle_response(raw_fcm_response)

return response

async def send_request(
self,
uri: str,
response_handler: t.Union[FCMResponseHandler, FCMBatchResponseHandler],
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> t.Union[FCMResponse, FCMBatchResponse]:
"""
Sends an HTTP call using the ``httpx`` library to FCM.
:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.BASE_URL, uri)
return await self._send_request( # type: ignore
url=url, response_handler=response_handler, json_payload=json_payload, headers=headers, content=content
)

async def send_iid_request(
self,
uri: str,
response_handler: TopicManagementResponseHandler,
json_payload: t.Optional[t.Dict[str, t.Any]] = None,
headers: t.Optional[t.Dict[str, str]] = None,
content: t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None] = None,
) -> TopicManagementResponse:
"""
Sends an HTTP call using the ``httpx`` library to the IID service for topic management functionality.
:param uri: URI to be requested.
:param response_handler: the model to handle response.
:param json_payload: request JSON payload
:param headers: request headers.
:param content: request content
:return: HTTP response
"""
url = join_url(self.IID_URL, uri)
headers = headers or await self.prepare_headers()
headers.update(self.IID_HEADERS)
return await self._send_request( # type: ignore
url=url, response_handler=response_handler, json_payload=json_payload, headers=headers, content=content
)
42 changes: 42 additions & 0 deletions async_firebase/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Message,
MulticastMessage,
PushNotification,
TopicManagementResponse,
WebpushConfig,
WebpushFCMOptions,
WebpushNotification,
Expand All @@ -40,6 +41,7 @@
from async_firebase.utils import (
FCMBatchResponseHandler,
FCMResponseHandler,
TopicManagementResponseHandler,
cleanup_firebase_message,
serialize_mime_message,
)
Expand Down Expand Up @@ -547,3 +549,43 @@ async def send_each_for_multicast(
]

return await self.send_each(messages, dry_run=dry_run)

async def _make_topic_management_request(
self, device_tokens: t.List[str], topic_name: str, action: str
) -> TopicManagementResponse:
payload = {
"to": f"/topics/{topic_name}",
"registration_tokens": device_tokens,
}
response = await self.send_iid_request(
uri=action,
json_payload=payload,
response_handler=TopicManagementResponseHandler(),
)
return response

async def subscribe_devices_to_topic(self, device_tokens: t.List[str], topic_name: str) -> TopicManagementResponse:
"""
Subscribes devices to the topic.
:param device_tokens: devices ids to be subscribed.
:param topic_name: name of the topic.
:returns: Instance of messages.TopicManagementResponse.
"""
return await self._make_topic_management_request(
device_tokens=device_tokens, topic_name=topic_name, action=self.TOPIC_ADD_ACTION
)

async def unsubscribe_devices_from_topic(
self, device_tokens: t.List[str], topic_name: str
) -> TopicManagementResponse:
"""
Unsubscribes devices from the topic.
:param device_tokens: devices ids to be unsubscribed.
:param topic_name: name of the topic.
:returns: Instance of messages.TopicManagementResponse.
"""
return await self._make_topic_management_request(
device_tokens=device_tokens, topic_name=topic_name, action=self.TOPIC_REMOVE_ACTION
)
74 changes: 74 additions & 0 deletions async_firebase/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import typing as t
from dataclasses import dataclass, field

import httpx

from async_firebase.errors import AsyncFirebaseError


Expand Down Expand Up @@ -280,6 +282,17 @@ class WebpushConfig:
fcm_options: t.Optional[WebpushFCMOptions] = field(default=None)


@dataclass
class FcmOptions:
"""
Platform independent options for features provided by the FCM SDKs
Arguments:
analytics_label: Label associated with the message's analytics data.
"""

analytics_label: str


@dataclass
class Message:
"""
Expand All @@ -297,6 +310,7 @@ class Message:
token: the registration token of the device to which the message should be sent.
topic: name of the Firebase topic to which the message should be sent (optional).
condition: the Firebase condition to which the message should be sent (optional).
fcm_options: platform independent options for features provided by the FCM SDKs.
"""

token: t.Optional[str] = None
Expand All @@ -307,6 +321,7 @@ class Message:
apns: t.Optional[APNSConfig] = field(default=None)
topic: t.Optional[str] = None
condition: t.Optional[str] = None
fcm_options: t.Optional[FcmOptions] = None


@dataclass
Expand Down Expand Up @@ -395,3 +410,62 @@ def success_count(self):
@property
def failure_count(self):
return len(self.responses) - self.success_count


class TopicManagementErrorInfo:
"""An error encountered when performing a topic management operation."""

def __init__(self, index, reason):
self._index = index
self._reason = reason

@property
def index(self):
"""Index of the registration token to which this error is related to."""
return self._index

@property
def reason(self):
"""String describing the nature of the error."""
return self._reason


class TopicManagementResponse:
"""The response received from a topic management operation."""

def __init__(self, resp: t.Optional[httpx.Response] = None, exception: t.Optional[AsyncFirebaseError] = None):
self.exception = exception
self._success_count = 0
self._failure_count = 0
self._errors: t.List[TopicManagementErrorInfo] = []

if resp:
self._handle_response(resp)

def _handle_response(self, resp: httpx.Response):
response = resp.json()
results = response.get("results")
if not results:
raise ValueError("Unexpected topic management response: {0}.".format(resp))

for index, result in enumerate(results):
if "error" in result:
self._failure_count += 1
self._errors.append(TopicManagementErrorInfo(index, result["error"]))
else:
self._success_count += 1

@property
def success_count(self):
"""Number of tokens that were successfully subscribed or unsubscribed."""
return self._success_count

@property
def failure_count(self):
"""Number of tokens that could not be subscribed or unsubscribed due to errors."""
return self._failure_count

@property
def errors(self):
"""A list of ``messaging.ErrorInfo`` objects (possibly empty)."""
return self._errors
21 changes: 19 additions & 2 deletions async_firebase/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
UnknownError,
UnregisteredError,
)
from async_firebase.messages import FCMBatchResponse, FCMResponse
from async_firebase.messages import FCMBatchResponse, FCMResponse, TopicManagementResponse


def join_url(
Expand Down Expand Up @@ -158,7 +158,7 @@ def serialize_mime_message(
return fp.getvalue()


FCMResponseType = t.TypeVar("FCMResponseType", FCMResponse, FCMBatchResponse)
FCMResponseType = t.TypeVar("FCMResponseType", FCMResponse, FCMBatchResponse, TopicManagementResponse)


class FCMResponseHandlerBase(ABC, t.Generic[FCMResponseType]):
Expand Down Expand Up @@ -355,3 +355,20 @@ def _deserialize_batch_response(response: httpx.Response) -> t.List[httpx.Respon
responses.append(resp)

return responses


class TopicManagementResponseHandler(FCMResponseHandlerBase[TopicManagementResponse]):
def handle_error(self, error: httpx.HTTPError) -> TopicManagementResponse:
exc = (
(isinstance(error, httpx.HTTPStatusError) and self._handle_fcm_error(error))
or (isinstance(error, httpx.HTTPError) and self._handle_request_error(error))
or AsyncFirebaseError(
code=FcmErrorCode.UNKNOWN.value,
message="Unexpected error has happened when hitting the FCM API",
cause=error,
)
)
return TopicManagementResponse(exception=exc)

def handle_response(self, response: httpx.Response) -> TopicManagementResponse:
return TopicManagementResponse(response)
Loading

0 comments on commit 5296976

Please sign in to comment.