Skip to content

Commit

Permalink
Refactor DebUpdateReleaseFileAttributes and add unit tests
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
hstct committed Jan 20, 2025
1 parent cb489f6 commit 39e878f
Show file tree
Hide file tree
Showing 2 changed files with 389 additions and 111 deletions.
280 changes: 169 additions & 111 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from collections import defaultdict
from tempfile import NamedTemporaryFile
from debian import deb822
from urllib.parse import quote, urlparse, urlunparse, urljoin
from urllib.parse import quote, urlparse, urlunparse
from django.conf import settings
from django.db.utils import IntegrityError

Expand Down Expand Up @@ -351,133 +351,117 @@ def __init__(self, remote, *args, **kwargs):
super().__init__(*args, **kwargs)
self.remote = remote
self.gpgkey = remote.gpgkey
self.gpg = None
if self.gpgkey:
gnupghome = os.path.join(os.getcwd(), "gpg-home")
os.makedirs(gnupghome)
os.makedirs(gnupghome, exist_ok=True)
self.gpg = gnupg.GPG(gpgbinary="/usr/bin/gpg", gnupghome=gnupghome)
import_res = self.gpg.import_keys(self.gpgkey)
if import_res.count == 0:
log.warning(_("Key import failed."))
pass

async def run(self):
"""
Parse ReleaseFile content units.
Update release content with information obtained from its artifact.
Parse ReleaseFile content units, verify GPG if needed, and update attributes.
"""
dropped_count = 0
async with ProgressReport(
message="Update ReleaseFile units", code="update.release_file"
) as pb:
async for d_content in self.items():
if isinstance(d_content.content, ReleaseFile):
release_file = d_content.content
da_names = {
os.path.basename(da.relative_path): da for da in d_content.d_artifacts
}
if "Release" in da_names:
if "Release.gpg" in da_names:
if self.gpgkey:
with NamedTemporaryFile() as tmp_file:
tmp_file.write(da_names["Release"].artifact.file.read())
tmp_file.flush()
verified = self.gpg.verify_file(
da_names["Release.gpg"].artifact.file, tmp_file.name
)
if verified.valid:
log.info(_("Verification of Release successful."))
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
log.warning(_("Verification of Release failed. Dropping it."))
d_content.d_artifacts.remove(da_names.pop("Release"))
d_content.d_artifacts.remove(da_names.pop("Release.gpg"))
dropped_count += 1
else:
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
if self.gpgkey:
d_content.d_artifacts.remove(da_names["Release"])
else:
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
if "Release.gpg" in da_names:
# No need to keep the signature without "Release"
d_content.d_artifacts.remove(da_names.pop("Release.gpg"))

if "InRelease" in da_names:
if self.gpgkey:
verified = self.gpg.verify_file(da_names["InRelease"].artifact.file)
if verified.valid:
log.info(_("Verification of InRelease successful."))
release_file_artifact = da_names["InRelease"].artifact
release_file.relative_path = da_names["InRelease"].relative_path
else:
log.warning(_("Verification of InRelease failed. Dropping it."))
d_content.d_artifacts.remove(da_names.pop("InRelease"))
dropped_count += 1
else:
release_file_artifact = da_names["InRelease"].artifact
release_file.relative_path = da_names["InRelease"].relative_path
release_file = d_content.content
if not isinstance(release_file, ReleaseFile):
await self.put(d_content)
continue

if not d_content.d_artifacts:
# No (proper) artifacts left -> distribution not found
release_file_url = urljoin(self.remote.url, release_file.relative_path)
if dropped_count > 0:
raise NoValidSignatureForKey(url=release_file_url)
else:
raise NoReleaseFile(url=release_file_url)

release_file.sha256 = release_file_artifact.sha256
release_file.artifact_set_sha256 = _get_artifact_set_sha256(
d_content, ReleaseFile.SUPPORTED_ARTIFACTS
)
release_file_dict = deb822.Release(release_file_artifact.file)
if "codename" in release_file_dict:
release_file.codename = release_file_dict["Codename"]
if "suite" in release_file_dict:
release_file.suite = release_file_dict["Suite"]

if "components" in release_file_dict:
release_file.components = release_file_dict["Components"]
elif "component" in release_file_dict and settings.PERMISSIVE_SYNC:
release_file.components = release_file_dict["Component"]
elif release_file.distribution[-1] == "/":
message = (
"The Release file for distribution '{}' contains no 'Components' "
"field, but since we are dealing with a flat repo, we can continue "
"regardless."
)
log.warning(_(message).format(release_file.distribution))
# TODO: Consider not setting the field at all (requires migrations).
release_file.components = ""
else:
raise MissingReleaseFileField(release_file.distribution, "Components")

if "architectures" in release_file_dict:
release_file.architectures = release_file_dict["Architectures"]
elif "architecture" in release_file_dict and settings.PERMISSIVE_SYNC:
release_file.architectures = release_file_dict["Architecture"]
elif release_file.distribution[-1] == "/":
message = (
"The Release file for distribution '{}' contains no 'Architectures' "
"field, but since we are dealing with a flat repo, we can extract them "
"from the repos single Package index later."
)
log.warning(_(message).format(release_file.distribution))
release_file.architectures = ""
else:
raise MissingReleaseFileField(release_file.distribution, "Architectures")

log.debug(_("Codename: {}").format(release_file.codename))
log.debug(_("Components: {}").format(release_file.components))
log.debug(_("Architectures: {}").format(release_file.architectures))
await pb.aincrement()
await self.process_release_file_d_content(d_content, pb)
await self.put(d_content)

async def process_release_file_d_content(self, d_content, pb):
    """
    Orchestrates the steps for a single ReleaseFile item.

    Selects the main release file artifact (verifying signatures when a gpgkey
    is configured), drops any artifacts that cannot be used, records the content
    checksums, and parses the release file fields onto the content unit.

    Args:
        d_content: The declarative content wrapping a ReleaseFile and its
            downloaded artifacts ("Release", "Release.gpg", "InRelease").
        pb: The ProgressReport to increment once the item is processed.

    Raises:
        NoReleaseFile: If no usable main artifact remains.
        NoValidSignatureForKey: Propagated from verify_gpg_artifacts when a
            gpgkey is set and nothing verifies against it.
    """
    release_da, release_gpg_da, inrelease_da = _collect_release_artifacts(d_content)

    release_artifact = None
    if self.gpg:
        # A gpgkey is configured: only a verified artifact may be used.
        # verify_gpg_artifacts raises instead of returning None on failure.
        release_artifact = self.verify_gpg_artifacts(
            d_content, release_da, release_gpg_da, inrelease_da
        )
    else:
        # If no gpgkey, pick main artifact in an order: InRelease => Release => None
        if inrelease_da:
            release_artifact = inrelease_da.artifact
            d_content.content.relative_path = inrelease_da.relative_path
        elif release_da:
            release_artifact = release_da.artifact
            d_content.content.relative_path = release_da.relative_path

    # If there isn't a valid release
    if not release_artifact:
        # Remove whatever release-related artifacts are still attached; the
        # `in` checks also tolerate None slots from _collect_release_artifacts.
        if release_da in d_content.d_artifacts:
            d_content.d_artifacts.remove(release_da)
        if release_gpg_da in d_content.d_artifacts:
            d_content.d_artifacts.remove(release_gpg_da)
        if inrelease_da in d_content.d_artifacts:
            d_content.d_artifacts.remove(inrelease_da)
        # NOTE(review): os.path.join on a URL; earlier code used urljoin —
        # confirm relative_path never starts with "/" or contains a scheme.
        raise NoReleaseFile(url=os.path.join(self.remote.url, d_content.content.relative_path))

    d_content.content.sha256 = release_artifact.sha256
    # Checksum over the whole artifact set, so changes to any of the three
    # release files are detected.
    d_content.content.artifact_set_sha256 = _get_artifact_set_sha256(
        d_content, ReleaseFile.SUPPORTED_ARTIFACTS
    )
    _parse_release_file_attributes(d_content, release_artifact)
    await pb.aincrement()

def verify_gpg_artifacts(self, d_content, release_da, release_gpg_da, inrelease_da):
    """
    Handle GPG verification. Returns the main artifact or raises an exception.

    Verification preference order: the inline-signed "InRelease" file first,
    then "Release" with its detached "Release.gpg" signature. Any artifact that
    fails verification (or an unsigned "Release" when a key is configured) is
    removed from d_content.d_artifacts.

    Args:
        d_content: The declarative content whose relative_path is updated to
            point at the verified artifact.
        release_da / release_gpg_da / inrelease_da: Declarative artifacts for
            "Release", "Release.gpg", and "InRelease"; each may be None.

    Returns:
        The verified main artifact.

    Raises:
        NoValidSignatureForKey: If nothing verifies against the imported key.
    """
    if inrelease_da:
        if self.verify_single_file(inrelease_da.artifact):
            d_content.content.relative_path = inrelease_da.relative_path
            return inrelease_da.artifact
        log.warning(_("Verification of InRelease failed. Removing it."))
        d_content.d_artifacts.remove(inrelease_da)

    if release_da and release_gpg_da:
        if self.verify_detached_signature(release_da.artifact, release_gpg_da.artifact):
            d_content.content.relative_path = release_da.relative_path
            return release_da.artifact
        log.warning(_("Verification of Release + Release.gpg failed. Removing it."))
        d_content.d_artifacts.remove(release_da)
        d_content.d_artifacts.remove(release_gpg_da)
    elif release_da:
        # An unsigned Release file cannot be trusted once a gpgkey is set.
        log.warning(_("Release found but no signature, and a gpgkey was provided. Removing it."))
        d_content.d_artifacts.remove(release_da)

    # NOTE(review): os.path.join on a URL; confirm urljoin is not needed here.
    raise NoValidSignatureForKey(url=os.path.join(self.remote.url, "Release"))

def verify_single_file(self, artifact):
    """
    Check the inline GPG signature of an artifact (an "InRelease" file).

    Returns True when the signature verifies against the imported key,
    False otherwise.
    """
    with artifact.file.open("rb") as signed_fh:
        result = self.gpg.verify_file(signed_fh)
    return bool(result.valid)

def verify_detached_signature(self, release_artifact, release_gpg_artifact):
    """
    Check a detached GPG signature ("Release.gpg") against a "Release" file.

    The Release content is copied into a temporary file because gnupg needs a
    filesystem path for the signed data. Returns True if the signature is
    valid, False otherwise.
    """
    with NamedTemporaryFile() as signed_copy:
        with release_artifact.file.open("rb") as release_fh:
            signed_copy.write(release_fh.read())
        signed_copy.flush()

        with release_gpg_artifact.file.open("rb") as signature_fh:
            result = self.gpg.verify_file(signature_fh, signed_copy.name)
        return bool(result.valid)


class DebUpdatePackageIndexAttributes(Stage): # TODO: Needs a new name
"""
Expand Down Expand Up @@ -1322,6 +1306,80 @@ def _save_artifact_blocking(d_artifact):
d_artifact.artifact.touch()


def _collect_release_artifacts(d_content):
"""
Return (release_da, release_gpg_da, inrelease_da) from d_content.d_artifacts.
Looks for items whose filename is exactly "Release", "Release.gpg", or "InRelease".
If not found, return None for that slot.
"""
release_da = None
release_gpg_da = None
inrelease_da = None

da_names = {os.path.basename(da.relative_path): da for da in d_content.d_artifacts}
if "Release" in da_names:
release_da = da_names["Release"]
if "Release.gpg" in da_names:
release_gpg_da = da_names["Release.gpg"]
if "InRelease" in da_names:
inrelease_da = da_names["InRelease"]

return release_da, release_gpg_da, inrelease_da


def _parse_release_file_attributes(d_content, main_artifact):
    """
    Parse the contents of main_artifact as a 'Release' file and update
    d_content.content.* fields accordingly (codename, suite, components,
    architectures).

    For flat repositories (distribution ending in "/") a missing 'Components'
    or 'Architectures' field is tolerated and stored as "". With
    settings.PERMISSIVE_SYNC enabled, the non-standard singular field names
    'Component'/'Architecture' are also accepted.

    Raises:
        MissingReleaseFileField: If a required field is absent and cannot be
            tolerated.
    """
    # deb822 is imported at module scope; no local re-import needed.
    with main_artifact.file.open("rb") as fh:
        release_dict = deb822.Release(fh.read())

    content = d_content.content
    # endswith is safe for an empty distribution, unlike distribution[-1].
    is_flat_repo = content.distribution.endswith("/")

    if "codename" in release_dict:
        content.codename = release_dict["Codename"]
    if "suite" in release_dict:
        content.suite = release_dict["Suite"]

    if "components" in release_dict:
        content.components = release_dict["Components"]
    elif "component" in release_dict and settings.PERMISSIVE_SYNC:
        content.components = release_dict["Component"]
    elif is_flat_repo:
        message = (
            "The Release file for distribution '{}' contains no 'Components' "
            "field, but since we are dealing with a flat repo, we can continue "
            "regardless."
        )
        log.warning(_(message).format(content.distribution))
        # TODO: Consider not setting the field at all (requires migrations).
        content.components = ""
    else:
        raise MissingReleaseFileField(content.distribution, "Components")

    if "architectures" in release_dict:
        content.architectures = release_dict["Architectures"]
    elif "architecture" in release_dict and settings.PERMISSIVE_SYNC:
        content.architectures = release_dict["Architecture"]
    elif is_flat_repo:
        message = (
            "The Release file for distribution '{}' contains no 'Architectures' "
            "field, but since we are dealing with a flat repo, we can extract them "
            "from the repos single Package index later."
        )
        log.warning(_(message).format(content.distribution))
        content.architectures = ""
    else:
        raise MissingReleaseFileField(content.distribution, "Architectures")

    # Lazy %-style args avoid formatting cost when DEBUG logging is disabled.
    log.debug("Codename: %s", content.codename)
    log.debug("Suite: %s", content.suite)
    log.debug("Components: %s", content.components)
    log.debug("Architectures: %s", content.architectures)


def _get_artifact_set_sha256(d_content, supported_artifacts):
"""
Get the checksum of checksums for a set of artifacts associated with a multi artifact
Expand Down
Loading

0 comments on commit 39e878f

Please sign in to comment.