Skip to content

Commit

Permalink
[Issue #3527] Modify the logic around the opportunity change tracking…
Browse files Browse the repository at this point in the history
… table to never delete records (#3565)

## Summary
Fixes #3527

### Time to review: 30 mins

## Changes proposed
Add a new table to track task progress
Rename queue table and add all opportunities to it. Add any new
opportunities that come in (should basically always be 1:1)
For index job, load data based on `last_loaded_at` date rather than
using `has_update` field or deleting records as we go.

## Context for reviewers
Cascade deleting orphan relationship remains unchanged
The JobLog table is meant to be general purpose and can be used, and possibly extended,
by other tasks.

## Additional information
See unit tests
  • Loading branch information
mikehgrantsgov authored Jan 27, 2025
1 parent f6d3484 commit 029ea8e
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 46 deletions.
6 changes: 6 additions & 0 deletions api/src/constants/lookup_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,9 @@ class OpportunityAttachmentType(StrEnum):

class ExternalUserType(StrEnum):
LOGIN_GOV = "login_gov"


class JobStatus(StrEnum):
    """Lifecycle states recorded for a background job run (backed by the lk_job_status lookup table)."""
    STARTED = "started"
    COMPLETED = "completed"
    FAILED = "failed"
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""Rename tables and create job table
Revision ID: dc04ce955a9a
Revises: 99bb8e01ad38
Create Date: 2025-01-16 18:34:48.013913
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql import text

# revision identifiers, used by Alembic.
revision = "dc04ce955a9a"
down_revision = "fe052c05c757"
branch_labels = None
depends_on = None


# SQL for the trigger function fired on changes to opportunity-related tables.
# It resolves the owning opportunity_id (via opportunity_summary for the link
# tables) and upserts a row into api.opportunity_change_audit, bumping
# updated_at on conflict — rows are never deleted; downstream jobs filter on
# timestamps instead.
# NOTE(review): this migration only redefines the function body; it attaches
# no triggers itself — presumably triggers created by an earlier migration
# already reference this function name. Confirm before renaming it.
create_trigger_function = """
CREATE OR REPLACE FUNCTION update_opportunity_search_queue()
RETURNS TRIGGER AS $$
DECLARE
opp_id bigint;
BEGIN
-- Determine the opportunity_id based on the table
CASE TG_TABLE_NAME
WHEN 'link_opportunity_summary_funding_instrument' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_funding_category' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'link_opportunity_summary_applicant_type' THEN
opp_id := (SELECT opportunity_id FROM api.opportunity_summary WHERE opportunity_summary_id = NEW.opportunity_summary_id);
WHEN 'opportunity_summary' THEN
opp_id := NEW.opportunity_id;
WHEN 'current_opportunity_summary' THEN
opp_id := NEW.opportunity_id;
ELSE
opp_id := NEW.opportunity_id;
END CASE;
INSERT INTO api.opportunity_change_audit (opportunity_id)
VALUES (opp_id)
ON CONFLICT (opportunity_id)
DO UPDATE SET updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""


def upgrade():
    """Create job_log + lk_job_status, replace opportunity_search_index_queue
    with the never-deleted opportunity_change_audit table, and redefine the
    change-tracking trigger function to write to the new table.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Autogen artifact: job_log is first created with a plain-text job_status
    # column, which is converted to a job_status_id lookup FK further below.
    op.create_table(
        "job_log",
        sa.Column("job_id", sa.UUID(), nullable=False),
        sa.Column("job_type", sa.Text(), nullable=False),
        sa.Column(
            "job_status",
            sa.Text(),
            nullable=False,
        ),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        # NOTE(review): named "job_pkey" although the table is "job_log" —
        # inconsistent with the <table>_pkey convention used elsewhere; left
        # as-is because renaming would break already-applied environments.
        sa.PrimaryKeyConstraint("job_id", name=op.f("job_pkey")),
        schema="api",
    )
    # Successor to opportunity_search_index_queue: rows are upserted by the
    # trigger function and never deleted (consumers filter on updated_at).
    op.create_table(
        "opportunity_change_audit",
        sa.Column("opportunity_id", sa.BigInteger(), nullable=False),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["opportunity_id"],
            ["api.opportunity.opportunity_id"],
            name=op.f("opportunity_change_audit_opportunity_id_opportunity_fkey"),
        ),
        sa.PrimaryKeyConstraint("opportunity_id", name=op.f("opportunity_change_audit_pkey")),
        schema="api",
    )
    # NOTE(review): opportunity_id is already the primary key, which creates a
    # unique index; this extra non-unique index looks redundant — confirm it is
    # intentional (it mirrors the old queue table's index).
    op.create_index(
        op.f("opportunity_change_audit_opportunity_id_idx"),
        "opportunity_change_audit",
        ["opportunity_id"],
        unique=False,
        schema="api",
    )

    # Repoint the existing triggers' function at the new audit table.
    op.execute(create_trigger_function)

    # Insert all existing opportunities into the audit table
    op.execute(
        text(
            """
            INSERT INTO api.opportunity_change_audit (opportunity_id, created_at, updated_at)
            SELECT
                opportunity_id,
                CURRENT_TIMESTAMP as created_at,
                CURRENT_TIMESTAMP as updated_at
            FROM api.opportunity
            ON CONFLICT (opportunity_id) DO NOTHING
            """
        )
    )

    # The old queue table is fully superseded by opportunity_change_audit.
    op.drop_index(
        "opportunity_search_index_queue_opportunity_id_idx",
        table_name="opportunity_search_index_queue",
        schema="api",
    )
    op.drop_table("opportunity_search_index_queue", schema="api")

    # Lookup table for JobStatus values (seeded by the lookup sync machinery,
    # not by this migration).
    op.create_table(
        "lk_job_status",
        sa.Column("job_status_id", sa.Integer(), nullable=False),
        sa.Column("description", sa.Text(), nullable=False),
        sa.Column(
            "created_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("job_status_id", name=op.f("lk_job_status_pkey")),
        schema="api",
    )
    # Convert job_log.job_status (text) into a job_status_id FK + add metrics.
    # Safe despite nullable=False because job_log was created empty above.
    op.add_column("job_log", sa.Column("job_status_id", sa.Integer(), nullable=False), schema="api")
    op.add_column(
        "job_log",
        sa.Column("metrics", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        schema="api",
    )
    op.create_foreign_key(
        op.f("job_log_job_status_id_lk_job_status_fkey"),
        "job_log",
        "lk_job_status",
        ["job_status_id"],
        ["job_status_id"],
        source_schema="api",
        referent_schema="api",
    )
    op.drop_column("job_log", "job_status", schema="api")
    # ### end Alembic commands ###


def downgrade():
    """Revert revision dc04ce955a9a: restore the text job_status column,
    drop job_log/lk_job_status, and recreate opportunity_search_index_queue
    in place of opportunity_change_audit.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): re-adding job_status as NOT NULL with no server default
    # will fail if job_log contains rows at downgrade time — backfill or
    # truncate job_log first.
    op.add_column(
        "job_log",
        sa.Column("job_status", sa.TEXT(), autoincrement=False, nullable=False),
        schema="api",
    )
    # Bug fix: upgrade() created this constraint as
    # "job_log_job_status_id_lk_job_status_fkey" (table is "job_log", not
    # "job"); the previous name here made the downgrade fail with an
    # "undefined constraint" error.
    op.drop_constraint(
        op.f("job_log_job_status_id_lk_job_status_fkey"),
        "job_log",
        schema="api",
        type_="foreignkey",
    )
    op.drop_column("job_log", "metrics", schema="api")
    op.drop_column("job_log", "job_status_id", schema="api")
    op.drop_table("lk_job_status", schema="api")
    # Recreate the old queue table that upgrade() dropped. Note the trigger
    # function is NOT reverted here, so change events will still target
    # api.opportunity_change_audit until an earlier revision's function is
    # restored.
    op.create_table(
        "opportunity_search_index_queue",
        sa.Column("opportunity_id", sa.BIGINT(), autoincrement=False, nullable=False),
        sa.Column(
            "created_at",
            postgresql.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            autoincrement=False,
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            postgresql.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            autoincrement=False,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["opportunity_id"],
            ["api.opportunity.opportunity_id"],
            name="opportunity_search_index_queue_opportunity_id_opportunity_fkey",
        ),
        sa.PrimaryKeyConstraint("opportunity_id", name="opportunity_search_index_queue_pkey"),
        schema="api",
    )
    op.create_index(
        "opportunity_search_index_queue_opportunity_id_idx",
        "opportunity_search_index_queue",
        ["opportunity_id"],
        unique=False,
        schema="api",
    )
    op.drop_index(
        op.f("opportunity_change_audit_opportunity_id_idx"),
        table_name="opportunity_change_audit",
        schema="api",
    )
    op.drop_table("opportunity_change_audit", schema="api")
    op.drop_table("job_log", schema="api")
    # ### end Alembic commands ###
11 changes: 10 additions & 1 deletion api/src/db/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import logging

from . import agency_models, base, extract_models, lookup_models, opportunity_models, user_models
from . import (
agency_models,
base,
extract_models,
lookup_models,
opportunity_models,
task_models,
user_models,
)

logger = logging.getLogger(__name__)

Expand All @@ -15,4 +23,5 @@
"agency_models",
"user_models",
"extract_models",
"task_models",
]
21 changes: 21 additions & 0 deletions api/src/db/models/lookup_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ExtractType,
FundingCategory,
FundingInstrument,
JobStatus,
OpportunityAttachmentType,
OpportunityCategory,
OpportunityStatus,
Expand Down Expand Up @@ -116,6 +117,14 @@
]
)

# Maps JobStatus enum values to the integer IDs persisted in lk_job_status.
# IDs are part of the stored data — never renumber or reuse them once deployed.
JOB_STATUS_CONFIG = LookupConfig(
    [
        LookupStr(JobStatus.STARTED, 1),
        LookupStr(JobStatus.COMPLETED, 2),
        LookupStr(JobStatus.FAILED, 3),
    ]
)

EXTERNAL_USER_TYPE_CONFIG = LookupConfig([LookupStr(ExternalUserType.LOGIN_GOV, 1)])

EXTRACT_TYPE_CONFIG = LookupConfig(
Expand Down Expand Up @@ -266,3 +275,15 @@ def from_lookup(cls, lookup: Lookup) -> "LkExtractType":
return LkExtractType(
extract_type_id=lookup.lookup_val, description=lookup.get_description()
)


@LookupRegistry.register_lookup(JOB_STATUS_CONFIG)
class LkJobStatus(LookupTable, TimestampMixin):
    """ORM model for the lk_job_status lookup table backing JobStatus values."""

    __tablename__ = "lk_job_status"

    job_status_id: Mapped[int] = mapped_column(primary_key=True)
    description: Mapped[str]

    @classmethod
    def from_lookup(cls, lookup: Lookup) -> "LkJobStatus":
        """Construct a row from a Lookup config entry."""
        status_id = lookup.lookup_val
        return cls(job_status_id=status_id, description=lookup.get_description())
6 changes: 3 additions & 3 deletions api/src/db/models/opportunity_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def agency(self) -> str | None:
back_populates="opportunity", uselist=True, cascade="all, delete-orphan"
)

opportunity_search_index_queue: Mapped["OpportunitySearchIndexQueue | None"] = relationship(
opportunity_change_audit: Mapped["OpportunityChangeAudit | None"] = relationship(
back_populates="opportunity", single_parent=True, cascade="all, delete-orphan"
)

Expand Down Expand Up @@ -452,8 +452,8 @@ class OpportunityAttachment(ApiSchemaTable, TimestampMixin):
legacy_folder_id: Mapped[int | None] = mapped_column(BigInteger)


class OpportunitySearchIndexQueue(ApiSchemaTable, TimestampMixin):
__tablename__ = "opportunity_search_index_queue"
class OpportunityChangeAudit(ApiSchemaTable, TimestampMixin):
__tablename__ = "opportunity_change_audit"

opportunity_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey(Opportunity.opportunity_id), primary_key=True, index=True
Expand Down
22 changes: 22 additions & 0 deletions api/src/db/models/task_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import uuid

from sqlalchemy import ForeignKey
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column

from src.adapters.db.type_decorators.postgres_type_decorators import LookupColumn
from src.db.models.base import ApiSchemaTable, TimestampMixin
from src.db.models.lookup_models import JobStatus, LkJobStatus


class JobLog(ApiSchemaTable, TimestampMixin):
    """One row per background job run — general-purpose task progress/status tracking."""

    __tablename__ = "job_log"

    # Client-generated random UUID primary key (assigned at insert time).
    job_id: Mapped[uuid.UUID] = mapped_column(UUID, primary_key=True, default=uuid.uuid4)
    # Free-form identifier of the task that produced this row — presumably the
    # task's registered name; confirm against the task runner.
    job_type: Mapped[str]
    # Persisted as the job_status_id FK into lk_job_status; LookupColumn
    # converts between the stored integer and the JobStatus enum.
    job_status: Mapped[JobStatus] = mapped_column(
        "job_status_id",
        LookupColumn(LkJobStatus),
        ForeignKey(LkJobStatus.job_status_id),
    )
    # Optional JSONB blob of task-reported metrics (counts, timings, etc. —
    # see the tasks that write it for the exact keys).
    metrics: Mapped[dict | None] = mapped_column(JSONB)
Loading

0 comments on commit 029ea8e

Please sign in to comment.