From 652620463b5ca91d8764fb124a530425597c5b8c Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Mon, 13 Nov 2023 10:53:02 +0100 Subject: [PATCH] Use a customized version of QueryExistingArtifacts In this change, the stage is comparing only sha256 digests between existing and newly added artifacts. Furthermore, the time complexity for the old comparison was O(N^2). With the attached change, we work with O(2N). closes #289 (cherry picked from commit 0e1931945b54aa322aaaab4cb171e1dded169db5) --- CHANGES/289.bugfix | 1 + pulp_ostree/app/tasks/importing.py | 41 ++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 CHANGES/289.bugfix diff --git a/CHANGES/289.bugfix b/CHANGES/289.bugfix new file mode 100644 index 00000000..95615080 --- /dev/null +++ b/CHANGES/289.bugfix @@ -0,0 +1 @@ +Improved the performance of subsequent imports. diff --git a/pulp_ostree/app/tasks/importing.py b/pulp_ostree/app/tasks/importing.py index 3663fc27..31e89ff0 100644 --- a/pulp_ostree/app/tasks/importing.py +++ b/pulp_ostree/app/tasks/importing.py @@ -10,11 +10,11 @@ ArtifactSaver, ContentSaver, DeclarativeVersion, - QueryExistingArtifacts, QueryExistingContents, ResolveContentFutures, Stage, ) +from pulpcore.plugin.sync import sync_to_async_iterable from pulp_ostree.app.models import ( OstreeCommit, @@ -371,6 +371,43 @@ async def run(self): await self.submit_metafile_object("summary", OstreeSummary()) +class QueryExistingArtifactsOstree(Stage): + """A customized version of the QueryExistingArtifacts stage comparing just sha256 digests.""" + + async def run(self): + """Compare existing artifacts by leveraging dictionary access.""" + async for batch in self.batches(): + artifacts_digests = [] + + for d_content in batch: + d_artifact = d_content.d_artifacts[0] + if d_artifact.artifact._state.adding: + digest_value = d_artifact.artifact.sha256 + artifacts_digests.append(digest_value) + + query_params = { + "sha256__in": artifacts_digests, + "pulp_domain": self.domain, + } + + existing_artifacts_qs = Artifact.objects.filter(**query_params) + await sync_to_async(existing_artifacts_qs.touch)() + + d = {} + async for result in sync_to_async_iterable(existing_artifacts_qs): + d[result.sha256] = result + + for d_content in batch: + d_artifact = d_content.d_artifacts[0] + artifact_digest = d_artifact.artifact.sha256 + m = d.get(artifact_digest) + if m: + d_artifact.artifact = m + + for d_content in batch: + await self.put(d_content) + + class OstreeImportDeclarativeVersion(DeclarativeVersion): """A customized DeclarativeVersion class that creates a pipeline for the OSTree import.""" @@ -378,7 +415,7 @@ def pipeline_stages(self, new_version): """Build a list of stages.""" pipeline = [ self.first_stage, - QueryExistingArtifacts(), + QueryExistingArtifactsOstree(), ArtifactSaver(), QueryExistingContents(), ContentSaver(),