Skip to content

Commit

Permalink
Implement batch processing in mosaic cache invalidation
Browse files Browse the repository at this point in the history
This commit introduces a mechanism to handle large sets of mosaic cache keys by processing them in batches. When the set size reaches a predefined maximum (MAX_SET_SIZE), the current batch is processed and the set is cleared to accept new entries, so the cache invalidation job can handle large volumes of data without exhausting memory. Additionally, a final call processes any remaining keys after iterating through all mosaic tiles. Together these changes make the cache invalidation job more memory-efficient and reliable.
  • Loading branch information
dqunbp committed Apr 22, 2024
1 parent 855746d commit 9adea75
Showing 1 changed file with 44 additions and 32 deletions.
76 changes: 44 additions & 32 deletions src/mosaic_cache_invalidation_job.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async function invalidateImage(geojson, maxzoom, presentMosaicCacheKeys) {
}

// Layer identifier for OpenAerialMap data, overridable via environment;
// presumably used when querying/iterating OAM tiles — usage not visible in this excerpt.
const OAM_LAYER_ID = process.env.OAM_LAYER_ID || "openaerialmap";
// Upper bound on the number of mosaic cache keys accumulated in memory before
// the current batch is flushed via processBatch() and the set is cleared.
const MAX_SET_SIZE = 1_000_000;

async function invalidateMosaicCache() {
const cacheInfo = JSON.parse((await cacheGet("__info__.json")).toString());
Expand Down Expand Up @@ -107,45 +108,56 @@ async function invalidateMosaicCache() {
// the cache
const presentMosaicCacheKeys = new Set();
for await (const key of mosaicTilesIterable()) {
if (presentMosaicCacheKeys.size >= MAX_SET_SIZE) {
// call processBatch when the set reaches the maximum allowed Set size
await processBatch(presentMosaicCacheKeys);
presentMosaicCacheKeys.clear();
}

presentMosaicCacheKeys.add(key);
}

for await (const metadataCacheKey of metadataJsonsIterable()) {
const key = metadataCacheKey.replace("__metadata__/", "").replace(".json", "");
const image = allImages.find((image) => image.uuid.includes(key));
// if metadata for an image is present in the cache but missing in the "origin" database
// all cached mosaic tiles that contain this image need to be invalidated
// because the image itself was deleted.
if (!image) {
let bounds, maxzoom;
try {
const metadataBuffer = await cacheGet(metadataCacheKey);
if (!metadataBuffer || !metadataBuffer.length) continue;
const metadata = JSON.parse(metadataBuffer.toString());
if (!metadata) continue;
bounds = metadata.bounds;
maxzoom = metadata.maxzoom;
} catch (error) {
logger.warn(`metadata cache invalid for key ${metadataCacheKey}`);
continue; // skip invalid metadata jsons
// call processBatch for the last batch
if (presentMosaicCacheKeys.size > 0) await processBatch(presentMosaicCacheKeys);

/**
 * Invalidate one batch of mosaic cache keys.
 *
 * Scans every cached metadata JSON; when the corresponding image is no longer
 * present in the "origin" database (it was deleted), all mosaic tiles in this
 * batch that cover that image's bounds are invalidated and the stale metadata
 * entry is removed from the cache.
 *
 * NOTE(review): the rendered diff had three leftover lines from the removed
 * (pre-change) code fused after the `if (!image)` block; they ran on every
 * iteration, referenced `bounds`/`maxzoom` outside their declaring scope
 * (ReferenceError), and targeted the outer `presentMosaicCacheKeys` instead of
 * the batch parameter. They are removed here.
 *
 * @param {Set<string>} mosaicCacheKeys - batch of mosaic tile cache keys to check.
 */
async function processBatch(mosaicCacheKeys) {
  for await (const metadataCacheKey of metadataJsonsIterable()) {
    // "__metadata__/<uuid-fragment>.json" -> "<uuid-fragment>"
    const key = metadataCacheKey.replace("__metadata__/", "").replace(".json", "");
    const image = allImages.find((image) => image.uuid.includes(key));
    // if metadata for an image is present in the cache but missing in the "origin" database
    // all cached mosaic tiles that contain this image need to be invalidated
    // because the image itself was deleted.
    if (!image) {
      let bounds, maxzoom;
      try {
        const metadataBuffer = await cacheGet(metadataCacheKey);
        if (!metadataBuffer || !metadataBuffer.length) continue;
        const metadata = JSON.parse(metadataBuffer.toString());
        if (!metadata) continue;
        bounds = metadata.bounds;
        maxzoom = metadata.maxzoom;
      } catch (error) {
        logger.warn(`metadata cache invalid for key ${metadataCacheKey}`);
        continue; // skip invalid metadata jsons
      }
      // Build a polygon from [west, south, east, north] bounds and invalidate
      // every cached tile in this batch intersecting it, up to the image's maxzoom.
      const geojson = geojsonGeometryFromBounds(bounds.slice(0, 2), bounds.slice(2));
      await invalidateImage(geojson, maxzoom, mosaicCacheKeys);
      await cacheDelete(metadataCacheKey);
    }
  }
}

let latestUploadedAt;
if (imagesAddedSinceLastInvalidation.length > 0) {
latestUploadedAt = imagesAddedSinceLastInvalidation[0].uploaded_at;
for (const row of imagesAddedSinceLastInvalidation) {
const url = row.uuid;
const geojson = JSON.parse(row.geojson);
if (Date.parse(row.uploaded_at) > Date.parse(latestUploadedAt)) {
latestUploadedAt = row.uploaded_at;
let latestUploadedAt;
if (imagesAddedSinceLastInvalidation.length > 0) {
latestUploadedAt = imagesAddedSinceLastInvalidation[0].uploaded_at;
for (const row of imagesAddedSinceLastInvalidation) {
const url = row.uuid;
const geojson = JSON.parse(row.geojson);
if (Date.parse(row.uploaded_at) > Date.parse(latestUploadedAt)) {
latestUploadedAt = row.uploaded_at;
}
const { maxzoom } = await getGeotiffMetadata(url);
await invalidateImage(geojson, maxzoom, mosaicCacheKeys);
}
const { maxzoom } = await getGeotiffMetadata(url);
await invalidateImage(geojson, maxzoom, presentMosaicCacheKeys);
}
}

Expand Down

0 comments on commit 9adea75

Please sign in to comment.