From 5a1bf105da6b7a258dd5831491ae9cb18f7b1ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Wed, 15 Sep 2021 15:46:31 +0200 Subject: [PATCH 1/9] Move PartitionTable to backend I think we should split out utils.py or something later on. But let's at least move it out of the main file. --- mkosi/__init__.py | 97 ++--------------------------------------------- mkosi/backend.py | 94 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 94 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index d886805fd..e4ee383d7 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -21,7 +21,6 @@ import http.server import importlib.resources import json -import math import os import platform import re @@ -71,6 +70,7 @@ MkosiException, MkosiPrinter, OutputFormat, + PartitionTable, SourceFileTransfer, die, install_grub, @@ -78,6 +78,8 @@ partition, patch_file, path_relative_to_cwd, + print_between_lines, + roundup, run, run_with_backoff, run_workspace_command, @@ -142,13 +144,6 @@ def write_resource( V = TypeVar("V") -def print_between_lines(s: str) -> None: - size = os.get_terminal_size() - print('-' * size.columns) - print(s.rstrip('\n')) - print('-' * size.columns) - - def dictify(f: Callable[..., Generator[Tuple[T, V], None, None]]) -> Callable[..., Dict[T, V]]: def wrapper(*args: Any, **kwargs: Any) -> Dict[T, V]: return dict(f(*args, **kwargs)) @@ -332,10 +327,6 @@ def format_bytes(num_bytes: int) -> str: return f"{num_bytes}B" -def roundup(x: int, step: int) -> int: - return ((x + step - 1) // step) * step - - _IOC_NRBITS = 8 # NOQA: E221,E222 _IOC_TYPEBITS = 8 # NOQA: E221,E222 _IOC_SIZEBITS = 14 # NOQA: E221,E222 @@ -3429,88 +3420,6 @@ def make_generated_root(args: CommandLineArguments, root: Path, for_cache: bool) return None -@dataclasses.dataclass -class PartitionTable: - partitions: List[str] - last_partition_sector: Optional[int] - sector_size: int - first_lba: Optional[int] - - grain: int = 4096 - - @classmethod - def read(cls, loopdev: Path) -> PartitionTable: - table = [] - last_sector = 0 - sector_size = 512 - first_lba = None - - c = run(["sfdisk", "--dump", loopdev], - stdout=PIPE, - universal_newlines=True) - - if 'disk' in ARG_DEBUG: - print_between_lines(c.stdout) - - in_body = False - for line in c.stdout.splitlines(): - line = line.strip() - - if line.startswith('sector-size:'): - sector_size = int(line[12:]) - if line.startswith('first-lba:'): - first_lba = int(line[10:]) - - if line == "": # empty line is where the body begins - in_body = True - continue - if not in_body: - continue - - table += [line] - - _, rest = line.split(":", 1) - fields = rest.split(",") - - start = None - size = None - - for field in fields: - field = field.strip() - - if field.startswith("start="): - start = int(field[6:]) - if field.startswith("size="): - size = int(field[5:]) - - if start is not None and size is not None: - end = start + size - last_sector = max(last_sector, end) - - return cls(table, last_sector * sector_size, sector_size, first_lba) - - @classmethod - def empty(cls, first_lba: Optional[int] = None) -> PartitionTable: - return cls([], None, 512, first_lba) - - def first_usable_offset(self, max_partitions: int = 128) -> int: - if self.last_partition_sector: - return roundup(self.last_partition_sector, self.grain) - elif self.first_lba is not None: - # No rounding here, we honour the specified value exactly. - return self.first_lba * self.sector_size - else: - # The header is like the footer, but we have a one-sector "protective MBR" at offset 0 - return roundup(self.sector_size + self.footer_size(), self.grain) - - def footer_size(self, max_partitions: int = 128) -> int: - # The footer must have enough space for the GPT header (one sector), - # and the GPT parition entry area. PEA size of 16384 (128 partitions) - # is recommended. - pea_sectors = math.ceil(max_partitions * 128 / self.sector_size) - return (1 + pea_sectors) * self.sector_size - - def insert_partition( args: CommandLineArguments, raw: BinaryIO, diff --git a/mkosi/backend.py b/mkosi/backend.py index 7b3bcded6..83ab19074 100644 --- a/mkosi/backend.py +++ b/mkosi/backend.py @@ -6,6 +6,7 @@ import contextlib import dataclasses import enum +import math import os import shlex import shutil @@ -39,6 +40,17 @@ def shell_join(cmd: Sequence[PathString]) -> str: return " ".join(shlex.quote(str(x)) for x in cmd) +def print_between_lines(s: str) -> None: + size = os.get_terminal_size() + print('-' * size.columns) + print(s.rstrip('\n')) + print('-' * size.columns) + + +def roundup(x: int, step: int) -> int: + return ((x + step - 1) // step) * step + + # These types are only generic during type checking and not at runtime, leading # to a TypeError during compilation. # Let's be as strict as we can with the description for the usage we have. @@ -194,6 +206,88 @@ class ManifestFormat(Parseable, enum.Enum): changelog = "changelog" # human-readable text file with package changelogs +@dataclasses.dataclass +class PartitionTable: + partitions: List[str] + last_partition_sector: Optional[int] + sector_size: int + first_lba: Optional[int] + + grain: int = 4096 + + @classmethod + def read(cls, loopdev: Path) -> PartitionTable: + table = [] + last_sector = 0 + sector_size = 512 + first_lba = None + + c = run(["sfdisk", "--dump", loopdev], + stdout=subprocess.PIPE, + universal_newlines=True) + + if 'disk' in ARG_DEBUG: + print_between_lines(c.stdout) + + in_body = False + for line in c.stdout.splitlines(): + line = line.strip() + + if line.startswith('sector-size:'): + sector_size = int(line[12:]) + if line.startswith('first-lba:'): + first_lba = int(line[10:]) + + if line == "": # empty line is where the body begins + in_body = True + continue + if not in_body: + continue + + table += [line] + + _, rest = line.split(":", 1) + fields = rest.split(",") + + start = None + size = None + + for field in fields: + field = field.strip() + + if field.startswith("start="): + start = int(field[6:]) + if field.startswith("size="): + size = int(field[5:]) + + if start is not None and size is not None: + end = start + size + last_sector = max(last_sector, end) + + return cls(table, last_sector * sector_size, sector_size, first_lba) + + @classmethod + def empty(cls, first_lba: Optional[int] = None) -> PartitionTable: + return cls([], None, 512, first_lba) + + def first_usable_offset(self, max_partitions: int = 128) -> int: + if self.last_partition_sector: + return roundup(self.last_partition_sector, self.grain) + elif self.first_lba is not None: + # No rounding here, we honour the specified value exactly. + return self.first_lba * self.sector_size + else: + # The header is like the footer, but we have a one-sector "protective MBR" at offset 0 + return roundup(self.sector_size + self.footer_size(), self.grain) + + def footer_size(self, max_partitions: int = 128) -> int: + # The footer must have enough space for the GPT header (one sector), + # and the GPT parition entry area. PEA size of 16384 (128 partitions) + # is recommended. + pea_sectors = math.ceil(max_partitions * 128 / self.sector_size) + return (1 + pea_sectors) * self.sector_size + + @dataclasses.dataclass class CommandLineArguments: """Type-hinted storage for command line arguments.""" From 1268cdc7b13641ed6d119a4a815afe478d1c2833 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 7 Sep 2021 22:56:20 +0200 Subject: [PATCH 2/9] mkosi: optionally, sign verity data This adds support for creating signed GPT disk images. If Verity=signed is set this will not only generate and insert Verity data into the image, but then use the resulting root hash, sign it and include it in an additional partition. It will also write the resulting PKCS7 signature out into a new .roothash.p7s file. This scheme is compatible with kernel 5.4's PKCS7 signature logic for dm-verity: the resulting .p7s file can be passed as-is to the kernel (or systemd's RootHashSignature= setting). The partition this embedds contains a simple JSON object containing three fields. The verity root hash, the PKCS7 data (i.e. the same data as in the .p7s file, but in base64), and SHA256 fingerprint of the signing key. This partition is supposed to be read by the image dissection logic of systemd, to implement signed single-file images. (The corresponding PR for systemd I am still working on). This opens up two avenues for image verification: 1. Everything in one file: the single, "unified" GPT disk image contains three partitions, for payload data, verity data and verity signature. 2. Split out: root hash and its signature are stored in two "sidecar" files. (Of course I'd personally always go the "unified" way, but given the RootHashSignature= logic exists already, and it's easy to support, let's support it.) This uses the key already used for doing secureboot image signing. Conceptually this makes a ton of sense: we sign the same stuff here after all: the contents of the image, supporting two different entrypoints to the image: one via UEFI booting the image, and once for attaching directly to an image from a running system. Admittedly, the "mkosi.secure-boot.key" and "mkosi.secure-boot.certificate" monikers for this key pair might be a bit suprising though. Corresponding PR in systemd is: https://github.com/systemd/systemd/pull/20691 --- mkosi/__init__.py | 363 ++++++++++++++++++++++++++++++------ mkosi/backend.py | 6 +- tests/test_config_parser.py | 1 + 3 files changed, 311 insertions(+), 59 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index e4ee383d7..bea855fa0 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -4,6 +4,7 @@ import argparse import ast +import base64 import collections import configparser import contextlib @@ -35,6 +36,7 @@ import urllib.parse import urllib.request import uuid + from pathlib import Path from subprocess import DEVNULL, PIPE from textwrap import dedent @@ -179,30 +181,38 @@ def print_running_cmd(cmdline: Iterable[str]) -> None: MkosiPrinter.print_step(" ".join(shlex.quote(x) for x in cmdline) + "\n") -GPT_ROOT_X86 = uuid.UUID("44479540f29741b29af7d131d5f0458a") # NOQA: E221 -GPT_ROOT_X86_64 = uuid.UUID("4f68bce3e8cd4db196e7fbcaf984b709") # NOQA: E221 -GPT_ROOT_ARM = uuid.UUID("69dad7102ce44e3cb16c21a1d49abed3") # NOQA: E221 -GPT_ROOT_ARM_64 = uuid.UUID("b921b0451df041c3af444c6f280d3fae") # NOQA: E221 -GPT_USR_X86 = uuid.UUID("75250d768cc6458ebd66bd47cc81a812") # NOQA: E221 -GPT_USR_X86_64 = uuid.UUID("8484680c952148c69c11b0720656f69e") # NOQA: E221 -GPT_USR_ARM = uuid.UUID("7d0359a302b34f0a865c654403e70625") # NOQA: E221 -GPT_USR_ARM_64 = uuid.UUID("b0e01050ee5f4390949a9101b17104e9") # NOQA: E221 -GPT_ESP = uuid.UUID("c12a7328f81f11d2ba4b00a0c93ec93b") # NOQA: E221 -GPT_BIOS = uuid.UUID("2168614864496e6f744e656564454649") # NOQA: E221 -GPT_SWAP = uuid.UUID("0657fd6da4ab43c484e50933c84b4f4f") # NOQA: E221 -GPT_HOME = uuid.UUID("933ac7e12eb44f13b8440e14e2aef915") # NOQA: E221 -GPT_SRV = uuid.UUID("3b8f842520e04f3b907f1a25a76f98e8") # NOQA: E221 -GPT_XBOOTLDR = uuid.UUID("bc13c2ff59e64262a352b275fd6f7172") # NOQA: E221 -GPT_ROOT_X86_VERITY = uuid.UUID("d13c5d3bb5d1422ab29f9454fdc89d76") # NOQA: E221 -GPT_ROOT_X86_64_VERITY = uuid.UUID("2c7357edebd246d9aec123d437ec2bf5") # NOQA: E221 -GPT_ROOT_ARM_VERITY = uuid.UUID("7386cdf2203c47a9a498f2ecce45a2d6") # NOQA: E221 -GPT_ROOT_ARM_64_VERITY = uuid.UUID("df3300ced69f4c92978c9bfb0f38d820") # NOQA: E221 -GPT_USR_X86_VERITY = uuid.UUID("8f461b0d14ee4e819aa9049b6fb97abd") # NOQA: E221 -GPT_USR_X86_64_VERITY = uuid.UUID("77ff5f63e7b64633acf41565b864c0e6") # NOQA: E221 -GPT_USR_ARM_VERITY = uuid.UUID("c215d7517bcd4649be906627490a4c05") # NOQA: E221 -GPT_USR_ARM_64_VERITY = uuid.UUID("6e11a4e7fbca4dedb9e9e1a512bb664e") # NOQA: E221 -GPT_TMP = uuid.UUID("7ec6f5573bc54acab29316ef5df639d1") # NOQA: E221 -GPT_VAR = uuid.UUID("4d21b016b53445c2a9fb5c16e091fd2d") # NOQA: E221 +GPT_ROOT_X86 = uuid.UUID("44479540f29741b29af7d131d5f0458a") # NOQA: E221 +GPT_ROOT_X86_64 = uuid.UUID("4f68bce3e8cd4db196e7fbcaf984b709") # NOQA: E221 +GPT_ROOT_ARM = uuid.UUID("69dad7102ce44e3cb16c21a1d49abed3") # NOQA: E221 +GPT_ROOT_ARM_64 = uuid.UUID("b921b0451df041c3af444c6f280d3fae") # NOQA: E221 +GPT_USR_X86 = uuid.UUID("75250d768cc6458ebd66bd47cc81a812") # NOQA: E221 +GPT_USR_X86_64 = uuid.UUID("8484680c952148c69c11b0720656f69e") # NOQA: E221 +GPT_USR_ARM = uuid.UUID("7d0359a302b34f0a865c654403e70625") # NOQA: E221 +GPT_USR_ARM_64 = uuid.UUID("b0e01050ee5f4390949a9101b17104e9") # NOQA: E221 +GPT_ESP = uuid.UUID("c12a7328f81f11d2ba4b00a0c93ec93b") # NOQA: E221 +GPT_BIOS = uuid.UUID("2168614864496e6f744e656564454649") # NOQA: E221 +GPT_SWAP = uuid.UUID("0657fd6da4ab43c484e50933c84b4f4f") # NOQA: E221 +GPT_HOME = uuid.UUID("933ac7e12eb44f13b8440e14e2aef915") # NOQA: E221 +GPT_SRV = uuid.UUID("3b8f842520e04f3b907f1a25a76f98e8") # NOQA: E221 +GPT_XBOOTLDR = uuid.UUID("bc13c2ff59e64262a352b275fd6f7172") # NOQA: E221 +GPT_ROOT_X86_VERITY = uuid.UUID("d13c5d3bb5d1422ab29f9454fdc89d76") # NOQA: E221 +GPT_ROOT_X86_64_VERITY = uuid.UUID("2c7357edebd246d9aec123d437ec2bf5") # NOQA: E221 +GPT_ROOT_ARM_VERITY = uuid.UUID("7386cdf2203c47a9a498f2ecce45a2d6") # NOQA: E221 +GPT_ROOT_ARM_64_VERITY = uuid.UUID("df3300ced69f4c92978c9bfb0f38d820") # NOQA: E221 +GPT_USR_X86_VERITY = uuid.UUID("8f461b0d14ee4e819aa9049b6fb97abd") # NOQA: E221 +GPT_USR_X86_64_VERITY = uuid.UUID("77ff5f63e7b64633acf41565b864c0e6") # NOQA: E221 +GPT_USR_ARM_VERITY = uuid.UUID("c215d7517bcd4649be906627490a4c05") # NOQA: E221 +GPT_USR_ARM_64_VERITY = uuid.UUID("6e11a4e7fbca4dedb9e9e1a512bb664e") # NOQA: E221 +GPT_ROOT_X86_VERITY_SIG = uuid.UUID("5996fc05109c48de808b23fa0830b676") # NOQA: E221 +GPT_ROOT_X86_64_VERITY_SIG = uuid.UUID("41092b059fc84523994f2def0408b176") # NOQA: E221 +GPT_ROOT_ARM_VERITY_SIG = uuid.UUID("42b0455feb11491d98d356145ba9d037") # NOQA: E221 +GPT_ROOT_ARM_64_VERITY_SIG = uuid.UUID("6db69de629f44758a7a5962190f00ce3") # NOQA: E221 +GPT_USR_X86_VERITY_SIG = uuid.UUID("974a71c0de4143c3be5d5c5ccd1ad2c0") # NOQA: E221 +GPT_USR_X86_64_VERITY_SIG = uuid.UUID("e7bb33fb06cf4e818273e543b413e2e2") # NOQA: E221 +GPT_USR_ARM_VERITY_SIG = uuid.UUID("d7ff812f37d14902a810d76ba57b975a") # NOQA: E221 +GPT_USR_ARM_64_VERITY_SIG = uuid.UUID("c23ce4ff44bd4b00b2d4b41b3419e02a") # NOQA: E221 +GPT_TMP = uuid.UUID("7ec6f5573bc54acab29316ef5df639d1") # NOQA: E221 +GPT_VAR = uuid.UUID("4d21b016b53445c2a9fb5c16e091fd2d") # NOQA: E221 # This is a non-formatted partition used to store the second stage @@ -260,40 +270,42 @@ def print_running_cmd(cmdline: Iterable[str]) -> None: } -class GPTRootTypePair(NamedTuple): +class GPTRootTypeTriplet(NamedTuple): root: uuid.UUID verity: uuid.UUID + verity_sig: uuid.UUID -def gpt_root_native(arch: Optional[str], usr_only: bool = False) -> GPTRootTypePair: - """The tag for the native GPT root partition for the given architecture +def gpt_root_native(arch: Optional[str], usr_only: bool = False) -> GPTRootTypeTriplet: + """The type UUID for the native GPT root partition for the given architecture - Returns a tuple of two tags: for the root partition and for the - matching verity partition. + Returns a tuple of three UUIDs: for the root partition, for the + matching verity partition, and for the matching Verity signature + partition. """ if arch is None: arch = platform.machine() if usr_only: if arch in ("i386", "i486", "i586", "i686"): - return GPTRootTypePair(GPT_USR_X86, GPT_USR_X86_VERITY) + return GPTRootTypeTriplet(GPT_USR_X86, GPT_USR_X86_VERITY, GPT_USR_X86_VERITY_SIG) elif arch == "x86_64": - return GPTRootTypePair(GPT_USR_X86_64, GPT_USR_X86_64_VERITY) + return GPTRootTypeTriplet(GPT_USR_X86_64, GPT_USR_X86_64_VERITY, GPT_USR_X86_64_VERITY_SIG) elif arch == "aarch64": - return GPTRootTypePair(GPT_USR_ARM_64, GPT_USR_ARM_64_VERITY) + return GPTRootTypeTriplet(GPT_USR_ARM_64, GPT_USR_ARM_64_VERITY, GPT_USR_ARM_64_VERITY_SIG) elif arch == "armv7l": - return GPTRootTypePair(GPT_USR_ARM, GPT_USR_ARM_VERITY) + return GPTRootTypeTriplet(GPT_USR_ARM, GPT_USR_ARM_VERITY, GPT_USR_ARM_VERITY_SIG) else: die(f"Unknown architecture {arch}.") else: if arch in ("i386", "i486", "i586", "i686"): - return GPTRootTypePair(GPT_ROOT_X86, GPT_ROOT_X86_VERITY) + return GPTRootTypeTriplet(GPT_ROOT_X86, GPT_ROOT_X86_VERITY, GPT_ROOT_X86_VERITY_SIG) elif arch == "x86_64": - return GPTRootTypePair(GPT_ROOT_X86_64, GPT_ROOT_X86_64_VERITY) + return GPTRootTypeTriplet(GPT_ROOT_X86_64, GPT_ROOT_X86_64_VERITY, GPT_ROOT_X86_64_VERITY_SIG) elif arch == "aarch64": - return GPTRootTypePair(GPT_ROOT_ARM_64, GPT_ROOT_ARM_64_VERITY) + return GPTRootTypeTriplet(GPT_ROOT_ARM_64, GPT_ROOT_ARM_64_VERITY, GPT_ROOT_ARM_64_VERITY_SIG) elif arch == "armv7l": - return GPTRootTypePair(GPT_ROOT_ARM, GPT_ROOT_ARM_VERITY) + return GPTRootTypeTriplet(GPT_ROOT_ARM, GPT_ROOT_ARM_VERITY, GPT_ROOT_ARM_VERITY_SIG) else: die(f"Unknown architecture {arch}.") @@ -305,6 +317,10 @@ def roothash_suffix(usr_only: bool = False) -> str: return ".roothash" +def roothash_p7s_suffix(usr_only: bool = False) -> str: + return roothash_suffix(usr_only) + ".p7s" + + def unshare(flags: int) -> None: libc_name = ctypes.util.find_library("c") if libc_name is None: @@ -535,6 +551,8 @@ def image_size(args: CommandLineArguments) -> int: size += args.swap_size if args.verity_size is not None: size += args.verity_size + if args.verity_sig_size is not None: + size += args.verity_sig_size return size @@ -547,7 +565,7 @@ def disable_cow(path: PathString) -> None: def root_partition_name( args: Optional[CommandLineArguments], - verity: Optional[bool] = False, + suffix: Optional[str] = None, image_id: Optional[str] = None, image_version: Optional[str] = None, usr_only: Optional[bool] = False, @@ -581,9 +599,7 @@ def root_partition_name( # If no image id is specified we just return a descriptive string # for the partition. prefix = "System Resources" if usr_only else "Root" - if verity: - return prefix + " Verity" - return prefix + " Partition" + return prefix + ' ' + (suffix if suffix is not None else 'Partition') def determine_partition_table(args: CommandLineArguments) -> Tuple[str, bool]: @@ -671,6 +687,12 @@ def determine_partition_table(args: CommandLineArguments) -> Tuple[str, bool]: else: args.verity_partno = None + if args.verity == "signed": + args.verity_sig_partno = pn + pn += 1 + else: + args.verity_sig_partno = None + return table, run_sfdisk @@ -3535,7 +3557,7 @@ def insert_generated_root( def make_verity( args: CommandLineArguments, dev: Optional[Path], do_run_build_script: bool, for_cache: bool ) -> Tuple[Optional[BinaryIO], Optional[str]]: - if do_run_build_script or not args.verity: + if do_run_build_script or args.verity is False: return None, None if for_cache: return None, None @@ -3580,13 +3602,120 @@ def insert_verity( loopdev, args.verity_partno, verity, - root_partition_name(args, True), + root_partition_name(args, "Verity"), gpt_root_native(args.architecture, args.usr_only).verity, True, u, ) +PKCS7_NOCERTS = 0x2 # Don't include signature certificates in result +PKCS7_DETACHED = 0x40 # Don't include data to sign in result +PKCS7_BINARY = 0x80 # Don't mangle newlines for MIME canonical format +PKCS7_NOATTR = 0x100 # Don't include signature time, … in result + + +def make_verity_sig( + args: CommandLineArguments, root_hash: Optional[str], do_run_build_script: bool, for_cache: bool +) -> Tuple[Optional[BinaryIO], Optional[bytes], Optional[str]]: + + if do_run_build_script or args.verity != "signed": + return None, None, None + if for_cache: + return None, None, None + + assert root_hash is not None + + try: + from OpenSSL import crypto + except ImportError: + die("Python OpenSSL package is not installed."); + + with complete_step("Signing verity root hash…"): + + key = crypto.load_privatekey(crypto.FILETYPE_PEM, args.secure_boot_key.read_bytes()) + certificate = crypto.load_certificate(crypto.FILETYPE_PEM, args.secure_boot_certificate.read_bytes()) + + # Note that the library returns the SHA256 digest in an + # uppecase format with : as byte separators. Let's convert to + # classic lowercase series of unseparated hex digits + fingerprint = certificate.digest("sha256").decode("ascii").replace(":", "").lower() + + bio_in = crypto._new_mem_buf(root_hash.encode("utf-8")) + pkcs7 = crypto._lib.PKCS7_sign( + certificate._x509, + key._pkey, + crypto._ffi.NULL, + bio_in, + PKCS7_DETACHED | PKCS7_NOCERTS | PKCS7_NOATTR | PKCS7_BINARY, + ) + bio_out = crypto._new_mem_buf() + crypto._lib.i2d_PKCS7_bio(bio_out, pkcs7) + sigbytes = crypto._bio_to_string(bio_out) + + b64encoded = base64.b64encode(sigbytes).decode("ascii") + + # This is supposed to be extensible, but care should be taken + # not to include unprotected data here. + j = json.dumps({ + "rootHash": root_hash, + "certificateFingerprint": fingerprint, + "signature": b64encoded + }).encode("utf-8") + + # Pad to next multiple of 4K with NUL bytes + padded = j + b"\0" * (roundup(len(j), 4096) - len(j)) + + f: BinaryIO = cast(BinaryIO, tempfile.NamedTemporaryFile(mode="w+b", dir=args.output.parent, prefix=".mkosi-")) + f.write(padded) + f.flush() + + # Returns a file with zero-padded JSON data to insert as + # signature partition as first element, and the DER PKCS7 + # signature bytes as second argument (to store as detached + # PKCS7 file), and finally the SHA256 fingerprint of the + # certificate used (which is used to deterministically + # generate the partition UUID for the signature partition). + + return f, sigbytes, fingerprint + + +def insert_verity_sig( + args: CommandLineArguments, + raw: Optional[BinaryIO], + loopdev: Optional[Path], + verity_sig: Optional[BinaryIO], + root_hash: Optional[str], + fingerprint: Optional[str], + for_cache: bool, +) -> None: + if verity_sig is None: + return + if for_cache: + return + assert loopdev is not None + assert raw is not None + assert root_hash is not None + assert fingerprint is not None + assert args.verity_sig_partno is not None + + # Hash the concatenation of verity roothash and the X509 certificate fingerprint to generate a UUID for the signature partition + u = uuid.UUID(hashlib.sha256(bytes.fromhex(root_hash) + bytes.fromhex(fingerprint)).hexdigest()[:32]) + + with complete_step("Inserting verity signature partition…"): + insert_partition( + args, + raw, + loopdev, + args.verity_sig_partno, + verity_sig, + root_partition_name(args, "Signature"), + gpt_root_native(args.architecture, args.usr_only).verity_sig, + True, + u, + ) + + def patch_root_uuid( args: CommandLineArguments, loopdev: Optional[Path], root_hash: Optional[str], for_cache: bool ) -> None: @@ -3709,7 +3838,7 @@ def secure_boot_sign( return if for_cache and args.verity: return - if cached and not args.verity: + if cached and args.verity is False: return with mount(): @@ -3825,6 +3954,25 @@ def write_root_hash_file(args: CommandLineArguments, root_hash: Optional[str]) - return f +def write_root_hash_p7s_file(args: CommandLineArguments, root_hash_p7s: Optional[bytes]) -> Optional[BinaryIO]: + if root_hash_p7s is None: + return None + + assert args.output_root_hash_p7s_file is not None + + suffix = roothash_p7s_suffix(args.usr_only) + with complete_step(f"Writing {suffix} file…"): + f: BinaryIO = cast( + BinaryIO, + tempfile.NamedTemporaryFile( + mode="w+b", prefix=".mkosi", dir=args.output_root_hash_p7s_file.parent + ), + ) + f.write(root_hash_p7s) + + return f + + def copy_nspawn_settings(args: CommandLineArguments) -> Optional[BinaryIO]: if args.nspawn_settings is None: return None @@ -3863,8 +4011,10 @@ def calculate_sha256sum( raw: Optional[BinaryIO], archive: Optional[BinaryIO], root_hash_file: Optional[BinaryIO], + root_hash_p7s_file: Optional[BinaryIO], split_root: Optional[BinaryIO], split_verity: Optional[BinaryIO], + split_verity_sig: Optional[BinaryIO], split_kernel: Optional[BinaryIO], nspawn_settings: Optional[BinaryIO], ) -> Optional[TextIO]: @@ -3891,12 +4041,18 @@ def calculate_sha256sum( if root_hash_file is not None: assert args.output_root_hash_file is not None hash_file(f, root_hash_file, os.path.basename(args.output_root_hash_file)) + if root_hash_p7s_file is not None: + assert args.output_root_hash_p7s_file is not None + hash_file(f, root_hash_p7s_file, args.output_root_hash_p7s_file.name) if split_root is not None: assert args.output_split_root is not None hash_file(f, split_root, os.path.basename(args.output_split_root)) if split_verity is not None: assert args.output_split_verity is not None hash_file(f, split_verity, os.path.basename(args.output_split_verity)) + if split_verity_sig is not None: + assert args.output_split_verity_sig is not None + hash_file(f, split_verity_sig, args.output_split_verity_sig.name) if split_kernel is not None: assert args.output_split_kernel is not None hash_file(f, split_kernel, os.path.basename(args.output_split_kernel)) @@ -4044,6 +4200,16 @@ def link_output_root_hash_file(args: CommandLineArguments, root_hash_file: Optio _link_output(args, root_hash_file.name, args.output_root_hash_file) +def link_output_root_hash_p7s_file(args: CommandLineArguments, root_hash_p7s_file: Optional[SomeIO]) -> None: + if root_hash_p7s_file: + assert args.output_root_hash_p7s_file + suffix = roothash_p7s_suffix(args.usr_only) + with complete_step( + f"Linking {suffix} file…", f"Linked {path_relative_to_cwd(args.output_root_hash_p7s_file)}" + ): + _link_output(args, root_hash_p7s_file.name, args.output_root_hash_p7s_file) + + def link_output_signature(args: CommandLineArguments, signature: Optional[SomeIO]) -> None: if signature: assert args.output_signature is not None @@ -4082,6 +4248,15 @@ def link_output_split_verity(args: CommandLineArguments, split_verity: Optional[ _link_output(args, split_verity.name, args.output_split_verity) +def link_output_split_verity_sig(args: CommandLineArguments, split_verity_sig: Optional[SomeIO]) -> None: + if split_verity_sig: + assert args.output_split_verity_sig + with complete_step( + "Linking split Verity Signature data…", f"Linked {path_relative_to_cwd(args.output_split_verity_sig)}" + ): + _link_output(args, split_verity_sig.name, args.output_split_verity_sig) + + def link_output_split_kernel(args: CommandLineArguments, split_kernel: Optional[SomeIO]) -> None: if split_kernel: assert args.output_split_kernel @@ -4310,6 +4485,23 @@ def __call__( super().__call__(parser, namespace, values, option_string) +class VerityAction(BooleanAction): + def __call__( + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None, bool], + option_string: Optional[str] = None, + ) -> None: + + if isinstance(values, str): + if values == "signed": + setattr(namespace, self.dest, "signed") + return + + super().__call__(parser, namespace, values, option_string) + + class CustomHelpFormatter(argparse.HelpFormatter): def _format_action_invocation(self, action: argparse.Action) -> str: if not action.option_strings or action.nargs == 0: @@ -4519,6 +4711,12 @@ def create_parser() -> ArgumentParserMkosi: type=Path, metavar="PATH", ) + group.add_argument( + "--output-split-verity-sig", + help="Output Verity Signature partition image path (if --split-artifacts is used)", + type=Path, + metavar="PATH", + ) group.add_argument( "--output-split-kernel", help="Output kernel path (if --split-artifacts is used)", @@ -4602,7 +4800,11 @@ def create_parser() -> ArgumentParserMkosi: group.add_argument( "--encrypt", choices=("all", "data"), help='Encrypt everything except: ESP ("all") or ESP and root ("data")' ) - group.add_argument("--verity", action=BooleanAction, help="Add integrity partition (implies --read-only)") + group.add_argument( + "--verity", + action=VerityAction, + help="Add integrity partition, and optionally sign it (implies --read-only)", + ) group.add_argument( "--compress", type=parse_compression, @@ -5257,6 +5459,8 @@ def unlink_output(args: CommandLineArguments) -> None: if args.verity: unlink_try_hard(args.output_root_hash_file) + if args.verity == "signed": + unlink_try_hard(args.output_root_hash_p7s_file) if args.sign: unlink_try_hard(args.output_signature) @@ -5267,6 +5471,7 @@ def unlink_output(args: CommandLineArguments) -> None: if args.split_artifacts: unlink_try_hard(args.output_split_root) unlink_try_hard(args.output_split_verity) + unlink_try_hard(args.output_split_verity_sig) unlink_try_hard(args.output_split_kernel) if args.nspawn_settings is not None: @@ -5424,7 +5629,7 @@ def find_password(args: argparse.Namespace) -> None: def find_secure_boot(args: argparse.Namespace) -> None: - if not args.secure_boot: + if not args.secure_boot and args.verity != "signed": return if args.secure_boot_key is None: @@ -5705,6 +5910,9 @@ def load_args(args: argparse.Namespace) -> CommandLineArguments: args.read_only = True args.output_root_hash_file = build_auxiliary_output_path(args, roothash_suffix(args.usr_only)) + if args.verity == "signed": + args.output_root_hash_p7s_file = build_auxiliary_output_path(args, roothash_p7s_suffix(args.usr_only)) + if args.checksum: args.output_checksum = args.output.with_name("SHA256SUMS") @@ -5726,6 +5934,8 @@ def load_args(args: argparse.Namespace) -> CommandLineArguments: args.output_split_root = build_auxiliary_output_path(args, ".usr" if args.usr_only else ".root", True) if args.verity: args.output_split_verity = build_auxiliary_output_path(args, ".verity", True) + if args.verity == "signed": + args.output_split_verity_sig = build_auxiliary_output_path(args, ".verity-sig", True) if args.bootable: args.output_split_kernel = build_auxiliary_output_path(args, ".efi", True) @@ -5778,6 +5988,7 @@ def load_args(args: argparse.Namespace) -> CommandLineArguments: args.esp_size = 256 * 1024 * 1024 args.verity_size = None + args.verity_sig_size = None if args.secure_boot_key is not None: args.secure_boot_key = args.secure_boot_key.absolute() @@ -5785,15 +5996,15 @@ def load_args(args: argparse.Namespace) -> CommandLineArguments: if args.secure_boot_certificate is not None: args.secure_boot_certificate = args.secure_boot_certificate.absolute() - if args.secure_boot: + if args.secure_boot or args.verity == "signed": if args.secure_boot_key is None: die( - "UEFI SecureBoot enabled, but couldn't find private key. (Consider placing it in mkosi.secure-boot.key?)" + "UEFI SecureBoot or signed Verity enabled, but couldn't find private key. (Consider placing it in mkosi.secure-boot.key?)" ) # NOQA: E501 if args.secure_boot_certificate is None: die( - "UEFI SecureBoot enabled, but couldn't find certificate. (Consider placing it in mkosi.secure-boot.crt?)" + "UEFI SecureBoot or signed Verity enabled, but couldn't find certificate. (Consider placing it in mkosi.secure-boot.crt?)" ) # NOQA: E501 if args.verb in ("shell", "boot"): @@ -5888,6 +6099,7 @@ def check_output(args: CommandLineArguments) -> None: args.output_sshkey if args.ssh else None, args.output_split_root if args.split_artifacts else None, args.output_split_verity if args.split_artifacts else None, + args.output_split_verity_sig if args.split_artifacts else None, args.output_split_kernel if args.split_artifacts else None, ): @@ -5977,6 +6189,9 @@ def print_summary(args: CommandLineArguments) -> None: MkosiPrinter.info( f" Output Split Verity: {none_to_na(args.output_split_verity if args.split_artifacts else None)}" ) + MkosiPrinter.info( + f" Output Split Verity Sig.: {none_to_na(args.output_split_verity_sig if args.split_artifacts else None)}" + ) MkosiPrinter.info( f" Output Split Kernel: {none_to_na(args.output_split_kernel if args.split_artifacts else None)}" ) @@ -6001,7 +6216,7 @@ def print_summary(args: CommandLineArguments) -> None: MkosiPrinter.info(" QCow2: " + yes_no(args.qcow2)) MkosiPrinter.info(" Encryption: " + none_to_no(args.encrypt)) - MkosiPrinter.info(" Verity: " + yes_no(args.verity)) + MkosiPrinter.info(" Verity: " + yes_no_or(args.verity)) if args.output_format.is_disk(): MkosiPrinter.info(" Bootable: " + yes_no(args.bootable)) @@ -6010,15 +6225,15 @@ def print_summary(args: CommandLineArguments) -> None: MkosiPrinter.info(" Kernel Command Line: " + " ".join(args.kernel_command_line)) MkosiPrinter.info(" UEFI SecureBoot: " + yes_no(args.secure_boot)) - if args.secure_boot: - MkosiPrinter.info(f" UEFI SecureBoot Key: {args.secure_boot_key}") - MkosiPrinter.info(f" UEFI SecureBoot Cert.: {args.secure_boot_certificate}") - MkosiPrinter.info(" Boot Protocols: " + line_join_list(args.boot_protocols)) MkosiPrinter.info(" Unified Kernel Images: " + yes_no(args.with_unified_kernel_images)) MkosiPrinter.info(" GPT First LBA: " + str(args.gpt_first_lba)) MkosiPrinter.info(" Hostonly Initrd: " + yes_no(args.hostonly_initrd)) + if args.secure_boot or args.verity == "sign": + MkosiPrinter.info(f"SecureBoot/Verity Sign Key: {args.secure_boot_key}") + MkosiPrinter.info(f" SecureBoot/verity Cert.: {args.secure_boot_certificate}") + MkosiPrinter.info("\nCONTENT:") MkosiPrinter.info(" Packages: " + line_join_list(args.packages)) @@ -6211,9 +6426,13 @@ class BuildOutput: raw: Optional[BinaryIO] archive: Optional[BinaryIO] root_hash: Optional[str] + root_hash_p7s: Optional[bytes] sshkey: Optional[TextIO] + + # Partition contents split_root: Optional[BinaryIO] split_verity: Optional[BinaryIO] + split_verity_sig: Optional[BinaryIO] split_kernel: Optional[BinaryIO] def raw_name(self) -> Optional[str]: @@ -6221,7 +6440,7 @@ def raw_name(self) -> Optional[str]: @classmethod def empty(cls) -> BuildOutput: - return cls(None, None, None, None, None, None, None) + return cls(None, None, None, None, None, None, None, None, None) def build_image( @@ -6332,6 +6551,10 @@ def build_image( insert_verity(args, raw, loopdev, verity, root_hash, for_cache) split_verity = verity if args.split_artifacts else None + verity_sig, root_hash_p7s, fingerprint = make_verity_sig(args, root_hash, do_run_build_script, for_cache) + insert_verity_sig(args, raw, loopdev, verity_sig, root_hash, fingerprint, for_cache) + split_verity_sig = verity_sig if args.split_artifacts else None + # This time we mount read-only, as we already generated # the verity data, and hence really shouldn't modify the # image anymore. @@ -6354,7 +6577,17 @@ def build_image( archive = make_tar(args, root, do_run_build_script, for_cache) or \ make_cpio(args, root, do_run_build_script, for_cache) - return BuildOutput(raw or generated_root, archive, root_hash, sshkey, split_root, split_verity, split_kernel) + return BuildOutput( + raw or generated_root, + archive, + root_hash, + root_hash_p7s, + sshkey, + split_root, + split_verity, + split_verity_sig, + split_kernel, + ) def one_zero(b: bool) -> str: @@ -6538,16 +6771,29 @@ def build_stuff(args: CommandLineArguments) -> Manifest: raw = compress_output(args, raw) split_root = compress_output(args, image.split_root, ".usr" if args.usr_only else ".root") split_verity = compress_output(args, image.split_verity, ".verity") + split_verity_sig = compress_output(args, image.split_verity_sig, ".verity-sig") split_kernel = compress_output(args, image.split_kernel, ".efi") root_hash_file = write_root_hash_file(args, image.root_hash) + root_hash_p7s_file = write_root_hash_p7s_file(args, image.root_hash_p7s) settings = copy_nspawn_settings(args) - checksum = calculate_sha256sum(args, raw, image.archive, root_hash_file, - split_root, split_verity, split_kernel, settings) + checksum = calculate_sha256sum( + args, + raw, + image.archive, + root_hash_file, + root_hash_p7s_file, + split_root, + split_verity, + split_verity_sig, + split_kernel, + settings, + ) signature = calculate_signature(args, checksum) bmap = calculate_bmap(args, raw) link_output(args, root, raw or image.archive) link_output_root_hash_file(args, root_hash_file) + link_output_root_hash_p7s_file(args, root_hash_p7s_file) link_output_checksum(args, checksum) link_output_signature(args, signature) link_output_bmap(args, bmap) @@ -6556,6 +6802,7 @@ def build_stuff(args: CommandLineArguments) -> Manifest: link_output_sshkey(args, image.sshkey) link_output_split_root(args, split_root) link_output_split_verity(args, split_verity) + link_output_split_verity_sig(args, split_verity_sig) link_output_split_kernel(args, split_kernel) if image.root_hash is not None: diff --git a/mkosi/backend.py b/mkosi/backend.py index 83ab19074..c49ef9904 100644 --- a/mkosi/backend.py +++ b/mkosi/backend.py @@ -316,7 +316,7 @@ class CommandLineArguments: secure_boot_common_name: str read_only: bool encrypt: Optional[str] - verity: bool + verity: Union[bool, str] compress: Union[None, str, bool] compress_fs: Union[None, str, bool] compress_output: Union[None, str, bool] @@ -396,6 +396,7 @@ class CommandLineArguments: # Some extra stuff that's stored in CommandLineArguments for convenience but isn't populated by arguments verity_size: Optional[int] + verity_sig_size: Optional[int] machine_id: str force: bool original_umask: int @@ -405,9 +406,11 @@ class CommandLineArguments: output_nspawn_settings: Optional[Path] = None output_sshkey: Optional[Path] = None output_root_hash_file: Optional[Path] = None + output_root_hash_p7s_file: Optional[Path] = None output_bmap: Optional[Path] = None output_split_root: Optional[Path] = None output_split_verity: Optional[Path] = None + output_split_verity_sig: Optional[Path] = None output_split_kernel: Optional[Path] = None cache_pre_inst: Optional[Path] = None cache_pre_dev: Optional[Path] = None @@ -423,6 +426,7 @@ class CommandLineArguments: var_partno: Optional[int] = None tmp_partno: Optional[int] = None verity_partno: Optional[int] = None + verity_sig_partno: Optional[int] = None releasever: Optional[str] = None ran_sfdisk: bool = False diff --git a/tests/test_config_parser.py b/tests/test_config_parser.py index cf40c843b..08ae31f09 100644 --- a/tests/test_config_parser.py +++ b/tests/test_config_parser.py @@ -137,6 +137,7 @@ def add_reference_config(self, job_name=DEFAULT_JOB_NAME): "output_split_root": None, "output_split_kernel": None, "output_split_verity": None, + "output_split_verity_sig": None, "image_id": None, "image_version": None, "auto_bump": False, From bcc870040898261868067f50c0733654c4622364 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 10 Sep 2021 22:10:25 +0200 Subject: [PATCH 3/9] doc: document Verity=signed --- mkosi.md | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/mkosi.md b/mkosi.md index b51e03547..e3761d1e6 100644 --- a/mkosi.md +++ b/mkosi.md @@ -528,10 +528,24 @@ a boolean argument: either "1", "yes", or "true" to enable, or "0", `Verity=`, `--verity` -: Add an "Verity" integrity partition to the image. If enabled, the - root partition is protected with `dm-verity` against off-line - modification, the verification data is placed in an additional GPT - partition. Implies `ReadOnly=yes`. +: Add a "Verity" integrity partition to the image. Takes a boolean or + the special value `signed`, and defaults to disabled. If enabled, + the root partition (or `/usr/` partition, in case `UsrOnly=` is + enabled) is protected with `dm-verity` against offline modification, + the verification data is placed in an additional GPT + partition. Implies `ReadOnly=yes`. If this is enabled, the Verity + root hash is written to an output file with `.roothash` or + `.usrhash` suffix. If set to `signed`, Verity is also enabled, but + the resulting root hash is then also signed (in PKCS#7 format) with + the signature key configured with `SecureBootKey=`. Or in other + words: the SecureBoot key pair is then used to both sign the kernel, + if that is enabled, and the root/`/usr/` file system. This signature + is then stored in an additional output file with the `.roothash.p7s` + or `.usrhash.p7s` suffix in DER format. It is also written to an + additional partition in the image. The latter allows generating + self-contained signed disk images, implementing the Verity + provisions described in the [Discoverable Partitions + Specification](https://systemd.io/DISCOVERABLE_PARTITIONS). `CompressFs=`, `--compress-fs=` @@ -597,7 +611,7 @@ a boolean argument: either "1", "yes", or "true" to enable, or "0", : Configure the image identifier. This accepts a freeform string that shall be used to identify the image with. If set the default output file will be named after it (possibly suffixed with the version). If - this option is used the root, `/usr/` and verity partitions in the + this option is used the root, `/usr/` and Verity partitions in the image will have their labels set to this (possibly suffixed by the image version). The identifier is also passed via the `$IMAGE_ID` to any build scripts invoked (which may be useful to patch it into From 77dc59f4411c1585f2a2d635ea37237d4a607285 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 13 Sep 2021 17:45:45 +0200 Subject: [PATCH 4/9] mkosi: port to python-cryptography MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is mostly based on Jörg Behrmann's (@behrmann) work. --- mkosi/__init__.py | 56 +++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index bea855fa0..9911f7e5e 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -3609,12 +3609,6 @@ def insert_verity( ) -PKCS7_NOCERTS = 0x2 # Don't include signature certificates in result -PKCS7_DETACHED = 0x40 # Don't include data to sign in result -PKCS7_BINARY = 0x80 # Don't mangle newlines for MIME canonical format -PKCS7_NOATTR = 0x100 # Don't include signature time, … in result - - def make_verity_sig( args: CommandLineArguments, root_hash: Optional[str], do_run_build_script: bool, for_cache: bool ) -> Tuple[Optional[BinaryIO], Optional[bytes], Optional[str]]: @@ -3626,35 +3620,41 @@ def make_verity_sig( assert root_hash is not None - try: - from OpenSSL import crypto - except ImportError: - die("Python OpenSSL package is not installed."); + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.serialization import pkcs7 with complete_step("Signing verity root hash…"): - key = crypto.load_privatekey(crypto.FILETYPE_PEM, args.secure_boot_key.read_bytes()) - certificate = crypto.load_certificate(crypto.FILETYPE_PEM, args.secure_boot_certificate.read_bytes()) - - # Note that the library returns the SHA256 digest in an - # uppecase format with : as byte separators. Let's convert to - # classic lowercase series of unseparated hex digits - fingerprint = certificate.digest("sha256").decode("ascii").replace(":", "").lower() - - bio_in = crypto._new_mem_buf(root_hash.encode("utf-8")) - pkcs7 = crypto._lib.PKCS7_sign( - certificate._x509, - key._pkey, - crypto._ffi.NULL, - bio_in, - PKCS7_DETACHED | PKCS7_NOCERTS | PKCS7_NOATTR | PKCS7_BINARY, + key = serialization.load_pem_private_key(args.secure_boot_key.read_bytes(), password=None) + certificate = x509.load_pem_x509_certificate(args.secure_boot_certificate.read_bytes()) + + fingerprint = certificate.fingerprint(hashes.SHA256()).hex() + + sigbytes = pkcs7.PKCS7SignatureBuilder().add_signer( + certificate, + key, + hashes.SHA256() + ).set_data( + root_hash.encode("utf-8") + ).sign( + options=[ + pkcs7.PKCS7Options.DetachedSignature, + pkcs7.PKCS7Options.NoCerts, + pkcs7.PKCS7Options.NoAttributes, + pkcs7.PKCS7Options.Binary + ], + encoding=serialization.Encoding.DER ) - bio_out = crypto._new_mem_buf() - crypto._lib.i2d_PKCS7_bio(bio_out, pkcs7) - sigbytes = crypto._bio_to_string(bio_out) + # We base64 the DER result, because we want to include it in + # JSON. This is not PEM (i.e. not header/footer line, no line + # breaks), but just base64 encapsulated DER). b64encoded = base64.b64encode(sigbytes).decode("ascii") + print(b64encoded) + # This is supposed to be extensible, but care should be taken # not to include unprotected data here. j = json.dumps({ From fd14ca4ebbb214623869a97cc84df6582a923c89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Mon, 20 Sep 2021 16:06:27 +0200 Subject: [PATCH 5/9] Add Partition and PartitionTable classes The general idea is that those closes collect attributes and information about the abstract partition table, but are independent of any underlying block device. The definitions exists indendependently of a block device and can be applied later on. Partitions are referred to by enum PartitionIdentifier. The calculation of the space necessary for those partitions is centralized in the class, so we don't have multiplace places where we arrive at slightly different formulas for the expected disk size. --- mkosi/__init__.py | 527 ++++++++++++++++++---------------------------- mkosi/backend.py | 195 ++++++++++------- 2 files changed, 317 insertions(+), 405 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 9911f7e5e..13042412b 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -72,18 +72,17 @@ MkosiException, MkosiPrinter, OutputFormat, + Partition, + PartitionIdentifier, PartitionTable, SourceFileTransfer, die, install_grub, nspawn_params_for_blockdev_access, - partition, patch_file, path_relative_to_cwd, - print_between_lines, roundup, run, - run_with_backoff, run_workspace_command, should_compress_fs, should_compress_output, @@ -525,38 +524,6 @@ def is_generated_root(args: Union[argparse.Namespace, CommandLineArguments]) -> return args.minimize or args.output_format.is_squashfs() or args.usr_only -def image_size(args: CommandLineArguments) -> int: - gpt = PartitionTable.empty(args.gpt_first_lba) - size = gpt.first_usable_offset() + gpt.footer_size() - - if args.root_size is not None: - size += args.root_size - if args.home_size is not None: - size += args.home_size - if args.srv_size is not None: - size += args.srv_size - if args.var_size is not None: - size += args.var_size - if args.tmp_size is not None: - size += args.tmp_size - if args.bootable: - if "uefi" in args.boot_protocols: - assert args.esp_size - size += args.esp_size - if "bios" in args.boot_protocols: - size += BIOS_PARTITION_SIZE - if args.xbootldr_size is not None: - size += args.xbootldr_size - if args.swap_size is not None: - size += args.swap_size - if args.verity_size is not None: - size += args.verity_size - if args.verity_sig_size is not None: - size += args.verity_sig_size - - return size - - def disable_cow(path: PathString) -> None: """Disable copy-on-write if applicable on filesystem""" @@ -602,118 +569,52 @@ def root_partition_name( return prefix + ' ' + (suffix if suffix is not None else 'Partition') -def determine_partition_table(args: CommandLineArguments) -> Tuple[str, bool]: - pn = 1 - table = "label: gpt\n" - if args.gpt_first_lba is not None: - table += f"first-lba: {args.gpt_first_lba:d}\n" - run_sfdisk = False - args.esp_partno = None - args.bios_partno = None - - if args.bootable: - if "uefi" in args.boot_protocols: - assert args.esp_size is not None - table += f'size={args.esp_size // 512}, type={GPT_ESP}, name="ESP System Partition"\n' - args.esp_partno = pn - pn += 1 - - if "bios" in args.boot_protocols: - table += f'size={BIOS_PARTITION_SIZE // 512}, type={GPT_BIOS}, name="BIOS Boot Partition"\n' - args.bios_partno = pn - pn += 1 - - run_sfdisk = True - - if args.xbootldr_size is not None: - table += f'size={args.xbootldr_size // 512}, type={GPT_XBOOTLDR}, name="Boot Loader Partition"\n' - args.xbootldr_partno = pn - pn += 1 - else: - args.xbootldr_partno = None - - if args.swap_size is not None: - table += f'size={args.swap_size // 512}, type={GPT_SWAP}, name="Swap Partition"\n' - args.swap_partno = pn - pn += 1 - run_sfdisk = True - else: - args.swap_partno = None - - args.home_partno = None - args.srv_partno = None - args.var_partno = None - args.tmp_partno = None - - if args.output_format != OutputFormat.gpt_btrfs: - if args.home_size is not None: - table += f'size={args.home_size // 512}, type={GPT_HOME}, name="Home Partition"\n' - args.home_partno = pn - pn += 1 - run_sfdisk = True - - if args.srv_size is not None: - table += f'size={args.srv_size // 512}, type={GPT_SRV}, name="Server Data Partition"\n' - args.srv_partno = pn - pn += 1 - run_sfdisk = True - - if args.var_size is not None: - table += f'size={args.var_size // 512}, type={GPT_VAR}, name="Variable Data Partition"\n' - args.var_partno = pn - pn += 1 - run_sfdisk = True - - if args.tmp_size is not None: - table += f'size={args.tmp_size // 512}, type={GPT_TMP}, name="Temporary Data Partition"\n' - args.tmp_partno = pn - pn += 1 - run_sfdisk = True - - if not is_generated_root(args): - table += 'type={}, attrs={}, name="{}"\n'.format( - gpt_root_native(args.architecture, args.usr_only).root, - "GUID:60" if args.read_only and args.output_format != OutputFormat.gpt_btrfs else "", - root_partition_name(args), - ) - run_sfdisk = True - - args.root_partno = pn - pn += 1 - - if args.verity: - args.verity_partno = pn - pn += 1 - else: - args.verity_partno = None - - if args.verity == "signed": - args.verity_sig_partno = pn - pn += 1 - else: - args.verity_sig_partno = None - - return table, run_sfdisk - - -def exec_sfdisk(args: CommandLineArguments, f: BinaryIO) -> None: - - table, run_sfdisk = determine_partition_table(args) +def initialize_partition_table(args: CommandLineArguments) -> None: + if args.partition_table is not None: + return - if run_sfdisk: - run(["sfdisk", "--color=never", f.name], input=table.encode("utf-8")) - run(["sync"]) + if not args.output_format.is_disk(): + return - args.ran_sfdisk = run_sfdisk + table = PartitionTable(first_lba=args.gpt_first_lba) + no_btrfs = args.output_format != OutputFormat.gpt_btrfs + + for condition, label, size, type_uuid, name, read_only in ( + (args.bootable and "uefi" in args.boot_protocols, + PartitionIdentifier.esp, args.esp_size, GPT_ESP, "ESP System Partition", False), + (args.bootable and "bios" in args.boot_protocols, + PartitionIdentifier.bios, BIOS_PARTITION_SIZE, GPT_BIOS, "BIOS Boot Partition", False), + (args.xbootldr_size is not None, + PartitionIdentifier.xbootldr, args.xbootldr_size, GPT_XBOOTLDR, "Boot Loader Partition", False), + (args.swap_size is not None, + PartitionIdentifier.swap, args.swap_size, GPT_SWAP, "Swap Partition", False), + (no_btrfs and args.home_size is not None, + PartitionIdentifier.home, args.home_size, GPT_HOME, "Home Partition", False), + (no_btrfs and args.srv_size is not None, + PartitionIdentifier.srv, args.srv_size, GPT_SRV, "Server Data Partition", False), + (no_btrfs and args.var_size is not None, + PartitionIdentifier.var, args.var_size, GPT_VAR, "Variable Data Partition", False), + (no_btrfs and args.tmp_size is not None, + PartitionIdentifier.tmp, args.tmp_size, GPT_TMP, "Temporary Data Partition", False), + (not is_generated_root(args), + PartitionIdentifier.root, args.root_size, + gpt_root_native(args.architecture, args.usr_only).root, + root_partition_name(args), + args.read_only)): + + if condition and size is not None: + table.add(label, size, type_uuid, name, read_only=read_only) + + args.partition_table = table def create_image(args: CommandLineArguments, for_cache: bool) -> Optional[BinaryIO]: - if not args.output_format.is_disk(): + initialize_partition_table(args) + if args.partition_table is None: return None - with complete_step( - "Creating image with partition table…", "Created image with partition table as {.name}" - ) as output: + with complete_step("Creating image with partition table…", + "Created image with partition table as {.name}") as output: f: BinaryIO = cast( BinaryIO, @@ -721,15 +622,18 @@ def create_image(args: CommandLineArguments, for_cache: bool) -> Optional[Binary ) output.append(f) disable_cow(f.name) - f.truncate(image_size(args)) + disk_size = args.partition_table.disk_size() + f.truncate(disk_size) - exec_sfdisk(args, f) + if args.partition_table.partitions: + args.partition_table.run_sfdisk(f.name) return f def refresh_partition_table(args: CommandLineArguments, f: BinaryIO) -> None: - if not args.output_format.is_disk(): + initialize_partition_table(args) + if args.partition_table is None: return # Let's refresh all UUIDs and labels to match the new build. This @@ -745,7 +649,8 @@ def refresh_partition_table(args: CommandLineArguments, f: BinaryIO) -> None: # configuration is identical. with complete_step("Refreshing partition table…", "Refreshed partition table."): - exec_sfdisk(args, f) + if args.partition_table.partitions: + args.partition_table.run_sfdisk(f.name) def refresh_file_system(args: CommandLineArguments, dev: Optional[Path], cached: bool) -> None: @@ -835,8 +740,6 @@ def reuse_cache_image( return None, False output.append(f) - _, run_sfdisk = determine_partition_table(args) - args.ran_sfdisk = run_sfdisk return f, True @@ -863,23 +766,17 @@ def attach_image_loopback( run(["losetup", "--detach", loopdev]) -def optional_partition(loopdev: Path, partno: Optional[int]) -> Optional[Path]: - if partno is None: - return None - - return partition(loopdev, partno) - - def prepare_swap(args: CommandLineArguments, loopdev: Optional[Path], cached: bool) -> None: if loopdev is None: return if cached: return - if args.swap_partno is None: + part = args.get_partition(PartitionIdentifier.swap) + if not part: return with complete_step("Formatting swap partition"): - run(["mkswap", "-Lswap", partition(loopdev, args.swap_partno)]) + run(["mkswap", "-Lswap", part.blockdev(loopdev)]) def prepare_esp(args: CommandLineArguments, loopdev: Optional[Path], cached: bool) -> None: @@ -887,11 +784,12 @@ def prepare_esp(args: CommandLineArguments, loopdev: Optional[Path], cached: boo return if cached: return - if args.esp_partno is None: + part = args.get_partition(PartitionIdentifier.esp) + if not part: return with complete_step("Formatting ESP partition"): - run(["mkfs.fat", "-nEFI", "-F32", partition(loopdev, args.esp_partno)]) + run(["mkfs.fat", "-nEFI", "-F32", part.blockdev(loopdev)]) def prepare_xbootldr(args: CommandLineArguments, loopdev: Optional[Path], cached: bool) -> None: @@ -899,11 +797,13 @@ def prepare_xbootldr(args: CommandLineArguments, loopdev: Optional[Path], cached return if cached: return - if args.xbootldr_partno is None: + + part = args.get_partition(PartitionIdentifier.xbootldr) + if not part: return with complete_step("Formatting XBOOTLDR partition"): - run(["mkfs.fat", "-nXBOOTLDR", "-F32", partition(loopdev, args.xbootldr_partno)]) + run(["mkfs.fat", "-nXBOOTLDR", "-F32", part.blockdev(loopdev)]) def mkfs_ext4_cmd(label: str, mount: PathString) -> List[str]: @@ -984,7 +884,8 @@ def luks_format_root( ) -> None: if args.encrypt != "all": return - if args.root_partno is None: + part = args.get_partition(PartitionIdentifier.root) + if not part: return if is_generated_root(args) and not inserting_generated_root: return @@ -994,14 +895,15 @@ def luks_format_root( return assert args.passphrase is not None - with complete_step("Setting up LUKS on root partition…"): - luks_format(partition(loopdev, args.root_partno), args.passphrase) + with complete_step(f"Setting up LUKS on {part.name}…"): + luks_format(part.blockdev(loopdev), args.passphrase) def luks_format_home(args: CommandLineArguments, loopdev: Path, do_run_build_script: bool, cached: bool) -> None: if args.encrypt is None: return - if args.home_partno is None: + part = args.get_partition(PartitionIdentifier.home) + if not part: return if do_run_build_script: return @@ -1009,14 +911,15 @@ def luks_format_home(args: CommandLineArguments, loopdev: Path, do_run_build_scr return assert args.passphrase is not None - with complete_step("Setting up LUKS on home partition…"): - luks_format(partition(loopdev, args.home_partno), args.passphrase) + with complete_step(f"Setting up LUKS on {part.name}…"): + luks_format(part.blockdev(loopdev), args.passphrase) def luks_format_srv(args: CommandLineArguments, loopdev: Path, do_run_build_script: bool, cached: bool) -> None: if args.encrypt is None: return - if args.srv_partno is None: + part = args.get_partition(PartitionIdentifier.srv) + if not part: return if do_run_build_script: return @@ -1024,14 +927,15 @@ def luks_format_srv(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step("Setting up LUKS on server data partition…"): - luks_format(partition(loopdev, args.srv_partno), args.passphrase) + with complete_step(f"Setting up LUKS on {part.name}…"): + luks_format(part.blockdev(loopdev), args.passphrase) def luks_format_var(args: CommandLineArguments, loopdev: Path, do_run_build_script: bool, cached: bool) -> None: if args.encrypt is None: return - if args.var_partno is None: + part = args.get_partition(PartitionIdentifier.var) + if not part: return if do_run_build_script: return @@ -1039,14 +943,15 @@ def luks_format_var(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step("Setting up LUKS on variable data partition…"): - luks_format(partition(loopdev, args.var_partno), args.passphrase) + with complete_step(f"Setting up LUKS on {part.name}…"): + luks_format(part.blockdev(loopdev), args.passphrase) def luks_format_tmp(args: CommandLineArguments, loopdev: Path, do_run_build_script: bool, cached: bool) -> None: if args.encrypt is None: return - if args.tmp_partno is None: + part = args.get_partition(PartitionIdentifier.tmp) + if not part: return if do_run_build_script: return @@ -1054,16 +959,16 @@ def luks_format_tmp(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step("Setting up LUKS on temporary data partition…"): - luks_format(partition(loopdev, args.tmp_partno), args.passphrase) + with complete_step(f"Setting up LUKS on {part.name}…"): + luks_format(part.blockdev(loopdev), args.passphrase) @contextlib.contextmanager -def luks_open(dev: Path, passphrase: Dict[str, str], partition: str) -> Generator[Path, None, None]: +def luks_open(part: Partition, loopdev: Path, passphrase: Dict[str, str]) -> Generator[Path, None, None]: name = str(uuid.uuid4()) - # FIXME: partition is only used in messages, rename it? + dev = part.blockdev(loopdev) - with complete_step(f"Setting up LUKS on {partition}…"): + with complete_step(f"Setting up LUKS on {part.name}…"): if passphrase["type"] == "stdin": passphrase_content = (passphrase["content"] + "\n").encode("utf-8") run(["cryptsetup", "open", "--type", "luks", dev, name], input=passphrase_content) @@ -1076,7 +981,7 @@ def luks_open(dev: Path, passphrase: Dict[str, str], partition: str) -> Generato try: yield path finally: - with complete_step(f"Closing LUKS {partition}"): + with complete_step(f"Closing LUKS on {part.name}"): run(["cryptsetup", "close", path]) @@ -1085,7 +990,8 @@ def luks_setup_root( ) -> ContextManager[Optional[Path]]: if args.encrypt != "all": return contextlib.nullcontext() - if args.root_partno is None: + part = args.get_partition(PartitionIdentifier.root) + if not part: return contextlib.nullcontext() if is_generated_root(args) and not inserting_generated_root: return contextlib.nullcontext() @@ -1093,7 +999,7 @@ def luks_setup_root( return contextlib.nullcontext() assert args.passphrase is not None - return luks_open(partition(loopdev, args.root_partno), args.passphrase, "root partition") + return luks_open(part, loopdev, args.passphrase) def luks_setup_home( @@ -1101,13 +1007,14 @@ def luks_setup_home( ) -> ContextManager[Optional[Path]]: if args.encrypt is None: return contextlib.nullcontext() - if args.home_partno is None: + part = args.get_partition(PartitionIdentifier.home) + if not part: return contextlib.nullcontext() if do_run_build_script: return contextlib.nullcontext() assert args.passphrase is not None - return luks_open(partition(loopdev, args.home_partno), args.passphrase, "home partition") + return luks_open(part, loopdev, args.passphrase) def luks_setup_srv( @@ -1115,13 +1022,14 @@ def luks_setup_srv( ) -> ContextManager[Optional[Path]]: if args.encrypt is None: return contextlib.nullcontext() - if args.srv_partno is None: + part = args.get_partition(PartitionIdentifier.srv) + if not part: return contextlib.nullcontext() if do_run_build_script: return contextlib.nullcontext() assert args.passphrase is not None - return luks_open(partition(loopdev, args.srv_partno), args.passphrase, "server data partition") + return luks_open(part, loopdev, args.passphrase) def luks_setup_var( @@ -1129,13 +1037,14 @@ def luks_setup_var( ) -> ContextManager[Optional[Path]]: if args.encrypt is None: return contextlib.nullcontext() - if args.var_partno is None: + part = args.get_partition(PartitionIdentifier.var) + if not part: return contextlib.nullcontext() if do_run_build_script: return contextlib.nullcontext() assert args.passphrase is not None - return luks_open(partition(loopdev, args.var_partno), args.passphrase, "variable data partition") + return luks_open(part, loopdev, args.passphrase) def luks_setup_tmp( @@ -1143,13 +1052,14 @@ def luks_setup_tmp( ) -> ContextManager[Optional[Path]]: if args.encrypt is None: return contextlib.nullcontext() - if args.tmp_partno is None: + part = args.get_partition(PartitionIdentifier.tmp) + if not part: return contextlib.nullcontext() if do_run_build_script: return contextlib.nullcontext() assert args.passphrase is not None - return luks_open(partition(loopdev, args.tmp_partno), args.passphrase, "temporary data partition") + return luks_open(part, loopdev, args.passphrase) class LuksSetupOutput(NamedTuple): @@ -1180,6 +1090,7 @@ def luks_setup_all( return assert loopdev is not None + assert args.partition_table is not None with luks_setup_root(args, loopdev, do_run_build_script) as root, \ luks_setup_home(args, loopdev, do_run_build_script) as home, \ @@ -1188,12 +1099,11 @@ def luks_setup_all( luks_setup_tmp(args, loopdev, do_run_build_script) as tmp: yield LuksSetupOutput( - optional_partition(loopdev, args.root_partno) if root is None else root, - optional_partition(loopdev, args.home_partno) if home is None else home, - optional_partition(loopdev, args.srv_partno) if srv is None else srv, - optional_partition(loopdev, args.var_partno) if var is None else var, - optional_partition(loopdev, args.tmp_partno) if tmp is None else tmp, - ) + root or args.partition_table.partition_path(PartitionIdentifier.root, loopdev), + home or args.partition_table.partition_path(PartitionIdentifier.home, loopdev), + srv or args.partition_table.partition_path(PartitionIdentifier.srv, loopdev), + var or args.partition_table.partition_path(PartitionIdentifier.var, loopdev), + tmp or args.partition_table.partition_path(PartitionIdentifier.tmp, loopdev)) def prepare_root(args: CommandLineArguments, dev: Optional[Path], cached: bool) -> None: @@ -1319,11 +1229,16 @@ def mount_image( if image.tmp is not None: mount_loop(args, image.tmp, root / "var/tmp") - if args.esp_partno is not None and loopdev is not None: - mount_loop(args, partition(loopdev, args.esp_partno), root / "efi") + if loopdev is not None: + assert args.partition_table is not None + path = args.partition_table.partition_path(PartitionIdentifier.esp, loopdev) + + if path: + mount_loop(args, path, root / "efi") - if args.xbootldr_partno is not None and loopdev is not None: - mount_loop(args, partition(loopdev, args.xbootldr_partno), root / "boot") + path = args.partition_table.partition_path(PartitionIdentifier.xbootldr, loopdev) + if path: + mount_loop(args, path, root / "boot") # Make sure /tmp and /run are not part of the image mount_tmpfs(root / "run") @@ -1431,19 +1346,17 @@ def configure_dracut(args: CommandLineArguments, root: Path) -> None: f'filesystems+=" {(args.output_format.needed_kernel_module())} "\n' ) - # These distros need uefi_stub configured explicitly for dracut to find the systemd-boot uefi stub. - if args.esp_partno is not None and args.distribution in ( - Distribution.ubuntu, - Distribution.debian, - Distribution.mageia, - Distribution.openmandriva, - ): - dracut_dir.joinpath("30-mkosi-uefi-stub.conf").write_text( - "uefi_stub=/usr/lib/systemd/boot/efi/linuxx64.efi.stub\n" - ) + if args.get_partition(PartitionIdentifier.esp): + # These distros need uefi_stub configured explicitly for dracut to find the systemd-boot uefi stub. + if args.distribution in (Distribution.ubuntu, + Distribution.debian, + Distribution.mageia, + Distribution.openmandriva): + dracut_dir.joinpath("30-mkosi-uefi-stub.conf").write_text( + "uefi_stub=/usr/lib/systemd/boot/efi/linuxx64.efi.stub\n" + ) - # efivarfs must be present in order to GPT root discovery work - if args.esp_partno is not None: + # efivarfs must be present in order to GPT root discovery work dracut_dir.joinpath("30-mkosi-efivarfs.conf").write_text( '[[ $(modinfo -k "$kernel" -F filename efivarfs 2>/dev/null) == /* ]] && add_drivers+=" efivarfs "\n' ) @@ -1487,7 +1400,7 @@ def prepare_tree(args: CommandLineArguments, root: Path, do_run_build_script: bo root.joinpath("etc/machine-id").write_text(f"{args.machine_id}\n") if not do_run_build_script and args.bootable: - if args.xbootldr_partno is not None: + if args.get_partition(PartitionIdentifier.xbootldr): # Create directories for kernels and entries if this is enabled root.joinpath("boot/EFI").mkdir(mode=0o700) root.joinpath("boot/EFI/Linux").mkdir(mode=0o700) @@ -1498,13 +1411,13 @@ def prepare_tree(args: CommandLineArguments, root: Path, do_run_build_script: bo # If this is not enabled, let's create an empty directory on /boot root.joinpath("boot").mkdir(mode=0o700) - if args.esp_partno is not None: + if args.get_partition(PartitionIdentifier.esp): root.joinpath("efi/EFI").mkdir(mode=0o700) root.joinpath("efi/EFI/BOOT").mkdir(mode=0o700) root.joinpath("efi/EFI/systemd").mkdir(mode=0o700) root.joinpath("efi/loader").mkdir(mode=0o700) - if args.xbootldr_partno is None: + if not args.get_partition(PartitionIdentifier.xbootldr): # Create directories for kernels and entries, unless the XBOOTLDR partition is turned on root.joinpath("efi/EFI/Linux").mkdir(mode=0o700) root.joinpath("efi/loader/entries").mkdir(mode=0o700) @@ -1571,7 +1484,7 @@ def disable_kernel_install(args: CommandLineArguments, root: Path) -> None: # off any kernel installation beforehand. # # For BIOS mode, we don't have that option, so do not mask the units. - if not args.bootable or args.bios_partno is not None or not args.with_unified_kernel_images: + if not args.bootable or args.get_partition(PartitionIdentifier.bios) or not args.with_unified_kernel_images: return for subdir in ("etc", "etc/kernel", "etc/kernel/install.d"): @@ -1582,7 +1495,7 @@ def disable_kernel_install(args: CommandLineArguments, root: Path) -> None: def reenable_kernel_install(args: CommandLineArguments, root: Path) -> None: - if not args.bootable or args.bios_partno is not None or not args.with_unified_kernel_images: + if not args.bootable or args.get_partition(PartitionIdentifier.bios) or not args.with_unified_kernel_images: return write_resource( @@ -1634,7 +1547,7 @@ def make_rpm_list(args: CommandLineArguments, packages: Set[str], do_run_build_s if args.output_format == OutputFormat.gpt_btrfs: add_packages(args, packages, "btrfs-progs") - if args.bios_partno: + if args.get_partition(PartitionIdentifier.bios): if args.distribution in (Distribution.mageia, Distribution.openmandriva): add_packages(args, packages, "grub2") else: @@ -2384,7 +2297,7 @@ def install_debian_or_ubuntu(args: CommandLineArguments, root: Path, *, do_run_b else: add_packages(args, extra_packages, "linux-image-amd64") - if args.bios_partno: + if args.get_partition(PartitionIdentifier.bios): add_packages(args, extra_packages, "grub-pc") if args.output_format == OutputFormat.gpt_btrfs: @@ -2598,10 +2511,10 @@ def install_arch(args: CommandLineArguments, root: Path, do_run_build_script: bo write_resource(scripts_dir / "mkosi-kernel-remove", "mkosi.resources.arch", "kernel_remove.sh", executable=True) - if args.esp_partno is not None: + if args.get_partition(PartitionIdentifier.esp): write_resource(hooks_dir / "91-mkosi-bootctl-update.hook", "mkosi.resources.arch", "91_bootctl_update.hook") - if args.bios_partno is not None: + if args.get_partition(PartitionIdentifier.bios): write_resource(hooks_dir / "90-mkosi-vmlinuz-add.hook", "mkosi.resources.arch", "90_vmlinuz_add.hook") write_resource(hooks_dir / "60-mkosi-vmlinuz-remove.hook", "mkosi.resources.arch", "60_vmlinuz_remove.hook") @@ -2619,7 +2532,7 @@ def install_arch(args: CommandLineArguments, root: Path, do_run_build_script: bo add_packages(args, packages, "xfsprogs") if args.encrypt: add_packages(args, packages, "cryptsetup", "device-mapper") - if args.bios_partno: + if args.get_partition(PartitionIdentifier.bios): add_packages(args, packages, "grub") add_packages(args, packages, "dracut", "binutils") @@ -2700,7 +2613,7 @@ def install_opensuse(args: CommandLineArguments, root: Path, do_run_build_script add_packages(args, packages, "kernel-default", "dracut", "binutils") configure_dracut(args, root) - if args.bios_partno is not None: + if args.get_partition(PartitionIdentifier.bios): add_packages(args, packages, "grub2") if not do_run_build_script and args.encrypt: @@ -3068,7 +2981,7 @@ def install_boot_loader( return with complete_step("Installing boot loader…"): - if args.esp_partno: + if args.get_partition(PartitionIdentifier.esp): if args.distribution == Distribution.clear: pass elif args.distribution in (Distribution.centos, Distribution.centos_epel) and is_older_than_centos8( @@ -3078,7 +2991,7 @@ def install_boot_loader( else: run_workspace_command(args, root, ["bootctl", "install"]) - if args.bios_partno and args.distribution != Distribution.clear: + if args.get_partition(PartitionIdentifier.bios) and args.distribution != Distribution.clear: grub = ( "grub" if args.distribution in (Distribution.ubuntu, Distribution.debian, Distribution.arch) @@ -3446,79 +3359,41 @@ def insert_partition( args: CommandLineArguments, raw: BinaryIO, loopdev: Path, - partno: int, blob: BinaryIO, + ident: PartitionIdentifier, name: str, type_uuid: uuid.UUID, read_only: bool, - uuid_opt: Optional[uuid.UUID] = None, + part_uuid: Optional[uuid.UUID] = None, ) -> int: - if args.ran_sfdisk: - old_table = PartitionTable.read(loopdev) - else: - # No partition table yet? Then let's fake one... - old_table = PartitionTable.empty(args.gpt_first_lba) - - blob_size = roundup(os.stat(blob.name).st_size, 512) - luks_extra = 16 * 1024 * 1024 if args.encrypt == "all" else 0 - partition_offset = old_table.first_usable_offset() - new_size = roundup(partition_offset + blob_size + luks_extra + old_table.footer_size(), 4096) - - ss = f" ({new_size // old_table.sector_size} sectors)" if 'disk' in ARG_DEBUG else "" - MkosiPrinter.print_step(f"Resizing disk image to {format_bytes(new_size)}{ss}") - - os.truncate(raw.name, new_size) - run(["losetup", "--set-capacity", loopdev]) - - ss = f" ({blob_size // old_table.sector_size} sectors)" if 'disk' in ARG_DEBUG else "" - MkosiPrinter.print_step(f"Inserting partition of {format_bytes(blob_size)}{ss}...") - - if args.gpt_first_lba is not None: - first_lba: Optional[int] = args.gpt_first_lba - elif old_table.partitions: - first_lba = None # no need to specify this if we already have partitions - else: - first_lba = partition_offset // old_table.sector_size - - new = [] - if uuid_opt is not None: - new += [f'uuid={uuid_opt}'] - n_sectors = (blob_size + luks_extra) // 512 - new += [f'size={n_sectors}', - f'type={type_uuid}', - f'attrs={"GUID:60" if read_only else ""}', - f'name="{name}"'] + assert args.partition_table is not None - table = ["label: gpt", - f"grain: {old_table.grain}"] - if first_lba is not None: - table += [f"first-lba: {first_lba}"] - - table += [*old_table.partitions, - ', '.join(new)] - - if 'disk' in ARG_DEBUG: - print_between_lines('\n'.join(table)) - - run(["sfdisk", "--color=never", "--no-reread", "--no-tell-kernel", loopdev], - input='\n'.join(table).encode("utf-8")) - run(["sync"]) - run_with_backoff(["blockdev", "--rereadpt", loopdev], attempts=10) - - MkosiPrinter.print_step("Writing partition...") - - if args.root_partno == partno: - luks_format_root(args, loopdev, False, False, True) - cm = luks_setup_root(args, loopdev, False, True) - else: - cm = contextlib.nullcontext() - - with cm as dev: - path = dev if dev is not None else partition(loopdev, partno) - run(["dd", f"if={blob.name}", f"of={path}", "conv=nocreat,sparse"]) + luks_extra = 16 * 1024 * 1024 if args.encrypt == "all" else 0 + blob_size = os.stat(blob.name).st_size + part = args.partition_table.add(ident, blob_size + luks_extra, type_uuid, name, part_uuid) + + disk_size = args.partition_table.disk_size() + ss = f" ({disk_size // args.partition_table.sector_size} sectors)" if 'disk' in ARG_DEBUG else "" + with complete_step(f"Resizing disk image to {format_bytes(disk_size)}{ss}"): + os.truncate(raw.name, disk_size) + run(["losetup", "--set-capacity", loopdev]) + + part_size = part.n_sectors * args.partition_table.sector_size + ss = f" ({part.n_sectors} sectors)" if 'disk' in ARG_DEBUG else "" + with complete_step(f"Inserting partition of {format_bytes(part_size)}{ss}..."): + args.partition_table.run_sfdisk(loopdev) + + with complete_step("Writing partition..."): + if ident == PartitionIdentifier.root: + luks_format_root(args, loopdev, False, False, True) + cm = luks_setup_root(args, loopdev, False, True) + else: + cm = contextlib.nullcontext() - args.ran_sfdisk = True + with cm as dev: + path = dev if dev is not None else part.blockdev(loopdev) + run(["dd", f"if={blob.name}", f"of={path}", "conv=nocreat,sparse"]) return blob_size @@ -3539,19 +3414,17 @@ def insert_generated_root( assert raw is not None assert loopdev is not None assert image is not None - assert args.root_partno is not None + assert args.partition_table is not None with complete_step("Inserting generated root partition…"): - args.root_size = insert_partition( - args, - raw, - loopdev, - args.root_partno, - image, - root_partition_name(args), - gpt_root_native(args.architecture, args.usr_only).root, - args.read_only, - ) + insert_partition(args, + raw, + loopdev, + image, + PartitionIdentifier.root, + root_partition_name(args), + type_uuid=gpt_root_native(args.architecture, args.usr_only).root, + read_only=args.read_only) def make_verity( @@ -3590,23 +3463,21 @@ def insert_verity( assert loopdev is not None assert raw is not None assert root_hash is not None - assert args.verity_partno is not None + assert args.partition_table is not None # Use the final 128 bit of the root hash as partition UUID of the verity partition u = uuid.UUID(root_hash[-32:]) with complete_step("Inserting verity partition…"): - insert_partition( - args, - raw, - loopdev, - args.verity_partno, - verity, - root_partition_name(args, "Verity"), - gpt_root_native(args.architecture, args.usr_only).verity, - True, - u, - ) + insert_partition(args, + raw, + loopdev, + verity, + PartitionIdentifier.verity, + root_partition_name(args, "Verity"), + gpt_root_native(args.architecture, args.usr_only).verity, + read_only=True, + part_uuid=u) def make_verity_sig( @@ -3697,23 +3568,21 @@ def insert_verity_sig( assert raw is not None assert root_hash is not None assert fingerprint is not None - assert args.verity_sig_partno is not None + assert args.partition_table is not None # Hash the concatenation of verity roothash and the X509 certificate fingerprint to generate a UUID for the signature partition u = uuid.UUID(hashlib.sha256(bytes.fromhex(root_hash) + bytes.fromhex(fingerprint)).hexdigest()[:32]) with complete_step("Inserting verity signature partition…"): - insert_partition( - args, - raw, - loopdev, - args.verity_sig_partno, - verity_sig, - root_partition_name(args, "Signature"), - gpt_root_native(args.architecture, args.usr_only).verity_sig, - True, - u, - ) + insert_partition(args, + raw, + loopdev, + verity_sig, + PartitionIdentifier.verity_sig, + root_partition_name(args, "Signature"), + gpt_root_native(args.architecture, args.usr_only).verity_sig, + read_only=True, + part_uuid=u) def patch_root_uuid( @@ -3730,7 +3599,11 @@ def patch_root_uuid( u = uuid.UUID(root_hash[:32]) with complete_step("Patching root partition UUID…"): - run(["sfdisk", "--part-uuid", loopdev, str(args.root_partno), str(u)], check=True) + part = args.get_partition(PartitionIdentifier.root) + assert part is not None + + run(["sfdisk", "--part-uuid", loopdev, str(part.number), str(u)], + check=True) def extract_partition( @@ -3764,7 +3637,9 @@ def install_unified_kernel( # benefit that they can be signed like normal EFI binaries, and can encode everything necessary to boot a # specific root device, including the root hash. - if not args.bootable or args.esp_partno is None or not args.with_unified_kernel_images: + if not (args.bootable and + args.get_partition(PartitionIdentifier.esp) and + args.with_unified_kernel_images): return # Don't run dracut if this is for the cache. The unified kernel @@ -3794,7 +3669,7 @@ def install_unified_kernel( if not (kver.is_dir() and os.path.isfile(os.path.join(kver, "modules.dep"))): continue - prefix = "/boot" if args.xbootldr_partno is not None else "/efi" + prefix = "/boot" if args.get_partition(PartitionIdentifier.xbootldr) else "/efi" # While the kernel version can generally be found as a directory under /usr/lib/modules, the # kernel image files can be found either in /usr/lib/modules//vmlinuz or in # /boot depending on the distro. By invoking the kernel-install script directly, we can pass diff --git a/mkosi/backend.py b/mkosi/backend.py index c49ef9904..31e880de5 100644 --- a/mkosi/backend.py +++ b/mkosi/backend.py @@ -206,80 +206,66 @@ class ManifestFormat(Parseable, enum.Enum): changelog = "changelog" # human-readable text file with package changelogs -@dataclasses.dataclass -class PartitionTable: - partitions: List[str] - last_partition_sector: Optional[int] - sector_size: int - first_lba: Optional[int] - - grain: int = 4096 - - @classmethod - def read(cls, loopdev: Path) -> PartitionTable: - table = [] - last_sector = 0 - sector_size = 512 - first_lba = None - - c = run(["sfdisk", "--dump", loopdev], - stdout=subprocess.PIPE, - universal_newlines=True) - - if 'disk' in ARG_DEBUG: - print_between_lines(c.stdout) - - in_body = False - for line in c.stdout.splitlines(): - line = line.strip() +class PartitionIdentifier(enum.Enum): + esp = 'esp' + bios = 'bios' + xbootldr = 'xbootldr' + root = 'root' + swap = 'swap' + home = 'home' + srv = 'srv' + var = 'var' + tmp = 'tmp' + verity = 'verity' + verity_sig = 'verity-sig' - if line.startswith('sector-size:'): - sector_size = int(line[12:]) - if line.startswith('first-lba:'): - first_lba = int(line[10:]) - if line == "": # empty line is where the body begins - in_body = True - continue - if not in_body: - continue - - table += [line] +@dataclasses.dataclass +class Partition: + number: int - _, rest = line.split(":", 1) - fields = rest.split(",") + n_sectors: int + type_uuid: uuid.UUID + part_uuid: Optional[uuid.UUID] + read_only: Optional[bool] - start = None - size = None + name: str - for field in fields: - field = field.strip() + def blockdev(self, loopdev: Path) -> Path: + return Path(f"{loopdev}p{self.number}") - if field.startswith("start="): - start = int(field[6:]) - if field.startswith("size="): - size = int(field[5:]) + def sfdisk_spec(self) -> str: + desc = [f'size={self.n_sectors}', + f'type={self.type_uuid}', + f'attrs={"GUID:60" if self.read_only else ""}', + f'name="{self.name}"', + f'uuid={self.part_uuid}' if self.part_uuid is not None else None] + return ', '.join(filter(None, desc)) - if start is not None and size is not None: - end = start + size - last_sector = max(last_sector, end) - return cls(table, last_sector * sector_size, sector_size, first_lba) +@dataclasses.dataclass +class PartitionTable: + partitions: Dict[PartitionIdentifier, Partition] = dataclasses.field(default_factory=dict) + last_partition_sector: Optional[int] = None + sector_size: int = 512 + first_lba: Optional[int] = None - @classmethod - def empty(cls, first_lba: Optional[int] = None) -> PartitionTable: - return cls([], None, 512, first_lba) + grain: int = 4096 - def first_usable_offset(self, max_partitions: int = 128) -> int: - if self.last_partition_sector: - return roundup(self.last_partition_sector, self.grain) - elif self.first_lba is not None: + def first_partition_offset(self, max_partitions: int = 128) -> int: + if self.first_lba is not None: # No rounding here, we honour the specified value exactly. return self.first_lba * self.sector_size else: # The header is like the footer, but we have a one-sector "protective MBR" at offset 0 return roundup(self.sector_size + self.footer_size(), self.grain) + def last_partition_offset(self, max_partitions: int = 128) -> int: + if self.last_partition_sector: + return roundup(self.last_partition_sector * self.sector_size, self.grain) + else: + return self.first_partition_offset(max_partitions) + def footer_size(self, max_partitions: int = 128) -> int: # The footer must have enough space for the GPT header (one sector), # and the GPT parition entry area. PEA size of 16384 (128 partitions) @@ -287,6 +273,58 @@ def footer_size(self, max_partitions: int = 128) -> int: pea_sectors = math.ceil(max_partitions * 128 / self.sector_size) return (1 + pea_sectors) * self.sector_size + def disk_size(self) -> int: + return roundup(self.last_partition_offset() + self.footer_size(), self.grain) + + def add(self, + ident: PartitionIdentifier, + size: int, + type_uuid: uuid.UUID, + name: str, + part_uuid: Optional[uuid.UUID] = None, + read_only: Optional[bool] = False) -> Partition: + + assert '"' not in name + + size = roundup(size, self.grain) + n_sectors = size // self.sector_size + + part = Partition(len(self.partitions) + 1, + n_sectors, type_uuid, part_uuid, read_only, name) + self.partitions[ident] = part + + self.last_partition_sector = self.last_partition_offset() // self.sector_size + n_sectors + + return part + + def partition_path(self, ident: PartitionIdentifier, loopdev: Path) -> Optional[Path]: + part = self.partitions.get(ident) + if part is None: + return None + + return part.blockdev(loopdev) + + def sfdisk_spec(self) -> str: + table = ["label: gpt", + f"grain: {self.grain}", + f"first-lba: {self.first_partition_offset() // self.sector_size}", + *(p.sfdisk_spec() for p in self.partitions.values())] + return '\n'.join(table) + + def run_sfdisk(self, device: PathString) -> None: + spec = self.sfdisk_spec() + device = Path(device) + + if 'disk' in ARG_DEBUG: + print_between_lines(spec) + + run(["sfdisk", "--color=never", "--no-reread", "--no-tell-kernel", device], + input=spec.encode("utf-8")) + + if device.is_block_device(): + run(["sync"]) + run_with_backoff(["blockdev", "--rereadpt", device], attempts=10) + @dataclasses.dataclass class CommandLineArguments: @@ -416,21 +454,17 @@ class CommandLineArguments: cache_pre_dev: Optional[Path] = None output_signature: Optional[Path] = None - root_partno: Optional[int] = None - swap_partno: Optional[int] = None - esp_partno: Optional[int] = None - xbootldr_partno: Optional[int] = None - bios_partno: Optional[int] = None - home_partno: Optional[int] = None - srv_partno: Optional[int] = None - var_partno: Optional[int] = None - tmp_partno: Optional[int] = None - verity_partno: Optional[int] = None - verity_sig_partno: Optional[int] = None + partition_table: Optional[PartitionTable] = None releasever: Optional[str] = None ran_sfdisk: bool = False + def get_partition(self, ident: PartitionIdentifier) -> Optional[Partition]: + "A shortcut to check that we have a partition table and extract the partition object" + if self.partition_table is None: + return None + return self.partition_table.partitions.get(ident) + def should_compress_fs(args: Union[argparse.Namespace, CommandLineArguments]) -> Union[bool, str]: """True for the default compression, a string, or False. @@ -473,11 +507,9 @@ def var_tmp(root: Path) -> Path: return p -def partition(loopdev: Path, partno: int) -> Path: - return Path(f"{loopdev}p{partno}") - - def nspawn_params_for_blockdev_access(args: CommandLineArguments, loopdev: Path) -> List[str]: + assert args.partition_table is not None + params = [ f"--bind-ro={loopdev}", f"--property=DeviceAllow={loopdev}", @@ -485,11 +517,13 @@ def nspawn_params_for_blockdev_access(args: CommandLineArguments, loopdev: Path) "--bind-ro=/dev/disk", ] - for partno in (args.esp_partno, args.bios_partno, args.root_partno, args.xbootldr_partno): - if partno is not None: - p = partition(loopdev, partno) - if p.exists(): - params += [f"--bind-ro={p}", f"--property=DeviceAllow={p}"] + for ident in (PartitionIdentifier.esp, + PartitionIdentifier.bios, + PartitionIdentifier.root, + PartitionIdentifier.xbootldr): + path = args.partition_table.partition_path(ident, loopdev) + if path and path.exists(): + params += [f"--bind-ro={path}", f"--property=DeviceAllow={path}"] params += [f"--setenv={env}" for env in args.environment] @@ -698,7 +732,10 @@ def jj(line: str) -> str: def install_grub(args: CommandLineArguments, root: Path, loopdev: Path, grub: str) -> None: - if args.bios_partno is None: + assert args.partition_table is not None + + part = args.get_partition(PartitionIdentifier.bios) + if not part: return write_grub_config(args, root) From 25c1ef7852d3b905f0ce6ca3203ec35848ceecbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Mon, 20 Sep 2021 16:09:09 +0200 Subject: [PATCH 6/9] indentation: keep each condition on a separate line We would have two conditions joined by 'and', and we would have the first operand and half of the second on the first line, and the remainder of the second operand on multiple lines. --- mkosi/__init__.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index 13042412b..a346a3ccf 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -829,9 +829,9 @@ def mkfs_generic(args: CommandLineArguments, label: str, mount: PathString, dev: cmdline = mkfs_ext4_cmd(label, mount) if args.output_format == OutputFormat.gpt_ext4: - if args.distribution in (Distribution.centos, Distribution.centos_epel) and is_older_than_centos8( - args.release - ): + if (args.distribution in (Distribution.centos, Distribution.centos_epel) and + is_older_than_centos8(args.release)): + # e2fsprogs in centos7 is too old and doesn't support this feature cmdline += ["-O", "^metadata_csum"] @@ -2984,9 +2984,8 @@ def install_boot_loader( if args.get_partition(PartitionIdentifier.esp): if args.distribution == Distribution.clear: pass - elif args.distribution in (Distribution.centos, Distribution.centos_epel) and is_older_than_centos8( - args.release - ): + elif (args.distribution in (Distribution.centos, Distribution.centos_epel) and + is_older_than_centos8(args.release)): install_boot_loader_centos_old_efi(args, root, loopdev) else: run_workspace_command(args, root, ["bootctl", "install"]) @@ -3570,7 +3569,8 @@ def insert_verity_sig( assert fingerprint is not None assert args.partition_table is not None - # Hash the concatenation of verity roothash and the X509 certificate fingerprint to generate a UUID for the signature partition + # Hash the concatenation of verity roothash and the X509 certificate + # fingerprint to generate a UUID for the signature partition. u = uuid.UUID(hashlib.sha256(bytes.fromhex(root_hash) + bytes.fromhex(fingerprint)).hexdigest()[:32]) with complete_step("Inserting verity signature partition…"): From 2f0b9bf263a2c06595abcd947697c408ddecc172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Tue, 28 Sep 2021 18:05:15 +0200 Subject: [PATCH 7/9] isort: apply isort --- mkosi/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index a346a3ccf..ba798478b 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -36,7 +36,6 @@ import urllib.parse import urllib.request import uuid - from pathlib import Path from subprocess import DEVNULL, PIPE from textwrap import dedent @@ -3491,8 +3490,7 @@ def make_verity_sig( assert root_hash is not None from cryptography import x509 - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.serialization import pkcs7 with complete_step("Signing verity root hash…"): From 0b750a39dad588b22029b65ba91db0e8cd8956a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Mon, 4 Oct 2021 14:53:47 +0200 Subject: [PATCH 8/9] Rename Partition.name to Partition.description As suggested in https://github.com/systemd/mkosi/pull/798#discussion_r714875622. --- mkosi/__init__.py | 30 +++++++++++++++--------------- mkosi/backend.py | 10 +++++----- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/mkosi/__init__.py b/mkosi/__init__.py index ba798478b..d0e8c7f8b 100644 --- a/mkosi/__init__.py +++ b/mkosi/__init__.py @@ -529,7 +529,7 @@ def disable_cow(path: PathString) -> None: run(["chattr", "+C", path], stdout=DEVNULL, stderr=DEVNULL, check=False) -def root_partition_name( +def root_partition_description( args: Optional[CommandLineArguments], suffix: Optional[str] = None, image_id: Optional[str] = None, @@ -598,7 +598,7 @@ def initialize_partition_table(args: CommandLineArguments) -> None: (not is_generated_root(args), PartitionIdentifier.root, args.root_size, gpt_root_native(args.architecture, args.usr_only).root, - root_partition_name(args), + root_partition_description(args), args.read_only)): if condition and size is not None: @@ -894,7 +894,7 @@ def luks_format_root( return assert args.passphrase is not None - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): luks_format(part.blockdev(loopdev), args.passphrase) @@ -910,7 +910,7 @@ def luks_format_home(args: CommandLineArguments, loopdev: Path, do_run_build_scr return assert args.passphrase is not None - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): luks_format(part.blockdev(loopdev), args.passphrase) @@ -926,7 +926,7 @@ def luks_format_srv(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): luks_format(part.blockdev(loopdev), args.passphrase) @@ -942,7 +942,7 @@ def luks_format_var(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): luks_format(part.blockdev(loopdev), args.passphrase) @@ -958,7 +958,7 @@ def luks_format_tmp(args: CommandLineArguments, loopdev: Path, do_run_build_scri return assert args.passphrase is not None - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): luks_format(part.blockdev(loopdev), args.passphrase) @@ -967,7 +967,7 @@ def luks_open(part: Partition, loopdev: Path, passphrase: Dict[str, str]) -> Gen name = str(uuid.uuid4()) dev = part.blockdev(loopdev) - with complete_step(f"Setting up LUKS on {part.name}…"): + with complete_step(f"Setting up LUKS on {part.description}…"): if passphrase["type"] == "stdin": passphrase_content = (passphrase["content"] + "\n").encode("utf-8") run(["cryptsetup", "open", "--type", "luks", dev, name], input=passphrase_content) @@ -980,7 +980,7 @@ def luks_open(part: Partition, loopdev: Path, passphrase: Dict[str, str]) -> Gen try: yield path finally: - with complete_step(f"Closing LUKS on {part.name}"): + with complete_step(f"Closing LUKS on {part.description}"): run(["cryptsetup", "close", path]) @@ -3359,7 +3359,7 @@ def insert_partition( loopdev: Path, blob: BinaryIO, ident: PartitionIdentifier, - name: str, + description: str, type_uuid: uuid.UUID, read_only: bool, part_uuid: Optional[uuid.UUID] = None, @@ -3369,7 +3369,7 @@ def insert_partition( luks_extra = 16 * 1024 * 1024 if args.encrypt == "all" else 0 blob_size = os.stat(blob.name).st_size - part = args.partition_table.add(ident, blob_size + luks_extra, type_uuid, name, part_uuid) + part = args.partition_table.add(ident, blob_size + luks_extra, type_uuid, description, part_uuid) disk_size = args.partition_table.disk_size() ss = f" ({disk_size // args.partition_table.sector_size} sectors)" if 'disk' in ARG_DEBUG else "" @@ -3420,7 +3420,7 @@ def insert_generated_root( loopdev, image, PartitionIdentifier.root, - root_partition_name(args), + root_partition_description(args), type_uuid=gpt_root_native(args.architecture, args.usr_only).root, read_only=args.read_only) @@ -3472,7 +3472,7 @@ def insert_verity( loopdev, verity, PartitionIdentifier.verity, - root_partition_name(args, "Verity"), + root_partition_description(args, "Verity"), gpt_root_native(args.architecture, args.usr_only).verity, read_only=True, part_uuid=u) @@ -3577,7 +3577,7 @@ def insert_verity_sig( loopdev, verity_sig, PartitionIdentifier.verity_sig, - root_partition_name(args, "Signature"), + root_partition_description(args, "Signature"), gpt_root_native(args.architecture, args.usr_only).verity_sig, read_only=True, part_uuid=u) @@ -5907,7 +5907,7 @@ def load_args(args: argparse.Namespace) -> CommandLineArguments: args.kernel_command_line.append( "mount.usr=/dev/disk/by-partlabel/" + xescape( - root_partition_name( + root_partition_description( args=None, image_id=args.image_id, image_version=args.image_version, usr_only=args.usr_only ) ) diff --git a/mkosi/backend.py b/mkosi/backend.py index 31e880de5..ae02b57de 100644 --- a/mkosi/backend.py +++ b/mkosi/backend.py @@ -229,7 +229,7 @@ class Partition: part_uuid: Optional[uuid.UUID] read_only: Optional[bool] - name: str + description: str def blockdev(self, loopdev: Path) -> Path: return Path(f"{loopdev}p{self.number}") @@ -238,7 +238,7 @@ def sfdisk_spec(self) -> str: desc = [f'size={self.n_sectors}', f'type={self.type_uuid}', f'attrs={"GUID:60" if self.read_only else ""}', - f'name="{self.name}"', + f'name="{self.description}"', f'uuid={self.part_uuid}' if self.part_uuid is not None else None] return ', '.join(filter(None, desc)) @@ -280,17 +280,17 @@ def add(self, ident: PartitionIdentifier, size: int, type_uuid: uuid.UUID, - name: str, + description: str, part_uuid: Optional[uuid.UUID] = None, read_only: Optional[bool] = False) -> Partition: - assert '"' not in name + assert '"' not in description size = roundup(size, self.grain) n_sectors = size // self.sector_size part = Partition(len(self.partitions) + 1, - n_sectors, type_uuid, part_uuid, read_only, name) + n_sectors, type_uuid, part_uuid, read_only, description) self.partitions[ident] = part self.last_partition_sector = self.last_partition_offset() // self.sector_size + n_sectors From e4996c93a1bbaa93c4da84359f4f401dd73465fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= Date: Tue, 5 Oct 2021 20:12:59 +0200 Subject: [PATCH 9/9] ci: add types-cryptography to CI environment --- .github/workflows/ci-unit-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-unit-test.yml b/.github/workflows/ci-unit-test.yml index 23739e6c4..0a1fe67d7 100644 --- a/.github/workflows/ci-unit-test.yml +++ b/.github/workflows/ci-unit-test.yml @@ -16,7 +16,7 @@ jobs: - name: Install run: | - python3 -m pip install pytest mypy isort pyflakes + python3 -m pip install pytest mypy types-cryptography isort pyflakes npm install -g pyright - name: Check that imports are sorted