From 2e7bc86855b5c32bd8ebf5b626aa22602aa6f868 Mon Sep 17 00:00:00 2001 From: Arseny Teramachi Date: Fri, 24 Jan 2025 14:05:23 +0900 Subject: [PATCH] remove OOM caused by very large execution plans --- .../worksap/nlp/uzushio/lib/runners/MergeDedupStats.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/MergeDedupStats.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/MergeDedupStats.scala index d8c720b..1d3cb7e 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/MergeDedupStats.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/MergeDedupStats.scala @@ -92,7 +92,7 @@ object MergeDedupStats { explode($"tgtAll").as("srcHash"), $"tgtMin".as("tgtHash") ) - ).where($"srcHash" =!= $"tgtHash").distinct().persist() + ).where($"srcHash" =!= $"tgtHash").distinct().localCheckpoint(eager = true) // Step 2b: collect all repr hash candidates to consider for updating // find all repr hashes which have distinct hashes @@ -108,10 +108,10 @@ object MergeDedupStats { df.join(nonUnique, "hash").select($"reprHash".as("initReprHash")) } - val seedGroups = seedGroupsA.union(seedGroupsB).distinct().select( + val seedGroups = seedGroupsA.union(seedGroupsB).distinct().localCheckpoint(eager = true).select( $"initReprHash", $"initReprHash".as("newReprHash") - ).persist() + ) // compute the correct remaps themselves iteratively // this will have false positives, but hopefully not much