3321/load attachment (#3467)
## Summary
Fixes #3321

### Time to review: __10 mins__

## Changes proposed
- Add `ENABLE_OPPORTUNITY_ATTACHMENT_PIPELINE` feature flag
- Use the multi-attachment ingest pipeline when bulk upserting opportunities into OpenSearch (see the pipeline sketch under **Additional information** below)
- Add/update tests:
  - Check that attachment data was base64-encoded
  - Ensure attachments are indexed properly

## Context for reviewers


## Additional information
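The `multi-attachment` ingest pipeline referenced in `load_records` is not defined in this diff and is assumed to already exist in the cluster. A minimal sketch of what it might look like, using the ingest-attachment plugin's `attachment` processor inside a `foreach` so each entry of the `attachments` array gets parsed (the pipeline body and client setup here are assumptions, not the deployed definition):

```python
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# Hypothetical pipeline definition: for each {"filename", "data"} entry in
# "attachments", parse the base64 "data" field and write the extracted text
# to "attachment" on that same entry.
client.ingest.put_pipeline(
    id="multi-attachment",
    body={
        "description": "Parse base64-encoded opportunity attachments",
        "processors": [
            {
                "foreach": {
                    "field": "attachments",
                    "processor": {
                        "attachment": {
                            "field": "_ingest._value.data",
                            "target_field": "_ingest._value.attachment",
                        }
                    },
                }
            }
        ],
    },
)
```

With a pipeline along these lines, each attachment entry gains an `attachment.content` field holding the decoded text, which is what the new `test_opportunity_attachment_pipeline` test asserts.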

---------

Co-authored-by: nava-platform-bot <[email protected]>
babebe and nava-platform-bot authored Jan 16, 2025
1 parent 5d3d50b commit 9a99264
Showing 6 changed files with 151 additions and 24 deletions.
1 change: 1 addition & 0 deletions api/local.env
@@ -108,6 +108,7 @@ IS_LOCAL_AWS=1
# Feature Flags
############################
ENABLE_OPPORTUNITY_LOG_MSG=false
ENABLE_OPPORTUNITY_ATTACHMENT_PIPELINE=true

############################
# Endpoint Configuration
7 changes: 6 additions & 1 deletion api/src/adapters/search/opensearch_client.py
@@ -94,6 +94,7 @@ def bulk_upsert(
primary_key_field: str,
*,
refresh: bool = True,
pipeline: str | None = None,
) -> None:
"""
Bulk upsert records to an index
@@ -124,7 +125,11 @@ def bulk_upsert(
"operation": "update",
},
)
self._client.bulk(index=index_name, body=bulk_operations, refresh=refresh)
bulk_args = {"index": index_name, "body": bulk_operations, "refresh": refresh}
if pipeline:
bulk_args["pipeline"] = pipeline

self._client.bulk(**bulk_args)

def bulk_delete(self, index_name: str, ids: Iterable[Any], *, refresh: bool = True) -> None:
"""
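With the new keyword-only `pipeline` argument, callers can route a batch through an ingest pipeline at indexing time; omitting it (or passing `None`) preserves the old behavior. A hypothetical call (index name and records are illustrative):

```python
# Opt this batch into the "multi-attachment" ingest pipeline.
search_client.bulk_upsert(
    "opportunity-index-2025-01",
    records,
    primary_key_field="opportunity_id",
    pipeline="multi-attachment",
)
```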
45 changes: 42 additions & 3 deletions api/src/search/backend/load_opportunities_to_index.py
@@ -1,7 +1,9 @@
import base64
import logging
from enum import StrEnum
from typing import Iterator, Sequence

import smart_open
from opensearchpy.exceptions import ConnectionTimeout, TransportError
from pydantic import Field
from pydantic_settings import SettingsConfigDict
@@ -16,6 +18,7 @@
from src.db.models.opportunity_models import (
CurrentOpportunitySummary,
Opportunity,
OpportunityAttachment,
OpportunitySearchIndexQueue,
)
from src.task.task import Task
@@ -36,6 +39,10 @@ class LoadOpportunitiesToIndexConfig(PydanticBaseEnvConfig):
alias_name: str = Field(default="opportunity-index-alias") # LOAD_OPP_SEARCH_ALIAS_NAME
index_prefix: str = Field(default="opportunity-index") # LOAD_OPP_INDEX_PREFIX

enable_opportunity_attachment_pipeline: bool = Field(
default=False, alias="ENABLE_OPPORTUNITY_ATTACHMENT_PIPELINE"
)


class LoadOpportunitiesToIndex(Task):
class Metrics(StrEnum):
@@ -53,7 +60,6 @@ def __init__(

self.search_client = search_client
self.is_full_refresh = is_full_refresh

if config is None:
config = LoadOpportunitiesToIndexConfig()
self.config = config
@@ -269,6 +275,31 @@ def fetch_existing_opportunity_ids_in_index(self) -> set[int]:

return opportunity_ids

def filter_attachments(
self, attachments: list[OpportunityAttachment]
) -> list[OpportunityAttachment]:
return [attachment for attachment in attachments]

def get_attachment_json_for_opportunity(
self, opp_attachments: list[OpportunityAttachment]
) -> list[dict]:

attachments = []
for att in opp_attachments:
with smart_open.open(
att.file_location,
"rb",
) as file:
file_content = file.read()
attachments.append(
{
"filename": att.file_name,
"data": base64.b64encode(file_content).decode("utf-8"),
}
)

return attachments

@retry(
stop=stop_after_attempt(3), # Retry up to 3 times
wait=wait_fixed(2), # Wait 2 seconds between retries
@@ -300,11 +331,19 @@ def load_records(self, records: Sequence[Opportunity]) -> set[int]:
self.increment(self.Metrics.TEST_RECORDS_SKIPPED)
continue

json_records.append(schema.dump(record))
json_record = schema.dump(record)
if self.config.enable_opportunity_attachment_pipeline:
json_record["attachments"] = self.get_attachment_json_for_opportunity(
record.opportunity_attachments
)

json_records.append(json_record)
self.increment(self.Metrics.RECORDS_LOADED)

loaded_opportunity_ids.add(record.opportunity_id)

self.search_client.bulk_upsert(self.index_name, json_records, "opportunity_id")
self.search_client.bulk_upsert(
self.index_name, json_records, "opportunity_id", pipeline="multi-attachment"
)

return loaded_opportunity_ids
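For reference, the attachment entries that `get_attachment_json_for_opportunity` adds to each record look like this before the ingest pipeline runs (filename and content are illustrative):

```python
import base64

content = b"I am a file"
attachment_entry = {
    "filename": "test_file_1.txt",
    # bytes are base64-encoded, then decoded to str for the JSON payload
    "data": base64.b64encode(content).decode("utf-8"),  # "SSBhbSBhIGZpbGU="
}
```

Assuming a pipeline configured as sketched above, indexing adds a parsed `attachment` object to each entry, with the decoded text at `attachment.content`; that is the field the new test checks.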
4 changes: 3 additions & 1 deletion api/tests/conftest.py
@@ -88,7 +88,9 @@ def upload_opportunity_attachment_s3(reset_aws_env_vars, mock_s3_bucket):
for file in files:
file_path = os.path.join(root, file)
s3_client.upload_file(
file_path, Bucket=mock_s3_bucket, Key=os.path.relpath(file_path, test_folder_path)
file_path,
Bucket=mock_s3_bucket,
Key=os.path.relpath(file_path, test_folder_path),
)

# Check files were uploaded to mock s3
1 change: 0 additions & 1 deletion api/tests/src/adapters/search/test_opensearch_client.py
@@ -51,7 +51,6 @@ def test_bulk_upsert(search_client, generic_index):
]

search_client.bulk_upsert(generic_index, records, primary_key_field="id")

# Verify the records are in the index
for record in records:
assert search_client._client.get(generic_index, record["id"])["_source"] == record
117 changes: 99 additions & 18 deletions api/tests/src/search/backend/test_load_opportunities_to_index.py
@@ -7,10 +7,12 @@
LoadOpportunitiesToIndex,
LoadOpportunitiesToIndexConfig,
)
from src.util import file_util
from src.util.datetime_util import get_now_us_eastern_datetime
from tests.conftest import BaseTestClass
from tests.src.db.models.factories import (
AgencyFactory,
OpportunityAttachmentFactory,
OpportunityFactory,
OpportunitySearchIndexQueueFactory,
)
@@ -39,26 +41,44 @@ def test_load_opportunities_to_index(
opportunities = []
opportunities.extend(
OpportunityFactory.create_batch(
size=6, is_posted_summary=True, agency_code=agency.agency_code
size=6,
is_posted_summary=True,
agency_code=agency.agency_code,
opportunity_attachments=[],
)
)
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
OpportunityFactory.create_batch(
size=3, is_forecasted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=6, is_archived_forecast_summary=True, agency_code=agency.agency_code
size=2, is_closed_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=8, is_archived_non_forecast_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=6,
is_archived_forecast_summary=True,
agency_code=agency.agency_code,
opportunity_attachments=[],
)
)

# Create some opportunities that won't get fetched / loaded into search
OpportunityFactory.create_batch(size=3, is_draft=True)
OpportunityFactory.create_batch(size=4, no_current_summary=True)
OpportunityFactory.create_batch(size=3, is_draft=True, opportunity_attachments=[])
OpportunityFactory.create_batch(size=4, no_current_summary=True, opportunity_attachments=[])

AgencyFactory.create(agency_code="MY-TEST-AGENCY", is_test_agency=True)
OpportunityFactory.create_batch(size=3, agency_code="MY-TEST-AGENCY")
OpportunityFactory.create_batch(
size=3, agency_code="MY-TEST-AGENCY", opportunity_attachments=[]
)

for opportunity in opportunities:
OpportunitySearchIndexQueueFactory.create(
@@ -105,7 +125,7 @@ def test_load_opportunities_to_index(
)

# Rerunning but first add a few more opportunities to show up
opportunities.extend(OpportunityFactory.create_batch(size=3))
opportunities.extend(OpportunityFactory.create_batch(size=3, opportunity_attachments=[]))
load_opportunities_to_index.index_name = (
load_opportunities_to_index.index_name + "-new-data"
)
@@ -119,6 +139,45 @@
[record["opportunity_id"] for record in resp.records]
)

def test_opportunity_attachment_pipeline(
self,
mock_s3_bucket,
db_session,
enable_factory_create,
load_opportunities_to_index,
monkeypatch: pytest.MonkeyPatch,
opportunity_index_alias,
search_client,
):
filename = "test_file_1.txt"
file_path = f"s3://{mock_s3_bucket}/{filename}"
content = "I am a file"
with file_util.open_stream(file_path, "w") as outfile:
outfile.write(content)

opportunity = OpportunityFactory.create(opportunity_attachments=[])
OpportunityAttachmentFactory.create(
mime_type="text/plain",
opportunity=opportunity,
file_location=file_path,
file_name=filename,
)

load_opportunities_to_index.index_name = (
load_opportunities_to_index.index_name + "-pipeline"
)

load_opportunities_to_index.run()

resp = search_client.search(opportunity_index_alias, {"size": 100})

record = [d for d in resp.records if d.get("opportunity_id") == opportunity.opportunity_id]
attachment = record[0]["attachments"][0]
# assert correct attachment was uploaded
assert attachment["filename"] == filename
# assert data was b64encoded
assert attachment["attachment"]["content"] == content # decoded b64encoded attachment


class TestLoadOpportunitiesToIndexPartialRefresh(BaseTestClass):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -148,18 +207,36 @@ def test_load_opportunities_to_index(

# Load a bunch of records into the DB
opportunities = []
opportunities.extend(OpportunityFactory.create_batch(size=6, is_posted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=3, is_forecasted_summary=True))
opportunities.extend(OpportunityFactory.create_batch(size=2, is_closed_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(size=8, is_archived_non_forecast_summary=True)
OpportunityFactory.create_batch(
size=6, is_posted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=3, is_forecasted_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=2, is_closed_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(
size=8, is_archived_non_forecast_summary=True, opportunity_attachments=[]
)
)
opportunities.extend(
OpportunityFactory.create_batch(size=6, is_archived_forecast_summary=True)
OpportunityFactory.create_batch(
size=6, is_archived_forecast_summary=True, opportunity_attachments=[]
)
)

AgencyFactory.create(agency_code="MY-TEST-AGENCY-123", is_test_agency=True)
test_opps = OpportunityFactory.create_batch(size=2, agency_code="MY-TEST-AGENCY-123")
test_opps = OpportunityFactory.create_batch(
size=2, agency_code="MY-TEST-AGENCY-123", opportunity_attachments=[]
)

for opportunity in itertools.chain(opportunities, test_opps):
OpportunitySearchIndexQueueFactory.create(
@@ -179,7 +256,11 @@
] == len(test_opps)

# Add a few more opportunities that will be created
opportunities.extend(OpportunityFactory.create_batch(size=3, is_posted_summary=True))
opportunities.extend(
OpportunityFactory.create_batch(
size=3, is_posted_summary=True, opportunity_attachments=[]
)
)

# Delete some opportunities
opportunities_to_delete = [opportunities.pop(), opportunities.pop(), opportunities.pop()]
@@ -221,7 +302,7 @@ def test_load_opportunities_to_index_index_does_not_exist(self, db_session, search_client):

def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_index):
"""Test that a new opportunity in the queue gets indexed"""
test_opportunity = OpportunityFactory.create()
test_opportunity = OpportunityFactory.create(opportunity_attachments=[])

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
Expand All @@ -234,7 +315,7 @@ def test_new_opportunity_gets_indexed(self, db_session, load_opportunities_to_in

def test_draft_opportunity_not_indexed(self, db_session, load_opportunities_to_index):
"""Test that draft opportunities are not indexed"""
test_opportunity = OpportunityFactory.create(is_draft=True)
test_opportunity = OpportunityFactory.create(is_draft=True, opportunity_attachments=[])

# Add to queue
OpportunitySearchIndexQueueFactory.create(opportunity=test_opportunity)
