class QueryExistingArtifactsOstree(Stage):
    """
    A Stages API stage that replaces unsaved :class:`~pulpcore.plugin.models.Artifact` objects
    with already-saved ones that have the same ``sha256`` digest.

    This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `self._in_q`
    and inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
    :class:`~pulpcore.plugin.models.Artifact`.

    Unlike a lookup that tries several digest types, this stage matches on ``sha256`` only. For
    every batch it collects the ``sha256`` of each unsaved
    :class:`~pulpcore.plugin.models.Artifact`, looks up saved artifacts with those digests in the
    stage's domain, and swaps every match into its
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact`.

    Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `self._out_q` after all of
    its :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled. Units
    without any artifacts are passed through untouched.

    The database is queried once per batch (and not at all when the batch contains no unsaved
    artifacts) instead of once per artifact.
    """

    async def run(self):
        """
        The coroutine for this stage.

        Returns:
            The coroutine for this stage.
        """
        async for batch in self.batches():
            # Only artifacts that are not saved yet need a lookup; gather their sha256 digests
            # across every declarative artifact of every unit in the batch.
            unsaved_digests = [
                d_artifact.artifact.sha256
                for d_content in batch
                for d_artifact in d_content.d_artifacts
                if d_artifact.artifact._state.adding
            ]

            # Nothing to look up means nothing to touch or swap, so skip the database entirely.
            if unsaved_digests:
                existing_artifacts_qs = Artifact.objects.filter(
                    sha256__in=unsaved_digests,
                    pulp_domain=self.domain,
                )
                # NOTE(review): touch() presumably refreshes the matched artifacts' timestamp so
                # that they are not reclaimed while this task is running - confirm in pulpcore.
                await sync_to_async(existing_artifacts_qs.touch)()

                # Index the saved artifacts by digest for constant-time matching below.
                existing_by_digest = {}
                async for saved_artifact in sync_to_async_iterable(existing_artifacts_qs):
                    existing_by_digest[saved_artifact.sha256] = saved_artifact

                # Swap in the saved artifact wherever the digest matches.
                for d_content in batch:
                    for d_artifact in d_content.d_artifacts:
                        saved_artifact = existing_by_digest.get(d_artifact.artifact.sha256)
                        if saved_artifact is not None:
                            d_artifact.artifact = saved_artifact

            for d_content in batch:
                await self.put(d_content)