Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipeline) : Ensure the zone_diffusion_codes for DROM/COM #288

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pipeline/dbt/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ models:

staging:
+schema: staging
+materialized: view

sources:
+materialized: view

decoupage_administratif:
+materialized: table

intermediate:
+schema: "{{ 'intermediate_tmp' if var('build_intermediate_tmp', false) else 'intermediate' }}"
Expand All @@ -41,6 +46,9 @@ models:
+contract:
enforced: false

tests:
+store_failures: true

seeds:
data_inclusion:
schema:
Expand Down
11 changes: 10 additions & 1 deletion pipeline/dbt/macros/domain/checks/check_service.sql
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,16 @@ BEGIN
("modes_accueil", "modes_accueil IS NULL OR modes_accueil <@ ARRAY(SELECT m.value FROM " ~ ref('modes_accueil') ~ "AS m)"),
("modes_orientation_accompagnateur", "modes_orientation_accompagnateur IS NULL OR modes_orientation_accompagnateur <@ ARRAY(SELECT m.value FROM " ~ ref('modes_orientation_accompagnateur') ~ "AS m)"),
("modes_orientation_beneficiaire", "modes_orientation_beneficiaire IS NULL OR modes_orientation_beneficiaire <@ ARRAY(SELECT m.value FROM " ~ ref('modes_orientation_beneficiaire') ~ "AS m)"),
("zone_diffusion_code", "zone_diffusion_code IS NULL OR zone_diffusion_code ~ '^(\d{9}|\w{5}|\w{2,3}|\d{2})$'"),
(
"zone_diffusion_code",
"zone_diffusion_code IS NULL
OR zone_diffusion_type = 'pays'
OR zone_diffusion_type = 'region' AND zone_diffusion_code IN (SELECT code FROM " ~ ref('stg_decoupage_administratif__regions') ~ ")
OR zone_diffusion_type = 'departement' AND zone_diffusion_code IN (SELECT code FROM " ~ ref('stg_decoupage_administratif__departements') ~ ")
OR zone_diffusion_type = 'epci' AND zone_diffusion_code IN (SELECT code FROM " ~ ref('stg_decoupage_administratif__epcis') ~ ")
OR zone_diffusion_type = 'commune' AND zone_diffusion_code IN (SELECT code FROM " ~ ref('stg_decoupage_administratif__communes') ~ ")
"
),
("zone_diffusion_type", "zone_diffusion_type IS NULL OR zone_diffusion_type IN (SELECT t.value FROM " ~ ref('zones_de_diffusion_types') ~ "AS t)"),
]
%}
Expand Down
9 changes: 0 additions & 9 deletions pipeline/dbt/models/_sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,3 @@ sources:
- name: services
meta:
kind: service

- name: decoupage_administratif
schema: decoupage_administratif
tables:
- name: regions
- name: departements
- name: epcis
- name: communes
- name: arrondissements
87 changes: 41 additions & 46 deletions pipeline/dbt/models/intermediate/_models.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
version: 2

x-union-common-check-args: &union-common-check-args
include:
- _di_surrogate_id
config:
severity: warn
store_failures: true

models:
- name: int__union_adresses
description: |
Gathers addresses from all sources
- name: int__plausible_personal_emails

* model can contain faulty data
* test failures are saved (see log output)
- name: int__union_adresses
data_tests:
- check_adresse: *union-common-check-args
- check_adresse:
include:
- _di_surrogate_id
config:
severity: warn

- name: int__union_contacts
description: |
Gathers contacts from all sources
columns:
- name: contact_uid
data_tests:
Expand All @@ -31,52 +23,55 @@ models:
- dbt_utils.not_empty_string

- name: int__union_services
description: |
Gathers services from all sources

* model can contain faulty data
* test failures are saved (see log output)
data_tests:
- check_service: *union-common-check-args
- check_service:
include:
- _di_surrogate_id
config:
severity: warn

- name: int__union_structures
description: |
Gathers structures from all sources

* model can contain faulty data
* test failures are saved (see log output)
data_tests:
- check_structure: *union-common-check-args

- name: int__plausible_personal_emails
- check_structure:
include:
- _di_surrogate_id
config:
severity: warn

- name: int__union_adresses__enhanced
description: |
All valid addresses, with geocoding

- name: int__union_services__enhanced
description: |
All valid services, with extra data:

* geocoded addresses
* zone_diffusion_* filled with geocoded data (monenfant, soliguide)

A service belonging to a structure data failing validation is considered invalid.
columns:
- name: zone_diffusion_code
data_tests:
- not_null:
config:
severity: warn
- dbt_utils.not_empty_string
- dbt_utils.not_constant
- relationships:
to: ref('stg_decoupage_administratif__regions')
field: code
where: "zone_diffusion_type = 'region'"
- relationships:
to: ref('stg_decoupage_administratif__departements')
field: code
where: "zone_diffusion_type = 'departement'"
- relationships:
to: ref('stg_decoupage_administratif__epcis')
field: code
where: "zone_diffusion_type = 'epci'"
- relationships:
to: ref('stg_decoupage_administratif__communes')
field: code
where: "zone_diffusion_type = 'commune'"

- name: int__union_structures__enhanced
description: |
All valid structures, with extra data:

* geocoded addresses
* email with pii flag

- name: int__geocodages
description: |
Geocoding results for all sources.

This model is incremental, it will only geocode new or changed addresses.
It stores raw geocoding results, without filtering.

Geocoding is done by calling the BAN api in PL/Python.
columns:
- name: geocoded_at
Expand Down
155 changes: 98 additions & 57 deletions pipeline/dbt/models/intermediate/int__union_services__enhanced.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,86 +6,126 @@ structures AS (
SELECT * FROM {{ ref('int__union_structures__enhanced') }}
),

departements AS (
SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }}
),

adresses AS (
SELECT * FROM {{ ref('int__union_adresses__enhanced') }}
),

departements AS (
SELECT * FROM {{ ref('stg_decoupage_administratif__departements') }}
adresses_with_code_departement AS (
SELECT
adresses.*,
CASE
WHEN LEFT(adresses.code_insee, 2) = '97' THEN LEFT(adresses.code_insee, 3)
ELSE LEFT(adresses.code_insee, 2)
END AS "code_departement"
FROM adresses
),

-- TODO: Refactoring needed to be able to do geocoding per source and then use the result in the mapping
services_with_zone_diffusion AS (
services_with_valid_structure AS (
SELECT services.*
FROM services
INNER JOIN structures
ON services._di_structure_surrogate_id = structures._di_surrogate_id
),

-- For some providers, zone_diffusion_code can not be set at the source mapping level for lack of proper codification.
-- Now that the data has been geocoded, it can be set, according to the mapped zone_diffusion_type.
-- FIXME(vperron): the address columns of ODSPEP services are of such poor quality
-- that reusing them for the zone diffusion makes the situation worse.
zones_diffusion AS (
-- Computes zone_diffusion_code / zone_diffusion_nom for each service that has a valid structure.
-- Sources outside the hard-coded list keep the values mapped at the source level; for the listed
-- sources, the values are taken from the geocoded address according to zone_diffusion_type.
SELECT
{{ dbt_utils.star(from=ref('int__union_services'), relation_alias='services', except=["zone_diffusion_code", "zone_diffusion_nom"]) }},
services._di_surrogate_id AS "_di_surrogate_id",
services.zone_diffusion_type AS "zone_diffusion_type",
CASE
WHEN services.source = ANY(ARRAY['monenfant', 'soliguide']) THEN adresses.code_insee
WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN LEFT(adresses.code_insee, 2)
WHEN NOT (services.source = ANY(ARRAY['monenfant', 'action-logement', 'soliguide', 'reseau-alpha', 'mediation-numerique']))
THEN services.zone_diffusion_code
-- The type value is the singular 'commune' (as used by check_service and the relationships tests);
-- comparing against 'communes' would never match and silently skip the geocoded INSEE code.
WHEN services.zone_diffusion_type = 'commune' AND adresses.code_insee IS NOT NULL
THEN adresses.code_insee
WHEN services.zone_diffusion_type = 'departement' AND adresses.code_departement IS NOT NULL
THEN adresses.code_departement
ELSE services.zone_diffusion_code
END AS "zone_diffusion_code",
END AS "zone_diffusion_code",
CASE
WHEN services.source = ANY(ARRAY['monenfant', 'soliguide']) THEN adresses.commune
WHEN services.source = ANY(ARRAY['reseau-alpha', 'action-logement']) THEN (SELECT departements."nom" FROM departements WHERE departements."code" = LEFT(adresses.code_insee, 2))
WHEN services.source = 'mediation-numerique' THEN (SELECT departements."nom" FROM departements WHERE departements."code" = services.zone_diffusion_code)
WHEN NOT (services.source = ANY(ARRAY['monenfant', 'action-logement', 'soliguide', 'reseau-alpha', 'mediation-numerique']))
THEN services.zone_diffusion_nom
-- Same singular 'commune' literal as for zone_diffusion_code, so code and name stay in sync.
WHEN services.zone_diffusion_type = 'commune' AND adresses.commune IS NOT NULL
THEN adresses.commune
WHEN services.zone_diffusion_type = 'departement' AND departements.nom IS NOT NULL
THEN departements.nom
ELSE services.zone_diffusion_nom
END AS "zone_diffusion_nom"
FROM
services
LEFT JOIN adresses ON services._di_adresse_surrogate_id = adresses._di_surrogate_id
END AS "zone_diffusion_nom"
FROM services_with_valid_structure AS services
LEFT JOIN adresses_with_code_departement AS adresses
ON services._di_adresse_surrogate_id = adresses._di_surrogate_id
LEFT JOIN departements
ON adresses.code_departement = departements.code
),

services_with_valid_structure AS (
SELECT services_with_zone_diffusion.*
FROM services_with_zone_diffusion
INNER JOIN structures ON services_with_zone_diffusion._di_structure_surrogate_id = structures._di_surrogate_id
services_with_zone_diffusion AS (
-- Re-attaches the computed zone_diffusion_code / zone_diffusion_nom to the service columns.
-- Rows come from services_with_valid_structure rather than the raw services union, so that
-- services attached to a structure that failed validation are still excluded downstream
-- (zones_diffusion only contains rows for services with a valid structure).
SELECT
{{
dbt_utils.star(
from=ref('int__union_services'),
relation_alias='services',
except=["zone_diffusion_code", "zone_diffusion_nom"]
)
}},
zones_diffusion.zone_diffusion_code AS "zone_diffusion_code",
zones_diffusion.zone_diffusion_nom AS "zone_diffusion_nom"
FROM services_with_valid_structure AS services
LEFT JOIN zones_diffusion
ON services._di_surrogate_id = zones_diffusion._di_surrogate_id
),

valid_services AS (
SELECT services_with_valid_structure.*
FROM services_with_valid_structure
SELECT services.*
FROM services_with_zone_diffusion AS services
LEFT JOIN
LATERAL
LIST_SERVICE_ERRORS(
services_with_valid_structure.contact_public,
services_with_valid_structure.contact_nom_prenom,
services_with_valid_structure.courriel,
services_with_valid_structure.cumulable,
services_with_valid_structure.date_creation,
services_with_valid_structure.date_maj,
services_with_valid_structure.date_suspension,
services_with_valid_structure.frais,
services_with_valid_structure.frais_autres,
services_with_valid_structure.id,
services_with_valid_structure.justificatifs,
services_with_valid_structure.lien_source,
services_with_valid_structure.modes_accueil,
services_with_valid_structure.modes_orientation_accompagnateur,
services_with_valid_structure.modes_orientation_accompagnateur_autres,
services_with_valid_structure.modes_orientation_beneficiaire,
services_with_valid_structure.modes_orientation_beneficiaire_autres,
services_with_valid_structure.nom,
services_with_valid_structure.page_web,
services_with_valid_structure.presentation_detail,
services_with_valid_structure.presentation_resume,
services_with_valid_structure.prise_rdv,
services_with_valid_structure.profils,
services_with_valid_structure.recurrence,
services_with_valid_structure.source,
services_with_valid_structure.structure_id,
services_with_valid_structure.telephone,
services_with_valid_structure.thematiques,
services_with_valid_structure.types,
services_with_valid_structure.zone_diffusion_code,
services_with_valid_structure.zone_diffusion_nom,
services_with_valid_structure.zone_diffusion_type,
services_with_valid_structure.pre_requis
services.contact_public,
services.contact_nom_prenom,
services.courriel,
services.cumulable,
services.date_creation,
services.date_maj,
services.date_suspension,
services.frais,
services.frais_autres,
services.id,
services.justificatifs,
services.lien_source,
services.modes_accueil,
services.modes_orientation_accompagnateur,
services.modes_orientation_accompagnateur_autres,
services.modes_orientation_beneficiaire,
services.modes_orientation_beneficiaire_autres,
services.nom,
services.page_web,
services.presentation_detail,
services.presentation_resume,
services.prise_rdv,
services.profils,
services.recurrence,
services.source,
services.structure_id,
services.telephone,
services.thematiques,
services.types,
services.zone_diffusion_code,
services.zone_diffusion_nom,
services.zone_diffusion_type,
services.pre_requis
) AS errors ON TRUE
WHERE errors.field IS NULL
),

final AS (
SELECT
valid_services.*,
services.*,
adresses.longitude AS "longitude",
adresses.latitude AS "latitude",
adresses.complement_adresse AS "complement_adresse",
Expand All @@ -94,8 +134,9 @@ final AS (
adresses.code_postal AS "code_postal",
adresses.code_insee AS "code_insee"
FROM
valid_services
LEFT JOIN adresses ON valid_services._di_adresse_surrogate_id = adresses._di_surrogate_id
valid_services AS services
LEFT JOIN adresses_with_code_departement AS adresses
ON services._di_adresse_surrogate_id = adresses._di_surrogate_id
)

SELECT * FROM final
Loading
Loading