diff --git a/docs/api/operators/keys.md b/docs/api/operators/keys.md index d54873d..3ed2ac0 100644 --- a/docs/api/operators/keys.md +++ b/docs/api/operators/keys.md @@ -20,8 +20,8 @@ Wrapper for individual key functions, such as signing, encryption, etc. Returned --- -## `KeyEditSession()` - Wrapper for `gpg --edit-key` +## `KeyEditor()` - Wrapper for `gpg --edit-key` -Wraps advanced key editing functions in a stateless interface. Returned by `Key().edit(...)` +Wraps advanced key editing functions in a stateless interface. Returned by `Key().edit(...)`. Operations in this category should be considered unsafe, as they rely on an unstable terminal menu to function. -::: gpyg.operators.KeyEditSession \ No newline at end of file +::: gpyg.operators.KeyEditor \ No newline at end of file diff --git a/gpyg/models/key_editing.py b/gpyg/models/key_editing.py index c271d9a..33a299b 100644 --- a/gpyg/models/key_editing.py +++ b/gpyg/models/key_editing.py @@ -107,3 +107,8 @@ class SigningModes(StrEnum): NON_EXPORTABLE = "lsign" NON_REVOCABLE = "nrsign" TRUST = "tsign" + + +class RevocationReason(StrEnum): + NO_REASON = "0" + USER_ID_INVALID = "4" diff --git a/gpyg/operators/keys.py b/gpyg/operators/keys.py index 4356cb4..c2f1fb4 100644 --- a/gpyg/operators/keys.py +++ b/gpyg/operators/keys.py @@ -4,9 +4,19 @@ from typing import Any, Literal from pydantic import Field, PrivateAttr, computed_field + +from gpyg.util import interactive from .common import BaseOperator from ..util import ExecutionError, ProcessSession, StatusInteractive, StatusLine -from ..models import InfoLine, parse_infoline, KeyModel, StatusCodes, SigningModes +from ..models import ( + InfoLine, + parse_infoline, + KeyModel, + StatusCodes, + SigningModes, + SignatureInfo, + RevocationReason, +) class KeyOperator(BaseOperator): @@ -554,31 +564,40 @@ def set_primary_uid(self, uid: str, passphrase: str | None = None) -> "Key": raise ExecutionError(proc.output) @contextmanager - def edit(self, passphrase: str | None = None, user: str | None = None): + def edit(self, user: str | None = None): + """Create an interactive editing instance + + Args: + user (str | None, optional): Optional user to run as. If left as None, uses the current Key's user ID. Defaults to None. + + Yields: + KeyEditor: An initialized interactive instance + """ with StatusInteractive( self.session, - f"gpg --command-fd 0 --status-fd 1 -u {user if user else self.fingerprint} --with-sig-list --debug-level guru --pinentry-mode loopback --yes --with-colons --expert --no-tty --edit-key {self.fingerprint}", + f"gpg --command-fd 0 --status-fd 1 -u {user if user else self.fingerprint} --with-sig-list --pinentry-mode loopback --yes --with-colons --no-tty --edit-key {self.fingerprint}", ) as interactive: - editor = KeyEditor( - self, passphrase, user if user else self.fingerprint, interactive - ) + editor = KeyEditor(self, user if user else self.fingerprint, interactive) yield editor class KeyEditor: + def __init__( self, key: Key, - passphrase: str | None, user: str, interactive: StatusInteractive, ): self.key = key - self.passphrase = passphrase self.user = user self.interactive = interactive self.wait_for_status(StatusCodes.GET_LINE) + def dbg(self): + for line in self.interactive.readlines(yield_empty=False): + print(line) + def wait_for_status(self, *code: str): lines: list[StatusLine] = [] for line in self.interactive.readlines(): @@ -594,34 +613,228 @@ def list(self) -> list[InfoLine]: return [parse_infoline(line.content) for line in non_status] def quit(self): + """Quit without saving""" self.interactive.writelines("quit") + self.interactive.process.wait() def save(self): + """Save changes & quit""" self.interactive.writelines("save") + self.interactive.process.wait() def set_uid(self, uid: str): + """Select a specific UID within the current key + + Args: + uid (str): UID to select, or "0" for no UID + """ self.interactive.writelines(f"uid {uid}") self.wait_for_status(StatusCodes.GET_LINE) def set_key(self, key: str): + """Select a specific subkey + + Args: + key (str): KEY to select, or "0" for none. + """ self.interactive.writelines(f"key {key}") self.wait_for_status(StatusCodes.GET_LINE) - def sign(self, mode: SigningModes = SigningModes.EXPORTABLE) -> bool: + def sign( + self, + mode: SigningModes = SigningModes.EXPORTABLE, + signer_passphrase: str | None = None, + ) -> None: + """Sign the active key. + + Args: + mode (SigningModes, optional): What mode to use. Defaults to SigningModes.EXPORTABLE. + signer_passphrase (str | None, optional): Signer passphrase, if applicable. Signer is set with .set_uid(...) Defaults to None. + + Raises: + ExecutionError: Raised if the operation fails + """ self.interactive.writelines( str(mode if mode in SigningModes else SigningModes.EXPORTABLE) ) lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) if lines[-1].code == StatusCodes.GET_LINE: - return False + self.interactive.seek(0) + raise ExecutionError(output=self.interactive.read().decode()) self.interactive.writelines("y") lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_HIDDEN) if lines[-1].code == StatusCodes.GET_LINE: - return False + self.interactive.seek(0) + raise ExecutionError(output=self.interactive.read().decode()) - if self.passphrase: - self.interactive.writelines(self.passphrase) + if signer_passphrase: + self.interactive.writelines(signer_passphrase) else: self.interactive.writelines("") - lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_HIDDEN) + self.wait_for_status(StatusCodes.GET_LINE) + + def delete_signature(self, *signers: Key | str): + """Deletes signatures on the active UID signed by *signers. Does not revoke, just deletes. + + Args: + *signers (Key | str): Any number of signers (removes all if not specified) + + Raises: + ExecutionError: If the command fails + """ + self.interactive.writelines("delsig") + lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) + if lines[-1].code == StatusCodes.GET_LINE: + self.interactive.seek(0) + raise ExecutionError( + output="Failed: UID/KEY not selected?\n" + + self.interactive.read().decode() + ) + + while True: + if lines[-2].content.startswith("sig:"): + signature: SignatureInfo = parse_infoline(lines[-2].content) + if len(signers) == 0 or signature.key_id in [ + i.key_id if isinstance(i, Key) else i for i in signers + ]: + self.interactive.writelines("y") + else: + self.interactive.writelines("n") + lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) + if lines[-1].code == StatusCodes.GET_LINE: + return + + def revoke_signature( + self, + *signers: Key | str, + reason: RevocationReason = RevocationReason.NO_REASON, + description: str | None = None, + ): + """Revokes signatures on the selected UID + + Args: + *signers (Key | str): List of signers (removes all if not specified) + reason (RevocationReason, optional): Revocation reason. Defaults to RevocationReason.NO_REASON. + description (str | None, optional): Revocation description. Defaults to None. + + Raises: + ExecutionError: If the command fails + """ + self.interactive.writelines("revsig") + lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) + if lines[-1].code == StatusCodes.GET_LINE: + self.interactive.seek(0) + raise ExecutionError( + output="Failed: UID/KEY not selected?\n" + + self.interactive.read().decode() + ) + + signer_ids = [s.key_id if isinstance(s, Key) else s for s in signers] + + while True: + request = lines[-1] + if request.is_status: + if request.code == StatusCodes.GET_BOOL: + match request.arguments[0]: + case "ask_revoke_sig.one": + sig: SignatureInfo = parse_infoline(lines[-2].content) + if len(signers) == 0 or sig.key_id in signer_ids: + self.interactive.writelines("y") + else: + self.interactive.writelines("n") + case "ask_revoke_sig.okay": + self.interactive.writelines("y") + case "ask_revocation_reason.okay": + self.interactive.writelines("y") + case _: + self.interactive.writelines("") + elif request.code == StatusCodes.GET_LINE: + match request.arguments[0]: + case "ask_revocation_reason.code": + self.interactive.writelines(str(reason)) + case "ask_revocation_reason.text": + if description: + self.interactive.writelines(*description.split("\n")) + else: + self.interactive.writelines("") + case "keyedit.prompt": + return + lines = self.wait_for_status() + + def add_uid( + self, + real_name: str, + email: str | None = None, + comment: str | None = None, + passphrase: str | None = None, + ): + """Adds a single UID to the selected key + + Args: + real_name (str): UID name + email (str | None, optional): UID email. Defaults to None. + comment (str | None, optional): UID comment. Defaults to None. + passphrase (str | None, optional): Key passphrase, if required. Defaults to None. + + Raises: + ExecutionError: If the operation fails + """ + self.interactive.writelines("adduid") + while True: + lines = self.wait_for_status() + + if lines[-1].is_status and lines[-1].code == StatusCodes.GET_LINE: + match lines[-1].arguments[0]: + case "keygen.name": + self.interactive.writelines(real_name) + case "keygen.email": + if email: + self.interactive.writelines(email) + else: + self.interactive.writelines("") + case "keygen.comment": + if comment: + self.interactive.writelines(comment) + else: + self.interactive.writelines("") + case "keyedit.prompt": + return + elif lines[-1].is_status and lines[-1].code == StatusCodes.GET_HIDDEN: + if passphrase: + self.interactive.writelines(passphrase) + else: + self.interactive.writelines("") + elif lines[-1].is_status and lines[-1].code == StatusCodes.ERROR: + self.interactive.seek(0) + raise ExecutionError(self.interactive.read().decode()) + + def delete_uid(self): + self.interactive.writelines("deluid") + lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) + if lines[-1].code == StatusCodes.GET_LINE: + raise ExecutionError("Invalid UID selected") + + self.interactive.writelines("y") + self.wait_for_status(StatusCodes.GET_LINE) + + def revoke_uid( + self, + reason: RevocationReason = RevocationReason.NO_REASON, + description: str | None = None, + ): + self.interactive.writelines("revuid") + lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL) + if lines[-1].code == StatusCodes.GET_LINE: + raise ExecutionError("Invalid UID selected") + + self.interactive.writelines("y") + lines = self.wait_for_status(StatusCodes.GET_LINE) + self.interactive.writelines(str(reason)) + + lines = self.wait_for_status(StatusCodes.GET_LINE) + self.interactive.writelines(description if description else "") + + lines = self.wait_for_status(StatusCodes.GET_BOOL) + self.interactive.writelines("y") + lines = self.wait_for_status(StatusCodes.GET_LINE) diff --git a/scratchpad.py b/scratchpad.py index f549d85..677e920 100644 --- a/scratchpad.py +++ b/scratchpad.py @@ -12,15 +12,12 @@ gpg = GPG(homedir=tmpdir, kill_existing_agent=True) key = gpg.keys.generate_key("Bongus", passphrase="test") key_other = gpg.keys.generate_key("Bingus", passphrase="test2") - with key.edit(passphrase="test2", user=key_other.fingerprint) as editor: - editor.set_key("*") - editor.sign() - print(editor.interactive.writelines("check")) + with key.edit() as editor: + editor.add_uid("bebby", passphrase="test") + print(editor.list()) + editor.set_uid("2") + editor.revoke_uid() editor.save() - editor.interactive.seek(0) - print("===") - print(editor.interactive.read().decode()) - print("===") key.reload() print(key.model_dump_json(indent=4))