From 8ac6ec7a4dfc4665a0fc274eafdeea8000be1288 Mon Sep 17 00:00:00 2001 From: ARCJ137442 <61109168+ARCJ137442@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:20:34 +0800 Subject: [PATCH 1/3] refactor(event_buffer): A trial to add type hints for `EventBuffer.py` and `Utils.py` --- pynars/NARS/DataStructures/MC/EventBuffer.py | 108 ++++++++++++------- pynars/NARS/DataStructures/MC/Utils.py | 41 ++++--- 2 files changed, 96 insertions(+), 53 deletions(-) diff --git a/pynars/NARS/DataStructures/MC/EventBuffer.py b/pynars/NARS/DataStructures/MC/EventBuffer.py index b32d9a9..4562c41 100644 --- a/pynars/NARS/DataStructures/MC/EventBuffer.py +++ b/pynars/NARS/DataStructures/MC/EventBuffer.py @@ -4,12 +4,18 @@ from pynars.NARS.DataStructures.MC import Utils from pynars.NARS.DataStructures.MC.OutputBuffer import Reaction from pynars.NARS.DataStructures.MC.Utils import PriorityQueue, BufferTask, satisfaction_level, preprocessing +from pynars.NARS.DataStructures import Memory from pynars.Narsese import Compound, Judgement, Task, Interval, parser, Term, Truth, Copula, Statement +from typing import Iterable, List, Optional, Tuple class Anticipation: - def __init__(self, task, prediction): + matched: bool + task: Task + prediction: 'PredictiveImplication' + + def __init__(self, task: Task, prediction: 'PredictiveImplication') -> None: self.matched = False self.task = task self.prediction = prediction @@ -17,7 +23,14 @@ def __init__(self, task, prediction): class PredictiveImplication: - def __init__(self, condition, interval, conclusion, task): + condition: Term + interval: Interval + conclusion: Term + to_memory_cooldown: int + expiration: int + task: Task + + def __init__(self, condition: Term, interval: Interval, conclusion: Term, task: Task) -> None: self.condition = condition """ As explained the conceptual design, "+1, +2" cannot be used in buffers, thus the interval is only kept in the @@ -34,7 +47,7 @@ def __init__(self, condition, interval, conclusion, task): self.expiration = 0 self.task = task - def get_conclusion(self, condition_task): + def get_conclusion(self, condition_task: 'BufferTask') -> Tuple[Optional[Interval], Optional[Task]]: # when "A" is matched with "A =/> B", return B with truth deduction truth = Truth_deduction(self.task.truth, condition_task.task.truth) if truth.c < 0.3: @@ -50,26 +63,41 @@ class Slot: It contains 3 parts: 1) events observed, 2) anticipations made, 3) operations to do. """ - def __init__(self, num_events, num_anticipations, num_operations): + events: PriorityQueue[BufferTask] + anticipations: List[Anticipation] + num_anticipations: int + operations: list # TODO: operation types + num_operations: int + + def __init__(self, num_events: int, num_anticipations: int, num_operations: int) -> None: self.events = PriorityQueue(num_events) self.anticipations = [] self.num_anticipations = num_anticipations self.operations = [] self.num_operations = num_operations - def push(self, item, value): + def push(self, item: BufferTask, value: float): self.events.push(item, value) - def pop(self): + def pop(self) -> Tuple[BufferTask, float]: return self.events.pop() - def random_pop(self): + def random_pop(self) -> Optional[BufferTask]: return self.events.random_pop() class EventBuffer: - def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, N=1): + num_events: int + num_anticipations: int + num_operations: int + slots: List[Slot] + curr: int + predictive_implications: PriorityQueue[PredictiveImplication] + reactions: PriorityQueue[Reaction] + N: int + + def __init__(self, num_slot: int, num_events: int, num_anticipations: int, num_operations: int, num_predictive_implications: int, N=1) -> None: # num slot is the number of slots on one side. If num_slot is 2, there are 1 (in the middle) + 2*2=5 slots self.num_events = num_events self.num_anticipations = num_anticipations @@ -80,13 +108,13 @@ def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_ self.reactions = PriorityQueue(num_predictive_implications * 5) self.N = N - def push(self, tasks, memory): + def push(self, tasks: Iterable[Task], memory: Memory) -> None: for task in tasks: buffer_task = BufferTask(task) buffer_task.preprocess_effect = Utils.preprocessing(task, memory) self.slots[self.curr].push(buffer_task, buffer_task.priority) - def pop(self): + def pop(self) -> List[Task]: ret = [] for _ in range(self.N): if len(self.slots[self.curr].events) != 0: @@ -95,7 +123,7 @@ def pop(self): return ret @staticmethod - def contemporary_composition(events): + def contemporary_composition(events: List[Task]) -> Task: # according to the conceptual design, currently only 2-compounds are allowed, # though in the future, we may have compounds with many components, @@ -120,13 +148,13 @@ def contemporary_composition(events): budget = Budget_merge(budget, each.budget) # sentence - sentence = Judgement(term, stamp, truth) + sentence = Judgement(term, stamp, truth) # ! Argument 1 to "Judgement" has incompatible type "Type[Compound]"; expected "Term" (Mypy arg-type) # task return Task(sentence, budget) @staticmethod - def sequential_composition(event_1, interval, event_2): + def sequential_composition(event_1: Task, interval: Interval, event_2: Task) -> Task: # according to the conceptual design, we currently only have "event_1, interval, event_2" schema, # though in the future this may also change, but it is too early to decide here @@ -145,7 +173,7 @@ def sequential_composition(event_1, interval, event_2): return Task(sentence, budget) @staticmethod - def generate_prediction_util(event_1, interval, event_2): + def generate_prediction_util(event_1: Task, interval: Interval, event_2: Task) -> PredictiveImplication: if interval != 0: copula = Copula.PredictiveImplication # =/> else: @@ -169,7 +197,7 @@ def generate_prediction_util(event_1, interval, event_2): # predictive implication return PredictiveImplication(event_1.term, interval, event_2.term, task) - def compound_composition(self, memory): + def compound_composition(self, memory: Memory) -> None: """ After the initial composition, pick the one with the highest priority in the current slot. Compose it with all other events in the current slot and the previous max events. @@ -184,7 +212,7 @@ def compound_composition(self, memory): curr_max.is_component = 1 curr_composition.append(self.contemporary_composition([curr_max.task, curr_remaining[-1].task])) - previous_max = [] + previous_max: List[Optional[BufferTask]] = [] previous_composition = [] for i in range(self.curr): if len(self.slots[i].events) != 0: @@ -192,7 +220,7 @@ def compound_composition(self, memory): previous_max.append(tmp) # don't change previous max's "is_component" curr_max.is_component = 1 - previous_composition.append(self.sequential_composition(previous_max[-1].task, + previous_composition.append(self.sequential_composition(previous_max[-1].task, # ? should change this `previous_max[-1]` to `tmp` to assume the non-none type? Interval(self.curr - i), curr_max.task)) else: previous_max.append(None) @@ -208,12 +236,12 @@ def compound_composition(self, memory): # add all compositions to the current slot self.push(curr_composition + previous_composition, memory) - def check_anticipation(self, memory): + def check_anticipation(self, memory: Memory) -> None: """ Check all anticipations, award or punish the corresponding predictive implications. If an anticipation does not even exist, apply the lowest satisfaction. """ - prediction_award_penalty = [] + prediction_award_penalty: List[Tuple[PredictiveImplication, float]] = [] checked_buffer_tasks = [] while len(self.slots[self.curr].events) != 0: buffer_task, _ = self.slots[self.curr].pop() @@ -224,19 +252,19 @@ def check_anticipation(self, memory): each_anticipation.matched = True buffer_task.task = revision(each_anticipation.task, buffer_task.task) satisfaction = 1 - satisfaction_level(each_anticipation.task.truth, buffer_task.task.truth) - prediction_award_penalty.append([each_anticipation.prediction, satisfaction]) + prediction_award_penalty.append((each_anticipation.prediction, satisfaction)) checked_buffer_tasks.append(buffer_task) # if there are some unmatched anticipations, apply the lowest satisfaction for each_anticipation in self.slots[self.curr].anticipations: if not each_anticipation.matched: - prediction_award_penalty.append([each_anticipation.prediction, 0]) + prediction_award_penalty.append((each_anticipation.prediction, 0)) print("prediction_award_penalty", prediction_award_penalty) # put all buffer tasks back, some evaluations may change - for each in checked_buffer_tasks: - self.slots[self.curr].push(each, each.priority) + for each_task in checked_buffer_tasks: # ! have to use different variable names to avoid type conflicts + self.slots[self.curr].push(each_task, each_task.priority) # update the predictive implications for each in prediction_award_penalty: @@ -244,7 +272,7 @@ def check_anticipation(self, memory): self.predictive_implications.edit(each[0], each[0].task.truth.e * preprocessing(each[0].task, memory), lambda x: x.task.term) - def predictive_implication_application(self, memory): + def predictive_implication_application(self, memory: Memory) -> None: """ Check all predictive implications, whether some of them can fire. If so, calculate the corresponding task of the conclusion and create it as an anticipation in the corresponding @@ -256,9 +284,9 @@ def predictive_implication_application(self, memory): implication, _ = self.predictive_implications.pop() applied = False for each_event in self.slots[self.curr].events.pq: - if implication.condition == each_event[1].task.term: + if implication.condition == each_event[1].task.term: # ! Unsupported operand types for == ("Term" and "Term") (Mypy operator) interval, conclusion = implication.get_conclusion(each_event[1]) - if interval is None: + if interval is None or conclusion is None: # ! if the interval isn't `None`, same to the conclusion break applied = True implication.expiration = max(0, implication.expiration - 1) @@ -273,7 +301,7 @@ def predictive_implication_application(self, memory): self.predictive_implications.push(each, each.task.truth.e * preprocessing(each.task, memory) * (1 / (1 + each.expiration))) - def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=0.8, default_cooldown=100): + def to_memory_predictive_implication(self, memory: Memory, threshold_f: float=0.9, threshold_c: float=0.8, default_cooldown: int=100) -> List[Reaction]: # when a predictive implication reaches a relatively high truth value, it will be forwarded to the memory # (not the next level) # this does not mean it is removed from the predictive implication pq @@ -295,6 +323,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c= # I have to cheat. # ================================================================================================== + # ? for `each[1].task.term.predicate`, should assume the `each[1]`'s term is a statement? if each[1].task.term.predicate.word == "<{SELF}-->[good]>": if (each[1].task.term.subject.is_compound and each[1].task.term.subject.components[-1].word[0] @@ -325,7 +354,7 @@ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c= return reactions # ============================================================================================================== - def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_cooldown=100): + def local_evaluation(self, memory: Memory, threshold_f: float=0.8, threshold_c: float=0.9, default_cooldown: int=100) -> List[Reaction]: self.check_anticipation(memory) self.predictive_implication_application(memory) # cheating @@ -337,7 +366,7 @@ def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_coo # original # self.to_memory_predictive_implication(memory, threshold_f, threshold_c, default_cooldown) - def memory_based_evaluation(self, memory): + def memory_based_evaluation(self, memory: Memory) -> None: evaluated_buffer_tasks = [] while len(self.slots[self.curr].events) != 0: buffer_task, _ = self.slots[self.curr].pop() @@ -348,12 +377,12 @@ def memory_based_evaluation(self, memory): self.slots[self.curr].push(each, each.priority) @staticmethod - def prediction_revision(existed_prediction, new_prediction): + def prediction_revision(existed_prediction: PredictiveImplication, new_prediction: PredictiveImplication) -> PredictiveImplication: existed_prediction.task = revision(existed_prediction.task, new_prediction.task) existed_prediction.expiration = max(0, existed_prediction.expiration - 1) return existed_prediction - def prediction_generation(self, max_events_per_slot, memory): + def prediction_generation(self, max_events_per_slot: int, memory: Memory) -> None: """ For each slot, randomly pop "max events per slot" buffer tasks to generate predictions. Currently, concurrent predictive implications (==>) are not supported. @@ -376,19 +405,19 @@ def prediction_generation(self, max_events_per_slot, memory): for each_curr_event in selected_buffer_tasks[-1]: for each_previous_event in selected_buffer_tasks[i]: if each_curr_event is not None and each_previous_event is not None: - tmp = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i), + tmp2 = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i), # ! for type stability, it may need to be renamed to `tmp2` each_curr_event.task) - # if tmp.task.truth.e * preprocessing(tmp.task, memory) <= 0.05: + # if tmp2.task.truth.e * preprocessing(tmp2.task, memory) <= 0.05: # continue existed = None for j in range(len(self.predictive_implications)): - if self.predictive_implications.pq[j][1].task.term == tmp.task.term: + if self.predictive_implications.pq[j][1].task.term == tmp2.task.term: existed = self.predictive_implications.pq.pop(j) break if existed is not None: - tmp = self.prediction_revision(existed[1], tmp) + tmp2 = self.prediction_revision(existed[1], tmp2) - self.predictive_implications.push(tmp, tmp.task.truth.e * preprocessing(tmp.task, memory)) + self.predictive_implications.push(tmp2, tmp2.task.truth.e * preprocessing(tmp2.task, memory)) # after the prediction generation, put the randomly selected buffer tasks back for i in range(self.curr + 1): @@ -396,11 +425,12 @@ def prediction_generation(self, max_events_per_slot, memory): if each is not None: self.slots[i].push(each, each.priority) - def slots_cycle(self): + def slots_cycle(self) -> None: self.slots = self.slots[1:] + [Slot(self.num_events, self.num_anticipations, self.num_operations)] - def buffer_cycle(self, tasks, memory, max_events_per_slot=5, threshold_f=0.8, threshold_c=0.9, - default_cooldown=10): + def buffer_cycle(self, tasks: Iterable[Task], memory: Memory, + max_events_per_slot: int=5, threshold_f: float=0.8, threshold_c: float=0.9, + default_cooldown: int=10) -> Tuple[List[Reaction], List[Task]]: # put all tasks to the current slot self.push(tasks, memory) self.compound_composition(memory) diff --git a/pynars/NARS/DataStructures/MC/Utils.py b/pynars/NARS/DataStructures/MC/Utils.py index eb50e25..2333555 100644 --- a/pynars/NARS/DataStructures/MC/Utils.py +++ b/pynars/NARS/DataStructures/MC/Utils.py @@ -1,22 +1,29 @@ import random from pynars.NAL.Functions import Or +from pynars.NARS.DataStructures import Memory +from pynars.Narsese import Task, Truth +from typing import Any, Callable, Generic, List, Optional, Tuple, TypeVar +T = TypeVar('T') -class PriorityQueue: +class PriorityQueue(Generic[T]): """ It is not a heap, it is a sorted array by insertion sort. Since we need to 1) access the largest item, 2) access the smallest item, 3) access an item in the middle. """ - def __init__(self, size): + pq: List[Tuple[float, T]] + size: int + + def __init__(self, size: int) -> None: self.pq = [] self.size = size - def __len__(self): + def __len__(self) -> int: return len(self.pq) - def push(self, item, value): + def push(self, item, value) -> None: """ Add a new one, regardless whether there are duplicates. """ @@ -31,7 +38,7 @@ def push(self, item, value): if len(self.pq) > self.size: self.pq = self.pq[1:] - def edit(self, item, value, identifier): + def edit(self, item: T, value: float, identifier: Callable[[T], Any]) -> None: """ Replacement. """ @@ -46,7 +53,7 @@ def edit(self, item, value, identifier): return self.push(item, value) - def pop(self): + def pop(self) -> Tuple[T, float]: """ Pop the highest. """ @@ -54,7 +61,7 @@ def pop(self): self.pq = self.pq[:-1] return item, value - def random_pop(self): + def random_pop(self) -> Optional[T]: """ Based on the priority (not budget.priority), randomly pop one buffer task. The higher the priority, the higher the probability to be popped. @@ -70,13 +77,13 @@ def random_pop(self): return ret[1] return None - def show(self, identifier): + def show(self, identifier: Callable[[T], str]) -> None: """ Show each item in the priority queue. Since it may contain items other than BufferTasks, you can design you own identifier to show what you want to show. """ for each in sorted(self.pq, key=lambda x: x[0]): - print(round(each[0], 3), "|", each[1].interval, "|", identifier(each[1])) + print(round(each[0], 3), "|", each[1].interval, "|", identifier(each[1])) # ? Why assume that the element in the queue has an attribute "interval"? print("---") @@ -87,7 +94,13 @@ class BufferTask: Therefore, we restore each factor independently; only when everything is decided, the final budget is given. """ - def __init__(self, task): + task: Task + channel_parameter: int + preprocess_effect: float + is_component: int + expiration_effect: int + + def __init__(self, task: Task) -> None: self.task = task # the original task # the influence of a channel, currently, all channels have parameter 1 and can't be changed self.channel_parameter = 1 @@ -96,7 +109,7 @@ def __init__(self, task): self.expiration_effect = 1 @property - def priority(self): + def priority(self) -> float: """ The priority of a BufferTask, which is not the budget of the corresponding Task. """ @@ -104,19 +117,19 @@ def priority(self): ((2 - self.is_component) / 2)) -def preprocessing(task, memory): +def preprocessing(task: Task, memory: Memory) -> float: """ Check whether a task is already a concept in the memory. If not, its budget is greatly decreased (based on its complexity). Else, its budget is the OR of the existed budget. """ - concept_in_memory = memory.take_by_key(task.term, False) + concept_in_memory = memory.take_by_key(task.term, False) # ? Why does `take_by_key` use `task` index instead of `task.term` using at this if concept_in_memory is None: return 1 / (1 + task.term.complexity) else: return Or(task.budget.priority, concept_in_memory.budget.priority) -def satisfaction_level(truth_1, truth_2): +def satisfaction_level(truth_1: Truth, truth_2: Truth) -> float: """ Mainly used for check whether an anticipation is satisfied. """ From ce2b709bf9634996422e1f416c826671dd91fd5e Mon Sep 17 00:00:00 2001 From: ARCJ137442 <61109168+ARCJ137442@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:24:58 +0800 Subject: [PATCH 2/3] refactor: Adjust the order of class `Anticipation` to make it easier for Python to load type annotations Reason: the class `Anticipation` requires `PredictiveImplication` as a type dependency --- pynars/NARS/DataStructures/MC/EventBuffer.py | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pynars/NARS/DataStructures/MC/EventBuffer.py b/pynars/NARS/DataStructures/MC/EventBuffer.py index 4562c41..33213b5 100644 --- a/pynars/NARS/DataStructures/MC/EventBuffer.py +++ b/pynars/NARS/DataStructures/MC/EventBuffer.py @@ -9,18 +9,6 @@ from typing import Iterable, List, Optional, Tuple -class Anticipation: - - matched: bool - task: Task - prediction: 'PredictiveImplication' - - def __init__(self, task: Task, prediction: 'PredictiveImplication') -> None: - self.matched = False - self.task = task - self.prediction = prediction - - class PredictiveImplication: condition: Term @@ -58,6 +46,18 @@ def get_conclusion(self, condition_task: 'BufferTask') -> Tuple[Optional[Interva return self.interval, task +class Anticipation: + + matched: bool + task: Task + prediction: PredictiveImplication + + def __init__(self, task: Task, prediction: PredictiveImplication) -> None: + self.matched = False + self.task = task + self.prediction = prediction + + class Slot: """ It contains 3 parts: 1) events observed, 2) anticipations made, 3) operations to do. From 7a28ad83550b2d28f860f4362994c0a2d686298b Mon Sep 17 00:00:00 2001 From: ARCJ137442 <61109168+ARCJ137442@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:41:33 +0800 Subject: [PATCH 3/3] refactor(event_buffer): remove the unnecessary quotes around a type that is already loaded from context --- pynars/NARS/DataStructures/MC/EventBuffer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pynars/NARS/DataStructures/MC/EventBuffer.py b/pynars/NARS/DataStructures/MC/EventBuffer.py index 33213b5..d320c10 100644 --- a/pynars/NARS/DataStructures/MC/EventBuffer.py +++ b/pynars/NARS/DataStructures/MC/EventBuffer.py @@ -35,7 +35,7 @@ def __init__(self, condition: Term, interval: Interval, conclusion: Term, task: self.expiration = 0 self.task = task - def get_conclusion(self, condition_task: 'BufferTask') -> Tuple[Optional[Interval], Optional[Task]]: + def get_conclusion(self, condition_task: BufferTask) -> Tuple[Optional[Interval], Optional[Task]]: # when "A" is matched with "A =/> B", return B with truth deduction truth = Truth_deduction(self.task.truth, condition_task.task.truth) if truth.c < 0.3: