Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Wallet policies (and miniscript support) #647

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions hwilib/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
getdescriptors,
prompt_pin,
toggle_passphrase,
registerpolicy,
restore_device,
send_pin,
setup_device,
Expand Down Expand Up @@ -57,7 +58,7 @@ def backup_device_handler(args: argparse.Namespace, client: HardwareWalletClient
return backup_device(client, label=args.label, backup_passphrase=args.backup_passphrase)

def displayaddress_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, str]:
return displayaddress(client, desc=args.desc, path=args.path, addr_type=args.addr_type)
return displayaddress(client, desc=args.desc, path=args.path, policy=args.policy, addr_type=args.addr_type, name=args.name, keys=args.keys, change=args.change, index=args.index, extra=args.extra)

def enumerate_handler(args: argparse.Namespace) -> List[Dict[str, Any]]:
return enumerate(password=args.password)
Expand All @@ -74,6 +75,9 @@ def getkeypool_handler(args: argparse.Namespace, client: HardwareWalletClient) -
def getdescriptors_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, List[str]]:
return getdescriptors(client, account=args.account)

def registerpolicy_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, str]:
return registerpolicy(client, name=args.name, policy=args.policy, keys=args.keys, extra=args.extra)

def restore_device_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, bool]:
if args.interactive:
return restore_device(client, label=args.label, word_count=args.word_count)
Expand All @@ -88,7 +92,7 @@ def signmessage_handler(args: argparse.Namespace, client: HardwareWalletClient)
return signmessage(client, message=args.message, path=args.path)

def signtx_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, Union[bool, str]]:
return signtx(client, psbt=args.psbt)
return signtx(client, psbt=args.psbt, name=args.name, policy=args.policy, keys=args.keys, extra=args.extra)

def wipe_device_handler(args: argparse.Namespace, client: HardwareWalletClient) -> Dict[str, bool]:
return wipe_device(client)
Expand Down Expand Up @@ -160,6 +164,11 @@ def get_parser() -> HWIArgumentParser:

signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT')
signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign')
signtx_parser.add_argument('--policy', help='The descriptor template of the wallet policy. E.g. wpkh(@0/**)', type=str)
signtx_parser.add_argument('--name', help='The name of the policy. E.g. "Cold storage"', type=str)
signtx_parser.add_argument('--keys', help='The list of keys in the wallet policy, encoded in JSON', type=str)
signtx_parser.add_argument('--extra', help='JSON encoded string; it might contain proof_of_registration, or vendor-specific fields', type=str, default="{}")

signtx_parser.set_defaults(func=signtx_handler)

getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key')
Expand Down Expand Up @@ -192,8 +201,14 @@ def get_parser() -> HWIArgumentParser:
displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address')
group = displayaddr_parser.add_mutually_exclusive_group(required=True)
group.add_argument('--desc', help='Output Descriptor. E.g. wpkh([00000000/84h/0h/0h]xpub.../0/0), where 00000000 must match --fingerprint and xpub can be obtained with getxpub. See doc/descriptors.md in Bitcoin Core')
group.add_argument('--policy', help='The descriptor template of the wallet policy. E.g. wpkh(@0/**)')
group.add_argument('--path', help='The BIP 32 derivation path of the key embedded in the address, default follows BIP43 convention, e.g. ``m/84h/0h/0h/1/*``')
displayaddr_parser.add_argument("--addr-type", help="The address type to display", type=AddressType.argparse, choices=list(AddressType), default=AddressType.WIT) # type: ignore
displayaddr_parser.add_argument('--name', help='The name of the policy. E.g. "Cold storage". Can be empty for default single-signature wallets')
displayaddr_parser.add_argument('--keys', help='The list of keys in the wallet policy, encoded in JSON')
displayaddr_parser.add_argument('--change', type=int, help='0 if not change, 1 if change', default=0) # TODO: can we use 'choices=' here?
displayaddr_parser.add_argument('--index', help='address index', type=int, default=0)
displayaddr_parser.add_argument('--extra', help='JSON encoded string; it might contain proof_of_registration, or vendor-specific fields', type=str, default="{}")
displayaddr_parser.set_defaults(func=displayaddress_handler)

setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p. Requires interactive mode')
Expand All @@ -204,6 +219,13 @@ def get_parser() -> HWIArgumentParser:
wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device')
wipedev_parser.set_defaults(func=wipe_device_handler)

registerpolicy_parser = subparsers.add_parser('registerpolicy', help='Register a policy')
registerpolicy_parser.add_argument('--name', help='The name of the policy. E.g. "Cold storage"', type=str, required=True)
registerpolicy_parser.add_argument('--policy', help='The descriptor template of the wallet policy. E.g. wpkh(@0/**)', type=str, required=True)
registerpolicy_parser.add_argument('--keys', help='The list of keys in the wallet policy, encoded in JSON', type=str, required=True)
registerpolicy_parser.add_argument('--extra', help='JSON encoded string; it might contain proof_of_registration, or vendor-specific fields', type=str, default="{}")
registerpolicy_parser.set_defaults(func=registerpolicy_handler)

restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process. Requires interactive mode')
restore_parser.add_argument('--word_count', '-w', help='Word count of your BIP39 recovery phrase (options: 12/18/24)', type=int, default=24)
restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
Expand Down
101 changes: 95 additions & 6 deletions hwilib/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import importlib
import logging
import platform
import json

from ._base58 import xpub_to_pub_hex, xpub_to_xonly_pub_hex
from .key import (
Expand Down Expand Up @@ -49,6 +50,7 @@
WPKHDescriptor,
WSHDescriptor,
)
from .wallet_policy import WalletPolicy
from .devices import __all__ as all_devs
from .common import (
AddressType,
Expand Down Expand Up @@ -182,19 +184,44 @@ def getmasterxpub(client: HardwareWalletClient, addrtype: AddressType = AddressT
"""
return {"xpub": client.get_master_xpub(addrtype, account).to_string()}

def signtx(client: HardwareWalletClient, psbt: str) -> Dict[str, Union[bool, str]]:
def signtx(
client: HardwareWalletClient,
psbt: str,
policy: Optional[str],
name: Optional[str],
keys: Optional[str],
extra: str = "{}"
) -> Dict[str, Union[bool, str]]:
"""
Sign a Partially Signed Bitcoin Transaction (PSBT) with the client.

:param client: The client to interact with
:param psbt: The PSBT to sign
:param policy: The descriptor template for the wallet policy to display the address for. Mutually exclusive with ``desc`` or ``path``
:param name: The name of the wallet policy, if registered. Only works with ``policy``; if omitted, empty name is implied
:param keys: The list of keys information, as a JSON-encoded string. Only works with ``policy``
:param extra: A JSON-encoded string representing a dictionary with any additional field for the hardware wallet
:return: A dictionary containing the processed PSBT serialized in Base64.
Returned as ``{"psbt": <base64 psbt string>}``.
"""
# Deserialize the transaction
tx = PSBT()
tx.deserialize(psbt)
result = client.sign_tx(tx).serialize()
if policy is None:
result = client.sign_tx(tx).serialize()
else:
if keys is None:
raise BadArgumentError("--keys parameter is compulsory when using --policy")

keys_info = json.loads(keys)
if not isinstance(keys_info, list) or any(not isinstance(k, str) for k in keys_info):
raise BadArgumentError("keys should be a json-encoded list of keys information")

# TODO: could do more validation of each key origin info

parsed_extra = json.loads(extra)
wp = WalletPolicy(name if name is not None else "", policy, keys_info, parsed_extra)
result = client.sign_tx_with_wallet_policy(tx, wp, parsed_extra).serialize()
return {"psbt": result, "signed": result != psbt}

def getxpub(client: HardwareWalletClient, path: str, expert: bool = False) -> Dict[str, Any]:
Expand Down Expand Up @@ -436,20 +463,68 @@ def getdescriptors(

return result


def registerpolicy(
client: HardwareWalletClient,
policy: str,
name: str,
keys: str,
extra: str = "{}"
) -> Dict[str, str]:
"""
Display an address on the device for client.
The address can be specified by the path with additional parameters, or by a descriptor.

:param client: The client to interact with
:param policy: The descriptor template for the wallet policy to display the address for. Mutually exclusive with ``desc`` or ``path``
:param name: The name of the wallet policy, if registered. Only works with ``policy``; if omitted, empty name is implied
:param keys: The list of keys information, as a JSON-encoded string. Only works with ``policy``
:param extra: A JSON-encoded string representing a dictionary with any additional field for the hardware wallet
:return: On success, returns the proof of registration if the hardware wallet requires it.
Returned as ``{"proof_of_registration": <hex string>}``. The proof_of_registration is the empty string is the hardware
wallet does not return a proof of registration.
:raises: BadArgumentError: if an argument is malformed or missing.
"""
if name == "":
raise BadArgumentError("The policy name cannot be empty")

keys_info = json.loads(keys)
if not isinstance(keys_info, list) or any(not isinstance(k, str) for k in keys_info):
raise BadArgumentError("keys should be a json-encoded list of keys information")

# TODO: could do more validation of each key origin info

parsed_extra = json.loads(extra)
wp = WalletPolicy(name if name is not None else "", policy, keys_info, parsed_extra)
return {"proof_of_registration": client.register_wallet_policy(wp, parsed_extra).hex()}


def displayaddress(
client: HardwareWalletClient,
path: Optional[str] = None,
desc: Optional[str] = None,
addr_type: AddressType = AddressType.WIT
policy: Optional[str] = None,
addr_type: AddressType = AddressType.WIT,
name: Optional[str] = None,
keys: Optional[str] = None,
change: Optional[int] = 0,
index: Optional[int] = 0,
extra: str = "{}"
) -> Dict[str, str]:
"""
Display an address on the device for client.
The address can be specified by the path with additional parameters, or by a descriptor.

:param client: The client to interact with
:param path: The path of the address to display. Mutually exclusive with ``desc``
:param desc: The descriptor to display the address for. Mutually exclusive with ``path``
:param path: The path of the address to display. Mutually exclusive with ``desc`` or ``policy``
:param desc: The descriptor to display the address for. Mutually exclusive with ``path`` or ``policy``
:param policy: The descriptor template for the wallet policy to display the address for. Mutually exclusive with ``desc`` or ``path``
:param addr_type: The address type to return. Only works with ``path``
:param name: The name of the wallet policy, if registered. Only works with ``policy``; if omitted, empty name is implied
:param keys: The list of keys information, as a JSON-encoded string. Only works with ``policy``
:param change: 0 if a normal receive address is desired, 1 for a change address. Only works with ``policy``
:param index: The address index of the required address, a number between 0 and 2147483647 (include). Only works with ``policy``
:param extra: A JSON-encoded string representing a dictionary with any additional field for the hardware wallet
:return: A dictionary containing the address displayed.
Returned as ``{"address": <base58 or bech32 address string>}``.
:raises: BadArgumentError: if an argument is malformed, missing, or conflicts.
Expand Down Expand Up @@ -491,7 +566,21 @@ def displayaddress(
elif isinstance(descriptor, TRDescriptor):
addr_type = AddressType.TAP
return {"address": client.display_singlesig_address(pubkey.get_full_derivation_path(0), addr_type)}
raise BadArgumentError("Missing both path and descriptor")
elif policy is not None:
if keys is None:
raise BadArgumentError("--keys parameter is compulsory when using --policy")

keys_info = json.loads(keys)
if not isinstance(keys_info, list) or any(not isinstance(k, str) for k in keys_info):
raise BadArgumentError("keys should be a json-encoded list of keys information")

# TODO: could do more validation of each key origin info

parsed_extra = json.loads(extra)
wp = WalletPolicy(name if name is not None else "", policy, keys_info, parsed_extra)
return {"address": client.display_wallet_policy_address(wp, change == 1, index, parsed_extra)}

raise BadArgumentError("Missing all of path, descriptor and policy")

def setup_device(client: HardwareWalletClient, label: str = "", backup_passphrase: str = "") -> Dict[str, bool]:
"""
Expand Down
114 changes: 108 additions & 6 deletions hwilib/devices/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,8 @@
LegacyClient,
TransportClient,
)
from .ledger_bitcoin.command_builder import get_wallet_policy_id
from .ledger_bitcoin.exception import NotSupportedError
from .ledger_bitcoin.wallet import (
MultisigWallet,
WalletPolicy,
)
from .ledger_bitcoin.btchip.btchipException import BTChipException

import builtins
Expand All @@ -66,6 +63,7 @@
parse_multisig,
)
from ..psbt import PSBT
from ..wallet_policy import WalletPolicy
import logging
import re

Expand Down Expand Up @@ -138,6 +136,41 @@ def func(*args: Any, **kwargs: Any) -> Any:
raise e
return func

class MultisigWallet(WalletPolicy):
"""Helper class to represent common multisignature wallet policies."""

def __init__(self, name: str, address_type: AddressType, threshold: int, keys_info: List[str], sorted: bool = True) -> None:
n_keys = len(keys_info)

if not (1 <= threshold <= n_keys <= 16):
raise ValueError("Invalid threshold or number of keys")

multisig_op = "sortedmulti" if sorted else "multi"

if (address_type == AddressType.LEGACY):
policy_prefix = f"sh({multisig_op}("
policy_suffix = "))"
elif address_type == AddressType.WIT:
policy_prefix = f"wsh({multisig_op}("
policy_suffix = "))"
elif address_type == AddressType.SH_WIT:
policy_prefix = f"sh(wsh({multisig_op}("
policy_suffix = ")))"
else:
raise ValueError(f"Unexpected address type: {address_type}")

descriptor_template = "".join([
policy_prefix,
str(threshold) + ",",
",".join("@" + str(k) + "/**" for k in range(n_keys)),
policy_suffix
])

super().__init__(name, descriptor_template, keys_info)

self.threshold = threshold


# This class extends the HardwareWalletClient for Ledger Nano S and Nano X specific things
class LedgerClient(HardwareWalletClient):

Expand Down Expand Up @@ -289,7 +322,7 @@ def legacy_sign_tx() -> PSBT:

# Make and register the MultisigWallet
msw = MultisigWallet(f"{k} of {len(key_exprs)} Multisig", script_addrtype, k, key_exprs)
msw_id = msw.id
msw_id = get_wallet_policy_id(msw)
if msw_id not in wallets:
_, registered_hmac = self.client.register_wallet(msw)
wallets[msw_id] = (
Expand All @@ -304,7 +337,8 @@ def process_origin(origin: KeyOriginInfo) -> None:
# TODO: Deal with non-default wallets
return
policy = self._get_singlesig_default_wallet_policy(script_addrtype, origin.path[2])
wallets[policy.id] = (
policy_id = get_wallet_policy_id(policy)
wallets[policy_id] = (
signing_priority[script_addrtype],
script_addrtype,
self._get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]),
Expand Down Expand Up @@ -538,6 +572,74 @@ def can_sign_taproot(self) -> bool:
"""
return isinstance(self.client, NewClient)

def can_register_wallet_policies(self) -> bool:
return True

@ledger_exception
def register_wallet_policy(self, wallet_policy: WalletPolicy, extra: Dict[str, Any]) -> bytes:
_, hmac = self.client.register_wallet(wallet_policy)
return hmac

@ledger_exception
def display_wallet_policy_address(self, wallet_policy: WalletPolicy, is_change: bool, address_index: int, extra: Dict[str, Any]) -> str:
# TODO: if non standard policy, should return an error if proof_of_registration is missing
if "proof_of_registration" not in extra:
hmac = None
else:
# TODO: error handling
hmac = bytes.fromhex(extra["proof_of_registration"])
assert len(hmac) == 32

return self.client.get_wallet_address(wallet_policy, hmac, int(is_change), address_index, True)

@ledger_exception
def sign_tx_with_wallet_policy(self, psbt: PSBT, wallet_policy: WalletPolicy, extra: Dict[str, Any]) -> PSBT:
# TODO: proof_of_registration not required for standard policies
if "proof_of_registration" not in extra:
raise BadArgumentError("Ledger Bitcoin app requires a proof_of_registration for non-standard wallet policies")

# TODO: proper error handling
wallet_hmac = bytes.fromhex(extra["proof_of_registration"])

assert len(wallet_hmac) == 32

# Make a deepcopy of this psbt. We will need to modify it to get signing to work,
# which will affect the caller's detection for whether signing occured.
psbt2 = copy.deepcopy(psbt)
if psbt.version != 2:
psbt2.convert_to_v2()

input_sigs = self.client.sign_psbt(psbt2, wallet_policy, wallet_hmac)

for idx, pubkey, sig in input_sigs:
psbt_in = psbt2.inputs[idx]

utxo = None
if psbt_in.witness_utxo:
utxo = psbt_in.witness_utxo
if psbt_in.non_witness_utxo:
assert psbt_in.prev_out is not None
utxo = psbt_in.non_witness_utxo.vout[psbt_in.prev_out]
assert utxo is not None

is_wit, wit_ver, _ = utxo.is_witness()

if is_wit and wit_ver >= 1:
# TODO: Deal with script path signatures
# For now, assume key path signature
psbt_in.tap_key_sig = sig
else:
psbt_in.partial_sigs[pubkey] = sig

# Extract the sigs from psbt2 and put them into tx
for sig_in, psbt_in in zip(psbt2.inputs, psbt.inputs):
psbt_in.partial_sigs.update(sig_in.partial_sigs)
psbt_in.tap_script_sigs.update(sig_in.tap_script_sigs)
if len(sig_in.tap_key_sig) != 0 and len(psbt_in.tap_key_sig) == 0:
psbt_in.tap_key_sig = sig_in.tap_key_sig

return psbt


def enumerate(password: str = '') -> List[Dict[str, Any]]:
results = []
Expand Down
4 changes: 1 addition & 3 deletions hwilib/devices/ledger_bitcoin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@
from .client import createClient
from ...common import Chain

from .wallet import AddressType, WalletPolicy, MultisigWallet

__all__ = ["Client", "TransportClient", "createClient", "Chain", "AddressType", "WalletPolicy", "MultisigWallet"]
__all__ = ["Client", "TransportClient", "createClient", "Chain"]
Loading