diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 8b0220cbb..904b1f010 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -23,6 +23,13 @@ jobs: # list of changed files within `super-linter` fetch-depth: 0 + # Superlinter will load renv if .Rprofile is present - it will then fail + # because the renv environment doesn't have lintr. Removing the .Rprofile + # file loads the default superlinter R environment. + - name: Disable renv + shell: bash + run: rm etl/.Rprofile + - name: Lint uses: github/super-linter@v6 env: diff --git a/dbt/models/spatial/spatial.stadium.sql b/dbt/models/spatial/spatial.stadium.sql index 81c99d1c7..e01067b62 100644 --- a/dbt/models/spatial/spatial.stadium.sql +++ b/dbt/models/spatial/spatial.stadium.sql @@ -1,5 +1,5 @@ -{{ - config(materialized='table') +{{ + config(materialized='table') }} WITH distinct_years AS ( @@ -26,6 +26,7 @@ SELECT DISTINCT CAST(stadium_years.year AS VARCHAR) AS year, ST_ASBINARY(ST_POINT(stadium_years.lon, stadium_years.lat)) AS geometry, ST_ASBINARY(ST_POINT(stadium_years.x_3435, stadium_years.y_3435)) - AS geometry_3435 + AS geometry_3435, + DATE_FORMAT(CURRENT_TIMESTAMP, '%Y-%m-%d %H:%i:%s.%f') AS loaded_at FROM stadium_years ORDER BY year, name diff --git a/etl/renv.lock b/etl/renv.lock index 4f6f0dc6f..3d32c3624 100644 --- a/etl/renv.lock +++ b/etl/renv.lock @@ -1,6 +1,6 @@ { "R": { - "Version": "4.4.2", + "Version": "4.4.1", "Repositories": [ { "Name": "CRAN", @@ -2808,6 +2808,19 @@ ], "Hash": "ad57b543f7c3fca05213ba78ff63df9b" }, + "sfarrow": { + "Package": "sfarrow", + "Version": "0.4.1", + "Source": "Repository", + "Repository": "CRAN", + "Requirements": [ + "arrow", + "dplyr", + "jsonlite", + "sf" + ], + "Hash": "b320f164b1d7bb7e4582b841e22d15a0" + }, "sfheaders": { "Package": "sfheaders", "Version": "0.4.4", diff --git a/etl/scripts-ccao-data-raw-us-east-1/ccao/ccao-condominium-pin_condo_char.R b/etl/scripts-ccao-data-raw-us-east-1/ccao/ccao-condominium-pin_condo_char.R index 72c93a3a4..539a6fa2f 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/ccao/ccao-condominium-pin_condo_char.R +++ b/etl/scripts-ccao-data-raw-us-east-1/ccao/ccao-condominium-pin_condo_char.R @@ -19,7 +19,8 @@ output_bucket <- file.path( source_paths <- c( "//fileserver/ocommon/2022 Data Collection/Condo Project/William Approved Layout North Tri Condo Project FINAL COMPLETED/", # nolint "//fileserver/ocommon/2023 Data Collection/South Tri Condo Project COMPLETED", - "//fileserver/ocommon/2024 Data Collection/City Tri Condo Characteristics COMPLETED" # nolint + "//fileserver/ocommon/2024 Data Collection/City Tri Condo Characteristics COMPLETED", # nolint + "//fileserver/ocommon/CCAODATA/data/condo_chars_2025_completed" # Local copy of sharepoint file ) source_files <- grep( diff --git a/etl/scripts-ccao-data-raw-us-east-1/housing/housing-ihs_index.R b/etl/scripts-ccao-data-raw-us-east-1/housing/housing-ihs_index.R index 4ec2b4130..8e8dab9ea 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/housing/housing-ihs_index.R +++ b/etl/scripts-ccao-data-raw-us-east-1/housing/housing-ihs_index.R @@ -26,13 +26,13 @@ remote_file <- file.path(output_bucket, paste0("ihs_price_index_data.parquet")) # Grab the data, clean it just a bit, and write if it doesn't already exist data.frame(t( - openxlsx::read.xlsx(most_recent_ihs_data_url, sheet = 2) %>% - dplyr::select(-c("X2", "X3", "X4")) + read.xlsx(most_recent_ihs_data_url, sheet = 2) %>% + select(-c("X2", "X3", "X4")) )) %>% # Names and columns are kind of a mess after the 
transpose, # shift up first row, shift over column names - janitor::row_to_names(1) %>% - dplyr::mutate(puma = rownames(.)) %>% - dplyr::relocate(puma, .before = "YEARQ") %>% - dplyr::rename(name = "YEARQ") %>% - arrow::write_parquet(remote_file) + row_to_names(1) %>% + mutate(puma = rownames(.)) %>% + relocate(puma, .before = "YEARQ") %>% + rename(name = "YEARQ") %>% + write_parquet(remote_file) diff --git a/etl/scripts-ccao-data-raw-us-east-1/sale/sale-mydec.R b/etl/scripts-ccao-data-raw-us-east-1/sale/sale-mydec.R index 68b288942..fb87794b7 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/sale/sale-mydec.R +++ b/etl/scripts-ccao-data-raw-us-east-1/sale/sale-mydec.R @@ -46,6 +46,5 @@ down_up <- function(x) { } } - # Apply function to foreclosure data walk(files, down_up) diff --git a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-access.R b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-access.R index 44a1eb523..6237a16b5 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-access.R +++ b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-access.R @@ -12,6 +12,8 @@ output_bucket <- file.path(AWS_S3_RAW_BUCKET, "spatial", "access") # List APIs from city site sources_list <- bind_rows(list( # INDUSTRIAL CORRIDORS + # See https://data.cityofchicago.org/Community-Economic-Development/IndustrialCorridor_Jan2013/3tu3-iesz/about_data # nolint + # for more information "ind_2013" = c( "source" = "https://data.cityofchicago.org/api/geospatial/", "api_url" = "e6xh-nr8w?method=export&format=GeoJSON", diff --git a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-ccao.R b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-ccao.R index 745c4581c..0d0b8c533 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-ccao.R +++ b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-ccao.R @@ -11,19 +11,16 @@ output_bucket <- file.path(AWS_S3_RAW_BUCKET, "spatial", "ccao") # Read privileges for the this drive location are limited. # Contact Cook County GIS if permissions need to be changed. -file_path <- "//gisemcv1.ccounty.com/ArchiveServices/" +file_path <- "//gisemcv1.ccounty.com/ArchiveServices/" # nolint -sources_list <- bind_rows(list( +sources_list <- data.frame( # NEIGHBORHOOD - "neighborhood" = c( - "url" = paste0( - "https://gitlab.com/ccao-data-science---modeling/packages/ccao", - "/-/raw/master/data-raw/nbhd_shp.geojson" - ), - "boundary" = "neighborhood", - "year" = "2021" - ) -)) + "url" = paste0( + "https://github.com/ccao-data/ccao/blob/master/data-raw/nbhd_shp.geojson" + ), + "boundary" = "neighborhood", + "year" = "2021" +) # Function to call referenced API, pull requested data, and write it to S3 pwalk(sources_list, function(...) { @@ -45,6 +42,7 @@ gdb_files <- data.frame("path" = list.files(file_path, full.names = TRUE)) %>% filter( str_detect(path, "Current", negate = TRUE) & str_detect(path, "20") & + # We detect parcel GDBs, but will extract the township layer str_detect(path, "Parcels") ) diff --git a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-political.R b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-political.R index 2acad2bd3..c77905045 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-political.R +++ b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-political.R @@ -12,7 +12,7 @@ output_bucket <- file.path(AWS_S3_RAW_BUCKET, "spatial", "political") # Read privileges for the this drive location are limited. # Contact Cook County GIS if permissions need to be changed. 
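A note on the ingest pattern these spatial scripts share: each builds a small sources_list table (source URL prefix, API path, boundary name, year) and then iterates over its rows with purrr::pwalk(), which passes each row's columns to the worker function as named arguments. A minimal runnable sketch of that idiom, with illustrative values borrowed from the industrial-corridors entry above and the real download/upload step reduced to a message():

library(dplyr)
library(purrr)

sources_list <- tibble(
  source = "https://data.cityofchicago.org/api/geospatial/",
  api_url = "e6xh-nr8w?method=export&format=GeoJSON",
  boundary = "industrial_corridor",  # illustrative label, not the script's exact value
  year = "2013"
)

pwalk(sources_list, function(...) {
  row <- list(...)
  # Assemble the full download URL from its two halves
  url <- paste0(row$source, row$api_url)
  # The real scripts read the GeoJSON at `url` and write it to S3; this sketch
  # only demonstrates how pwalk() exposes each row's fields by name
  message("Would fetch ", row$boundary, " (", row$year, ") from ", url)
})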
-file_path <- "//gisemcv1.ccounty.com/ArchiveServices/" +file_path <- "//gisemcv1.ccounty.com/ArchiveServices/" # nolint sources_list <- bind_rows(list( # BOARD OF REVIEW @@ -23,7 +23,7 @@ sources_list <- bind_rows(list( "year" = "2012" ), "bor_2023" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "10/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "board_of_review_district", "year" = "2023" @@ -37,7 +37,7 @@ sources_list <- bind_rows(list( "year" = "2012" ), "cmd_2023" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "9/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "commissioner_district", "year" = "2023" @@ -51,7 +51,7 @@ sources_list <- bind_rows(list( "year" = "2010" ), "cnd_2023" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "13/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "congressional_district", "year" = "2023" @@ -65,7 +65,7 @@ sources_list <- bind_rows(list( "year" = "2012" ), "jsd_2022" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "5/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "judicial_district", "year" = "2022" @@ -79,7 +79,7 @@ sources_list <- bind_rows(list( "year" = "2010" ), "str_2023" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "11/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "state_representative_district", "year" = "2023" @@ -93,7 +93,7 @@ sources_list <- bind_rows(list( "year" = "2010" ), "sts_2023" = c( - "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", + "source" = "https://gis.cookcountyil.gov/traditional/rest/services/politicalBoundary/MapServer/", # nolint "api_url" = "12/query?outFields=*&where=1%3D1&f=geojson", "boundary" = "state_senate_district", "year" = "2023" diff --git a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-school.R b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-school.R index f4f08742e..fd22da874 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-school.R +++ b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-school.R @@ -121,6 +121,12 @@ sources_list <- bind_rows(list( "boundary" = "cps_attendance_elementary", "year" = "2023-2024" ), + "attendance_ele_2025" = c( + "source" = "https://data.cityofchicago.org/api/geospatial/", + "api_url" = "5ihw-cbdn?method=export&format=GeoJSON", + "boundary" = "cps_attendance_elementary", + "year" = "2024-2025" + ), # CPS ATTENDANCE - SECONDARY "attendance_sec_0607" = c( @@ -231,6 +237,12 @@ sources_list <- bind_rows(list( "boundary" = "cps_attendance_secondary", "year" = "2023-2024" ), + "attendance_sec_2025" = c( + "source" = "https://data.cityofchicago.org/api/geospatial/", + "api_url" = 
"4kfz-zr3a?method=export&format=GeoJSON", + "boundary" = "cps_attendance_secondary", + "year" = "2024-2025" + ), # LOCATION "locations_all_21" = c( diff --git a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-transit.R b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-transit.R index a148a7297..4d8c89569 100644 --- a/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-transit.R +++ b/etl/scripts-ccao-data-raw-us-east-1/spatial/spatial-transit.R @@ -20,15 +20,25 @@ options(timeout = max(300, getOption("timeout"))) cta_feed_dates_list <- c( "2015-10-29", "2016-09-30", "2017-10-22", "2018-10-06", "2019-10-04", "2020-10-10", "2021-10-09", "2022-10-20", - "2023-10-04" + "2023-10-04", "2024-10-17" ) # If missing feed on S3, download and remove .htm file (causes errors) # then rezip and upload get_cta_feed <- function(feed_date) { - feed_url <- paste0( - "https://transitfeeds.com/p/chicago-transit-authority/165/", - str_remove_all(feed_date, "-"), "/download" + feed_url <- ifelse( + substr(feed_date, 1, 4) <= "2023", + paste0( + "https://transitfeeds.com/p/chicago-transit-authority/165/", + str_remove_all(feed_date, "-"), "/download" + ), + paste0( + "https://files.mobilitydatabase.org/mdb-389/mdb-389-", + str_remove_all(feed_date, "-"), + "0023/mdb-389-", + str_remove_all(feed_date, "-"), + "0023.zip" + ) ) s3_uri <- file.path(output_path, "cta", paste0(feed_date, "-gtfs.zip")) @@ -55,14 +65,26 @@ walk(cta_feed_dates_list, get_cta_feed) metra_feed_dates_list <- c( "2015-10-30", "2016-09-30", "2017-10-21", "2018-10-05", "2019-10-04", "2020-10-10", "2021-10-08", "2022-10-21", - "2023-10-14" + "2023-10-14", "2024-04-22" ) get_metra_feed <- function(feed_date) { - feed_url <- paste0( - "https://transitfeeds.com/p/metra/169/", - str_remove_all(feed_date, "-"), "/download" + + feed_url <- ifelse( + substr(feed_date, 1, 4) <= "2023", + paste0( + "https://transitfeeds.com/p/metra/169/", + str_remove_all(feed_date, "-"), "/download" + ), + paste0( + "https://files.mobilitydatabase.org/mdb-1187/mdb-1187-", + str_remove_all(feed_date, "-"), + "0016/mdb-1187-", + str_remove_all(feed_date, "-"), + "0016.zip" + ) ) + s3_uri <- file.path(output_path, "metra", paste0(feed_date, "-gtfs.zip")) if (!aws.s3::object_exists(s3_uri)) { @@ -80,7 +102,8 @@ walk(metra_feed_dates_list, get_metra_feed) ##### Pace ##### pace_feed_dates_list <- c( "2015-10-16", "2016-10-15", "2017-10-16", "2018-10-17", - "2019-10-22", "2020-09-23", "2021-03-15", "2023-09-24" + "2019-10-22", "2020-09-23", "2021-03-15", "2023-09-24", + "2024-02-07" ) get_pace_feed <- function(feed_date) { diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R index 8c381b26e..2a000b617 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R @@ -1,218 +1,255 @@ -# This script cleans and combines raw condo characteristics data for the -# warehouse -library(arrow) -library(aws.s3) -library(DBI) -library(data.table) -library(dplyr) -library(glue) -library(noctua) -library(purrr) -library(stringr) -library(tidyr) -source("utils.R") - -# Declare raw and clean condo data locations -AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") -AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") -output_bucket <- file.path( - AWS_S3_WAREHOUSE_BUCKET, - "ccao", "condominium", "pin_condo_char" -) - -# Connect to 
Athena -AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena()) - -# Get S3 file addresses -files <- grep( - ".parquet", - file.path( - AWS_S3_RAW_BUCKET, - aws.s3::get_bucket_df( - AWS_S3_RAW_BUCKET, - prefix = "ccao/condominium/pin_condo_char/" - )$Key - ), - value = TRUE -) - -# Grab sales/spatial data -classes <- dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, " - SELECT DISTINCT - parid AS pin, - class - FROM iasworld.pardat - WHERE taxyr = (SELECT MAX(taxyr) FROM iasworld.pardat) - AND class IN ('299', '399') - " -) - -# Grab all years of previously assembled condo data already present on Athena -years <- dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, " - SELECT DISTINCT year FROM ccao.pin_condo_char - " -) %>% - pull(year) - -# Function to grab chars data from Athena if it's already available -athena_chars <- function(x) { - dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, glue(" - SELECT * FROM ccao.pin_condo_char - WHERE year = '{x}' - ") - ) -} - -# A place to store characteristics data so we can stack it -chars <- list() - -# We use tax year, valuations uses year the work was done -for (i in c("2021", "2022", "2023")) { - if (!("2021" %in% years) && i == "2021") { - # If clean 2021 data is not already in Athena, load and clean it - chars[[i]] <- map( - grep("2022", files, value = TRUE), function(x) { - read_parquet(x) %>% - tibble(.name_repair = "unique") %>% - rename_with(~ tolower(.x)) %>% - mutate(pin = str_pad(parid, 14, side = "left", pad = "0")) %>% - select(contains(c("pin", "sqft", "bed", "source"))) %>% - select(-contains(c("x", "all", "search"))) %>% - rename_with(~"bedrooms", contains("bed")) %>% - rename_with(~"unit_sf", contains("unit")) %>% - rename_with(~"building_sf", contains("building")) - } - ) %>% - rbindlist(fill = TRUE) %>% - inner_join(classes) %>% - mutate(across(c(unit_sf, building_sf), ~ na_if(., "0"))) %>% - mutate(across(c(unit_sf, building_sf), ~ na_if(., "1"))) %>% - mutate( - across(c(building_sf, unit_sf, bedrooms), ~ gsub("[^0-9.-]", "", .)) - ) %>% - mutate(across(.cols = everything(), ~ trimws(., which = "both"))) %>% - na_if("") %>% - mutate( - bedrooms = case_when( - is.na(unit_sf) & bedrooms == "0" ~ NA_character_, - TRUE ~ bedrooms - ) - ) %>% - mutate(across(c(building_sf, unit_sf, bedrooms), ~ as.numeric(.))) %>% - mutate( - bedrooms = ceiling(bedrooms), - parking_pin = str_detect(source, "(?i)parking|garage") & - is.na(unit_sf) & is.na(building_sf), - year = "2021" - ) %>% - select(-c(class, source)) %>% - # These are obvious typos - mutate(unit_sf = case_when( - unit_sf == 28002000 ~ 2800, - unit_sf == 20002800 ~ 2000, - unit_sf == 182901 ~ 1829, - TRUE ~ unit_sf - )) - } else if (!("2022" %in% years) && i == "2022") { - # If clean 2022 data is not already in Athena, load and clean it - chars[[i]] <- lapply(grep("2023", files, value = TRUE), function(x) { - raw <- read_parquet(x)[, 1:20] - - names <- tolower(names(raw)) - names(raw) <- make.unique(names) - - raw %>% - select(!contains("pin")) %>% - rename_with(~ str_replace(.x, "iasworold", "iasworld")) %>% - mutate(pin = str_pad(iasworld_parid, 14, side = "left", pad = "0")) %>% - rename_with(~ str_replace_all(.x, "[[:space:]]", "")) %>% - rename_with(~ str_replace_all(.x, "\\.{4}", "")) %>% - select(!contains(c("1", "2", "all"))) %>% - select(contains(c("pin", "sq", "bed", "bath"))) %>% - rename_with(~"bedrooms", contains("bed")) %>% - rename_with(~"unit_sf", contains("unit")) %>% - rename_with(~"building_sf", contains(c("building", "bldg"))) %>% - rename_with(~"half_baths", contains("half")) 
%>% - rename_with(~"full_baths", contains("full")) %>% - mutate( - across(!contains("pin"), as.numeric), - year = "2022", - # Define a parking pin as a unit with only 0 or NA values for - # characteristics - parking_pin = case_when( - (bedrooms == 0 | unit_sf == 0) & - rowSums( - across(c(unit_sf, bedrooms, full_baths, half_baths)), - na.rm = TRUE - ) == 0 ~ TRUE, - TRUE ~ FALSE - ), - # Really low unit_sf should be considered NA - unit_sf = case_when( - unit_sf < 5 & !parking_pin ~ NA_real_, - TRUE ~ unit_sf - ), - # Assume missing half_baths value is 0 if there is full bathroom data - # for PIN - half_baths = case_when( - is.na(half_baths) & !is.na(full_baths) & full_baths > 0 ~ 0, - TRUE ~ half_baths - ), - # Make beds and baths are integers - across(c(half_baths, full_baths, bedrooms), ~ ceiling(.x)), - # Set all characteristics to NA for parking pins - across( - c(bedrooms, unit_sf, half_baths, full_baths), - ~ ifelse(parking_pin, NA, .x) - ) - ) - }) %>% - bind_rows() %>% - group_by(pin) %>% - arrange(unit_sf) %>% - filter(row_number() == 1) %>% - ungroup() %>% - filter(!is.na(pin)) - } else if (!("2023" %in% years) && i == "2023") { - chars[[i]] <- lapply(grep("2024", files, value = TRUE), function(x) { - read_parquet(x) %>% - select( - pin = "14.Digit.PIN", - building_sf = "Building.Square.Footage", - unit_sf = "Unit.Square.Footage", - bedrooms = "Bedrooms", - parking_pin = "Parking.Space.Change", - full_baths = "Full.Baths", - half_baths = "Half.Baths" - ) %>% - mutate( - pin = gsub("[^0-9]", "", pin), - parking_pin = if_all( - c(unit_sf, bedrooms, full_baths, half_baths), is.na - ) & !is.na(parking_pin), - year = "2023", - bedrooms = case_when(bedrooms > 15 ~ NA_real_, TRUE ~ bedrooms), - full_baths = case_when(full_baths > 10 ~ NA_real_, TRUE ~ full_baths), - unit_sf = case_when(unit_sf < 5 ~ NA_real_, TRUE ~ unit_sf) - ) - }) %>% - bind_rows() - } else { - # If data is already in Athena, just take it from there - chars[[i]] <- athena_chars(i) - } -} - -# Upload cleaned data to S3 -chars %>% - bind_rows() %>% - group_by(year) %>% - arrow::write_dataset( - path = output_bucket, - format = "parquet", - hive_style = TRUE, - compression = "snappy" - ) +# This script cleans and combines raw condo characteristics data for the +# warehouse +library(arrow) +library(aws.s3) +library(DBI) +library(data.table) +library(dplyr) +library(glue) +library(noctua) +library(purrr) +library(stringr) +library(tidyr) +source("utils.R") + +# Declare raw and clean condo data locations +AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") +output_bucket <- file.path( + AWS_S3_WAREHOUSE_BUCKET, + "ccao", "condominium", "pin_condo_char" +) + +# Connect to Athena +AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena()) + +# Get S3 file addresses +files <- grep( + ".parquet", + file.path( + AWS_S3_RAW_BUCKET, + aws.s3::get_bucket_df( + AWS_S3_RAW_BUCKET, + prefix = "ccao/condominium/pin_condo_char/" + )$Key + ), + value = TRUE +) + +# Grab sales/spatial data +classes <- dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, " + SELECT DISTINCT + parid AS pin, + class + FROM iasworld.pardat + WHERE taxyr = (SELECT MAX(taxyr) FROM iasworld.pardat) + AND class IN ('299', '399') + " +) + +# Grab all years of previously assembled condo data already present on Athena +years <- dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, " + SELECT DISTINCT year FROM ccao.pin_condo_char + " +) %>% + pull(year) + +# Function to grab chars data from Athena if it's already 
available +athena_chars <- function(x) { + dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, glue(" + SELECT * FROM ccao.pin_condo_char + WHERE year = '{x}' + ") + ) +} + +# A place to store characteristics data so we can stack it +chars <- list() + +# We use tax year, valuations uses year the work was done +for (i in c("2021", "2022", "2023")) { + if (!("2021" %in% years) && i == "2021") { + # If clean 2021 data is not already in Athena, load and clean it + chars[[i]] <- map( + grep("2022", files, value = TRUE), function(x) { + read_parquet(x) %>% + tibble(.name_repair = "unique") %>% + rename_with(~ tolower(.x)) %>% + mutate(pin = str_pad(parid, 14, side = "left", pad = "0")) %>% + select(contains(c("pin", "sqft", "bed", "source"))) %>% + select(-contains(c("x", "all", "search"))) %>% + rename_with(~"bedrooms", contains("bed")) %>% + rename_with(~"unit_sf", contains("unit")) %>% + rename_with(~"building_sf", contains("building")) + } + ) %>% + rbindlist(fill = TRUE) %>% + inner_join(classes) %>% + mutate(across(c(unit_sf, building_sf), ~ na_if(., "0"))) %>% + mutate(across(c(unit_sf, building_sf), ~ na_if(., "1"))) %>% + mutate( + across(c(building_sf, unit_sf, bedrooms), ~ gsub("[^0-9.-]", "", .)) + ) %>% + mutate(across(.cols = everything(), ~ trimws(., which = "both"))) %>% + na_if("") %>% + mutate( + bedrooms = case_when( + is.na(unit_sf) & bedrooms == "0" ~ NA_character_, + TRUE ~ bedrooms + ) + ) %>% + mutate(across(c(building_sf, unit_sf, bedrooms), ~ as.numeric(.))) %>% + mutate( + bedrooms = ceiling(bedrooms), + parking_pin = str_detect(source, "(?i)parking|garage") & + is.na(unit_sf) & is.na(building_sf), + year = "2021" + ) %>% + select(-c(class, source)) %>% + # These are obvious typos + mutate(unit_sf = case_when( + unit_sf == 28002000 ~ 2800, + unit_sf == 20002800 ~ 2000, + unit_sf == 182901 ~ 1829, + TRUE ~ unit_sf + )) + } else if (!("2022" %in% years) && i == "2022") { + # If clean 2022 data is not already in Athena, load and clean it + chars[[i]] <- lapply(grep("2023", files, value = TRUE), function(x) { + raw <- read_parquet(x)[, 1:20] + + names <- tolower(names(raw)) + names(raw) <- make.unique(names) + + raw %>% + select(!contains("pin")) %>% + rename_with(~ str_replace(.x, "iasworold", "iasworld")) %>% + mutate(pin = str_pad(iasworld_parid, 14, side = "left", pad = "0")) %>% + rename_with(~ str_replace_all(.x, "[[:space:]]", "")) %>% + rename_with(~ str_replace_all(.x, "\\.{4}", "")) %>% + select(!contains(c("1", "2", "all"))) %>% + select(contains(c("pin", "sq", "bed", "bath"))) %>% + rename_with(~"bedrooms", contains("bed")) %>% + rename_with(~"unit_sf", contains("unit")) %>% + rename_with(~"building_sf", contains(c("building", "bldg"))) %>% + rename_with(~"half_baths", contains("half")) %>% + rename_with(~"full_baths", contains("full")) %>% + mutate( + across(!contains("pin"), as.numeric), + year = "2022", + # Define a parking pin as a unit with only 0 or NA values for + # characteristics + parking_pin = case_when( + (bedrooms == 0 | unit_sf == 0) & + rowSums( + across(c(unit_sf, bedrooms, full_baths, half_baths)), + na.rm = TRUE + ) == 0 ~ TRUE, + TRUE ~ FALSE + ), + # Really low unit_sf should be considered NA + unit_sf = case_when( + unit_sf < 5 & !parking_pin ~ NA_real_, + TRUE ~ unit_sf + ), + # Assume missing half_baths value is 0 if there is full bathroom data + # for PIN + half_baths = case_when( + is.na(half_baths) & !is.na(full_baths) & full_baths > 0 ~ 0, + TRUE ~ half_baths + ), + # Make beds and baths are integers + across(c(half_baths, full_baths, 
bedrooms), ~ ceiling(.x)), + # Set all characteristics to NA for parking pins + across( + c(bedrooms, unit_sf, half_baths, full_baths), + ~ ifelse(parking_pin, NA, .x) + ) + ) + }) %>% + bind_rows() %>% + group_by(pin) %>% + arrange(unit_sf) %>% + filter(row_number() == 1) %>% + ungroup() %>% + filter(!is.na(pin)) + } else if (!("2023" %in% years) && i == "2023") { + chars[[i]] <- lapply(grep("2024", files, value = TRUE), function(x) { + read_parquet(x) %>% + select( + pin = "14.Digit.PIN", + building_sf = "Building.Square.Footage", + unit_sf = "Unit.Square.Footage", + bedrooms = "Bedrooms", + parking_pin = "Parking.Space.Change", + full_baths = "Full.Baths", + half_baths = "Half.Baths" + ) %>% + mutate( + pin = gsub("[^0-9]", "", pin), + parking_pin = if_all( + c(unit_sf, bedrooms, full_baths, half_baths), is.na + ) & !is.na(parking_pin), + year = "2023", + bedrooms = case_when(bedrooms > 15 ~ NA_real_, TRUE ~ bedrooms), + full_baths = case_when(full_baths > 10 ~ NA_real_, TRUE ~ full_baths), + unit_sf = case_when(unit_sf < 5 ~ NA_real_, TRUE ~ unit_sf) + ) + }) %>% + bind_rows() + } else { + # If data is already in Athena, just take it from there + chars[[i]] <- athena_chars(i) + } +} + +# At the end of 2024 valuations revisited some old condos and updated their +# characteristics +updates <- map( + file.path( + "s3://ccao-data-raw-us-east-1", + aws.s3::get_bucket_df( + AWS_S3_RAW_BUCKET, + prefix = "ccao/condominium/pin_condo_char/2025" + )$Key), + \(x) { + read_parquet(x) %>% + mutate(across(.cols = everything(), as.character)) + }) %>% + bind_rows() %>% + rename_with(~gsub("\\.", "_", tolower(.x)), .cols = everything()) %>% + select("pin", starts_with("new")) %>% + mutate( + pin = gsub("-", "", pin), + across(starts_with("new"), as.numeric), + # Three units with 100 for unit sqft + new_unit_sf = ifelse(new_unit_sf == 100, 1000, new_unit_sf) + ) %>% + filter(!if_all(starts_with("new"), is.na)) + +# Update parcels with new column values +chars <- chars %>% + bind_rows() %>% + left_join(updates, by = "pin") %>% + mutate( + building_sf = coalesce(new_building_sf, building_sf), + unit_sf = coalesce(new_unit_sf, unit_sf), + bedrooms = coalesce(new_bedrooms, bedrooms), + full_baths = coalesce(new_full_baths, full_baths), + half_baths = coalesce(new_half_baths, half_baths) + ) %>% + select(!starts_with("new")) + +# Upload cleaned data to S3 +chars %>% + mutate(loaded_at = as.character(Sys.time())) %>% + group_by(year) %>% + arrow::write_dataset( + path = output_bucket, + format = "parquet", + hive_style = TRUE, + compression = "snappy" + ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R index cee094c33..61b2dc5f6 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R @@ -65,6 +65,7 @@ nonlivable[["neg_pred"]] <- map( # Upload all nonlivable spaces to nonlivable table nonlivable %>% bind_rows() %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = output_bucket, diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_nbhd_rate.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_nbhd_rate.R index 9aa97fa86..5c9ee49bd 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_nbhd_rate.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_nbhd_rate.R @@ -1,149 
+1,150 @@ -library(arrow) -library(aws.s3) -library(dplyr) -library(noctua) -library(openxlsx) -library(purrr) -library(readr) -library(snakecase) -library(stringr) -library(tidyr) -source("utils.R") - -# This script retrieves and cleans land value spreadsheets provided by -# the Valuations department and formats them for use in Athena -AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") -AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") -input_bucket <- file.path(AWS_S3_RAW_BUCKET, "ccao", "land") -output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "land") - -AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena(), rstudio_conn_tab = FALSE) - -# Location of remote files -remote_file_raw_nbhd_rate_2022 <- file.path( - input_bucket, "nbhd_rate", "2022.xlsx" -) -remote_file_raw_nbhd_rate_2023 <- file.path( - input_bucket, "nbhd_rate", "2023.xlsx" -) -remote_file_raw_nbhd_rate_2024 <- file.path( - input_bucket, "nbhd_rate", "2024.xlsx" -) -remote_file_warehouse_nbhd_rate <- file.path( - output_bucket, "land_nbhd_rate" -) - - -# Temp file to download workbook -tmp_file_nbhd_rate_2022 <- tempfile(fileext = ".xlsx") -tmp_file_nbhd_rate_2023 <- tempfile(fileext = ".xlsx") -tmp_file_nbhd_rate_2024 <- tempfile(fileext = ".xlsx") - -# Grab the workbook from the raw S3 bucket -aws.s3::save_object( - object = remote_file_raw_nbhd_rate_2022, - file = tmp_file_nbhd_rate_2022 -) -aws.s3::save_object( - object = remote_file_raw_nbhd_rate_2023, - file = tmp_file_nbhd_rate_2023 -) -aws.s3::save_object( - object = remote_file_raw_nbhd_rate_2024, - file = tmp_file_nbhd_rate_2024 -) - -# List of regression classes -class <- dbGetQuery( - AWS_ATHENA_CONN_NOCTUA, - "SELECT class_code FROM ccao.class_dict WHERE regression_class" -) %>% - pull(class_code) - -# Load the raw workbooks, rename and clean up columns -land_nbhd_rate_2022 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2022) %>% - set_names(snakecase::to_snake_case(names(.))) %>% - select( - township_code = twp_number, - township_name = twp_name, - town_nbhd = twp_nbhd, - `2019` = `2019_rate`, - `2022` = `2022_rate` - ) %>% - pivot_longer( - c(`2019`, `2022`), - names_to = "year", values_to = "land_rate_per_sqft" - ) %>% - mutate( - across(c(township_code:town_nbhd, year), as.character), - town_nbhd = str_remove_all(town_nbhd, "-"), - land_rate_per_sqft = parse_number(land_rate_per_sqft) - ) %>% - expand_grid(class) - -land_nbhd_rate_2023 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2023) %>% - set_names(snakecase::to_snake_case(names(.))) %>% - select( - town_nbhd = neighborhood_id, - `2020` = `2020_2_00_class_unit_price`, - `2023` = `2023_2_00_class_unit_price` - ) %>% - mutate( - town_nbhd = gsub("\\D", "", town_nbhd), - township_code = substr(town_nbhd, 1, 2), - township_name = ccao::town_convert(township_code) - ) %>% - relocate(c(township_code, township_name)) %>% - pivot_longer( - c(`2020`, `2023`), - names_to = "year", values_to = "land_rate_per_sqft" - ) %>% - mutate(across(c(township_code:town_nbhd, year), as.character)) %>% - expand_grid(class) - -land_nbhd_rate_2024 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2024) %>% - set_names(snakecase::to_snake_case(names(.))) %>% - mutate( - town_nbhd = paste0( - township_code, str_pad(neighborhood, 3, side = "left", pad = "0") - ) - ) %>% - select( - town_nbhd, - classes, - `2021` = `2021_unit_price`, - `2024` = `2024_unit_price` - ) %>% - mutate( - town_nbhd = gsub("\\D", "", town_nbhd), - township_code = substr(town_nbhd, 1, 2), - township_name = ccao::town_convert(township_code) - ) %>% 
- relocate(c(township_code, township_name)) %>% - pivot_longer( - c(`2021`, `2024`), - names_to = "year", values_to = "land_rate_per_sqft" - ) %>% - mutate(across(c(township_code:town_nbhd, year), as.character)) %>% - expand_grid(class) %>% - # 2024 contains bifurcated neighborhood land rates across class - filter( - !(classes == "all other regression classes" & class %in% c("210", "295")), - !(classes == "2-10s/2-95s" & !(class %in% c("210", "295"))) - ) %>% - select(-classes) - -# Write the rates to S3, partitioned by year -bind_rows( - land_nbhd_rate_2022, - land_nbhd_rate_2023, - land_nbhd_rate_2024 -) %>% - relocate(land_rate_per_sqft, .after = last_col()) %>% - group_by(year) %>% - arrow::write_dataset( - path = remote_file_warehouse_nbhd_rate, - format = "parquet", - hive_style = TRUE, - compression = "snappy" - ) +library(arrow) +library(aws.s3) +library(dplyr) +library(noctua) +library(openxlsx) +library(purrr) +library(readr) +library(snakecase) +library(stringr) +library(tidyr) +source("utils.R") + +# This script retrieves and cleans land value spreadsheets provided by +# the Valuations department and formats them for use in Athena +AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") +input_bucket <- file.path(AWS_S3_RAW_BUCKET, "ccao", "land") +output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "land") + +AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena(), rstudio_conn_tab = FALSE) + +# Location of remote files +remote_file_raw_nbhd_rate_2022 <- file.path( + input_bucket, "nbhd_rate", "2022.xlsx" +) +remote_file_raw_nbhd_rate_2023 <- file.path( + input_bucket, "nbhd_rate", "2023.xlsx" +) +remote_file_raw_nbhd_rate_2024 <- file.path( + input_bucket, "nbhd_rate", "2024.xlsx" +) +remote_file_warehouse_nbhd_rate <- file.path( + output_bucket, "land_nbhd_rate" +) + + +# Temp file to download workbook +tmp_file_nbhd_rate_2022 <- tempfile(fileext = ".xlsx") +tmp_file_nbhd_rate_2023 <- tempfile(fileext = ".xlsx") +tmp_file_nbhd_rate_2024 <- tempfile(fileext = ".xlsx") + +# Grab the workbook from the raw S3 bucket +aws.s3::save_object( + object = remote_file_raw_nbhd_rate_2022, + file = tmp_file_nbhd_rate_2022 +) +aws.s3::save_object( + object = remote_file_raw_nbhd_rate_2023, + file = tmp_file_nbhd_rate_2023 +) +aws.s3::save_object( + object = remote_file_raw_nbhd_rate_2024, + file = tmp_file_nbhd_rate_2024 +) + +# List of regression classes +class <- dbGetQuery( + AWS_ATHENA_CONN_NOCTUA, + "SELECT class_code FROM ccao.class_dict WHERE regression_class" +) %>% + pull(class_code) + +# Load the raw workbooks, rename and clean up columns +land_nbhd_rate_2022 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2022) %>% + set_names(snakecase::to_snake_case(names(.))) %>% + select( + township_code = twp_number, + township_name = twp_name, + town_nbhd = twp_nbhd, + `2019` = `2019_rate`, + `2022` = `2022_rate` + ) %>% + pivot_longer( + c(`2019`, `2022`), + names_to = "year", values_to = "land_rate_per_sqft" + ) %>% + mutate( + across(c(township_code:town_nbhd, year), as.character), + town_nbhd = str_remove_all(town_nbhd, "-"), + land_rate_per_sqft = parse_number(land_rate_per_sqft) + ) %>% + expand_grid(class) + +land_nbhd_rate_2023 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2023) %>% + set_names(snakecase::to_snake_case(names(.))) %>% + select( + town_nbhd = neighborhood_id, + `2020` = `2020_2_00_class_unit_price`, + `2023` = `2023_2_00_class_unit_price` + ) %>% + mutate( + town_nbhd = gsub("\\D", "", town_nbhd), + 
township_code = substr(town_nbhd, 1, 2), + township_name = ccao::town_convert(township_code) + ) %>% + relocate(c(township_code, township_name)) %>% + pivot_longer( + c(`2020`, `2023`), + names_to = "year", values_to = "land_rate_per_sqft" + ) %>% + mutate(across(c(township_code:town_nbhd, year), as.character)) %>% + expand_grid(class) + +land_nbhd_rate_2024 <- openxlsx::read.xlsx(tmp_file_nbhd_rate_2024) %>% + set_names(snakecase::to_snake_case(names(.))) %>% + mutate( + town_nbhd = paste0( + township_code, str_pad(neighborhood, 3, side = "left", pad = "0") + ) + ) %>% + select( + town_nbhd, + classes, + `2021` = `2021_unit_price`, + `2024` = `2024_unit_price` + ) %>% + mutate( + town_nbhd = gsub("\\D", "", town_nbhd), + township_code = substr(town_nbhd, 1, 2), + township_name = ccao::town_convert(township_code) + ) %>% + relocate(c(township_code, township_name)) %>% + pivot_longer( + c(`2021`, `2024`), + names_to = "year", values_to = "land_rate_per_sqft" + ) %>% + mutate(across(c(township_code:town_nbhd, year), as.character)) %>% + expand_grid(class) %>% + # 2024 contains bifurcated neighborhood land rates across class + filter( + !(classes == "all other regression classes" & class %in% c("210", "295")), + !(classes == "2-10s/2-95s" & !(class %in% c("210", "295"))) + ) %>% + select(-classes) + +# Write the rates to S3, partitioned by year +bind_rows( + land_nbhd_rate_2022, + land_nbhd_rate_2023, + land_nbhd_rate_2024 +) %>% + relocate(land_rate_per_sqft, .after = last_col()) %>% + mutate(loaded_at = as.character(Sys.time())) %>% + group_by(year) %>% + arrow::write_dataset( + path = remote_file_warehouse_nbhd_rate, + format = "parquet", + hive_style = TRUE, + compression = "snappy" + ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_site_rate.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_site_rate.R index 55c114c33..194aa7cb9 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_site_rate.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-land-land_site_rate.R @@ -1,59 +1,60 @@ -library(arrow) -library(aws.s3) -library(dplyr) -library(openxlsx) -library(purrr) -library(readr) -library(snakecase) -library(stringr) -library(tidyr) -source("utils.R") - -# This script retrieves and cleans land value spreadsheets provided by -# the Valuations department and formats them for use in Athena -AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") -AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") -input_bucket <- file.path(AWS_S3_RAW_BUCKET, "ccao", "land") -output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "land") - -# Location of remote files -remote_file_raw_site_rate_2022 <- file.path( - input_bucket, "site_rate", "2022.xlsx" -) -remote_file_warehouse_site_rate <- file.path( - output_bucket, "land_site_rate" -) - -# Temp file to download workbook -tmp_file_site_rate_2022 <- tempfile(fileext = ".xlsx") - -# Grab the workbook from the raw S3 bucket -aws.s3::save_object( - object = remote_file_raw_site_rate_2022, - file = tmp_file_site_rate_2022 -) - -# Load the raw workbook, rename and clean up columns, then write to S3 -# partitioned by year -land_site_rate <- openxlsx::read.xlsx(tmp_file_site_rate_2022) %>% - set_names(snakecase::to_snake_case(names(.))) %>% - select( - pin = parid, - class, - town_nbhd = nbhd, - land_rate_per_pin = flat_townhome_value_2022, - land_rate_per_sqft = rate_sf_2022, - land_pct_tot_fmv = flat_tot_mv - ) %>% - mutate( - year = "2022", - across(c(town_nbhd, class), 
str_remove_all, "-"), - land_rate_per_pin = as.integer(land_rate_per_pin) - ) %>% - drop_na(pin, land_rate_per_pin) %>% - group_by(year) %>% - write_partitions_to_s3( - remote_file_warehouse_site_rate, - is_spatial = FALSE, - overwrite = TRUE - ) +library(arrow) +library(aws.s3) +library(dplyr) +library(openxlsx) +library(purrr) +library(readr) +library(snakecase) +library(stringr) +library(tidyr) +source("utils.R") + +# This script retrieves and cleans land value spreadsheets provided by +# the Valuations department and formats them for use in Athena +AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") +input_bucket <- file.path(AWS_S3_RAW_BUCKET, "ccao", "land") +output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "land") + +# Location of remote files +remote_file_raw_site_rate_2022 <- file.path( + input_bucket, "site_rate", "2022.xlsx" +) +remote_file_warehouse_site_rate <- file.path( + output_bucket, "land_site_rate" +) + +# Temp file to download workbook +tmp_file_site_rate_2022 <- tempfile(fileext = ".xlsx") + +# Grab the workbook from the raw S3 bucket +aws.s3::save_object( + object = remote_file_raw_site_rate_2022, + file = tmp_file_site_rate_2022 +) + +# Load the raw workbook, rename and clean up columns, then write to S3 +# partitioned by year +land_site_rate <- openxlsx::read.xlsx(tmp_file_site_rate_2022) %>% + set_names(snakecase::to_snake_case(names(.))) %>% + select( + pin = parid, + class, + town_nbhd = nbhd, + land_rate_per_pin = flat_townhome_value_2022, + land_rate_per_sqft = rate_sf_2022, + land_pct_tot_fmv = flat_tot_mv + ) %>% + mutate( + year = "2022", + across(c(town_nbhd, class), str_remove_all, "-"), + land_rate_per_pin = as.integer(land_rate_per_pin) + ) %>% + drop_na(pin, land_rate_per_pin) %>% + mutate(loaded_at = as.character(Sys.time())) %>% + group_by(year) %>% + write_partitions_to_s3( + remote_file_warehouse_site_rate, + is_spatial = FALSE, + overwrite = TRUE + ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-legacy.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-legacy.R index 84673e731..da8d64e95 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-legacy.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-legacy.R @@ -99,6 +99,7 @@ cc_dli_senfrr <- map_dfr(files_cc_dli_senfrr$Key, \(f) { # Write the files to S3, partitioned by year cc_dli_senfrr %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = file.path(output_bucket, "cc_dli_senfrr"), @@ -180,6 +181,7 @@ cc_pifdb_piexemptre_sted <- map_dfr(files_cc_pifdb_piexemptre_sted$Key, \(f) { # Write the files to S3, partitioned by year cc_pifdb_piexemptre_sted %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = file.path( @@ -253,6 +255,7 @@ cc_pifdb_piexemptre_dise <- map_dfr(files_cc_pifdb_piexemptre_dise$Key, \(f) { # Write the files to S3, partitioned by year cc_pifdb_piexemptre_dise %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = file.path( @@ -341,6 +344,7 @@ cc_pifdb_piexemptre_ownr <- map_dfr(files_cc_pifdb_piexemptre_ownr$Key, \(f) { # Write the files to S3, partitioned by year cc_pifdb_piexemptre_ownr %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = file.path( diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-other-hie.R 
b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-other-hie.R index 0b01ab1a5..e0036ade3 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-other-hie.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-other-hie.R @@ -1,104 +1,105 @@ -library(arrow) -library(aws.s3) -library(DBI) -library(dplyr) -library(lubridate) -library(odbc) -library(purrr) -library(stringr) -library(tidyr) -source("utils.R") - -# This script retrieves and cleans home improvement exemption data stored in -# the CCAO's legacy AS/400 system -AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") -output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "other", "hie") - -# Connect to legacy CCAO SQL server -CCAODATA <- dbConnect( - odbc(), - .connection_string = Sys.getenv("DB_CONFIG_CCAODATA") -) - -# Grab all legacy HIE data from the ADDCHARS table -hie <- DBI::dbGetQuery( - CCAODATA, - " - SELECT - QU_PIN, - QU_TOWN, - QU_MLT_CD, - QU_HOME_IMPROVEMENT, - QU_USE, - QU_EXTERIOR_WALL, - QU_ROOF, - QU_BASEMENT_TYPE, - QU_BASEMENT_FINISH, - QU_HEAT, - QU_AIR, - QU_ATTIC_TYPE, - QU_ATTIC_FINISH, - QU_TYPE_PLAN, - QU_TYPE_DESIGN, - QU_CONSTRUCT_QUALITY, - QU_PORCH, - QU_GARAGE_SIZE, - QU_GARAGE_CONST, - QU_GARAGE_ATTACHED, - QU_GARAGE_AREA, - QU_NUM_APTS, - QU_SQFT_BLD, - QU_LND_SQFT, - QU_CLASS, - QU_ROOMS, - QU_BEDS, - QU_FULL_BATH, - QU_HALF_BATH, - QU_FIRE_PLACE, - QU_NO__COM_UNIT, - QU_TYPE_OF_RES, - QU_UPLOAD_DATE, - TAX_YEAR - FROM ADDCHARS - WHERE QU_HOME_IMPROVEMENT = 1 - " -) - -# Clean up raw ADDCHARS data -hie_clean <- hie %>% - mutate( - QU_CLASS = as.numeric(stringr::str_sub(QU_CLASS, 1, 3)), - QU_PIN = str_pad(QU_PIN, 14, "left", "0"), - hie_last_year_active = map_chr( - ccao::chars_288_active(TAX_YEAR, as.character(QU_TOWN)), - ~ tail(.x, n = 1) - ), - QU_NO__COM_UNIT = as.numeric(QU_NO__COM_UNIT), - QU_NO__COM_UNIT = replace_na(QU_NO__COM_UNIT, 0), - across( - c(QU_TOWN:QU_NUM_APTS, QU_CLASS, QU_TYPE_OF_RES, TAX_YEAR), - as.character - ), - across(everything(), na_if, " "), - QU_CLASS = na_if(QU_CLASS, "0"), - # Convert upload date to date format and if missing, set as the earliest - # date for the year - QU_UPLOAD_DATE = lubridate::ymd(QU_UPLOAD_DATE), - QU_UPLOAD_DATE = lubridate::as_date(ifelse( - is.na(QU_UPLOAD_DATE), - lubridate::make_date(as.numeric(TAX_YEAR), 1, 1), - QU_UPLOAD_DATE - )), - ) %>% - rename_with(tolower) %>% - rename(pin = qu_pin, year = tax_year, qu_no_com_unit = qu_no__com_unit) - -# Save HIE data to warehouse, partitioned by year -hie_clean %>% - group_by(year) %>% - arrow::write_dataset( - path = output_bucket, - format = "parquet", - hive_style = TRUE, - compression = "snappy" - ) +library(arrow) +library(aws.s3) +library(DBI) +library(dplyr) +library(lubridate) +library(odbc) +library(purrr) +library(stringr) +library(tidyr) +source("utils.R") + +# This script retrieves and cleans home improvement exemption data stored in +# the CCAO's legacy AS/400 system +AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") +output_bucket <- file.path(AWS_S3_WAREHOUSE_BUCKET, "ccao", "other", "hie") + +# Connect to legacy CCAO SQL server +CCAODATA <- dbConnect( + odbc(), + .connection_string = Sys.getenv("DB_CONFIG_CCAODATA") +) + +# Grab all legacy HIE data from the ADDCHARS table +hie <- DBI::dbGetQuery( + CCAODATA, + " + SELECT + QU_PIN, + QU_TOWN, + QU_MLT_CD, + QU_HOME_IMPROVEMENT, + QU_USE, + QU_EXTERIOR_WALL, + QU_ROOF, + QU_BASEMENT_TYPE, + QU_BASEMENT_FINISH, + QU_HEAT, + QU_AIR, + QU_ATTIC_TYPE, + QU_ATTIC_FINISH, + QU_TYPE_PLAN, + 
QU_TYPE_DESIGN, + QU_CONSTRUCT_QUALITY, + QU_PORCH, + QU_GARAGE_SIZE, + QU_GARAGE_CONST, + QU_GARAGE_ATTACHED, + QU_GARAGE_AREA, + QU_NUM_APTS, + QU_SQFT_BLD, + QU_LND_SQFT, + QU_CLASS, + QU_ROOMS, + QU_BEDS, + QU_FULL_BATH, + QU_HALF_BATH, + QU_FIRE_PLACE, + QU_NO__COM_UNIT, + QU_TYPE_OF_RES, + QU_UPLOAD_DATE, + TAX_YEAR + FROM ADDCHARS + WHERE QU_HOME_IMPROVEMENT = 1 + " +) + +# Clean up raw ADDCHARS data +hie_clean <- hie %>% + mutate( + QU_CLASS = as.numeric(stringr::str_sub(QU_CLASS, 1, 3)), + QU_PIN = str_pad(QU_PIN, 14, "left", "0"), + hie_last_year_active = map_chr( + ccao::chars_288_active(TAX_YEAR, as.character(QU_TOWN)), + ~ tail(.x, n = 1) + ), + QU_NO__COM_UNIT = as.numeric(QU_NO__COM_UNIT), + QU_NO__COM_UNIT = replace_na(QU_NO__COM_UNIT, 0), + across( + c(QU_TOWN:QU_NUM_APTS, QU_CLASS, QU_TYPE_OF_RES, TAX_YEAR), + as.character + ), + across(where(is.character), na_if, " "), + QU_CLASS = na_if(QU_CLASS, "0"), + # Convert upload date to date format and if missing, set as the earliest + # date for the year + QU_UPLOAD_DATE = lubridate::ymd(QU_UPLOAD_DATE), + QU_UPLOAD_DATE = lubridate::as_date(ifelse( + is.na(QU_UPLOAD_DATE), + lubridate::make_date(as.numeric(TAX_YEAR), 1, 1), + QU_UPLOAD_DATE + )), + ) %>% + rename_with(tolower) %>% + rename(pin = qu_pin, year = tax_year, qu_no_com_unit = qu_no__com_unit) + +# Save HIE data to warehouse, partitioned by year +hie_clean %>% + mutate(loaded_at = as.character(Sys.time())) %>% + group_by(year) %>% + arrow::write_dataset( + path = output_bucket, + format = "parquet", + hive_style = TRUE, + compression = "snappy" + ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R index a17ba3bcf..fa467133c 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R @@ -133,10 +133,11 @@ pull_and_write_acs <- function( )) %>% rename(any_of(c("GEOID" = "GEOID...1"))) %>% select(-starts_with("GEOID..."), -starts_with("NAME")) %>% - filter(!str_detect(GEOID, "Z")) + filter(!str_detect(GEOID, "Z")) %>% + mutate(loaded_at = as.character(Sys.time())) # Write to S3 - arrow::write_parquet(df, remote_file) + write_parquet(df, remote_file) } } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R index 50977f0ba..7825c7c70 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R @@ -110,7 +110,8 @@ pull_and_write_dec <- function(s3_bucket_uri, survey, folder, geography, year) { cache_table = TRUE ) %>% select(-NAME) %>% - rename_with(~ rename_to_2020(.x, year), .cols = !GEOID) + rename_with(~ rename_to_2020(.x, year), .cols = !GEOID) %>% + mutate(loaded_at = as.character(Sys.time())) # Write to S3 arrow::write_parquet(df, remote_file) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-dictionary.R b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-dictionary.R index a1e3f46f0..dbc5a16e6 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-dictionary.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-dictionary.R @@ -61,6 +61,7 @@ census_dec_tables <- # Combine table defs and write to dataset census_tables <- bind_rows(census_acs_tables_df, census_dec_tables) %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(survey) %>% 
select(variable_table_code, variable_table_title, survey) remote_path_tables <- file.path(output_bucket, "table_dict") @@ -130,6 +131,7 @@ census_dec_vars <- load_variables(2020, "pl", cache = TRUE) %>% # Combine ACS and decennial census_vars_merged <- bind_rows(census_vars, census_dec_vars) %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(survey) # Write final data to S3 diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R b/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R index 3dd104e15..8508cbca8 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R @@ -3,6 +3,7 @@ library(aws.s3) library(dplyr) library(purrr) library(sf) +library(sfarrow) library(stars) library(stringr) library(tidyr) @@ -40,7 +41,7 @@ merge_pins_with_raster <- function(raw_file) { message("Now processing:", year) rast <- stars::read_stars(tmp_file) - pins <- read_sf_dataset(arrow::open_dataset(paste0( + pins <- sfarrow::read_sf_dataset(arrow::open_dataset(paste0( "s3://ccao-data-warehouse-us-east-1/spatial/parcel/year=", year ))) %>% @@ -55,6 +56,7 @@ merge_pins_with_raster <- function(raw_file) { select(pin10, airport_noise_dnl) %>% mutate(airport_noise_dnl = replace_na(airport_noise_dnl, 52.5)) %>% st_drop_geometry() %>% + mutate(loaded_at = as.character(Sys.time())) %>% write_parquet( file.path(output_bucket, paste0("year=", year), "part-0.parquet") ) @@ -84,6 +86,7 @@ pins %>% select(pin10, airport_noise_dnl) %>% mutate(airport_noise_dnl = replace_na(airport_noise_dnl, 52.5)) %>% st_drop_geometry() %>% + mutate(loaded_at = as.character(Sys.time())) %>% write_parquet( file.path(output_bucket, paste0("year=omp"), "part-0.parquet") ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-ari.py b/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-ari.py index 576335ec7..bdefd7181 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-ari.py +++ b/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-ari.py @@ -1,5 +1,6 @@ import os import tempfile +from datetime import datetime import boto3 import pandas as pd @@ -33,6 +34,7 @@ temp_file.close() # Upload the Parquet file to S3 +data["loaded_at"] = str(datetime.now()) data.to_parquet( os.path.join( os.environ["AWS_S3_WAREHOUSE_BUCKET"], diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-dci.py b/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-dci.py index e62d1c0df..fb8ab7492 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-dci.py +++ b/etl/scripts-ccao-data-warehouse-us-east-1/housing/housing-dci.py @@ -1,5 +1,6 @@ import os import tempfile +from datetime import datetime import boto3 import pandas as pd @@ -39,6 +40,8 @@ ) data["geoid"] = data["geoid"].astype(str) +# Upload the Parquet file to S3 +data["loaded_at"] = str(datetime.now()) data.to_parquet( os.path.join( os.environ["AWS_S3_WAREHOUSE_BUCKET"], diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access-grocery_store.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access-grocery_store.R index 4fdefd13c..8c07460ae 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access-grocery_store.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access-grocery_store.R @@ -54,6 +54,6 @@ for (year in years) { geometry_3435 = 
st_transform(geometry, 3435) ) %>% select(osm_id, name, category = shop, geometry, geometry_3435) %>% - geoarrow::write_geoparquet(remote_file) + geoparquet_to_s3(remote_file) } } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access.R index 5462fb404..b4b26687e 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-access.R @@ -42,7 +42,7 @@ if (!aws.s3::object_exists(remote_file_bike_warehouse)) { trail_width = trailwdth, trail_type = trailtype, trail_surface = trailsurfa ) %>% - geoarrow::write_geoparquet(remote_file_bike_warehouse) + geoparquet_to_s3(remote_file_bike_warehouse) } @@ -68,7 +68,7 @@ if (!aws.s3::object_exists(remote_file_ceme_warehouse)) { name = cfname, address, gniscode, source, community, comment, mergeid, geometry, geometry_3435 ) %>% - geoarrow::write_geoparquet(remote_file_ceme_warehouse) + geoparquet_to_s3(remote_file_ceme_warehouse) } @@ -94,7 +94,7 @@ if (!aws.s3::object_exists(remote_file_hosp_warehouse)) { name = cfname, address, gniscode, source, community, comment, mergeid, geometry, geometry_3435 ) %>% - geoarrow::write_geoparquet(remote_file_hosp_warehouse) + geoparquet_to_s3(remote_file_hosp_warehouse) } @@ -119,7 +119,7 @@ walk(remote_files_park_warehouse, function(x) { add_osm_feature(key = "leisure", value = "park") %>% osmdata_sf() - cook_boundary <- st_read_parquet( + cook_boundary <- geoarrow::read_geoparquet_sf( file.path( AWS_S3_WAREHOUSE_BUCKET, "spatial/ccao/county/2019.parquet" @@ -141,7 +141,7 @@ walk(remote_files_park_warehouse, function(x) { )) ) - geoarrow::write_geoparquet(parks_df, x, compression = "snappy") + geoparquet_to_s3(parks_df, x) } }) @@ -171,7 +171,7 @@ if (!aws.s3::object_exists(remote_file_indc_warehouse)) { num = no, hud_qualif, acres, geometry, geometry_3435 ) %>% - geoarrow::write_geoparquet(remote_file_indc_warehouse) + geoparquet_to_s3(remote_file_indc_warehouse) } ##### WALKABILITY ##### @@ -189,18 +189,17 @@ if (!aws.s3::object_exists(remote_file_walk_warehouse)) { tmp_file_walk <- tempfile(fileext = ".geojson") aws.s3::save_object(remote_file_walk_raw, file = tmp_file_walk) - temp <- st_read(tmp_file_walk) %>% + st_read(tmp_file_walk) %>% st_transform(4326) %>% + rename( + walkability_rating = Walkabilit, + amenities_score = Amenities, + transitaccess = TransitAcc + ) %>% rename_with(tolower) %>% rename_with(~ gsub("sc$|sco|scor|score", "_score", .x)) %>% rename_with(~"walk_num", contains("subzone")) %>% - rename( - walkability_rating = walkabilit, - amenities_score = amenities, - transitaccess = transitacc - ) %>% standardize_expand_geo() %>% select(-contains("shape")) %>% - mutate(year = "2017") %>% - geoarrow::write_geoparquet(remote_file_walk_warehouse) + geoparquet_to_s3(remote_file_walk_warehouse) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-building_footprint.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-building_footprint.R index b7de255e9..1d4cde9db 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-building_footprint.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-building_footprint.R @@ -56,10 +56,9 @@ if (!aws.s3::object_exists(esri_chicago_remote)) { lon = X, lat = Y, x_3435 = X.1, y_3435 = Y.1, geometry, geometry_3435 ) - write_geoparquet( + geoparquet_to_s3( esri_chicago_df_clean, - esri_chicago_remote, - compression = "snappy" + 
esri_chicago_remote ) } @@ -94,7 +93,7 @@ if (!aws.s3::object_exists(esri_sub_remote)) { lon = X, lat = Y, x_3435 = X.1, y_3435 = Y.1, geometry, geometry_3435 ) - write_geoparquet(esri_sub_df_clean, esri_sub_remote, compression = "snappy") + geoparquet_to_s3(esri_sub_df_clean, esri_sub_remote) } @@ -126,7 +125,7 @@ if (!aws.s3::object_exists(osm_remote)) { lon = X, lat = Y, x_3435 = X.1, y_3435 = Y.1, geometry, geometry_3435 ) - write_geoparquet(osm_df_clean, osm_remote, compression = "snappy") + geoparquet_to_s3(osm_df_clean, osm_remote) } @@ -182,5 +181,5 @@ if (!aws.s3::object_exists(ms_remote)) { lon = X, lat = Y, x_3435 = X.1, y_3435 = Y.1, geometry, geometry_3435 ) - write_geoparquet(ms_df_clean_cook_only, ms_remote, compression = "snappy") + geoparquet_to_s3(ms_df_clean_cook_only, ms_remote) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-corner.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-corner.R index 74a85e414..fbd21add0 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-corner.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-corner.R @@ -237,7 +237,7 @@ for (iter_year in parcel_years) { select(pin10, id) %>% inner_join(cross_final, by = "id") %>% select(-id) %>% - write_geoparquet(remote_file) + geoparquet_to_s3(remote_file) } tictoc::toc() } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-county.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-county.R index d7089b71c..aea2e6741 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-county.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-county.R @@ -34,5 +34,5 @@ if (!aws.s3::object_exists(remote_file_county_warehouse)) { geometry_3435 = st_transform(geometry, 3435), ) %>% select(geometry, geometry_3435) %>% - geoarrow::write_geoparquet(remote_file_county_warehouse) + geoparquet_to_s3(remote_file_county_warehouse) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-neighborhood.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-neighborhood.R index aefbfb171..4ac379db0 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-neighborhood.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-neighborhood.R @@ -196,8 +196,7 @@ for (year in 2010:2021) { township_name, township_code, triad_name, triad_code, nbhd, town_nbhd, geometry, geometry_3435 ) %>% - write_geoparquet( - file.path(output_bucket, paste0("year=", year), "part-0.parquet"), - compression = "snappy" + geoparquet_to_s3( + file.path(output_bucket, paste0("year=", year), "part-0.parquet") ) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-township.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-township.R index 828894745..a05845810 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-township.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-ccao-township.R @@ -50,5 +50,5 @@ if (!aws.s3::object_exists(remote_file_town_warehouse)) { geometry_3435 = st_transform(geometry, 3435), across(township_code:triad_code, as.character) ) %>% - geoarrow::write_geoparquet(remote_file_town_warehouse) + geoparquet_to_s3(remote_file_town_warehouse) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-census.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-census.R index 2d56b3da9..30736ff5c 100644 --- 
a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-census.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-census.R @@ -81,7 +81,7 @@ normalize_census_geo <- function(key) { geometry, geometry_3435 ) %>% filter(!str_detect(geoid, "Z")) %>% - write_geoparquet(remote_file, compression = "snappy") + geoparquet_to_s3(remote_file) } } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-golf_course.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-golf_course.R index 58bf06ffd..7f0603aaa 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-golf_course.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-golf_course.R @@ -43,5 +43,5 @@ if (!aws.s3::object_exists(remote_file_golf_course_warehouse)) { geometry_3435 = st_transform(geometry, 3435) ) %>% select(-touches) %>% - geoarrow::write_geoparquet(remote_file_golf_course_warehouse) + geoparquet_to_s3(remote_file_golf_course_warehouse) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-major_road.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-major_road.R index b45fdcb6a..d09523083 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-major_road.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-major_road.R @@ -73,5 +73,5 @@ for (year in years) { paste0("major_road-", year, ".parquet") ) - geoarrow::write_geoparquet(data_to_write, output_file) + geoparquet_to_s3(data_to_write, output_file) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-midway_noise.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-midway_noise.R index fc562652d..df311a487 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-midway_noise.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-midway_noise.R @@ -95,4 +95,4 @@ data.frame( geometry_3435 = st_transform(geometry, 3435), year = str_replace(year, "X", "") ) %>% - write_geoparquet(remote_file) + geoparquet_to_s3(remote_file) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-ohare_noise.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-ohare_noise.R index 14d37b6bf..cefd82e66 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-ohare_noise.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-ohare_noise.R @@ -60,8 +60,10 @@ names(noise_levels) <- columns # Clean NAs and remove inactive sites noise_levels <- noise_levels %>% - na_if("n/a") %>% - na_if("--") %>% + mutate( + across(where(is.character), ~ na_if(.x, "n/a")), + across(where(is.character), ~ na_if(.x, "--")) + ) %>% filter(!Site %in% c("6", "9", "36", "45")) # Grab sensor addresses pdf @@ -126,7 +128,7 @@ remote_file <- file.path( AWS_S3_WAREHOUSE_BUCKET, "spatial", "environment", "ohare_noise_monitor", "ohare_noise_monitor.parquet" ) -write_geoparquet(noise_addresses_clean, remote_file) +geoparquet_to_s3(noise_addresses_clean, remote_file) file.remove(tmp_file) @@ -153,4 +155,4 @@ ohare_noise_contour <- st_read(tmp_file) %>% geometry_3435 = st_transform(geom, 3435) ) %>% select(airport, decibels, geometry = geom, geometry_3435) %>% - write_geoparquet(remote_file) + geoparquet_to_s3(remote_file) diff --git 
a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-secondary_road.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-secondary_road.R index 7bacf7dcb..3481a1ce1 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-secondary_road.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment-secondary_road.R @@ -185,5 +185,5 @@ for (year in years) { paste0("secondary_road-", year, ".parquet") ) - geoarrow::write_geoparquet(data_to_write, output_file) + geoparquet_to_s3(data_to_write, output_file) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment.R index 69674d57a..c76880ab4 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment.R @@ -53,7 +53,7 @@ walk(2013:current_year, function(x) { geometry_3435 = st_transform(geometry, 3435) ) %>% rename_with(tolower) %>% - geoarrow::write_geoparquet(remote_file_coastline_warehouse) + geoparquet_to_s3(remote_file_coastline_warehouse) } }) @@ -61,10 +61,10 @@ walk(2013:current_year, function(x) { ##### FEMA FLOODPLAINS ##### flood_fema_raw <- file.path( - input_bucket, "flood_fema", "2023.geojson" + input_bucket, "flood_fema", "2024.geojson" ) flood_fema_warehouse <- file.path( - output_bucket, "flood_fema", "year=2023", "part-0.parquet" + output_bucket, "flood_fema", "year=2024", "part-0.parquet" ) # Write FEMA floodplains to S3 if they don't exist @@ -87,7 +87,7 @@ if ( fema_special_flood_hazard_area = SFHA_TF, geometry, geometry_3435 ) %>% - geoarrow::write_geoparquet(flood_fema_warehouse) + geoparquet_to_s3(flood_fema_warehouse) file.remove(tmp_file) } @@ -113,7 +113,7 @@ if (!aws.s3::object_exists(remote_file_rail_warehouse)) { mutate( geometry_3435 = st_transform(geometry, 3435) ) %>% - geoarrow::write_geoparquet(remote_file_rail_warehouse) + geoparquet_to_s3(remote_file_rail_warehouse) } @@ -166,7 +166,7 @@ walk(2011:current_year, function(year) { select(id = HYDROID, name = FULLNAME, hydrology_type, geometry) ) %>% mutate(geometry_3435 = st_transform(geometry, 3435)) %>% - geoarrow::write_geoparquet(remote_file_hydro_warehouse) + geoparquet_to_s3(remote_file_hydro_warehouse) file.remove(tmp_file) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R index 4a098d798..6e94ccd5f 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R @@ -290,8 +290,12 @@ walk(parquet_files, \(file_key) { )) %>% select(-year) - output_path <- file.path(output_bucket, paste0("year=", tools::file_path_sans_ext(basename(file_key))), "part-0.parquet") - geoarrow::write_geoparquet(shapefile_data, output_path) + output_path <- file.path( + output_bucket, + paste0("year=", tools::file_path_sans_ext(basename(file_key))), + "part-0.parquet" + ) + geoparquet_to_s3(shapefile_data, output_path) print(paste(file_key, "cleaned and uploaded.")) } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-other.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-other.R index 1c0d15472..721a488bd 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-other.R +++ 
b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-other.R @@ -43,7 +43,7 @@ walk(subdivisions_raw, function(shapefile_path) { filter(st_is_valid(geometry) & !is.na(pagesubref)) %>% mutate(geometry_3435 = st_transform(geometry, 3435)) %>% select(pagesubref, geometry, geometry_3435) %>% - geoarrow::write_geoparquet(dest_path) + geoparquet_to_s3(dest_path) } file.remove(tmp_file) @@ -79,7 +79,7 @@ clean_comm_areas <- function(shapefile_path) { area_number = area_numbe, geometry, geometry_3435 ) %>% - geoarrow::write_geoparquet( + geoparquet_to_s3( file.path( AWS_S3_WAREHOUSE_BUCKET, "spatial", "other", "community_area", paste0("year=", str_extract(shapefile_path, "[0-9]{4}")), diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-parcel.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-parcel.R index 6153e8d3d..a61ff8ce2 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-parcel.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-parcel.R @@ -387,7 +387,7 @@ process_parcel_file <- function(s3_bucket_uri, } # Write local backup copy - write_geoparquet(spatial_df_final, local_backup_file) + geoparquet_to_s3(spatial_df_final, local_backup_file) tictoc::toc() } else { message("Loading processed parcels from backup for: ", file_year) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-transit.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-transit.R index 7c6ad7812..8efd3c1c7 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-transit.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-transit.R @@ -70,7 +70,7 @@ process_gtfs_feed <- function(s3_bucket_uri, date, year, agency, feed_url) { any_of(c("location_type", "parent_station", "wheelchair_boarding")), any_of(c("feed_pull_date", "geometry", "geometry_3435")) ) %>% - write_geoparquet(remote_file_stop) + geoparquet_to_s3(remote_file_stop) } # Now create route geometries and save. Skip PACE since they have no geoms @@ -101,7 +101,7 @@ process_gtfs_feed <- function(s3_bucket_uri, date, year, agency, feed_url) { route_color, route_text_color, feed_pull_date, geometry, geometry_3435 ) %>% - write_geoparquet(remote_file_route) + geoparquet_to_s3(remote_file_route) } } } @@ -117,7 +117,7 @@ pwalk(gtfs_feeds_df, function(...) { agency = df$agency, feed_url = df$raw_feed_url ) -}) +}, .progress = TRUE) # Create dictionary for GTFS numeric codes # See: https://developers.google.com/transit/gtfs/reference @@ -136,7 +136,10 @@ transit_dict <- tribble( "route_type", 12, "monorail", "Monorail. Railway in which the track consists of a single rail or a beam." 
) %>% # nolint end - mutate(field_code = as.integer(field_code)) + mutate( + field_code = as.integer(field_code), + loaded_at = as.character(Sys.time()) + ) # Write dict to parquet remote_file_dict <- file.path( diff --git a/etl/utils.R b/etl/utils.R index 1b429ac04..24313d832 100644 --- a/etl/utils.R +++ b/etl/utils.R @@ -7,7 +7,7 @@ library(tools) save_s3_to_local <- function(s3_uri, path, overwrite = FALSE) { - if (!file.exists(path) | overwrite) { + if (!file.exists(path) || overwrite) { message("Saving file: ", s3_uri, " to: ", path) aws.s3::save_object(object = s3_uri, file = path) } @@ -15,14 +15,14 @@ save_s3_to_local <- function(s3_uri, path, overwrite = FALSE) { save_local_to_s3 <- function(s3_uri, path, overwrite = FALSE) { - if (!aws.s3::object_exists(s3_uri) | overwrite) { + if (!aws.s3::object_exists(s3_uri) || overwrite) { message("Saving file: ", path, "to: ", s3_uri) aws.s3::put_object( file = path, object = s3_uri, show_progress = TRUE, multipart = TRUE - ) + ) } } @@ -34,8 +34,7 @@ open_data_to_s3 <- function(s3_bucket_uri, file_year, file_ext, file_prefix = NULL, - overwrite = FALSE - ) { + overwrite = FALSE) { open_data_file <- paste0(base_url, data_url) remote_file <- file.path( s3_bucket_uri, dir_name, @@ -61,15 +60,15 @@ open_data_to_s3 <- function(s3_bucket_uri, write_partitions_to_s3 <- function(df, s3_output_path, is_spatial = TRUE, - overwrite = FALSE - ) { + overwrite = FALSE) { if (!dplyr::is.grouped_df(df)) { warning("Input data must contain grouping vars for partitioning") } + df <- df %>% mutate(loaded_at = as.character(Sys.time())) dplyr::group_walk(df, ~ { partitions_df <- purrr::map_dfr( - .y, replace_na, "__HIVE_DEFAULT_PARTITION__" + .y, tidyr::replace_na, "__HIVE_DEFAULT_PARTITION__" ) partition_path <- paste0(purrr::map2_chr( names(partitions_df), @@ -79,7 +78,7 @@ write_partitions_to_s3 <- function(df, remote_path <- file.path( s3_output_path, partition_path, "part-0.parquet" ) - if (!object_exists(remote_path) | overwrite) { + if (!object_exists(remote_path) || overwrite) { message("Now uploading: ", partition_path) tmp_file <- tempfile(fileext = ".parquet") if (is_spatial) { @@ -93,33 +92,32 @@ write_partitions_to_s3 <- function(df, } -standardize_expand_geo <- function(spatial_df, make_valid = FALSE, polygon = TRUE) { - +standardize_expand_geo <- function( + spatial_df, make_valid = FALSE, polygon = TRUE) { return( - spatial_df %>% st_transform(4326) %>% - { if (make_valid) st_make_valid(.) else .} %>% + { + if (make_valid) st_make_valid(.) else . 
+ } %>% mutate(geometry_3435 = st_transform(geometry, 3435)) %>% - { if (polygon) { - - mutate(., centroid = st_centroid(st_transform(geometry, 3435))) %>% - cbind(., - st_coordinates(st_transform(.$centroid, 4326)), - st_coordinates(.$centroid) - ) %>% - select(!contains("centroid"), - lon = X, lat = Y, x_3435 = `X.1`, y_3435 = `Y.1`, geometry, geometry_3435) - - } else { - - select(., dplyr::everything(), geometry, geometry_3435) - - } + { + if (polygon) { + mutate(., centroid = st_centroid(st_transform(geometry, 3435))) %>% + cbind( + ., + st_coordinates(st_transform(.$centroid, 4326)), + st_coordinates(.$centroid) + ) %>% + select(!contains("centroid"), + lon = X, lat = Y, x_3435 = `X.1`, y_3435 = `Y.1`, + geometry, geometry_3435 + ) + } else { + select(., dplyr::everything(), geometry, geometry_3435) } - + } ) - } county_gdb_to_s3 <- function( @@ -127,9 +125,7 @@ county_gdb_to_s3 <- function( dir_name, file_path, layer, - overwrite = FALSE -) { - + overwrite = FALSE) { remote_file <- file.path( s3_bucket_uri, dir_name, @@ -137,30 +133,29 @@ county_gdb_to_s3 <- function( ) if (!aws.s3::object_exists(remote_file)) { - message(paste0("Reading ", basename(file_path))) if (layer %in% st_layers(file_path)$name) { - try({ - tmp_file <- tempfile(fileext = ".geojson") st_read(file_path, layer) %>% st_write(tmp_file) save_local_to_s3(remote_file, tmp_file, overwrite = overwrite) file.remove(tmp_file) cat(paste0("File successfully written to ", remote_file, "\n")) - }) - } else { - - cat(paste0("Layer '", layer, - "' not present in ", - basename(file_path), - "... skipping.\n") - ) - + cat(paste0( + "Layer '", layer, + "' not present in ", + basename(file_path), + "... skipping.\n" + )) } - } } + +geoparquet_to_s3 <- function(spatial_df, s3_uri) { + spatial_df %>% + mutate(loaded_at = as.character(Sys.time())) %>% + geoarrow::write_geoparquet(s3_uri, compression = "snappy") +}
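Reviewer notes (not part of the patch). The new `geoparquet_to_s3()` helper defined above replaces the per-script `geoarrow::write_geoparquet()` calls throughout this diff: it stamps a `loaded_at` column and always writes snappy-compressed GeoParquet. A minimal sketch of a call site, assuming etl/utils.R has been sourced; `demo_sf` and the bucket path are made up for illustration only:

# Illustrative only -- demo_sf and the S3 URI are hypothetical, not from this PR
library(dplyr)
library(sf)

source("etl/utils.R") # defines geoparquet_to_s3()

demo_sf <- st_sf(
  name = c("a", "b"),
  geometry = st_sfc(
    st_point(c(-87.63, 41.88)),
    st_point(c(-87.70, 41.80)),
    crs = 4326
  )
) %>%
  mutate(geometry_3435 = st_transform(geometry, 3435))

# Old pattern, repeated in each script:
# geoarrow::write_geoparquet(demo_sf, "s3://example-bucket/spatial/demo.parquet",
#                            compression = "snappy")

# New pattern: one wrapper adds loaded_at and applies compression consistently
geoparquet_to_s3(demo_sf, "s3://example-bucket/spatial/demo.parquet")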
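The O'Hare noise cleanup swaps the data-frame-level `na_if()` calls for per-column `across()` calls; newer dplyr releases expect `na_if()` to operate on vectors, so mapping it over the character columns preserves the old behavior. A small sketch of the pattern with a made-up stand-in table:

# Illustrative only -- sensor_df is a stand-in for the noise-monitor data
library(dplyr)

sensor_df <- data.frame(
  Site = c("1", "2", "3"),
  jan  = c("55.2", "n/a", "--"),
  feb  = c("60.1", "57.3", "n/a")
)

sensor_df %>%
  mutate(
    across(where(is.character), ~ na_if(.x, "n/a")),
    across(where(is.character), ~ na_if(.x, "--"))
  )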
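The utils.R conditionals now use `||` rather than `|`. Both operands are length-one logicals here, and `||` short-circuits (and, in recent R versions, errors when handed a longer vector), so an accidentally vectorized condition can no longer slip through an `if ()`. Roughly:

# Illustrative only -- path_exists and overwrite stand in for the real checks
path_exists <- FALSE
overwrite <- FALSE

# `|` evaluates both sides element-wise and can return a multi-element vector;
# `||` expects scalars and stops evaluating as soon as the result is known.
if (!path_exists || overwrite) {
  message("Would fetch the object")
}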