Skip to content

Commit

Permalink
add additional key edit ops
Browse files Browse the repository at this point in the history
  • Loading branch information
Dax Harris committed Mar 22, 2024
1 parent 12308b5 commit 89b2b5d
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 25 deletions.
6 changes: 3 additions & 3 deletions docs/api/operators/keys.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Wrapper for individual key functions, such as signing, encryption, etc. Returned

---

## `KeyEditSession()` - Wrapper for `gpg --edit-key`
## `KeyEditor()` - Wrapper for `gpg --edit-key`

Wraps advanced key editing functions in a stateless interface. Returned by `Key().edit(...)`
Wraps advanced key editing functions in a stateless interface. Returned by `Key().edit(...)`. Operations in this category should be considered unsafe, as they rely on an unstable terminal menu to function.

::: gpyg.operators.KeyEditSession
::: gpyg.operators.KeyEditor
5 changes: 5 additions & 0 deletions gpyg/models/key_editing.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ class SigningModes(StrEnum):
NON_EXPORTABLE = "lsign"
NON_REVOCABLE = "nrsign"
TRUST = "tsign"


class RevocationReason(StrEnum):
NO_REASON = "0"
USER_ID_INVALID = "4"
241 changes: 227 additions & 14 deletions gpyg/operators/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@
from typing import Any, Literal

from pydantic import Field, PrivateAttr, computed_field

from gpyg.util import interactive
from .common import BaseOperator
from ..util import ExecutionError, ProcessSession, StatusInteractive, StatusLine
from ..models import InfoLine, parse_infoline, KeyModel, StatusCodes, SigningModes
from ..models import (
InfoLine,
parse_infoline,
KeyModel,
StatusCodes,
SigningModes,
SignatureInfo,
RevocationReason,
)


class KeyOperator(BaseOperator):
Expand Down Expand Up @@ -554,31 +564,40 @@ def set_primary_uid(self, uid: str, passphrase: str | None = None) -> "Key":
raise ExecutionError(proc.output)

@contextmanager
def edit(self, passphrase: str | None = None, user: str | None = None):
def edit(self, user: str | None = None):
"""Create an interactive editing instance
Args:
user (str | None, optional): Optional user to run as. If left as None, uses the current Key's user ID. Defaults to None.
Yields:
KeyEditor: An initialized interactive instance
"""
with StatusInteractive(
self.session,
f"gpg --command-fd 0 --status-fd 1 -u {user if user else self.fingerprint} --with-sig-list --debug-level guru --pinentry-mode loopback --yes --with-colons --expert --no-tty --edit-key {self.fingerprint}",
f"gpg --command-fd 0 --status-fd 1 -u {user if user else self.fingerprint} --with-sig-list --pinentry-mode loopback --yes --with-colons --no-tty --edit-key {self.fingerprint}",
) as interactive:
editor = KeyEditor(
self, passphrase, user if user else self.fingerprint, interactive
)
editor = KeyEditor(self, user if user else self.fingerprint, interactive)
yield editor


class KeyEditor:

def __init__(
self,
key: Key,
passphrase: str | None,
user: str,
interactive: StatusInteractive,
):
self.key = key
self.passphrase = passphrase
self.user = user
self.interactive = interactive
self.wait_for_status(StatusCodes.GET_LINE)

def dbg(self):
for line in self.interactive.readlines(yield_empty=False):
print(line)

def wait_for_status(self, *code: str):
lines: list[StatusLine] = []
for line in self.interactive.readlines():
Expand All @@ -594,34 +613,228 @@ def list(self) -> list[InfoLine]:
return [parse_infoline(line.content) for line in non_status]

def quit(self):
"""Quit without saving"""
self.interactive.writelines("quit")
self.interactive.process.wait()

def save(self):
"""Save changes & quit"""
self.interactive.writelines("save")
self.interactive.process.wait()

def set_uid(self, uid: str):
"""Select a specific UID within the current key
Args:
uid (str): UID to select, or "0" for no UID
"""
self.interactive.writelines(f"uid {uid}")
self.wait_for_status(StatusCodes.GET_LINE)

def set_key(self, key: str):
"""Select a specific subkey
Args:
key (str): KEY to select, or "0" for none.
"""
self.interactive.writelines(f"key {key}")
self.wait_for_status(StatusCodes.GET_LINE)

def sign(self, mode: SigningModes = SigningModes.EXPORTABLE) -> bool:
def sign(
self,
mode: SigningModes = SigningModes.EXPORTABLE,
signer_passphrase: str | None = None,
) -> None:
"""Sign the active key.
Args:
mode (SigningModes, optional): What mode to use. Defaults to SigningModes.EXPORTABLE.
signer_passphrase (str | None, optional): Signer passphrase, if applicable. Signer is set with .set_uid(...) Defaults to None.
Raises:
ExecutionError: Raised if the operation fails
"""
self.interactive.writelines(
str(mode if mode in SigningModes else SigningModes.EXPORTABLE)
)
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
return False
self.interactive.seek(0)
raise ExecutionError(output=self.interactive.read().decode())
self.interactive.writelines("y")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_HIDDEN)
if lines[-1].code == StatusCodes.GET_LINE:
return False
self.interactive.seek(0)
raise ExecutionError(output=self.interactive.read().decode())

if self.passphrase:
self.interactive.writelines(self.passphrase)
if signer_passphrase:
self.interactive.writelines(signer_passphrase)
else:
self.interactive.writelines("")

lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_HIDDEN)
self.wait_for_status(StatusCodes.GET_LINE)

def delete_signature(self, *signers: Key | str):
"""Deletes signatures on the active UID signed by *signers. Does not revoke, just deletes.
Args:
*signers (Key | str): Any number of signers (removes all if not specified)
Raises:
ExecutionError: If the command fails
"""
self.interactive.writelines("delsig")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
self.interactive.seek(0)
raise ExecutionError(
output="Failed: UID/KEY not selected?\n"
+ self.interactive.read().decode()
)

while True:
if lines[-2].content.startswith("sig:"):
signature: SignatureInfo = parse_infoline(lines[-2].content)
if len(signers) == 0 or signature.key_id in [
i.key_id if isinstance(i, Key) else i for i in signers
]:
self.interactive.writelines("y")
else:
self.interactive.writelines("n")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
return

def revoke_signature(
self,
*signers: Key | str,
reason: RevocationReason = RevocationReason.NO_REASON,
description: str | None = None,
):
"""Revokes signatures on the selected UID
Args:
*signers (Key | str): List of signers (removes all if not specified)
reason (RevocationReason, optional): Revocation reason. Defaults to RevocationReason.NO_REASON.
description (str | None, optional): Revocation description. Defaults to None.
Raises:
ExecutionError: If the command fails
"""
self.interactive.writelines("revsig")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
self.interactive.seek(0)
raise ExecutionError(
output="Failed: UID/KEY not selected?\n"
+ self.interactive.read().decode()
)

signer_ids = [s.key_id if isinstance(s, Key) else s for s in signers]

while True:
request = lines[-1]
if request.is_status:
if request.code == StatusCodes.GET_BOOL:
match request.arguments[0]:
case "ask_revoke_sig.one":
sig: SignatureInfo = parse_infoline(lines[-2].content)
if len(signers) == 0 or sig.key_id in signer_ids:
self.interactive.writelines("y")
else:
self.interactive.writelines("n")
case "ask_revoke_sig.okay":
self.interactive.writelines("y")
case "ask_revocation_reason.okay":
self.interactive.writelines("y")
case _:
self.interactive.writelines("")
elif request.code == StatusCodes.GET_LINE:
match request.arguments[0]:
case "ask_revocation_reason.code":
self.interactive.writelines(str(reason))
case "ask_revocation_reason.text":
if description:
self.interactive.writelines(*description.split("\n"))
else:
self.interactive.writelines("")
case "keyedit.prompt":
return
lines = self.wait_for_status()

def add_uid(
self,
real_name: str,
email: str | None = None,
comment: str | None = None,
passphrase: str | None = None,
):
"""Adds a single UID to the selected key
Args:
real_name (str): UID name
email (str | None, optional): UID email. Defaults to None.
comment (str | None, optional): UID comment. Defaults to None.
passphrase (str | None, optional): Key passphrase, if required. Defaults to None.
Raises:
ExecutionError: If the operation fails
"""
self.interactive.writelines("adduid")
while True:
lines = self.wait_for_status()

if lines[-1].is_status and lines[-1].code == StatusCodes.GET_LINE:
match lines[-1].arguments[0]:
case "keygen.name":
self.interactive.writelines(real_name)
case "keygen.email":
if email:
self.interactive.writelines(email)
else:
self.interactive.writelines("")
case "keygen.comment":
if comment:
self.interactive.writelines(comment)
else:
self.interactive.writelines("")
case "keyedit.prompt":
return
elif lines[-1].is_status and lines[-1].code == StatusCodes.GET_HIDDEN:
if passphrase:
self.interactive.writelines(passphrase)
else:
self.interactive.writelines("")
elif lines[-1].is_status and lines[-1].code == StatusCodes.ERROR:
self.interactive.seek(0)
raise ExecutionError(self.interactive.read().decode())

def delete_uid(self):
self.interactive.writelines("deluid")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
raise ExecutionError("Invalid UID selected")

self.interactive.writelines("y")
self.wait_for_status(StatusCodes.GET_LINE)

def revoke_uid(
self,
reason: RevocationReason = RevocationReason.NO_REASON,
description: str | None = None,
):
self.interactive.writelines("revuid")
lines = self.wait_for_status(StatusCodes.GET_LINE, StatusCodes.GET_BOOL)
if lines[-1].code == StatusCodes.GET_LINE:
raise ExecutionError("Invalid UID selected")

self.interactive.writelines("y")
lines = self.wait_for_status(StatusCodes.GET_LINE)
self.interactive.writelines(str(reason))

lines = self.wait_for_status(StatusCodes.GET_LINE)
self.interactive.writelines(description if description else "")

lines = self.wait_for_status(StatusCodes.GET_BOOL)
self.interactive.writelines("y")
lines = self.wait_for_status(StatusCodes.GET_LINE)
13 changes: 5 additions & 8 deletions scratchpad.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@
gpg = GPG(homedir=tmpdir, kill_existing_agent=True)
key = gpg.keys.generate_key("Bongus", passphrase="test")
key_other = gpg.keys.generate_key("Bingus", passphrase="test2")
with key.edit(passphrase="test2", user=key_other.fingerprint) as editor:
editor.set_key("*")
editor.sign()
print(editor.interactive.writelines("check"))
with key.edit() as editor:
editor.add_uid("bebby", passphrase="test")
print(editor.list())
editor.set_uid("2")
editor.revoke_uid()
editor.save()
editor.interactive.seek(0)
print("===")
print(editor.interactive.read().decode())
print("===")

key.reload()
print(key.model_dump_json(indent=4))
Expand Down

0 comments on commit 89b2b5d

Please sign in to comment.