-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackend.py
46 lines (35 loc) · 1.38 KB
/
backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import json
from collections.abc import Mapping
from typing import cast, Dict, List, Optional, Type

from aws_lambda_powertools.utilities import data_classes
from aws_lambda_powertools.utilities.data_classes import SQSEvent
from aws_lambda_powertools.utilities.data_classes.event_bridge_event import EventBridgeEvent
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from lib.event import BaseEvent
### Events
class TestEvent(BaseEvent):
    """Event carrying an optional ``text`` field; handling it prints the event."""

    text: Optional[str]

    def handle(self) -> None:
        """Handle the event by printing this instance to stdout."""
        print(self)
# Event classes this module can dispatch to. The entries are the classes
# themselves (not instances), hence Type[BaseEvent].
SUPPORTED_EVENTS: List[Type[BaseEvent]] = [TestEvent]
## Handlers
# Registry mapping each supported event class's __name__ to the class itself.
# The values are classes (not instances), hence Type[BaseEvent].
EVENTS_HANDLERS: Dict[str, Type[BaseEvent]] = {
    event_cls.__name__: event_cls for event_cls in SUPPORTED_EVENTS
}
def handler(event: dict, context: LambdaContext):
    """Route an incoming payload to ``handle_event``.

    Two payload shapes are accepted:

    * a raw / scheduled event, recognised by a top-level "name" key, which
      is dispatched as-is;
    * an SQS batch, recognised by a top-level "Records" key, where each
      record body is JSON-decoded and dispatched in turn.

    Args:
        event: The raw payload dict.
        context: Invocation context, forwarded to ``handle_event``.

    Raises:
        ValueError: If the payload has neither a "name" nor a "Records" key.
    """
    # Raw Event / Scheduled
    if "name" in event:
        handle_event(event, context)
        return

    if "Records" not in event:
        raise ValueError(f"Expected SQS or Raw Event: {event}")

    # SQS Records
    for sqs_record in SQSEvent(event).records:
        payload = json.loads(sqs_record.body)
        handle_event(payload, context)
def handle_event(json_body: dict, context: LambdaContext):
    """Dispatch one decoded event payload to its registered event class.

    The payload's "name" entry selects a class from ``EVENTS_HANDLERS``. The
    payload is parsed with that class's ``parse_obj`` and the resulting
    object's ``handle()`` is invoked.

    Args:
        json_body: Decoded event payload; must be a mapping with a "name" key.
        context: Invocation context (not used by this function).

    Returns:
        Whatever the selected event's ``handle()`` returns.

    Raises:
        ValueError: If the payload is not a mapping, has no "name" key, or
            names an event that is not registered in ``EVENTS_HANDLERS``.
    """
    # A JSON body may decode to a str or list, where `in` means substring /
    # element membership and indexing by "name" would raise TypeError.
    if not isinstance(json_body, Mapping) or "name" not in json_body:
        raise ValueError("No name in event")

    name = json_body["name"]
    # Registry keys are class names (str). A non-string name can never match,
    # and an unhashable one (list/dict) would make the lookup raise TypeError.
    if not isinstance(name, str) or name not in EVENTS_HANDLERS:
        raise ValueError(f"Unrecognized Event name: {name}")

    print(f"Received Event: name={name}")
    return EVENTS_HANDLERS[name].parse_obj(json_body).handle()