Skip to content

Commit

Permalink
Improve handling of not found schema
Browse files Browse the repository at this point in the history
  • Loading branch information
TheSuperiorStanislav committed Jul 10, 2024
1 parent 3fb2243 commit ede2934
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 41 deletions.
29 changes: 28 additions & 1 deletion sns_sqs_communicator/processing/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import pydantic

from .. import messages, metrics, schemas
import mypy_boto3_sqs.type_defs

from .. import messages, metrics, parsers, schemas

ProcessorT = typing.TypeVar(
"ProcessorT",
Expand Down Expand Up @@ -134,6 +136,31 @@ class ModelProcessor(
cls.registry[for_type] = cls()
cls.for_type = for_type

@classmethod
@metrics.tracker
async def process(
cls,
raw_message: mypy_boto3_sqs.type_defs.MessageTypeDef,
parser: type[parsers.ParserProtocol[messages.MessageActionT]],
logger: logging.Logger,
) -> ProcessingResult[typing.Any]:
"""Process raw message."""
try:
message = parser.parse(raw_message)
except schemas.QueueBodySchemaNotRegisteredError as not_found_error:
logger.info(f"Cancelled, reason: {not_found_error!s}")
return ProcessingResult[typing.Any](
status=ProcessingResultStatus.canceled,
message=str(not_found_error),
result=None,
exception=not_found_error,
)
processor = cls.get(message_type=message.type)
return await processor(
message=message,
logger=logger,
)

@classmethod
def get(
cls,
Expand Down
6 changes: 5 additions & 1 deletion sns_sqs_communicator/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import typing_extensions


class QueueBodySchemaNotRegisteredError(Exception):
"""Exception we're unable to find schema for type."""


class QueueBodySchema(pydantic.BaseModel):
"""Base schema for serialization message body for queue message."""

Expand Down Expand Up @@ -46,7 +50,7 @@ def get_schema_by_message_type(
"""Retrieve schema for given message type from registry."""
if schema := cls.registry.get(message_type):
return schema
raise ValueError(
raise QueueBodySchemaNotRegisteredError(
f"Schema for message type {message_type} is not registered",
)

Expand Down
20 changes: 4 additions & 16 deletions sns_sqs_communicator/sqs_poll_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class SQSPollWorker(
logging_level: str = "INFO"
queue_class: type[queue.SQSQueue]
parser_class: type[parsers.ParserProtocol[messages.MessageActionT]]
core_processor_class: type[
processing.core.Processor[typing.Any, messages.MessageActionT]
] = processing.core.Processor

@classmethod
def setup_sqs_client(cls) -> clients.SQSClient:
Expand Down Expand Up @@ -198,27 +201,12 @@ async def process_message(
logger: logging.Logger,
) -> typing.Any:
"""Handle incoming message from queue."""
message = parser.parse(raw_message)
processor = await cls.get_processor(
return await cls.core_processor_class.process(
raw_message=raw_message,
parser=parser,
)
return await processor(
message=message,
logger=logger,
)

@classmethod
@metrics.tracker
async def get_processor(
cls,
raw_message: mypy_boto3_sqs.type_defs.MessageTypeDef,
parser: type[parsers.ParserProtocol[messages.MessageActionT]],
) -> processing.Processor[typing.Any, typing.Any]:
"""Get matching processor for message."""
message = parser.parse(raw_message)
return processing.Processor.get(message.type)

@classmethod
@metrics.tracker
async def handle_processing_error(
Expand Down
22 changes: 22 additions & 0 deletions tests/test_canceled.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,25 @@ async def test_not_found_action(
message=message,
expected_message="No method defined for not_found_action action",
)


async def test_not_found_schema(
sns_sqs_worker: sns_sqs_communicator.testing.TestWorker[
queues.MessageAction
],
) -> None:
"""Test that cancellation works on not found schema as expected."""
message = queues.Message[queues.UnknownQueueBodySchema](
body_schema=queues.UnknownQueueBodySchema(
message="test_not_found_schema",
),
action=queues.MessageAction.not_found_action,
type="unknown_schema",
)
await sns_sqs_communicator.testing.push_and_pull_canceled_result(
sns_sqs_worker=sns_sqs_worker,
message=message,
expected_message=(
"Schema for message type unknown_schema is not registered"
),
)
23 changes: 0 additions & 23 deletions tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,3 @@ async def test_not_found_processor(
),
expected_exception=KeyError,
)


async def test_not_found_schema(
sns_sqs_worker: sns_sqs_communicator.testing.TestWorker[
queues.MessageAction
],
) -> None:
"""Test that cancellation works on not found schema as expected."""
message = queues.Message[queues.UnknownQueueBodySchema](
body_schema=queues.UnknownQueueBodySchema(
message="test_not_found_schema",
),
action=queues.MessageAction.not_found_action,
type="unknown_schema",
)
await sns_sqs_communicator.testing.push_and_pull_failed_result(
sns_sqs_worker=sns_sqs_worker,
message=message,
expected_message=(
"Schema for message type unknown_schema is not registered"
),
expected_exception=ValueError,
)

0 comments on commit ede2934

Please sign in to comment.