diff --git a/MANIFEST.in b/MANIFEST.in index 7a5e72b..a8cf5c1 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,4 @@ include *.md include *.txt include bexi/*.yaml +include VERSION diff --git a/README.rst b/README.rst index 7878602..646cdfa 100644 --- a/README.rst +++ b/README.rst @@ -70,25 +70,19 @@ The active key is what you normally see as the private key, since it allows to move funds, whereas the memo key only allows reading the memo message of transfers. -Then initiate the blockchain monitor - -.. code-block:: bash - - $ python3 cli.py blockchain_monitor - -Start the blockchain monitor service (isalive wsgi response for blockchain monitor) +Start the blockchain monitor service (isalive wsgi response with the actual monitor as coroutine, needs chain connection) .. code-block:: bash $ python3 cli.py blockchain_monitor_service -Start the sign service +Start the sign service (create wallets, sign transactions, offline) .. code-block:: bash $ python3 cli.py sign_service -and the manage service +and the manage service (build and broadcast transactions, manage balances, history, ..., needs chain connection) .. 
code-block:: bash diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..ceab6e1 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1 \ No newline at end of file diff --git a/bexi/__init__.py b/bexi/__init__.py index 88cf48b..8e44dbc 100644 --- a/bexi/__init__.py +++ b/bexi/__init__.py @@ -1,15 +1,21 @@ import os import yaml import logging -from logging.handlers import TimedRotatingFileHandler +from logging.handlers import TimedRotatingFileHandler, HTTPHandler from copy import deepcopy import io import urllib.request import collections import json +import threading -__VERSION__ = "0.1" +def get_version(): + with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", 'VERSION')) as version_file: + return version_file.read().strip() + + +__VERSION__ = get_version() class Config(dict): @@ -179,34 +185,86 @@ def _nested_update(d, u): return d +class LykkeHttpHandler(HTTPHandler): + + def mapLogRecord(self, record): + from .wsgi import flask_setup + + record_dict = record.__dict__ + record_dict["appName"] = Config.get("wsgi", "name") + record_dict["appVersion"] = __VERSION__ + record_dict["envInfo"] = flask_setup.get_env_info() + record_dict["logLevel"] = record_dict["levelname"] + record_dict["component"] = record_dict["name"] + record_dict["process"] = record_dict["processName"] + record_dict["context"] = None + + if record_dict.get("exc_info", None) is not None: + record_dict["callStack"] = record_dict["exc_text"] + record_dict["exceptionType"] = record_dict["exc_info"][0].__name__ + + return record_dict + + def update_blocking(self): + self.blocking = Config.get("logs", "http", "blocking", True) + + def emit(self, record): + if self.blocking: + super(LykkeHttpHandler, self).emit(record) + else: + def super_emit(): + super(LykkeHttpHandler, self).emit(record) + + thread = threading.Thread(target=super_emit) + thread.daemon = True + thread.start() + + def set_global_logger(existing_loggers=None): + use_handlers = [] + # setup logging - # ... 
log to file system - log_folder = os.path.join(Config.get("dump_folder", default="dump"), "logs") log_level = logging.getLevelName(Config.get("logs", "level", default="INFO")) - - os.makedirs(log_folder, exist_ok=True) log_format = ('%(asctime)s %(levelname) -10s: %(message)s') - trfh = TimedRotatingFileHandler( - os.path.join(log_folder, "bexi.log"), - "midnight", - 1 - ) - trfh.suffix = "%Y-%m-%d" - trfh.setFormatter(logging.Formatter(log_format)) - trfh.setLevel(log_level) + + if Config.get("logs", "file", True): + # ... log to file system + log_folder = os.path.join(Config.get("dump_folder", default="dump"), "logs") + os.makedirs(log_folder, exist_ok=True) + trfh = TimedRotatingFileHandler( + os.path.join(log_folder, "bexi.log"), + "midnight", + 1 + ) + trfh.suffix = "%Y-%m-%d" + trfh.setFormatter(logging.Formatter(log_format)) + trfh.setLevel(log_level) + + use_handlers.append(trfh) # ... and to console sh = logging.StreamHandler() sh.setFormatter(logging.Formatter(log_format)) sh.setLevel(log_level) + use_handlers.append(sh) + + if not Config.get("logs", "http", {}) == {}: + # ... and http logger + lhh = LykkeHttpHandler( + host=Config.get("logs", "http", "host"), + url=Config.get("logs", "http", "url"), + method="POST", + secure=Config.get("logs", "http", "secure") + ) + lhh.setLevel(log_level) + lhh.update_blocking() + use_handlers.append(lhh) + # global config (e.g. for werkzeug) logging.basicConfig(level=log_level, format=log_format, - handlers=[trfh, sh]) - - use_handlers = [trfh, sh] + handlers=use_handlers) if existing_loggers is not None: if not type(existing_loggers) == list: diff --git a/bexi/addresses.py b/bexi/addresses.py index fdd5119..5d7e819 100644 --- a/bexi/addresses.py +++ b/bexi/addresses.py @@ -25,7 +25,7 @@ def split_unique_address(address): return always -def get_from_address_from_operation(operation): +def get_from_address(operation): """ returns the from address of the given operation. 
if the from address is the exchange account, the address contains the customer_id, otherwise empty string @@ -34,13 +34,14 @@ def get_from_address_from_operation(operation): :param operation: operation formatted for operation storage :type operation: dict """ - if utils.is_exchange_account(operation["from"]): - return get_address_from_operation(operation) + if is_internal(operation["from"], operation["to"]): + # internal transfer + return create_unique_address(operation["from"], operation["customer_id"]) else: - return ensure_account_name(operation["from"]) + DELIMITER + "" + return create_unique_address(operation["from"], "") -def get_to_address_from_operation(operation): +def get_to_address(operation): """ returns the to address of the given operation. if the to address is the exchange account, the address contains the customer_id, otherwise empty string @@ -49,29 +50,81 @@ def get_to_address_from_operation(operation): :param operation: operation formatted for operation storage :type operation: dict """ - if utils.is_exchange_account(operation["to"]) and not utils.is_exchange_account(operation["from"]): - return get_address_from_operation(operation) + if not is_internal(operation["from"], operation["to"]): + # no internal transfer + return create_unique_address(operation["to"], operation["customer_id"]) else: - return ensure_account_name(operation["to"]) + DELIMITER + "" + return create_unique_address(operation["to"], "") -def get_address_from_operation(operation): - """ assumes that the operation is either from or to an exchange account. - the address of this operation is then returned as - DELIMITER +def get_tracking_address(operation): + """ + Get the tracking address of this operation, either from or to an exchange account. 
+ Decision depends on internal transfer, deposit, withdraw operation :param operation: operation formatted for operation storage :type operation: dict + + :returns address as defined in `func`:create_unique_address """ - if utils.is_exchange_account(operation["from"]) and utils.is_exchange_account(operation["to"]): - return ensure_account_name(operation["from"]) + DELIMITER + operation["customer_id"] - elif utils.is_exchange_account(operation["from"]): - return ensure_account_name(operation["from"]) + DELIMITER + operation["customer_id"] - elif utils.is_exchange_account(operation["to"]): - return ensure_account_name(operation["to"]) + DELIMITER + operation["customer_id"] + if is_internal(operation["from"], operation["to"]): + # internal transfer + return create_unique_address(operation["from"], operation["customer_id"]) + elif is_withdraw(operation["from"], operation["to"]): + # withdraw + return create_unique_address(operation["to"], operation["customer_id"]) + elif is_deposit(operation["from"], operation["to"]): + # deposit + return create_unique_address(operation["to"], operation["customer_id"]) raise Exception("No operaton concerning this exchange") +def decide_tracking_address(from_address, to_address): + """ + Given two addresses it decides which is the tracking address for the underlying operation. + Creates and splits the address to use common functionality buried in both methods. 
+ Decision depends on internal transfer, deposit, withdraw operation + + :param from_address: from address + :type from_address: str or split address + :param to_address: to address + :type to_address: str or split address + + :returns split address as defined in `func`:split_unique_address + """ + if type(from_address) == str: + from_address = split_unique_address(from_address) + if type(to_address) == str: + to_address = split_unique_address(to_address) + if is_internal(from_address, to_address): + # internal transfer + return split_unique_address(create_unique_address(from_address["account_id"], from_address["customer_id"])) + elif is_withdraw(from_address, to_address): + # withdraw + return split_unique_address(create_unique_address(to_address["account_id"], to_address["customer_id"])) + elif is_deposit(from_address, to_address): + # deposit + return split_unique_address(create_unique_address(to_address["account_id"], to_address["customer_id"])) + raise Exception("No operaton concerning this exchange") + + +def ensure_address_format(address): + """ + Ensures that the address has the correct format name:uuid + + :param address: address to be checked + :type address: address + + :returns properly formatted address + """ + if type(address) == str: + if not address.startswith("1.2."): + return address + address = split_unique_address(address) + assert type(address) == dict + return create_unique_address(address["account_id"], address["customer_id"]) + + @requires_blockchain def _account_name_to_id(account_name, bitshares_instance): return Account(account_name, bitshares_instance=bitshares_instance)["id"] @@ -115,27 +168,54 @@ def create_unique_address(account_id_or_name, randomizer=uuid.uuid4): """ account_id_or_name = ensure_account_name(account_id_or_name) if type(randomizer) == str: - return account_id_or_name + DELIMITER + randomizer + if randomizer == "": + return account_id_or_name + else: + return account_id_or_name + DELIMITER + randomizer return 
account_id_or_name + DELIMITER + str(randomizer()) -def create_memo(address, incident_id): - """ Create plain text memo for an address/incident pair. - The memo will contain DELIMITER, - this is done to have full transparency on the blockchain +def is_withdraw(from_address, to_address): + if type(from_address) == dict: + from_address = from_address["account_id"] + if type(to_address) == dict: + to_address = to_address["account_id"] + return utils.is_exchange_account(from_address) and not utils.is_exchange_account(to_address) + + +def is_deposit(from_address, to_address): + if type(from_address) == dict: + from_address = from_address["account_id"] + if type(to_address) == dict: + to_address = to_address["account_id"] + return not utils.is_exchange_account(from_address) and utils.is_exchange_account(to_address) + + +def is_internal(from_address, to_address): + if type(from_address) == dict: + from_address = from_address["account_id"] + if type(to_address) == dict: + to_address = to_address["account_id"] + return utils.is_exchange_account(from_address) and utils.is_exchange_account(to_address) + + +def create_memo(from_address, to_address, incident_id): + """ Create plain text memo for a transfer as defined by arguments. + Depending on the case (internal transfer, deposit, withdraw operation), + the memo will contain [[DELIMITER]]. 
:param address: address in the format DELIMITER :type address: str :param incident_id: unique incident id :type incident_id: str """ - address = split_unique_address(address) + address = decide_tracking_address(from_address, to_address) memo = "" if address["customer_id"]: memo = memo + address["customer_id"] - if incident_id: + if incident_id and not is_withdraw(from_address, to_address): if memo != "": memo = memo + DELIMITER + incident_id else: @@ -154,9 +234,9 @@ def split_memo(memo): raise ValueError() splitted = memo.split(DELIMITER) - always = {"customer_id": splitted[0]} + always = {"customer_id": splitted[0].strip()} if len(splitted) == 2: - always["incident_id"] = splitted[1] + always["incident_id"] = splitted[1].strip() else: always["incident_id"] = None return always diff --git a/bexi/blockchain_monitor/__init__.py b/bexi/blockchain_monitor/__init__.py index 5081755..790a513 100644 --- a/bexi/blockchain_monitor/__init__.py +++ b/bexi/blockchain_monitor/__init__.py @@ -1,4 +1,4 @@ -from ..operation_storage.exceptions import DuplicateOperationException +from ..operation_storage.exceptions import DuplicateOperationException, OperationStorageBadRequestException from ..factory import get_operation_storage from ..connection import requires_blockchain from .. 
import Config @@ -13,6 +13,10 @@ import logging +class BlockchainMonitorRetryException(Exception): + pass + + class BlockchainMonitor(object): """ The BlockchainMonitor Class is used to listen to the BitShares blockchain and react on transfers to the monitored account (as per the configuration) @@ -75,7 +79,7 @@ def __init__( # make sure the memo key is added to the instance memo_key = Config.get("bitshares", "exchange_account_memo_key") if not self.bitshares.wallet.created() or\ - memo_key in self.bitshares.wallet.keys: + memo_key not in self.bitshares.wallet.keys: self.bitshares.wallet.setKeys(memo_key) # Get configuration @@ -112,14 +116,6 @@ def __init__( self.start_block = kwargs.pop("start_block", None) self.stop_block = kwargs.pop("stop_block", None) - last_block = self.storage.get_last_head_block_num() - - logging.getLogger(__name__).debug("Init with start=" + str(self.start_block) + " stop=" + str(self.stop_block) + " last=" + str(last_block)) - - if not self.start_block: - if last_block > 0: - self.start_block = last_block + 1 - def unlock_wallet(self, pwd): """ Unlock the pybitshares wallet with the provided password """ @@ -136,18 +132,64 @@ def listen(self): last block) and "irreversible" (the block that is confirmed by 2/3 of all block producers and is thus irreversible) """ - for block in Blockchain( - mode=self.watch_mode, - max_block_wait_repetition=12, - bitshares_instance=self.bitshares - ).blocks( - start=self.start_block, - stop=self.stop_block - ): - logging.getLogger(__name__).debug("Processing block " + str(block["block_num"])) + last_block = self.storage.get_last_head_block_num() + + logging.getLogger(__name__).debug("Start listen with start=" + str(self.start_block) + " stop=" + str(self.stop_block) + " last=" + str(last_block)) + + # if set to true, block numbers may not be consecutively + self.allow_block_jump = False + + if not self.start_block: + if last_block > 0: + self.start_block = last_block + 1 + else: + if not self.start_block 
== self.storage.get_last_head_block_num() + 1: + # allow first block to jump + self.allow_block_jump = True + logging.getLogger(__name__).warning("Force listen with different block than last in storage (storage=" + str(last_block) + ", given=" + str(self.start_block) + ")") + + retry = True + while (retry): + retry = False + + block_listener = Blockchain( + mode=self.watch_mode, + max_block_wait_repetition=12, + bitshares_instance=self.bitshares + ) - self.process_block(block) - self.storage.set_last_head_block_num(block["block_num"]) + if not block_listener.mode == "last_irreversible_block_num": + block_listener.mode = "last_irreversible_block_num" + + try: + for block in block_listener.blocks( + start=self.start_block, + stop=self.stop_block + ): + logging.getLogger(__name__).debug("Processing block " + str(block["block_num"])) + + last_head_block = self.storage.get_last_head_block_num() + + if last_head_block == 0 or\ + block["block_num"] == last_head_block + 1 or\ + (self.allow_block_jump and last_head_block < block["block_num"]): + self.allow_block_jump = False + # no blocks missed + self.process_block(block) + self.storage.set_last_head_block_num(block["block_num"]) + elif block["block_num"] == last_head_block: + # possible on connection error, skip block + continue + else: + self.start_block = last_head_block + 1 + if self.stop_block is not None and self.start_block > self.stop_block: + logging.getLogger(__name__).error("Block was missed, or trying to march backwards. Stop block already reached, shutting down ...") + return + else: + logging.getLogger(__name__).error("Block was missed, or trying to march backwards. 
Retry with next block " + str(last_head_block + 1)) + raise BlockchainMonitorRetryException() + except BlockchainMonitorRetryException: + retry = True def process_block(self, block): """ Process block and send transactions to @@ -158,11 +200,12 @@ def process_block(self, block): This method takes all transactions (appends block-specific informations) and sends them to transaction processing """ - for transaction in block.get("transactions", []): + for tx_in_block, transaction in enumerate(block.get("transactions", [])): # Add additional information transaction.update({ "block_num": block.get("block_num"), "timestamp": block.get("timestamp"), + "tx_in_block": tx_in_block }) self.process_transaction(transaction) @@ -198,6 +241,7 @@ def get_tx_id(transaction): "block_num": transaction.get("block_num"), "timestamp": transaction.get("timestamp"), "expiration": transaction.get("expiration"), + "tx_in_block": transaction.get("tx_in_block"), "op_in_tx": op_in_tx, "op": [ # Decode the operation type id as string @@ -300,4 +344,7 @@ def postprocess_operation(self, operation): try: self.storage.insert_or_update_operation(operation) except DuplicateOperationException: - pass + logging.getLogger(__name__).debug("Storage already contained operation, skipping ...") + except OperationStorageBadRequestException as e: + logging.getLogger(__name__).debug("Storage gave bad request, exception below. 
Skipping ...") + logging.getLogger(__name__).exception(e) diff --git a/bexi/config_bitshares_connection.yaml b/bexi/config_bitshares_connection.yaml index 39c6678..102df49 100644 --- a/bexi/config_bitshares_connection.yaml +++ b/bexi/config_bitshares_connection.yaml @@ -2,6 +2,7 @@ network_type: Test bitshares: transaction_expiration_in_sec: 43200 + blockchain_allowed_sync_delay_in_sec: 180 # irreversible, or head watch_mode: irreversible diff --git a/bexi/config_common.yaml b/bexi/config_common.yaml index bf86d47..cb076a6 100644 --- a/bexi/config_common.yaml +++ b/bexi/config_common.yaml @@ -2,9 +2,14 @@ dump_folder: dump logs: level: DEBUG + http: + host: logging-adapter-dev.lykkex.net + url: /api/logs + secure: True + blocking: True wsgi: - name: bexi - host: localhost - port: 5000 - detailed_error: True \ No newline at end of file + name: bexi + host: localhost + port: 5000 + detailed_error: True \ No newline at end of file diff --git a/bexi/config_operation_storage.yaml b/bexi/config_operation_storage.yaml index 5b332a9..e4b8075 100644 --- a/bexi/config_operation_storage.yaml +++ b/bexi/config_operation_storage.yaml @@ -4,33 +4,23 @@ operation_storage: wait_in_ms: 0 incident_id: format: "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89ab][0-9a-fA-F]{3}-[0-9a-fA-F]{12}" - short_hash_digits: 3 - use: azuretest - mongodb: - seeds: - - # insert list of seeds here - db: bitshares-operation-storage - operation_collection: operations - status_collection: status - address_collection: address + key_hash: + digits: 3 + type: crc32 + use: azure mongodbtest: seeds: - localhost:27017 - db: bitshares-operation-storage - operation_collection: operations_test - status_collection: status_test - address_collection: address_test + db: bitshares-operation-storage-test + azureauto: + account: bitsharesdev + key: 3EhS8fLDZNKbS/O6DSIzbWZuU+PwClRAgmFYViDP1L/INlTnR/GW+2D06BTmB02pZTzERhidqXTgDaJ7pmajLg== + db: lykkebitsharesoperationstorage azure: - account: # insert account name 
- key: # insert account key + account: bitsharesdevautotests + key: 0H3WReLsK1GmDU8axor7M3lz4xuAzgdPrqy597AcoCZ5+4kcNFxJ2QoMM52Uz2nRRV77oZ1voKGxtH6rJCaXVw== db: lykkebitsharesoperationstorage - operation_table: operations - status_table: status - address_table: address azuretest: account: lkedevbcnbitshares key: Sqvrhc8Y1rPhqiWaDT6k9BNFy2nE57ebi51zTH2jMs0jbZLBKD4wvIsadPU5iH9R0/858CE/TfKaJ5IC5B/Oxw== - db: lykkebitsharesoperationstorage - operation_table: operationstest - status_table: statustest - address_table: addresstest \ No newline at end of file + db: lykkebitsharesoperationstoragetest diff --git a/bexi/connection.py b/bexi/connection.py index aae255e..cd9748c 100644 --- a/bexi/connection.py +++ b/bexi/connection.py @@ -32,6 +32,8 @@ def reset(): def _get_configured_instance(): network = Config.get("network_type") connection = Config.get("bitshares", "connection", network) + if connection.get("num_retries", None) is None: + connection["num_retries"] = -1 logging.getLogger(__name__).debug("BitShares connection is initialized with with given config ... 
\n") return bitshares.BitShares(**connection) diff --git a/bexi/factory.py b/bexi/factory.py index b11101a..51504bf 100644 --- a/bexi/factory.py +++ b/bexi/factory.py @@ -41,15 +41,10 @@ def get_mongodb_test(use_config): connectTimeoutMS=500, serverSelectionTimeoutMS=1500 ) - if purge or purge is None: - mongodb_client[use_config["db"]][use_config["status_collection"] - ].drop() - mongodb_client[use_config["db"]][use_config["operation_collection"] - ].drop() - mongodb_client[use_config["db"]][use_config["address_collection"] - ].drop() - - return MongoDBOperationsStorage(mongodb_config=use_config, mongodb_client=mongodb_client) + if purge is None: + return MongoDBOperationsStorage(mongodb_config=use_config, mongodb_client=mongodb_client, purge=True) + else: + return MongoDBOperationsStorage(mongodb_config=use_config, mongodb_client=mongodb_client, purge=purge) def get_azure(use_config): return AzureOperationsStorage(azure_config=use_config) diff --git a/bexi/operation_storage/azure_storage.py b/bexi/operation_storage/azure_storage.py index c1c122e..ece60b0 100644 --- a/bexi/operation_storage/azure_storage.py +++ b/bexi/operation_storage/azure_storage.py @@ -4,10 +4,10 @@ AzureHttpError from azure.cosmosdb.table.tableservice import TableService from urllib3.exceptions import NewConnectionError -import hashlib -from ..addresses import split_unique_address +from ..addresses import split_unique_address, get_tracking_address, ensure_address_format from .exceptions import ( + BalanceConcurrentException, AddressNotTrackedException, AddressAlreadyTrackedException, InputInvalidException, @@ -21,6 +21,10 @@ from bexi import Config import json from json.decoder import JSONDecodeError +import hashlib +import zlib +import logging +from bitshares.amount import Amount class AzureOperationsStorage(BasicOperationStorage): @@ -37,7 +41,7 @@ class AzureOperationsStorage(BasicOperationStorage): """ def get_retry_exceptions(self): - return (NewConnectionError) + return 
(NewConnectionError, AzureHttpError) @retry_auto_reconnect def __init__(self, azure_config, purge=False): @@ -47,6 +51,12 @@ def __init__(self, azure_config, purge=False): raise Exception("No azure table storage configuration provided!") self._azure_config = azure_config + # ensure defaults + self._azure_config["operation_table"] = self._azure_config.get("operation_table", "operations") + self._azure_config["address_table"] = self._azure_config.get("address_table", "address") + self._azure_config["status_table"] = self._azure_config.get("status_table", "status") + self._azure_config["balances_table"] = self._azure_config.get("balances_table", "balances") + if not self._azure_config["account"]: raise Exception("Please include the azure account name in the config") if not self._azure_config["key"]: @@ -58,6 +68,7 @@ def __init__(self, azure_config, purge=False): self._create_operations_storage(purge) self._create_status_storage(purge) self._create_address_storage(purge) + self._create_balances_storage(purge) def _debug_print(self, operation): from pprint import pprint @@ -98,15 +109,38 @@ def _create_status_storage(self, purge): self._service.create_table(self._azure_config["status_table"]) time.sleep(0.1) + def _create_balances_storage(self, purge): + if purge: + try: + tablename = self._azure_config["balances_table"] + for item in self._service.query_entities(tablename): + self._service.delete_entity( + tablename, + item["PartitionKey"], + item["RowKey"]) + except AzureMissingResourceHttpError: + pass + while not self._service.exists(self._azure_config["balances_table"]): + self._service.create_table(self._azure_config["balances_table"]) + time.sleep(0.1) + def _create_operations_storage(self, purge): - self._operation_varients = ["incident", "status"] # "customer" + self._operation_varients = ["incident", "statuscompleted", "statusfailed", "statusinprogress"] # "customer" self._operation_tables = {} for variant in self._operation_varients: 
self._operation_tables[variant] = self._azure_config["operation_table"] + variant self._operation_prep = { - "status": lambda op: { - "PartitionKey": op["status"], + "statusinprogress": lambda op: { + "PartitionKey": self._short_digit_hash(op["chain_identifier"]), + "RowKey": op["chain_identifier"] + }, + "statuscompleted": lambda op: { + "PartitionKey": self._short_digit_hash(op["chain_identifier"]), + "RowKey": op["chain_identifier"] + }, + "statusfailed": lambda op: { + "PartitionKey": self._short_digit_hash(op["chain_identifier"]), "RowKey": op["chain_identifier"] }, "customer": lambda op: { @@ -139,17 +173,27 @@ def _get_with_ck(self, variant, operation): return with_ck def _short_digit_hash(self, value): - return str(abs(hash(value)) % (10 ** Config.get("operation_storage", "short_hash_digits", 3))) + hash_type = Config.get("operation_storage", "key_hash", "type", default="crc32") + + if hash_type == "crc32": + short_hash = hex(zlib.crc32(value.encode(encoding='UTF-8'))) + short_hash = short_hash[2:len(short_hash)] + + elif hash_type == "sha256": + checker = hashlib.sha256() + checker.update(value.encode(encoding='UTF-8')) + short_hash = checker.hexdigest() + return short_hash[0:Config.get("operation_storage", "key_hash", "digits", 3)] @retry_auto_reconnect def track_address(self, address, usage="balance"): - split = split_unique_address(address) - if not split.get("customer_id") or not split.get("account_id"): - raise OperationStorageException() + address = ensure_address_format(address) try: + short_hash = self._short_digit_hash(address) + logging.getLogger(__name__).debug("track_address with " + str(address) + ", hash " + str(short_hash)) self._service.insert_entity( self._azure_config["address_table"] + usage, - {"PartitionKey": self._short_digit_hash(address), + {"PartitionKey": short_hash, "RowKey": address, "address": address, "usage": usage} @@ -159,41 +203,90 @@ def track_address(self, address, usage="balance"): @retry_auto_reconnect def 
untrack_address(self, address, usage="balance"): + address = ensure_address_format(address) try: + short_hash = self._short_digit_hash(address) + logging.getLogger(__name__).debug("untrack_address with " + str(address) + ", hash " + str(short_hash)) self._service.delete_entity( self._azure_config["address_table"] + usage, - self._short_digit_hash(address), + short_hash, + address) + try: + self._delete_balance(address) + except AzureMissingResourceHttpError: + pass + except AzureMissingResourceHttpError: + raise AddressNotTrackedException() + + @retry_auto_reconnect + def _get_address(self, address, usage="balance"): + try: + short_hash = self._short_digit_hash(address) + logging.getLogger(__name__).debug("_get_address with " + str(address) + ", hash " + str(short_hash)) + return self._service.get_entity( + self._azure_config["address_table"] + usage, + short_hash, address) except AzureMissingResourceHttpError: raise AddressNotTrackedException() def _update(self, operation, status=None): try: - for variant in self._operation_varients: - operation = self._get_with_ck(variant, operation) - new_operation = operation.copy() - if status: - tmp = self.get_operation(operation["incident_id"]) - new_operation["timestamp"] = tmp["timestamp"] - new_operation["status"] = status - new_operation = self._get_with_ck(variant, new_operation) - if variant == "status": - # needs delete and insert + mapping = {"in_progress": "statusinprogress", + "completed": "statuscompleted", + "failed": "statusfailed"} + + operation = self._get_with_ck("incident", operation.copy()) + new_operation = operation + if status: + tmp = self.get_operation(operation["incident_id"]) + new_operation["timestamp"] = tmp["timestamp"] + new_operation["status"] = status + new_operation = self._get_with_ck("incident", new_operation) + + logging.getLogger(__name__).debug("_update: Table " + self._operation_tables["incident"] + " PartitionKey " + new_operation["PartitionKey"] + " " + new_operation["RowKey"]) + + 
self._service.update_entity( + self._operation_tables["incident"], + new_operation) + + operation = self._get_with_ck("statuscompleted", operation.copy()) + new_operation = operation + if status: + tmp = self.get_operation(operation["incident_id"]) + new_operation["timestamp"] = tmp["timestamp"] + new_operation["status"] = status + new_operation = self._get_with_ck("statuscompleted", new_operation) + self._service.update_entity( + self._operation_tables["statuscompleted"], + new_operation) + + logging.getLogger(__name__).debug("_update: Table " + self._operation_tables["statuscompleted"] + " PartitionKey " + new_operation["PartitionKey"] + " " + new_operation["RowKey"]) + + if status: + # needs delete and insert + try: self._service.delete_entity( - self._operation_tables[variant], + self._operation_tables[mapping[operation["status"]]], operation["PartitionKey"], operation["RowKey"]) + except AzureMissingResourceHttpError: + pass + try: self._service.insert_entity( - self._operation_tables[variant], + self._operation_tables[mapping[new_operation["status"]]], new_operation) - else: + except AzureConflictHttpError: + # already exists, try update self._service.update_entity( - self._operation_tables[variant], + self._operation_tables[mapping[new_operation["status"]]], new_operation) + else: + self._service.update_entity( + self._operation_tables[mapping[new_operation["status"]]], + new_operation) except AzureMissingResourceHttpError: raise OperationNotFoundException() - except AzureConflictHttpError: - raise DuplicateOperationException() def _insert(self, operation): try: @@ -204,6 +297,8 @@ def _insert(self, operation): raise AzureMissingResourceHttpError() if not to_insert["RowKey"]: raise AzureMissingResourceHttpError() + + logging.getLogger(__name__).debug("_insert: Table " + self._operation_tables[variant] + " PartitionKey " + to_insert["PartitionKey"] + " " + to_insert["RowKey"]) self._service.insert_entity( self._operation_tables[variant], to_insert @@ -230,6 
+325,8 @@ def flag_operation_completed(self, operation): self._update(operation, status="completed") + self._ensure_balances(operation) + @retry_auto_reconnect def flag_operation_failed(self, operation, message=None): # do basics @@ -242,25 +339,200 @@ def insert_operation(self, operation): # do basics operation = super(AzureOperationsStorage, self).insert_operation(operation) - self._insert(operation) + error = None + try: + self._insert(operation) + except DuplicateOperationException as e: + error = e + + try: + # always check if balances are ok + if operation["status"] == "completed": + self._ensure_balances(operation) + except BalanceConcurrentException as e: + if error is None: + error = e + + if error is not None: + raise error + + @retry_auto_reconnect + def _delete_balance(self, address, if_match='*'): + self._service.delete_entity( + self._azure_config["balances_table"], + self._short_digit_hash(address), + address, + if_match=if_match + ) + + @retry_auto_reconnect + def _ensure_balances(self, operation): + affected_address = get_tracking_address(operation) + logging.getLogger(__name__).debug("_ensure_balances: with " + operation["chain_identifier"] + " for address " + str(affected_address)) + try: + self._get_address(affected_address) + except AddressNotTrackedException: + # delte if exists and return + try: + self._delete_balance(affected_address) + except AzureMissingResourceHttpError: + pass + return + + try: + balance_dict = self._service.get_entity( + self._azure_config["balances_table"], + self._short_digit_hash(affected_address), + affected_address) + insert = False + except AzureMissingResourceHttpError as e: + balance_dict = {"address": affected_address} + balance_dict["PartitionKey"] = self._short_digit_hash(balance_dict["address"]) + balance_dict["RowKey"] = balance_dict["address"] + insert = True + + if operation["block_num"] < balance_dict.get("blocknum", 0): + raise BalanceConcurrentException() + elif operation["block_num"] == 
balance_dict.get("blocknum", 0) and\ + operation["txnum"] < balance_dict.get("txnum", 0): + raise BalanceConcurrentException() + elif operation["block_num"] == balance_dict.get("blocknum", 0) and\ + operation["txnum"] == balance_dict.get("txnum", 0) and\ + operation["opnum"] <= balance_dict.get("opnum", 0): + raise BalanceConcurrentException() + + balance_dict["blocknum"] = max(balance_dict.get("blocknum", 0), operation["block_num"]) + balance_dict["txnum"] = max(balance_dict.get("txnum", 0), operation["tx_in_block"]) + balance_dict["opnum"] = max(balance_dict.get("opnum", 0), operation["op_in_tx"]) + total = 0 + + addrs = split_unique_address(affected_address) + + asset_id_key = "balance" + operation["amount_asset_id"].split("1.3.")[1] + asset_id = operation["amount_asset_id"] + balance = Amount({ + "asset_id": asset_id, + "amount": balance_dict.get(asset_id_key, "0")}) + amount_value = Amount({ + "asset_id": asset_id, + "amount": operation["amount_value"]}) + + if addrs["account_id"] == operation["from"]: + # internal transfer and withdraw + + # negative + balance_dict[asset_id_key] = str(int(balance - amount_value)) + + # fee as well + asset_id_key = "balance" + operation["fee_asset_id"].split("1.3.")[1] + asset_id = operation["fee_asset_id"] + balance = Amount({ + "asset_id": asset_id, + "amount": balance_dict.get(asset_id_key, "0")}) + fee_value = Amount({ + "asset_id": asset_id, + "amount": operation["fee_value"]}) + + balance_dict[asset_id_key] = str(int(balance - fee_value)) + elif addrs["account_id"] == operation["to"]: + # deposit + + # positive + balance_dict[asset_id_key] = str(int(balance + amount_value)) + + # fees were paid by someone else + else: + raise InvalidOperationException() + + for key, value in balance_dict.items(): + if key.startswith("balance"): + total = total + int(value) + + if total == 0: + if not insert: + try: + self._delete_balance(affected_address, + if_match=balance_dict.etag) + except AzureMissingResourceHttpError: + pass + 
return + + # may be updated or inserted, total > 0 + if (insert): + try: + self._service.insert_entity( + self._azure_config["balances_table"], + balance_dict + ) + except AzureMissingResourceHttpError: + raise OperationStorageException("Critical error in database consistency") + else: + try: + self._service.update_entity( + self._azure_config["balances_table"], + balance_dict, + if_match=balance_dict.etag + ) + except AzureConflictHttpError: + raise OperationStorageException("Critical error in database consistency") @retry_auto_reconnect def insert_or_update_operation(self, operation): # do basics operation = super(AzureOperationsStorage, self).insert_operation(operation) + # check if this is from in_progress to complete (for withdrawals we need to find incident id as its + # not stored onchain) try: - self._insert(operation) - except DuplicateOperationException as ex: - # could be an update to completed ... + logging.getLogger(__name__).debug("insert_or_update_operation: check if in_progress with " + str(operation["chain_identifier"]) + " exists") + existing_operation = self.get_operation_by_chain_identifier("in_progress", operation["chain_identifier"]) + logging.getLogger(__name__).debug("insert_or_update_operation: found existing in_progress operation") + if not existing_operation["incident_id"] == operation["incident_id"] and\ + operation["incident_id"] == operation["chain_identifier"]: + logging.getLogger(__name__).debug("insert_or_update_operation: using preset incident_id " + str(existing_operation["incident_id"])) + operation["incident_id"] = existing_operation["incident_id"] + except OperationNotFoundException: + existing_operation = None + + if existing_operation is None: + try: + logging.getLogger(__name__).debug("insert_or_update_operation: attempting insert") + + error = None + try: + self._insert(operation) + except DuplicateOperationException as e: + error = e + + try: + # always check if balances are ok + if operation["status"] == "completed": + 
self._ensure_balances(operation) + except BalanceConcurrentException as e: + if error is None: + error = e + + if error is not None: + raise error + except DuplicateOperationException as ex: + logging.getLogger(__name__).debug("insert_or_update_operation: fallback to update") + # could be an update to completed ... + if operation.get("block_num"): + try: + operation.pop("status") + self.flag_operation_completed(operation) + except OperationNotFoundException: + raise ex + else: + raise ex + else: + logging.getLogger(__name__).debug("insert_or_update_operation: attempting update") if operation.get("block_num"): try: operation.pop("status") self.flag_operation_completed(operation) except OperationNotFoundException: raise ex - else: - raise ex @retry_auto_reconnect def delete_operation(self, operation_or_incident_id): @@ -273,12 +545,32 @@ def delete_operation(self, operation_or_incident_id): operation = operation_or_incident_id self._delete(operation) + @retry_auto_reconnect + def get_operation_by_chain_identifier(self, status, chain_identifier): + mapping = {"in_progress": "statusinprogress", + "completed": "statuscompleted", + "failed": "statusfailed"} + try: + operation = self._service.get_entity( + self._operation_tables[mapping[status]], + self._short_digit_hash(chain_identifier), + chain_identifier) + operation.pop("PartitionKey") + operation.pop("RowKey") + operation.pop("Timestamp") + operation.pop("etag") + except AzureMissingResourceHttpError: + raise OperationNotFoundException() + return operation + @retry_auto_reconnect def get_operation(self, incident_id): try: + short_hash = self._short_digit_hash(incident_id) + logging.getLogger(__name__).debug("get_operation with " + str(incident_id) + ", hash " + str(short_hash)) operation = self._service.get_entity( self._operation_tables["incident"], - self._short_digit_hash(incident_id), + short_hash, incident_id) operation.pop("PartitionKey") operation.pop("RowKey") @@ -289,13 +581,51 @@ def get_operation(self, 
incident_id): return operation @retry_auto_reconnect - def get_balances(self, take, continuation=None, addresses=None): + def get_balances(self, take, continuation=None, addresses=None, recalculate=False): + if recalculate: + raise Exception("Currently not supported due to memo change on withdraw") + return self._get_balances_recalculate(take, continuation, addresses) + else: + if continuation is not None: + try: + continuation_marker = json.loads(continuation) + continuation_marker = str(continuation_marker) + except TypeError: + raise InputInvalidException() + except JSONDecodeError: + raise InputInvalidException() + + balances = self._service.query_entities( + self._azure_config["balances_table"], + num_results=take, + marker=continuation_marker) + else: + balances = self._service.query_entities( + self._azure_config["balances_table"], + num_results=take) + return_balances = {} + for address_balance in balances: + return_balances[address_balance["address"]] = { + "block_num": address_balance["blocknum"] + } + for key, value in address_balance.items(): + if key.startswith("balance"): + asset_id = "1.3." 
+ key.split("balance")[1] + return_balances[address_balance["address"]][asset_id] = value + return_balances["continuation"] = None + if balances.next_marker: + return_balances["continuation"] = json.dumps(balances.next_marker) + return return_balances + + @retry_auto_reconnect + def _get_balances_recalculate(self, take, continuation=None, addresses=None): address_balances = collections.defaultdict(lambda: collections.defaultdict()) if not addresses: - if continuation: + if continuation is not None: try: continuation_marker = json.loads(continuation) + continuation_marker = str(continuation_marker) except TypeError: raise InputInvalidException() except JSONDecodeError: @@ -324,26 +654,35 @@ def get_balances(self, take, continuation=None, addresses=None): "customer_id": addrs["customer_id"] }): this_block_num = operation["block_num"] + asset_id = operation["amount_asset_id"] + balance = Amount({ + "asset_id": asset_id, + "amount": address_balances[address].get(asset_id, "0")}) + amount_value = Amount({ + "asset_id": asset_id, + "amount": operation["amount_value"]}) + if addrs["account_id"] == operation["from"]: # negative - balance = address_balances[address].get(asset_id, 0) - address_balances[address][asset_id] =\ - balance - operation["amount_value"] + str(int(balance - amount_value)) # fee as well asset_id = operation["fee_asset_id"] - balance = address_balances[address].get(asset_id, 0) + balance = Amount({ + "asset_id": asset_id, + "amount": address_balances[address].get(asset_id, "0")}) + fee_value = Amount({ + "asset_id": asset_id, + "amount": operation["fee_value"]}) address_balances[address][asset_id] =\ - balance - operation["fee_value"] + str(int(balance - fee_value)) elif addrs["account_id"] == operation["to"]: # positive - balance = address_balances[address].get(asset_id, 0) - address_balances[address][asset_id] =\ - balance + operation["amount_value"] + str(int(balance + amount_value)) else: raise InvalidOperationException() max_block_number = 
max(max_block_number, this_block_num) @@ -363,47 +702,70 @@ def _parse_filter(self, filter_by): if filter_by.get("address"): addrs = split_unique_address(filter_by.pop("address")) return {"customer_id": addrs["customer_id"]} + if filter_by.get("from"): + addrs = split_unique_address(filter_by.pop("from")) + return {"from": addrs["account_id"]} + if filter_by.get("to"): + addrs = split_unique_address(filter_by.pop("to")) + return {"to": addrs["account_id"]} if filter_by: raise Exception("Filter not supported") return {} + def _filter_dict_to_string(self, filter_dict, partition_key=None): + filter_str = None + for key, value in filter_dict.items(): + if partition_key == key: + key = "PartitionKey" + if filter_str is not None: + delimiter = " and " + delimiter = "" + filter_str = delimiter + key + " eq '" + value + "'" + return filter_str + @retry_auto_reconnect def get_operations_in_progress(self, filter_by=None): - filter_dict = {"status": "in_progress"} + mapping = {"in_progress": "statusinprogress", + "completed": "statuscompleted", + "failed": "statusfailed"} + + filter_dict = {} filter_dict.update(self._parse_filter(filter_by)) - filter_str = "PartitionKey eq '" + filter_dict.get("status") + "'" - if filter_dict.get("customer_id"): - filter_str = filter_str + " and customer_id eq '" + str(filter_dict.get("customer_id")) + "'" + filter_str = self._filter_dict_to_string(filter_dict, "status") return list(self._service.query_entities( - self._operation_tables["status"], + self._operation_tables[mapping["in_progress"]], filter_str)) @retry_auto_reconnect def get_operations_completed(self, filter_by=None): - filter_dict = {"status": "completed"} + mapping = {"in_progress": "statusinprogress", + "completed": "statuscompleted", + "failed": "statusfailed"} + + filter_dict = {} filter_dict.update(self._parse_filter(filter_by)) - filter_str = "PartitionKey eq '" + filter_dict.get("status") + "'" - if filter_dict.get("customer_id"): - filter_str = filter_str + " and 
customer_id eq '" + str(filter_dict.get("customer_id")) + "'" + filter_str = self._filter_dict_to_string(filter_dict, "status") return list(self._service.query_entities( - self._operation_tables["status"], + self._operation_tables[mapping["completed"]], filter_str)) @retry_auto_reconnect def get_operations_failed(self, filter_by=None): - filter_dict = {"status": "failed"} + mapping = {"in_progress": "statusinprogress", + "completed": "statuscompleted", + "failed": "statusfailed"} + + filter_dict = {} filter_dict.update(self._parse_filter(filter_by)) - filter_str = "PartitionKey eq '" + filter_dict.get("status") + "'" - if filter_dict.get("customer_id"): - filter_str = filter_str + " and customer_id eq '" + str(filter_dict.get("customer_id")) + "'" + filter_str = self._filter_dict_to_string(filter_dict, "status") return list(self._service.query_entities( - self._operation_tables["status"], + self._operation_tables[mapping["failed"]], filter_str)) @retry_auto_reconnect diff --git a/bexi/operation_storage/exceptions.py b/bexi/operation_storage/exceptions.py index f2c479b..fb88798 100644 --- a/bexi/operation_storage/exceptions.py +++ b/bexi/operation_storage/exceptions.py @@ -11,6 +11,10 @@ class OperationStorageLostException(OperationStorageException): pass +class OperationStorageBadRequestException(OperationStorageException): + pass + + class AddressAlreadyTrackedException(OperationStorageException): pass @@ -45,3 +49,7 @@ class OperationNotFoundException(OperationStorageException): class DuplicateOperationException(OperationNotFoundException): pass + + +class BalanceConcurrentException(OperationNotFoundException): + pass diff --git a/bexi/operation_storage/interface.py b/bexi/operation_storage/interface.py index 54c7e76..f7f076f 100644 --- a/bexi/operation_storage/interface.py +++ b/bexi/operation_storage/interface.py @@ -1,8 +1,8 @@ from abc import ABC, abstractmethod import time -from .exceptions import OperationStorageLostException,StatusInvalidException,\ - 
InvalidOperationException, NoBlockNumException +from .exceptions import OperationStorageLostException, StatusInvalidException,\ + InvalidOperationException, NoBlockNumException, OperationStorageBadRequestException from ..operation_storage import operation_formatter from ..utils import date_to_string @@ -211,6 +211,9 @@ def f_retry(self, *args, **kwargs): if wait_in_ms > 0: time.sleep(wait_in_ms / 1000) continue + # if it reacheas here, we have a persistent exception + if "bad request" in str(last_exception).lower() or "badrequest" in str(last_exception).lower(): + raise OperationStorageBadRequestException(last_exception) raise OperationStorageLostException(last_exception) return f_retry diff --git a/bexi/operation_storage/mongodb_storage.py b/bexi/operation_storage/mongodb_storage.py index 83f0739..25f03ef 100644 --- a/bexi/operation_storage/mongodb_storage.py +++ b/bexi/operation_storage/mongodb_storage.py @@ -14,6 +14,7 @@ DuplicateOperationException, InvalidOperationException, OperationStorageException) +from bitshares.amount import Amount class MongoDBOperationsStorage(BasicOperationStorage): @@ -30,17 +31,31 @@ def get_retry_exceptions(self): pymongo.errors.ServerSelectionTimeoutError,) @retry_auto_reconnect - def __init__(self, mongodb_config, mongodb_client=None): + def __init__(self, mongodb_config, mongodb_client=None, purge=False): super(MongoDBOperationsStorage, self).__init__() if not mongodb_config: raise Exception("No mongo db configuration provided!") self._mongodb_config = mongodb_config + # ensure defaults + self._mongodb_config["operation_collection"] = self._mongodb_config.get("operation_collection", "operations") + self._mongodb_config["status_collection"] = self._mongodb_config.get("status_collection", "status") + self._mongodb_config["address_collection"] = self._mongodb_config.get("address_collection", "address") + if not mongodb_client: - mongodb_client = MongoClient(host=mongodb_config["seeds"]) + mongodb_client = 
MongoClient(host=mongodb_config["seeds"], + socketTimeoutMS=1000, + connectTimeoutMS=1000, + serverSelectionTimeoutMS=1000) + self._db = mongodb_client[mongodb_config["db"]] + if purge: + mongodb_client[self._mongodb_config["db"]][self._mongodb_config["status_collection"]].drop() + mongodb_client[self._mongodb_config["db"]][self._mongodb_config["operation_collection"]].drop() + mongodb_client[self._mongodb_config["db"]][self._mongodb_config["address_collection"]].drop() + # if collections doesnt exist, create it if mongodb_config["operation_collection"] not in\ self._db.collection_names(include_system_collections=False): @@ -222,6 +237,7 @@ def get_operation(self, incident_id): @retry_auto_reconnect def get_balances(self, take, continuation=None, addresses=None): + # deprecated, redo logic according to azure storage for performance address_balances = collections.defaultdict(lambda: collections.defaultdict()) if continuation is None: @@ -252,26 +268,35 @@ def get_balances(self, take, continuation=None, addresses=None): "customer_id": addrs["customer_id"] }): this_block_num = operation["block_num"] + asset_id = operation["amount_asset_id"] + balance = Amount({ + "asset_id": asset_id, + "amount": address_balances[address].get(asset_id, "0")}) + amount_value = Amount({ + "asset_id": asset_id, + "amount": operation["amount_value"]}) + if addrs["account_id"] == operation["from"]: # negative - balance = address_balances[address].get(asset_id, 0) - address_balances[address][asset_id] =\ - balance - operation["amount_value"] + str(int(balance - amount_value)) # fee as well asset_id = operation["fee_asset_id"] - balance = address_balances[address].get(asset_id, 0) + balance = Amount({ + "asset_id": asset_id, + "amount": address_balances[address].get(asset_id, "0")}) + fee_value = Amount({ + "asset_id": asset_id, + "amount": operation["fee_value"]}) address_balances[address][asset_id] =\ - balance - operation["fee_value"] + str(int(balance - fee_value)) elif 
addrs["account_id"] == operation["to"]: # positive - balance = address_balances[address].get(asset_id, 0) - address_balances[address][asset_id] =\ - balance + operation["amount_value"] + str(int(balance + amount_value)) else: raise InvalidOperationException() max_block_number = max(max_block_number, this_block_num) @@ -291,6 +316,12 @@ def _parse_filter(self, filter_by): if filter_by.get("address"): addrs = split_unique_address(filter_by.pop("address")) return {"customer_id": addrs["customer_id"]} + if filter_by.get("from"): + addrs = split_unique_address(filter_by.pop("from")) + return {"customer_id": addrs["customer_id"]} + if filter_by.get("to"): + addrs = split_unique_address(filter_by.pop("to")) + return {"customer_id": addrs["customer_id"]} if filter_by: raise Exception("Filter not supported") return {} diff --git a/bexi/operation_storage/operation_formatter.py b/bexi/operation_storage/operation_formatter.py index f70b92d..f84c4f4 100644 --- a/bexi/operation_storage/operation_formatter.py +++ b/bexi/operation_storage/operation_formatter.py @@ -12,6 +12,7 @@ INCIDENT_ID_REGEX = None +CHAIN_IDENTIFIER_REGEX = None def validate_incident_id(incident_id): @@ -37,6 +38,29 @@ def validate_incident_id(incident_id): raise InvalidOperationIdException() +def validate_chain_identifier(incident_id): + """ + Validates the given incident id against the configured regular expresssion + :param incident_id: + :type incident_id: + """ + global CHAIN_IDENTIFIER_REGEX + if CHAIN_IDENTIFIER_REGEX is None: + CHAIN_IDENTIFIER_REGEX = re.compile( + Config.get("operation_storage", + "chain_identifier", + "format", + default="[0-9a-fA-F]+:[0-9]{1}") + ) + try: + if incident_id is None: + raise InvalidOperationIdException() + if CHAIN_IDENTIFIER_REGEX.match(incident_id) is None: + raise InvalidOperationIdException() + except KeyError: + raise InvalidOperationIdException() + + def decode_operation(operation): """ The given operation comes directly from the blockchain and is here 
reformatted. This is done to better suit the needs of our processing and also due to the fact @@ -48,7 +72,9 @@ def decode_operation(operation): memo = operation["op"][1].get("memo", "unknown") new_operation = { - "block_num": operation.get("block_num"), + "block_num": operation.get("block_num", None), + "tx_in_block": operation.get("tx_in_block", None), + "op_in_tx": operation.get("op_in_tx", None), "memo": json.dumps(memo), "from": operation["op"][1]["from"], "to": operation["op"][1]["to"], @@ -77,7 +103,10 @@ def decode_operation(operation): ) if not memo["incident_id"]: - memo["incident_id"] = chain_identifier + if operation.get("incident_id", None) is not None: + memo["incident_id"] = operation["incident_id"] + else: + memo["incident_id"] = chain_identifier data = { "chain_identifier": chain_identifier, diff --git a/bexi/operation_storage/operation_schema.json b/bexi/operation_storage/operation_schema.json index b2e9c1d..67fa21c 100644 --- a/bexi/operation_storage/operation_schema.json +++ b/bexi/operation_storage/operation_schema.json @@ -13,7 +13,8 @@ "memo", "expiration", "customer_id", - "chain_identifier" + "chain_identifier", + "op_in_tx" ], "properties": { "status": { @@ -30,6 +31,17 @@ "number" ] }, + "op_in_tx": { + "type": [ + "number" + ] + }, + "tx_in_block": { + "type": [ + "null", + "number" + ] + }, "fee_value": { "type": "number" }, @@ -43,7 +55,7 @@ "type": "string" }, "amount_value": { - "type": "number" + "type": ["number", "string"] }, "amount_asset_id": { "type": "string" diff --git a/bexi/wsgi/blockchain_monitor_service/implementations.py b/bexi/wsgi/blockchain_monitor_service/implementations.py index e8e745e..d97b082 100644 --- a/bexi/wsgi/blockchain_monitor_service/implementations.py +++ b/bexi/wsgi/blockchain_monitor_service/implementations.py @@ -4,7 +4,9 @@ from ...connection import requires_blockchain from ... 
import Config, factory, __VERSION__ from ...wsgi import flask_setup + import json +from datetime import datetime, timedelta class BlockChainMonitorOutOfSyncExcpetion(Exception): @@ -30,14 +32,27 @@ def isalive(bitshares_instance): last_block = bitshares_instance.rpc.get_dynamic_global_properties().get("last_irreversible_block_num") last_block_stored = _get_os().get_last_head_block_num() + issueIndicators = None if last_block_stored != 0 and last_block_stored < last_block - 5: - raise BlockChainMonitorOutOfSyncExcpetion(json.dumps( - { - "description": "Blockchain Monitor out of sync", - "last_processed": last_block_stored, - "last_irreversible_block_num": last_block - } - )) + issueIndicators = [ + {"type": "unknown", + "value": "Blockchain Monitor out of sync"} + ] + + blockchain_time = datetime.strptime( + bitshares_instance.rpc.get_dynamic_global_properties().get("time"), "%Y-%m-%dT%H:%M:%S" + ) + + delayed_blockchain_time = ( + blockchain_time + + timedelta(seconds=Config.get("bitshares", "blockchain_allowed_sync_delay_in_sec", 180))) + + if delayed_blockchain_time < datetime.now(): + if issueIndicators is None: + issueIndicators = [] + issueIndicators.append({ + "type": "unknown", + "value": "Blockchain is stuck in the past: Chain time is " + str(blockchain_time) + ", server time is " + str(datetime.now())}) info = { "name": Config.get("wsgi", "name"), @@ -49,5 +64,7 @@ def isalive(bitshares_instance): "last_processed": last_block_stored, "last_irreversible_block_num": last_block} } + if issueIndicators is not None: + info["issueIndicators"] = issueIndicators return info diff --git a/bexi/wsgi/manage_service/implementations.py b/bexi/wsgi/manage_service/implementations.py index 952e904..3311c1a 100644 --- a/bexi/wsgi/manage_service/implementations.py +++ b/bexi/wsgi/manage_service/implementations.py @@ -15,15 +15,17 @@ from ...addresses import ( split_unique_address, - get_from_address_from_operation, + get_from_address, create_memo, - 
get_to_address_from_operation) + get_to_address, + create_unique_address, + is_withdraw) from ...connection import requires_blockchain from ... import Config, factory from ... import utils from ...operation_storage import operation_formatter -from ...operation_storage.exceptions import InputInvalidException +from ...operation_storage.exceptions import InputInvalidException, InvalidOperationIdException from bitsharesapi.exceptions import UnhandledRPCError from bitshares.wallet import Wallet @@ -115,7 +117,7 @@ def is_valid_address(address, bitshares_instance=None): else: Account(split["account_id"], bitshares_instance=bitshares_instance) return True - except Exception: + except AccountDoesNotExistsException: return False @@ -139,14 +141,14 @@ def get_balances(take, continuation=None): except InputInvalidException: raise BadArgumentException() - continuation = balancesDict.pop("continuation", None) + continuation = balancesDict.pop("continuation") all_accounts = sorted(balancesDict.keys()) all_balances = [] for account in all_accounts: block_num = balancesDict[account].pop("block_num") for asset_id in balancesDict[account].keys(): - if balancesDict[account][asset_id] > 0: + if int(balancesDict[account][asset_id]) > 0: all_balances.append( { "address": account, @@ -174,21 +176,29 @@ def _get_from_history(address, take, to_or_from, after_hash=None): all_operations = [] + # normalize address (given address can contain id or name) address_split = split_unique_address(address) + address = create_unique_address(address_split["account_id"], address_split["customer_id"]) + afterTimestamp = datetime.fromtimestamp(0) + + filter_dict = {to_or_from: address_split["account_id"]} + if address_split["customer_id"]: + filter_dict.update({"customer_id": address_split["customer_id"]}) + for operation in _get_os().get_operations_completed( - filter_by={"customer_id": address_split["customer_id"]}): + filter_by=filter_dict): # deposit, thus from add_op = { "timestamp": 
operation.get("timestamp", None), - "fromAddress": get_from_address_from_operation(operation), - "toAddress": get_to_address_from_operation(operation), + "fromAddress": get_from_address(operation), + "toAddress": get_to_address(operation), "assetId": operation["amount_asset_id"], "amount": str(operation["amount_value"]), "hash": operation["chain_identifier"] } - if (to_or_from == "to" and address == add_op["toAddress"]) or\ - (to_or_from == "from" and address == add_op["fromAddress"]): + if (to_or_from == "to" and add_op["toAddress"] == address) or\ + (to_or_from == "from" and add_op["fromAddress"] == address): all_operations.append(add_op) if operation["chain_identifier"] == after_hash and add_op["timestamp"]: afterTimestamp = utils.string_to_date(add_op["timestamp"]) @@ -212,7 +222,7 @@ def build_transaction(incidentId, fromAddress, fromMemoWif, toAddress, asset_id, amount, includeFee, bitshares_instance=None): """ Builds a transaction (without signature) - :param guid operationId: Lykke unique operation ID + :param guid incidentId: Lykke unique operation ID :param str fromAddress: Source address :param str toAddress: Destination address :param str assetId: Asset ID to transfer @@ -279,17 +289,7 @@ def obtain_raw_tx(): to_address["account_id"], bitshares_instance=bitshares_instance) - if utils.is_exchange_account(from_account["id"]) and utils.is_exchange_account(to_account["id"]): - # internal shift - memo_plain = create_memo(fromAddress, incidentId) - elif utils.is_exchange_account(from_account["id"]): - # Withdrawal - memo_plain = create_memo(fromAddress, incidentId) - elif utils.is_exchange_account(to_account["id"]): - # Deposit - memo_plain = create_memo(toAddress, incidentId) - else: - raise Exception("No exchange account involved") + memo_plain = create_memo(from_address, to_address, incidentId) try: # Construct amount @@ -337,7 +337,10 @@ def obtain_raw_tx(): tx = obtain_raw_tx() # Add additional/optional information + # - add incident_id as fallback for 
internal database + # - add decoded memo to avoid double decoding tx.update({ + "incident_id": incidentId, "decoded_memo": memo_plain, }) @@ -374,6 +377,8 @@ def map_operation(tx, op_in_tx, operation): op_in_tx=op_in_tx, transaction_id=tx["transaction_id"] ) + if tx.get("incident_id", None) is not None: + j["incident_id"] = tx["incident_id"] return operation_formatter.decode_operation(j) for op_in_tx, operation in enumerate(tx.get("operations", [])): @@ -390,9 +395,9 @@ def map_operation(tx, op_in_tx, operation): try: tx = tx.broadcast() if tx.get("id", None): - return {"chain_identifier": tx["id"] + ":" + str(tx["trx_num"]), "block_num": tx["block_num"]} + return {"hash": tx["id"] + ":" + str(tx["trx_num"]), "block": tx["block_num"]} else: - return {"chain_identifier": "no_id_given", "block_num": -1} + return {"hash": "no_id_given", "block": -1} except UnhandledRPCError as e: if "insufficient_balance" in str(e): raise NotEnoughBalanceException() @@ -418,15 +423,23 @@ def map_operation(tx, op_in_tx, operation): for op_in_tx, operation in enumerate(tx.get("operations", [])): op = map_operation(tx, op_in_tx, operation) op["block_num"] = block_num + (op_in_tx + 1) * 0.1 + op["tx_in_block"] = 0 op["fee_value"] = 0 storage.flag_operation_completed(op) - return {"chain_identifier": "virtual_transfer", "block_num": op["block_num"]} + return {"hash": "virtual_transfer", "block": op["block_num"]} else: raise Exception("This should be unreachable") def get_broadcasted_transaction(operationId): - operation_formatter.validate_incident_id(operationId) + try: + operation_formatter.validate_incident_id(operationId) + except InvalidOperationIdException as e: + # also allow hash + try: + operation_formatter.validate_chain_identifier(operationId) + except InvalidOperationIdException: + raise e operation = _get_os().get_operation(operationId) diff --git a/bexi/wsgi/manage_service/views.py b/bexi/wsgi/manage_service/views.py index 3e765e9..3518f89 100644 --- 
a/bexi/wsgi/manage_service/views.py +++ b/bexi/wsgi/manage_service/views.py @@ -25,8 +25,7 @@ MemoMatchingFailedException, BadArgumentException ) -from bexi import addresses -from bexi.addresses import split_unique_address +from ... import addresses blueprint_manage_service = Blueprint("Blockchain.Api", __name__) @@ -50,8 +49,8 @@ def _get_take(): custom_abort(400) -def _get_continuation(): - # continuation is optional +def _get_continuation_int(): + # continuation is optional, int continuation = request.args.get("continuation", None) if continuation: if type(continuation) != int and type(continuation) != str: @@ -64,6 +63,17 @@ def _get_continuation(): return 0 +def _get_continuation_str(): + # continuation is optional + continuation = request.args.get("continuation", None) + if continuation: + if type(continuation) != str: + custom_abort(400) + return continuation + else: + return None + + def _body(parameter_name, default_value=None): try: lookup = request.get_json() @@ -98,9 +108,12 @@ def get_contants(): [GET] /api/constants """ return jsonify( - {"publicAddressExtension": {"separator": addresses.DELIMITER, - "displayName": "Memo", - "baseDisplayName": "Send to account"} + { + "publicAddressExtension": { + "separator": addresses.DELIMITER, + "displayName": "Memo", + "baseDisplayName": "Send to account" + } } ) @@ -125,7 +138,7 @@ def get_all_assets(): """ try: return jsonify( - implementations.get_all_assets(_get_take(), _get_continuation()) + implementations.get_all_assets(_get_take(), _get_continuation_int()) ) except ValueError: custom_abort(400) @@ -215,7 +228,7 @@ def get_balances(): """ try: return jsonify( - implementations.get_balances(_get_take(), _get_continuation()) + implementations.get_balances(_get_take(), _get_continuation_str()) ) except BadArgumentException: custom_abort(400) @@ -411,9 +424,12 @@ def get_address_history_from(address): :type address: string """ - return jsonify( - implementations.get_address_history_from(address, _get_take(), 
_get("afterHash")) - ) + try: + return jsonify( + implementations.get_address_history_from(address, _get_take(), _get("afterHash")) + ) + except AccountDoesNotExistsException: + custom_abort(204) @blueprint_manage_service.route("/api/transactions/history/to/
", methods=["GET"]) @@ -425,6 +441,9 @@ def get_address_history_to(address): :type address: string """ - return jsonify( - implementations.get_address_history_to(address, _get_take(), _get("afterHash")) - ) + try: + return jsonify( + implementations.get_address_history_to(address, _get_take(), _get("afterHash")) + ) + except AccountDoesNotExistsException: + custom_abort(204) diff --git a/bexi/wsgi/sign_service/implementations.py b/bexi/wsgi/sign_service/implementations.py index db3909f..0c90c44 100644 --- a/bexi/wsgi/sign_service/implementations.py +++ b/bexi/wsgi/sign_service/implementations.py @@ -25,6 +25,10 @@ def sign(tx, keys): signedTransaction["decoded_memo"] = tx["decoded_memo"] except KeyError: pass + try: + signedTransaction["incident_id"] = tx["incident_id"] + except KeyError: + pass return dict( signedTransaction=json.dumps(signedTransaction) @@ -42,7 +46,7 @@ def create_address(): } else: return { - "privateKey": Config.get("bitshares", "exchange_account_active_key"), + "privateKey": Config.get("bitshares", "exchange_account_active_key", default="keep_keys_private"), "publicAddress": create_unique_address(Config.get("bitshares", "exchange_account_name")), "addressContext": Config.get("bitshares", "exchange_account_memo_key"), } diff --git a/cli.py b/cli.py index 9b4c7cd..a81a0d0 100755 --- a/cli.py +++ b/cli.py @@ -1,15 +1,14 @@ #!/usr/bin/env python3 import click -from bexi.wsgi.app import ( - get_manage_service_app, - get_sign_service_app, - get_blockchain_monitor_service_app -) -from bexi import Config +import time +from bexi import Config, factory from bexi.connection import requires_blockchain -from bexi.blockchain_monitor import BlockchainMonitor + import logging import threading +import json +from pprint import pprint +from bexi.operation_storage.exceptions import OperationNotFoundException config = Config.get("wsgi") @@ -26,6 +25,8 @@ def wsgi(host, port): host = host or config["host"] port = port or config["port"] + from bexi.wsgi.app import 
get_manage_service_app, get_sign_service_app + app = get_manage_service_app() app = get_sign_service_app(app) app.logger.info("Starting " + config["name"] + " with manage and sign service ...") @@ -39,6 +40,8 @@ def sign_service(host, port): host = host or config["host"] port = port or config["port"] + from bexi.wsgi.app import get_sign_service_app + app = get_sign_service_app() app.logger.info("Starting " + config["name"] + " sign service ...") app.run(host=host, port=port) @@ -51,6 +54,8 @@ def manage_service(host, port): host = host or config["host"] port = port or config["port"] + from bexi.wsgi.app import get_manage_service_app + app = get_manage_service_app() app.logger.info("Starting " + config["name"] + " manage service ...") app.run(host=host, port=port) @@ -62,6 +67,9 @@ def manage_service(host, port): def blockchain_monitor_service(host, port): host = host or config["host"] port = port or config["port"] + + from bexi.wsgi.app import get_blockchain_monitor_service_app + app = get_blockchain_monitor_service_app() logging.getLogger(__name__).info("Starting BitShares blockchain monitor as coroutines ...") @@ -74,13 +82,15 @@ def blockchain_monitor_service(host, port): @main.command() -def only_blockchain_monitor(): +@click.option("--start") +@click.option("--stop") +def only_blockchain_monitor(start, stop): Config.load(["config_bitshares_connection.yaml", "config_bitshares_memo_keys.yaml", "config_bitshares.yaml", "config_operation_storage.yaml"]) logging.getLogger(__name__).info("Starting BitShares blockchain monitor ...") - start_block_monitor() + start_block_monitor(start, stop) @main.command() @@ -90,15 +100,106 @@ def only_blockchain_monitor_service(host, port): host = host or config["host"] port = port or config["port"] + from bexi.wsgi.app import get_blockchain_monitor_service_app + app = get_blockchain_monitor_service_app() app.logger.info("Starting " + config["name"] + " blockchain monitor service ...") app.run(host=host, port=port) 
@requires_blockchain -def start_block_monitor(): - monitor = BlockchainMonitor() - monitor.listen() +def start_block_monitor(start=None, stop=None): + from bexi.blockchain_monitor import BlockchainMonitor + retry = True + while (retry): + retry = stop is None + try: + monitor = BlockchainMonitor() + + if start is not None: + monitor.start_block = start + if stop is not None: + monitor.stop_block = stop + + monitor.listen() + except Exception as e: + logging.getLogger(__name__).info("Blockchain monitor failed, exception below. Retrying after sleep") + logging.getLogger(__name__).exception(e) + time.sleep(1.5) + + logging.getLogger(__name__).error("Monitoring done") + + +@main.command() +@click.option("--txid") +@click.option("--customerid") +@click.option("--contains") +@click.option("--status") +@click.option("--incidentid") +def find(txid, customerid, contains, status, incidentid): + Config.load(["config_bitshares_connection.yaml", + "config_bitshares.yaml", + "config_operation_storage.yaml"]) + + storage = factory.get_operation_storage() + + def get_all(): + return (storage.get_operations_completed() + + storage.get_operations_in_progress() + + storage.get_operations_failed()) + operations = [] + + if contains: + for op in get_all(): + print(op) + if status is not None and not status == op["status"]: + continue + + if contains in str(op): + operations.append(op) + + if incidentid: + for op in list(storage._service.query_entities( + storage._operation_tables["incident"])): + if incidentid in str(op): + operations.append(op) + + print("---------- finding transfers ---------------") + print("found: " + str(len(operations))) + + for op in operations: + pprint(op) + + +@main.command() +@click.option("--take") +def balance(take=100): + Config.load(["config_bitshares_connection.yaml", + "config_bitshares.yaml", + "config_operation_storage.yaml"]) + + pprint(factory.get_operation_storage().get_balances(take)) + + +@main.command() +@click.option("--take") +def 
balance_calc(take=100): + Config.load(["config_bitshares_connection.yaml", + "config_bitshares.yaml", + "config_operation_storage.yaml"]) + + pprint(factory.get_operation_storage()._get_balances_recalculate(take)) + + +@main.command() +def tracked(): + Config.load(["config_bitshares_connection.yaml", + "config_bitshares.yaml", + "config_operation_storage.yaml"]) + + storage = factory.get_operation_storage() + + pprint(list(storage._service.query_entities(storage._azure_config["address_table"] + "balance"))) # @main.command() diff --git a/setup.py b/setup.py index c43fb47..d74637d 100755 --- a/setup.py +++ b/setup.py @@ -1,22 +1,24 @@ #!/usr/bin/env python -import bexi - from setuptools import setup, find_packages import sys +import os assert sys.version_info[0] == 3, "We require Python > 3" +with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'VERSION')) as version_file: + version = version_file.read().strip() + setup( name='bexi', - version=bexi.__VERSION__, + version=version, description=( 'BitShares Exchange Integration (BEXI).' 'A toolkit that allows to deal with deposits and withdrawals on' 'the BitShares Blockchain.' 
), long_description=open('README.rst').read(), - download_url='https://github.com/blockchainbv/bexi/tarball/' + bexi.__VERSION__, + download_url='https://github.com/blockchainbv/bexi/tarball/' + version, author='Blockchain BV', author_email='info@BlockchainBV.com', maintainer='Blockchain Projects BV', diff --git a/tests/abstract_tests.py b/tests/abstract_tests.py index 10361bb..655d834 100644 --- a/tests/abstract_tests.py +++ b/tests/abstract_tests.py @@ -81,6 +81,7 @@ class ATestOperationStorage(ATestnetTest): TEST_OP = {'block_num': 23645414, 'transaction_id': 23, "op_in_tx": 2, + "tx_in_block": 0, 'op': ['transfer', { "fee": { diff --git a/tests/test_blockchain_monitor.py b/tests/test_blockchain_monitor.py index adfc765..858381f 100644 --- a/tests/test_blockchain_monitor.py +++ b/tests/test_blockchain_monitor.py @@ -8,13 +8,14 @@ class TestBlockchainMonitor(ATestnetTest): def setUp(self): # only load the config we want (no active keys!) - if Config.data and Config.data["network_type"] != "Test": + if Config.data and Config.data.get("network_type", None) != "Test": connection.reset() Config.load(["config_bitshares_connection.yaml", "config_bitshares_memo_keys.yaml", "config_bitshares.yaml", - "config_operation_storage.yaml"]) + "config_operation_storage.yaml", + "../tests/config_test.yaml"]) Config.data["operation_storage"]["use"] = "mongodbtest" Config.data["network_type"] = "Test" Config.data["bitshares"]["connection"]["Test"]["nobroadcast"] = True @@ -25,6 +26,6 @@ def tearDown(self): @requires_blockchain def test_listen(self): monitor = BlockchainMonitor() - monitor.start_block = 14972965 - monitor.stop_block = 14972975 + monitor.start_block = 18351647 + monitor.stop_block = 18351651 monitor.listen() diff --git a/tests/test_integration.py b/tests/test_integration.py index d5cd63e..17ac199 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,10 +1,11 @@ from flask.helpers import url_for -from tests.abstract_tests import AFlaskTest -from 
bexi import Config +from tests.abstract_tests import AFlaskTest, ATestOperationStorage +from bexi import Config, utils from bexi.addresses import create_unique_address from time import sleep import json +from bexi.wsgi.manage_service import implementations class TestIntegration(AFlaskTest): @@ -126,6 +127,12 @@ def test_get_balances(self): self.invalidate(url_for('Blockchain.Api.get_balances', take="35.23"), 400) + answer1 = self.invalidate(url_for('Blockchain.Api.get_balances', + take="1", + continuation='{"nextpartitionkey": "1!4!NjY5", "nextrowkey": "1!64!MS4yLjIwNDA3OjVmNWUyZjU1LWNmYzUtNDcwZi1iYmU4LTgwOTQyZGUxMzFkZQ--"}' + ), + 200) + def test_build_transaction(self): self.invalidate(url_for('Blockchain.Api.build_transaction'), 400, diff --git a/tests/test_integration_broadcast.py b/tests/test_integration_broadcast.py index 86cd356..ef9e61d 100644 --- a/tests/test_integration_broadcast.py +++ b/tests/test_integration_broadcast.py @@ -9,6 +9,7 @@ from bexi.blockchain_monitor import BlockchainMonitor from bitshares.bitshares import BitShares from bexi import __VERSION__ +from bexi.utils import get_exchange_account_name class TestIntegration(AFlaskTest): @@ -45,6 +46,8 @@ def test_wallet(self): @requires_blockchain def test_track_balance(self, bitshares_instance=None): + store = factory.get_operation_storage(purge=True) + addressDW = self.client.post(url_for('Blockchain.SignService.wallets')).json["publicAddress"] addressEW = create_unique_address(self.get_customer_id(), "") @@ -85,7 +88,7 @@ def build_sign_and_broadcast(op, memo_key, active_key): print(broadcast_transaction) - return broadcast.json["block_num"] + return broadcast.json["block"] def flag_completed(block_num): network = Config.get("network_type") @@ -93,7 +96,6 @@ def flag_completed(block_num): # connection["keys"] = key instance = BitShares(**connection) - store = factory.get_operation_storage(purge=False) irr = instance.rpc.get_dynamic_global_properties().get("last_irreversible_block_num") head = 
instance.rpc.get_dynamic_global_properties().get("head_block_number") @@ -123,7 +125,7 @@ def flag_completed(block_num): response = self.client.get(url_for('Blockchain.Api.get_balances') + "?take=2") assert response.status_code == 200 self.assertEqual(response.json["items"], - [{'address': addressDW, 'assetId': '1.3.0', 'balance': 110000, 'block': block_num * 10}]) + [{'address': addressDW, 'assetId': '1.3.0', 'balance': "110000", 'block': block_num * 10}]) block_num = build_sign_and_broadcast( { @@ -140,17 +142,21 @@ def flag_completed(block_num): response = self.client.get(url_for('Blockchain.Api.get_balances') + "?take=2") assert response.status_code == 200 - self.assertEqual(response.json["items"][0]["balance"], 10000) + self.assertEqual(response.json["items"][0]["balance"], "10000") assert block_num > 0 + balance_block_num = block_num + response = self.client.get(url_for('Blockchain.Api.get_broadcasted_transaction', operationId="cbeea30e-2218-4405-9089-86d003e4df61")) self.assertEqual(response.json["block"], block_num * 10) + WW = create_unique_address(self.get_customer_id(), "some_user_memo") + block_num = build_sign_and_broadcast( { "operationId": "cbeea30e-2218-4405-9089-86d003e4df62", "fromAddress": addressHW, - "toAddress": create_unique_address(self.get_customer_id(), ""), + "toAddress": WW, "assetId": "1.3.0", "amount": 100000, "includeFee": True @@ -163,7 +169,7 @@ def flag_completed(block_num): response = self.client.get(url_for('Blockchain.Api.get_balances') + "?take=2") assert response.status_code == 200 self.assertEqual(response.json["items"], - [{'address': addressDW, 'assetId': '1.3.0', 'balance': 10000, 'block': block_num * 10}]) + [{'address': addressDW, 'assetId': '1.3.0', 'balance': "10000", 'block': balance_block_num * 10}]) self.maxDiff = None @@ -175,13 +181,39 @@ def flag_completed(block_num): fromDW = self.client.get(url_for('Blockchain.Api.get_address_history_from', address=addressDW) + "?take=3") assert fromDW.status_code == 200 
self.assertEqual(fromDW.json, - [{'amount': '100000', 'assetId': '1.3.0', 'fromAddress': addressDW, 'hash': fromDW.json[0]['hash'], 'timestamp': fromDW.json[0]['timestamp'], 'toAddress': 'lykke-test:'}]) + [{'amount': '100000', 'assetId': '1.3.0', 'fromAddress': addressDW, 'hash': fromDW.json[0]['hash'], 'timestamp': fromDW.json[0]['timestamp'], 'toAddress': 'lykke-test'}]) toHW = self.client.get(url_for('Blockchain.Api.get_address_history_to', address=addressHW) + "?take=3") assert toHW.status_code == 200 assert toHW.json == [] - fromHW = self.client.get(url_for('Blockchain.Api.get_address_history_from', address=addressHW) + "?take=3") + fromHW = self.client.get(url_for('Blockchain.Api.get_address_history_from', address=split_unique_address(addressHW)["account_id"]) + "?take=3") assert fromHW.status_code == 200 self.assertEqual(fromHW.json, - [{'amount': '99900', 'assetId': '1.3.0', 'fromAddress': addressHW, 'hash': fromHW.json[0]['hash'], 'timestamp': fromHW.json[0]['timestamp'], 'toAddress': 'lykke-customer:'}]) + [{'amount': '99900', 'assetId': '1.3.0', 'fromAddress': get_exchange_account_name(), 'hash': fromHW.json[0]['hash'], 'timestamp': fromHW.json[0]['timestamp'], 'toAddress': WW}]) + + response = self.client.get(url_for('Blockchain.Api.get_broadcasted_transaction', operationId="cbeea30e-2218-4405-9089-86d003e4df60")) + assert response.status_code == 200 + self.assertEqual(response.json['operationId'], + 'cbeea30e-2218-4405-9089-86d003e4df60') + + response = self.client.get(url_for('Blockchain.Api.get_broadcasted_transaction', operationId="cbeea30e-2218-4405-9089-86d003e4df61")) + assert response.status_code == 200 + self.assertEqual(response.json['operationId'], + 'cbeea30e-2218-4405-9089-86d003e4df61') + + response = self.client.get(url_for('Blockchain.Api.get_broadcasted_transaction', operationId="cbeea30e-2218-4405-9089-86d003e4df62")) + assert response.status_code == 200 + self.assertEqual(response.json['operationId'], + 
'cbeea30e-2218-4405-9089-86d003e4df62') + + response = self.client.delete(url_for('Blockchain.Api.unobserve_address', address=addressDW)) + assert response.status_code == 200 + + response = self.client.delete(url_for('Blockchain.Api.unobserve_address', address=addressHW)) + assert response.status_code == 200 + + response = self.client.get(url_for('Blockchain.Api.get_balances') + "?take=2") + assert response.status_code == 200 + self.assertEqual(response.json["items"], + []) diff --git a/tests/test_mongo_db.py b/tests/test_mongo_db.py index 102fac4..afea494 100644 --- a/tests/test_mongo_db.py +++ b/tests/test_mongo_db.py @@ -19,7 +19,9 @@ def setUp(self): def _get_db_config(self): config = Config.get_config()["operation_storage"] - return config["mongodbtest"] + mongodb_config = config["mongodbtest"] + mongodb_config["operation_collection"] = mongodb_config.get("operation_collection", "operations") + return mongodb_config def test_not_reachable(self): mongodb_config = self._get_db_config() @@ -118,12 +120,12 @@ def test_index(self): "custom_index": { "type": "number", "description": "must be an int and is required" - }, + }, "content": { "type": "string", "description": "must be a string and is required" - } - } + } + } } }) @@ -138,8 +140,12 @@ def test_index(self): invalid_empty = {"content": "nothing"} + invalid_overflow = {"custom_index": 9999900000, + "content": "nothing"} + collection.insert_one(valid) collection.insert_one(also_valid) + collection.insert_one(invalid_overflow) self.assertRaises(pymongo.errors.WriteError, collection.insert_one, diff --git a/tests/test_operations_storage.py b/tests/test_operations_storage.py index cd688ca..4cb5691 100644 --- a/tests/test_operations_storage.py +++ b/tests/test_operations_storage.py @@ -8,13 +8,14 @@ from bexi.addresses import create_unique_address, split_unique_address from bexi.factory import get_operation_storage from jsonschema.exceptions import ValidationError +from bexi.utils import get_exchange_account_id 
class TestMongoOperationStorage(ATestOperationStorage): def setUp(self): super(TestMongoOperationStorage, self).setUp() - self.storage = get_operation_storage("mongodbtest") + self.storage = get_operation_storage("azuretest") def test_insert_and_complete(self): filled_operation = self.get_in_progress_op() @@ -32,6 +33,7 @@ def test_insert_and_complete(self): filled_operation) filled_operation = self.get_completed_op() + filled_operation["to"] = get_exchange_account_id() self.storage.flag_operation_completed(filled_operation) filled_operation["status"] = "in-progress" @@ -52,6 +54,7 @@ def test_insert(self): filled_operation = self.get_completed_op() filled_operation["chain_identifier"] = "some_other_chain_identifier_1" filled_operation["incident_id"] = "some_other_incident_id" + filled_operation["to"] = get_exchange_account_id() self.storage.insert_operation(filled_operation) self.assertRaises(DuplicateOperationException, @@ -100,8 +103,12 @@ def test_insert_wrong_status(self): filled_operation) def test_delete(self): - self.storage.insert_operation(self.get_in_progress_op()) - self.storage.flag_operation_completed(self.get_completed_op()) + op = self.get_in_progress_op() + op["to"] = get_exchange_account_id() + self.storage.insert_operation(op) + op = self.get_completed_op() + op["to"] = get_exchange_account_id() + self.storage.flag_operation_completed(op) operations = self.storage.get_operations_completed() @@ -125,9 +132,11 @@ def test_delete(self): assert len(self.storage.get_operations_in_progress()) == 0 def test_get_balance(self): - address = create_unique_address("lykke-customer") + address = create_unique_address(get_exchange_account_id()) addrs = split_unique_address(address) - asset = "BTS_test" + asset = "1.3.131" + + self.storage.track_address(address) filled_operation = self.get_completed_op() filled_operation["to"] = addrs["account_id"] @@ -150,15 +159,13 @@ def test_get_balance(self): self.storage.insert_operation(filled_operation) 
filled_operation["from"] = addrs["account_id"] - filled_operation["to"] = "some_user" + filled_operation["to"] = get_exchange_account_id() filled_operation["incident_id"] = "some_operation_id_4" filled_operation["chain_identifier"] = "some_chain_identifier_4" filled_operation["amount_value"] = 7 self.storage.insert_operation(filled_operation) - self.storage.track_address(address) - balances = self.storage.get_balances(2) assert balances[address][asset] == 18 @@ -174,7 +181,7 @@ def test_get_balance(self): "069548") def test_tracking(self): - address1 = create_unique_address("lykke-customer") + address1 = create_unique_address(get_exchange_account_id()) address2 = create_unique_address("lykke-test") addr2s = split_unique_address(address2) @@ -189,7 +196,7 @@ def test_tracking(self): filled_operation = self.get_completed_op() filled_operation["to"] = addr2s["account_id"] filled_operation["customer_id"] = addr2s["customer_id"] - filled_operation["amount_asset_id"] = "TEST" + filled_operation["amount_asset_id"] = "1.3.4123" filled_operation["amount_value"] = 1234 self.storage.insert_operation(filled_operation) @@ -197,7 +204,7 @@ def test_tracking(self): assert address1 not in balances.keys() assert address2 in balances.keys() - assert balances[address2]["TEST"] == 1234 + assert balances[address2]["1.3.4123"] == 1234 self.storage.untrack_address(address1) @@ -227,5 +234,5 @@ class TestAzureOperationStorageFactory(TestMongoOperationStorage): def setUp(self): super(TestAzureOperationStorageFactory, self).setUp() - self.storage = get_operation_storage("azuretest") + self.storage = get_operation_storage("mongodbtest") diff --git a/tests/test_wsgi_manage_service.py b/tests/test_wsgi_manage_service.py index 29fa314..a2a8822 100644 --- a/tests/test_wsgi_manage_service.py +++ b/tests/test_wsgi_manage_service.py @@ -3,7 +3,7 @@ from bexi import utils from bexi.wsgi.manage_service.implementations import MemoMatchingFailedException -from bexi.addresses import 
get_address_from_operation, create_unique_address, DELIMITER +from bexi.addresses import get_tracking_address, create_unique_address, DELIMITER from bexi.wsgi.manage_service.views import implementations from bexi.wsgi.manage_service.implementations import AssetNotFoundException from bexi.operation_storage.exceptions import AddressAlreadyTrackedException,\ @@ -21,6 +21,7 @@ class TestBlockchainApi(ATestOperationStorage): def setUp(self): super(TestBlockchainApi, self).setUp() + self.storage = self.storage = get_operation_storage("azuretest") implementations._get_os(storage=self.storage) def test_get_all_assets(self): @@ -87,7 +88,7 @@ def test_observe_address(self): address) def test_get_balances(self): - assert implementations.get_balances(1, 0) ==\ + assert implementations.get_balances(1) ==\ {'continuation': None, 'items': []} transfer = self.get_completed_op() @@ -96,7 +97,6 @@ def test_get_balances(self): transfer["customer_id"] = "user_name_bla" transfer["to"] = utils.get_exchange_account_id() - implementations._get_os().insert_operation(transfer) first = transfer.copy() @@ -104,7 +104,6 @@ def test_get_balances(self): transfer["customer_id"] = "user_name_bla_2" transfer["chain_identifier"] = "24:25" transfer["amount_value"] = 50000001 - implementations._get_os().insert_operation(transfer) second = transfer.copy() @@ -112,7 +111,6 @@ def test_get_balances(self): transfer["customer_id"] = "user_name_bla_3" transfer["chain_identifier"] = "24:24" transfer["amount_value"] = 50000002 - implementations._get_os().insert_operation(transfer) third = transfer.copy() @@ -122,11 +120,15 @@ def test_get_balances(self): transfer["from"] = utils.get_exchange_account_id() transfer["to"] = "some_dude" transfer["fee_value"] = 0 - implementations._get_os().insert_operation(transfer) - implementations.observe_address(get_address_from_operation(first)) - implementations.observe_address(get_address_from_operation(second)) - 
implementations.observe_address(get_address_from_operation(third)) + implementations.observe_address(get_tracking_address(first)) + implementations.observe_address(get_tracking_address(second)) + implementations.observe_address(get_tracking_address(third)) + + implementations._get_os().insert_operation(first) + implementations._get_os().insert_operation(second) + implementations._get_os().insert_operation(third) + implementations._get_os().insert_operation(transfer) sleep(1) @@ -155,10 +157,6 @@ def test_get_balances(self): 'block': 1010 * 10}], all_balances, ) - self.assertIn( - [], - all_balances, - ) def test_build_transaction_not_enough_balance(self): from_id = utils.get_exchange_account_id() @@ -315,6 +313,7 @@ def test_broadcast_transaction(self): def test_get_and_delete_broadcasted(self): completed = self.get_completed_op() completed["incident_id"] = "cbeea30e-2218-4405-9089-86d003e4df83" + completed["to"] = utils.get_exchange_account_id() implementations._get_os().insert_operation(completed) operation = implementations.get_broadcasted_transaction("cbeea30e-2218-4405-9089-86d003e4df83") @@ -345,26 +344,33 @@ def test_get_address_history_to(self): transfer["to"] = utils.get_exchange_account_id() implementations._get_os().insert_operation(transfer) - history = implementations.get_address_history_to(get_address_from_operation(transfer), 1, 0) + history = implementations.get_address_history_to(get_tracking_address(transfer), 1, 0) self.assertEqual( history, - [{'timestamp': history[0]['timestamp'], 'fromAddress': 'lykke2018:', 'toAddress': 'lykke-test:user_name_bla', 'assetId': '1.3.121', 'amount': '50000000', 'hash': 'chainidentifier_1234'}] + [{'timestamp': history[0]['timestamp'], 'fromAddress': 'lykke2018', 'toAddress': 'lykke-test:user_name_bla', 'assetId': '1.3.121', 'amount': '50000000', 'hash': 'chainidentifier_1234'}] ) def test_get_address_history_from(self): transfer = self.get_completed_op() transfer["incident_id"] = 
"cbeea30e-2218-4405-9089-86d003e4df81" transfer["chain_identifier"] = "chainidentifier_1235" - transfer["customer_id"] = "user_name_bla" + transfer["customer_id"] = "user_memo_message" transfer["from"] = utils.get_exchange_account_id() implementations._get_os().insert_operation(transfer) - history = implementations.get_address_history_from(get_address_from_operation(transfer), 1, 0) + history = implementations.get_address_history_to(get_tracking_address(transfer), 1, 0) + + self.assertEqual( + history, + [{'timestamp': history[0]['timestamp'], 'fromAddress': 'lykke-test', 'toAddress': 'lykke-dev-autotests:user_memo_message', 'assetId': '1.3.121', 'amount': '50000000', 'hash': 'chainidentifier_1235'}] + ) + + history = implementations.get_address_history_from(utils.get_exchange_account_id(), 1, 0) self.assertEqual( history, - [{'timestamp': history[0]['timestamp'], 'fromAddress': 'lykke-test:user_name_bla', 'toAddress': 'lykke-dev-autotests:', 'assetId': '1.3.121', 'amount': '50000000', 'hash': 'chainidentifier_1235'}] + [{'timestamp': history[0]['timestamp'], 'fromAddress': 'lykke-test', 'toAddress': 'lykke-dev-autotests:user_memo_message', 'assetId': '1.3.121', 'amount': '50000000', 'hash': 'chainidentifier_1235'}] ) @@ -372,5 +378,5 @@ class TestBlockchainApiAzure(TestBlockchainApi): def setUp(self): super(TestBlockchainApiAzure, self).setUp() - self.storage = self.storage = get_operation_storage("azuretest") + self.storage = self.storage = get_operation_storage("mongodbtest") implementations._get_os(storage=self.storage)