From 34012dedb8f7fd389611d37acf00c549d2f58983 Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Mon, 17 Apr 2023 12:25:22 +0200 Subject: [PATCH] Sign manifests asynchronously closes #1208 --- CHANGES/1208.feature | 1 + pulp_container/app/tasks/sign.py | 69 ++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 25 deletions(-) create mode 100644 CHANGES/1208.feature diff --git a/CHANGES/1208.feature b/CHANGES/1208.feature new file mode 100644 index 000000000..97bccda4a --- /dev/null +++ b/CHANGES/1208.feature @@ -0,0 +1 @@ +Started signing manifests asynchronously. This feature improves the performance of signing tasks. diff --git a/pulp_container/app/tasks/sign.py b/pulp_container/app/tasks/sign.py index 18b485dc9..6571e90c1 100644 --- a/pulp_container/app/tasks/sign.py +++ b/pulp_container/app/tasks/sign.py @@ -1,11 +1,12 @@ +import asyncio import base64 import hashlib -import os -import tempfile +from aiofiles import tempfile from django.conf import settings from pulpcore.plugin.models import Repository +from pulpcore.plugin.sync import sync_to_async_iterable, sync_to_async from pulp_container.app.models import ( ManifestSignature, @@ -19,6 +20,10 @@ SIGNATURE_TYPE, ) +SIGNING_TASKS_COUNTER = 10 + +semaphore = asyncio.Semaphore(SIGNING_TASKS_COUNTER) + def sign(repository_pk, signing_service_pk, reference, tags_list=None): """ @@ -47,26 +52,38 @@ def sign(repository_pk, signing_service_pk, reference, tags_list=None): ) else: latest_repo_content_tags = latest_version.content.filter(pulp_type=Tag.get_pulp_type()) - latest_repo_tags = Tag.objects.filter(pk__in=latest_repo_content_tags) + latest_repo_tags = Tag.objects.filter( + pk__in=latest_repo_content_tags + ).select_related("tagged_manifest") signing_service = ManifestSigningService.objects.get(pk=signing_service_pk) - added_signatures = [] - for tag in latest_repo_tags: - tagged_manifest = tag.tagged_manifest - docker_reference = ":".join((reference, tag.name)) - signature_pk = create_signature(tagged_manifest, docker_reference, signing_service) - added_signatures.append(signature_pk) - if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST: - # parse ML and sign per-arches - for manifest in tagged_manifest.listed_manifests.iterator(): - signature_pk = create_signature(manifest, docker_reference, signing_service) - added_signatures.append(signature_pk) + + async def sign_manifests(): + added_signatures = [] + async for tag in sync_to_async_iterable(latest_repo_tags): + tagged_manifest = tag.tagged_manifest + docker_reference = ":".join((reference, tag.name)) + signature_pk = await create_signature( + tagged_manifest, docker_reference, signing_service + ) + added_signatures.append(signature_pk) + if tagged_manifest.media_type in MANIFEST_MEDIA_TYPES.LIST: + # parse ML and sign per-arches + added_signatures_manifests = [] + manifests_iterator = tagged_manifest.listed_manifests.iterator() + async for manifest in sync_to_async_iterable(manifests_iterator): + signature_pk = await create_signature( + manifest, docker_reference, signing_service + ) + added_signatures_manifests.append(signature_pk) + + added_signatures = asyncio.run(sign_manifests()) added_signatures_qs = ManifestSignature.objects.filter(pk__in=added_signatures) with repository.new_version() as new_version: new_version.add_content(added_signatures_qs) -def create_signature(manifest, reference, signing_service): +async def create_signature(manifest, reference, signing_service): """ Create manifest signature. @@ -81,20 +98,22 @@ def create_signature(manifest, reference, signing_service): pk of created ManifestSignature. """ - with tempfile.TemporaryDirectory(dir=".") as working_directory: + async with semaphore: # download and write file for object storage + artifact = await sync_to_async(manifest._artifacts.get)() if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem": - manifest_file = tempfile.NamedTemporaryFile(dir=working_directory, delete=False) - artifact = manifest._artifacts.get() - manifest_file.write(artifact.file.read()) - manifest_file.flush() + async with tempfile.NamedTemporaryFile(dir=".", mode="wb", delete=False) as tf: + await tf.write(artifact.file.read()) + await tf.flush() + artifact.file.close() - manifest_path = manifest_file.name + manifest_path = tf.name else: - manifest_path = manifest._artifacts.get().file.path - sig_path = os.path.join(working_directory, "signature") + manifest_path = artifact.file.path + + sig_path = tempfile.mkstemp(dir=".", prefix="signature") - signed = signing_service.sign( + signed = await signing_service.asign( manifest_path, env_vars={"REFERENCE": reference, "SIG_PATH": sig_path} ) @@ -115,6 +134,6 @@ def create_signature(manifest, reference, signing_service): data=encoded_sig, signed_manifest=manifest, ) - signature.save() + await sync_to_async(signature.save)() return signature.pk