Skip to content

Commit

Permalink
Use a customized version of QueryExistingArtifacts
Browse files Browse the repository at this point in the history
In this change, the stage is comparing only sha256 digests between
existing and newly added artifacts. Furthermore, the time complexity
for the old comparison was O(N^2). With the change, we work with O(2N).

[noissue]
  • Loading branch information
lubosmj committed Nov 13, 2023
1 parent fd13b5e commit 7e39af1
Showing 1 changed file with 73 additions and 2 deletions.
75 changes: 73 additions & 2 deletions pulp_ostree/app/tasks/importing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import tarfile

from collections import defaultdict
from gettext import gettext

from asgiref.sync import sync_to_async
Expand All @@ -10,11 +11,11 @@
ArtifactSaver,
ContentSaver,
DeclarativeVersion,
QueryExistingArtifacts,
QueryExistingContents,
ResolveContentFutures,
Stage,
)
from pulpcore.plugin.sync import sync_to_async_iterable

from pulp_ostree.app.models import (
OstreeCommit,
Expand Down Expand Up @@ -371,14 +372,84 @@ async def run(self):
await self.submit_metafile_object("summary", OstreeSummary())


class QueryExistingArtifactsOstree(Stage):
"""
A Stages API stage that replaces :attr:`DeclarativeContent.content` objects with already-saved
:class:`~pulpcore.plugin.models.Artifact` objects.
This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `self._in_q`
and inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
:class:`~pulpcore.plugin.models.Artifact`.
This stage inspects any unsaved :class:`~pulpcore.plugin.models.Artifact` objects and searches
using their metadata for existing saved :class:`~pulpcore.plugin.models.Artifact` objects inside
Pulp with the same digest value(s). Any existing :class:`~pulpcore.plugin.models.Artifact`
objects found will replace their unsaved counterpart in the
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` object.
Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `self._out_q` after all of
its :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
This stage drains all available items from `self._in_q` and batches everything into one large
call to the db for efficiency.
"""

async def run(self):
"""
The coroutine for this stage.
Returns:
The coroutine for this stage.
"""
async for batch in self.batches():
artifacts_digests = []

# For each unsaved artifact, check its digests in the order of COMMON_DIGEST_FIELDS
# and the first digest which is found is added to the list of digests of that type.
# We assume that in general only one digest is provided and that it will be
# sufficient to identify the Artifact.
for d_content in batch:
d_artifact = d_content.d_artifacts[0]
if d_artifact.artifact._state.adding:
digest_value = d_artifact.artifact.sha256
artifacts_digests.append(digest_value)

# For each type of digest, fetch all the existing Artifacts where digest "in"
# the list we built earlier. Walk over all the artifacts again compare the
# digest of the new artifact to those of the existing ones - if one matches,
# swap it out with the existing one.
query_params = {
"sha256__in": artifacts_digests,
"pulp_domain": self.domain,
}

existing_artifacts_qs = Artifact.objects.filter(**query_params)
await sync_to_async(existing_artifacts_qs.touch)()

d = {}
async for result in sync_to_async_iterable(existing_artifacts_qs):
d[result.sha256] = result

for d_content in batch:
d_artifact = d_content.d_artifacts[0]
artifact_digest = d_artifact.artifact.sha256
m = d.get(artifact_digest)
if m:
d_artifact.artifact = m

for d_content in batch:
await self.put(d_content)


class OstreeImportDeclarativeVersion(DeclarativeVersion):
"""A customized DeclarativeVersion class that creates a pipeline for the OSTree import."""

def pipeline_stages(self, new_version):
"""Build a list of stages."""
pipeline = [
self.first_stage,
QueryExistingArtifacts(),
QueryExistingArtifactsOstree(),
ArtifactSaver(),
QueryExistingContents(),
ContentSaver(),
Expand Down

0 comments on commit 7e39af1

Please sign in to comment.