96247 add organization address validation to trexler file process job #20085

Open: 28 commits to merge into base: master (changes shown from 23 commits)

Commits:
dfa372d
[WIP]
opticbob Jan 2, 2025
5e28438
[WIP] comments to move forward
opticbob Jan 2, 2025
f2ffd38
Restore original job config
opticbob Jan 3, 2025
08a863b
[WIP] update for orgs
opticbob Jan 3, 2025
e845234
[WIP] Continue removing non-org code
opticbob Jan 3, 2025
42157a4
Queue updates for orgs
opticbob Jan 3, 2025
f512d14
[WIP] VSOs processing
opticbob Jan 6, 2025
d6956b5
[WIP] Allow processing of test file
opticbob Jan 6, 2025
ddc5fd6
[WIP] Add diff checking
opticbob Jan 7, 2025
b7d57b4
Org not rep
opticbob Jan 7, 2025
dc036da
Queue updates spec
opticbob Jan 7, 2025
97c7caf
Test fixes
opticbob Jan 7, 2025
d21b612
[WIP] Update spec progress
opticbob Jan 7, 2025
d078048
[WIP] file processor spec
opticbob Jan 7, 2025
bd584dd
Test fixes
opticbob Jan 8, 2025
85a771d
Removing unused code
opticbob Jan 8, 2025
cce3d14
Fix remaining tests
opticbob Jan 8, 2025
e2595b3
Cleanup
opticbob Jan 8, 2025
a9212a3
Merge branch 'master' into 96247-add-organization-address-validation-…
opticbob Jan 8, 2025
aab565b
Merge branch 'master' into 96247-add-organization-address-validation-…
opticbob Jan 8, 2025
ac02173
Merge branch 'master' into 96247-add-organization-address-validation-…
opticbob Jan 9, 2025
2e30c32
Merge branch 'master' into 96247-add-organization-address-validation-…
opticbob Jan 9, 2025
e8071e4
Simplify organization diff
opticbob Jan 9, 2025
e8ba6d3
poa -> id
opticbob Jan 9, 2025
bf531fd
More poa -> id and string conversion
opticbob Jan 9, 2025
a96cc1c
Queue update test fix
opticbob Jan 9, 2025
76f07fd
Merge branch 'master' into 96247-add-organization-address-validation-…
opticbob Jan 9, 2025
c46c027
States hash -> array
opticbob Jan 10, 2025
1 change: 1 addition & 0 deletions lib/periodic_jobs.rb
@@ -214,6 +214,7 @@

# Updates veteran representatives address attributes (including lat, long, location, address fields, email address, phone number) # rubocop:disable Layout/LineLength
mgr.register('0 3 * * *', 'Representatives::QueueUpdates')
mgr.register('0 3 * * *', 'Organizations::QueueUpdates')

# Updates veteran service organization names
mgr.register('0 5 * * *', 'Organizations::UpdateNames')
25 changes: 25 additions & 0 deletions modules/veteran/app/models/veteran/service/organization.rb
@@ -7,6 +7,31 @@ class Organization < ApplicationRecord
self.primary_key = :poa

validates :poa, presence: true

#
# Compares the org's current address with new data to detect changes.
# @param org_data [Hash] New data with an :address key holding the address components.
#
# @return [Hash] Hash with an 'address_changed' key whose value is a boolean.
def diff(org_data)
{ 'address_changed' => address_changed?(org_data) }
end

private

#
# Checks if the org's address has changed compared to the address in the new data.
# @param org_data [Hash] New data with an :address key holding the address components and state code.
#
# @return [Boolean] True if the current address differs from the address in `org_data`, false otherwise.
def address_changed?(org_data)
address = [address_line1, address_line2, address_line3, city, zip_code, zip_suffix, state_code].join(' ')
other_address = org_data[:address]
.values_at(:address_line1, :address_line2, :address_line3, :city, :zip_code5, :zip_code4)
.push(org_data.dig(:address, :state_province, :code))
.join(' ')
address != other_address
end
end
end
end
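
The comparison in `address_changed?` can be tried outside Rails with a small stand-in. This is a minimal sketch, assuming a hypothetical `OrgRecord` struct in place of the ActiveRecord model and made-up sample addresses; only the joining logic is copied from the diff above.

```ruby
# Hypothetical stand-in for Veteran::Service::Organization; only the address columns are modelled.
OrgRecord = Struct.new(:address_line1, :address_line2, :address_line3, :city,
                       :zip_code, :zip_suffix, :state_code, keyword_init: true) do
  # Same shape as the method in the diff: a one-key hash with a boolean value.
  def diff(org_data)
    { 'address_changed' => address_changed?(org_data) }
  end

  private

  # Joins both addresses into space-separated strings and compares them.
  # nil components become empty strings on both sides, so nil and '' compare as equal.
  def address_changed?(org_data)
    current = [address_line1, address_line2, address_line3, city, zip_code, zip_suffix, state_code].join(' ')
    incoming = org_data[:address]
               .values_at(:address_line1, :address_line2, :address_line3, :city, :zip_code5, :zip_code4)
               .push(org_data.dig(:address, :state_province, :code))
               .join(' ')
    current != incoming
  end
end

# Builds an incoming row in the nested shape the diff expects (sample data, not real).
def sample_row(line1)
  { address: { address_line1: line1, city: 'Springfield', zip_code5: '12345',
               state_province: { code: 'IL' } } }
end

org = OrgRecord.new(address_line1: '123 Main St', city: 'Springfield', zip_code: '12345', state_code: 'IL')
puts org.diff(sample_row('123 Main St'))
puts org.diff(sample_row('456 Oak Ave'))
```

Because the check is a plain string comparison, a difference in case or spacing alone (for example `123 MAIN ST`) also counts as a change and would trigger a validation call.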
67 changes: 67 additions & 0 deletions modules/veteran/app/sidekiq/organizations/queue_updates.rb
@@ -0,0 +1,67 @@
# frozen_string_literal: true

require 'sidekiq'
require 'sentry_logging'

module Organizations
class QueueUpdates
include Sidekiq::Job
include SentryLogging

SLICE_SIZE = 30

def perform
file_content = fetch_file_content
return unless file_content

processed_data = Organizations::XlsxFileProcessor.new(file_content).process
queue_address_updates(processed_data)
rescue => e
log_error("Error in file fetching process: #{e.message}")
end

private

def fetch_file_content
Representatives::XlsxFileFetcher.new.fetch
end

def queue_address_updates(data)
delay = 0

Organizations::XlsxFileProcessor::SHEETS_TO_PROCESS.each do |sheet|
next if data[sheet].blank?

batch = Sidekiq::Batch.new
batch.description = "Batching #{sheet} sheet data"

begin
batch.jobs do
rows_to_process(data[sheet]).each_slice(SLICE_SIZE) do |rows|
json_rows = rows.to_json
Organizations::Update.perform_in(delay.minutes, json_rows)
delay += 1
end
end
rescue => e
log_error("Error queuing address updates: #{e.message}")
end
end
end

def rows_to_process(rows)
rows.map do |row|
org = Veteran::Service::Organization.find_by!(poa: row[:poa])
diff = org.diff(row)
row.merge(diff.merge({ address_exists: org.location.present? })) if diff.values.any?
rescue ActiveRecord::RecordNotFound => e
log_error("Error: Organization not found #{e.message}")
nil
end.compact
end

def log_error(message)
log_message_to_sentry("QueueUpdates error: #{message}", :error)
Contributor

Would it be better to log this to datadog instead? Then you could put a monitor in place to look for this log

Contributor Author

@LindseySaari Do you have a place in vets-api where you could point me to a good example of logging to datadog? I see some code referencing statsd and some for Datadog::Tracing, but I'm not sure what you're looking for.

Contributor Author

@opticbob opticbob Jan 10, 2025

@LindseySaari would it work to investigate datadog integration in a future PR? We are waiting for this to merge before we expand the percentage of users exposed to our product launch.
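
For reference, one possible shape for the logging discussed in this thread is a structured log line that a log-based monitor can match on. This is a minimal sketch, not vets-api code: it assumes application logs are already shipped to Datadog by an agent, and `JobErrorLogger`, the `event` value, and the field names are all hypothetical. Inside the app, the Rails logger would likely take the place of the stdlib `Logger` used here.

```ruby
require 'json'
require 'logger'
require 'stringio'

# Hypothetical helper: writes one JSON object per line so a monitor can filter on "job" and "event".
class JobErrorLogger
  def initialize(io = $stdout)
    @logger = Logger.new(io)
    # Emit only the JSON payload, without the default timestamp/severity prefix.
    @logger.formatter = proc { |_severity, _time, _progname, msg| "#{msg}\n" }
  end

  # job and message are free-form strings; the "event" name is an assumption for this sketch.
  def error(job:, message:)
    @logger.error({ level: 'error', job: job, event: 'queue_updates_error', message: message }.to_json)
  end
end

buffer = StringIO.new
JobErrorLogger.new(buffer).error(job: 'Organizations::QueueUpdates',
                                 message: 'Error queuing address updates: timeout')
puts buffer.string
```

A fixed `event` value gives a monitor something stable to match, rather than the free-text message, which varies with the underlying exception.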

end
end
end
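
The staggering in `queue_address_updates` can be sketched without Sidekiq. The example below assumes a hypothetical `schedule_slices` helper that returns the delay and rows for each slice instead of enqueuing a job; `SLICE_SIZE` matches the constant in the diff, and the row data is made up.

```ruby
SLICE_SIZE = 30

# Returns [delay_in_minutes, rows] pairs, one per slice.
# start_delay models the delay counter carrying over from one sheet to the next.
def schedule_slices(rows, start_delay = 0)
  rows.each_slice(SLICE_SIZE).each_with_index.map do |slice, index|
    [start_delay + index, slice]
  end
end

plan = schedule_slices((1..65).to_a)
plan.each { |delay, slice| puts "run in #{delay} min: #{slice.size} rows" }
# run in 0 min: 30 rows
# run in 1 min: 30 rows
# run in 2 min: 5 rows
```

In the job itself `delay` is initialised once, before the loop over sheets, so a second sheet continues from the last delay of the first rather than starting again at zero.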
283 changes: 283 additions & 0 deletions modules/veteran/app/sidekiq/organizations/update.rb
@@ -0,0 +1,283 @@
# frozen_string_literal: true

require 'sidekiq'
require 'sentry_logging'
require 'va_profile/models/validation_address'
require 'va_profile/address_validation/service'
require 'va_profile/models/v3/validation_address'
require 'va_profile/v3/address_validation/service'

module Organizations
# Processes updates for organization records based on provided JSON data.
# This class is designed to parse organization data, validate addresses using an external service,
# and update records in the database accordingly.
class Update
include Sidekiq::Job
include SentryLogging

# Processes each organization's data provided in JSON format.
# This method parses the JSON, validates each organization's address, and updates the database records.
# @param orgs_json [String] JSON string containing an array of organization data.
def perform(orgs_json)
orgs_data = JSON.parse(orgs_json)
orgs_data.each { |org_data| process_org_data(org_data) }
rescue => e
log_error("Error processing job: #{e.message}")
end

private

# Processes individual organization data, validates the address, and updates the record.
# If the address validation fails or an error occurs during the update, the error is logged and the process
# is halted for the current organization.
# @param org_data [Hash] The organization data including id and address.
def process_org_data(org_data) # rubocop:disable Metrics/MethodLength
return unless record_can_be_updated?(org_data)

address_validation_api_response = nil

if org_data['address_changed']
api_response = if Flipper.enabled?(:va_v3_contact_information_service)
get_best_address_candidate(org_data)
else
get_best_address_candidate(org_data['address'])
end

# don't update the record if there is not a valid address with non-zero lat and long at this point
return if api_response.nil?

address_validation_api_response = api_response
end

update_org_record(org_data, address_validation_api_response)
# The rescues cover the address validation call above as well as the update, so a failure for one
# organization is logged without aborting the rest of the batch.
rescue Common::Exceptions::BackendServiceException => e
log_error("Address validation failed for Org id: #{org_data['id']}: #{e.message}")
nil
rescue => e
log_error("Update failed for Org id: #{org_data['id']}: #{e.message}")
nil
end

def record_can_be_updated?(org_data)
org_data['address_exists'] || org_data['address_changed']
end

# Constructs a validation address object from the provided address data.
# @param address [Hash] A hash containing the details of the organization's address.
# @return [VAProfile::Models::ValidationAddress] A validation address object ready for address validation service.
def build_validation_address(address)
if Flipper.enabled?(:va_v3_contact_information_service)
validation_model = VAProfile::Models::V3::ValidationAddress
state_code = address['state']['state_code']
city = address['city_name']
else
validation_model = VAProfile::Models::ValidationAddress
state_code = address['state_province']['code']
city = address['city']
end

validation_model.new(
address_pou: address['address_pou'],
address_line1: address['address_line1'],
address_line2: address['address_line2'],
address_line3: address['address_line3'],
city: city,
state_code: state_code,
zip_code: address['zip_code5'],
zip_code_suffix: address['zip_code4'],
country_code_iso3: address['country_code_iso3']
)
end

# Validates the given address using the VAProfile address validation service.
# @param candidate_address [VAProfile::Models::ValidationAddress] The address to be validated.
# @return [Hash] The response from the address validation service.
def validate_address(candidate_address)
validation_service = if Flipper.enabled?(:va_v3_contact_information_service)
VAProfile::V3::AddressValidation::Service.new
else
VAProfile::AddressValidation::Service.new
end
validation_service.candidate(candidate_address)
end

# Checks if the address validation response is valid.
# @param response [Hash] The response from the address validation service.
# @return [Boolean] True if the address is valid, false otherwise.
def address_valid?(response)
response.key?('candidate_addresses') && !response['candidate_addresses'].empty?
end

# Updates the organization record based on the org_data and validation response.
# Raises if the record cannot be found; the caller rescues the error and logs it to Sentry.
# @param org_data [Hash] Original org_data containing the address and other details.
# @param api_response [Hash, nil] The response from the address validation service.
def update_org_record(org_data, api_response)
record = Veteran::Service::Organization.find_by(poa: org_data['id'])
raise StandardError, 'Organization not found.' if record.nil?

address_attributes = org_data['address_changed'] ? build_address_attributes(org_data, api_response) : {}
record.update(address_attributes)
end

# Builds the attributes for the record update from the first candidate address in the validation response.
# @param org_data [Hash] Original org_data containing the address and other details.
# @param api_response [Hash] The response from the address validation service.
# @return [Hash] The attributes to update the record with.
def build_address_attributes(org_data, api_response)
if Flipper.enabled?(:va_v3_contact_information_service)
build_v3_address(api_response['candidate_addresses'].first)
else
address = api_response['candidate_addresses'].first['address']
geocode = api_response['candidate_addresses'].first['geocode']
meta = api_response['candidate_addresses'].first['address_meta_data']
build_address(address, geocode, meta).merge({ raw_address: org_data['address'].to_json })
end
end

# Builds the attributes for the record update from the address, geocode, and metadata.
# @param address [Hash] Address details from the validation response.
# @param geocode [Hash] Geocode details from the validation response.
# @param meta [Hash] Metadata about the address from the validation response.
# @return [Hash] The attributes to update the record with.
def build_address(address, geocode, meta)
{
address_type: meta['address_type'],
address_line1: address['address_line1'],
address_line2: address['address_line2'],
address_line3: address['address_line3'],
city: address['city'],
province: address['state_province']['name'],
state_code: address['state_province']['code'],
zip_code: address['zip_code5'],
zip_suffix: address['zip_code4'],
country_code_iso3: address['country']['iso3_code'],
country_name: address['country']['name'],
county_name: address.dig('county', 'name'),
county_code: address.dig('county', 'county_fips_code'),
lat: geocode['latitude'],
long: geocode['longitude'],
location: "POINT(#{geocode['longitude']} #{geocode['latitude']})"
}
end

def build_v3_address(address)
{
address_type: address['address_type'],
address_line1: address['address_line1'],
address_line2: address['address_line2'],
address_line3: address['address_line3'],
city: address['city_name'],
province: address['state']['state_name'],
state_code: address['state']['state_code'],
zip_code: address['zip_code5'],
zip_suffix: address['zip_code4'],
country_code_iso3: address['country']['iso3_code'],
country_name: address['country']['country_name'],
county_name: address.dig('county', 'county_name'),
county_code: address.dig('county', 'county_code'),
lat: address['geocode']['latitude'],
long: address['geocode']['longitude'],
location: "POINT(#{address['geocode']['longitude']} #{address['geocode']['latitude']})"
}
end

# Logs an error to Sentry.
# @param error [String] The error message to be logged.
def log_error(error)
log_message_to_sentry("Organizations::Update: #{error}", :error)
Contributor

Same here

end

# Checks if the latitude and longitude of an address are both set to zero, which are the default values
# for DualAddressError warnings we see with some P.O. Box addresses the validator struggles with
# @param candidate_address [Hash] an address hash object returned by [VAProfile::AddressValidation::Service]
# @return [Boolean]
def lat_long_zero?(candidate_address)
address = candidate_address['candidate_addresses']&.first
return false if address.blank?

geocode = address['geocode']
return false if geocode.blank?

geocode['latitude']&.zero? && geocode['longitude']&.zero?
end

# Attempt to get valid address with non-zero coordinates by modifying the OGC address data
# @param address [Hash] the OGC address object
# @param retry_count [Integer] the current retry attempt which determines how the address object should be modified
# @return [Hash] the response from the address validation service
def modified_validation(address, retry_count)
address_attempt = address.dup
case retry_count
when 1 # only use the original address_line1
when 2 # set address_line1 to the original address_line2
address_attempt['address_line1'] = address['address_line2']
else # set address_line1 to the original address_line3
address_attempt['address_line1'] = address['address_line3']
end

address_attempt['address_line2'] = nil
address_attempt['address_line3'] = nil

validate_address(build_validation_address(address_attempt))
end

# An address validation attempt is retriable if the address is invalid OR the coordinates are zero
# @param response [Hash, Nil] the response from the address validation service
# @return [Boolean]
def retriable?(response)
return true if response.blank?

!address_valid?(response) || lat_long_zero?(response)
end

# Retry address validation
# @param org_address [Hash] the address provided by OGC
# @return [Hash, Nil] the response from the address validation service
def retry_validation(org_address)
# the address validation service requires at least one of address_line1, address_line2, and address_line3 to
# exist. No need to run the retry if we know it will fail before attempting the api call.
api_response = modified_validation(org_address, 1) if org_address['address_line1'].present?

if retriable?(api_response) && org_address['address_line2'].present?
api_response = modified_validation(org_address, 2)
end

if retriable?(api_response) && org_address['address_line3'].present?
api_response = modified_validation(org_address, 3)
end

api_response
end

# Get the best address that the address validation api can provide with some retry logic added in
# @param org_address [Hash] the address provided by OGC
# @return [Hash, Nil] the response from the address validation service
def get_best_address_candidate(org_address)
candidate_address = build_validation_address(org_address)
original_response = validate_address(candidate_address)
return nil unless address_valid?(original_response)

# retry validation if we get zero as the coordinates - this should indicate some warning with validation that
# is typically seen with addresses that mix street addresses with P.O. Boxes
return original_response unless lat_long_zero?(original_response)

retry_response = retry_validation(org_address)
retriable?(retry_response) ? nil : retry_response
end
end
end
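
The retry flow in `get_best_address_candidate` can be exercised in isolation with a stubbed validator. This is a minimal sketch under stated assumptions: `fake_validate` is a hypothetical stand-in for the VAProfile service that returns zero coordinates for anything resembling a P.O. Box, and the other helper names are simplified versions of the methods in the diff, not the PR's code.

```ruby
# Hypothetical validator stub: P.O. Box lines geocode to 0,0; other lines get fixed sample coordinates.
def fake_validate(address)
  line1 = address['address_line1'].to_s
  return { 'candidate_addresses' => [] } if line1.empty?

  po_box = line1.match?(/p\.?o\.? box/i)
  coords = po_box ? { 'latitude' => 0.0, 'longitude' => 0.0 } : { 'latitude' => 38.9, 'longitude' => -77.0 }
  { 'candidate_addresses' => [{ 'address_line1' => line1, 'geocode' => coords }] }
end

# True when the first candidate has both coordinates set to zero.
def zero_coordinates?(response)
  geocode = response['candidate_addresses'].first&.fetch('geocode', nil)
  return false if geocode.nil?

  geocode['latitude'].zero? && geocode['longitude'].zero?
end

# A response is worth retrying when it is missing, has no candidates, or has zero coordinates.
def retriable?(response)
  response.nil? || response['candidate_addresses'].empty? || zero_coordinates?(response)
end

# Validates the full address first; on zero coordinates, retries with each address line on its own.
def best_candidate(address)
  original = fake_validate(address)
  return nil if original['candidate_addresses'].empty?
  return original unless zero_coordinates?(original)

  %w[address_line1 address_line2 address_line3].each do |key|
    next if address[key].to_s.empty?

    attempt = address.merge('address_line1' => address[key], 'address_line2' => nil, 'address_line3' => nil)
    response = fake_validate(attempt)
    return response unless retriable?(response)
  end
  nil
end

mixed = { 'address_line1' => 'PO Box 12', 'address_line2' => '100 Main St' }
puts best_candidate(mixed).inspect
puts best_candidate('address_line1' => 'PO Box 12').inspect
```

With the stub, the mixed address falls through to its second line and returns that candidate, while the P.O. Box-only address exhausts its retries and returns `nil`, which is the case where the job skips the record update.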