diff --git a/acapy_agent/ledger/indy_vdr.py b/acapy_agent/ledger/indy_vdr.py index d26734ec6f..e966f04c50 100644 --- a/acapy_agent/ledger/indy_vdr.py +++ b/acapy_agent/ledger/indy_vdr.py @@ -11,7 +11,7 @@ from io import StringIO from pathlib import Path from time import time -from typing import List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from indy_vdr import Pool, Request, VdrError, ledger, open_pool @@ -40,36 +40,48 @@ def _normalize_txns(txns: str) -> str: """Normalize a set of genesis transactions.""" + LOGGER.debug("Normalizing genesis transactions") lines = StringIO() for line in txns.splitlines(): line = line.strip() if line: lines.write(line) lines.write("\n") + LOGGER.debug("Finished normalizing genesis transactions") return lines.getvalue() def _write_safe(path: Path, content: str): """Atomically write to a file path.""" + LOGGER.debug("Writing content safely to path: %s", path) dir_path = path.parent with tempfile.NamedTemporaryFile(dir=dir_path, delete=False) as tmp: + LOGGER.debug("Created temporary file: %s", tmp.name) tmp.write(content.encode("utf-8")) tmp_name = tmp.name + LOGGER.debug("Renaming temporary file to target path") os.rename(tmp_name, path) + LOGGER.debug("Successfully wrote content to: %s", path) def _hash_txns(txns: str) -> str: """Obtain a hash of a set of genesis transactions.""" - return hashlib.sha256(txns.encode("utf-8")).hexdigest()[-16:] + LOGGER.debug("Calculating hash of genesis transactions") + hash_value = hashlib.sha256(txns.encode("utf-8")).hexdigest()[-16:] + LOGGER.debug("Generated transaction hash: %s", hash_value) + return hash_value class IndyVdrLedgerPool: - """Indy-VDR ledger pool manager.""" + """Indy-VDR ledger pool manager with singleton behavior based on configuration.""" + + _instances: Dict[tuple, "IndyVdrLedgerPool"] = {} + _lock = asyncio.Lock() def __init__( self, - name: str, *, + name: str, keepalive: int = 0, cache: Optional[BaseCache] = None, cache_duration: int = 600, @@ -77,69 +89,232 @@ def __init__( read_only: bool = False, socks_proxy: Optional[str] = None, ): - """Initialize an IndyLedger instance. + """Private constructor. Use 'create_instance' to instantiate.""" + LOGGER.debug( + "Initializing IndyVdrLedgerPool with name: %s, keepalive: %s, " + "cache_duration: %s, read_only: %s", + name, + keepalive, + cache_duration, + read_only, + ) - Args: - name: The pool ledger configuration name - keepalive: How many seconds to keep the ledger open - cache: The cache instance to use - cache_duration: The TTL for ledger cache entries - genesis_transactions: The ledger genesis transaction as a string - read_only: Prevent any ledger write operations - socks_proxy: Specifies socks proxy for ZMQ to connect to ledger pool - """ - self.ref_count = 0 - self.ref_lock = asyncio.Lock() + # Instance attributes + self.name = name self.keepalive = keepalive - self.close_task: asyncio.Future = None self.cache = cache - self.cache_duration: int = cache_duration + self.cache_duration = cache_duration + self.genesis_transactions = genesis_transactions + self.read_only = read_only + self.socks_proxy = socks_proxy + + self.ref_count = 0 + self.ref_lock = asyncio.Lock() + self.close_task: Optional[asyncio.Task] = None self.handle: Optional[Pool] = None - self.name = name self.cfg_path_cache: Optional[Path] = None self.genesis_hash_cache: Optional[str] = None self.genesis_txns_cache = genesis_transactions self.init_config = bool(genesis_transactions) self.taa_cache: Optional[str] = None - self.read_only: bool = read_only - self.socks_proxy: str = socks_proxy + + LOGGER.debug("Pool %s initialization staged", name) + + @classmethod + async def get_or_create( + cls, + *, + name: str, + keepalive: int = 0, + cache: Optional[BaseCache] = None, + cache_duration: int = 600, + genesis_transactions: Optional[str] = None, + read_only: bool = False, + socks_proxy: Optional[str] = None, + ) -> "IndyVdrLedgerPool": + """Asynchronously get or create the singleton instance based on configuration. + + Args: + name: The pool ledger configuration name. + keepalive: How many seconds to keep the ledger open. + cache: The cache instance to use. + cache_duration: The TTL for ledger cache entries. + genesis_transactions: The ledger genesis transaction as a string. + read_only: Prevent any ledger write operations. + socks_proxy: Specifies socks proxy for ZMQ to connect to ledger pool. + + Returns: + An initialized instance of IndyVdrLedgerPool. + """ + LOGGER.debug( + "Creating or retrieving IndyVdrLedgerPool instance with params: name=%s, " + "keepalive=%s, cache_duration=%s, read_only=%s, socks_proxy=%s", + name, + keepalive, + cache_duration, + read_only, + socks_proxy, + ) + + config_key = ( + name, + keepalive, + cache_duration, + genesis_transactions, + read_only, + socks_proxy, + ) + LOGGER.debug("Generated config key: %s", config_key) + + async with cls._lock: + if config_key not in cls._instances: + LOGGER.debug( + "No existing instance found for config key, creating new instance" + ) + instance = cls( + name=name, + keepalive=keepalive, + cache=cache, + cache_duration=cache_duration, + genesis_transactions=genesis_transactions, + read_only=read_only, + socks_proxy=socks_proxy, + ) + try: + LOGGER.debug("Initializing new IndyVdrLedgerPool instance") + await instance.initialize() + except Exception as e: + LOGGER.exception( + "Initialization failed for IndyVdrLedgerPool with config: %s", + config_key, + exc_info=e, + ) + raise + cls._instances[config_key] = instance + LOGGER.debug( + "Successfully created and stored new IndyVdrLedgerPool instance: %s", + config_key, + ) + else: + LOGGER.debug( + "Found existing IndyVdrLedgerPool instance for config: %s", config_key + ) + instance = cls._instances[config_key] + + async with instance.ref_lock: + instance.ref_count += 1 + LOGGER.debug( + "Incremented reference count to %s for instance %s", + instance.ref_count, + config_key, + ) + + LOGGER.debug( + "Returning IndyVdrLedgerPool instance with ref_count: %s", + instance.ref_count, + ) + return instance + + async def initialize(self): + """Initialize the ledger pool.""" + LOGGER.debug("Beginning pool initialization") + if self.init_config: + LOGGER.debug("Creating pool config with genesis transactions") + await self.create_pool_config(self.genesis_txns_cache, recreate=True) + self.init_config = False + LOGGER.debug("Opening pool connection") + await self.open() + LOGGER.debug("Pool initialization complete") + + @classmethod + async def release_instance(cls, instance: "IndyVdrLedgerPool"): + """Release a reference to the instance and possibly remove it from the registry. + + Args: + instance: The IndyVdrLedgerPool instance to release. + """ + LOGGER.debug("Beginning instance release process for pool: %s", instance.name) + config_key = ( + instance.name, + instance.keepalive, + instance.cache_duration, + instance.genesis_transactions, + instance.read_only, + instance.socks_proxy, + ) + LOGGER.debug("Generated config key for release: %s", config_key) + + async with cls._lock: + async with instance.ref_lock: + instance.ref_count -= 1 + LOGGER.debug( + "Decremented reference count to %s for instance %s", + instance.ref_count, + config_key, + ) + if instance.ref_count <= 0: + LOGGER.debug( + "Reference count is zero or negative, cleaning up instance" + ) + await instance.close() + del cls._instances[config_key] + LOGGER.debug( + "Successfully removed IndyVdrLedgerPool instance: %s", config_key + ) + else: + LOGGER.debug( + "Instance still has active references: %s", instance.ref_count + ) @property def cfg_path(self) -> Path: """Get the path to the configuration file, ensuring it's created.""" if not self.cfg_path_cache: + LOGGER.debug("Creating configuration path cache") self.cfg_path_cache = storage_path("vdr", create=True) + LOGGER.debug("Configuration path set to: %s", self.cfg_path_cache) return self.cfg_path_cache @property def genesis_hash(self) -> str: """Get the hash of the configured genesis transactions.""" if not self.genesis_hash_cache: + LOGGER.debug("Calculating genesis transactions hash") self.genesis_hash_cache = _hash_txns(self.genesis_txns) + LOGGER.debug("Genesis hash calculated: %s", self.genesis_hash_cache) return self.genesis_hash_cache @property def genesis_txns(self) -> str: """Get the configured genesis transactions.""" if not self.genesis_txns_cache: + LOGGER.debug("Loading genesis transactions from file") try: path = self.cfg_path.joinpath(self.name, "genesis") + LOGGER.debug("Reading genesis file from: %s", path) self.genesis_txns_cache = _normalize_txns(open(path).read()) + LOGGER.debug("Successfully loaded genesis transactions") except FileNotFoundError: + LOGGER.error("Pool config '%s' not found", self.name) raise LedgerConfigError("Pool config '%s' not found", self.name) from None return self.genesis_txns_cache async def create_pool_config(self, genesis_transactions: str, recreate: bool = False): """Create the pool ledger configuration.""" + LOGGER.debug("Creating pool config for '%s', recreate=%s", self.name, recreate) cfg_pool = self.cfg_path.joinpath(self.name) cfg_pool.mkdir(exist_ok=True) + LOGGER.debug("Created pool configuration directory: %s", cfg_pool) + genesis = _normalize_txns(genesis_transactions) if not genesis: + LOGGER.error("Empty genesis transactions provided") raise LedgerConfigError("Empty genesis transactions") genesis_path = cfg_pool.joinpath("genesis") try: + LOGGER.debug("Checking existing genesis file: %s", genesis_path) cmp_genesis = open(genesis_path).read() if _normalize_txns(cmp_genesis) == genesis: LOGGER.debug( @@ -148,66 +323,94 @@ async def create_pool_config(self, genesis_transactions: str, recreate: bool = F ) return elif not recreate: + LOGGER.error( + "Pool ledger '%s' exists with different genesis transactions", + self.name, + ) raise LedgerConfigError( f"Pool ledger '{self.name}' exists with " "different genesis transactions" ) except FileNotFoundError: + LOGGER.debug("No existing genesis file found") pass try: + LOGGER.debug("Writing genesis transactions to: %s", genesis_path) _write_safe(genesis_path, genesis) except OSError as err: + LOGGER.exception("Error writing genesis transactions", exc_info=err) raise LedgerConfigError("Error writing genesis transactions") from err - LOGGER.debug("Wrote pool ledger config '%s'", self.name) + LOGGER.debug("Successfully wrote pool ledger config '%s'", self.name) self.genesis_txns_cache = genesis async def open(self): """Open the pool ledger, creating it if necessary.""" + LOGGER.debug("Opening pool ledger: %s", self.name) if self.init_config: + LOGGER.debug("Initializing pool config with genesis transactions") await self.create_pool_config(self.genesis_txns_cache, recreate=True) self.init_config = False genesis_hash = self.genesis_hash + LOGGER.debug("Using genesis hash: %s", genesis_hash) cfg_pool = self.cfg_path.joinpath(self.name) cfg_pool.mkdir(exist_ok=True) cache_path = cfg_pool.joinpath(f"cache-{genesis_hash}") try: + LOGGER.debug("Attempting to read cached transactions from: %s", cache_path) txns = open(cache_path).read() cached = True + LOGGER.debug("Successfully read cached transactions") except FileNotFoundError: + LOGGER.debug("No cached transactions found, using genesis transactions") txns = self.genesis_txns cached = False + LOGGER.debug("Opening pool with transactions, socks_proxy=%s", self.socks_proxy) self.handle = await open_pool(transactions=txns, socks_proxy=self.socks_proxy) + LOGGER.debug("Pool opened successfully") + upd_txns = _normalize_txns(await self.handle.get_transactions()) if not cached or upd_txns != txns: + LOGGER.debug("Updating cached transactions") try: _write_safe(cache_path, upd_txns) + LOGGER.debug("Successfully wrote updated cached transactions") except OSError: LOGGER.exception("Error writing cached genesis transactions") async def close(self): """Close the pool ledger.""" if self.handle: + LOGGER.debug("Attempting to close pool ledger") exc = None for attempt in range(3): try: + LOGGER.debug("Close attempt %s/3", attempt + 1) self.handle.close() except VdrError as err: + LOGGER.warning( + "Error closing pool ledger (attempt %s/3): %s", + attempt + 1, + str(err), + ) await asyncio.sleep(0.01) exc = err continue self.handle = None exc = None + LOGGER.debug("Successfully closed pool ledger") break if exc: - LOGGER.exception("Exception when closing pool ledger", exc_info=exc) + LOGGER.exception( + "Failed to close pool ledger after 3 attempts", exc_info=exc + ) self.ref_count += 1 # if we are here, we should have self.ref_lock self.close_task = None raise LedgerError("Exception when closing pool ledger") from exc @@ -227,18 +430,43 @@ async def context_close(self): async def closer(timeout: int): """Close the pool ledger after a timeout.""" - await asyncio.sleep(timeout) - async with self.ref_lock: - if not self.ref_count: - LOGGER.debug("Closing pool ledger after timeout") - await self.close() + try: + LOGGER.debug( + "Coroutine will sleep for %d seconds before closing the pool.", + timeout, + ) + await asyncio.sleep(timeout) + async with self.ref_lock: + if not self.ref_count: + LOGGER.debug( + "No more references. Proceeding to close the pool ledger." + ) + await self.close() + else: + LOGGER.debug( + "Reference count is %d. Not closing the pool yet.", + self.ref_count, + ) + except Exception as e: + LOGGER.exception( + "Exception occurred in closer coroutine during pool closure.", + exc_info=e, + ) async with self.ref_lock: self.ref_count -= 1 + LOGGER.debug("Decremented ref_count to %d.", self.ref_count) if not self.ref_count: if self.keepalive: - self.close_task = asyncio.ensure_future(closer(self.keepalive)) + LOGGER.debug( + "Scheduling closer coroutine with keepalive=%s", + self.keepalive, + ) + self.close_task = asyncio.create_task(closer(self.keepalive)) else: + LOGGER.debug( + "No keepalive set. Proceeding to close the pool immediately." + ) await self.close()