Skip to content

Commit

Permalink
Update the colin sync job to work with batch processings
Browse files Browse the repository at this point in the history
  • Loading branch information
leodube-aot committed Oct 11, 2024
1 parent 707cc59 commit ef33531
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 42 deletions.
159 changes: 125 additions & 34 deletions jobs/update-colin-filings/update_colin_filings.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def shell_context():


def get_filings(app: Flask, token, page, limit):
"""Get a filing with filing_id."""
"""Get filings ready for colin sync."""
requests_timeout = int(app.config.get('ACCOUNT_SVC_TIMEOUT'))
req = requests.get(f'{app.config["LEGAL_API_URL"]}/businesses/internal/filings?page={page}&limit={limit}',
headers={'Authorization': 'Bearer ' + token},
Expand All @@ -72,6 +72,18 @@ def get_filings(app: Flask, token, page, limit):
return req.json()


def get_batch_processings(app: Flask, token, page, limit):
"""Get a batch processings ready for colin sync."""
requests_timeout = int(app.config.get('ACCOUNT_SVC_TIMEOUT'))
req = requests.get(f'{app.config["LEGAL_API_URL"]}/businesses/internal/batch_processings?page={page}&limit={limit}',
headers={'Authorization': 'Bearer ' + token},
timeout=requests_timeout)
if not req or req.status_code != 200:
app.logger.error(f'Failed to collect batch processings from legal-api. {req} {req.json()} {req.status_code}')
raise Exception # pylint: disable=broad-exception-raised
return req.json()


def send_filing(app: Flask, token: str, filing: dict, filing_id: str):
"""Post to colin-api with filing."""
clean_none(filing)
Expand All @@ -97,6 +109,29 @@ def send_filing(app: Flask, token: str, filing: dict, filing_id: str):
return req.json()['filing']['header']['colinIds']


def send_batch_processing(app: Flask, token: str, batch_processing: dict, batch_processing_id: str):
"""Post to colin-api with batch processing."""
clean_none(batch_processing)

identifier = batch_processing.get('businessIdentifier', None)
legal_type = batch_processing.get('businessLegalType', None)

req = None
if legal_type and identifier:
requests_timeout = int(app.config.get('ACCOUNT_SVC_TIMEOUT'))
req = requests.patch(f'{app.config["COLIN_URL"]}/internal/{legal_type}/{identifier}',
headers={'Content-Type': 'application/json',
'Authorization': 'Bearer ' + token},
json=batch_processing,
timeout=requests_timeout)

if not req or req.status_code != 201:
app.logger.error(f'Batch processing {batch_processing_id} not created in colin {identifier}.')
# raise Exception
return None
return req.json()['batchProcessing']['colinIds']


def update_colin_id(app: Flask, token: dict, filing_id: str, colin_ids: list):
"""Update the colin_id in the filings table."""
requests_timeout = int(app.config.get('ACCOUNT_SVC_TIMEOUT'))
Expand All @@ -111,6 +146,20 @@ def update_colin_id(app: Flask, token: dict, filing_id: str, colin_ids: list):
return False
return True

def update_batch_processing_colin_id(app: Flask, token: dict, batch_processing_id: str, colin_ids: list):
"""Update the colin_id in the batch processing table."""
requests_timeout = int(app.config.get('ACCOUNT_SVC_TIMEOUT'))
req = requests.patch(
f'{app.config["LEGAL_API_URL"]}/businesses/internal/batch_processings/{batch_processing_id}',
headers={'Authorization': 'Bearer ' + token},
json={'colinIds': colin_ids},
timeout=requests_timeout
)
if not req or req.status_code != 202:
app.logger.error(f'Failed to update colin id in legal db for batch processing {batch_processing_id} {req.status_code}')
return False
return True


def clean_none(dictionary: dict = None):
"""Replace all none values with empty string."""
Expand Down Expand Up @@ -143,46 +192,88 @@ def get_bearer_token(app):
except Exception: # noqa: B902
return None

def update_filings(app: Flask, token: dict):
"""Get filings that haven't been synced with colin and send them to the colin-api."""
page = 1
limit = 50
pending_filings = None
corps_with_failed_filing = []

while ((pending_filings is None or page <= math.ceil(pending_filings/limit)) and
(results := get_filings(app, token, page, limit))):
page += 1
pending_filings = results.get('total')
if not (filings := results.get('filings')):
# pylint: disable=no-member; false positive
app.logger.debug('No completed filings to send to colin.')
for filing in filings:
filing_id = filing['filingId']
identifier = filing['filing']['business']['identifier']
if identifier in corps_with_failed_filing:
# pylint: disable=no-member; false positive
app.logger.debug(f'Skipping filing {filing_id} for'
f' {filing["filing"]["business"]["identifier"]}.')
else:
colin_ids = send_filing(app, token, filing, filing_id)
update = None
if colin_ids:
update = update_colin_id(app, token, filing_id, colin_ids)
if update:
# pylint: disable=no-member; false positive
app.logger.debug(f'Successfully updated filing {filing_id}')
pending_filings -= 1
else:
corps_with_failed_filing.append(filing['filing']['business']['identifier'])
# pylint: disable=no-member; false positive
app.logger.error(f'Failed to update filing {filing_id} with colin event id.')


def update_batch_processings(app: Flask, token: dict):
"""Get eligible batch processings and send them to the colin-api."""
page = 1
limit = 50
pending_batch_processings = None
corps_with_failed_batch_processing = []

while (
(pending_batch_processings is None or page <= math.ceil(pending_batch_processings/limit)) and
(results := get_batch_processings(app, token, page, limit))
):
page += 1
pending_batch_processings = results.get('total')
if not (batch_processings := results.get('batchProcessings')):
# pylint: disable=no-member; false positive
app.logger.debug('No eligible batch processings to send to colin.')
for batch_processing in batch_processings:
batch_processing_id = batch_processing['id']
identifier = batch_processing['businessIdentifier']
if identifier in corps_with_failed_batch_processing:
# pylint: disable=no-member; false positive
app.logger.debug(f'Skipping batch processing {batch_processing_id} for'
f' {batch_processing['businessIdentifier']}.')
else:
colin_ids = send_batch_processing(app, token, batch_processing, batch_processing_id)
update = None
if colin_ids:
update = update_batch_processing_colin_id(app, token, batch_processing_id, colin_ids)
if update:
# pylint: disable=no-member; false positive
app.logger.debug(f'Successfully updated batch processing {batch_processing_id}')
pending_batch_processings -= 1
else:
corps_with_failed_batch_processing.append(batch_processing['businessIdentifier'])
# pylint: disable=no-member; false positive
app.logger.error(f'Failed to update batch processing {batch_processing_id} with colin event id.')

def run():
"""Get filings that haven't been synced with colin and send them to the colin-api."""
"""Run the job to update filings and batch processings."""
application = create_app()
corps_with_failed_filing = []
with application.app_context():
try:
# get updater-job token
token = get_bearer_token(application)

page = 1
limit = 50
pending_filings = None
while ((pending_filings is None or page <= math.ceil(pending_filings/limit)) and
(results := get_filings(application, token, page, limit))):
page += 1
pending_filings = results.get('total')
if not (filings := results.get('filings')):
# pylint: disable=no-member; false positive
application.logger.debug('No completed filings to send to colin.')
for filing in filings:
filing_id = filing['filingId']
identifier = filing['filing']['business']['identifier']
if identifier in corps_with_failed_filing:
# pylint: disable=no-member; false positive
application.logger.debug(f'Skipping filing {filing_id} for'
f' {filing["filing"]["business"]["identifier"]}.')
else:
colin_ids = send_filing(application, token, filing, filing_id)
update = None
if colin_ids:
update = update_colin_id(application, token, filing_id, colin_ids)
if update:
# pylint: disable=no-member; false positive
application.logger.debug(f'Successfully updated filing {filing_id}')
pending_filings -= 1
else:
corps_with_failed_filing.append(filing['filing']['business']['identifier'])
# pylint: disable=no-member; false positive
application.logger.error(f'Failed to update filing {filing_id} with colin event id.')
update_filings(application, token)
update_batch_processings(application, token)

except Exception as err: # noqa: B902
# pylint: disable=no-member; false positive
Expand Down
3 changes: 1 addition & 2 deletions legal-api/src/legal_api/models/batch_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def json(self):
}
return d


def save(self):
"""Save the object to the database immediately."""
db.session.add(self)
Expand Down Expand Up @@ -157,7 +156,7 @@ def get_eligible_batch_processings_for_colin(page=1, limit=20):
return {
'page': page,
'limit': limit,
'batch_processings': batch_processings.items,
'batchProcessings': batch_processings.items,
'pages': batch_processings.pages,
'total': batch_processings.total
}
21 changes: 15 additions & 6 deletions legal-api/src/legal_api/resources/v2/business/colin_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def get_completed_filings_for_colin():
'total': pending_filings.get('total')
}), HTTPStatus.OK


@bp.route('/internal/batch_processings', methods=['GET'])
@cross_origin(origin='*')
@jwt.has_one_of_roles([UserRoles.colin])
Expand All @@ -112,11 +113,16 @@ def get_eligible_batch_processings_for_colin():
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
pending_batch_processings = BatchProcessing.get_eligible_batch_processings_for_colin(page, limit)
for batch_processing in pending_batch_processings.get('batch_processings'):
batch_processings.append(batch_processing.json)
for batch_processing in pending_batch_processings.get('batchProcessings'):
business = Business.find_by_internal_id(batch_processing.business_id)

batch_processings.append({
**batch_processing.json,
'businessLegalType': business.legal_type
})

return jsonify({
'batch_processings': batch_processings,
'batchProcessings': batch_processings,
'page': page,
'limit': limit,
'pages': pending_batch_processings.get('pages'),
Expand Down Expand Up @@ -222,8 +228,9 @@ def update_batch_processing_colin_id(batch_processing_id):
try:
json_input = request.get_json()
if not json_input:
return None, None, {'message': f'No batch processing json data in body of patch for {batch_processing_id}.'}, \
HTTPStatus.BAD_REQUEST
return None, None, {
'message': f'No batch processing json data in body of patch for {batch_processing_id}.'
}, HTTPStatus.BAD_REQUEST

colin_ids = json_input['colinIds']
batch_processing = BatchProcessing.find_by_id(batch_processing_id)
Expand All @@ -237,7 +244,9 @@ def update_batch_processing_colin_id(batch_processing_id):
batch_processing.colin_event_ids.append(colin_event_id_obj)
batch_processing.save()
except BusinessException as err:
current_app.logger.Error(f'Error adding colin event id {colin_id} to batch processing with id {batch_processing_id}')
current_app.logger.Error(
f'Error adding colin event id {colin_id} to batch processing with id {batch_processing_id}'
)
return None, None, {'message': err.error}, err.status_code

return jsonify(batch_processing.json), HTTPStatus.ACCEPTED
Expand Down

0 comments on commit ef33531

Please sign in to comment.