Skip to content

Commit

Permalink
feat: add logic for handling FK constraint violation & added Tweet model
Browse files Browse the repository at this point in the history
  • Loading branch information
farbodahm committed May 10, 2023
1 parent c1fe005 commit 9a13919
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 29 deletions.
108 changes: 94 additions & 14 deletions cosumer/dabase_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@

from sqlalchemy.orm import Session
from google.protobuf.message import Message
from sqlalchemy.exc import IntegrityError

from model import twitter_pb2
from model.twitter_database_model import Base, User, Gender
from model.twitter_database_model import (
Base,
User,
Gender,
Tweet,
TweetLike,
)
from utility.generic_configs import Topics
from utility.logger import logging
from utility.logger import logger
from utility.exceptions import ProtobufToORMTransformerNotFound


class DatabaseWriter:
"""Class for writing consumed Protobuf messages to database."""

def __init__(self, db_session: Session, max_write_pool_buffer: int = 5) -> None:
def __init__(
self,
db_session: Session,
max_write_pool_buffer: int = 5,
) -> None:
self.db_session = db_session

self.topic_to_transformers = self._topic_to_transformer()
Expand All @@ -23,9 +34,13 @@ def __init__(self, db_session: Session, max_write_pool_buffer: int = 5) -> None:
self.max_write_pool_buffer = max_write_pool_buffer
self._write_pool_buffer: List[Base] = []

# A child record may be received from Kafka topics before its parent record,
# in which case writing it to the DB fails with a ForeignKey constraint
# violation. We store it in a temp buffer and retry the write in later cycles.
self._fk_constraint_failed_buffer: List[Base] = []

def write_to_database(self, topic: str, message: Message) -> None:
"""Add Protobuf message to DB write pool and flush the write pool when threshold reached."""

transformer = self.topic_to_transformers.get(topic, None)
if transformer is None:
raise ProtobufToORMTransformerNotFound(
Expand All @@ -35,7 +50,7 @@ def write_to_database(self, topic: str, message: Message) -> None:
try:
db_model = transformer(message)
except Exception as e:
logging.error(
logger.error(
f"Error while transforming Protobuf message {message} to ORM model: {e}"
)
raise e
Expand All @@ -47,19 +62,67 @@ def write_to_database(self, topic: str, message: Message) -> None:

def _flush_write_pool(self) -> None:
    """Commit every record in the write pool to the database in one transaction.

    On ``IntegrityError`` the transaction is rolled back and the whole batch is
    moved to ``_fk_constraint_failed_buffer`` so it can be retried later; the
    write pool is then emptied and the method returns. Any other exception is
    logged and re-raised, leaving the write pool untouched.

    After a successful commit the write pool is cleared and, if records are
    waiting in the FK-failed buffer, a retry of those records is triggered.

    Raises:
        Exception: whatever non-integrity error the commit raised.
    """
    logger.info(
        f"Flushing write pool with {len(self._write_pool_buffer)} messages to database."
    )

    self.db_session.add_all(self._write_pool_buffer)
    try:
        self.db_session.commit()
    except IntegrityError as e:
        # Most likely the parent record is not processed yet; park the batch
        # in a temp buffer and try to save it again later.
        # NOTE(review): IntegrityError also covers non-FK violations (e.g.
        # duplicate keys); such records would be retried indefinitely.
        logger.warning(f"Foreign key violation: {e}")
        self.db_session.rollback()
        self._fk_constraint_failed_buffer += self._write_pool_buffer
        self._write_pool_buffer = []
        return
    except Exception as e:
        logger.error(f"Error while committing write pool to database: {e}")
        raise

    self._write_pool_buffer.clear()

    # A successful commit may have written the missing parents, so give the
    # previously rejected records another chance.
    if self._fk_constraint_failed_buffer:
        self._flush_fk_constraint_failed_pool()

def _flush_fk_constraint_failed_pool(self) -> None:
    """Retry writing the records that previously failed with an integrity error.

    Each record is added and committed on its own. A record whose commit
    raises ``IntegrityError`` again is rolled back and kept for the next
    cycle; every record that commits is dropped from the buffer.

    Raises:
        Exception: any non-integrity error raised by a commit. Before
            re-raising, the buffer is trimmed to the records that have not
            been written yet, so a later retry does not re-insert rows that
            were already committed.
    """
    logger.info(
        f"Flushing FK constraint failed pool with {len(self._fk_constraint_failed_buffer)} records"
    )
    pending = self._fk_constraint_failed_buffer
    still_failing: List[Base] = []

    for index, record in enumerate(pending):
        self.db_session.add(record)
        try:
            self.db_session.commit()
        except IntegrityError:
            # Parent record is presumably not added yet; keep it for next cycle.
            self.db_session.rollback()
            still_failing.append(record)
        except Exception as e:
            logger.error(f"Error while committing backup pool to database: {e}")
            # Keep the rejected records plus the current and unprocessed ones;
            # records committed earlier in this loop must not be retried.
            self._fk_constraint_failed_buffer = still_failing + pending[index:]
            raise

    self._fk_constraint_failed_buffer = still_failing
    logger.info(
        f"After flushing FK constraint failed pool: {len(self._fk_constraint_failed_buffer)}"
    )

def _topic_to_transformer(self) -> Dict[str, Callable[[Message], Base]]:
    """Return the lookup table from topic to its Protobuf-to-ORM transformer method."""
    return {
        Topics.UsersTopic: self._transform_user_protobuf_to_db_model,
        Topics.TweetsTopic: self._transform_tweet_protobuf_to_db_model,
        Topics.TweetLikesTopic: self._transform_tweet_like_protobuf_to_db_model,
    }

def _transform_user_protobuf_to_db_model(
self, protobuf_message: twitter_pb2.User
) -> User:
Expand All @@ -76,11 +139,28 @@ def _transform_user_protobuf_to_db_model(
)
return user

def _topic_to_transformer(self) -> Dict[str, Callable[[Message], Base]]:
"""Map each topic to its Protobuf to ORM model transformer."""
def _transform_tweet_protobuf_to_db_model(
    self, protobuf_message: twitter_pb2.Tweet
) -> Tweet:
    """Transform a Protobuf Tweet message into its ``Tweet`` database model.

    ``id`` and ``user_id`` are converted with ``int()``; ``tweeted_date`` is
    converted with the message field's ``ToDatetime()`` method.
    """
    tweet = Tweet(
        id=int(protobuf_message.id),
        text=protobuf_message.text,
        user_id=int(protobuf_message.user_id),
        tweeted_date=protobuf_message.tweeted_date.ToDatetime(),
    )
    return tweet

return transformers
def _transform_tweet_like_protobuf_to_db_model(
    self, protobuf_message: twitter_pb2.TweetLike
) -> TweetLike:
    """Transform a Protobuf TweetLike message into its ``TweetLike`` database model.

    ``id``, ``tweet_id`` and ``user_id`` are converted with ``int()``;
    ``liked_date`` is converted with the message field's ``ToDatetime()`` method.
    """
    tweet_like = TweetLike(
        id=int(protobuf_message.id),
        tweet_id=int(protobuf_message.tweet_id),
        user_id=int(protobuf_message.user_id),
        liked_date=protobuf_message.liked_date.ToDatetime(),
    )
    return tweet_like
6 changes: 5 additions & 1 deletion cosumer/twitter_model_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ def _process_message(self, protobuf_message: Message, topic: str) -> None:
# TODO: Remove after tests finished
logger.info(f"Processing from {topic} message {protobuf_message}")

if topic == Topics.UsersTopic or topic == Topics.TweetsTopic:
if (
topic == Topics.UsersTopic
or topic == Topics.TweetsTopic
or topic == Topics.TweetLikesTopic
):
self.db_writer.write_to_database(topic=topic, message=protobuf_message)

def _get_deserializers(self) -> Dict[str, ProtobufDeserializer]:
Expand Down
52 changes: 38 additions & 14 deletions model/twitter_database_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
from datetime import datetime
import enum

from sqlalchemy import ForeignKey, String, Enum, DateTime
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy import ForeignKey, String, Enum, DateTime, Table, Column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Gender(enum.Enum):
Expand All @@ -19,6 +16,25 @@ class Base(DeclarativeBase):
pass


# tweet_like_association_table = Table(
# "tweet_like",
# Base.metadata,
# Column("id", primary_key=True),
# Column("user_id", ForeignKey("user.id")),
# Column("tweet_id", ForeignKey("tweet.id")),
# Column("liked_date", DateTime),
# )


class TweetLike(Base):
"""ORM model for the ``tweet_like`` table, which links a user to a tweet."""

__tablename__ = "tweet_like"

id: Mapped[int] = mapped_column(primary_key=True)
# Foreign key to ``tweet.id``.
tweet_id: Mapped[int] = mapped_column(ForeignKey("tweet.id"))
# Foreign key to ``user.id``.
user_id: Mapped[int] = mapped_column(ForeignKey("user.id"))
# NOTE(review): the column is nullable but the annotation is not Optional — confirm.
liked_date: Mapped[datetime] = mapped_column(DateTime, nullable=True)


class User(Base):
__tablename__ = "user"

Expand All @@ -29,15 +45,23 @@ class User(Base):
gender: Mapped[Gender] = mapped_column(Enum(Gender))
created_date: Mapped[datetime] = mapped_column(DateTime, nullable=True)

# Relationships
tweets: Mapped[List["Tweet"]] = relationship("Tweet", back_populates="user")
liked_tweets: Mapped[List["Tweet"]] = relationship(
secondary=TweetLike.__table__, back_populates="liked_by"
)

# addresses: Mapped[List["Address"]] = relationship(
# back_populates="user", cascade="all, delete-orphan"
# )

class Tweet(Base):
"""ORM model for the ``tweet`` table; each tweet references its author via ``user_id``."""

__tablename__ = "tweet"

id: Mapped[int] = mapped_column(primary_key=True)
# Tweet body, stored as a String(255) column.
text: Mapped[str] = mapped_column(String(255))
# Foreign key to ``user.id``.
user_id: Mapped[int] = mapped_column(ForeignKey("user.id"))
# NOTE(review): the column is nullable but the annotation is not Optional — confirm.
tweeted_date: Mapped[datetime] = mapped_column(DateTime, nullable=True)

# NOTE(review): the commented-out Address model below looks like leftover
# example code — confirm it can be deleted.
# class Address(Base):
# __tablename__ = "address"
# id: Mapped[int] = mapped_column(primary_key=True)
# email_address: Mapped[str]
# user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
# user: Mapped["User"] = relationship(back_populates="addresses")
# Relationships
# ``user`` pairs with ``User.tweets``; ``liked_by`` goes through the
# ``tweet_like`` table and pairs with ``User.liked_tweets``.
user: Mapped["User"] = relationship("User", back_populates="tweets")
liked_by: Mapped[List["User"]] = relationship(
secondary=TweetLike.__table__, back_populates="liked_tweets"
)

0 comments on commit 9a13919

Please sign in to comment.