From 53443cd7936935e88dcacae0379d336418c98f4d Mon Sep 17 00:00:00 2001 From: Yang Chun Ung Date: Thu, 21 Mar 2024 18:20:52 +0900 Subject: [PATCH] Set countdown when stage transactions --- tests/graphql_test.py | 16 +++------------- tests/raid_test.py | 17 +++++++++++++++++ tests/tasks_test.py | 30 +++++++++++++++++++++++++++--- world_boss/app/graphql.py | 21 ++++++--------------- world_boss/app/raid.py | 4 ++++ world_boss/app/tasks.py | 17 ++++++++++++++++- 6 files changed, 73 insertions(+), 32 deletions(-) diff --git a/tests/graphql_test.py b/tests/graphql_test.py index 47a18db..642e578 100644 --- a/tests/graphql_test.py +++ b/tests/graphql_test.py @@ -6,7 +6,6 @@ from pytest_httpx import HTTPXMock from world_boss.app.config import config -from world_boss.app.enums import NetworkType from world_boss.app.models import Transaction, WorldBossReward, WorldBossRewardAmount @@ -283,21 +282,12 @@ def test_stage_transactions( for tx in fx_transactions: fx_session.add(tx) fx_session.commit() - network_type = NetworkType.MAIN query = f'mutation {{ stageTransactions(password: "{config.graphql_password}") }}' - with patch( - "world_boss.app.tasks.signer.stage_transaction", return_value="tx_id" - ) as m, patch("world_boss.app.tasks.client.chat_postMessage") as m2: + with patch("world_boss.app.graphql.stage_transactions_with_countdown.delay") as m: req = fx_test_client.post("/graphql", json={"query": query}) assert req.status_code == 200 - task_id = req.json()["data"]["stageTransactions"] - task: AsyncResult = AsyncResult(task_id) - task.get(timeout=30) - assert m.call_count == len(fx_transactions) - m2.assert_called_once_with( - channel=config.slack_channel_id, - text=f"stage {len(fx_transactions)} transactions", - ) + req.json()["data"]["stageTransactions"] + m.assert_called_once_with(config.headless_url, [1, 2]) def test_transaction_result( diff --git a/tests/raid_test.py b/tests/raid_test.py index fb03bc4..16a4650 100644 --- a/tests/raid_test.py +++ b/tests/raid_test.py @@ -8,6 +8,7 @@ from world_boss.app.raid import ( get_assets, get_next_tx_nonce, + get_tx_delay_factor, list_tx_nonce, update_agent_address, write_ranking_rewards_csv, @@ -216,3 +217,19 @@ def test_list_tx_nonce(fx_session, nonce_list: List[int]): fx_session.add(tx) fx_session.flush() assert list_tx_nonce(fx_session) == nonce_list + + +@pytest.mark.parametrize( + "expected, index", + [ + (0, 0), + (0, 1), + (0, 2), + (0, 3), + (4, 4), + (4, 5), + (8, 8), + ], +) +def test_get_tx_delay_factor(expected: int, index: int): + assert get_tx_delay_factor(index) == expected diff --git a/tests/tasks_test.py b/tests/tasks_test.py index f2278e4..1bf6412 100644 --- a/tests/tasks_test.py +++ b/tests/tasks_test.py @@ -1,4 +1,5 @@ import json +import time import unittest.mock from typing import List @@ -22,6 +23,7 @@ send_slack_message, sign_transfer_assets, stage_transaction, + stage_transactions_with_countdown, upload_balance_result, upload_tx_result, ) @@ -240,9 +242,7 @@ def test_send_slack_message( @pytest.mark.parametrize("network_type", [NetworkType.MAIN, NetworkType.INTERNAL]) def test_stage_transaction( - celery_session_worker, - fx_session, - network_type: NetworkType, + celery_session_worker, fx_session, network_type: NetworkType ): transaction = Transaction() transaction.tx_id = "tx_id" @@ -315,3 +315,27 @@ def test_upload_balance_result(celery_session_worker): ) msg = f"world boss pool balance.\naddress:{signer.address}\n\n0 CRYSTAL\n1 RUNE" m.assert_called_once_with(channel="channel_id", text=msg) + + +def test_stage_transactions_with_countdown( + fx_test_client, + celery_session_worker, + fx_session, + fx_transactions, +): + for tx in fx_transactions: + fx_session.add(tx) + fx_session.commit() + with unittest.mock.patch( + "world_boss.app.tasks.signer.stage_transaction", return_value="tx_id" + ) as m, unittest.mock.patch("world_boss.app.tasks.client.chat_postMessage") as m2: + stage_transactions_with_countdown.delay(config.headless_url, [1, 2]).get( + timeout=30 + ) + # wait for subtask call + time.sleep(2) + assert m.call_count == len(fx_transactions) + m2.assert_called_once_with( + channel=config.slack_channel_id, + text=f"stage {len(fx_transactions)} transactions", + ) diff --git a/world_boss/app/graphql.py b/world_boss/app/graphql.py index cf34a6e..21f015e 100644 --- a/world_boss/app/graphql.py +++ b/world_boss/app/graphql.py @@ -27,9 +27,8 @@ get_ranking_rewards, insert_world_boss_rewards, query_tx_result, - send_slack_message, sign_transfer_assets, - stage_transaction, + stage_transactions_with_countdown, upload_prepare_reward_assets, upload_tx_result, ) @@ -146,21 +145,13 @@ def prepare_reward_assets(self, season_id: int, password: str) -> str: @strawberry.mutation def stage_transactions(self, password: str, info: Info) -> str: db = info.context["db"] - nonce_list = ( - db.query(Transaction.nonce) + nonce_list = [ + i[0] + for i in db.query(Transaction.nonce) .filter_by(signer=signer.address, tx_result=None) .all() - ) - headless_urls = [config.headless_url] - task = chord( - stage_transaction.s(headless_url, nonce) - for headless_url in headless_urls - for nonce, in nonce_list - )( - send_slack_message.si( - config.slack_channel_id, f"stage {len(nonce_list)} transactions" - ) - ) + ] + task = stage_transactions_with_countdown.delay(config.headless_url, nonce_list) return task.id @strawberry.mutation(permission_classes=[IsAuthenticated]) diff --git a/world_boss/app/raid.py b/world_boss/app/raid.py index 627a041..f9636fa 100644 --- a/world_boss/app/raid.py +++ b/world_boss/app/raid.py @@ -257,3 +257,7 @@ def get_jwt_auth_header() -> dict[str, str]: algorithm=config.headless_jwt_algorithm, ) return {"Authorization": f"Bearer {encoded}"} + + +def get_tx_delay_factor(index: int) -> int: + return 4 * (index // 4) diff --git a/world_boss/app/tasks.py b/world_boss/app/tasks.py index 8817b4c..6061a93 100644 --- a/world_boss/app/tasks.py +++ b/world_boss/app/tasks.py @@ -3,7 +3,7 @@ from typing import List, Tuple import bencodex -from celery import Celery +from celery import Celery, chord from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -14,6 +14,7 @@ from world_boss.app.models import Transaction, WorldBossReward, WorldBossRewardAmount from world_boss.app.raid import ( get_assets, + get_tx_delay_factor, update_agent_address, write_ranking_rewards_csv, write_tx_result_csv, @@ -195,3 +196,17 @@ def upload_balance_result(balance: List[str], channel_id: str): balance_str = "\n".join(balance) msg = f"world boss pool balance.\naddress:{signer.address}\n\n{balance_str}" send_slack_message(channel_id, msg) + + +@celery.task() +def stage_transactions_with_countdown(headless_url: str, nonce_list: List[int]): + chord( + stage_transaction.signature( + (headless_url, nonce), countdown=get_tx_delay_factor(i) + ) + for i, nonce in enumerate(nonce_list) + )( + send_slack_message.si( + config.slack_channel_id, f"stage {len(nonce_list)} transactions" + ) + )