diff --git a/.env.example b/.env.example index d47dbd711..e123c35dc 100644 --- a/.env.example +++ b/.env.example @@ -35,6 +35,7 @@ export CONCURRENCY= # 1,2,3... Positive whole number for how many blocks get pro export YPRICEAPI_URL= # YPriceAPI url export YPRICEAPI_SIGNER= # Address you signed up for yPriceAPI on export YPRICEAPI_SIGNATURE= # Signature from subscription +export YPRICEMAGIC_GETLOGS_DOP= # Max number of concurrent eth_getLogs calls export YPRICEMAGIC_RECURSION_TIMEOUT= # Time in seconds till yPriceMagic Loop for pricing times out without returning data. export SKIP_YPRICEAPI= # False or True. Defaults to True. diff --git a/Makefile b/Makefile index 86301a59a..bc381ea21 100644 --- a/Makefile +++ b/Makefile @@ -275,6 +275,18 @@ apy-yeth-monitoring: apy-yeth: make up commands="yeth" network=eth filter=yeth + +aerodrome-apy-previews: + make up commands="drome_apy_previews" network=base + +aerodrome-apy-previews-monitoring: + make up commands="drome_apy_previews with_monitoring" network=base + +velodrome-apy-previews: + make up commands="drome_apy_previews" network=optimism + +velodrome-apy-previews-monitoring: + make up commands="drome_apy_previews with_monitoring" network=optimism # revenue scripts revenues: diff --git a/scripts/curve_apy_previews.py b/scripts/curve_apy_previews.py index fc9142ca7..edc815619 100644 --- a/scripts/curve_apy_previews.py +++ b/scripts/curve_apy_previews.py @@ -1,25 +1,19 @@ import dataclasses -import json import logging -import os import re -import shutil from time import sleep, time -from datetime import datetime -import traceback -import boto3 import requests import sentry_sdk from brownie import ZERO_ADDRESS, chain from brownie.exceptions import ContractNotFound from multicall.utils import await_awaitable -from y import Contract, Network, PriceError +from y import Contract, Network from y.exceptions import ContractNotVerified -from yearn.apy import Apy, ApyFees, ApyPoints, ApySamples, get_samples +from yearn.apy import Apy, ApyFees, ApyPoints, get_samples from yearn.apy.curve.simple import Gauge, calculate_simple -from yearn.exceptions import EmptyS3Export +from yearn.helpers import s3, telegram logger = logging.getLogger(__name__) sentry_sdk.set_tag('script','curve_apy_previews') @@ -33,7 +27,7 @@ def main(): gauges = _get_gauges() data = _build_data(gauges) - _upload(data) + s3.upload('apy-previews', 'curve-factory', data) def _build_data(gauges): samples = get_samples() @@ -142,99 +136,8 @@ def _get_gauges(): raise ValueError(f"Error fetching gauges from {url}") attempts += 1 sleep(.1) - - else: raise ValueError(f"can't get curve gauges for unsupported network: {chain.id}") - - -def _upload(data): - print(json.dumps(data, sort_keys=True, indent=4)) - - file_name, s3_path = _get_export_paths("curve-factory") - with open(file_name, "w+") as f: - json.dump(data, f) - - if os.getenv("DEBUG", None): - return - - for item in _get_s3s(): - s3 = item["s3"] - aws_bucket = item["aws_bucket"] - s3.upload_file( - file_name, - aws_bucket, - s3_path, - ExtraArgs={'ContentType': "application/json", 'CacheControl': "max-age=1800"}, - ) - - -def _get_s3s(): - s3s = [] - aws_buckets = os.environ.get("AWS_BUCKET").split(";") - aws_endpoint_urls = os.environ.get("AWS_ENDPOINT_URL").split(";") - aws_keys = os.environ.get("AWS_ACCESS_KEY").split(";") - aws_secrets = os.environ.get("AWS_ACCESS_SECRET").split(";") - - for i in range(len(aws_buckets)): - aws_bucket = aws_buckets[i] - aws_endpoint_url = aws_endpoint_urls[i] - aws_key = aws_keys[i] - aws_secret = aws_secrets[i] - kwargs = {} - if aws_endpoint_url is not None: - kwargs["endpoint_url"] = aws_endpoint_url - if aws_key is not None: - kwargs["aws_access_key_id"] = aws_key - if aws_secret is not None: - kwargs["aws_secret_access_key"] = aws_secret - - s3s.append( - { - "s3": boto3.client("s3", **kwargs), - "aws_bucket": aws_bucket - } - ) - - return s3s - - -def _get_export_paths(suffix): - out = "generated" - if os.path.isdir(out): - shutil.rmtree(out) - os.makedirs(out, exist_ok=True) - - api_path = os.path.join("v1", "chains", f"{chain.id}", "apy-previews") - - file_base_path = os.path.join(out, api_path) - os.makedirs(file_base_path, exist_ok=True) - - file_name = os.path.join(file_base_path, suffix) - s3_path = os.path.join(api_path, suffix) - return file_name, s3_path - + def with_monitoring(): - if os.getenv("DEBUG", None): - main() - return - from telegram.ext import Updater - - private_group = os.environ.get('TG_YFIREBOT_GROUP_INTERNAL') - public_group = os.environ.get('TG_YFIREBOT_GROUP_EXTERNAL') - updater = Updater(os.environ.get('TG_YFIREBOT')) - now = datetime.now() - message = f"`[{now}]`\nāš™ļø Curve Previews API for {Network.name()} is updating..." - ping = updater.bot.send_message(chat_id=private_group, text=message, parse_mode="Markdown") - ping = ping.message_id - try: - main() - except Exception as error: - tb = traceback.format_exc() - now = datetime.now() - message = f"`[{now}]`\nšŸ”„ Curve Previews API update for {Network.name()} failed!\n```\n{tb}\n```"[:4000] - updater.bot.send_message(chat_id=private_group, text=message, parse_mode="Markdown", reply_to_message_id=ping) - updater.bot.send_message(chat_id=public_group, text=message, parse_mode="Markdown") - raise error - message = f"āœ… Curve Previews API update for {Network.name()} successful!" - updater.bot.send_message(chat_id=private_group, text=message, reply_to_message_id=ping) \ No newline at end of file + telegram.run_job_with_monitoring('Curve Previews API', main) diff --git a/scripts/drome_apy_previews.py b/scripts/drome_apy_previews.py new file mode 100644 index 000000000..0d1a4ead0 --- /dev/null +++ b/scripts/drome_apy_previews.py @@ -0,0 +1,169 @@ + +""" +This script produces a list of velodrome/aerodrome gauges for which vaults can be created +""" + +import asyncio +import dataclasses +import logging +import os +from pprint import pformat +from time import time +from typing import List, Optional + +import sentry_sdk +from brownie import ZERO_ADDRESS, chain +from msgspec import Struct +from multicall.utils import await_awaitable +from tqdm.asyncio import tqdm_asyncio +from y import Contract, Network, magic +from y.exceptions import ContractNotVerified +from y.time import get_block_timestamp_async + +from yearn.apy import Apy, ApyFees, get_samples +from yearn.apy.common import SECONDS_PER_YEAR +from yearn.apy.curve.simple import Gauge +from yearn.apy.velo import COMPOUNDING +from yearn.debug import Debug +from yearn.helpers import s3, telegram +from yearn.v2.registry import Registry + +logger = logging.getLogger(__name__) +sentry_sdk.set_tag('script','curve_apy_previews') + +class Drome(Struct): + """Holds various params for a drome deployment""" + label: str + job_name: str + sugar: str + voter: str + # A random vault to check fees + fee_checker: str + +try: + drome = { + Network.Optimism: Drome( + label='velo', + job_name='Velodrome Previews API', + sugar='0x4D996E294B00cE8287C16A2b9A4e637ecA5c939f', + voter='0x41c914ee0c7e1a5edcd0295623e6dc557b5abf3c', + fee_checker='0xbC61B71562b01a3a4808D3B9291A3Bf743AB3361', + ), + Network.Base: Drome( + label='aero', + job_name='Aerodrome Previews API', + sugar='0x2073D8035bB2b0F2e85aAF5a8732C6f397F9ff9b', + voter='0x16613524e02ad97eDfeF371bC883F2F5d6C480A5', + fee_checker='0xEcFc1e5BDa4d4191c9Cab053ec704347Db87Be5d', + ), + }[chain.id] +except KeyError: + raise ValueError(f"there is no drome on unsupported network: {chain.id}") + +fee_checker = Contract(drome.fee_checker) +performance_fee = fee_checker.performanceFee() / 1e4 +management_fee = fee_checker.managementFee() / 1e4 +fee_checker_strat = Contract(fee_checker.withdrawalQueue(0)) + +keep = fee_checker_strat.localKeepVELO() / 1e4 +unkeep = 1 - keep + +fees = ApyFees(performance=performance_fee, management=management_fee, keep_velo=keep) + +def main(): + data = await_awaitable(_build_data()) + s3.upload('apy-previews', f'{drome.label}-factory', data) + +async def _build_data(): + start = int(time()) + block = get_samples().now + data = [d for d in await tqdm_asyncio.gather(*[_build_data_for_lp(lp, block) for lp in await _get_lps_with_vault_potential()]) if d] + for d in data: + d['updated'] = start + print(data) + return data + +async def _get_lps_with_vault_potential() -> List[dict]: + sugar_oracle = await Contract.coroutine(drome.sugar) + current_vaults = await Registry(include_experimental=False).vaults + current_underlyings = [str(vault.token) for vault in current_vaults] + return [lp for lp in await sugar_oracle.all.coroutine(999999999999999999999, 0, ZERO_ADDRESS) if lp[0] not in current_underlyings and lp[11] != ZERO_ADDRESS] + +async def _build_data_for_lp(lp: dict, block: Optional[int] = None) -> Optional[dict]: + lp_token = lp[0] + gauge_name = lp[1] + + try: + gauge = await _load_gauge(lp, block=block) + except ContractNotVerified as e: + return { + "gauge_name": gauge_name, + "apy": dataclasses.asdict(Apy("error:unverified", 0, 0, fees, error_reason=str(e))), + "block": block, + } + + try: + apy = await _staking_apy(lp, gauge.gauge, block=block) if gauge.gauge_weight > 0 else Apy("zero_weight", 0, 0, fees) + except Exception as error: + logger.error(error) + logger.error(gauge) + apy = Apy("error", 0, 0, fees, error_reason=":".join(str(arg) for arg in error.args)) + + return { + "gauge_name": gauge_name, + "gauge_address": str(gauge.gauge), + "token0": lp[5], + "token1": lp[8], + "lp_token": lp_token, + "weight": str(gauge.gauge_weight), + "inflation_rate": str(gauge.gauge_inflation_rate), + "working_supply": str(gauge.gauge_working_supply), + "apy": dataclasses.asdict(apy), + "block": block, + } + +async def _load_gauge(lp: dict, block: Optional[int] = None) -> Gauge: + lp_address = lp[0] + gauge_address = lp[11] + voter = await Contract.coroutine(drome.voter) + pool, gauge, weight = await asyncio.gather( + Contract.coroutine(lp_address), + Contract.coroutine(gauge_address), + voter.weights.coroutine(lp_address, block_identifier=block), + ) + inflation_rate, working_supply = await asyncio.gather( + gauge.rewardRate.coroutine(block_identifier=block), + gauge.totalSupply.coroutine(block_identifier=block), + ) + return Gauge(lp_address, pool, gauge, weight, inflation_rate, working_supply) + +async def _staking_apy(lp: dict, staking_rewards: Contract, block: Optional[int]=None) -> float: + query_at_time = time() if block is None else await get_block_timestamp_async(block) + + reward_token, rate, total_supply, end = await asyncio.gather( + staking_rewards.rewardToken.coroutine(block_identifier=block), + staking_rewards.rewardRate.coroutine(block_identifier=block), + staking_rewards.totalSupply.coroutine(block_identifier=block), + staking_rewards.periodFinish.coroutine(block_identifier=block), + ) + + rate *= unkeep + + if end < query_at_time or total_supply == 0 or rate == 0: + return Apy(f"v2:{drome.label}_unpopular", gross_apr=0, net_apy=0, fees=fees) + + pool_price, token_price = await asyncio.gather( + magic.get_price(lp[0], block=block, sync=False), + magic.get_price(reward_token, block=block, sync=False), + ) + + gross_apr = (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / (pool_price * (total_supply / 1e18)) + + net_apr = gross_apr * (1 - performance_fee) - management_fee + net_apy = (1 + (net_apr / COMPOUNDING)) ** COMPOUNDING - 1 + if os.getenv("DEBUG", None): + logger.info(pformat(Debug().collect_variables(locals()))) + return Apy(f"v2:{drome.label}", gross_apr=gross_apr, net_apy=net_apy, fees=fees) + +def with_monitoring(): + telegram.run_job_with_monitoring(drome.job_name, main) diff --git a/scripts/exporters/transactions.py b/scripts/exporters/transactions.py index 7483eb0db..0e3113f59 100644 --- a/scripts/exporters/transactions.py +++ b/scripts/exporters/transactions.py @@ -29,7 +29,7 @@ warnings.simplefilter("ignore", BrownieEnvironmentWarning) -yearn = Yearn(load_strategies=False) +yearn = Yearn() logger = logging.getLogger('yearn.transactions_exporter') @@ -39,7 +39,7 @@ Network.Gnosis: 2_000_000, Network.Arbitrum: 1_500_000, Network.Optimism: 4_000_000, - Network.Base: 100_000, + Network.Base: 500_000, }[chain.id] FIRST_END_BLOCK = { @@ -95,8 +95,8 @@ def process_and_cache_user_txs(last_saved_block=None): from_address=cache_address(row['from']), to_address=cache_address(row['to']), amount = row.amount, - price = price, - value_usd = usd, + price = Decimal(price), + value_usd = Decimal(usd), gas_used = row.gas_used, gas_price = row.gas_price ) diff --git a/scripts/exporters/treasury_transactions.py b/scripts/exporters/treasury_transactions.py index 2f51e51cc..8c5f48fd8 100644 --- a/scripts/exporters/treasury_transactions.py +++ b/scripts/exporters/treasury_transactions.py @@ -53,10 +53,13 @@ def main() -> NoReturn: @a_sync(default='sync') async def load_new_txs(start_block: Block, end_block: Block) -> int: - futs = [] - async for entry in treasury.ledger._get_and_yield(start_block, end_block): - futs.append(asyncio.create_task(insert_treasury_tx(entry))) - return sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres")) + futs = [ + asyncio.create_task(insert_treasury_tx(entry)) + async for entry in treasury.ledger._get_and_yield(start_block, end_block) + if not isinstance(entry, _Done) and entry.value + ] + to_sort = sum(await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres")) + return to_sort # NOTE: Things get sketchy when we bump these higher diff --git a/scripts/exporters/wallets.py b/scripts/exporters/wallets.py index 729d39d2c..fd681e015 100644 --- a/scripts/exporters/wallets.py +++ b/scripts/exporters/wallets.py @@ -20,7 +20,7 @@ logger = logging.getLogger('yearn.wallet_exporter') -yearn = Yearn(load_strategies=False, watch_events_forever=False) +yearn = Yearn() # start: 2020-02-12 first iearn deployment # start opti: 2022-01-01 an arbitrary start timestamp because the default start is < block 1 on opti and messes things up diff --git a/scripts/historical_tvl.py b/scripts/historical_tvl.py index d65404746..eedde109b 100644 --- a/scripts/historical_tvl.py +++ b/scripts/historical_tvl.py @@ -29,7 +29,7 @@ def generate_snapshot_range(start, interval): def main(): - yearn = Yearn(load_strategies=False) + yearn = Yearn() start = START_DATE[chain.id] interval = timedelta(hours=24) diff --git a/scripts/print_strategies.py b/scripts/print_strategies.py index 1fd8b20d4..98c535682 100644 --- a/scripts/print_strategies.py +++ b/scripts/print_strategies.py @@ -2,6 +2,7 @@ import click import sentry_sdk +from multicall.utils import await_awaitable from brownie.utils.output import build_tree sentry_sdk.set_tag('script','print_strategies') @@ -11,7 +12,6 @@ def main(): from yearn.v2.registry import Registry registry = Registry() print(registry) - registry.load_strategies() tree = [] for vault in registry.vaults: transforms = { @@ -28,7 +28,7 @@ def main(): 'totalLoss': lambda tokens: f'{tokens / vault.scale}', } strategies = [] - for strategy in vault.strategies + vault.revoked_strategies: + for strategy in await_awaitable(vault.strategies) + await_awaitable(vault.revoked_strategies): config = vault.vault.strategies(strategy.strategy).dict() color = 'green' if strategy in vault.strategies else 'red' strategies.append([ diff --git a/scripts/s3.py b/scripts/s3.py index f77b953e3..42a945bc0 100644 --- a/scripts/s3.py +++ b/scripts/s3.py @@ -7,7 +7,10 @@ import shutil import traceback import warnings +from contextlib import suppress from datetime import datetime +from decimal import Decimal +from functools import lru_cache from time import time from typing import Union @@ -25,11 +28,9 @@ from yearn import logs from yearn.apy import (Apy, ApyBlocks, ApyFees, ApyPoints, ApySamples, get_samples) -from yearn.common import Tvl from yearn.exceptions import EmptyS3Export from yearn.graphite import send_metric from yearn.special import Backscratcher, YveCRVJar -from yearn.utils import chunks, contract from yearn.v1.registry import Registry as RegistryV1 from yearn.v1.vaults import VaultV1 from yearn.v2.registry import Registry as RegistryV2 @@ -40,15 +41,25 @@ warnings.simplefilter("ignore", BrownieEnvironmentWarning) METRIC_NAME = "yearn.exporter.apy" +DEBUG = os.getenv("DEBUG", None) logs.basicConfig(level=logging.DEBUG) logger = logging.getLogger("yearn.apy") async def wrap_vault( - vault: Union[VaultV1, VaultV2], samples: ApySamples, aliases: dict, icon_url: str, assets_metadata: dict + vault: Union[VaultV1, VaultV2], + samples: ApySamples, + aliases: dict, + icon_url: str, + assets_metadata: dict, + pos: int, + total: int, ) -> dict: - + if DEBUG: + await _get_debug_lock().acquire() + logger.info(f"wrapping vault [{pos}/{total}]: {vault.name} {str(vault.vault)}") + # We don't need results for these right away but they take a while so lets start them now inception_fut = asyncio.create_task(contract_creation_block_async(str(vault.vault))) apy_fut = asyncio.create_task(get_apy(vault, samples)) @@ -62,7 +73,8 @@ async def wrap_vault( } ] else: - strategies = [{"address": str(strategy.strategy), "name": strategy.name} for strategy in vault.strategies] + strategies = await vault.strategies if isinstance(vault, VaultV2) else vault.strategies + strategies = [{"address": str(strategy.strategy), "name": strategy.name} for strategy in strategies] token_alias = aliases[str(vault.token)]["symbol"] if str(vault.token) in aliases else await ERC20(vault.token, asynchronous=True).symbol vault_alias = token_alias @@ -72,7 +84,7 @@ async def wrap_vault( if str(vault.vault) in assets_metadata: migration = {"available": assets_metadata[str(vault.vault)][1], "address": assets_metadata[str(vault.vault)][2]} - object = { + data = { "inception": await inception_fut, "address": str(vault.vault), "symbol": vault.symbol if hasattr(vault, "symbol") else await ERC20(vault.vault, asynchronous=True).symbol, @@ -90,7 +102,7 @@ async def wrap_vault( "tvl": dataclasses.asdict(await tvl_fut), "apy": dataclasses.asdict(await apy_fut), "strategies": strategies, - "endorsed": vault.is_endorsed if hasattr(vault, "is_endorsed") else True, + "endorsed": await vault.is_endorsed if hasattr(vault, "is_endorsed") else True, "version": vault.api_version if hasattr(vault, "api_version") else "0.1", "decimals": vault.decimals if hasattr(vault, "decimals") else await ERC20(vault.vault, asynchronous=True).decimals, "type": "v2" if isinstance(vault, VaultV2) else "v1", @@ -100,9 +112,12 @@ async def wrap_vault( } if chain.id == 1 and any([isinstance(vault, t) for t in [Backscratcher, YveCRVJar]]): - object["special"] = True + data["special"] = True - return object + logger.info(f"done wrapping vault [{pos}/{total}]: {vault.name} {str(vault.vault)}") + if DEBUG: + _get_debug_lock().release() + return _dedecimal(data) async def get_apy(vault, samples) -> Apy: @@ -216,23 +231,16 @@ async def _main(): if chain.id == Network.Mainnet: special = [YveCRVJar(), Backscratcher()] registry_v1 = RegistryV1() - vaults = list(itertools.chain(special, registry_v1.vaults, registry_v2.vaults, registry_v2.experiments)) + vaults = list(itertools.chain(special, registry_v1.vaults, await registry_v2.vaults, await registry_v2.experiments)) else: - vaults = registry_v2.vaults + vaults = await registry_v2.vaults if len(vaults) == 0: raise ValueError(f"No vaults found for chain_id: {chain.id}") - assets_metadata = await get_assets_metadata(registry_v2.vaults) + assets_metadata = await get_assets_metadata(await registry_v2.vaults) - data = [] - total = len(vaults) - - for i, vault in enumerate(vaults): - pos = i + 1 - logger.info(f"wrapping vault [{pos}/{total}]: {vault.name} {str(vault.vault)}") - data.append(await wrap_vault(vault, samples, aliases, icon_url, assets_metadata)) - logger.info(f"done wrapping vault [{pos}/{total}]: {vault.name} {str(vault.vault)}") + data = await tqdm_asyncio.gather(*[wrap_vault(vault, samples, aliases, icon_url, assets_metadata, i + 1, len(vaults)) for i, vault in enumerate(vaults)]) if len(data) == 0: raise ValueError(f"Data is empty for chain_id: {chain.id}") @@ -263,7 +271,7 @@ def _export(data, file_name, s3_path): with open(file_name, "w+") as f: json.dump(data, f) - if os.getenv("DEBUG", None): + if DEBUG: return for item in _get_s3s(): @@ -279,10 +287,10 @@ def _export(data, file_name, s3_path): def _get_s3s(): s3s = [] - aws_buckets = os.environ.get("AWS_BUCKET").split(";") - aws_endpoint_urls = os.environ.get("AWS_ENDPOINT_URL").split(";") - aws_keys = os.environ.get("AWS_ACCESS_KEY").split(";") - aws_secrets = os.environ.get("AWS_ACCESS_SECRET").split(";") + aws_buckets = os.environ.get("AWS_BUCKET", "").split(";") + aws_endpoint_urls = os.environ.get("AWS_ENDPOINT_URL", "").split(";") + aws_keys = os.environ.get("AWS_ACCESS_KEY", "").split(";") + aws_secrets = os.environ.get("AWS_ACCESS_SECRET", "").split(";") for i in range(len(aws_buckets)): aws_bucket = aws_buckets[i] @@ -326,7 +334,7 @@ def _get_export_paths(suffix): def with_monitoring(): - if os.getenv("DEBUG", None): + if DEBUG: main() return from telegram.ext import Updater @@ -345,15 +353,24 @@ def with_monitoring(): tb = traceback.format_exc() now = datetime.now() message = f"`[{now}]`\nšŸ”„ {export_mode} Vaults API update for {Network.name()} failed!\n" - try: + with suppress(BadRequest): detail_message = (message + f"```\n{tb}\n```")[:4000] updater.bot.send_message(chat_id=private_group, text=detail_message, parse_mode="Markdown", reply_to_message_id=ping) updater.bot.send_message(chat_id=public_group, text=detail_message, parse_mode="Markdown") - except BadRequest: - pass - #detail_message = message + f"{error.__class__.__name__}({error})" - #updater.bot.send_message(chat_id=private_group, text=detail_message, parse_mode="Markdown", reply_to_message_id=ping) - #updater.bot.send_message(chat_id=public_group, text=detail_message, parse_mode="Markdown") raise error message = f"āœ… {export_mode} Vaults API update for {Network.name()} successful!" updater.bot.send_message(chat_id=private_group, text=message, reply_to_message_id=ping) + +def _dedecimal(dct: dict): + """Decimal type cant be json encoded, we make them into floats""" + for k, v in dct.items(): + if isinstance(v, dict): + _dedecimal(v) + elif isinstance(v, Decimal): + dct[k] = float(v) + return dct + +@lru_cache +def _get_debug_lock() -> asyncio.Lock: + # we use this helper function to ensure the lock is always on the right loop + return asyncio.Lock() \ No newline at end of file diff --git a/scripts/tokenlist.py b/scripts/tokenlist.py index 50456750c..e566238b8 100644 --- a/scripts/tokenlist.py +++ b/scripts/tokenlist.py @@ -18,7 +18,7 @@ def main(): - yearn = Yearn(load_strategies=False) + yearn = Yearn() excluded = { "0xBa37B002AbaFDd8E89a1995dA52740bbC013D992", "0xe2F6b9773BF3A015E2aA70741Bde1498bdB9425b", diff --git a/scripts/tvl.py b/scripts/tvl.py index 928f17973..c8c7e8fd9 100644 --- a/scripts/tvl.py +++ b/scripts/tvl.py @@ -14,7 +14,7 @@ def main(): data = [] - yearn = Yearn(load_strategies=False) + yearn = Yearn() for product, registry in yearn.registries.items(): for name, tvl in registry.total_value_at().items(): diff --git a/services/dashboard/docker-compose.infra.yml b/services/dashboard/docker-compose.infra.yml index 4fd50372d..5bde03cea 100644 --- a/services/dashboard/docker-compose.infra.yml +++ b/services/dashboard/docker-compose.infra.yml @@ -4,6 +4,7 @@ volumes: grafana_data: {} victoria_metrics_data: {} postgres_data: {} + ypostgres_data: {} networks: stack: @@ -103,3 +104,17 @@ services: volumes: - postgres_data:/var/lib/postgresql/data restart: always + ypostgres: + image: postgres:14 + command: -c 'max_connections=${PGCONNECTIONS:-5000}' + ports: + - 5420:5432 + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=yearn-exporter + - POSTGRES_DB=postgres + networks: + - stack + volumes: + - ypostgres_data:/var/lib/postgresql/data + restart: always diff --git a/services/dashboard/docker-compose.yml b/services/dashboard/docker-compose.yml index d29ce7771..17cf84c4b 100644 --- a/services/dashboard/docker-compose.yml +++ b/services/dashboard/docker-compose.yml @@ -110,6 +110,13 @@ x-envs: &envs # DOCKER CONTAINER ENVS - CONTAINER_NAME + + - YPRICEMAGIC_DB_PROVIDER=postgres + - YPRICEMAGIC_DB_HOST=ypostgres + - YPRICEMAGIC_DB_PORT=5432 + - YPRICEMAGIC_DB_USER=${PGUSER:-postgres} + - YPRICEMAGIC_DB_PASSWORD=${PGPASSWORD:-yearn-exporter} + - YPRICEMAGIC_DB_DATABASE=${YPRICEMAGIC_DB_DATABASE:-postgres} x-volumes: &volumes - brownie:/root/.brownie @@ -126,6 +133,7 @@ services: - yearn-exporter-infra_stack external_links: - yearn-exporter-infra-postgres-1:postgres + - yearn-exporter-infra-ypostgres-1:ypostgres - yearn-exporter-infra-victoria-metrics-1:victoria-metrics logging: driver: "json-file" diff --git a/yearn/apy/aero.py b/yearn/apy/aero.py index 653cc5fcf..a544ac6b2 100644 --- a/yearn/apy/aero.py +++ b/yearn/apy/aero.py @@ -28,7 +28,7 @@ async def get_staking_pool(underlying: str) -> Optional[Contract]: return None if staking_pool == ZERO_ADDRESS else await Contract.coroutine(staking_pool) async def staking(vault: "Vault", staking_rewards: Contract, samples: ApySamples, block: Optional[int]=None) -> float: - if len(vault.strategies) == 0: + if len(await vault.strategies) == 0: return Apy("v2:aero_no_strats", 0, 0, ApyFees(0, 0), ApyPoints(0, 0, 0)) end = await staking_rewards.periodFinish.coroutine(block_identifier=block) @@ -38,16 +38,17 @@ async def staking(vault: "Vault", staking_rewards: Contract, samples: ApySamples performance = await vault.vault.performanceFee.coroutine(block_identifier=block) / 1e4 if hasattr(vault.vault, "performanceFee") else 0 management = await vault.vault.managementFee.coroutine(block_identifier=block) / 1e4 if hasattr(vault.vault, "managementFee") else 0 # since its a fork we still call it keepVELO - keep = await vault.strategies[0].strategy.localKeepVELO.coroutine(block_identifier=block) / 1e4 if hasattr(vault.strategies[0].strategy, "localKeepVELO") else 0 + strats = await vault.strategies + keep = await strats[0].strategy.localKeepVELO.coroutine(block_identifier=block) / 1e4 if hasattr(strats[0].strategy, "localKeepVELO") else 0 rate = rate * (1 - keep) fees = ApyFees(performance=performance, management=management, keep_velo=keep) if end < current_time or total_supply == 0 or rate == 0: return Apy("v2:aero_unpopular", gross_apr=0, net_apy=0, fees=fees) - pool_price = await magic.get_price(vault.token.address, block=block, sync=False) + pool_price = float(await magic.get_price(vault.token.address, block=block, sync=False)) reward_token = await staking_rewards.rewardToken.coroutine(block_identifier=block) if hasattr(staking_rewards, "rewardToken") else None token = reward_token - token_price = await magic.get_price(token, block=block, sync=False) + token_price = float(await magic.get_price(token, block=block, sync=False)) gross_apr = (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / (pool_price * (total_supply / 1e18)) diff --git a/yearn/apy/curve/rewards.py b/yearn/apy/curve/rewards.py index 6e002facd..cbee0a6b9 100644 --- a/yearn/apy/curve/rewards.py +++ b/yearn/apy/curve/rewards.py @@ -39,7 +39,7 @@ async def staking(staking_rewards: Contract, pool_price: int, base_asset_price: if token and rate: # Single reward token - token_price = await magic.get_price(token, block=block, sync=False) + token_price = float(await magic.get_price(token, block=block, sync=False)) return (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / ( (pool_price / 1e18) * (total_supply / 1e18) * base_asset_price ) @@ -60,7 +60,7 @@ async def staking(staking_rewards: Contract, pool_price: int, base_asset_price: except ValueError: token = None rate = data.rewardRate / 1e18 if data else 0 - token_price = await magic.get_price(token, block=block, sync=False) or 0 + token_price = float(await magic.get_price(token, block=block, sync=False) or 0) apr += SECONDS_PER_YEAR * rate * token_price / ((pool_price / 1e18) * (total_supply / 1e18) * token_price) queue += 1 try: @@ -88,7 +88,7 @@ async def multi(address: str, pool_price: int, base_asset_price: int, block: Opt token = None if data.periodFinish >= time(): rate = data.rewardRate / 1e18 if data else 0 - token_price = await magic.get_price(token, block=block, sync=False) or 0 + token_price = float(await magic.get_price(token, block=block, sync=False) or 0) apr += SECONDS_PER_YEAR * rate * token_price / ((pool_price / 1e18) * (total_supply / 1e18) * token_price) queue += 1 try: diff --git a/yearn/apy/curve/simple.py b/yearn/apy/curve/simple.py index b54042943..26d205e54 100644 --- a/yearn/apy/curve/simple.py +++ b/yearn/apy/curve/simple.py @@ -4,14 +4,12 @@ import os from dataclasses import dataclass from pprint import pformat -from functools import lru_cache from time import time from http import HTTPStatus import requests -from brownie import ZERO_ADDRESS, chain, interface -from dank_mids.brownie_patch import patch_contract +from brownie import ZERO_ADDRESS, chain from eth_abi import encode_single from eth_utils import function_signature_to_4byte_selector as fourbyte from semantic_version import Version @@ -19,8 +17,8 @@ from y.prices import magic from y.prices.stable_swap.curve import curve as y_curve from y.time import get_block_timestamp_async -from y.utils.dank_mids import dank_w3 +from yearn import constants from yearn.apy.common import (SECONDS_PER_WEEK, SECONDS_PER_YEAR, Apy, ApyError, ApyFees, ApySamples, SharePricePoint, calculate_roi) @@ -58,8 +56,6 @@ class Gauge: 'yearn_voter_proxy': '0xF147b8125d2ef93FB6965Db97D6746952a133934', 'convex_voter_proxy': '0x989AEb4d175e16225E39E87d0D97A3360524AD80', 'convex_booster': '0xF403C135812408BFbE8713b5A23a04b3D48AAE31', - 'rkp3r_rewards': '0xEdB67Ee1B171c4eC66E6c10EC43EDBbA20FaE8e9', - 'kp3r': '0x1cEB5cB57C4D4E2b2433641b95Dd330A33185A44', } } @@ -164,7 +160,7 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: raise ValueError(f"Error! Could not find price for {gauge.lp_token} at block {block}") crv_price, pool_price = await asyncio.gather( - magic.get_price(curve.crv, block=block, sync=False), + magic.get_price(constants.CRV, block=block, sync=False), gauge.pool.get_virtual_price.coroutine(block_identifier=block) ) gauge_weight = gauge.gauge_weight @@ -194,7 +190,7 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: * gauge_weight * (SECONDS_PER_YEAR / gauge.gauge_working_supply) * (PER_MAX_BOOST / pool_price) - * crv_price + * float(crv_price) ) / base_asset_price if y_gauge_balance > 0: @@ -227,7 +223,7 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: else: reward_data, token_price, total_supply = await asyncio.gather( gauge.gauge.reward_data.coroutine(gauge_reward_token), - _get_reward_token_price(gauge_reward_token), + magic.get_price(gauge_reward_token, block=samples.now, sync=False), gauge.gauge.totalSupply.coroutine(), ) rate = reward_data['rate'] @@ -236,7 +232,7 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: if period_finish < current_time: reward_apr = 0 else: - reward_apr = (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / ((pool_price / 1e18) * (total_supply / 1e18) * base_asset_price) + reward_apr = (SECONDS_PER_YEAR * (rate / 1e18) * float(token_price)) / ((float(pool_price) / 1e18) * (total_supply / 1e18) * float(base_asset_price)) else: reward_apr = 0 @@ -271,10 +267,11 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: if vault: if isinstance(vault, VaultV2): vault_contract = vault.vault - if len(vault.strategies) > 0 and hasattr(vault.strategies[0].strategy, "keepCRV"): - crv_keep_crv = await vault.strategies[0].strategy.keepCRV.coroutine(block_identifier=block) / 1e4 - elif len(vault.strategies) > 0 and hasattr(vault.strategies[0].strategy, "keepCrvPercent"): - crv_keep_crv = await vault.strategies[0].strategy.keepCrvPercent.coroutine(block_identifier=block) / 1e4 + strats = await vault.strategies + if len(strats) > 0 and hasattr(strats[0].strategy, "keepCRV"): + crv_keep_crv = await strats[0].strategy.keepCRV.coroutine(block_identifier=block) / 1e4 + elif len(strats) > 0 and hasattr(strats[0].strategy, "keepCrvPercent"): + crv_keep_crv = await strats[0].strategy.keepCrvPercent.coroutine(block_identifier=block) / 1e4 else: crv_keep_crv = 0 performance = await vault_contract.performanceFee.coroutine(block_identifier=block) / 1e4 if hasattr(vault_contract, "performanceFee") else 0 @@ -297,24 +294,25 @@ async def calculate_simple(vault, gauge: Gauge, samples: ApySamples) -> Apy: cvx_vault = None # if the vault consists of only a convex strategy then return # specialized apy calculations for convex - if _ConvexVault.is_convex_vault(vault): - cvx_strategy = vault.strategies[0].strategy + if await _ConvexVault.is_convex_vault(vault): + strats = await vault.strategies + cvx_strategy = strats[0].strategy cvx_vault = _ConvexVault(cvx_strategy, vault, gauge.gauge) return await cvx_vault.apy(base_asset_price, pool_price, base_apr, pool_apy, management, performance) # if the vault has two strategies then the first is curve and the second is convex - if isinstance(vault, VaultV2) and len(vault.strategies) == 2: # this vault has curve and convex + if isinstance(vault, VaultV2) and len(strats := await vault.strategies) == 2: # this vault has curve and convex # The first strategy should be curve, the second should be convex. # However the order on the vault object here does not correspond # to the order on the withdrawal queue on chain, therefore we need to # re-order so convex is always second if necessary - first_strategy = vault.strategies[0].strategy - second_strategy = vault.strategies[1].strategy + first_strategy = strats[0].strategy + second_strategy = strats[1].strategy crv_strategy = first_strategy cvx_strategy = second_strategy - if not _ConvexVault.is_convex_strategy(vault.strategies[1]): + if not _ConvexVault.is_convex_strategy(strats[1]): cvx_strategy = first_strategy crv_strategy = second_strategy @@ -382,7 +380,7 @@ def __init__(self, cvx_strategy, vault, gauge, block=None) -> None: self.gauge = gauge @staticmethod - def is_convex_vault(vault) -> bool: + async def is_convex_vault(vault) -> bool: """Determines whether the passed in vault is a Convex vault i.e. it only has one strategy that's based on farming Convex. """ @@ -391,7 +389,7 @@ def is_convex_vault(vault) -> bool: if not isinstance(vault, VaultV2): return False - return len(vault.strategies) == 1 and _ConvexVault.is_convex_strategy(vault.strategies[0]) + return len(strats := await vault.strategies) == 1 and _ConvexVault.is_convex_strategy(strats[0]) @staticmethod def is_convex_strategy(strategy) -> bool: @@ -461,7 +459,7 @@ async def get_detailed_apy_data(self, base_asset_price, pool_price, base_apr) -> async def _get_cvx_emissions_converted_to_crv(self) -> float: """The amount of CVX emissions at the current block for a given pool, converted to CRV (from a pricing standpoint) to ease calculation of total APY.""" crv_price, cvx = await asyncio.gather( - magic.get_price(curve.crv, block=self.block, sync=False), + magic.get_price(constants.CRV, block=self.block, sync=False), Contract.coroutine(addresses[chain.id]['cvx']), ) total_cliff = 1e3 # the total number of cliffs to happen @@ -514,12 +512,12 @@ async def _get_reward_apr(self, cvx_strategy, cvx_booster, base_asset_price, poo if await virtual_rewards_pool.periodFinish.coroutine() > current_time: reward_token = await virtual_rewards_pool.rewardToken.coroutine() reward_token_price, reward_rate, total_supply = await asyncio.gather( - _get_reward_token_price(reward_token, block), + magic.get_price(reward_token, block=block, sync=False), virtual_rewards_pool.rewardRate.coroutine(), virtual_rewards_pool.totalSupply.coroutine(), ) - reward_apr = (reward_rate * SECONDS_PER_YEAR * reward_token_price) / (base_asset_price * (pool_price / 1e18) * total_supply) + reward_apr = (reward_rate * SECONDS_PER_YEAR * float(reward_token_price)) / (float(base_asset_price) * (float(pool_price) / 1e18) * total_supply) convex_reward_apr += reward_apr return convex_reward_apr @@ -538,24 +536,3 @@ async def _get_convex_fee(self, cvx_booster, block=None) -> float: def _debt_ratio(self) -> float: """The debt ratio of the Convex strategy.""" return self.vault.vault.strategies(self._cvx_strategy)[2] / 1e4 - - -@lru_cache -def _get_rkp3r() -> Contract: - return patch_contract(interface.rKP3R(addresses[chain.id]['rkp3r_rewards']), dank_w3) - -async def _get_reward_token_price(reward_token, block=None): - if chain.id not in addresses: - return await magic.get_price(reward_token, block=block, sync=False) - - # if the reward token is rKP3R we need to calculate it's price in - # terms of KP3R after the discount - contract_addresses = addresses[chain.id] - if reward_token == contract_addresses['rkp3r_rewards']: - price, discount = await asyncio.gather( - magic.get_price(contract_addresses['kp3r'], block=block, sync=False), - _get_rkp3r().discount.coroutine(block_identifier=block), - ) - return price * (100 - discount) / 100 - else: - return await magic.get_price(reward_token, block=block, sync=False) diff --git a/yearn/apy/staking_rewards.py b/yearn/apy/staking_rewards.py index de51ee2c8..8cb20baa4 100644 --- a/yearn/apy/staking_rewards.py +++ b/yearn/apy/staking_rewards.py @@ -14,10 +14,11 @@ async def get_staking_rewards_apr(vault, samples: ApySamples): return 0 vault_address = str(vault.vault) - if vault_address not in vault.registry.staking_pools: + staking_pools = await vault.registry.staking_pools + if vault_address not in staking_pools: return 0 - staking_pool = await Contract.coroutine(vault.registry.staking_pools[vault_address]) + staking_pool = await Contract.coroutine(staking_pools[vault_address]) if await staking_pool.periodFinish.coroutine() < now: return 0 diff --git a/yearn/apy/v2.py b/yearn/apy/v2.py index 8d491c5eb..25cf1f7d7 100644 --- a/yearn/apy/v2.py +++ b/yearn/apy/v2.py @@ -6,13 +6,12 @@ from brownie import chain from semantic_version.base import Version -from y.networks import Network +from y import Network from yearn.apy.common import (Apy, ApyBlocks, ApyError, ApyFees, ApyPoints, ApySamples, SharePricePoint, calculate_roi) from yearn.apy.staking_rewards import get_staking_rewards_apr from yearn.debug import Debug -from yearn.utils import run_in_thread logger = logging.getLogger(__name__) @@ -29,9 +28,8 @@ def closest(haystack, needle): else: return before - async def simple(vault, samples: ApySamples) -> Apy: - harvests = sorted([harvest for strategy in vault.strategies for harvest in await run_in_thread(getattr, strategy, "harvests")]) + harvests = sorted([harvest for strategy in await vault.strategies async for harvest in strategy.harvests(samples.now)]) # we don't want to display APYs when vaults are ramping up if len(harvests) < 2: @@ -51,7 +49,7 @@ async def simple(vault, samples: ApySamples) -> Apy: # get our inception data # the first report is when the vault first allocates funds to farm with - reports = await run_in_thread(getattr, vault, 'reports') + reports = await vault.reports inception_block = reports[0].block_number inception_price = await price_per_share(block_identifier=inception_block) @@ -91,7 +89,7 @@ async def simple(vault, samples: ApySamples) -> Apy: # for performance fee, half comes from strategy (strategist share) and half from the vault (treasury share) strategy_fees = [] - for strategy in vault.strategies: # look at all of our strategies + for strategy in await vault.strategies: # look at all of our strategies strategy_info = await contract.strategies.coroutine(strategy.strategy) debt_ratio = strategy_info['debtRatio'] / 10000 performance_fee = strategy_info['performanceFee'] @@ -126,7 +124,7 @@ async def simple(vault, samples: ApySamples) -> Apy: async def average(vault, samples: ApySamples) -> Apy: - reports = await run_in_thread(getattr, vault, "reports") + reports = await vault.reports # we don't want to display APYs when vaults are ramping up if len(reports) < 2: @@ -189,7 +187,7 @@ async def average(vault, samples: ApySamples) -> Apy: # for performance fee, half comes from strategy (strategist share) and half from the vault (treasury share) strategy_fees = [] - for strategy in vault.strategies: # look at all of our strategies + for strategy in await vault.strategies: # look at all of our strategies strategy_info = await contract.strategies.coroutine(strategy.strategy) debt_ratio = strategy_info['debtRatio'] / 10000 performance_fee = strategy_info['performanceFee'] diff --git a/yearn/apy/velo.py b/yearn/apy/velo.py index 4afd16e59..c3774630a 100644 --- a/yearn/apy/velo.py +++ b/yearn/apy/velo.py @@ -28,7 +28,7 @@ async def get_staking_pool(underlying: str) -> Optional[Contract]: return None if staking_pool == ZERO_ADDRESS else await Contract.coroutine(staking_pool) async def staking(vault: "Vault", staking_rewards: Contract, samples: ApySamples, block: Optional[int]=None) -> float: - if len(vault.strategies) == 0: + if len(await vault.strategies) == 0: return Apy("v2:velo_no_strats", 0, 0, ApyFees(0, 0), ApyPoints(0, 0, 0)) end = await staking_rewards.periodFinish.coroutine(block_identifier=block) @@ -37,17 +37,18 @@ async def staking(vault: "Vault", staking_rewards: Contract, samples: ApySamples rate = await staking_rewards.rewardRate.coroutine(block_identifier=block) if hasattr(staking_rewards, "rewardRate") else 0 performance = await vault.vault.performanceFee.coroutine(block_identifier=block) / 1e4 if hasattr(vault.vault, "performanceFee") else 0 management = await vault.vault.managementFee.coroutine(block_identifier=block) / 1e4 if hasattr(vault.vault, "managementFee") else 0 - keep = await vault.strategies[0].strategy.localKeepVELO.coroutine(block_identifier=block) / 1e4 if hasattr(vault.strategies[0].strategy, "localKeepVELO") else 0 + strats = await vault.strategies + keep = await strats[0].strategy.localKeepVELO.coroutine(block_identifier=block) / 1e4 if hasattr(strats[0].strategy, "localKeepVELO") else 0 rate = rate * (1 - keep) fees = ApyFees(performance=performance, management=management, keep_velo=keep) if end < current_time or total_supply == 0 or rate == 0: return Apy("v2:velo_unpopular", gross_apr=0, net_apy=0, fees=fees) - else: - pool_price = await magic.get_price(vault.token.address, block=block, sync=False) - reward_token = await staking_rewards.rewardToken.coroutine(block_identifier=block) if hasattr(staking_rewards, "rewardToken") else None + + pool_price = float(await magic.get_price(vault.token.address, block=block, sync=False)) + reward_token = await staking_rewards.rewardToken.coroutine(block_identifier=block) token = reward_token - token_price = await magic.get_price(token, block=block, sync=False) + token_price = float(await magic.get_price(token, block=block, sync=False)) gross_apr = (SECONDS_PER_YEAR * (rate / 1e18) * token_price) / (pool_price * (total_supply / 1e18)) diff --git a/yearn/constants.py b/yearn/constants.py index 467c8dd7d..835e10f0f 100644 --- a/yearn/constants.py +++ b/yearn/constants.py @@ -112,4 +112,12 @@ TREASURY_WALLETS = {convert.to_address(address) for address in TREASURY_WALLETS} -RANDOMIZE_EXPORTS = bool(os.environ.get("RANDOMIZE_EXPORTS")) \ No newline at end of file +RANDOMIZE_EXPORTS = bool(os.environ.get("RANDOMIZE_EXPORTS")) + +CRV = { + Network.Mainnet: "0xD533a949740bb3306d119CC777fa900bA034cd52", + Network.Gnosis: "0x712b3d230f3c1c19db860d80619288b1f0bdd0bd", + Network.Fantom: "0x1E4F97b9f9F913c46F1632781732927B9019C68b", + Network.Arbitrum: "0x11cDb42B0EB46D95f990BeDD4695A6e3fA034978", + Network.Optimism: "0x0994206dfE8De6Ec6920FF4D779B0d950605Fb53", +}.get(chain.id, None) diff --git a/yearn/decorators.py b/yearn/decorators.py index 3604394ae..fb7020bb5 100644 --- a/yearn/decorators.py +++ b/yearn/decorators.py @@ -1,5 +1,5 @@ import _thread -import signal +import asyncio import functools import logging import signal @@ -24,14 +24,31 @@ def wrap(self): def wait_or_exit_before(func): @functools.wraps(func) - def wrap(self): - self._done.wait() - if self._has_exception: - logger.error(self._exception) - _thread.interrupt_main(signal.SIGTERM) - return func(self) + async def wrap(self): + task: asyncio.Task = self._task + logger.debug("waiting for %s", self) + while not self._done.is_set() and not task.done(): + await asyncio.sleep(10) + logger.debug("%s not done", self) + logger.debug("loading %s complete", self) + if task.done() and (e := task.exception()): + logger.debug('task %s has exception %s, awaiting', task, e) + raise e + return await func(self) return wrap +_main_thread_loop = asyncio.get_event_loop() + +def set_exc(func): + @functools.wraps(func) + async def wrap(self): + # in case this loads in a diff thread + try: + return await func(self) + except Exception as e: + self._done.set() + raise e + return wrap def wait_or_exit_after(func): @functools.wraps(func) diff --git a/yearn/events.py b/yearn/events.py index d786193d8..ed471d34e 100644 --- a/yearn/events.py +++ b/yearn/events.py @@ -68,7 +68,7 @@ def get_logs_asap( if verbose > 0: logger.info('fetching %d batches', len(ranges)) - batches = Parallel(1, "threading", verbose=verbose)( + batches = Parallel(64, "threading", verbose=verbose)( delayed(web3.eth.get_logs)(_get_logs_params(addresses, topics, start, end)) for start, end in ranges ) diff --git a/yearn/helpers/s3.py b/yearn/helpers/s3.py new file mode 100644 index 000000000..30b84bf81 --- /dev/null +++ b/yearn/helpers/s3.py @@ -0,0 +1,68 @@ +import os +import shutil, json +from typing import List, TypedDict, Any + +import boto3 +from brownie import chain + +print(boto3.__dict__) + +class S3(TypedDict): + s3: boto3.client + aws_bucket: str + +def get_s3s() -> List[S3]: + s3s = [] + aws_buckets = os.environ.get("AWS_BUCKET").split(";") + aws_endpoint_urls = os.environ.get("AWS_ENDPOINT_URL").split(";") + aws_keys = os.environ.get("AWS_ACCESS_KEY").split(";") + aws_secrets = os.environ.get("AWS_ACCESS_SECRET").split(";") + + for i in range(len(aws_buckets)): + aws_bucket = aws_buckets[i] + aws_endpoint_url = aws_endpoint_urls[i] + aws_key = aws_keys[i] + aws_secret = aws_secrets[i] + kwargs = {} + if aws_endpoint_url is not None: + kwargs["endpoint_url"] = aws_endpoint_url + if aws_key is not None: + kwargs["aws_access_key_id"] = aws_key + if aws_secret is not None: + kwargs["aws_secret_access_key"] = aws_secret + + s3s.append(S3(s3=boto3.client("s3", **kwargs), aws_bucket=aws_bucket)) + return s3s + +def get_export_paths(path_presufix: str, path_suffix: str): + out = "generated" + if os.path.isdir(out): + shutil.rmtree(out) + os.makedirs(out, exist_ok=True) + + api_path = os.path.join("v1", "chains", f"{chain.id}", path_presufix) + + file_base_path = os.path.join(out, api_path) + os.makedirs(file_base_path, exist_ok=True) + + file_name = os.path.join(file_base_path, path_suffix) + s3_path = os.path.join(api_path, path_suffix) + return file_name, s3_path + +def upload(path_presufix: str, path_suffix: str, data: Any) -> None: + print(json.dumps(data, sort_keys=True, indent=4)) + + file_name, s3_path = get_export_paths(path_presufix, path_suffix) + with open(file_name, "w+") as f: + json.dump(data, f) + + if os.getenv("DEBUG", None): + return + + for s3 in get_s3s(): + s3["s3"].upload_file( + file_name, + s3["aws_bucket"], + s3_path, + ExtraArgs={'ContentType': "application/json", 'CacheControl': "max-age=1800"}, + ) \ No newline at end of file diff --git a/yearn/helpers/telegram.py b/yearn/helpers/telegram.py new file mode 100644 index 000000000..6ada71f3f --- /dev/null +++ b/yearn/helpers/telegram.py @@ -0,0 +1,39 @@ + +import os +import traceback +from datetime import datetime +from typing import Callable, TypeVar + +from y import Network + +T = TypeVar('T') + +PRIVATE_GROUP = os.environ.get('TG_YFIREBOT_GROUP_INTERNAL') +PUBLIC_GROUP = os.environ.get('TG_YFIREBOT_GROUP_EXTERNAL') + + +def run_job_with_monitoring(job_name: str, job: Callable[[], T]) -> T: + """A helper function used when we want to run a job and monitor it via telegram""" + + if os.getenv("DEBUG", None): + return job() + + from telegram.ext import Updater + UPDATER = Updater(os.environ.get('TG_YFIREBOT')) + + now = datetime.now() + message = f"`[{now}]`\nāš™ļø {job_name} for {Network.name()} is updating..." + ping = UPDATER.bot.send_message(chat_id=PRIVATE_GROUP, text=message, parse_mode="Markdown") + ping = ping.message_id + try: + retval = job() + except Exception as error: + tb = traceback.format_exc() + now = datetime.now() + message = f"`[{now}]`\nšŸ”„ {job_name} update for {Network.name()} failed!\n```\n{tb}\n```"[:4000] + UPDATER.bot.send_message(chat_id=PRIVATE_GROUP, text=message, parse_mode="Markdown", reply_to_message_id=ping) + UPDATER.bot.send_message(chat_id=PUBLIC_GROUP, text=message, parse_mode="Markdown") + raise error + message = f"āœ… {job_name} update for {Network.name()} successful!" + UPDATER.bot.send_message(chat_id=PRIVATE_GROUP, text=message, reply_to_message_id=ping) + return retval \ No newline at end of file diff --git a/yearn/iearn.py b/yearn/iearn.py index 495f090bb..2996b9a9e 100644 --- a/yearn/iearn.py +++ b/yearn/iearn.py @@ -66,7 +66,7 @@ async def describe(self, block=None) -> dict: "pooled balance": res["pool"] / vault.scale, "price per share": res['getPricePerFullShare'] / 1e18, "token price": price, - "tvl": res["pool"] / vault.scale * price, + "tvl": res["pool"] / vault.scale * float(price), "address": vault.vault, "version": "iearn", } diff --git a/yearn/ironbank.py b/yearn/ironbank.py index 65c9deba1..f7cf620f4 100644 --- a/yearn/ironbank.py +++ b/yearn/ironbank.py @@ -110,7 +110,7 @@ async def describe(self, block=None): for attr in ["getCash", "totalBorrows", "totalReserves"]: res[attr] /= 10 ** m.decimals - tvl = (res["getCash"] + res["totalBorrows"] - res["totalReserves"]) * price + tvl = (res["getCash"] + res["totalBorrows"] - res["totalReserves"]) * float(price) supplied = res["getCash"] + res["totalBorrows"] - res["totalReserves"] ratio = res["totalBorrows"] / supplied if supplied != 0 else None @@ -121,7 +121,7 @@ async def describe(self, block=None): "total borrows": res["totalBorrows"], "total reserves": res["totalReserves"], "exchange rate": exchange_rate, - "token price": price * exchange_rate, + "token price": float(price) * exchange_rate, "underlying price": price, "supply apy": res["supplyRatePerBlock"] / 1e18 * blocks_per_year, "borrow apy": res["borrowRatePerBlock"] / 1e18 * blocks_per_year, diff --git a/yearn/outputs/describers/vault.py b/yearn/outputs/describers/vault.py index 962ffc37b..eeea7a061 100644 --- a/yearn/outputs/describers/vault.py +++ b/yearn/outputs/describers/vault.py @@ -1,6 +1,7 @@ import asyncio from concurrent.futures import ProcessPoolExecutor +from decimal import Decimal from yearn.outputs.postgres.utils import fetch_balances from yearn.prices.magic import _get_price @@ -26,10 +27,10 @@ async def describe_wallets(self, vault_address, block=None): 'total wallets': len(set(wallet for wallet, bal in balances.items())), 'wallet balances': { wallet: { - "token balance": float(bal), - "usd balance": float(bal) * price - } for wallet, bal in balances.items() - } + "token balance": bal, + "usd balance": bal * Decimal(price) + } for wallet, bal in balances.items() } + } info['active wallets'] = sum(1 if balances['usd balance'] > ACTIVE_WALLET_USD_THRESHOLD else 0 for balances in info['wallet balances'].values()) return info diff --git a/yearn/outputs/postgres/utils.py b/yearn/outputs/postgres/utils.py index eb083161f..8afc5de0a 100644 --- a/yearn/outputs/postgres/utils.py +++ b/yearn/outputs/postgres/utils.py @@ -1,5 +1,6 @@ import logging -from typing import Optional +from decimal import Decimal +from typing import Dict, Optional from brownie import ZERO_ADDRESS, chain, convert from brownie.convert.datatypes import HexString @@ -115,7 +116,7 @@ def last_recorded_block(Entity: db.Entity) -> int: return select(max(e.block) for e in Entity if e.chainid == chain.id).first() @db_session -def fetch_balances(vault_address: str, block=None): +def fetch_balances(vault_address: str, block=None) -> Dict[str, Decimal]: token_dbid = select(t.token_id for t in Token if t.chain.chainid == chain.id and t.address.address == vault_address).first() if block and block > last_recorded_block(UserTx): # NOTE: we use `postgres.` instead of `self.` so we can make use of parallelism diff --git a/yearn/partners/snapshot.py b/yearn/partners/snapshot.py index a4910d4b8..ff15adf4b 100644 --- a/yearn/partners/snapshot.py +++ b/yearn/partners/snapshot.py @@ -232,12 +232,13 @@ class WildcardWrapper: async def unwrap(self) -> List[Wrapper]: registry = Registry() wrappers = [self.wrapper] if isinstance(self.wrapper, str) else self.wrapper + vaults = await registry.vaults topics = construct_event_topic_set( - filter_by_name('Transfer', registry.vaults[0].vault.abi)[0], + filter_by_name('Transfer', vaults[0].vault.abi)[0], web3.codec, {'receiver': wrappers}, ) - addresses = [str(vault.vault) for vault in registry.vaults] + addresses = [str(vault.vault) for vault in vaults] from_block = min(await asyncio.gather(*[threads.run(contract_creation_block, address) for address in addresses])) # wrapper -> {vaults} @@ -249,7 +250,7 @@ async def unwrap(self) -> List[Wrapper]: return [ Wrapper(name=vault.name, vault=str(vault.vault), wrapper=wrapper) for wrapper in wrappers - for vault in registry.vaults + for vault in vaults if str(vault.vault) in deposits[wrapper] ] diff --git a/yearn/prices/curve.py b/yearn/prices/curve.py index aa089bbd2..be48c06d4 100644 --- a/yearn/prices/curve.py +++ b/yearn/prices/curve.py @@ -23,18 +23,18 @@ from brownie import ZERO_ADDRESS, Contract, chain, convert, interface from brownie.convert import to_address from brownie.convert.datatypes import EthAddress -from cachetools.func import lru_cache, ttl_cache -from y.constants import EEE_ADDRESS -from y.exceptions import NodeNotSynced, PriceError +from cachetools.func import lru_cache +from y.exceptions import NodeNotSynced from y.networks import Network from y.prices import magic +from yearn import constants from yearn.decorators import sentry_catch_all, wait_or_exit_after from yearn.events import decode_logs, get_logs_asap from yearn.exceptions import UnsupportedNetwork from yearn.multicall2 import fetch_multicall, fetch_multicall_async from yearn.typing import Address, AddressOrContract, Block -from yearn.utils import Singleton, contract, get_event_loop +from yearn.utils import Singleton, contract logger = logging.getLogger(__name__) @@ -64,7 +64,6 @@ curve_contracts = { Network.Mainnet: { 'address_provider': ADDRESS_PROVIDER, - 'crv': '0xD533a949740bb3306d119CC777fa900bA034cd52', 'voting_escrow': '0x5f3b5DfEb7B28CDbD7FAba78963EE202a494e2A2', 'gauge_controller': '0x2F50D538606Fa9EDD2B11E2446BEb18C9D5846bB', }, @@ -72,19 +71,15 @@ # Curve has not properly initialized the provider. contract(self.address_provider.get_address(5)) returns 0x0. # CurveRegistry class has extra handling to fetch registry in this case. 'address_provider': ADDRESS_PROVIDER, - 'crv': '0x712b3d230f3c1c19db860d80619288b1f0bdd0bd', }, Network.Fantom: { 'address_provider': ADDRESS_PROVIDER, - 'crv': '0x1E4F97b9f9F913c46F1632781732927B9019C68b', }, Network.Arbitrum: { 'address_provider': ADDRESS_PROVIDER, - 'crv': '0x11cDb42B0EB46D95f990BeDD4695A6e3fA034978', }, Network.Optimism: { 'address_provider': ADDRESS_PROVIDER, - 'crv': '0x0994206dfE8De6Ec6920FF4D779B0d950605Fb53', } } @@ -104,7 +99,7 @@ class Ids(IntEnum): Curve_Tricrypto_Factory = 11 class CurveRegistry(metaclass=Singleton): - + # NOTE: before deprecating, figure out why this loads more pools than ypm @wait_or_exit_after def __init__(self) -> None: if chain.id not in curve_contracts: @@ -115,7 +110,6 @@ def __init__(self) -> None: self.voting_escrow = contract(addrs['voting_escrow']) self.gauge_controller = contract(addrs['gauge_controller']) - self.crv = contract(addrs['crv']) self.identifiers = defaultdict(list) # id -> versions self.registries = defaultdict(set) # registry -> pools self.factories = defaultdict(set) # factory -> pools @@ -125,8 +119,13 @@ def __init__(self) -> None: self._done = threading.Event() self._thread = threading.Thread(target=self.watch_events, daemon=True) - self._has_exception = False self._thread.start() + self._has_exception = False + + def ensure_loaded(self): + if not self._thread._started.is_set(): + logger.debug("starting thread") + self._thread.start() @sentry_catch_all def watch_events(self) -> None: @@ -240,6 +239,7 @@ def get_factory(self, pool: AddressOrContract) -> EthAddress: """ Get metapool factory that has spawned a pool. """ + self.ensure_loaded() try: return next( factory @@ -253,6 +253,7 @@ def get_registry(self, pool: AddressOrContract) -> EthAddress: """ Get registry containing a pool. """ + self.ensure_loaded() try: return next( registry @@ -270,6 +271,7 @@ def get_pool(self, token: AddressOrContract) -> EthAddress: """ Get Curve pool (swap) address by LP token address. Supports factory pools. """ + self.ensure_loaded() token = to_address(token) if token in self.token_to_pool: return self.token_to_pool[token] @@ -279,6 +281,7 @@ def get_gauge(self, pool: AddressOrContract, lp_token: AddressOrContract) -> Eth """ Get liquidity gauge address by pool or lp_token. """ + self.ensure_loaded() pool = to_address(pool) lp_token = to_address(lp_token) if chain.id == Network.Mainnet: @@ -300,7 +303,6 @@ def get_gauge(self, pool: AddressOrContract, lp_token: AddressOrContract) -> Eth if gauge != ZERO_ADDRESS: return gauge - @lru_cache(maxsize=None) def get_coins(self, pool: AddressOrContract) -> List[EthAddress]: """ @@ -320,182 +322,7 @@ def get_coins(self, pool: AddressOrContract) -> List[EthAddress]: return [coin for coin in coins if coin not in {None, ZERO_ADDRESS}] - @lru_cache(maxsize=None) - def get_underlying_coins(self, pool: AddressOrContract) -> List[EthAddress]: - pool = to_address(pool) - factory = self.get_factory(pool) - registry = self.get_registry(pool) - - if factory: - factory = contract(factory) - # new factory reverts for non-meta pools - if not hasattr(factory, 'is_meta') or factory.is_meta(pool): - if hasattr(factory, 'get_underlying_coins'): - coins = factory.get_underlying_coins(pool) - elif hasattr(factory, 'get_coins'): - coins = factory.get_coins(pool) - else: - coins = {ZERO_ADDRESS} - else: - coins = factory.get_coins(pool) - elif registry: - registry = contract(registry) - if hasattr(registry, 'get_underlying_coins'): - coins = registry.get_underlying_coins(pool) - elif hasattr(registry, 'get_coins'): - coins = registry.get_coins(pool) - - # pool not in registry, not checking for underlying_coins here - if set(coins) == {ZERO_ADDRESS}: - return self.get_coins(pool) - - return [coin for coin in coins if coin != ZERO_ADDRESS] - - @lru_cache(maxsize=None) - def get_decimals(self, pool: AddressOrContract) -> List[int]: - pool = to_address(pool) - factory = self.get_factory(pool) - registry = self.get_registry(pool) - source = contract(factory or registry) - decimals = source.get_decimals(pool) - - # pool not in registry - if not any(decimals): - coins = self.get_coins(pool) - decimals = fetch_multicall( - *[[contract(token), 'decimals'] for token in coins] - ) - - return [dec for dec in decimals if dec != 0] - - def get_balances(self, pool: AddressOrContract, block: Optional[Block] = None, should_raise_err: bool = True) -> Optional[Dict[EthAddress,float]]: - """ - Get {token: balance} of liquidity in the pool. - """ - pool = to_address(pool) - factory = self.get_factory(pool) - registry = self.get_registry(pool) - coins = self.get_coins(pool) - decimals = self.get_decimals(pool) - - try: - source = contract(factory or registry) - balances = source.get_balances(pool, block_identifier=block) - # fallback for historical queries - except ValueError as e: - if str(e) not in [ - 'execution reverted', - 'No data was returned - the call likely reverted' - ]: raise - - balances = fetch_multicall( - *[[contract(pool), 'balances', i] for i, _ in enumerate(coins)], - block=block - ) - - if not any(balances): - if should_raise_err: - raise ValueError(f'could not fetch balances {pool} at {block}') - return None - - return { - coin: balance / 10 ** dec - for coin, balance, dec in zip(coins, balances, decimals) - } - def get_virtual_price(self, pool: Address, block: Optional[Block] = None) -> Optional[float]: - pool = contract(pool) - try: - return pool.get_virtual_price(block_identifier=block) / 1e18 - except ValueError as e: - if str(e) == "execution reverted": - return None - raise - - def get_tvl(self, pool: AddressOrContract, block: Optional[Block] = None) -> float: - """ - Get total value in Curve pool. - """ - pool = to_address(pool) - balances = self.get_balances(pool, block=block) - - return sum( - amount * magic.get_price(coin, block=block) - for coin, amount in balances.items() - ) - - @ttl_cache(maxsize=None, ttl=600) - def get_price(self, token: AddressOrContract, block: Optional[Block] = None) -> Optional[float]: - token = to_address(token) - pool = self.get_pool(token) - # crypto pools can have different tokens, use slow method - try: - tvl = self.get_tvl(pool, block=block) - except ValueError: - tvl = 0 - supply = contract(token).totalSupply(block_identifier=block) / 1e18 - if supply == 0: - if tvl > 0: - raise ValueError('curve pool has balance but no supply') - return 0 - return tvl / supply - - def get_coin_price(self, token: AddressOrContract, block: Optional[Block] = None) -> Optional[float]: - - # Select the most appropriate pool - pools = self.coin_to_pools[token] - if not pools: - return - elif len(pools) == 1: - pool = pools[0] - else: - # We need to find the pool with the deepest liquidity - balances = [self.get_balances(pool, block, should_raise_err=False) for pool in pools] - deepest_pool, deepest_bal = None, 0 - for pool, pool_bals in zip(pools, balances): - if pool_bals is None: - continue - if isinstance(pool_bals, Exception): - if str(pool_bals).startswith("could not fetch balances"): - continue - raise pool_bals - for _token, bal in pool_bals.items(): - if _token == token and bal > deepest_bal: - deepest_pool = pool - deepest_bal = bal - pool = deepest_pool - - # Get the index for `token` - coins = self.get_coins(pool) - token_in_ix = [i for i, coin in enumerate(coins) if coin == token][0] - amount_in = 10 ** contract(str(token)).decimals() - if len(coins) == 2: - # this works for most typical metapools - token_out_ix = 0 if token_in_ix == 1 else 1 if token_in_ix == 0 else None - elif len(coins) == 3: - # We will just default to using token 0 until we have a reason to make this more flexible - token_out_ix = 0 if token_in_ix in [1, 2] else 1 if token_in_ix == 0 else None - else: - # TODO: handle this sitch if necessary - return None - - # Get the price for `token` using the selected pool. - try: - dy = contract(pool).get_dy(token_in_ix, token_out_ix, amount_in, block_identifier = block) - except: - return None - - if coins[token_out_ix] == EEE_ADDRESS: - token_out = EEE_ADDRESS - amount_out = dy / 10 ** 18 - else: - token_out = contract(coins[token_out_ix]) - amount_out = dy / 10 ** token_out.decimals() - try: - return amount_out * magic.get_price(token_out, block = block) - except PriceError: - return None - async def calculate_boost(self, gauge: Contract, addr: Address, block: Optional[Block] = None) -> Dict[str,float]: results = await fetch_multicall_async( [gauge, "balanceOf", addr], @@ -564,7 +391,7 @@ async def calculate_apy(self, gauge: Contract, lp_token: AddressOrContract, bloc block=block, ) crv_price, token_price, results = await asyncio.gather( - magic.get_price(self.crv, block=block, sync=False), + magic.get_price(constants.CRV, block=block, sync=False), magic.get_price(lp_token, block=block, sync=False), results ) @@ -573,7 +400,7 @@ async def calculate_apy(self, gauge: Contract, lp_token: AddressOrContract, bloc try: rate = ( inflation_rate * relative_weight * 86400 * 365 / working_supply * 0.4 - ) / token_price + ) / float(token_price) except ZeroDivisionError: rate = 0 @@ -583,7 +410,7 @@ async def calculate_apy(self, gauge: Contract, lp_token: AddressOrContract, bloc "inflation rate": inflation_rate, "virtual price": virtual_price, "crv reward rate": rate, - "crv apy": rate * crv_price, + "crv apy": rate * float(crv_price), "token price": token_price, } diff --git a/yearn/prices/magic.py b/yearn/prices/magic.py index 261a5ec29..7d5c898d2 100644 --- a/yearn/prices/magic.py +++ b/yearn/prices/magic.py @@ -8,7 +8,8 @@ from y.exceptions import PriceError from y.networks import Network -from yearn.prices import constants, curve +from yearn.constants import CRV +from yearn.prices import constants from yearn.prices.aave import aave from yearn.prices.balancer import balancer as bal from yearn.prices.band import band @@ -30,6 +31,7 @@ async def _get_price(token: AnyAddressType, block: Optional[Block]) -> float: if chain.id == Network.Mainnet: # fixes circular import from yearn.special import Backscratcher + # no liquid market for yveCRV-DAO -> return CRV token price if token == Backscratcher().vault.address and block < 11786563: return await _get_price("0xD533a949740bb3306d119CC777fa900bA034cd52", block) @@ -121,8 +123,8 @@ def find_price( elif chain.id == Network.Mainnet: # no liquid market for yveCRV-DAO -> return CRV token price if token == Backscratcher().vault.address and block < 11786563: - if curve.curve and curve.curve.crv: - return get_price(curve.curve.crv, block=block) + if CRV: + return get_price(CRV, block=block) # no liquidity for curve pool (yvecrv-f) -> return 0 elif token == "0x7E46fd8a30869aa9ed55af031067Df666EfE87da" and block < 14987514: return 0 @@ -131,7 +133,6 @@ def find_price( return 0 markets = [ - curve.curve, compound, fixed_forex, generic_amm, @@ -157,10 +158,6 @@ def find_price( price, underlying = price logger.debug("peel %s %s", price, underlying) return price * get_price(underlying, block=block) - - if price is None and token in curve.curve.coin_to_pools: - logger.debug(f'Curve.get_coin_price -> {price}') - price = curve.curve.get_coin_price(token, block = block) if price is None and return_price_during_vault_downtime: for incident in INCIDENTS[token]: diff --git a/yearn/special.py b/yearn/special.py index 609fa9475..8c79a560f 100644 --- a/yearn/special.py +++ b/yearn/special.py @@ -6,13 +6,15 @@ import eth_retry import requests from brownie import chain -from y import Contract, magic +from y import ERC20, Contract, magic from y.contracts import contract_creation_block_async from y.exceptions import PriceError, yPriceMagicError +from yearn import constants from yearn.apy.common import (Apy, ApyBlocks, ApyError, ApyFees, ApyPoints, ApySamples) from yearn.common import Tvl +from yearn.prices.curve import curve from yearn.utils import Singleton if TYPE_CHECKING: @@ -69,10 +71,9 @@ def __init__(self): self.proxy = Contract("0xF147b8125d2ef93FB6965Db97D6746952a133934") async def _locked(self, block=None) -> Tuple[float,float]: - from yearn.prices.curve import curve crv_locked, crv_price = await asyncio.gather( curve.voting_escrow.balanceOf["address"].coroutine(self.proxy, block_identifier=block), - magic.get_price(curve.crv, block=block, sync=False), + magic.get_price(constants.CRV, block=block, sync=False), ) crv_locked /= 1e18 return crv_locked, crv_price @@ -82,7 +83,7 @@ async def describe(self, block=None): return { 'totalSupply': crv_locked, 'token price': crv_price, - 'tvl': crv_locked * crv_price, + 'tvl': crv_locked * float(crv_price), } async def total_value_at(self, block=None): @@ -135,12 +136,9 @@ async def tvl(self, block=None) -> Tvl: if not isinstance(e.exception, PriceError): raise e price = None - tvl = total_assets * price / 10 ** await self.vault.decimals.coroutine(block_identifier=block) if price else None + tvl = total_assets * price / await ERC20(self.vault, asynchronous=True).scale if price else None return Tvl(total_assets, price, tvl) - - - class Ygov(metaclass = Singleton): def __init__(self): self.name = "yGov" diff --git a/yearn/treasury/_setup.py b/yearn/treasury/_setup.py index b25195ad7..1d2eb6ed7 100644 --- a/yearn/treasury/_setup.py +++ b/yearn/treasury/_setup.py @@ -25,6 +25,7 @@ "0x528Ff33Bf5bf96B5392c10bc4748d9E9Fb5386B2", # PRM "0x53fFFB19BAcD44b82e204d036D579E86097E5D09", # BGBG "0x57b9d10157f66D8C00a815B5E289a152DeDBE7ed", # ēŽÆēƒč‚” + "0x1d41cf24dF81E3134319BC11c308c5589A486166", # Strangers NFT from @marcoworms <3 }, Network.Arbitrum: { "0x89b0f9dB18FD079063cFA91F174B300C1ce0003C", # AIELON diff --git a/yearn/treasury/accountant/__init__.py b/yearn/treasury/accountant/__init__.py index 9cb16c3d2..4544ec065 100644 --- a/yearn/treasury/accountant/__init__.py +++ b/yearn/treasury/accountant/__init__.py @@ -1,4 +1,6 @@ +import logging + from brownie import Contract from brownie.exceptions import ContractNotFound from tqdm import tqdm @@ -9,6 +11,8 @@ from yearn.treasury.accountant.prepare_db import prepare_db from yearn.utils import contract +logger = logging.getLogger(__name__) + __all__ = [ "all_txs", "unsorted_txs", diff --git a/yearn/treasury/accountant/expenses/people.py b/yearn/treasury/accountant/expenses/people.py index 6ba160210..e89f1de04 100644 --- a/yearn/treasury/accountant/expenses/people.py +++ b/yearn/treasury/accountant/expenses/people.py @@ -267,4 +267,7 @@ def is_rantom(tx: TreasuryTx) -> bool: return tx.to_address.address == "0x254b42CaCf7290e72e2C84c0337E36E645784Ce1" def is_tx_creator(tx: TreasuryTx) -> bool: - return tx.to_address.address == "0x4007c53A48DefaB0b9D2F05F34df7bd3088B3299" \ No newline at end of file + return tx.to_address.address == "0x4007c53A48DefaB0b9D2F05F34df7bd3088B3299" + +def is_dinobots(tx: TreasuryTx) -> bool: + return tx.token.symbol == "DAI" and tx._from_nickname == "Yearn yChad Multisig" and tx._to_nickname == "yMechs Multisig" and int(tx.amount) == 47_500 \ No newline at end of file diff --git a/yearn/treasury/accountant/ignore/__init__.py b/yearn/treasury/accountant/ignore/__init__.py index 1cf5f38cf..e78c83a5f 100644 --- a/yearn/treasury/accountant/ignore/__init__.py +++ b/yearn/treasury/accountant/ignore/__init__.py @@ -2,7 +2,7 @@ from decimal import Decimal from brownie import chain -from y.networks import Network +from y import Network from yearn.entities import TreasuryTx from yearn.treasury.accountant.classes import HashMatcher, TopLevelTxGroup diff --git a/yearn/treasury/accountant/ignore/vaults.py b/yearn/treasury/accountant/ignore/vaults.py index c682dd204..7fd9fedf8 100644 --- a/yearn/treasury/accountant/ignore/vaults.py +++ b/yearn/treasury/accountant/ignore/vaults.py @@ -1,13 +1,13 @@ - from brownie import ZERO_ADDRESS, chain from y.networks import Network from yearn.entities import TreasuryTx +from yearn.treasury.accountant.revenue.fees import v2_vaults from yearn.treasury.accountant.classes import Filter, HashMatcher, IterFilter -from yearn.treasury.accountant.constants import treasury, v1, v2 +from yearn.treasury.accountant.constants import treasury, v1 from yearn.utils import contract -vaults = (v1.vaults + v2.vaults) if v1 else v2.vaults +vaults = (v1.vaults + v2_vaults) if v1 else v2_vaults def is_vault_deposit(tx: TreasuryTx) -> bool: """ This code doesn't validate amounts but so far that's not been a problem. """ diff --git a/yearn/treasury/accountant/other_expenses/general.py b/yearn/treasury/accountant/other_expenses/general.py index 6fa3fe953..1eb7f30e3 100644 --- a/yearn/treasury/accountant/other_expenses/general.py +++ b/yearn/treasury/accountant/other_expenses/general.py @@ -113,4 +113,4 @@ def is_yfi_dot_eth(tx: TreasuryTx) -> bool: def is_yyper_contest(tx: TreasuryTx) -> bool: """Grant for a vyper compiler audit context, vyper-context.eth""" - return tx in HashMatcher([["0xb8bb3728fdfb49d7c86c08dba8e3586e3761f13d2c88fa6fab80227b6a3f4519", Filter('log_index', 202)]]) \ No newline at end of file + return tx in HashMatcher([["0xb8bb3728fdfb49d7c86c08dba8e3586e3761f13d2c88fa6fab80227b6a3f4519", Filter('log_index', 202)]]) diff --git a/yearn/treasury/accountant/revenue/fees.py b/yearn/treasury/accountant/revenue/fees.py index 574274f0c..e197f676b 100644 --- a/yearn/treasury/accountant/revenue/fees.py +++ b/yearn/treasury/accountant/revenue/fees.py @@ -1,4 +1,6 @@ +import asyncio +import logging from brownie import chain from y.networks import Network @@ -41,13 +43,19 @@ def is_fees_v1(tx: TreasuryTx) -> bool: return False +_vaults = asyncio.get_event_loop().run_until_complete(v2.vaults) +_experiments = asyncio.get_event_loop().run_until_complete(v2.experiments) +v2_vaults = _vaults + _experiments +logger = logging.getLogger(__name__) +logger.info('%s v2 vaults loaded', len(v2_vaults)) + def is_fees_v2(tx: TreasuryTx) -> bool: if any( tx.from_address.address == vault.vault.address and tx.token.address.address == vault.vault.address and tx.to_address.address in treasury.addresses and tx.to_address.address == vault.vault.rewards(block_identifier=tx.block) - for vault in v2.vaults + v2.experiments + for vault in v2_vaults ): return True elif is_inverse_fees_from_stash_contract(tx): diff --git a/yearn/v1/registry.py b/yearn/v1/registry.py index 41963e170..0a752eb28 100644 --- a/yearn/v1/registry.py +++ b/yearn/v1/registry.py @@ -3,9 +3,10 @@ from functools import cached_property from typing import Dict, List, Optional -from brownie import chain, interface, web3 +from brownie import chain, interface from dank_mids.brownie_patch import patch_contract from y.contracts import contract_creation_block_async +from y.decorators import stuck_coro_debugger from y.networks import Network from y.utils.dank_mids import dank_w3 @@ -37,6 +38,7 @@ def vaults(self) -> List[VaultV1]: def __repr__(self) -> str: return f"" + @stuck_coro_debugger async def describe(self, block: Optional[Block] = None) -> Dict[str, Dict]: vaults = await self.active_vaults_at(block) share_prices = await fetch_multicall_async(*[[vault.vault, "getPricePerFullShare"] for vault in vaults], block=block) @@ -44,6 +46,7 @@ async def describe(self, block: Optional[Block] = None) -> Dict[str, Dict]: data = await asyncio.gather(*[vault.describe(block=block) for vault in vaults]) return {vault.name: desc for vault, desc in zip(vaults, data)} + @stuck_coro_debugger async def total_value_at(self, block: Optional[Block] = None) -> Dict[str, float]: vaults = await self.active_vaults_at(block) balances = await fetch_multicall_async(*[[vault.vault, "balance"] for vault in vaults], block=block) @@ -52,6 +55,7 @@ async def total_value_at(self, block: Optional[Block] = None) -> Dict[str, float prices = await asyncio.gather(*[vault.get_price(block) for (vault, balance) in vaults]) return {vault.name: balance * price / 10 ** vault.decimals for (vault, balance), price in zip(vaults, prices)} + @stuck_coro_debugger async def active_vaults_at(self, block: Optional[Block] = None) -> List[VaultV1]: if block: blocks = await asyncio.gather(*[contract_creation_block_async(str(vault.vault)) for vault in self.vaults]) diff --git a/yearn/v1/vaults.py b/yearn/v1/vaults.py index dcdd922e4..9bcbb8620 100644 --- a/yearn/v1/vaults.py +++ b/yearn/v1/vaults.py @@ -1,19 +1,21 @@ import asyncio import logging from dataclasses import dataclass -from functools import cached_property from typing import TYPE_CHECKING, Optional +from async_property import async_cached_property from brownie import ZERO_ADDRESS, interface from brownie.network.contract import InterfaceContainer from dank_mids.brownie_patch import patch_contract +from y import Contract, magic +from y.decorators import stuck_coro_debugger from y.exceptions import PriceError, yPriceMagicError -from y.prices import magic from y.utils.dank_mids import dank_w3 from yearn import constants from yearn.common import Tvl from yearn.multicall2 import fetch_multicall_async +from yearn.prices.curve import curve from yearn.utils import contract from yearn.v1 import constants @@ -52,6 +54,7 @@ async def get_price(self, block=None): return await magic.get_price(underlying, block=block, sync=False) return await magic.get_price(self.token, block=block, sync=False) + @stuck_coro_debugger async def get_strategy(self, block=None): if self.name in ["aLINK", "LINK"] or block is None: return self.strategy @@ -61,16 +64,18 @@ async def get_strategy(self, block=None): if strategy != ZERO_ADDRESS: return contract(strategy) + @stuck_coro_debugger async def get_controller(self, block=None): if block is None: return self.controller - return contract(self.vault.controller(block_identifier=block)) + return await Contract.coroutine(await self.vault.controller.coroutine(block_identifier=block)) - @cached_property - def is_curve_vault(self): - from yearn.prices.curve import curve - return curve.get_pool(str(self.token)) is not None + @async_cached_property + @stuck_coro_debugger + async def is_curve_vault(self): + return await magic.curve.get_pool(str(self.token)) is not None + @stuck_coro_debugger async def describe(self, block=None): info = {} strategy = self.strategy @@ -94,7 +99,7 @@ async def describe(self, block=None): attrs["max"] = [self.vault, "max"] # new curve voter proxy vaults - if self.is_curve_vault and hasattr(strategy, "proxy"): + if await self.is_curve_vault and hasattr(strategy, "proxy"): vote_proxy, gauge = await fetch_multicall_async( [strategy, "voter"], # voter is static, can pin [strategy, "gauge"], # gauge is static per strategy, can cache @@ -103,7 +108,6 @@ async def describe(self, block=None): # guard historical queries where there are no vote_proxy and gauge # for block <= 10635293 (2020-08-11) if vote_proxy and gauge: - from yearn.prices.curve import curve vote_proxy = patch_contract(interface.CurveYCRVVoter(vote_proxy), dank_w3) gauge = contract(gauge) boost, _apy = await asyncio.gather( @@ -140,18 +144,20 @@ async def describe(self, block=None): if "token price" not in info: info["token price"] = await self.get_price(block=block) if info["vault total"] > 0 else 0 - info["tvl"] = info["vault balance"] * info["token price"] + info["tvl"] = info["vault balance"] * float(info["token price"]) return info + @stuck_coro_debugger async def apy(self, samples: "ApySamples"): from yearn import apy from yearn.prices.curve import curve - if curve.get_pool(self.token.address): + if await magic.curve.get_pool(self.token.address): return await apy.curve.simple(self, samples) else: return await apy.v1.simple(self, samples) + @stuck_coro_debugger async def tvl(self, block=None): total_assets = await self.vault.balance.coroutine(block_identifier=block) try: diff --git a/yearn/v2/registry.py b/yearn/v2/registry.py index 4d8741a1a..1aeb7f7ce 100644 --- a/yearn/v2/registry.py +++ b/yearn/v2/registry.py @@ -1,25 +1,29 @@ import asyncio +import itertools import logging -import threading import time from collections import OrderedDict -from typing import Dict, List +from functools import cached_property +from logging import getLogger +from typing import AsyncIterator, Awaitable, Dict, List, NoReturn, overload +import a_sync import inflection -from brownie import Contract, chain, web3 +from async_property import async_cached_property, async_property +from brownie import chain, web3 +from brownie.network.event import _EventItem from dank_mids.brownie_patch import patch_contract -from joblib import Parallel, delayed from web3._utils.abi import filter_by_name from web3._utils.events import construct_event_topic_set -from y.contracts import contract_creation_block_async +from y import Contract +from y.decorators import stuck_coro_debugger from y.exceptions import NodeNotSynced from y.networks import Network from y.prices import magic from y.utils.dank_mids import dank_w3 +from y.utils.events import Events, ProcessedEvents -from yearn.decorators import (sentry_catch_all, wait_or_exit_after, - wait_or_exit_before) -from yearn.events import decode_logs, get_logs_asap +from yearn.decorators import set_exc, wait_or_exit_before from yearn.exceptions import UnsupportedNetwork from yearn.multicall2 import fetch_multicall_async from yearn.utils import Singleton, contract @@ -44,111 +48,114 @@ ] } +VaultName = str + class Registry(metaclass=Singleton): - def __init__(self, watch_events_forever=True, include_experimental=True): + def __init__(self, include_experimental=True): self.releases = {} # api_version => template - self._vaults = {} # address -> Vault - self._experiments = {} # address => Vault - self._staking_pools = {} # vault address -> staking_pool address self.governance = None self.tags = {} - self._watch_events_forever = watch_events_forever self.include_experimental = include_experimental - self.registries = self.load_registry() - # load registry state in the background - self._done = threading.Event() - self._has_exception = False - self._thread = threading.Thread(target=self.watch_events, daemon=True) - self._thread.start() - - def load_registry(self): + self._done = a_sync.Event(name=f"{self.__module__}.{self.__class__.__name__}._done") + self._registries = [] + self._vaults = {} # address -> Vault + self._experiments = {} # address => Vault + self._staking_pools = {} # vault address -> staking_pool address + + @async_cached_property + @stuck_coro_debugger + async def registries(self) -> List[Contract]: if chain.id == Network.Mainnet: - registries = self.load_from_ens() + registries = await self.load_from_ens() elif chain.id == Network.Gnosis: - registries = [contract('0xe2F12ebBa58CAf63fcFc0e8ab5A61b145bBA3462')] + registries = [await Contract.coroutine('0xe2F12ebBa58CAf63fcFc0e8ab5A61b145bBA3462')] elif chain.id == Network.Fantom: - registries = [contract('0x727fe1759430df13655ddb0731dE0D0FDE929b04')] + registries = [await Contract.coroutine('0x727fe1759430df13655ddb0731dE0D0FDE929b04')] elif chain.id == Network.Arbitrum: - registries = [contract('0x3199437193625DCcD6F9C9e98BDf93582200Eb1f')] + registries = [await Contract.coroutine('0x3199437193625DCcD6F9C9e98BDf93582200Eb1f')] elif chain.id == Network.Optimism: - registries = [ - contract('0x79286Dd38C9017E5423073bAc11F53357Fc5C128'), - contract('0x81291ceb9bB265185A9D07b91B5b50Df94f005BF'), - contract('0x8ED9F6343f057870F1DeF47AaE7CD88dfAA049A8'), # StakingRewardsRegistry - ] + registries = await asyncio.gather(*[ + Contract.coroutine('0x79286Dd38C9017E5423073bAc11F53357Fc5C128'), + Contract.coroutine('0x81291ceb9bB265185A9D07b91B5b50Df94f005BF'), + Contract.coroutine('0x8ED9F6343f057870F1DeF47AaE7CD88dfAA049A8'), # StakingRewardsRegistry + ]) elif chain.id == Network.Base: - registries = [contract('0xF3885eDe00171997BFadAa98E01E167B53a78Ec5')] + registries = [await Contract.coroutine('0xF3885eDe00171997BFadAa98E01E167B53a78Ec5')] else: raise UnsupportedNetwork('yearn v2 is not available on this network') for r in registries[:]: if hasattr(r, 'releaseRegistry') and "ReleaseRegistryUpdated" in r.topics: - logs = get_logs_asap(str(r), [r.topics["ReleaseRegistryUpdated"]]) # Add all past and present Release Registries - for rr in {list(event.values())[0] for event in decode_logs(logs)}: - registries.append(contract(rr)) + events = Events(addresses=r, topics=[r.topics['ReleaseRegistryUpdated']]) + for rr in set(await asyncio.gather(*[ + asyncio.create_task(Contract.coroutine(list(event.values())[0])) + async for event in events.events(to_block=await dank_w3.eth.block_number) + ])): + registries.append(rr) logger.debug("release registry %s found for registry %s", rr, r) + logger.info('registry loaded') + events._task.cancel() return registries - def load_from_ens(self): + @stuck_coro_debugger + async def load_from_ens(self): # track older registries to pull experiments - resolver = contract('0x4976fb03C32e5B8cfe2b6cCB31c09Ba78EBaBa41') + resolver = await Contract.coroutine('0x4976fb03C32e5B8cfe2b6cCB31c09Ba78EBaBa41') topics = construct_event_topic_set( filter_by_name('AddressChanged', resolver.abi)[0], web3.codec, {'node': web3.ens.namehash('v2.registry.ychad.eth')}, ) - events = decode_logs(get_logs_asap(str(resolver), topics)) - registries = [contract(event['newAddress'].hex()) for event in events] + events = Events(addresses=resolver, topics=topics) + registries = [ + asyncio.create_task( + coro=Contract.coroutine(event['newAddress'].hex()), + name=f"load registry {event['newAddress']}", + ) + async for event in events.events(to_block = await dank_w3.eth.block_number) + ] + if registries: + registries = await asyncio.gather(*registries) logger.info('loaded %d registry versions', len(registries)) + events._task.cancel() return registries - @property + @async_property + @stuck_coro_debugger @wait_or_exit_before - def vaults(self) -> List[Vault]: + async def vaults(self) -> List[Vault]: return list(self._vaults.values()) - @property + @async_property + @stuck_coro_debugger @wait_or_exit_before - def experiments(self) -> List[Vault]: + async def experiments(self) -> List[Vault]: return list(self._experiments.values()) - @property + @async_property + @stuck_coro_debugger @wait_or_exit_before - def staking_pools(self) -> Dict: + async def staking_pools(self) -> Dict: return self._staking_pools - @wait_or_exit_before def __repr__(self) -> str: - return f"" - - @wait_or_exit_after - def load_vaults(self): - if not self._thread._started.is_set(): - self._thread.start() - - @sentry_catch_all - def watch_events(self): + return f"" + + @set_exc + async def watch_events(self) -> NoReturn: start = time.time() - sleep_time = 300 - from_block = None - height = chain.height - while True: - logs = get_logs_asap([str(addr) for addr in self.registries], None, from_block=from_block, to_block=height) - self.process_events(decode_logs(logs)) + events = await self._events + def done_callback(task: asyncio.Task) -> None: + logger.info("loaded v2 registry in %.3fs", time.time() - start) + self._done.set() + done_task = asyncio.create_task(events._lock.wait_for(await dank_w3.eth.block_number)) + done_task.add_done_callback(done_callback) + async for _ in events: self._filter_vaults() if not self._done.is_set(): self._done.set() logger.info("loaded v2 registry in %.3fs", time.time() - start) - if not self._watch_events_forever: - return - time.sleep(sleep_time) - - # set vars for next loop - from_block = height + 1 - height = chain.height - if height < from_block: - raise NodeNotSynced(f"No new blocks in the past {sleep_time/60} minutes.") def process_events(self, events): temp_rekt_vaults = [] @@ -207,23 +214,16 @@ def vault_from_event(self, event): token=event["token"], api_version=event["api_version"], registry=self, - watch_events_forever=self._watch_events_forever, ) - def load_strategies(self): - # stagger loading strategies to not run out of connections in the pool - vaults = self.vaults + self.experiments - Parallel(1, "threading")(delayed(vault.load_strategies)() for vault in vaults) - - def load_harvests(self): - vaults = self.vaults + self.experiments - Parallel(1, "threading")(delayed(vault.load_harvests)() for vault in vaults) - - async def describe(self, block=None): - vaults = await self.active_vaults_at(block) - results = await asyncio.gather(*[vault.describe(block=block) for vault in vaults]) - return {vault.name: result for vault, result in zip(vaults, results)} + @stuck_coro_debugger + async def describe(self, block=None) -> [VaultName, Dict]: + return await a_sync.gather({ + vault.name: asyncio.create_task(vault.describe(block=block)) + async for vault in self.active_vaults_at(block, iter=True) + }) + @stuck_coro_debugger async def total_value_at(self, block=None): vaults = await self.active_vaults_at(block) prices, results = await asyncio.gather( @@ -232,24 +232,109 @@ async def total_value_at(self, block=None): ) return {vault.name: assets * price / vault.scale for vault, assets, price in zip(vaults, results, prices)} - async def active_vaults_at(self, block=None): - vaults = self.vaults + self.experiments - if block: - blocks = await asyncio.gather(*[contract_creation_block_async(str(vault.vault)) for vault in vaults]) - vaults = [vault for vault, deploy_block in zip(vaults, blocks) if deploy_block <= block] - # fixes edge case: a vault is not necessarily initialized on creation - activations = await fetch_multicall_async(*[[vault.vault, 'activation'] for vault in vaults], block=block) - return [vault for vault, activation in zip(vaults, activations) if activation] - + @overload + def active_vaults_at(self, block=None, iter = False) -> Awaitable[List[Vault]]:... + @overload + def active_vaults_at(self, block=None, iter = True) -> AsyncIterator[Vault]:... + def active_vaults_at(self, block=None, iter: bool = False): + if iter: + return self._active_vaults_at_iter(block=block) + else: + return self._active_vaults_at(block=block) + + @stuck_coro_debugger + async def _active_vaults_at(self, block=None) -> List[Vault]: + self._task + events = await self._events + await events._lock.wait_for(events._init_block) + vaults = list(itertools.chain(self._vaults.values(), self._experiments.values())) + return [vault for vault, active in zip(vaults, await asyncio.gather(*[vault.is_active(block) for vault in vaults])) if active] + + async def _active_vaults_at_iter(self, block=None) -> AsyncIterator[Vault]: + # ensure loader task is running + self._task + events = await self._events + # make sure the events are loaded thru now before proceeding + await events._lock.wait_for(events._init_block) + + vaults: List[Vault] = list(itertools.chain(self._vaults.values(), self._experiments.values())) + + i = 0 # TODO figure out why we need this here + while len(vaults) == 0: + await asyncio.sleep(6) + vaults = list(itertools.chain(self._vaults.values(), self._experiments.values())) + i += 1 + if i >= 20: + logger.error("we're stuck") + + async for vault, active in a_sync.as_completed({vault: vault.is_active(block) for vault in vaults}, aiter=True): + if active: + yield vault + + @async_cached_property + async def _events(self) -> "RegistryEvents": + return RegistryEvents(self, await self.registries) + + @cached_property + def _task(self) -> asyncio.Task: + return asyncio.create_task(self.watch_events()) + def _filter_vaults(self): - logger.debug('filtering vaults') if chain.id in DEPRECATED_VAULTS: for vault in DEPRECATED_VAULTS[chain.id]: self._remove_vault(vault) - logger.debug('vaults filtered') def _remove_vault(self, address): self._vaults.pop(address, None) self._experiments.pop(address, None) self.tags.pop(address, None) logger.debug('removed %s', address) + + +class RegistryEvents(ProcessedEvents[_EventItem]): + __slots__ = "_init_block", "_registry" + def __init__(self, registry: Registry, registries: List[Contract]): + self._init_block = chain.height + self._registry = registry + super().__init__(addresses=registries) + def _process_event(self, event: _EventItem) -> _EventItem: + # hack to make camels to snakes + event._ordered = [OrderedDict({inflection.underscore(k): v for k, v in od.items()}) for od in event._ordered] + logger.debug("starting to process %s for %s: %s", event.name, event.address, dict(event)) + if event.name == "NewGovernance": + self._registry.governance = event["governance"] + + if event.name == "NewRelease": + self._registry.releases[event["api_version"]] = contract(event["template"]) + + if event.name == "NewVault": + # experiment was endorsed + if event["vault"] in self._registry._experiments: + vault = self._registry._experiments.pop(event["vault"]) + vault.name = f"{vault.vault.symbol()} {event['api_version']}" + self._registry._vaults[event["vault"]] = vault + logger.debug("endorsed vault %s %s", vault.vault, vault.name) + # we already know this vault from another registry + elif event["vault"] not in self._registry._vaults: + vault = self._registry.vault_from_event(event) + vault.name = f"{vault.vault.symbol()} {event['api_version']}" + self._registry._vaults[event["vault"]] = vault + logger.debug("new vault %s %s", vault.vault, vault.name) + + if self._registry.include_experimental and event.name == "NewExperimentalVault": + vault = self._registry.vault_from_event(event) + vault.name = f"{vault.vault.symbol()} {event['api_version']} {event['vault'][:8]}" + self._registry._experiments[event["vault"]] = vault + logger.debug("new experiment %s %s", vault.vault, vault.name) + + if event.name == "VaultTagged": + if event["tag"] == "Removed": + self._registry._remove_vault(event["vault"]) + logger.debug("Removed vault %s", event["vault"]) + else: + self._registry.tags[event["vault"]] = event["tag"] + + if event.name == "StakingPoolAdded": + self._registry._staking_pools[event["token"]] = event["staking_pool"] + logger.debug("done processing %s for %s: %s", event.name, event.address, dict(event)) + return event diff --git a/yearn/v2/strategies.py b/yearn/v2/strategies.py index cf39f6371..b7f4f579c 100644 --- a/yearn/v2/strategies.py +++ b/yearn/v2/strategies.py @@ -1,16 +1,15 @@ import logging -import threading -import time from functools import cached_property -from typing import Any, List +from typing import Any, AsyncIterator, List -from brownie import chain +from async_property import async_property +from brownie.network.event import _EventItem from eth_utils import encode_hex, event_abi_to_log_topic from multicall.utils import run_in_subprocess -from y.exceptions import NodeNotSynced +from y import Contract +from y.decorators import stuck_coro_debugger +from y.utils.events import ProcessedEvents -from yearn.decorators import sentry_catch_all, wait_or_exit_after -from yearn.events import decode_logs, get_logs_asap from yearn.multicall2 import fetch_multicall_async from yearn.utils import contract, safe_views @@ -30,23 +29,9 @@ logger = logging.getLogger(__name__) -def _unpack_results(views: List[str], results: List[Any], scale: int): - # unpack self.vault.vault.strategies(self.strategy) - info = dict(zip(views, results)) - info.update(results[-1].dict()) - # scale views - for view in STRATEGY_VIEWS_SCALED: - if view in info: - info[view] = (info[view] or 0) / scale - # unwrap structs - for view in info: - if hasattr(info[view], '_dict'): - info[view] = info[view].dict() - return info - class Strategy: - def __init__(self, strategy, vault, watch_events_forever): + def __init__(self, strategy, vault): self.strategy = contract(strategy) self.vault = vault try: @@ -54,22 +39,11 @@ def __init__(self, strategy, vault, watch_events_forever): except ValueError: self.name = strategy[:10] self._views = safe_views(self.strategy.abi) - self._harvests = [] - self._topics = [ - [ - encode_hex(event_abi_to_log_topic(event)) - for event in self.strategy.abi - if event["type"] == "event" and event["name"] in STRATEGY_EVENTS - ] - ] - self._watch_events_forever = watch_events_forever - self._done = threading.Event() - self._has_exception = False - self._thread = threading.Thread(target=self.watch_events, daemon=True) + self._events = Harvests(self) - @property - def unique_name(self): - if [strategy.name for strategy in self.vault.strategies].count(self.name) > 1: + @async_property + async def unique_name(self): + if [strategy.name for strategy in await self.vault.strategies].count(self.name) > 1: return f'{self.name} {str(self.strategy)[:8]}' else: return self.name @@ -80,60 +54,55 @@ def __repr__(self) -> str: def __eq__(self, other): if isinstance(other, Strategy): return self.strategy == other.strategy - if isinstance(other, str): return self.strategy == other - raise ValueError("Strategy is only comparable with [Strategy, str]") - @sentry_catch_all - def watch_events(self): - start = time.time() - sleep_time = 300 - from_block = None - height = chain.height - while True: - logs = get_logs_asap(str(self.strategy), topics=self._topics, from_block=from_block, to_block=height) - events = decode_logs(logs) - self.process_events(events) - if not self._done.is_set(): - self._done.set() - logger.info("loaded %d harvests %s in %.3fs", len(self._harvests), self.name, time.time() - start) - if not self._watch_events_forever: - return - time.sleep(sleep_time) - - # read new logs at end of loop - from_block = height + 1 - height = chain.height - if height < from_block: - raise NodeNotSynced(f"No new blocks in the past {sleep_time/60} minutes.") - - - def process_events(self, events): - for event in events: - if event.name == "Harvested": - block = event.block_number - logger.debug("%s harvested on %d", self.name, block) - self._harvests.append(block) - - @wait_or_exit_after - def load_harvests(self): - if not self._thread._started.is_set(): - self._thread.start() - - @property - def harvests(self) -> List[int]: - self.load_harvests() - return self._harvests + async def harvests(self, thru_block: int) -> AsyncIterator[dict]: + async for event in self._events.events(to_block=thru_block): + yield event + @stuck_coro_debugger + async def describe(self, block=None): + results = await fetch_multicall_async(*self._calls, block=block) + return await self._unpack_results(results) + @cached_property def _calls(self): return *[[self.strategy, view] for view in self._views], [self.vault.vault, "strategies", self.strategy], async def _unpack_results(self, results): return await run_in_subprocess(_unpack_results, self._views, results, self.vault.scale) + + +class Harvests(ProcessedEvents[int]): + def __init__(self, strategy: Strategy): + topics = [ + [ + encode_hex(event_abi_to_log_topic(event)) + for event in strategy.strategy.abi + if event["type"] == "event" and event["name"] in STRATEGY_EVENTS + ] + ] + super().__init__(addresses=[str(strategy.strategy)], topics=topics) + self.strategy = strategy + def _include_event(self, event: _EventItem) -> bool: + return event.name == "Harvested" + def _get_block_for_obj(self, block: int) -> int: + return block + # TODO: work this in somehow: + # logger.info("loaded %d harvests %s in %.3fs", len(self._harvests), self.name, time.time() - start) + def _process_event(self, event: _EventItem) -> int: + block = event.block_number + logger.debug("%s harvested on %d", self.strategy.name, block) + return block - async def describe(self, block=None): - results = await fetch_multicall_async(*self._calls, block=block) - return await self._unpack_results(results) + +def _unpack_results(views: List[str], results: List[Any], scale: int): + # unpack self.vault.vault.strategies(self.strategy) + info = dict(zip(views, results)) + info.update(results[-1].dict()) + # scale views + info = {view: (result or 0) / scale if view in STRATEGY_VIEWS_SCALED else result for view, result in info.items()} + # unwrap structs + return {view: result.dict() if hasattr(info[view], '_dict') else result for view, result in info.items()} \ No newline at end of file diff --git a/yearn/v2/vaults.py b/yearn/v2/vaults.py index b229eac90..9ae0a19a8 100644 --- a/yearn/v2/vaults.py +++ b/yearn/v2/vaults.py @@ -1,29 +1,33 @@ import asyncio import logging import re -import threading import time +from contextlib import suppress from functools import cached_property -from typing import TYPE_CHECKING, Any, Dict, List, Union +from typing import (TYPE_CHECKING, Any, AsyncIterator, Dict, List, NoReturn, + Optional, Union) +from a_sync.utils.iterators import exhaust_iterator +from async_property import async_cached_property, async_property from brownie import chain +from brownie.network.event import _EventItem from eth_utils import encode_hex, event_abi_to_log_topic -from joblib import Parallel, delayed from multicall.utils import run_in_subprocess from semantic_version.base import Version from y import ERC20, Contract, Network, magic -from y.exceptions import NodeNotSynced, PriceError, yPriceMagicError +from y.contracts import contract_creation_block_async +from y.decorators import stuck_coro_debugger +from y.exceptions import PriceError, yPriceMagicError from y.networks import Network from y.prices import magic -from y.utils.events import get_logs_asap +from y.utils.dank_mids import dank_w3 +from y.utils.events import ProcessedEvents from yearn.common import Tvl -from yearn.decorators import sentry_catch_all, wait_or_exit_after -from yearn.events import decode_logs, get_logs_asap from yearn.multicall2 import fetch_multicall_async from yearn.special import Ygov from yearn.typing import Address -from yearn.utils import run_in_thread, safe_views +from yearn.utils import safe_views from yearn.v2.strategies import Strategy if TYPE_CHECKING: @@ -91,7 +95,7 @@ def _unpack_results(vault: Address, is_experiment: bool, _views: List[str], resu info["token price"] = float(price) if "totalAssets" in info: - info["tvl"] = info["token price"] * info["totalAssets"] + info["tvl"] = float(info["token price"]) * info["totalAssets"] for strategy_name, desc in zip(strategies, strategy_descs): info["strategies"][strategy_name] = desc @@ -101,9 +105,9 @@ def _unpack_results(vault: Address, is_experiment: bool, _views: List[str], resu info["version"] = "v2" return info - + class Vault: - def __init__(self, vault: Contract, api_version=None, token=None, registry=None, watch_events_forever=True): + def __init__(self, vault: Contract, api_version=None, token=None, registry=None): self._strategies: Dict[Address, Strategy] = {} self._revoked: Dict[Address, Strategy] = {} self._reports = [] @@ -116,26 +120,21 @@ def __init__(self, vault: Contract, api_version=None, token=None, registry=None, self.scale = 10 ** self.vault.decimals() # multicall-safe views with 0 inputs and numeric output. self._views = safe_views(self.vault.abi) + self._calls = [[self.vault, view] for view in self._views] # load strategies from events and watch for freshly attached strategies - self._topics = [ - [ - encode_hex(event_abi_to_log_topic(event)) - for event in self.vault.abi - if event["type"] == "event" and event["name"] in STRATEGY_EVENTS - ] - ] - self._watch_events_forever = watch_events_forever - self._done = threading.Event() - self._has_exception = False - self._thread = threading.Thread(target=self.watch_events, daemon=True) + self._events = VaultEvents(self) def __repr__(self): strategies = "..." # don't block if we don't have the strategies loaded - if self._done.is_set(): - strategies = ", ".join(f"{strategy}" for strategy in self.strategies) + with suppress(RuntimeError): # NOTE on RuntimeError, event loop isnt running and task doesn't exist. can't be created, is not complete. no need to check strats. + if self._task.done(): + strategies = ", ".join(f"{strategy}" for strategy in self._strategies) return f'' + def __hash__(self) -> int: + return hash(self.vault.address) + def __eq__(self, other): if isinstance(other, Vault): return self.vault == other.vault @@ -156,126 +155,90 @@ def from_address(cls, address): instance.name = vault.name() return instance - @property - def strategies(self) -> List[Strategy]: - self.load_strategies() + @async_property + @stuck_coro_debugger + async def strategies(self) -> List[Strategy]: + await self.load_strategies() return list(self._strategies.values()) - - @property - def revoked_strategies(self) -> List[Strategy]: - self.load_strategies() + + async def strategies_at_block(self, block: int) -> AsyncIterator[Strategy]: + self._task + working = {} + async for _ in self._events.events(to_block=block): + for address in self._strategies: + if address in working: + continue + working[address] = asyncio.create_task(contract_creation_block_async(address, when_no_history_return_0=True)) + for address in list(working.keys()): + if working[address].done(): + if await working.pop(address) > block: + return + yield self._strategies[address] + + await self._events._lock.wait_for(block) + + @async_property + @stuck_coro_debugger + async def revoked_strategies(self) -> List[Strategy]: + await self.load_strategies() return list(self._revoked.values()) - @property - def reports(self): + @async_property + @stuck_coro_debugger + async def reports(self): # strategy reports are loaded at the same time as other vault strategy events - self.load_strategies() + await self.load_strategies() return self._reports - @property - def is_endorsed(self): + @async_property + @stuck_coro_debugger + async def is_endorsed(self): if not self.registry: return None - return str(self.vault) in self.registry.vaults + return str(self.vault) in await self.registry.vaults - @property - def is_experiment(self): + @async_property + @stuck_coro_debugger + async def is_experiment(self): if not self.registry: return None # experimental vaults are either listed in the registry or have the 0x address suffix in the name - return str(self.vault) in self.registry.experiments or re.search(r"0x.*$", self.name) is not None + return str(self.vault) in await self.registry.experiments or re.search(r"0x.*$", self.name) is not None - @wait_or_exit_after - def load_strategies(self): - if not self._thread._started.is_set(): - self._thread.start() + @stuck_coro_debugger + async def is_active(self, block: Optional[int]) -> bool: + if block and await contract_creation_block_async(str(self.vault)) > block: + return False + # fixes edge case: a vault is not necessarily initialized on creation + return await self.vault.activation.coroutine(block_identifier=block) - def load_harvests(self): - Parallel(1, "threading")(delayed(strategy.load_harvests)() for strategy in self.strategies) + @stuck_coro_debugger + async def load_strategies(self): + await self._task - @sentry_catch_all - def watch_events(self): + async def watch_events(self) -> NoReturn: start = time.time() - sleep_time = 300 - from_block = None - height = chain.height - while True: - logs = get_logs_asap(str(self.vault), topics=self._topics, from_block=from_block, to_block=height) - events = decode_logs(logs) - self.process_events(events) - if not self._done.is_set(): - self._done.set() - logger.info("loaded %d strategies %s in %.3fs", len(self._strategies), self.name, time.time() - start) - if not self._watch_events_forever: - return - time.sleep(sleep_time) - - # set vars for next loop - from_block = height + 1 - height = chain.height - if height < from_block: - raise NodeNotSynced(f"No new blocks in the past {sleep_time/60} minutes.") - - - def process_events(self, events): - for event in events: - # some issues during the migration of this strat prevented it from being verified so we skip it here... - if chain.id == Network.Optimism: - failed_migration = False - for key in ["newVersion", "oldVersion", "strategy"]: - failed_migration |= (key in event and event[key] == "0x4286a40EB3092b0149ec729dc32AD01942E13C63") - if failed_migration: - continue - - if event.name == "StrategyAdded": - strategy_address = event["strategy"] - logger.debug("%s strategy added %s", self.name, strategy_address) - try: - self._strategies[strategy_address] = Strategy(strategy_address, self, self._watch_events_forever) - except ValueError: - logger.error(f"Error loading strategy {strategy_address}") - pass - elif event.name == "StrategyRevoked": - logger.debug("%s strategy revoked %s", self.name, event["strategy"]) - self._revoked[event["strategy"]] = self._strategies.pop( - event["strategy"], Strategy(event["strategy"], self, self._watch_events_forever) - ) - elif event.name == "StrategyMigrated": - logger.debug("%s strategy migrated %s -> %s", self.name, event["oldVersion"], event["newVersion"]) - self._revoked[event["oldVersion"]] = self._strategies.pop( - event["oldVersion"], Strategy(event["oldVersion"], self, self._watch_events_forever) - ) - self._strategies[event["newVersion"]] = Strategy(event["newVersion"], self, self._watch_events_forever) - elif event.name == "StrategyReported": - self._reports.append(event) - - async def _unpack_results(self, results): - results, strategy_descs, price = results - return await run_in_subprocess( - _unpack_results, - self.vault.address, - self.is_experiment, - self._views, - results, - self.scale, - price, - # must be picklable. - [strategy.unique_name for strategy in self.strategies], - strategy_descs, - ) - + async for event in self._events.events(await dank_w3.eth.block_number): + # we iterate thru the list to ensure they're loaded thru now + pass + logger.info("loaded %d strategies %s in %.3fs", len(self._strategies), self.name, time.time() - start) + # Keep the loader running in the background + self._daemon = asyncio.create_task(exhaust_iterator(self._events)) + + @stuck_coro_debugger async def describe(self, block=None): - await run_in_thread(self.load_strategies) + block = block or await dank_w3.eth.block_number results = await asyncio.gather( - fetch_multicall_async(*[[self.vault, view] for view in self._views], block=block), - asyncio.gather(*[strategy.describe(block=block) for strategy in self.strategies]), - get_price_return_exceptions(self.token, block=block) + fetch_multicall_async(*self._calls, block=block), + self._describe_strategies(block), + get_price_return_exceptions(self.token, block=block), ) return await self._unpack_results(results) - + + @stuck_coro_debugger async def apy(self, samples: "ApySamples"): from yearn import apy - if self._needs_curve_simple: + if await self._needs_curve_simple: return await apy.curve.simple(self, samples) elif pool := await apy.velo.get_staking_pool(self.token.address): return await apy.velo.staking(self, pool, samples) @@ -286,6 +249,7 @@ async def apy(self, samples: "ApySamples"): else: return await apy.v2.simple(self, samples) + @stuck_coro_debugger async def tvl(self, block=None): total_assets = await self.vault.totalAssets.coroutine(block_identifier=block) try: @@ -303,8 +267,12 @@ async def tvl(self, block=None): return Tvl(total_assets, price, tvl) @cached_property - def _needs_curve_simple(self): - from yearn.prices.curve import curve + def _task(self) -> asyncio.Task: + return asyncio.create_task(self.watch_events()) + + @async_cached_property + @stuck_coro_debugger + async def _needs_curve_simple(self): # some curve vaults which should not be calculated with curve logic curve_simple_excludes = { Network.Arbitrum: [ @@ -315,4 +283,67 @@ def _needs_curve_simple(self): if chain.id in curve_simple_excludes: needs_simple = self.vault.address not in curve_simple_excludes[chain.id] - return needs_simple and curve and curve.get_pool(self.token.address) + return needs_simple and magic.curve and await magic.curve.get_pool(self.token.address) + + @stuck_coro_debugger + async def _describe_strategies(self, block: int) -> List[dict]: + return asyncio.gather(*[asyncio.create_task(strategy.describe(block=block)) async for strategy in self.strategies_at_block(block)]) + + @stuck_coro_debugger + async def _unpack_results(self, results): + # TODO: get rid of this + results, strategy_descs, price = results + return await run_in_subprocess( + _unpack_results, + self.vault.address, + await self.is_experiment, + self._views, + results, + self.scale, + price, + # must be picklable. + [strategy.unique_name for strategy in await self.strategies], + strategy_descs, + ) + + +class VaultEvents(ProcessedEvents[_EventItem]): + __slots__ = "vault", + def __init__(self, vault: Vault, **kwargs: Any): + topics = [[encode_hex(event_abi_to_log_topic(event)) for event in vault.vault.abi if event["type"] == "event" and event["name"] in STRATEGY_EVENTS]] + super().__init__(addresses=[str(vault.vault)], topics=topics, **kwargs) + self.vault = vault + def _process_event(self, event: _EventItem) -> _EventItem: + # some issues during the migration of this strat prevented it from being verified so we skip it here... + try: + if chain.id == Network.Optimism: + failed_migration = False + for key in ["newVersion", "oldVersion", "strategy"]: + failed_migration |= (key in event and event[key] == "0x4286a40EB3092b0149ec729dc32AD01942E13C63") + if failed_migration: + return event + + if event.name == "StrategyAdded": + strategy_address = event["strategy"] + logger.debug("%s strategy added %s", self.vault.name, strategy_address) + try: + self.vault._strategies[strategy_address] = Strategy(strategy_address, self.vault) + except ValueError: + logger.error(f"Error loading strategy {strategy_address}") + elif event.name == "StrategyRevoked": + logger.debug("%s strategy revoked %s", self.vault.name, event["strategy"]) + self.vault._revoked[event["strategy"]] = self.vault._strategies.pop( + event["strategy"], Strategy(event["strategy"], self.vault) + ) + elif event.name == "StrategyMigrated": + logger.debug("%s strategy migrated %s -> %s", self.vault.name, event["oldVersion"], event["newVersion"]) + self.vault._revoked[event["oldVersion"]] = self.vault._strategies.pop( + event["oldVersion"], Strategy(event["oldVersion"], self.vault) + ) + self.vault._strategies[event["newVersion"]] = Strategy(event["newVersion"], self.vault) + elif event.name == "StrategyReported": + self.vault._reports.append(event) + return event + except Exception as e: + logger.exception(e) + raise e \ No newline at end of file diff --git a/yearn/yearn.py b/yearn/yearn.py index 7f0ec550c..9319ad529 100644 --- a/yearn/yearn.py +++ b/yearn/yearn.py @@ -6,6 +6,7 @@ from brownie import chain from y.contracts import contract_creation_block_async +from y.decorators import stuck_coro_debugger from y.networks import Network import yearn.iearn @@ -28,37 +29,32 @@ class Yearn: Can describe all products. """ - def __init__(self, load_strategies=True, load_harvests=False, load_transfers=False, watch_events_forever=True, exclude_ib_tvl=True) -> None: + def __init__(self, exclude_ib_tvl=True) -> None: start = time() if chain.id == Network.Mainnet: self.registries = { "earn": yearn.iearn.Registry(), "v1": yearn.v1.registry.Registry(), - "v2": yearn.v2.registry.Registry(watch_events_forever=watch_events_forever), + "v2": yearn.v2.registry.Registry(), "ib": yearn.ironbank.Registry(exclude_ib_tvl=exclude_ib_tvl), "special": yearn.special.Registry(), } elif chain.id in [Network.Gnosis, Network.Base]: self.registries = { - "v2": yearn.v2.registry.Registry(watch_events_forever=watch_events_forever), + "v2": yearn.v2.registry.Registry(), } elif chain.id in [Network.Fantom, Network.Arbitrum, Network.Optimism]: self.registries = { - "v2": yearn.v2.registry.Registry(watch_events_forever=watch_events_forever), + "v2": yearn.v2.registry.Registry(), "ib": yearn.ironbank.Registry(exclude_ib_tvl=exclude_ib_tvl), } else: raise UnsupportedNetwork('yearn is not supported on this network') self.exclude_ib_tvl = exclude_ib_tvl - - if load_strategies: - self.registries["v2"].load_strategies() - if load_harvests: - self.registries["v2"].load_harvests() logger.info('loaded yearn in %.3fs', time() - start) - + @stuck_coro_debugger async def active_vaults_at(self, block=None): active_vaults_by_registry = await asyncio.gather(*[registry.active_vaults_at(block) for registry in self.registries.values()]) active = [vault for registry in active_vaults_by_registry for vault in registry] @@ -71,14 +67,14 @@ async def active_vaults_at(self, block=None): return active - + @stuck_coro_debugger async def describe(self, block=None): if block is None: block = chain.height desc = await asyncio.gather(*[self.registries[key].describe(block=block) for key in self.registries]) return dict(zip(self.registries, desc)) - + @stuck_coro_debugger async def describe_wallets(self, block=None): from yearn.outputs.describers.registry import RegistryWalletDescriber describer = RegistryWalletDescriber() @@ -102,12 +98,12 @@ async def describe_wallets(self, block=None): data.update(agg_stats) return data - + @stuck_coro_debugger async def total_value_at(self, block=None): desc = await asyncio.gather(*[self.registries[key].total_value_at(block=block) for key in self.registries]) return dict(zip(self.registries, desc)) - + @stuck_coro_debugger async def data_for_export(self, block, timestamp) -> List: start = time() data = await self.describe(block) @@ -183,7 +179,7 @@ async def data_for_export(self, block, timestamp) -> List: return metrics_to_export - + @stuck_coro_debugger async def wallet_data_for_export(self, block: int, timestamp: int): data = await self.describe_wallets(block) metrics_to_export = [] diff --git a/yearn/yeth.py b/yearn/yeth.py index b8704afd2..7e0e948d4 100644 --- a/yearn/yeth.py +++ b/yearn/yeth.py @@ -1,26 +1,27 @@ import asyncio +import logging import os import re -import logging -from datetime import datetime, timezone +from datetime import datetime +from pprint import pformat +from typing import Optional import eth_retry - from brownie import chain -from pprint import pformat - -from y import Contract, magic, Network -from y.time import get_block_timestamp +from y import Contract, Network, magic from y.contracts import contract_creation_block_async +from y.datatypes import Block from y.exceptions import PriceError, yPriceMagicError +from y.time import get_block_timestamp -from yearn.apy.common import (Apy, ApyFees, - ApySamples, SECONDS_PER_YEAR, SECONDS_PER_WEEK, SharePricePoint, calculate_roi, get_samples) +from yearn.apy.common import (SECONDS_PER_WEEK, SECONDS_PER_YEAR, Apy, ApyFees, + ApySamples, SharePricePoint, calculate_roi, + get_samples) from yearn.common import Tvl +from yearn.debug import Debug from yearn.events import decode_logs, get_logs_asap -from yearn.utils import Singleton from yearn.prices.constants import weth -from yearn.debug import Debug +from yearn.utils import Singleton logger = logging.getLogger("yearn.yeth") @@ -49,19 +50,15 @@ def decimals(self): def symbol(self): return 'st-yETH' + async def get_supply(self, block: Optional[Block] = None) -> float: + return (await YETH_POOL.vb_prod_sum.coroutine(block_identifier=block))[1] / 10 ** 18 - async def _get_supply_price(self, block=None): - data = YETH_POOL.vb_prod_sum(block_identifier=block) - supply = data[1] / 1e18 + async def get_price(self, block: Optional[Block] = None) -> Optional[float]: try: - price = await magic.get_price(YETH_TOKEN, block=block, sync=False) + return float(await magic.get_price(YETH_TOKEN, block=block, sync=False)) except yPriceMagicError as e: if not isinstance(e.exception, PriceError): raise e - price = None - - return supply, price - @eth_retry.auto_retry async def apy(self, samples: ApySamples) -> Apy: @@ -83,14 +80,13 @@ async def apy(self, samples: ApySamples) -> Apy: @eth_retry.auto_retry async def tvl(self, block=None) -> Tvl: - supply, price = await self._get_supply_price(block=block) + supply, price = await asyncio.gather(self.get_supply(block), self.get_price(block)) tvl = supply * price if price else None - return Tvl(supply, price, tvl) async def describe(self, block=None): - supply, price = await self._get_supply_price(block=block) + supply, price = await asyncio.gather(self.get_supply(block), self.get_price(block)) try: pool_supply = YETH_POOL.supply(block_identifier=block) total_assets = STAKING_CONTRACT.totalAssets(block_identifier=block) @@ -118,7 +114,7 @@ async def describe(self, block=None): async def total_value_at(self, block=None): - supply, price = await self._get_supply_price(block=block) + supply, price = await asyncio.gather(self.get_supply(block), self.get_price(block)) return supply * price @@ -236,7 +232,7 @@ async def _get_daily_swap_volumes(self, from_block, to_block): volume_in_eth[asset_in] += amount_in * rates[asset_in] volume_out_eth[asset_out] += amount_out * rates[asset_out] - weth_price = await magic.get_price(weth, block=from_block, sync=False) + weth_price = float(await magic.get_price(weth, block=from_block, sync=False)) for i, value in enumerate(volume_in_eth): volume_in_usd[i] = value * weth_price @@ -258,16 +254,16 @@ async def describe(self, block=None): samples = get_samples() self.swap_volumes = await self._get_daily_swap_volumes(samples.day_ago, samples.now) - products = await self.active_products_at(block) + products = await self.active_vaults_at(block) data = await asyncio.gather(*[product.describe(block=block) for product in products]) return {product.name: desc for product, desc in zip(products, data)} async def total_value_at(self, block=None): - products = await self.active_products_at(block) + products = await self.active_vaults_at(block) tvls = await asyncio.gather(*[product.total_value_at(block=block) for product in products]) return {product.name: tvl for product, tvl in zip(products, tvls)} - async def active_products_at(self, block=None): + async def active_vaults_at(self, block=None): products = [self.st_yeth] + self.st_yeth.lsts if block: blocks = await asyncio.gather(*[contract_creation_block_async(str(product.address)) for product in products])