diff --git a/folding/base/reward.py b/folding/base/reward.py
new file mode 100644
index 00000000..20d1cb5d
--- /dev/null
+++ b/folding/base/reward.py
@@ -0,0 +1,81 @@
+from pydantic import BaseModel
+from abc import ABC, abstractmethod
+from typing import Optional
+import time
+import torch
+from folding.store import Job
+
+
+class RewardEvent(BaseModel):
+    """Contains rewards for all the responses in a batch."""
+
+    reward_name: str
+    rewards: torch.Tensor
+    batch_time: float
+
+    extra_info: Optional[dict] = None
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
+class BatchRewardOutput(BaseModel):
+    rewards: torch.Tensor
+    extra_info: Optional[dict] = None
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
+class BatchRewardInput(BaseModel):
+    energies: torch.Tensor
+    top_reward: float
+    job: Job
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
+class BaseReward(ABC):
+    @abstractmethod
+    def name(self) -> str:
+        ...
+
+    @abstractmethod
+    def __init__(self, **kwargs):
+        pass
+
+    @abstractmethod
+    async def get_rewards(
+        self, data: BatchRewardInput, rewards: torch.Tensor
+    ) -> BatchRewardOutput:
+        pass
+
+    @abstractmethod
+    async def calculate_final_reward(self, rewards: torch.Tensor) -> torch.Tensor:
+        pass
+
+    async def setup_rewards(self, energies: torch.Tensor) -> torch.Tensor:
+        """Set up a zeroed reward tensor, one entry per returned energy."""
+        return torch.zeros(len(energies))
+
+    async def apply(self, data: BatchRewardInput) -> RewardEvent:
+        self.rewards: torch.Tensor = await self.setup_rewards(energies=data.energies)
+        t0: float = time.time()
+        batch_rewards_output: BatchRewardOutput = await self.get_rewards(
+            data=data, rewards=self.rewards
+        )
+        batch_rewards_output.rewards = await self.calculate_final_reward(
+            rewards=batch_rewards_output.rewards
+        )
+        batch_rewards_time: float = time.time() - t0
+
+        return RewardEvent(
+            reward_name=self.name(),
+            rewards=batch_rewards_output.rewards,
+            batch_time=batch_rewards_time,
+            extra_info=batch_rewards_output.extra_info,
+        )
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}(name={self.name()})"
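
For orientation before the concrete classes below: a minimal sketch of what the BaseReward contract demands of a subclass. EqualReward is hypothetical and not part of this change; it only shows the four members a subclass must implement, with apply() inherited from the base:

    import torch
    from folding.base.reward import BaseReward, BatchRewardInput, BatchRewardOutput


    class EqualReward(BaseReward):
        """Hypothetical reward: every responding miner gets an equal share."""

        def __init__(self, **kwargs):
            pass

        def name(self) -> str:
            return "equal_reward"

        async def get_rewards(
            self, data: BatchRewardInput, rewards: torch.Tensor
        ) -> BatchRewardOutput:
            responded = data.energies != 0  # miners that actually replied
            n = max(int(responded.sum()), 1)
            rewards[responded] = 1.0 / n
            return BatchRewardOutput(rewards=rewards)

        async def calculate_final_reward(self, rewards: torch.Tensor) -> torch.Tensor:
            return rewards
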
diff --git a/folding/rewards/folding_reward.py b/folding/rewards/folding_reward.py
new file mode 100644
index 00000000..778fc51e
--- /dev/null
+++ b/folding/rewards/folding_reward.py
@@ -0,0 +1,143 @@
+from folding.base.reward import BaseReward, BatchRewardOutput, BatchRewardInput
+import torch
+from loguru import logger
+from folding.store import Job
+from folding.rewards.linear_reward import divide_decreasing
+
+
+class FoldingReward(BaseReward):
+    """Folding reward class"""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    async def get_rewards(
+        self, data: BatchRewardInput, rewards: torch.Tensor
+    ) -> BatchRewardOutput:
+        """
+        A reward pipeline that determines how to place rewards onto the miners sampled within the batch.
+        Currently applies a linearly decreasing reward to all miners that do not hold the current best
+        (or previously best) loss, using the function "divide_decreasing".
+
+        Args:
+            data (BatchRewardInput): batch input carrying:
+                energies (torch.Tensor): tensor of returned energies.
+                top_reward (float): upper bound reward.
+                job (Job): the job this batch belongs to.
+            rewards (torch.Tensor): tensor of rewards, floats.
+        """
+        energies: torch.Tensor = data.energies
+        top_reward: float = data.top_reward
+        job: Job = data.job
+
+        nonzero_energies: torch.Tensor = torch.nonzero(energies)
+
+        info = {
+            "name": self.name(),
+            "top_reward": top_reward,
+        }
+
+        # If the best hotkey is not in the set of hotkeys in the job, the top miner has stopped replying.
+        if job.best_hotkey not in job.hotkeys:
+            logger.warning(
+                f"Best hotkey {job.best_hotkey} not in hotkeys {job.hotkeys}. Assigning no reward."
+            )
+            return BatchRewardOutput(
+                rewards=rewards, extra_info=info
+            )  # rewards of all 0s.
+
+        best_index: int = job.hotkeys.index(job.best_hotkey)
+
+        # There are cases where the top miner stops replying; make sure its reward is still assigned.
+        rewards[best_index] = top_reward
+
+        # If no miners reply, we want *all* reward to go to the top miner.
+        if len(nonzero_energies) == 0:
+            rewards[best_index] = 1
+            return BatchRewardOutput(rewards=rewards, extra_info=info)
+
+        if (len(nonzero_energies) == 1) and (nonzero_energies[0] == best_index):
+            rewards[best_index] = 1
+            return BatchRewardOutput(rewards=rewards, extra_info=info)
+
+        # Find any indices whose energy matches the best value.
+        remaining_miners = {}
+        for index in nonzero_energies:
+            # There could be multiple energies equal to the best loss.
+            # The best energy could be the one that is saved in the store.
+            if (energies[index] == job.best_loss) or (index == best_index):
+                rewards[index] = top_reward
+            else:
+                remaining_miners[index] = energies[index]
+
+        # The reward distributed to the remaining miners MUST be less than the reward given to the top miners.
+        num_remaining_miners = len(remaining_miners)
+        if num_remaining_miners > 1:
+            sorted_remaining_miners = dict(
+                sorted(remaining_miners.items(), key=lambda item: item[1])
+            )  # sort smallest to largest
+
+            # Apply a fixed decrease in reward on the remaining non-zero miners.
+            rewards_per_miner = divide_decreasing(
+                amount_to_distribute=1 - top_reward,
+                number_of_elements=num_remaining_miners,
+            )
+            for index, r in zip(sorted_remaining_miners.keys(), rewards_per_miner):
+                rewards[index] = r
+        else:
+            for index in remaining_miners.keys():
+                rewards[index] = 1 - top_reward
+
+        return BatchRewardOutput(rewards=rewards, extra_info=info)
+
+
+class SyntheticFoldingReward(FoldingReward):
+    """Synthetic Folding reward class"""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def name(self) -> str:
+        return "synthetic_folding_reward"
+
+    async def calculate_final_reward(self, rewards: torch.Tensor) -> torch.Tensor:
+        """
+        Calculate the final reward for the job.
+
+        Args:
+            rewards (torch.Tensor): tensor of rewards, floats.
+
+        Returns:
+            torch.Tensor: tensor of rewards, floats.
+        """
+        # priority_multiplier = 1 + (job.priority - 1) * 0.1  TODO: implement priority
+        priority_multiplier = 1.0
+        organic_multiplier = 1.0
+
+        return rewards * priority_multiplier * organic_multiplier
+
+
+class OrganicFoldingReward(FoldingReward):
+    """Organic Folding reward class"""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def name(self) -> str:
+        return "organic_folding_reward"
+
+    async def calculate_final_reward(self, rewards: torch.Tensor) -> torch.Tensor:
+        """
+        Calculate the final reward for the job.
+
+        Args:
+            rewards (torch.Tensor): tensor of rewards, floats.
+
+        Returns:
+            torch.Tensor: tensor of rewards, floats.
+ """ + # priority_multiplier = 1 + (job.priority - 1) * 0.1 TODO: Implement priority + priority_multiplier = 1.0 + organic_multiplier = 10.0 + + return rewards * priority_multiplier * organic_multiplier diff --git a/folding/rewards/reward_pipeline.py b/folding/rewards/reward_pipeline.py deleted file mode 100644 index 4f7116f7..00000000 --- a/folding/rewards/reward_pipeline.py +++ /dev/null @@ -1,72 +0,0 @@ -import torch -import bittensor as bt -from folding.store import Job -from folding.rewards.linear_reward import divide_decreasing -from loguru import logger - - -async def reward_pipeline( - energies: torch.Tensor, rewards: torch.Tensor, top_reward: float, job: Job -): - """A reward pipeline that determines how to place rewards onto the miners sampled within the batch. - Currently applies a linearly decreasing reward on all miners that are not the current best / previously - best loss using the function "divide_decreasing". - - Args: - energies (torch.Tensor): tensor of returned energies - rewards (torch.Tensor): tensor of rewards, floats. - top_reward (float): upper bound reward. - job (Job) - """ - nonzero_energies = torch.nonzero(energies) - - # If the best hotkey is not in the set of hotkeys in the job, this means that the top miner has stopped replying. - if job.best_hotkey not in job.hotkeys: - logger.warning( - f"Best hotkey {job.best_hotkey} not in hotkeys {job.hotkeys}. Assigning no reward." - ) - return rewards # rewards of all 0s. - - best_index = job.hotkeys.index(job.best_hotkey) - - # There are cases where the top_miner stops replying. ensure to assign reward. - rewards[best_index] = top_reward - - # If no miners reply, we want *all* reward to go to the top miner. - if len(nonzero_energies) == 0: - rewards[best_index] = 1 - return rewards - - if (len(nonzero_energies) == 1) and (nonzero_energies[0] == best_index): - rewards[best_index] = 1 - return rewards - - # Find if there are any indicies that are the same as the best value - remaining_miners = {} - for index in nonzero_energies: - # There could be multiple max energies. - # The best energy could be the one that is saved in the store. - if (energies[index] == job.best_loss) or (index == best_index): - rewards[index] = top_reward - else: - remaining_miners[index] = energies[index] - - # The amount of reward that is distributed to the remaining miners MUST be less than the reward given to the top miners. - num_reminaing_miners = len(remaining_miners) - if num_reminaing_miners > 1: - sorted_remaining_miners = dict( - sorted(remaining_miners.items(), key=lambda item: item[1]) - ) # sort smallest to largest - - # Apply a fixed decrease in reward on the remaining non-zero miners. 
diff --git a/folding/rewards/reward_pipeline.py b/folding/rewards/reward_pipeline.py
deleted file mode 100644
index 4f7116f7..00000000
--- a/folding/rewards/reward_pipeline.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import torch
-import bittensor as bt
-from folding.store import Job
-from folding.rewards.linear_reward import divide_decreasing
-from loguru import logger
-
-
-async def reward_pipeline(
-    energies: torch.Tensor, rewards: torch.Tensor, top_reward: float, job: Job
-):
-    """A reward pipeline that determines how to place rewards onto the miners sampled within the batch.
-    Currently applies a linearly decreasing reward on all miners that are not the current best / previously
-    best loss using the function "divide_decreasing".
-
-    Args:
-        energies (torch.Tensor): tensor of returned energies
-        rewards (torch.Tensor): tensor of rewards, floats.
-        top_reward (float): upper bound reward.
-        job (Job)
-    """
-    nonzero_energies = torch.nonzero(energies)
-
-    # If the best hotkey is not in the set of hotkeys in the job, this means that the top miner has stopped replying.
-    if job.best_hotkey not in job.hotkeys:
-        logger.warning(
-            f"Best hotkey {job.best_hotkey} not in hotkeys {job.hotkeys}. Assigning no reward."
-        )
-        return rewards  # rewards of all 0s.
-
-    best_index = job.hotkeys.index(job.best_hotkey)
-
-    # There are cases where the top_miner stops replying. ensure to assign reward.
-    rewards[best_index] = top_reward
-
-    # If no miners reply, we want *all* reward to go to the top miner.
-    if len(nonzero_energies) == 0:
-        rewards[best_index] = 1
-        return rewards
-
-    if (len(nonzero_energies) == 1) and (nonzero_energies[0] == best_index):
-        rewards[best_index] = 1
-        return rewards
-
-    # Find if there are any indicies that are the same as the best value
-    remaining_miners = {}
-    for index in nonzero_energies:
-        # There could be multiple max energies.
-        # The best energy could be the one that is saved in the store.
-        if (energies[index] == job.best_loss) or (index == best_index):
-            rewards[index] = top_reward
-        else:
-            remaining_miners[index] = energies[index]
-
-    # The amount of reward that is distributed to the remaining miners MUST be less than the reward given to the top miners.
-    num_reminaing_miners = len(remaining_miners)
-    if num_reminaing_miners > 1:
-        sorted_remaining_miners = dict(
-            sorted(remaining_miners.items(), key=lambda item: item[1])
-        )  # sort smallest to largest
-
-        # Apply a fixed decrease in reward on the remaining non-zero miners.
-        rewards_per_miner = divide_decreasing(
-            amount_to_distribute=1 - top_reward,
-            number_of_elements=num_reminaing_miners,
-        )
-        for index, r in zip(sorted_remaining_miners.keys(), rewards_per_miner):
-            rewards[index] = r
-    else:
-        for index in remaining_miners.keys():
-            rewards[index] = 1 - top_reward
-
-    return rewards
diff --git a/folding/store.py b/folding/store.py
index ddc9f1f5..c9822e30 100644
--- a/folding/store.py
+++ b/folding/store.py
@@ -151,6 +151,7 @@ def insert(
         hotkeys: List[str],
         epsilon: float,
         system_kwargs: dict,
+        reward_model: str,
         **kwargs,
     ):
         """Insert a new job into the database."""
@@ -173,6 +174,7 @@ def insert(
             updated_at=pd.Timestamp.now().floor("s"),
             epsilon=epsilon,
             system_kwargs=system_kwargs,
+            reward_model=reward_model,
             **kwargs,
         )

@@ -229,6 +231,7 @@ class Job:
     hotkeys: list
     created_at: pd.Timestamp
     updated_at: pd.Timestamp
+    reward_model: str
     active: bool = True
     best_loss: float = np.inf
     best_loss_at: pd.Timestamp = pd.NaT
Applying reward pipeline.") if apply_pipeline: - rewards: torch.Tensor = await reward_pipeline( - energies=energies, - rewards=rewards, - top_reward=top_reward, - job=job, + folding_reward = globals()[job.reward_model]() + reward_event: RewardEvent = await folding_reward.apply( + data=BatchRewardInput(energies=energies, top_reward=top_reward, job=job) ) + rewards: torch.Tensor = reward_event.rewards uids = self.get_uids(hotkeys=job.hotkeys) await self.update_scores( diff --git a/tests/test_rewards.py b/tests/test_rewards.py new file mode 100644 index 00000000..378be280 --- /dev/null +++ b/tests/test_rewards.py @@ -0,0 +1,141 @@ +import pytest +import torch +from folding.base.reward import BatchRewardInput, BatchRewardOutput +from folding.store import MockJob +from folding.rewards.folding_reward import FoldingReward + + +@pytest.fixture +def mock_job(): + mock_job = MockJob() + mock_job.best_hotkey = "hotkey1" + mock_job.hotkeys = ["hotkey1", "hotkey2", "hotkey3"] + mock_job.best_loss = 0.5 + mock_job.event = {"is_organic": False} + return mock_job + + +@pytest.fixture +def folding_reward(): + return FoldingReward() + + +@pytest.mark.asyncio +async def test_get_rewards_best_hotkey_not_in_set(folding_reward, mock_job): + # Modify job to have best_hotkey not in hotkeys + mock_job.best_hotkey = "non_existent_hotkey" + + data = BatchRewardInput( + energies=torch.tensor([0.1, 0.2, 0.3]), top_reward=0.8, job=mock_job + ) + rewards = torch.zeros(3) + + result = await folding_reward.get_rewards(data, rewards) + + assert torch.all(result.rewards == 0) + assert result.extra_info["name"] == folding_reward.name() + assert result.extra_info["top_reward"] == 0.8 + + +@pytest.mark.asyncio +async def test_get_rewards_no_replies(folding_reward, mock_job): + data = BatchRewardInput(energies=torch.zeros(3), top_reward=0.8, job=mock_job) + rewards = torch.zeros(3) + + result = await folding_reward.get_rewards(data, rewards) + + # Best hotkey (index 0) should get reward of 1, others 0 + expected = torch.tensor([1.0, 0.0, 0.0]) + assert torch.all(result.rewards == expected) + + +@pytest.mark.asyncio +async def test_get_rewards_only_best_replies(folding_reward, mock_job): + energies = torch.zeros(3) + energies[0] = 0.5 # Only best hotkey replies + + data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job) + rewards = torch.zeros(3) + + result = await folding_reward.get_rewards(data, rewards) + + expected = torch.tensor([1.0, 0.0, 0.0]) + assert torch.all(result.rewards == expected) + + +@pytest.mark.asyncio +async def test_get_rewards_multiple_best_values(folding_reward, mock_job): + energies = torch.tensor([0.5, 0.5, 0.3]) # Two miners with best loss + + data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job) + rewards = torch.zeros(3) + + result = await folding_reward.get_rewards(data, rewards) + + # Both miners with best loss should get top_reward + assert result.rewards[0] == 0.8 + assert result.rewards[1] == 0.8 + assert result.rewards[2] < 0.8 # Third miner should get lower reward + + +@pytest.mark.asyncio +async def test_get_rewards_distribution(folding_reward, mock_job): + energies = torch.tensor([0.5, 0.6, 0.7]) # Different energy values + + data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job) + rewards = torch.zeros(3) + + result = await folding_reward.get_rewards(data, rewards) + + # Check that rewards are properly distributed + assert result.rewards[0] == 0.8 # Best miner gets top_reward + assert result.rewards[1] > result.rewards[2] # Better 
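
The dispatch in update_job resolves the class name stored on the Job back into a class through the validator module's namespace. A minimal sketch of that round trip; the REWARD_MODELS registry is a suggested alternative, not part of this change:

    from folding.rewards.folding_reward import (
        OrganicFoldingReward,
        SyntheticFoldingReward,
    )

    # Stored on the Job at insert time ...
    reward_model = OrganicFoldingReward.__qualname__  # "OrganicFoldingReward"
    # ... and resolved back to an instance in update_job. This works because
    # both classes are imported at module level in neurons/validator.py.
    folding_reward = globals()[reward_model]()

    # An explicit registry avoids depending on whatever else the module imports:
    REWARD_MODELS = {
        cls.__qualname__: cls for cls in (SyntheticFoldingReward, OrganicFoldingReward)
    }
    folding_reward = REWARD_MODELS[reward_model]()
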
diff --git a/tests/test_rewards.py b/tests/test_rewards.py
new file mode 100644
index 00000000..378be280
--- /dev/null
+++ b/tests/test_rewards.py
@@ -0,0 +1,134 @@
+import pytest
+import torch
+from folding.base.reward import BatchRewardInput
+from folding.store import MockJob
+from folding.rewards.folding_reward import SyntheticFoldingReward, OrganicFoldingReward
+
+
+@pytest.fixture
+def mock_job():
+    mock_job = MockJob()
+    mock_job.best_hotkey = "hotkey1"
+    mock_job.hotkeys = ["hotkey1", "hotkey2", "hotkey3"]
+    mock_job.best_loss = 0.5
+    mock_job.event = {"is_organic": False}
+    return mock_job
+
+
+@pytest.fixture
+def folding_reward():
+    # FoldingReward itself is abstract (name and calculate_final_reward are
+    # only implemented on its subclasses), so the synthetic subclass is used
+    # to exercise the shared get_rewards logic.
+    return SyntheticFoldingReward()
+
+
+@pytest.mark.asyncio
+async def test_get_rewards_best_hotkey_not_in_set(folding_reward, mock_job):
+    # Modify job to have best_hotkey not in hotkeys
+    mock_job.best_hotkey = "non_existent_hotkey"
+
+    data = BatchRewardInput(
+        energies=torch.tensor([0.1, 0.2, 0.3]), top_reward=0.8, job=mock_job
+    )
+    rewards = torch.zeros(3)
+
+    result = await folding_reward.get_rewards(data, rewards)
+
+    assert torch.all(result.rewards == 0)
+    assert result.extra_info["name"] == folding_reward.name()
+    assert result.extra_info["top_reward"] == 0.8
+
+
+@pytest.mark.asyncio
+async def test_get_rewards_no_replies(folding_reward, mock_job):
+    data = BatchRewardInput(energies=torch.zeros(3), top_reward=0.8, job=mock_job)
+    rewards = torch.zeros(3)
+
+    result = await folding_reward.get_rewards(data, rewards)
+
+    # Best hotkey (index 0) should get reward of 1, others 0
+    expected = torch.tensor([1.0, 0.0, 0.0])
+    assert torch.all(result.rewards == expected)
+
+
+@pytest.mark.asyncio
+async def test_get_rewards_only_best_replies(folding_reward, mock_job):
+    energies = torch.zeros(3)
+    energies[0] = 0.5  # Only best hotkey replies
+
+    data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job)
+    rewards = torch.zeros(3)
+
+    result = await folding_reward.get_rewards(data, rewards)
+
+    expected = torch.tensor([1.0, 0.0, 0.0])
+    assert torch.all(result.rewards == expected)
+
+
+@pytest.mark.asyncio
+async def test_get_rewards_multiple_best_values(folding_reward, mock_job):
+    energies = torch.tensor([0.5, 0.5, 0.3])  # Two miners with best loss
+
+    data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job)
+    rewards = torch.zeros(3)
+
+    result = await folding_reward.get_rewards(data, rewards)
+
+    # Both miners with best loss should get top_reward
+    assert result.rewards[0] == 0.8
+    assert result.rewards[1] == 0.8
+    assert result.rewards[2] < 0.8  # Third miner should get a lower reward
+
+
+@pytest.mark.asyncio
+async def test_get_rewards_distribution(folding_reward, mock_job):
+    energies = torch.tensor([0.5, 0.6, 0.7])  # Different energy values
+
+    data = BatchRewardInput(energies=energies, top_reward=0.8, job=mock_job)
+    rewards = torch.zeros(3)
+
+    result = await folding_reward.get_rewards(data, rewards)
+
+    # Check that rewards are properly distributed
+    assert result.rewards[0] == 0.8  # Best miner gets top_reward
+    assert result.rewards[1] > result.rewards[2]  # Better performers get higher rewards
+    assert torch.all(result.rewards >= 0)  # No negative rewards
+    assert torch.all(result.rewards <= 1)  # No rewards above 1
+
+
+@pytest.mark.asyncio
+async def test_calculate_final_reward_organic():
+    rewards = torch.tensor([0.8, 0.5, 0.3])
+
+    result = await OrganicFoldingReward().calculate_final_reward(rewards)
+
+    # Organic multiplier should be 10.0
+    expected = rewards * 10.0
+    assert torch.all(result == expected)
+
+
+@pytest.mark.asyncio
+async def test_calculate_final_reward_non_organic(folding_reward):
+    rewards = torch.tensor([0.8, 0.5, 0.3])
+
+    result = await folding_reward.calculate_final_reward(rewards)
+
+    # Synthetic (non-organic) multiplier should be 1.0
+    expected = rewards * 1.0
+    assert torch.all(result == expected)
+
+
+@pytest.mark.asyncio
+async def test_full_reward_pipeline(folding_reward, mock_job):
+    data = BatchRewardInput(
+        energies=torch.tensor([0.5, 0.6, 0.7]), top_reward=0.8, job=mock_job
+    )
+
+    result = await folding_reward.apply(data)
+
+    assert isinstance(result.rewards, torch.Tensor)
+    assert result.reward_name == folding_reward.name()
+    assert isinstance(result.batch_time, float)
+    assert result.batch_time > 0
+    assert result.extra_info is not None
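
The tests drive the pipeline through pytest fixtures; the same flow can be exercised by hand. A quick sketch, assuming MockJob tolerates the same attribute assignments the fixtures use:

    import asyncio

    import torch

    from folding.base.reward import BatchRewardInput
    from folding.rewards.folding_reward import SyntheticFoldingReward
    from folding.store import MockJob

    job = MockJob()
    job.best_hotkey = "hotkey1"
    job.hotkeys = ["hotkey1", "hotkey2", "hotkey3"]
    job.best_loss = 0.5

    event = asyncio.run(
        SyntheticFoldingReward().apply(
            data=BatchRewardInput(
                energies=torch.tensor([0.5, 0.6, 0.7]), top_reward=0.8, job=job
            )
        )
    )
    print(event.reward_name, event.rewards)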