diff --git a/server/parsec/cli/__init__.py b/server/parsec/cli/__init__.py index 4669a9f606a..d4b4bdf2c3c 100644 --- a/server/parsec/cli/__init__.py +++ b/server/parsec/cli/__init__.py @@ -7,37 +7,34 @@ import click +from parsec.cli.export import export_realm +from parsec.cli.inspect import human_accesses from parsec.cli.migration import migrate from parsec.cli.options import version_option from parsec.cli.run import run_cmd -from parsec.cli.sequester import ( - create_service, - export_realm, - extract_realm_export, - generate_service_certificate, - human_accesses, - import_service_certificate, - list_services, - update_service, -) +from parsec.cli.sequester_create import create_service, generate_service_certificate +from parsec.cli.sequester_list import list_services +from parsec.cli.sequester_revoke import generate_service_revocation_certificate, revoke_service from parsec.cli.testbed import TESTBED_AVAILABLE, testbed_cmd __all__ = ("cli",) -@click.group(short_help="Handle sequestered organization") +@click.group( + short_help="Handle sequestered organization", +) @version_option def server_sequester_cmd() -> None: pass -server_sequester_cmd.add_command(create_service, "create_service") server_sequester_cmd.add_command(list_services, "list_services") -server_sequester_cmd.add_command(update_service, "update_service") -server_sequester_cmd.add_command(export_realm, "export_realm") -server_sequester_cmd.add_command(extract_realm_export, "extract_realm_export") server_sequester_cmd.add_command(generate_service_certificate, "generate_service_certificate") -server_sequester_cmd.add_command(import_service_certificate, "import_service_certificate") +server_sequester_cmd.add_command(create_service, "create_service") +server_sequester_cmd.add_command( + generate_service_revocation_certificate, "generate_service_revocation_certificate" +) +server_sequester_cmd.add_command(revoke_service, "revoke_service") @click.group() @@ -48,6 +45,7 @@ def cli() -> None: 
cli.add_command(run_cmd, "run") cli.add_command(migrate, "migrate") +cli.add_command(export_realm, "export_realm") cli.add_command(human_accesses, "human_accesses") cli.add_command(server_sequester_cmd, "sequester") if TESTBED_AVAILABLE: diff --git a/server/parsec/cli/export.py b/server/parsec/cli/export.py new file mode 100644 index 00000000000..389cf9e949a --- /dev/null +++ b/server/parsec/cli/export.py @@ -0,0 +1,192 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +import click + +from parsec._parsec import ( + DateTime, + OrganizationID, + VlobID, +) +from parsec.cli.options import ( + blockstore_server_options, + db_server_options, + debug_config_options, + logging_config_options, +) +from parsec.cli.testbed import if_testbed_available +from parsec.cli.utils import cli_exception_handler, start_backend +from parsec.config import ( + BaseBlockStoreConfig, + BaseDatabaseConfig, + LogLevel, +) +from parsec.realm_export import ExportProgressStep, get_earliest_allowed_snapshot_timestamp +from parsec.realm_export import export_realm as do_export_realm + + +class DevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + for key, value in ( + ("debug", True), + ("db", "MOCKED"), + ("blockstore", ("MOCKED",)), + ("with_testbed", "workspace_history"), + ("organization", "WorkspaceHistoryOrgTemplate"), + ("realm", "f0000000000000000000000000000008"), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command( + short_help="Export the content of a realm in order to consult it with a sequester service key" +) +@click.argument("output", type=Path, required=False) +@click.option("--organization", type=OrganizationID, required=True) 
+@click.option("--realm", type=VlobID.from_hex, required=True) +@click.option("--snapshot-timestamp", type=DateTime.from_rfc3339) +@db_server_options +@blockstore_server_options +# Add --log-level/--log-format/--log-file +@logging_config_options(default_log_level="INFO") +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option("--with-testbed", help="Start by populating with a testbed template") +) +@if_testbed_available( + click.option( + "--dev", + cls=DevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --db=MOCKED --blockstore=MOCKED" + " --with-testbed=workspace_history --organization WorkspaceHistoryOrgTemplate --realm f0000000000000000000000000000008`" + ), + ) +) +def export_realm( + organization: OrganizationID, + realm: VlobID, + snapshot_timestamp: DateTime | None, + output: Path, + db: BaseDatabaseConfig, + db_min_connections: int, + db_max_connections: int, + blockstore: BaseBlockStoreConfig, + log_level: LogLevel, + log_format: str, + log_file: str | None, + debug: bool, + with_testbed: str | None = None, + dev: bool = False, +) -> None: + with cli_exception_handler(debug): + asyncio.run( + _export_realm( + debug=debug, + db_config=db, + blockstore_config=blockstore, + organization_id=organization, + realm_id=realm, + snapshot_timestamp=snapshot_timestamp, + output=output, + with_testbed=with_testbed, + ) + ) + + +async def _export_realm( + db_config: BaseDatabaseConfig, + blockstore_config: BaseBlockStoreConfig, + debug: bool, + with_testbed: str | None, + organization_id: OrganizationID, + realm_id: VlobID, + snapshot_timestamp: DateTime | None, + output: Path | None, +): + snapshot_timestamp = snapshot_timestamp or get_earliest_allowed_snapshot_timestamp() + output = output or Path.cwd() + + if output.is_dir(): + # Output is pointing to a directory, use a default name for the database extract + output_db_path = ( + output + / 
f"parsec-export-{organization_id.str}-realm-{realm_id.hex}-{snapshot_timestamp.to_rfc3339()}.sqlite" + ) + + else: + output_db_path = output + + output_db_display = click.style(str(output_db_path), fg="green") + if output_db_path.exists(): + click.echo( + f"File {output_db_display} already exists, continue the extract from where it was left" + ) + else: + click.echo(f"Creating {output_db_display}") + + click.echo( + f"Use { click.style('^C', fg='yellow') } to stop the export," + " progress won't be lost when restarting the command" + ) + + async with start_backend( + db_config=db_config, + blockstore_config=blockstore_config, + debug=debug, + populate_with_template=with_testbed, + ) as backend: + with click.progressbar( + length=0, label="Starting", show_pos=True, update_min_steps=0 + ) as bar: + + def _on_progress(step: ExportProgressStep): + match step: + case "certificates_start": + bar.finished = False + bar.label = "1/4 Exporting certificates" + bar.length = 1 + bar.update(0) + case "certificates_done": + bar.update(1) + case ("vlobs", exported, total): + bar.finished = False + bar.label = "2/4 Exporting vlobs" + bar.length = total + bar.pos = exported + bar.update(0) + case ("blocks_metadata", exported, total): + bar.finished = False + bar.label = "3/4 Exporting blocks metadata" + bar.length = total + bar.pos = exported + bar.update(0) + case ("blocks_data", exported, total): + bar.finished = False + bar.label = "4/4 Exporting blocks data" + bar.length = total + bar.pos = exported + bar.update(0) + + await do_export_realm( + backend, + organization_id, + realm_id, + snapshot_timestamp, + output_db_path, + _on_progress, + ) diff --git a/server/parsec/cli/inspect.py b/server/parsec/cli/inspect.py new file mode 100644 index 00000000000..f8a177ce8de --- /dev/null +++ b/server/parsec/cli/inspect.py @@ -0,0 +1,212 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS +from __future__ import annotations + +import asyncio +from 
typing import Any + +import click + +from parsec._parsec import ( + HumanHandle, + OrganizationID, + UserID, + VlobID, +) +from parsec.cli.options import db_server_options, debug_config_options, logging_config_options +from parsec.cli.testbed import if_testbed_available +from parsec.cli.utils import cli_exception_handler, start_backend +from parsec.components.realm import RealmGrantedRole +from parsec.components.user import UserDump +from parsec.config import BaseDatabaseConfig, LogLevel, MockedBlockStoreConfig + + +class DevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + for key, value in ( + ("debug", True), + ("db", "MOCKED"), + ("with_testbed", "coolorg"), + ("organization", "CoolorgOrgTemplate"), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command(short_help="Get information about user&realm accesses") +@click.option("--organization", type=OrganizationID, required=True) +@click.option("--filter", type=str, default="", help="Filter by human handle or user ID") +@db_server_options +# Add --log-level/--log-format/--log-file +@logging_config_options(default_log_level="INFO") +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option("--with-testbed", help="Start by populating with a testbed template") +) +@if_testbed_available( + click.option( + "--dev", + cls=DevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --db=MOCKED --with-testbed=coolorg --organization CoolorgOrgTemplate`" + ), + ) +) +def human_accesses( + filter: str, + organization: OrganizationID, + db: BaseDatabaseConfig, + db_max_connections: int, + db_min_connections: int, + log_level: LogLevel, + log_format: str, + log_file: str | None, + debug: bool, + with_testbed: str | None = None, + dev: bool = False, +) -> None: + with 
cli_exception_handler(debug): + asyncio.run( + _human_accesses( + debug=debug, + db_config=db, + organization_id=organization, + with_testbed=with_testbed, + user_filter=filter, + ) + ) + + +async def _human_accesses( + db_config: BaseDatabaseConfig, + debug: bool, + with_testbed: str | None, + organization_id: OrganizationID, + user_filter: str, +) -> None: + # Can use a dummy blockstore config since we are not going to query it + blockstore_config = MockedBlockStoreConfig() + + async with start_backend( + db_config=db_config, + blockstore_config=blockstore_config, + debug=debug, + populate_with_template=with_testbed, + ) as backend: + dump = await backend.user.test_dump_current_users(organization_id=organization_id) + users = list(dump.values()) + if user_filter: + # Now is a good time to filter out + filter_split = user_filter.split() + filtered_users = [] + for user in users: + # Note user ID is present twice to handle both compact and dash separated formats + # (i.e. `a11cec00-1000-0000-0000-000000000000` vs `a11cec00100000000000000000000000`). 
+ txt = f"{user.human_handle.str if user.human_handle else ''} {user.user_id.hex} {user.user_id}".lower() + if len([True for sq in filter_split if sq in txt]) == len(filter_split): + filtered_users.append(user) + users = filtered_users + + realms_granted_roles = await backend.realm.dump_realms_granted_roles( + organization_id=organization_id + ) + assert isinstance(realms_granted_roles, list) + per_user_granted_roles: dict[UserID, list[RealmGrantedRole]] = {} + for granted_role in realms_granted_roles: + user_granted_roles = per_user_granted_roles.setdefault(granted_role.user_id, []) + user_granted_roles.append(granted_role) + + humans: dict[HumanHandle, list[tuple[UserDump, dict[VlobID, list[RealmGrantedRole]]]]] = {} + for user in users: + human_users = humans.setdefault(user.human_handle, []) + per_user_per_realm_granted_role: dict[VlobID, list[RealmGrantedRole]] = {} + for granted_role in per_user_granted_roles.get(user.user_id, []): + realm_granted_roles = per_user_per_realm_granted_role.setdefault( + granted_role.realm_id, [] + ) + realm_granted_roles.append(granted_role) + + for realm_granted_roles in per_user_per_realm_granted_role.values(): + realm_granted_roles.sort(key=lambda x: x.granted_on) + + human_users.append((user, per_user_per_realm_granted_role)) + + # Typical output to display: + # + # Found 2 results: + # Human John Doe + # + # User 02e0486752d34d6ab3bf8e0befef1935 (REVOKED) + # 2000-01-01T00:00:00Z: Created with profile STANDARD + # 2000-01-02T00:00:00Z: Updated to profile CONTRIBUTOR + # 2000-12-31T00:00:00Z: Revoked + # + # User 9e082a43b51e44ab9858628bac4a61d9 (ADMIN) + # 2001-01-01T00:00:00Z: Created with profile ADMIN + # + # Realm 8006a491f0704040ae9a197ca7501f71 + # 2001-02-01T00:00:00Z: Access OWNER granted + # 2001-02-02T00:00:00Z: Access removed + # 2001-02-03T00:00:00Z: Access READER granted + # + # Realm 109c48b7c931435c913945f08d23432d + # 2001-02-01T00:00:00Z: Access OWNER granted + # + # Human Jane Doe + # + # User 
baf59386baf740bba93151cdde1beac8 (OUTSIDER) + # 2000-01-01T00:00:00Z: Created with profile OUTSIDER + # + # Realm 8006a491f0704040ae9a197ca7501f71 + # 2001-02-01T00:00:00Z: Access READER granted + + def _display_user( + user: UserDump, + per_realm_granted_role: dict[VlobID, list[RealmGrantedRole]], + indent: int, + ) -> None: + base_indent = "\t" * indent + display_user = click.style(user.user_id, fg="green") + if not user.revoked_on: + user_info = f"{user.current_profile}" + else: + user_info = "REVOKED" + print(base_indent + f"User {display_user} ({user_info})") + print(base_indent + f"\t{user.created_on}: Created with profile {user.initial_profile}") + + for profile_update in user.profile_updates: + print( + base_indent + f"\t{profile_update[0]}: Updated to profile {profile_update[1]}" + ) + + if user.revoked_on: + print(base_indent + f"\t{user.revoked_on}: Revoked") + + print() + + for realm_id, granted_roles in per_realm_granted_role.items(): + display_realm = click.style(realm_id.hex, fg="yellow") + print(base_indent + f"\tRealm {display_realm}") + for granted_role in granted_roles: + if granted_role.role is None: + display_role = "Access removed" + else: + display_role = f"Access {granted_role.role.str} granted" + print(base_indent + f"\t\t{granted_role.granted_on}: {display_role}") + + print(f"Found {len(humans)} result(s)") + + for human_handle, human_users in humans.items(): + display_human = click.style(human_handle, fg="green") + print(f"Human {display_human}") + for user, per_realm_granted_roles in human_users: + _display_user(user, per_realm_granted_roles, indent=1) + print() diff --git a/server/parsec/cli/sequester.py b/server/parsec/cli/sequester.py deleted file mode 100644 index 69a3b8e5ad0..00000000000 --- a/server/parsec/cli/sequester.py +++ /dev/null @@ -1,828 +0,0 @@ -# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS -from __future__ import annotations - -import asyncio -import textwrap -from base64 import 
b64decode, b64encode -from contextlib import asynccontextmanager -from dataclasses import dataclass -from datetime import datetime -from pathlib import Path -from typing import AsyncContextManager, AsyncIterator, cast - -import click - -from parsec._parsec import ( - CryptoError, - DateTime, - HumanHandle, - OrganizationID, - SequesterPrivateKeyDer, - SequesterPublicKeyDer, - SequesterServiceCertificate, - SequesterServiceID, - SequesterSigningKeyDer, - SequesterVerifyKeyDer, - UserID, - VlobID, -) -from parsec.cli.options import blockstore_server_options, db_server_options, debug_config_options -from parsec.cli.utils import cli_exception_handler, operation -from parsec.components.blockstore import blockstore_factory -from parsec.components.organization import Organization -from parsec.components.postgresql import AsyncpgConnection, AsyncpgPool -from parsec.components.postgresql.events import event_bus_factory -from parsec.components.postgresql.handler import asyncpg_pool_factory -from parsec.components.postgresql.organization import PGOrganizationComponent -from parsec.components.postgresql.realm import PGRealmComponent -from parsec.components.postgresql.sequester import PGSequesterComponent -from parsec.components.postgresql.sequester_export import RealmExporter -from parsec.components.postgresql.user import PGUserComponent -from parsec.components.realm import RealmGrantedRole -from parsec.components.sequester import ( - BaseSequesterService, - SequesterServiceType, - StorageSequesterService, - WebhookSequesterService, -) -from parsec.components.user import UserDump -from parsec.config import BaseBlockStoreConfig -from parsec.sequester_export_reader import RealmExportProgress, extract_workspace - -SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER = "-----BEGIN PARSEC SEQUESTER SERVICE CERTIFICATE-----" -SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER = "-----END PARSEC SEQUESTER SERVICE CERTIFICATE-----" - - -def dump_sequester_service_certificate_pem( - certificate_data: 
SequesterServiceCertificate, - authority_signing_key: SequesterSigningKeyDer, -) -> str: - certificate = authority_signing_key.sign(certificate_data.dump()) - return "\n".join( - ( - SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER, - *textwrap.wrap(b64encode(certificate).decode(), width=64), - SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER, - "", - ) - ) - - -def load_sequester_service_certificate_pem( - data: str, authority_verify_key: SequesterVerifyKeyDer -) -> tuple[SequesterServiceCertificate, bytes]: - err_msg = "Not a valid Parsec sequester service certificate PEM file" - try: - header, *content, footer = data.strip().splitlines() - except ValueError as exc: - raise ValueError(err_msg) from exc - - if header != SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER: - raise ValueError(f"{err_msg}: missing `{SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER}` header") - if footer != SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER: - raise ValueError(f"{err_msg}: missing `{SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER}` footer") - - try: - certificate = b64decode("".join(content)) - return ( - SequesterServiceCertificate.load(authority_verify_key.verify(certificate)), - certificate, - ) - except (ValueError, CryptoError) as exc: - raise ValueError(f"{err_msg}: invalid body ({exc})") from exc - - -class SequesterBackendCliError(Exception): - pass - - -SERVICE_TYPE_CHOICES: dict[str, SequesterServiceType] = { - service.value: service for service in SequesterServiceType -} - - -@dataclass(slots=True) -class BackendDbConfig: - db_url: str - db_min_connections: int - db_max_connections: int - - def pool(self) -> AsyncContextManager[AsyncpgPool]: - return asyncpg_pool_factory(self.db_url, self.db_min_connections, self.db_max_connections) - - -@asynccontextmanager -async def run_sequester_component(config: BackendDbConfig) -> AsyncIterator[PGSequesterComponent]: - raise NotImplementedError - yield - - -async def _create_service( - config: BackendDbConfig, - organization_id: OrganizationID, - register_service_req: 
BaseSequesterService, -) -> None: - async with run_sequester_component(config) as sequester_component: - await sequester_component.create_service(organization_id, register_service_req) - - -def _display_service(service: BaseSequesterService) -> None: - display_service_id = click.style(service.service_id.hex, fg="yellow") - display_service_label = click.style(service.service_label, fg="green") - click.echo(f"Service {display_service_label} (id: {display_service_id})") - click.echo(f"\tCreated on: {service.created_on}") - click.echo(f"\tService type: {service.service_type}") - if isinstance(service, WebhookSequesterService): - click.echo(f"\tWebhook endpoint URL {service.webhook_url}") - if service.is_revoked: - display_disable = click.style("Disabled", fg="red") - click.echo(f"\t{display_disable} on: {service.revoked_on}") - - -async def _list_services(config: BackendDbConfig, organization_id: OrganizationID) -> None: - async with run_sequester_component(config) as sequester_component: - services = await sequester_component.get_organization_services(organization_id) - - display_services_count = click.style(len(services), fg="green") - click.echo(f"Found {display_services_count} sequester service(s)") - - # Display active services first - for service in services: - if not service.is_revoked: - _display_service(service) - for service in services: - if service.is_revoked: - _display_service(service) - - -async def _disable_service( - config: BackendDbConfig, organization_id: OrganizationID, service_id: SequesterServiceID -) -> None: - async with run_sequester_component(config) as sequester_component: - await sequester_component.disable_service(organization_id, service_id) - - -async def _enable_service( - config: BackendDbConfig, organization_id: OrganizationID, service_id: SequesterServiceID -) -> None: - async with run_sequester_component(config) as sequester_component: - await sequester_component.enable_service(organization_id, service_id) - - -def _get_config(db: 
str, db_min_connections: int, db_max_connections: int) -> BackendDbConfig: - if db.upper() == "MOCKED": - raise SequesterBackendCliError("MOCKED DB can not be used with sequester services") - - return BackendDbConfig( - db_url=db, db_min_connections=db_min_connections, db_max_connections=db_max_connections - ) - - -@click.command(short_help="Generate a certificate for a new sequester service") -@click.option("--service-label", type=str, help="New service name", required=True) -@click.option( - "--service-public-key", - help="The service encryption public key used to encrypt data to the sequester service", - type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), - required=True, -) -@click.option( - "--authority-private-key", - help="The private authority key use. Used to sign the encryption key.", - type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), - required=True, -) -@click.option( - "--output", - "-o", - help="File to write the sequester service certificate into.", - type=click.File("w", encoding="utf8"), - required=True, - metavar="CERTIFICATE.pem", -) -# Add --debug -@debug_config_options -def generate_service_certificate( - service_label: str, - service_public_key: Path, - authority_private_key: Path, - output: click.utils.LazyFile, - debug: bool, -) -> None: - with cli_exception_handler(debug): - # Load key files - service_key = SequesterPublicKeyDer.load_pem(service_public_key.read_text()) - authority_key = SequesterSigningKeyDer.load_pem(authority_private_key.read_text()) - - # Generate data schema - service_id = SequesterServiceID.new() - now = DateTime.now() - certificate_data = SequesterServiceCertificate( - timestamp=now, - service_id=service_id, - service_label=service_label, - encryption_key_der=service_key, - ) - - # Write the output file - pem_content = dump_sequester_service_certificate_pem( - certificate_data=certificate_data, - authority_signing_key=authority_key, - ) - output.write(pem_content) 
- - display_service = f"{click.style(service_label, fg='yellow')} (id: {click.style(service_id.hex, fg='yellow')})" - display_file = click.style(output.name, fg="green") - click.echo(f"Sequester service certificate {display_service} exported in {display_file}") - click.echo( - f"Use {click.style('import_service_certificate', fg='yellow')} command to add it to an organization" - ) - - -@click.command(short_help="Register a new sequester service from it existing certificate") -@click.option( - "--service-certificate", - help="File containing the sequester service certificate (previously generated by `generate_service_certificate` command).", - type=click.File("r", encoding="utf8"), - required=True, - metavar="CERTIFICATE.pem", -) -@click.option( - "--organization", - type=OrganizationID, - help="Organization ID where to register the service", - required=True, -) -@click.option( - "--service-type", - type=click.Choice(list(SERVICE_TYPE_CHOICES.keys()), case_sensitive=False), - default=SequesterServiceType.STORAGE.value, - help="Service type", -) -@click.option( - "--webhook-url", - type=str, - default=None, - help="[Service Type webhook only] webhook url used to send encrypted service data", -) -@db_server_options -# Add --debug -@debug_config_options -def import_service_certificate( - service_certificate: click.utils.LazyFile, - organization: OrganizationID, - db: str, - db_max_connections: int, - db_min_connections: int, - service_type: str, - webhook_url: str | None, - debug: bool, -) -> None: - with cli_exception_handler(debug): - cooked_service_type: SequesterServiceType = SERVICE_TYPE_CHOICES[service_type] - # Check service type - if webhook_url is not None and cooked_service_type != SequesterServiceType.WEBHOOK: - raise SequesterBackendCliError( - f"Incompatible service type {cooked_service_type} with webhook_url option\nwebhook_url can only be used with {SequesterServiceType.WEBHOOK}." 
- ) - if cooked_service_type == SequesterServiceType.WEBHOOK and not webhook_url: - raise SequesterBackendCliError( - "Webhook sequester service requires webhook_url argument" - ) - - db_config = _get_config(db, db_min_connections, db_max_connections) - service_certificate_pem = service_certificate.read() - - asyncio.run( - _import_service_certificate( - db_config, - organization, - service_certificate_pem, - cooked_service_type, - webhook_url, - ) - ) - click.echo(click.style("Service created", fg="green")) - - -async def _import_service_certificate( - db_config: BackendDbConfig, - organization_id: OrganizationID, - service_certificate_pem: str, - service_type: SequesterServiceType, - webhook_url: str | None, -) -> None: - async with db_config.pool() as pool: - # 1) Retrieve the sequester authority verify key and check organization is compatible - - async with pool.acquire() as conn: - conn = cast(AsyncpgConnection, conn) - organization = await PGOrganizationComponent._get(conn, id=organization_id) - assert isinstance(organization, Organization) - - if not organization.is_bootstrapped: - raise RuntimeError("Organization is not bootstrapped, aborting.") - - if ( - organization.sequester_authority_certificate is None - or organization.sequester_authority_verify_key_der is None - ): - raise RuntimeError("Organization doesn't support sequester, aborting.") - - # 2) Validate the certificate - - ( - service_certificate_data, - service_certificate_content, - ) = load_sequester_service_certificate_pem( - data=service_certificate_pem, - authority_verify_key=organization.sequester_authority_verify_key_der, - ) - - # 3) Insert the certificate - - service: BaseSequesterService - if service_type == SequesterServiceType.STORAGE: - assert webhook_url is None - service = StorageSequesterService( - service_id=service_certificate_data.service_id, - service_label=service_certificate_data.service_label, - service_certificate=service_certificate_content, - 
created_on=service_certificate_data.timestamp, - revoked_on=None, - ) - else: - assert service_type == SequesterServiceType.WEBHOOK - assert webhook_url - # Removing the extra slash if present to avoid a useless redirection - service = WebhookSequesterService( - service_id=service_certificate_data.service_id, - service_label=service_certificate_data.service_label, - service_certificate=service_certificate_content, - created_on=service_certificate_data.timestamp, - webhook_url=webhook_url, - revoked_on=None, - ) - - sequester_component = PGSequesterComponent(pool=pool) - await sequester_component.create_service(organization_id, service) - - -@click.command(short_help="Register a new sequester service") -@click.option( - "--service-public-key", - help="The service encryption public key used to encrypt data to the sequester service", - type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), - required=True, -) -@click.option( - "--authority-private-key", - help="The private authority key use. 
Used to sign the encryption key.", - type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), - required=True, -) -@click.option("--service-label", type=str, help="New service name", required=True) -@click.option( - "--organization", - type=OrganizationID, - help="Organization ID where to register the service", - required=True, -) -@click.option( - "--service-type", - type=click.Choice(list(SERVICE_TYPE_CHOICES.keys()), case_sensitive=False), - default=SequesterServiceType.STORAGE.value, - help="Service type", -) -@click.option( - "--webhook-url", - type=str, - default=None, - help="[Service Type webhook only] webhook url used to send encrypted service data", -) -@db_server_options -# Add --debug -@debug_config_options -def create_service( - service_public_key: Path, - authority_private_key: Path, - service_label: str, - organization: OrganizationID, - db: str, - db_max_connections: int, - db_min_connections: int, - service_type: str, - webhook_url: str | None, - debug: bool, -) -> None: - with cli_exception_handler(debug): - cooked_service_type: SequesterServiceType = SERVICE_TYPE_CHOICES[service_type] - # Check service type - if webhook_url is not None and cooked_service_type != SequesterServiceType.WEBHOOK: - raise SequesterBackendCliError( - f"Incompatible service type {cooked_service_type} with webhook_url option\nwebhook_url can only be used with {SequesterServiceType.WEBHOOK}." 
- ) - if cooked_service_type == SequesterServiceType.WEBHOOK and not webhook_url: - raise SequesterBackendCliError( - "Webhook sequester service requires webhook_url argument" - ) - # Load key files - service_key = SequesterPublicKeyDer.load_pem(service_public_key.read_text()) - authority_key = SequesterSigningKeyDer.load_pem(authority_private_key.read_text()) - # Generate data schema - service_id = SequesterServiceID.new() - now = DateTime.now() - certif_data = SequesterServiceCertificate( - timestamp=now, - service_id=service_id, - service_label=service_label, - encryption_key_der=service_key, - ) - certificate = authority_key.sign(certif_data.dump()) - - sequester_service: BaseSequesterService - if cooked_service_type == SequesterServiceType.STORAGE: - assert webhook_url is None - sequester_service = StorageSequesterService( - service_id=service_id, - service_label=service_label, - service_certificate=certificate, - created_on=now, - revoked_on=None, - ) - else: - assert cooked_service_type == SequesterServiceType.WEBHOOK - assert webhook_url - # Removing the extra slash if present to avoid a useless redirection - sequester_service = WebhookSequesterService( - service_id=service_id, - service_label=service_label, - service_certificate=certificate, - created_on=now, - webhook_url=webhook_url, - revoked_on=None, - ) - - db_config = _get_config(db, db_min_connections, db_max_connections) - - asyncio.run(_create_service(db_config, organization, sequester_service)) - click.echo(click.style("Service created", fg="green")) - - -@click.command(short_help="List available sequester services") -@click.option("--organization", type=OrganizationID, help="Organization ID", required=True) -@db_server_options -# Add --debug -@debug_config_options -def list_services( - organization: OrganizationID, - db: str, - db_max_connections: int, - db_min_connections: int, - debug: bool, -) -> None: - with cli_exception_handler(debug): - db_config = _get_config(db, db_min_connections, 
db_max_connections) - asyncio.run(_list_services(db_config, organization)) - - -@click.command(short_help="Disable/re-enable a sequester service") -@click.option("--organization", type=OrganizationID, help="Organization ID", required=True) -@click.option( - "--service", - type=SequesterServiceID.from_hex, - help="ID of the sequester service to update", - required=True, -) -@click.option("--enable", is_flag=True, help="Enable service") -@click.option("--disable", is_flag=True, help="Disable service") -@db_server_options -# Add --debug -@debug_config_options -def update_service( - organization: OrganizationID, - service: SequesterServiceID, - db: str, - db_max_connections: int, - db_min_connections: int, - enable: bool, - disable: bool, - debug: bool, -) -> None: - with cli_exception_handler(debug): - if enable and disable: - raise click.BadParameter("Enable and disable flags are both set") - if not enable and not disable: - raise click.BadParameter("Required: enable or disable flag") - db_config = _get_config(db, db_min_connections, db_max_connections) - if disable: - asyncio.run(_disable_service(db_config, organization, service)) - if enable: - asyncio.run(_enable_service(db_config, organization, service)) - click.echo(click.style("Service updated", fg="green")) - - -async def _human_accesses( - config: BackendDbConfig, organization: OrganizationID, user_filter: str -) -> None: - async with ( - config.pool() as pool, - event_bus_factory(pool) as event_bus, - ): - user_component = PGUserComponent(pool=pool, event_bus=event_bus) - realm_component = PGRealmComponent(pool=pool, event_bus=event_bus) - - dump = await user_component.test_dump_current_users(organization_id=organization) - users = list(dump.values()) - if user_filter: - # Now is a good time to filter out - filter_split = user_filter.split() - filtered_users = [] - for user in users: - txt = f"{user.human_handle.str if user.human_handle else ''} {user.user_id.hex}".lower() - if len([True for sq in 
filter_split if sq in txt]) == len(filter_split): - filtered_users.append(user) - users = filtered_users - - realms_granted_roles = await realm_component.dump_realms_granted_roles( - organization_id=organization - ) - assert isinstance(realms_granted_roles, list) - per_user_granted_roles: dict[UserID, list[RealmGrantedRole]] = {} - for granted_role in realms_granted_roles: - user_granted_roles = per_user_granted_roles.setdefault(granted_role.user_id, []) - user_granted_roles.append(granted_role) - - humans: dict[HumanHandle, list[tuple[UserDump, dict[VlobID, list[RealmGrantedRole]]]]] = {} - for user in users: - human_users = humans.setdefault(user.human_handle, []) - per_user_per_realm_granted_role: dict[VlobID, list[RealmGrantedRole]] = {} - for granted_role in per_user_granted_roles.get(user.user_id, []): - realm_granted_roles = per_user_per_realm_granted_role.setdefault( - granted_role.realm_id, [] - ) - realm_granted_roles.append(granted_role) - - for realm_granted_roles in per_user_per_realm_granted_role.values(): - realm_granted_roles.sort(key=lambda x: x.granted_on) - - human_users.append((user, per_user_per_realm_granted_role)) - - # Typical output to display: - # - # Found 2 results: - # Human John Doe - # - # User 9e082a43b51e44ab9858628bac4a61d9 (ADMIN, created on 2000-01-02T00:00:00Z) - # - # Realm 8006a491f0704040ae9a197ca7501f71 - # 2000-01-04T00:00:00Z: Access OWNER granted - # 2000-01-03T00:00:00Z: Access removed - # 2000-01-02T00:00:00Z: Access READER granted - # - # Realm 109c48b7c931435c913945f08d23432d - # 2000-01-01T00:00:00Z: Access OWNER granted - # - # User 02e0486752d34d6ab3bf8e0befef1935 (STANDARD, created on 2000-01-01T00:00:00Z, revoked on 2000-01-02T00:00:00Z) - # - # Human Jane Doe - # - # User baf59386baf740bba93151cdde1beac8 (OUTSIDER, created on 2000-01-01T00:00:00Z) - # - # Realm 8006a491f0704040ae9a197ca7501f71 - # 2000-01-01T00:00:00Z: Access READER granted - - def _display_user( - user: UserDump, - per_realm_granted_role: 
dict[VlobID, list[RealmGrantedRole]], - indent: int, - ) -> None: - base_indent = "\t" * indent - display_user = click.style(user.user_id, fg="green") - user_info = f"{user.current_profile}, created on {user.created_on}" - if user.revoked_on: - user_info += f", revoked on {user.revoked_on}" - print(base_indent + f"User {display_user} ({user_info})") - for realm_id, granted_roles in per_realm_granted_role.items(): - display_realm = click.style(realm_id.hex, fg="yellow") - print(base_indent + f"\tRealm {display_realm}") - for granted_role in granted_roles: - if granted_role.role is None: - display_role = "Access removed" - else: - display_role = f"Access {granted_role.role.str} granted" - print(base_indent + f"\t\t{granted_role.granted_on}: {display_role}") - - print(f"Found {len(humans)} result(s)") - - for human_handle, human_users in humans.items(): - display_human = click.style(human_handle, fg="green") - print(f"Human {display_human}") - for user, per_realm_granted_roles in human_users: - _display_user(user, per_realm_granted_roles, indent=1) - print() - - -@click.command(short_help="Get information about user&realm accesses") -@click.option("--organization", type=OrganizationID, required=True) -@click.option("--filter", type=str, default="") -@db_server_options -def human_accesses( - filter: str, - organization: OrganizationID, - db: str, - db_max_connections: int, - db_min_connections: int, -) -> None: - db_config = _get_config(db, db_min_connections, db_max_connections) - asyncio.run(_human_accesses(db_config, organization, filter)) - - -async def _export_realm( - db_config: BackendDbConfig, - blockstore_config: BaseBlockStoreConfig, - organization_id: OrganizationID, - realm_id: VlobID, - service_id: SequesterServiceID, - output: Path, -) -> None: - if output.is_dir(): - # Output is pointing to a directory, use a default name for the database extract - output_db_path = output / f"parsec-sequester-export-realm-{realm_id.hex}.sqlite" - else: - output_db_path = 
output - - output_db_display = click.style(str(output_db_path), fg="green") - if output.exists(): - click.echo( - f"File {output_db_display} already exists, continue the extract from where it was left" - ) - else: - click.echo(f"Creating {output_db_display}") - - click.echo( - f"Use { click.style('^C', fg='yellow') } to stop the export," - " progress won't be lost when restarting the command" - ) - - async with db_config.pool() as pool: - blockstore_component = blockstore_factory(config=blockstore_config, postgresql_pool=pool) - - async with RealmExporter.run( - organization_id=organization_id, - realm_id=realm_id, - service_id=service_id, - output_db_path=output_db_path, - input_pool=pool, - input_blockstore=blockstore_component, - ) as exporter: - # 1) Export vlobs - - with operation("Computing vlobs (i.e. file/folder metadata) to export"): - ( - vlob_total_count, - vlob_batch_offset_marker, - ) = await exporter.compute_vlobs_export_status() - - if not vlob_total_count: - click.echo("No more vlobs to export !") - else: - vlob_total_count_display = click.style(str(vlob_total_count), fg="green") - click.echo(f"About {vlob_total_count_display} vlobs need to be exported") - with click.progressbar(length=vlob_total_count, label="Exporting vlobs") as bar: - vlobs_exported_count = 0 - vlob_batch_size = 1000 - while True: - new_vlob_batch_offset_marker = await exporter.export_vlobs( - batch_size=vlob_batch_size, batch_offset_marker=vlob_batch_offset_marker - ) - if new_vlob_batch_offset_marker <= vlob_batch_offset_marker: - break - vlob_batch_offset_marker = new_vlob_batch_offset_marker - # Note we might end up with vlobs_exported_count > vlob_total_count - # in case additional vlobs are created during the export, this is no - # big deal though (progress bar will stay at 100%) - bar.update(vlobs_exported_count) - - # Export blocks - - with operation("Computing blocks (i.e. 
files data) to export"): - ( - block_total_count, - block_batch_offset_marker, - ) = await exporter.compute_blocks_export_status() - - if not block_total_count: - click.echo("No more blocks to export !") - else: - block_total_count_display = click.style(str(block_total_count), fg="green") - - click.echo(f"About {block_total_count_display} blocks need to be exported") - with click.progressbar(length=block_total_count, label="Exporting blocks") as bar: - blocks_exported_count = 0 - block_batch_size = 100 - while True: - new_block_batch_offset_marker = await exporter.export_blocks( - batch_size=block_batch_size, - batch_offset_marker=block_batch_offset_marker, - ) - if new_block_batch_offset_marker <= block_batch_offset_marker: - break - block_batch_offset_marker = new_block_batch_offset_marker - # Note we might end up with blocks_exported_count > block_total_count - # in case additional blocks are created during the export, this is no - # big deal though (progress bar will stay at 100%) - bar.update(blocks_exported_count) - - -@click.command(short_help="Export a realm to consult it with a sequester service key") -@click.option("--organization", type=OrganizationID, required=True) -@click.option("--realm", type=VlobID.from_hex, required=True) -@click.option( - "--service", - type=SequesterServiceID.from_hex, - help="ID of the sequester service to retrieve data from", - required=True, -) -@click.option("--output", type=Path, required=True) -@db_server_options -@blockstore_server_options -# Add --debug -@debug_config_options -def export_realm( - organization: OrganizationID, - realm: VlobID, - service: SequesterServiceID, - output: Path, - db: str, - db_max_connections: int, - db_min_connections: int, - blockstore: BaseBlockStoreConfig, - debug: bool, -) -> None: - with cli_exception_handler(debug): - db_config = _get_config(db, db_min_connections, db_max_connections) - asyncio.run( - _export_realm( - db_config, - blockstore, - organization, - realm, - service, - 
output, - ) - ) - - -@click.command(short_help="Open a realm export using the sequester service key and dump it content") -@click.option( - "--service-decryption-key", - help="Decryption key of the sequester service that have been use to create the realm export archive", - type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), - required=True, -) -@click.option("--input", type=Path, required=True, help="Realm export archive") -@click.option( - "--output", type=Path, required=True, help="Directory where to dump the content of the realm" -) -@click.option( - "--filter-date", - type=click.DateTime(formats=["%Y-%m-%d"]), - required=False, - help="Extract at a specific date; format year-month-day", -) -# Add --debug -@debug_config_options -def extract_realm_export( - service_decryption_key: Path, - input: Path, - output: Path, - filter_date: datetime | None, - debug: bool, -) -> int: - with cli_exception_handler(debug): - # Finally a command that is not async ! - # This is because here we do only a single thing at a time and sqlite3 provide - # a synchronous api anyway - decryption_key = SequesterPrivateKeyDer.load_pem(service_decryption_key.read_text()) - - # Convert filter_date from click.Datetime to parsec.Datetime - date: DateTime - if filter_date: - date = DateTime.from_timestamp_micros(int(filter_date.timestamp() * 1_000_000)) - else: - date = DateTime.now() - ret = 0 - for fs_path, event_type, event_msg in extract_workspace( - output=output, export_db=input, decryption_key=decryption_key, filter_on_date=date - ): - if event_type == RealmExportProgress.EXTRACT_IN_PROGRESS: - fs_path_display = click.style(str(fs_path), fg="yellow") - click.echo(f"{ fs_path_display }: { event_msg }") - else: - # Error - ret = 1 - fs_path_display = click.style(str(fs_path), fg="red") - click.echo(f"{ fs_path_display }: { event_type.value } { event_msg }") - - return ret diff --git a/server/parsec/cli/sequester_create.py b/server/parsec/cli/sequester_create.py 
new file mode 100644 index 00000000000..5aeaf627094 --- /dev/null +++ b/server/parsec/cli/sequester_create.py @@ -0,0 +1,391 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS +from __future__ import annotations + +import asyncio +import textwrap +from base64 import b64decode, b64encode +from pathlib import Path +from typing import Any + +import click + +from parsec._parsec import ( + CryptoError, + DateTime, + OrganizationID, + SequesterPublicKeyDer, + SequesterServiceCertificate, + SequesterServiceID, + SequesterSigningKeyDer, + SequesterVerifyKeyDer, +) +from parsec.ballpark import RequireGreaterTimestamp +from parsec.cli.options import db_server_options, debug_config_options, logging_config_options +from parsec.cli.testbed import if_testbed_available +from parsec.cli.utils import cli_exception_handler, start_backend +from parsec.components.organization import Organization, OrganizationGetBadOutcome +from parsec.components.sequester import ( + SequesterCreateServiceStoreBadOutcome, + SequesterCreateServiceValidateBadOutcome, + SequesterServiceConfig, + SequesterServiceType, +) +from parsec.config import BaseDatabaseConfig, LogLevel, MockedBlockStoreConfig + +SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER = "-----BEGIN PARSEC SEQUESTER SERVICE CERTIFICATE-----" +SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER = "-----END PARSEC SEQUESTER SERVICE CERTIFICATE-----" + + +def _dump_sequester_service_certificate_pem( + certificate: SequesterServiceCertificate, + authority_signing_key: SequesterSigningKeyDer, +) -> str: + signed = authority_signing_key.sign(certificate.dump()) + return "\n".join( + ( + SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER, + *textwrap.wrap(b64encode(signed).decode(), width=64), + SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER, + "", + ) + ) + + +def _load_sequester_service_certificate_pem( + pem: str, authority_verify_key: SequesterVerifyKeyDer +) -> tuple[SequesterServiceCertificate, bytes]: + err_msg = "Not a valid Parsec 
sequester service certificate PEM file" + try: + header, *content, footer = pem.strip().splitlines() + except ValueError as exc: + raise ValueError(err_msg) from exc + + if header != SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER: + raise ValueError(f"{err_msg}: missing `{SEQUESTER_SERVICE_CERTIFICATE_PEM_HEADER}` header") + if footer != SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER: + raise ValueError(f"{err_msg}: missing `{SEQUESTER_SERVICE_CERTIFICATE_PEM_FOOTER}` footer") + + try: + signed = b64decode("".join(content)) + return ( + SequesterServiceCertificate.load(authority_verify_key.verify(signed)), + signed, + ) + except (ValueError, CryptoError) as exc: + raise ValueError(f"{err_msg}: invalid body ({exc})") from exc + + +SERVICE_TYPE_CHOICES: dict[str, SequesterServiceType] = { + service.value: service for service in SequesterServiceType +} + + +class GenerateServiceCertificateDevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + import os + import tempfile + + from parsec._parsec import testbed + + template_content = testbed.test_get_testbed_template("sequestered") + assert template_content is not None + event = template_content.events[0] + assert isinstance(event, testbed.TestbedEventBootstrapOrganization) + assert event.sequester_authority_signing_key is not None + + # Note this file is not deleted when the application ends, this is considered + # okay since it is only used for niche testing purpose. 
+ file_fd, file_path = tempfile.mkstemp() + os.write(file_fd, event.sequester_authority_signing_key.dump_pem().encode("utf8")) + os.close(file_fd) + + for key, value in ( + ("debug", True), + ("authority_private_key", file_path), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command( + short_help="Generate a certificate for a new sequester service", + help="""Generate a certificate for a new sequester service. + + A sequester service certificate references a RSA key that will be used + to decrypt all the data encrypted within the sequester organization. + + To be accepted, the RSA key must be signed by the sequester authority + RSA key (configured during organization bootstrap). + """, +) +@click.option("--service-label", type=str, help="New service name", required=True) +@click.option( + "--service-public-key", + help="File containing the service encryption public key used to encrypt data to the sequester service", + type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), + required=True, +) +@click.option( + "--authority-private-key", + help="File containing the private authority key use. 
Used to sign the encryption key.", + type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), + required=True, +) +@click.argument("output", required=False) +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option( + "--dev", + cls=GenerateServiceCertificateDevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --authority-private-key=`" + ), + ) +) +def generate_service_certificate( + service_label: str, + service_public_key: Path, + authority_private_key: Path, + output: str | None, + debug: bool, + dev: bool = False, +) -> None: + output = output or str(Path.cwd()) + + with cli_exception_handler(debug): + # 1) Load key files + + service_key = SequesterPublicKeyDer.load_pem(service_public_key.read_text()) + authority_key = SequesterSigningKeyDer.load_pem(authority_private_key.read_text()) + + # 2) Generate certificate + + service_id = SequesterServiceID.new() + timestamp = DateTime.now() + certificate = SequesterServiceCertificate( + timestamp=timestamp, + service_id=service_id, + service_label=service_label, + encryption_key_der=service_key, + ) + + # 3) Write the certificate as PEM in output file + + pem_content = _dump_sequester_service_certificate_pem( + certificate=certificate, + authority_signing_key=authority_key, + ) + + cooked_output = Path(output) + if cooked_output.is_dir(): + output_file = ( + cooked_output + / f"sequester_service_certificate-{service_id.hex}-{timestamp.to_rfc3339()}.pem" + ) + else: + output_file = cooked_output + output_file.write_bytes(pem_content.encode("utf8")) + + display_service = f"{click.style(service_label, fg='yellow')} (id: {click.style(service_id.hex, fg='yellow')}, timestamp: {click.style(timestamp, fg='yellow')})" + display_file = click.style(output_file, fg="green") + click.echo(f"Sequester service certificate {display_service} exported in {display_file}") + click.echo( + f"Use {click.style('parsec sequester create_service', fg='yellow')} 
command to add it to an organization" + ) + + +class CreateServiceDevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + for key, value in ( + ("debug", True), + ("db", "MOCKED"), + ("with_testbed", "sequestered"), + ("organization", "SequesteredOrgTemplate"), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command(short_help="Create a new sequester service from its existing certificate") +@click.option( + "--service-certificate", + help="File containing the sequester service certificate (previously generated by `parsec sequester generate_service_certificate` command).", + type=click.File("r", encoding="utf8"), + required=True, + metavar="CERTIFICATE.pem", +) +@click.option( + "--organization", + type=OrganizationID, + help="Organization ID where to register the service", + required=True, +) +# TODO: Webhook sequester service not implemented yet +# @click.option( +# "--service-type", +# type=click.Choice(list(SERVICE_TYPE_CHOICES.keys()), case_sensitive=False), +# default=SequesterServiceType.STORAGE.value, +# help="Service type", +# ) +# @click.option( +# "--webhook-url", +# type=str, +# default=None, +# help="[Service Type webhook only] webhook url used to send encrypted service data", +# ) +@db_server_options +# Add --log-level/--log-format/--log-file +@logging_config_options(default_log_level="INFO") +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option("--with-testbed", help="Start by populating with a testbed template") +) +@if_testbed_available( + click.option( + "--dev", + cls=CreateServiceDevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --db=MOCKED --with-testbed=sequestered --organization SequesteredOrgTemplate`" + ), + ) +) +def create_service( + service_certificate: click.utils.LazyFile, + 
organization: OrganizationID, + db: BaseDatabaseConfig, + db_max_connections: int, + db_min_connections: int, + # TODO: Webhook sequester service not implemented yet + # service_type: str, + # webhook_url: str | None, + log_level: LogLevel, + log_format: str, + log_file: str | None, + debug: bool, + with_testbed: str | None = None, + dev: bool = False, +) -> None: + with cli_exception_handler(debug): + sequester_service_config = SequesterServiceType.STORAGE + + # TODO: Webhook sequester service not implemented yet + # cooked_service_type: SequesterServiceType = SERVICE_TYPE_CHOICES[service_type] + # # Check service type + # if webhook_url is not None and cooked_service_type != SequesterServiceType.WEBHOOK: + # raise RuntimeError( + # f"Incompatible service type {cooked_service_type} with webhook_url option\nwebhook_url can only be used with {SequesterServiceType.WEBHOOK}." + # ) + # if cooked_service_type == SequesterServiceType.WEBHOOK and not webhook_url: + # raise RuntimeError( + # "Webhook sequester service requires webhook_url argument" + # ) + + service_certificate_pem = service_certificate.read() + + asyncio.run( + _create_service( + db_config=db, + debug=debug, + with_testbed=with_testbed, + organization_id=organization, + service_certificate_pem=service_certificate_pem, + sequester_service_config=sequester_service_config, + ) + ) + click.echo(click.style("Service created", fg="green")) + + +async def _create_service( + db_config: BaseDatabaseConfig, + debug: bool, + with_testbed: str | None, + organization_id: OrganizationID, + service_certificate_pem: str, + sequester_service_config: SequesterServiceConfig, +) -> None: + # Can use a dummy blockstore config since we are not going to query it + blockstore_config = MockedBlockStoreConfig() + + async with start_backend( + db_config=db_config, + blockstore_config=blockstore_config, + debug=debug, + populate_with_template=with_testbed, + ) as backend: + # 1) Retrieve the organization and check it is compatible 
+ + outcome = await backend.organization.get(organization_id) + match outcome: + case Organization() as org: + pass + case OrganizationGetBadOutcome.ORGANIZATION_NOT_FOUND: + raise RuntimeError("Organization doesn't exist") + + if not org.is_bootstrapped: + raise RuntimeError("Organization is not bootstrapped") + + if not org.is_sequestered: + raise RuntimeError("Organization is not sequestered") + + # 2) Validate the certificate + + assert org.sequester_authority_verify_key_der is not None + + ( + service_certificate_cooked, + service_certificate_raw, + ) = _load_sequester_service_certificate_pem( + pem=service_certificate_pem, + authority_verify_key=org.sequester_authority_verify_key_der, + ) + + # 3) Insert the certificate + + outcome = await backend.sequester.create_service( + now=DateTime.now(), + organization_id=organization_id, + service_certificate=service_certificate_raw, + config=sequester_service_config, + ) + match outcome: + case SequesterServiceCertificate(): + pass + + case RequireGreaterTimestamp() as err: + raise RuntimeError( + f"Cannot import certificate since its timestamp (`{service_certificate_cooked.timestamp}`) is older " + f"than the most recent sequester certificate already on the on server (`{err.strictly_greater_than}`)" + ) + + case SequesterCreateServiceStoreBadOutcome.SEQUESTER_SERVICE_ALREADY_EXISTS: + raise RuntimeError( + f"Sequester service with ID `{service_certificate_cooked.service_id}` already exists" + ) + + case SequesterCreateServiceStoreBadOutcome.ORGANIZATION_NOT_FOUND: + raise RuntimeError("Organization doesn't exist") + + case SequesterCreateServiceStoreBadOutcome.SEQUESTER_DISABLED: + raise RuntimeError("Organization is not sequestered") + + # Should never occur since we have already checked the validity at step 2 + case SequesterCreateServiceValidateBadOutcome.INVALID_CERTIFICATE: + assert False diff --git a/server/parsec/cli/sequester_list.py b/server/parsec/cli/sequester_list.py new file mode 100644 index 
00000000000..751162dfbb8 --- /dev/null +++ b/server/parsec/cli/sequester_list.py @@ -0,0 +1,137 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS +from __future__ import annotations + +import asyncio +from typing import Any + +import click + +from parsec._parsec import ( + OrganizationID, +) +from parsec.cli.options import db_server_options, debug_config_options, logging_config_options +from parsec.cli.testbed import if_testbed_available +from parsec.cli.utils import cli_exception_handler, start_backend +from parsec.components.sequester import ( + BaseSequesterService, + SequesterGetOrganizationServicesBadOutcome, + WebhookSequesterService, +) +from parsec.config import BaseDatabaseConfig, LogLevel, MockedBlockStoreConfig + + +class DevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + for key, value in ( + ("debug", True), + ("db", "MOCKED"), + ("with_testbed", "sequestered"), + ("organization", "SequesteredOrgTemplate"), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command(short_help="List sequester services in a given organization") +@click.option("--organization", type=OrganizationID, help="Organization ID", required=True) +@db_server_options +# Add --log-level/--log-format/--log-file +@logging_config_options(default_log_level="INFO") +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option("--with-testbed", help="Start by populating with a testbed template") +) +@if_testbed_available( + click.option( + "--dev", + cls=DevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --db=MOCKED --with-testbed=sequestered --organization SequesteredOrgTemplate`" + ), + ) +) +def list_services( + organization: OrganizationID, + db: BaseDatabaseConfig, + db_max_connections: int, + 
db_min_connections: int, + log_level: LogLevel, + log_format: str, + log_file: str | None, + debug: bool, + with_testbed: str | None = None, + dev: bool = False, +) -> None: + with cli_exception_handler(debug): + asyncio.run( + _list_services( + db_config=db, + debug=debug, + with_testbed=with_testbed, + organization_id=organization, + ) + ) + + +def _display_service(service: BaseSequesterService) -> None: + display_service_id = click.style(service.service_id.hex, fg="yellow") + display_service_label = click.style(service.service_label, fg="green") + click.echo(f"Service {display_service_label} (id: {display_service_id})") + click.echo(f"\tCreated on: {service.created_on}") + click.echo(f"\tService type: {service.service_type}") + if isinstance(service, WebhookSequesterService): + click.echo(f"\tWebhook endpoint URL {service.webhook_url}") + if service.is_revoked: + display_revoked = click.style("Revoked", fg="red") + click.echo(f"\t{display_revoked} on: {service.revoked_on}") + + +async def _list_services( + db_config: BaseDatabaseConfig, + debug: bool, + with_testbed: str | None, + organization_id: OrganizationID, +) -> None: + # Can use a dummy blockstore config since we are not going to query it + blockstore_config = MockedBlockStoreConfig() + + async with start_backend( + db_config=db_config, + blockstore_config=blockstore_config, + debug=debug, + populate_with_template=with_testbed, + ) as backend: + # 1) Retrieve the organization and check it is compatible + + outcome = await backend.sequester.get_organization_services(organization_id=organization_id) + match outcome: + case list() as services: + pass + case SequesterGetOrganizationServicesBadOutcome.ORGANIZATION_NOT_FOUND: + raise RuntimeError("Organization doesn't exist") + case SequesterGetOrganizationServicesBadOutcome.SEQUESTER_DISABLED: + raise RuntimeError("Organization is not sequestered") + + display_services_count = click.style(len(services), fg="green") + click.echo(f"Found 
{display_services_count} sequester service(s)") + + # Display active services first, and order them by creation date + + active = (service for service in services if not service.is_revoked) + for service in sorted(active, key=lambda s: s.created_on): + print() + _display_service(service) + + revoked = (service for service in services if service.is_revoked) + for service in sorted(revoked, key=lambda s: s.created_on): + print() + _display_service(service) diff --git a/server/parsec/cli/sequester_revoke.py b/server/parsec/cli/sequester_revoke.py new file mode 100644 index 00000000000..c0936839918 --- /dev/null +++ b/server/parsec/cli/sequester_revoke.py @@ -0,0 +1,350 @@ +# Parsec Cloud (https://parsec.cloud) Copyright (c) BUSL-1.1 2016-present Scille SAS +from __future__ import annotations + +import asyncio +import textwrap +from base64 import b64decode, b64encode +from pathlib import Path +from typing import Any + +import click + +from parsec._parsec import ( + CryptoError, + DateTime, + OrganizationID, + SequesterRevokedServiceCertificate, + SequesterServiceID, + SequesterSigningKeyDer, + SequesterVerifyKeyDer, +) +from parsec.ballpark import RequireGreaterTimestamp +from parsec.cli.options import db_server_options, debug_config_options, logging_config_options +from parsec.cli.testbed import if_testbed_available +from parsec.cli.utils import cli_exception_handler, start_backend +from parsec.components.organization import Organization, OrganizationGetBadOutcome +from parsec.components.sequester import ( + SequesterRevokeServiceStoreBadOutcome, + SequesterRevokeServiceValidateBadOutcome, +) +from parsec.config import BaseDatabaseConfig, LogLevel, MockedBlockStoreConfig + +SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_HEADER = ( + "-----BEGIN PARSEC SEQUESTER SERVICE REVOCATION CERTIFICATE-----" +) +SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_FOOTER = ( + "-----END PARSEC SEQUESTER SERVICE REVOCATION CERTIFICATE-----" +) + + +def 
_dump_sequester_service_revocation_certificate_pem( + certificate: SequesterRevokedServiceCertificate, + authority_signing_key: SequesterSigningKeyDer, +) -> str: + signed = authority_signing_key.sign(certificate.dump()) + return "\n".join( + ( + SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_HEADER, + *textwrap.wrap(b64encode(signed).decode(), width=64), + SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_FOOTER, + "", + ) + ) + + +def _load_sequester_service_revocation_certificate_pem( + pem: str, authority_verify_key: SequesterVerifyKeyDer +) -> tuple[SequesterRevokedServiceCertificate, bytes]: + err_msg = "Not a valid Parsec sequester service revocation certificate PEM file" + try: + header, *content, footer = pem.strip().splitlines() + except ValueError as exc: + raise ValueError(err_msg) from exc + + if header != SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_HEADER: + raise ValueError( + f"{err_msg}: missing `{SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_HEADER}` header" + ) + if footer != SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_FOOTER: + raise ValueError( + f"{err_msg}: missing `{SEQUESTER_SERVICE_REVOCATION_CERTIFICATE_PEM_FOOTER}` footer" + ) + + try: + signed = b64decode("".join(content)) + return ( + SequesterRevokedServiceCertificate.load(authority_verify_key.verify(signed)), + signed, + ) + except (ValueError, CryptoError) as exc: + raise ValueError(f"{err_msg}: invalid body ({exc})") from exc + + +class SequesterBackendCliError(Exception): + pass + + +class GenerateServiceRevocationCertificateDevOption(click.Option): + def handle_parse_result( + self, ctx: click.Context, opts: Any, args: list[str] + ) -> tuple[Any, list[str]]: + value, args = super().handle_parse_result(ctx, opts, args) + if value: + import os + import tempfile + + from parsec._parsec import testbed + + template_content = testbed.test_get_testbed_template("sequestered") + assert template_content is not None + event = template_content.events[0] + assert isinstance(event, 
testbed.TestbedEventBootstrapOrganization) + assert event.sequester_authority_signing_key is not None + + # Note this file is not deleted when the application ends, this is considered + # okay since it is only used for niche testing purpose. + file_fd, file_path = tempfile.mkstemp() + os.write(file_fd, event.sequester_authority_signing_key.dump_pem().encode("utf8")) + os.close(file_fd) + + for key, value in ( + ("debug", True), + ("authority_private_key", file_path), + ): + if key not in opts: + opts[key] = value + + return value, args + + +@click.command(short_help="Generate a certificate for a new sequester service") +@click.option( + "--service-id", + type=SequesterServiceID.from_hex, + required=True, +) +@click.option( + "--authority-private-key", + help="File containing the private authority key use. Used to sign the encryption key.", + type=click.Path(exists=True, file_okay=True, dir_okay=False, path_type=Path), + required=True, +) +@click.argument("output", required=False) +# Add --debug & --version +@debug_config_options +@if_testbed_available( + click.option( + "--dev", + cls=GenerateServiceRevocationCertificateDevOption, + is_flag=True, + is_eager=True, + help=( + "Equivalent to `--debug --authority-private-key=`" + ), + ) +) +def generate_service_revocation_certificate( + service_id: SequesterServiceID, + authority_private_key: Path, + output: str | None, + debug: bool, + dev: bool = False, +) -> None: + output = output or str(Path.cwd()) + + with cli_exception_handler(debug): + # 1) Load key files + + authority_key = SequesterSigningKeyDer.load_pem(authority_private_key.read_text()) + + # 2) Generate certificate + + timestamp = DateTime.now() + certificate = SequesterRevokedServiceCertificate( + timestamp=timestamp, + service_id=service_id, + ) + + # 3) Write the certificate as PEM in output file + + pem_content = _dump_sequester_service_revocation_certificate_pem( + certificate=certificate, + authority_signing_key=authority_key, + ) + + cooked_output = 
Path(output)
+        if cooked_output.is_dir():
+            output_file = (
+                cooked_output
+                / f"sequester_service_revocation_certificate-{service_id.hex}-{timestamp.to_rfc3339()}.pem"
+            )
+        else:
+            output_file = cooked_output
+        output_file.write_bytes(pem_content.encode("utf8"))
+
+        display_service = f"(sequester service ID: {click.style(service_id.hex, fg='yellow')}, timestamp: {click.style(timestamp, fg='yellow')})"
+        display_file = click.style(output_file, fg="green")
+        click.echo(
+            f"Sequester service revocation certificate {display_service} exported in {display_file}"
+        )
+        click.echo(
+            f"Use {click.style('parsec sequester revoke_service', fg='yellow')} command to add it to an organization"
+        )
+
+
+class RevokeServiceDevOption(click.Option):
+    def handle_parse_result(
+        self, ctx: click.Context, opts: Any, args: list[str]
+    ) -> tuple[Any, list[str]]:
+        value, args = super().handle_parse_result(ctx, opts, args)
+        if value:
+            for key, value in (
+                ("debug", True),
+                ("db", "MOCKED"),
+                ("with_testbed", "sequestered"),
+                ("organization", "SequesteredOrgTemplate"),
+            ):
+                if key not in opts:
+                    opts[key] = value
+
+        return value, args
+
+
+@click.command(short_help="Revoke a sequester service from its revocation certificate")
+@click.option(
+    "--service-revocation-certificate",
+    help="File containing the sequester service revocation certificate (previously generated by `parsec sequester generate_service_revocation_certificate` command).",
+    type=click.File("r", encoding="utf8"),
+    required=True,
+    metavar="CERTIFICATE.pem",
+)
+@click.option(
+    "--organization",
+    type=OrganizationID,
+    help="Organization ID where to revoke the service",
+    required=True,
+)
+@db_server_options
+# Add --log-level/--log-format/--log-file
+@logging_config_options(default_log_level="INFO")
+# Add --debug & --version
+@debug_config_options
+@if_testbed_available(
+    click.option("--with-testbed", help="Start by populating with a testbed template")
+)
+@if_testbed_available(
+    click.option(
+        "--dev",
cls=RevokeServiceDevOption,
+        is_flag=True,
+        is_eager=True,
+        help=(
+            "Equivalent to `--debug --db=MOCKED --with-testbed=sequestered --organization SequesteredOrgTemplate`"
+        ),
+    )
+)
+def revoke_service(
+    service_revocation_certificate: click.utils.LazyFile,
+    organization: OrganizationID,
+    db: BaseDatabaseConfig,
+    db_max_connections: int,
+    db_min_connections: int,
+    log_level: LogLevel,
+    log_format: str,
+    log_file: str | None,
+    debug: bool,
+    with_testbed: str | None = None,
+    dev: bool = False,
+) -> None:
+    with cli_exception_handler(debug):
+        service_revocation_certificate_pem = service_revocation_certificate.read()
+
+        asyncio.run(
+            _revoke_service(
+                db_config=db,
+                debug=debug,
+                with_testbed=with_testbed,
+                organization_id=organization,
+                service_revocation_certificate_pem=service_revocation_certificate_pem,
+            )
+        )
+        click.echo(click.style("Service revoked", fg="green"))
+
+
+async def _revoke_service(
+    db_config: BaseDatabaseConfig,
+    debug: bool,
+    with_testbed: str | None,
+    organization_id: OrganizationID,
+    service_revocation_certificate_pem: str,
+) -> None:
+    # Can use a dummy blockstore config since we are not going to query it
+    blockstore_config = MockedBlockStoreConfig()
+
+    async with start_backend(
+        db_config=db_config,
+        blockstore_config=blockstore_config,
+        debug=debug,
+        populate_with_template=with_testbed,
+    ) as backend:
+        # 1) Retrieve the organization and check it is compatible
+
+        outcome = await backend.organization.get(organization_id)
+        match outcome:
+            case Organization() as org:
+                pass
+            case OrganizationGetBadOutcome.ORGANIZATION_NOT_FOUND:
+                raise RuntimeError("Organization doesn't exist")
+
+        if not org.is_bootstrapped:
+            raise RuntimeError("Organization is not bootstrapped")
+
+        if not org.is_sequestered:
+            raise RuntimeError("Organization is not sequestered")
+
+        # 2) Validate the certificate
+
+        assert org.sequester_authority_verify_key_der is not None
+
+        (
+            revoked_service_cooked,
revoked_service_raw, + ) = _load_sequester_service_revocation_certificate_pem( + pem=service_revocation_certificate_pem, + authority_verify_key=org.sequester_authority_verify_key_der, + ) + + # 3) Insert the certificate + + outcome = await backend.sequester.revoke_service( + now=DateTime.now(), + organization_id=organization_id, + revoked_service_certificate=revoked_service_raw, + ) + + match outcome: + case SequesterRevokedServiceCertificate(): + pass + + case RequireGreaterTimestamp() as err: + raise RuntimeError( + f"Cannot import certificate since its timestamp (`{revoked_service_cooked.timestamp}`) is older " + f"than the most recent sequester certificate already on the on server (`{err.strictly_greater_than}`)" + ) + + case SequesterRevokeServiceStoreBadOutcome.ORGANIZATION_NOT_FOUND: + raise RuntimeError("Organization doesn't exist") + + case SequesterRevokeServiceStoreBadOutcome.SEQUESTER_DISABLED: + raise RuntimeError("Organization is not sequestered") + + case SequesterRevokeServiceStoreBadOutcome.SEQUESTER_SERVICE_NOT_FOUND: + raise RuntimeError("Sequester service not found") + + case SequesterRevokeServiceStoreBadOutcome.SEQUESTER_SERVICE_ALREADY_REVOKED: + raise RuntimeError("Sequester service already revoked") + + # Should never occur since we have already checked the validity at step 2 + case SequesterRevokeServiceValidateBadOutcome.INVALID_CERTIFICATE: + assert False diff --git a/server/parsec/cli/testbed.py b/server/parsec/cli/testbed.py index 7070f0332df..de461fbbf0b 100644 --- a/server/parsec/cli/testbed.py +++ b/server/parsec/cli/testbed.py @@ -5,7 +5,7 @@ import asyncio import tempfile from contextlib import asynccontextmanager -from typing import Any, AsyncIterator, TypeAlias +from typing import Any, AsyncIterator, Callable, TypeAlias import anyio import click @@ -44,6 +44,14 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger() +# Helper for other CLI commands to add dev-related options +def if_testbed_available[FC](decorator: 
Callable[[FC], FC]) -> Callable[[FC], FC]: + if TESTBED_AVAILABLE: + return decorator + else: + return lambda f: f + + DEFAULT_PORT = 6770 diff --git a/server/parsec/cli/utils.py b/server/parsec/cli/utils.py index c11205711ff..65a9cbce2af 100644 --- a/server/parsec/cli/utils.py +++ b/server/parsec/cli/utils.py @@ -21,7 +21,17 @@ import anyio import click -from parsec._parsec import DateTime +from parsec._parsec import DateTime, ParsecAddr +from parsec.backend import Backend, backend_factory +from parsec.config import ( + BackendConfig, + BaseBlockStoreConfig, + BaseDatabaseConfig, + SmtpEmailConfig, +) +from parsec.logging import get_logger + +logger = get_logger() class SchemesInternalType(TypedDict): @@ -252,3 +262,64 @@ def convert( py_datetime = py_datetime.replace(tzinfo=datetime.timezone.utc) return DateTime.from_timestamp_micros(int(py_datetime.timestamp() * 1_000_000)) + + +@asynccontextmanager +async def start_backend( + db_config: BaseDatabaseConfig, + blockstore_config: BaseBlockStoreConfig, + debug: bool, + populate_with_template: str | None = None, +): + """ + Start backend for + """ + + class CliBackendConfig(BackendConfig): + __slots__ = () + + @property + def administration_token(self) -> str: # type: ignore[reportIncompatibleVariableOverride] + assert False, "Unused configuration" + + @property + def email_config(self) -> SmtpEmailConfig: # type: ignore[reportIncompatibleVariableOverride] + assert False, "Unused configuration" + + @property + def server_addr(self) -> ParsecAddr: # type: ignore[reportIncompatibleVariableOverride] + assert False, "Unused configuration" + + config = BackendConfig( + debug=debug, + db_config=db_config, + blockstore_config=blockstore_config, + administration_token=None, # type: ignore + email_config=None, # type: ignore + server_addr=None, # type: ignore + ) + # Cannot directly initialize a `CliBackendConfig` since its + # `administration_token`/`email_config`/`server_addr` fields have not setter. 
+ # + # Also note that swapping the class of an existing instance is totally fine + # as long as both classes have the same fields. + config.__class__ = CliBackendConfig + + async with backend_factory(config=config, verbose=False) as backend: + if populate_with_template is not None: + await _populate_backend(backend, populate_with_template) + + yield backend + + +async def _populate_backend(backend: Backend, testbed_template: str) -> None: + from parsec._parsec import testbed + + template_content = testbed.test_get_testbed_template(testbed_template) + if template_content is None: + raise RuntimeError(f"Testbed template `{testbed_template}` not found") + + organization_id = await backend.test_load_template(template_content) + logger.warning( + f"Populating backend with testbed template `{testbed_template}` as organization `{organization_id}`" + )