diff --git a/Pong/PongChannel.py b/Pong/PongChannel.py
new file mode 100644
index 0000000..b752195
--- /dev/null
+++ b/Pong/PongChannel.py
@@ -0,0 +1,90 @@
+import random
+import socket
+
+from pynars.NARS import Reasoner
+from pynars.NARS.DataStructures.MC.SensorimotorChannel import SensorimotorChannel
+from pynars.Narsese import parser
+
+"""
+The sensorimotor channel for the Pong game.
+It is placed here because I believe that every channel needs to be designed by the user.
+"""
+
+nars_address = ("127.0.0.1", 54321)
+game_address = ("127.0.0.1", 12345)
+
+
+class PongChannel(SensorimotorChannel):
+
+ def __init__(self, ID, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications,
+ num_reactions, N=1):
+ super().__init__(ID, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications,
+ num_reactions, N)
+ """
+ Babbling is designed to be very simple. If the current channel cannot generate an operation based on the
+ reaction, then there is a certain probability that an operation will be generated through babbling. It is worth
+ noting that currently a channel can only execute one operation in a cycle. Of course, in the future, this
+ restriction can be lifted after the mutually exclusive relationship between operations is specified.
+
+ Babbling can be performed a limited number of times and cannot be replenished.
+ """
+ self.num_babbling = 200
+ self.babbling_chance = 0.5
+
+ def information_gathering(self):
+ """
+ Receive a string from the game and parse it into a task.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.bind(nars_address)
+ data, _ = sock.recvfrom(1024)
+ status = data.decode()
+ if status != "GAME FAILED":
+ try:
+ return [parser.parse(each) for each in status.split("|")]
+ except: # for unexpected game input error
+ print(status)
+ exit()
+ else:
+ return []
+
+ def babbling(self):
+ """
+ Based on the probability and remaining counts.
+ """
+ if random.random() < self.babbling_chance and self.num_babbling > 0:
+ self.num_babbling -= 1
+ return random.choice(list(self.operations.keys()))
+
+
+def execute_MLeft():
+ """
+    All channels need to register their own operations. It is recommended to list them in the channel they belong to.
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.sendto("^left".encode(), game_address)
+
+
+def execute_MRight():
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.sendto("^right".encode(), game_address)
+
+
+def execute_Hold():
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.sendto("^hold".encode(), game_address)
+
+
+if __name__ == "__main__":
+ """
+ Open the game first and run this script to see all predictions generated.
+ "+1, +2" will not be in the Narsese.
+ """
+ r = Reasoner(100, 100)
+ pc = PongChannel("Pong", 2, 5, 50, 5, 50, 50, 1)
+ pc.register_operation("^left", execute_MLeft, ["^left", "left"])
+ pc.register_operation("^right", execute_MRight, ["^right", "right"])
+ pc.register_operation("^hold", execute_Hold, ["^hold", "mid"])
+ for _ in range(1000):
+ pc.channel_cycle(r.memory)
+ pc.input_buffer.predictive_implications.show(lambda x: x.task.sentence)
diff --git a/Pong/game.py b/Pong/game.py
new file mode 100644
index 0000000..990c272
--- /dev/null
+++ b/Pong/game.py
@@ -0,0 +1,164 @@
+import random
+import socket
+import sys
+from threading import Thread
+
+import pygame
+
+# initialize pygame
+pygame.init()
+
+# game canvas
+screen_width = 600
+screen_height = 400
+screen = pygame.display.set_mode((screen_width, screen_height))
+
+"""
+For NARS, the game is run independently. It is very likely that in the game, the ball runs one frame, but NARS has read
+this frame dozens of times. Adjust the overall movement speed to solve this problem.
+"""
+ball_speed_augment = 1
+
+# default variables
+ball_speed_x = random.choice([1, -1]) * random.randint(2 * ball_speed_augment, 4 * ball_speed_augment)
+ball_speed_y = -random.randint(2 * ball_speed_augment, 4 * ball_speed_augment)
+paddle_speed = 0 # paddle initial speed, 0 := not moving
+paddle_width = 100
+paddle_height = 20
+ball_radius = 10
+ball_pos = [screen_width // 2, screen_height // 2]
+paddle_pos = [screen_width // 2 - paddle_width // 2, screen_height - paddle_height]
+font = pygame.font.Font(None, 36)
+
+# game score, for display
+survive_time = 0
+
+"""
+The socket is used to establish communication between NARS and the game, and as expected, NARS will also use the exact
+same method to print content to the UI.
+"""
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+game_address = ("localhost", 12345)
+control_address = ("localhost", 54321)
+
+
+def reset_game():
+ global ball_pos, ball_speed_x, ball_speed_y, paddle_pos, paddle_speed
+ ball_pos = [screen_width // 2, screen_height // 2]
+ ball_speed_x = random.choice([1, -1]) * random.randint(2 * ball_speed_augment, 4 * ball_speed_augment)
+ ball_speed_y = -random.randint(2 * ball_speed_augment, 4 * ball_speed_augment)
+ paddle_pos = [screen_width // 2 - paddle_width // 2, screen_height - paddle_height]
+ paddle_speed = 0
+
+
+def send_status(message=None):
+ """
+ In this game, information is sent to NARS every frame, describing: 1) the position of the ball relative to the
+ paddle, and 2) whether the current situation is satisfactory.
+
+ Each message is just a string.
+ Messages are separated by "|".
+ """
+ if message is None:
+
+ if ball_pos[0] < paddle_pos[0]:
+ msg_1 = "<{left} --> [on]>. %1;0.9%"
+ # sock.sendto(msg.encode(), control_address)
+ msg_2 = "<{SELF} --> [good]>. %" + str(
+ 1 - (paddle_pos[0] - ball_pos[0]) / (screen_width - paddle_width)) + ";0.9%"
+ sock.sendto((msg_1 + "|" + msg_2).encode(), control_address)
+ elif ball_pos[0] > paddle_pos[0] + paddle_width:
+ msg_1 = "<{right} --> [on]>. %1;0.9%"
+ # sock.sendto(msg.encode(), control_address)
+ msg_2 = "<{SELF} --> [good]>. %" + str(
+ 1 - (ball_pos[0] - (paddle_pos[0] + paddle_width)) / (screen_width - paddle_width)) + ";0.9%"
+ sock.sendto((msg_1 + "|" + msg_2).encode(), control_address)
+ else:
+ msg = "<{SELF} --> [good]>. %1;0.9%"
+ sock.sendto(msg.encode(), control_address)
+
+
+def game_loop():
+ global ball_pos, ball_speed_x, ball_speed_y, paddle_pos, paddle_speed, survive_time, survive_time_curve
+
+ running = True
+ clock = pygame.time.Clock()
+
+ while running:
+
+ survive_time += 1
+
+ for event in pygame.event.get():
+ if event.type == pygame.QUIT:
+ running = False
+
+ if not running:
+ break
+
+ # update ball pos
+ ball_pos[0] += ball_speed_x
+ ball_pos[1] += ball_speed_y
+
+ # boundary collision check
+ if ball_pos[0] <= ball_radius or ball_pos[0] >= screen_width - ball_radius:
+ ball_speed_x = -ball_speed_x
+ if ball_pos[1] <= ball_radius:
+ ball_speed_y = -ball_speed_y
+
+ # paddle collision check
+ if ball_pos[1] + ball_radius >= paddle_pos[1] and paddle_pos[0] < ball_pos[0] < paddle_pos[0] + paddle_width:
+ ball_speed_y = -ball_speed_y
+
+ # game failed
+ if ball_pos[1] >= screen_height:
+ send_status("GAME FAILED")
+ survive_time = 0
+ reset_game() # restart
+
+ # update paddle
+ paddle_pos[0] += paddle_speed
+
+ if paddle_pos[0] < 0:
+ paddle_pos[0] = 0
+ if paddle_pos[0] > screen_width - paddle_width:
+ paddle_pos[0] = screen_width - paddle_width
+
+ screen.fill((0, 0, 0))
+ pygame.draw.rect(screen, (255, 255, 255),
+ pygame.Rect(paddle_pos[0], paddle_pos[1], paddle_width, paddle_height))
+ pygame.draw.circle(screen, (255, 255, 255), ball_pos, ball_radius)
+ # show the survived time
+ score_text = font.render(f"Score: {survive_time}", True, (255, 255, 255))
+ screen.blit(score_text, (10, 10)) # in the top left corner
+ pygame.display.flip()
+
+ send_status()
+ clock.tick(60)
+
+ pygame.quit()
+ sys.exit()
+
+
+def receive_commands():
+ global paddle_speed
+ sock.bind(game_address)
+
+ while True:
+ data, _ = sock.recvfrom(1024)
+ command = data.decode()
+ print(f"command received: {command}")
+
+ if command == "^left":
+ paddle_speed = -5 * ball_speed_augment
+ elif command == "^right":
+ paddle_speed = 5 * ball_speed_augment
+ elif command == "^stop":
+ paddle_speed = 0
+
+
+if __name__ == "__main__":
+ t = Thread(target=receive_commands)
+ t.daemon = True
+ t.start()
+
+ game_loop()
diff --git a/pynars/NARS/Control/Reasoner.py b/pynars/NARS/Control/Reasoner.py
index 1a9174c..4e84e3a 100644
--- a/pynars/NARS/Control/Reasoner.py
+++ b/pynars/NARS/Control/Reasoner.py
@@ -141,6 +141,12 @@ def cycle(self):
if random_number < self.u_top_level_attention:
judgement_revised, goal_revised, answers_question, answers_quest = self.observe(
tasks_derived)
+
+ if answers_question is not None and len(answers_question) != 0:
+ for each in answers_question:
+ if each.term.word == " S>":
+ print(1)
+
data_structure_accessed_busyness = self.overall_experience.busyness
else:
self.consider(tasks_derived)
diff --git a/pynars/NARS/DataStructures/MC/AnticipationMC.py b/pynars/NARS/DataStructures/MC/AnticipationMC.py
deleted file mode 100644
index 07be33d..0000000
--- a/pynars/NARS/DataStructures/MC/AnticipationMC.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from pynars.Narsese import Task, parser
-from pynars.NAL.Inference.LocalRules import revision
-from pynars.NARS.DataStructures.MC import InputBufferMC
-
-
-class AnticipationMC:
- """
- Anticipation is the anticipation made by some predictions, e.g., (A, A=/>B |- B)
- It contains two parts:
- 1) the body of the anticipation, which is a task,
- 2) the prediction makes this anticipation, which is used for updating when the first part is satisfied/unsatisfied
- """
-
- def __init__(self, t: Task, parent_prediction: Task):
- self.t = t
- self.parent_prediction = parent_prediction
- # check whether an anticipation is examined
- self.solved = False
-
- def satisfied(self, buffer: InputBufferMC, event: Task):
- revised_t = revision(self.t, event) # revision if satisfied
-
- tmp_prediction = parser.parse(self.parent_prediction.sentence.word + "%1.0; 0.5%") # one positive case
- revised_prediction = revision(self.parent_prediction, tmp_prediction)
-
- buffer.update_prediction(revised_prediction)
-
- return revised_t
-
- def unsatisfied(self, buffer: InputBufferMC):
- tmp_prediction = parser.parse(self.parent_prediction.sentence.word + "%0.0; 0.5%")
- revised_prediction = revision(self.parent_prediction, tmp_prediction)
- buffer.update_prediction(revised_prediction)
diff --git a/pynars/NARS/DataStructures/MC/Buffer.py b/pynars/NARS/DataStructures/MC/Buffer.py
new file mode 100644
index 0000000..d12e8a6
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/Buffer.py
@@ -0,0 +1,45 @@
+from pynars.NARS.DataStructures.MC import Utils
+from pynars.NARS.DataStructures.MC.Utils import BufferTask, PriorityQueue
+
+
+class Buffer:
+
+ def __init__(self, size, N=1, D=0.9):
+ """
+        Buffer (not an event buffer), currently only used in the Narsese channel.
+
+ pq: a queue of buffer_tasks (in Utils.py)
+ size: the length of the buffer
+ N: pop top-N tasks in the pq
+ D: decay rate for the remaining tasks in the buffer
+ """
+ self.pq = PriorityQueue(size)
+ self.N = N
+ self.D = D
+
+ def select(self):
+ """
+ Select the top-N BufferTasks from the buffer, decay the remaining.
+ """
+ ret = []
+ push_back = []
+ for i in range(len(self.pq)):
+ if i <= self.N:
+ buffer_task, _ = self.pq.pop()
+ ret.append(buffer_task)
+ else:
+ buffer_task, _ = self.pq.pop()
+ buffer_task.expiration_effect *= self.D
+ push_back.append(buffer_task)
+ for each in push_back:
+ self.pq.push(each, each.priority)
+ return [each.task for each in ret]
+
+ def add(self, tasks, memory):
+ """
+ Convert input tasks (Task) into BufferTasks.
+ """
+ for each in tasks:
+ buffer_task = BufferTask(each)
+ buffer_task.preprocess_effect = Utils.preprocessing(each, memory)
+ self.pq.push(buffer_task, buffer_task.priority)
diff --git a/pynars/NARS/DataStructures/MC/ChannelMC.py b/pynars/NARS/DataStructures/MC/ChannelMC.py
deleted file mode 100644
index 06819fd..0000000
--- a/pynars/NARS/DataStructures/MC/ChannelMC.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import tkinter
-from abc import abstractmethod
-from pynars.Narsese import Term
-from pynars.NARS.DataStructures.MC.EventBufferMC import EventBufferMC
-
-
-class ChannelMC:
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction, memory,
- root_UI: tkinter.Tk, UI_name: str):
- self.ID = UI_name
- self.operations = []
- self.event_buffer = EventBufferMC(num_slot, num_event, num_anticipation, num_operation, num_prediction, memory,
- root_UI, UI_name)
-
- @abstractmethod
- def execute(self, term: Term):
- pass
-
- @abstractmethod
- def information_gathering(self):
- return None
-
- def step(self):
- new_contents = self.information_gathering()
- task_forward = self.event_buffer.step(new_contents, "SC2")
- return task_forward
diff --git a/pynars/NARS/DataStructures/MC/Console.py b/pynars/NARS/DataStructures/MC/Console.py
new file mode 100644
index 0000000..a8c531e
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/Console.py
@@ -0,0 +1,158 @@
+from Pong.PongChannel import PongChannel, execute_MLeft, execute_MRight, execute_Hold
+from pynars.NARS import Reasoner
+from pynars.NARS.DataStructures.MC.EventBuffer import EventBuffer
+from pynars.NARS.DataStructures.MC.NarseseChannel import NarseseChannel
+from pynars.NARS.DataStructures.MC.OutputBuffer import OutputBuffer
+from pynars.Narsese import Task
+
+NarseseChannel_size = 100
+Buffer_D = 0.9
+
+
+class Console:
+
+ def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, N=1):
+ self.narsese_channel = NarseseChannel(NarseseChannel_size, N, Buffer_D)
+ # internal/global buffers are event buffers
+ self.internal_buffer = EventBuffer(num_slot, num_events, num_anticipations, num_operations,
+ num_predictive_implications, N)
+ self.global_buffer = EventBuffer(num_slot, num_events, num_anticipations, num_operations,
+ num_predictive_implications, N)
+ self.output_buffer = OutputBuffer(self.narsese_channel)
+ self.for_globalBuffer = []
+
+ self.sensorimotor_channels = {}
+
+ def register_sensorimotor_channels(self, name, channel):
+ self.sensorimotor_channels[name] = channel
+
+ def unregister_sensorimotor_channels(self, name):
+ self.sensorimotor_channels.pop(name)
+
+ def cycle(self, text, reasoner):
+ # get the task(s) from the Narsese input channel
+ tmp = self.narsese_channel.channel_cycle(text, reasoner.memory)
+ self.for_globalBuffer += tmp
+ # collect the other tasks from the other channels
+ for each in self.sensorimotor_channels:
+ tmp = self.sensorimotor_channels[each].channel_cycle(reasoner.memory)
+
+ # print("from sensorimotor channel", [each_tmp.sentence for each_tmp in tmp])
+ # print("---")
+ # for each_pq in self.sensorimotor_channels[each].input_buffer.slots[
+ # self.sensorimotor_channels[each].input_buffer.curr].events.pq:
+ # print("compounds", each_pq[0], each_pq[1].task.sentence)
+ # print("---")
+ # for each_prediction in self.sensorimotor_channels[each].input_buffer.predictive_implications.pq:
+ # print("predictions", each_prediction[0], each_prediction[1].task.sentence)
+ # print("---")
+ # for each_prediction in self.sensorimotor_channels[each].input_buffer.slots[
+ # self.sensorimotor_channels[each].input_buffer.curr].anticipations:
+ # print("anticipations", each_prediction)
+ # print("---")
+
+ self.for_globalBuffer += tmp
+
+ # add all above tasks to the global buffer,
+ # get the selected (and potentially composed) tasks and send them to the memory
+
+ # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # ==============================================================================================================
+ _, for_memory = self.global_buffer.buffer_cycle(self.for_globalBuffer, reasoner.memory)
+ # ==============================================================================================================
+ # original
+ # for_memory = self.global_buffer.buffer_cycle(self.for_globalBuffer, reasoner.memory)
+ # ==============================================================================================================
+
+ try:
+ reasoner.memory.accept(*for_memory) # unexpected errors in the reasoner ¯\_(ツ)_/¯
+ except:
+ pass
+
+ self.for_globalBuffer = []
+
+ # put all selected tasks to the memory and select one concept for reasoning
+ try:
+ tmp = reasoner.cycle() # unexpected errors in the reasoner ¯\_(ツ)_/¯
+ except:
+ return
+
+ if tmp is not None:
+ tasks_derived, judgement_revised, goal_revised, answers_question, answers_quest, _ = tmp
+ else:
+ tasks_derived, judgement_revised, goal_revised, answers_question, answers_quest = [], [], [], [], []
+
+ tasks_all = []
+ if tasks_derived is not None:
+ if isinstance(tasks_derived, Task):
+ tasks_all.append(tasks_derived)
+ elif len(tasks_derived) != 0:
+ tasks_all.extend(tasks_derived)
+ if judgement_revised is not None:
+ if isinstance(judgement_revised, Task):
+ tasks_all.append(judgement_revised)
+ elif len(judgement_revised) != 0:
+ tasks_all.extend(judgement_revised)
+ if goal_revised is not None:
+ if isinstance(goal_revised, Task):
+ tasks_all.append(goal_revised)
+ elif len(goal_revised) != 0:
+ tasks_all.extend(goal_revised)
+ if answers_question is not None:
+ if isinstance(answers_question, Task):
+ tasks_all.append(answers_question)
+ elif len(answers_question) != 0:
+ tasks_all.extend(answers_question)
+ if answers_quest is not None:
+ if isinstance(answers_quest, Task):
+ tasks_all.append(answers_quest)
+ elif len(answers_quest) != 0:
+ tasks_all.extend(answers_quest)
+ tmp = []
+ for each in tasks_all:
+ if not each.term.has_var and each.budget.priority > 0.5: # ¯\_(ツ)_/¯, the system prefers to generate (--, )
+ tmp.append(each)
+ tasks_all = tmp
+
+ self.output_buffer.buffer_cycle(tasks_all, self.sensorimotor_channels)
+
+ # put all generated new tasks (or revised tasks) to the internal buffer,
+ # for those answered questions, to-do operations, they are not sent to the corresponding output buffer,
+ # e.g., the output buffer in the Narsese Channel, or those in the sensorimotor channels.
+ for_internalBuffer = tasks_all
+
+ # send selected results to the internal buffer, and the selected tasks will be forward to self.for_globalBuffer,
+ # for the next cycle
+
+ # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # ==============================================================================================================
+ _, tmp = self.internal_buffer.buffer_cycle(for_internalBuffer, reasoner.memory)
+ # ==============================================================================================================
+ # original
+ # tmp = self.internal_buffer.buffer_cycle(for_internalBuffer, reasoner.memory)
+ # ==============================================================================================================
+
+ if tmp is not None:
+ self.for_globalBuffer = tmp
+
+
+if __name__ == "__main__":
+ r = Reasoner(100, 100)
+ pc = PongChannel("Pong", 1, 50, 50, 5, 50, 50, 2)
+ pc.register_operation("^left", execute_MLeft, ["^left", "left"])
+ pc.register_operation("^right", execute_MRight, ["^right", "right"])
+ pc.register_operation("^hold", execute_Hold, ["^hold", "mid"])
+
+ c = Console(2, 50, 50, 5, 50, 2)
+ c.register_sensorimotor_channels(pc.ID, pc)
+
+ for i in range(10000):
+ if i % 25 == 0:
+ c.cycle("<{SELF} --> [good]>!", r)
+ # pc.input_buffer.predictive_implications.show(lambda x: x.task.sentence)
+ for each in pc.reactions.pq:
+ print("reactions", each)
+ else:
+ c.cycle("1", r)
diff --git a/pynars/NARS/DataStructures/MC/EventBuffer.py b/pynars/NARS/DataStructures/MC/EventBuffer.py
new file mode 100644
index 0000000..b32d9a9
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/EventBuffer.py
@@ -0,0 +1,425 @@
+from pynars.NAL.Functions import Stamp_merge, Budget_merge, Truth_deduction
+from pynars.NAL.Inference.LocalRules import revision
+from pynars.NARS import Reasoner
+from pynars.NARS.DataStructures.MC import Utils
+from pynars.NARS.DataStructures.MC.OutputBuffer import Reaction
+from pynars.NARS.DataStructures.MC.Utils import PriorityQueue, BufferTask, satisfaction_level, preprocessing
+from pynars.Narsese import Compound, Judgement, Task, Interval, parser, Term, Truth, Copula, Statement
+
+
+class Anticipation:
+
+ def __init__(self, task, prediction):
+ self.matched = False
+ self.task = task
+ self.prediction = prediction
+
+
+class PredictiveImplication:
+
+ def __init__(self, condition, interval, conclusion, task):
+ self.condition = condition
+ """
+        As explained in the conceptual design, "+1, +2" cannot be used in buffers, thus the interval is only kept in
+        the prediction. This might cause trouble, but you may use +1, +2 as terms if you want.
+        The referred function explains how to do it.
+ """
+ self.interval = interval
+ self.conclusion = conclusion
+ self.to_memory_cooldown = 0
+ """
+        The expiration of predictions is different from the expiration in buffers. It is a non-negative integer.
+        It means how many cycles this prediction has gone unused.
+ """
+ self.expiration = 0
+ self.task = task
+
+ def get_conclusion(self, condition_task):
+ # when "A" is matched with "A =/> B", return B with truth deduction
+ truth = Truth_deduction(self.task.truth, condition_task.task.truth)
+ if truth.c < 0.3:
+ return None, None
+
+ task = parser.parse(self.conclusion.word + ". " + str(truth))
+
+ return self.interval, task
+
+
+class Slot:
+ """
+ It contains 3 parts: 1) events observed, 2) anticipations made, 3) operations to do.
+ """
+
+ def __init__(self, num_events, num_anticipations, num_operations):
+ self.events = PriorityQueue(num_events)
+ self.anticipations = []
+ self.num_anticipations = num_anticipations
+ self.operations = []
+ self.num_operations = num_operations
+
+ def push(self, item, value):
+ self.events.push(item, value)
+
+ def pop(self):
+ return self.events.pop()
+
+ def random_pop(self):
+ return self.events.random_pop()
+
+
+class EventBuffer:
+
+ def __init__(self, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, N=1):
+ # num slot is the number of slots on one side. If num_slot is 2, there are 1 (in the middle) + 2*2=5 slots
+ self.num_events = num_events
+ self.num_anticipations = num_anticipations
+ self.num_operations = num_operations
+ self.slots = [Slot(num_events, num_anticipations, num_operations) for _ in range(1 + 2 * num_slot)]
+ self.curr = num_slot
+ self.predictive_implications = PriorityQueue(num_predictive_implications)
+ self.reactions = PriorityQueue(num_predictive_implications * 5)
+ self.N = N
+
+ def push(self, tasks, memory):
+ for task in tasks:
+ buffer_task = BufferTask(task)
+ buffer_task.preprocess_effect = Utils.preprocessing(task, memory)
+ self.slots[self.curr].push(buffer_task, buffer_task.priority)
+
+ def pop(self):
+ ret = []
+ for _ in range(self.N):
+ if len(self.slots[self.curr].events) != 0:
+ buffer_task, _ = self.slots[self.curr].pop()
+ ret.append(buffer_task.task)
+ return ret
+
+ @staticmethod
+ def contemporary_composition(events):
+ # according to the conceptual design, currently only 2-compounds are allowed,
+ # though in the future, we may have compounds with many components,
+
+ # term
+ each_compound_term = [each.term for each in events]
+ term = Compound.ParallelEvents(*each_compound_term)
+
+ # truth, using the truth with the lowest expectation
+ truth = events[0].truth
+ for each in events[1:]:
+ if each.truth.e < truth.e:
+ truth = each.truth
+
+ # stamp, using stamp-merge function
+ stamp = events[0].stamp
+ for each in events[1:]:
+ stamp = Stamp_merge(stamp, each.stamp)
+
+ # budget, using budget-merge function
+ budget = events[0].budget
+ for each in events[1:]:
+ budget = Budget_merge(budget, each.budget)
+
+ # sentence
+ sentence = Judgement(term, stamp, truth)
+
+ # task
+ return Task(sentence, budget)
+
+ @staticmethod
+ def sequential_composition(event_1, interval, event_2):
+ # according to the conceptual design, we currently only have "event_1, interval, event_2" schema,
+ # though in the future this may also change, but it is too early to decide here
+
+ term = Compound.SequentialEvents(*[event_1.term, interval, event_2.term])
+        # in some cases, the interval need not be displayed in Narsese
+ # term = Compound.SequentialEvents(*[event_1.term, event_2.term])
+ truth = event_2.truth
+ # decrease the confidence of a compound based on the length of the interval
+ # truth.c *= 1 / int(interval)
+ # truth.c *= 0.7 + 0.3 * (0.9 - (0.9 / (self.curr * 5)) * int(interval))
+ stamp = Stamp_merge(event_2.stamp, event_1.stamp)
+ budget = Budget_merge(event_2.budget, event_1.budget)
+ # sentence
+ sentence = Judgement(term, stamp, truth)
+ # task
+ return Task(sentence, budget)
+
+ @staticmethod
+ def generate_prediction_util(event_1, interval, event_2):
+ if interval != 0:
+ copula = Copula.PredictiveImplication # =/>
+ else:
+ # currently, this is only allowed in the global buffer,
+ # but only for events from different resources
+ copula = Copula.ConcurrentImplication # ==>
+ """
+ If you want to include "interval" as a term, you just need to change "term" on the next line.
+ """
+ term = Statement(event_1.term, copula, event_2.term)
+ # truth, a default truth, with only one positive example
+ truth = Truth(1, 0.9, 1)
+ # stamp, using event_2's stamp
+ stamp = event_2.stamp
+ # budget, using budget-merge function
+ budget = Budget_merge(event_1.budget, event_2.budget)
+ # sentence
+ sentence = Judgement(term, stamp, truth)
+ # task
+ task = Task(sentence, budget)
+ # predictive implication
+ return PredictiveImplication(event_1.term, interval, event_2.term, task)
+
+ def compound_composition(self, memory):
+ """
+ After the initial composition, pick the one with the highest priority in the current slot.
+ Compose it with all other events in the current slot and the previous max events.
+ """
+ if len(self.slots[self.curr].events) != 0:
+ curr_max, _ = self.slots[self.curr].pop()
+ curr_remaining = []
+ curr_composition = []
+ while len(self.slots[self.curr].events) != 0:
+ curr_remaining.append(self.slots[self.curr].pop()[0])
+ curr_remaining[-1].is_component = 1
+ curr_max.is_component = 1
+ curr_composition.append(self.contemporary_composition([curr_max.task, curr_remaining[-1].task]))
+
+ previous_max = []
+ previous_composition = []
+ for i in range(self.curr):
+ if len(self.slots[i].events) != 0:
+ tmp, _ = self.slots[i].pop()
+ previous_max.append(tmp)
+ # don't change previous max's "is_component"
+ curr_max.is_component = 1
+ previous_composition.append(self.sequential_composition(previous_max[-1].task,
+ Interval(self.curr - i), curr_max.task))
+ else:
+ previous_max.append(None)
+
+ # after get all compositions, put everything back
+ for i in range(self.curr):
+ if previous_max[i] is not None:
+ self.slots[i].push(previous_max[i], previous_max[i].priority)
+ self.slots[self.curr].push(curr_max, curr_max.priority)
+ for each in curr_remaining:
+ self.slots[self.curr].push(each, each.priority)
+
+ # add all compositions to the current slot
+ self.push(curr_composition + previous_composition, memory)
+
+ def check_anticipation(self, memory):
+ """
+ Check all anticipations, award or punish the corresponding predictive implications.
+ If an anticipation does not even exist, apply the lowest satisfaction.
+ """
+ prediction_award_penalty = []
+ checked_buffer_tasks = []
+ while len(self.slots[self.curr].events) != 0:
+ buffer_task, _ = self.slots[self.curr].pop()
+ for each_anticipation in self.slots[self.curr].anticipations:
+ # it is possible for an event satisfying multiple anticipations,
+ # e.g., A, +1 =/> B, A =/> B
+ if each_anticipation.task.term.word == buffer_task.task.term.word:
+ each_anticipation.matched = True
+ buffer_task.task = revision(each_anticipation.task, buffer_task.task)
+ satisfaction = 1 - satisfaction_level(each_anticipation.task.truth, buffer_task.task.truth)
+ prediction_award_penalty.append([each_anticipation.prediction, satisfaction])
+ checked_buffer_tasks.append(buffer_task)
+
+ # if there are some unmatched anticipations, apply the lowest satisfaction
+ for each_anticipation in self.slots[self.curr].anticipations:
+ if not each_anticipation.matched:
+ prediction_award_penalty.append([each_anticipation.prediction, 0])
+
+ print("prediction_award_penalty", prediction_award_penalty)
+
+ # put all buffer tasks back, some evaluations may change
+ for each in checked_buffer_tasks:
+ self.slots[self.curr].push(each, each.priority)
+
+ # update the predictive implications
+ for each in prediction_award_penalty:
+ each[0].task = revision(each[0].task, parser.parse(each[0].task.term.word + ". %" + str(each[1]) + ";0.9%"))
+ self.predictive_implications.edit(each[0], each[0].task.truth.e * preprocessing(each[0].task, memory),
+ lambda x: x.task.term)
+
+ def predictive_implication_application(self, memory):
+ """
+ Check all predictive implications, whether some of them can fire.
+ If so, calculate the corresponding task of the conclusion and create it as an anticipation in the corresponding
+ slot in the future.
+ If some implications cannot fire, increase the expiration of them.
+ """
+ implications = []
+ while len(self.predictive_implications) != 0:
+ implication, _ = self.predictive_implications.pop()
+ applied = False
+ for each_event in self.slots[self.curr].events.pq:
+ if implication.condition == each_event[1].task.term:
+ interval, conclusion = implication.get_conclusion(each_event[1])
+ if interval is None:
+ break
+ applied = True
+ implication.expiration = max(0, implication.expiration - 1)
+ anticipation = Anticipation(conclusion, implication)
+ self.slots[self.curr + int(interval)].anticipations.append(anticipation)
+ implications.append(implication)
+ if not applied:
+ implication.expiration += 1
+ implications.append(implication)
+
+ for each in implications:
+ self.predictive_implications.push(each, each.task.truth.e * preprocessing(each.task, memory) *
+ (1 / (1 + each.expiration)))
+
+ def to_memory_predictive_implication(self, memory, threshold_f=0.9, threshold_c=0.8, default_cooldown=100):
+ # when a predictive implication reaches a relatively high truth value, it will be forwarded to the memory
+ # (not the next level)
+ # this does not mean it is removed from the predictive implication pq
+
+ # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # ==============================================================================================================
+ reactions = []
+ # ==============================================================================================================
+
+ for each in self.predictive_implications.pq:
+ if each[1].task.truth.f >= threshold_f and each[1].task.truth.c >= threshold_c:
+ if each[1].to_memory_cooldown <= 0:
+ memory.accept(each[1].task)
+ # print("accepted task", each[1].task.sentence)
+
+ # cheating
+                    # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly,
+ # I have to cheat.
+ # ==================================================================================================
+
+ if each[1].task.term.predicate.word == "<{SELF}-->[good]>":
+
+ if (each[1].task.term.subject.is_compound and each[1].task.term.subject.components[-1].word[0]
+ == "^"):
+ condition = None
+ operation = None
+
+ if each[1].task.term.subject.connector == "Connector.ParallelEvents":
+ condition = Compound.ParallelEvents(*each[1].task.term.subject.components[:-1])
+ operation = each[1].task.term.subject.components[-1]
+ elif each[1].task.term.subject.connector == "Connector.ParallelEvents":
+ condition = Compound.SequentialEvents(*each[1].task.term.subject.components[:-1])
+ operation = each[1].task.term.subject.components[-1]
+
+ if condition is not None and operation is not None:
+ reaction = Reaction(condition, operation, None)
+ reactions.append(reaction)
+
+ # ==================================================================================================
+
+ each[1].to_memory_cooldown = default_cooldown
+ else:
+ each[1].to_memory_cooldown -= 1
+
+ # cheating
+ # since something is wrong the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # ==============================================================================================================
+ return reactions
+ # ==============================================================================================================
+
+    def local_evaluation(self, memory, threshold_f=0.8, threshold_c=0.9, default_cooldown=100):
+        """
+        Per-cycle local evaluation: check anticipations, apply predictive implications, then
+        forward high-truth predictive implications to the memory, collecting cheating reactions.
+
+        NOTE(review): defaults here are threshold_f=0.8 / threshold_c=0.9, the reverse of
+        to_memory_predictive_implication's own defaults (0.9 / 0.8) — confirm which is intended.
+        """
+        self.check_anticipation(memory)
+        self.predictive_implication_application(memory)
+        # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+        # ==============================================================================================================
+        reactions = self.to_memory_predictive_implication(memory, threshold_f, threshold_c, default_cooldown)
+        return reactions
+        # ==============================================================================================================
+        # original
+        # self.to_memory_predictive_implication(memory, threshold_f, threshold_c, default_cooldown)
+
+ def memory_based_evaluation(self, memory):
+ evaluated_buffer_tasks = []
+ while len(self.slots[self.curr].events) != 0:
+ buffer_task, _ = self.slots[self.curr].pop()
+ buffer_task.preprocess_effect = preprocessing(buffer_task.task, memory)
+ evaluated_buffer_tasks.append(buffer_task)
+ # put all buffer tasks back
+ for each in evaluated_buffer_tasks:
+ self.slots[self.curr].push(each, each.priority)
+
+ @staticmethod
+ def prediction_revision(existed_prediction, new_prediction):
+ existed_prediction.task = revision(existed_prediction.task, new_prediction.task)
+ existed_prediction.expiration = max(0, existed_prediction.expiration - 1)
+ return existed_prediction
+
+    def prediction_generation(self, max_events_per_slot, memory):
+        """
+        For each slot, randomly pop "max events per slot" buffer tasks to generate predictions.
+        Currently, concurrent predictive implications (==>) are not supported.
+        """
+        # get all events needed for prediction generation
+        # random_pop may return None (empty slot); Nones are kept as placeholders and filtered later
+        selected_buffer_tasks = []
+        for i in range(self.curr + 1):
+            tmp = []
+            for _ in range(max_events_per_slot):
+                tmp.append(self.slots[i].random_pop())
+            selected_buffer_tasks.append(tmp)
+
+        # debug output only — consider removing or routing through a logger
+        for i, each_selected_buffer_tasks in enumerate(selected_buffer_tasks):
+            print("selected_buffer_tasks", i,
+                  [each_event.task if each_event is not None else "None" for each_event in each_selected_buffer_tasks])
+        print("===")
+
+        # generate predictions based on intervals (=/>)
+        # pair each event of the current slot (selected_buffer_tasks[-1]) with each event of every earlier slot i;
+        # the interval is the slot distance (self.curr - i)
+        for i in range(self.curr):
+            for each_curr_event in selected_buffer_tasks[-1]:
+                for each_previous_event in selected_buffer_tasks[i]:
+                    if each_curr_event is not None and each_previous_event is not None:
+                        tmp = self.generate_prediction_util(each_previous_event.task, Interval(self.curr - i),
+                                                            each_curr_event.task)
+                        # if tmp.task.truth.e * preprocessing(tmp.task, memory) <= 0.05:
+                        #     continue
+                        # if a prediction with the same term already exists, remove it from the pq
+                        # and revise it with the new one before re-pushing
+                        existed = None
+                        for j in range(len(self.predictive_implications)):
+                            if self.predictive_implications.pq[j][1].task.term == tmp.task.term:
+                                existed = self.predictive_implications.pq.pop(j)
+                                break
+                        if existed is not None:
+                            tmp = self.prediction_revision(existed[1], tmp)
+
+                        # priority = expectation weighted by the memory-based preprocessing factor
+                        self.predictive_implications.push(tmp, tmp.task.truth.e * preprocessing(tmp.task, memory))
+
+        # after the prediction generation, put the randomly selected buffer tasks back
+        for i in range(self.curr + 1):
+            for each in selected_buffer_tasks[i]:
+                if each is not None:
+                    self.slots[i].push(each, each.priority)
+
+ def slots_cycle(self):
+ self.slots = self.slots[1:] + [Slot(self.num_events, self.num_anticipations, self.num_operations)]
+
+    def buffer_cycle(self, tasks, memory, max_events_per_slot=5, threshold_f=0.8, threshold_c=0.9,
+                     default_cooldown=10):
+        """
+        Run one full buffer cycle: absorb the new tasks, compose compounds, evaluate locally and
+        against the memory, generate predictions, pop the winning task, and advance the slots.
+
+        Returns (reactions, popped_task).
+        NOTE(review): default_cooldown defaults to 10 here but to 100 in local_evaluation and
+        to_memory_predictive_implication — confirm the inconsistency is intended.
+        """
+        # put all tasks to the current slot
+        self.push(tasks, memory)
+        self.compound_composition(memory)
+        # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+        # ==============================================================================================================
+        reactions = self.local_evaluation(memory, threshold_f, threshold_c, default_cooldown)
+        # ==============================================================================================================
+        # original
+        # self.local_evaluation(memory, threshold_f, threshold_c, default_cooldown)
+        # ==============================================================================================================
+        self.memory_based_evaluation(memory)
+        self.prediction_generation(max_events_per_slot, memory)
+        ret = self.pop()
+        self.slots_cycle()
+        # cheating
+        # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+        # ==============================================================================================================
+        return reactions, ret
+        # ==============================================================================================================
+        # original
+        # return ret
diff --git a/pynars/NARS/DataStructures/MC/EventBufferMC.py b/pynars/NARS/DataStructures/MC/EventBufferMC.py
deleted file mode 100644
index 8c1f904..0000000
--- a/pynars/NARS/DataStructures/MC/EventBufferMC.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import numpy as np
-
-from pynars.NARS.DataStructures import Memory
-from pynars.NARS.DataStructures.MC.InputBufferMC import InputBufferMC
-from pynars.NARS.DataStructures.MC.SlotMC import SlotMC
-
-
-class EventBufferMC(InputBufferMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory: Memory, root_UI, UI_name):
- super(EventBufferMC, self).__init__(num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory, root_UI, UI_name)
-
- def operation_processing_default(self):
- """
- A default processing of operations. "By default" means that an operation is used as an event, and they are
- combined used in compound generation.
-
- And the operation is not used specially in prediction generation as "procedural knowledge".
- """
- self.slots[self.present].events = np.append(self.slots[self.present].events,
- self.slots[self.present - 1].operations)
-
- def step(self, new_content, origin = ""):
- """
- Event buffer can get at most one new content each time, and so there are no "concurrent compound generations"
- in definition. But this will change if "default operation processing" is considered.
- """
- # remove the oldest slot and create a new one
- self.slots = self.slots[1:]
- self.slots.append(SlotMC(self.num_event, self.num_anticipation, self.num_operation))
-
- self.operation_processing_default() # default operation processing
- self.concurrent_compound_generation(new_content, origin) # 1st step
- self.historical_compound_generation(origin) # 1st step
- self.local_evaluation() # 2nd step
- self.memory_based_evaluations() # 3rd step
- task_forward = self.prediction_generation() # 4th step
-
- # GUI
- # ==============================================================================================================
- self.UI_roll()
- self.UI_content_update()
- self.UI_show()
- # ==============================================================================================================
-
- return task_forward
diff --git a/pynars/NARS/DataStructures/MC/GlobalBufferMC.py b/pynars/NARS/DataStructures/MC/GlobalBufferMC.py
deleted file mode 100644
index de4b2ea..0000000
--- a/pynars/NARS/DataStructures/MC/GlobalBufferMC.py
+++ /dev/null
@@ -1,100 +0,0 @@
-from pynars.NAL.Functions import Truth_induction, Stamp_merge, Budget_merge
-from pynars.NARS.DataStructures import Memory
-from pynars.NARS.DataStructures.MC.InputBufferMC import InputBufferMC
-from pynars.NARS.DataStructures.MC.SlotMC import SlotMC
-from pynars.Narsese import Judgement, Task, Copula, Statement, Compound, Interval
-
-
-class GlobalBufferMC(InputBufferMC):
- """
- The global buffer is able to generate concurrent implications (=|>).
- Currently, this is the only difference.
- """
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory: Memory, root_UI, UI_name):
- """
- Though the global buffer has an input variable "num_operation", but a global buffer will never process any
- operations, so this variable should always be 0.
- """
- super(GlobalBufferMC, self).__init__(num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory, root_UI, UI_name)
-
- def prediction_generation(self):
- """
- In the global buffer, not only =/> implications are generated. But also =|> implications are generated.
- """
- # =/>
- if self.slots[self.present].candidate:
- predicate = self.slots[self.present].candidate.term
- for i in range(self.present):
- if self.slots[i].candidate:
- # e.g., (E, +1) as the subject
- subject = Compound.SequentialEvents(self.slots[i].candidate.term, Interval(abs(self.present - i)))
- copula = Copula.PredictiveImplication # =/>
- term = Statement(subject, copula, predicate)
- # truth, using truth-induction function (TODO, may subject to change)
- truth = Truth_induction(self.slots[i].candidate.truth,
- self.slots[self.present].candidate.truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = Stamp_merge(self.slots[i].candidate.stamp,
- self.slots[self.present].candidate.stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = Budget_merge(self.slots[i].candidate.budget,
- self.slots[self.present].candidate.budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- prediction = Task(sentence, budget)
- self.update_prediction(prediction)
- # =|>
- if self.slots[self.present].candidate:
- # from concurrent events
- predicate = self.slots[self.present].candidate.term
- for i in range(len(self.slots[self.present].events)):
- if self.slots[self.present].events[-1][1].term.equal(self.slots[self.present].candidate.term):
- continue
- subject = self.slots[self.present].events[i][1].term
- copula = Copula.ConcurrentImplication # =|>
- term = Statement(subject, copula, predicate)
- # truth, using truth-induction function (TODO, may subject to change)
- truth = Truth_induction(self.slots[self.present].events[i][1].truth,
- self.slots[self.present].candidate.truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = Stamp_merge(self.slots[self.present].events[i][1].stamp,
- self.slots[self.present].candidate.stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = Budget_merge(self.slots[self.present].events[i][1].budget,
- self.slots[self.present].candidate.budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- prediction = Task(sentence, budget)
- self.update_prediction(prediction)
-
- return self.slots[self.present].candidate
-
- def step(self, new_contents, origin = ""):
- """
- Internal buffer and global buffer can have multiple inputs at the same time. And so they have contemporary and
- historical compound generations successively. But the input of the historical compound generation will be the
- highest concurrent input.
- """
- # remove the oldest slot and create a new one
- self.slots = self.slots[1:]
- self.slots.append(SlotMC(self.num_event, self.num_anticipation, self.num_operation))
-
- self.concurrent_compound_generation(new_contents, origin) # 1st step
- self.historical_compound_generation(origin) # 1st step
- self.local_evaluation() # 2nd step
- self.memory_based_evaluations() # 3rd step
- task_forward = self.prediction_generation() # 4th step
-
- # GUI
- # ==============================================================================================================
- self.UI_roll()
- self.UI_content_update()
- self.UI_show()
- # ==============================================================================================================
-
- return task_forward
diff --git a/pynars/NARS/DataStructures/MC/InputBufferMC.py b/pynars/NARS/DataStructures/MC/InputBufferMC.py
deleted file mode 100644
index 44ebcd4..0000000
--- a/pynars/NARS/DataStructures/MC/InputBufferMC.py
+++ /dev/null
@@ -1,455 +0,0 @@
-import ctypes
-import tkinter as tk
-import tkinter.font as tkFont
-from copy import deepcopy
-from tkinter import ttk
-from tkinter.scrolledtext import ScrolledText
-
-import numpy as np
-
-from pynars.NAL.Functions import Stamp_merge, Budget_merge, Truth_induction, Truth_deduction
-from pynars.NARS.DataStructures import Memory
-from pynars.NARS.DataStructures.MC.AnticipationMC import AnticipationMC
-from pynars.NARS.DataStructures.MC.SlotMC import SlotMC
-from pynars.Narsese import Compound, Task, Judgement, Interval, Statement, Copula
-
-
-# the priority value of predictions (predictive implications)
-def p_value(t: Task):
- return t.budget.summary * t.truth.e / t.term.complexity ** 2
-
-
-def UI_better_content(task: Task):
- """
- A function to help generate UI output.
- Make it not just plain texts.
- Since each buffer (event buffer, internal buffer, global buffer) will have an independent UI page.
- """
- budget = "$" + str(task.budget.priority)[:4] + ";" + str(task.budget.durability)[:4] + ";" + str(
- task.budget.quality)[:4] + "$ | "
- word = "".join(task.sentence.word.split(" ")) + "\n"
- end = "=" * 41 + "\n"
- word.replace("-->", "->")
- word.replace("==>", "=>")
- if task.truth is not None:
- truth = "% " + str(task.truth.f)[:4] + ";" + str(task.truth.c)[:4] + "%\n"
- return [budget + truth, word, end]
- else:
- return [budget + "\n", word, end]
-
-
-class InputBufferMC(object):
- """
- The super class of all input buffers (event buffer, internal buffer, global buffer).
- """
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction, memory: Memory, root_UI,
- UI_name):
- self.num_slot = num_slot * 2 + 1
- self.present = num_slot
-
- self.num_event = num_event
- self.num_anticipation = num_anticipation
- self.num_operation = num_operation
- self.num_prediction = num_prediction
-
- self.memory = memory
- self.prediction_table = []
- self.slots = [SlotMC(num_event, num_anticipation, num_operation) for _ in range(self.num_slot)]
-
- # GUI
- # ==============================================================================================================
- ctypes.windll.shcore.SetProcessDpiAwareness(1) # auto-resolution rate
- SF = ctypes.windll.shcore.GetScaleFactorForDevice(0)
-
- self.top = tk.Toplevel(root_UI, width=160, height=50) # top canvas created
- self.top.title(UI_name)
- self.top.tk.call("tk", "scaling", SF / 75)
-
- self.notebook = ttk.Notebook(self.top) # notebook created
- self.notebook.pack(pady=10, padx=10, expand=True)
- # each time slot has one page on the notebook
- self.P = {} # reference of all notebook pages
- self.contents_UI = [] # reference of the content of each notebook page
- for i in range(self.num_slot):
- P_i = ttk.Frame(self.notebook, width=160, height=50) # notebook page created
- self.contents_UI.append({"historical_compound": [],
- "concurrent_compound": [],
- "anticipation": [],
- "prediction": []})
- P_i.pack(fill="both", expand=True)
- self.notebook.add(P_i, text="Slot [" + str(i - self.present) + "] ")
-
- # frames of each part on the page
- F_1 = tk.LabelFrame(P_i, width=41, height=50, text="Historical Compound") # frame created on the page
- F_2 = tk.LabelFrame(P_i, width=41, height=50, text="Concurrent Compound")
- F_3 = tk.LabelFrame(P_i, width=41, height=50, text="Anticipation")
- F_4 = tk.LabelFrame(P_i, width=41, height=50, text="Prediction")
- F_1.pack(side=tk.LEFT)
- F_2.pack(side=tk.LEFT)
- F_3.pack(side=tk.LEFT)
- F_4.pack(side=tk.LEFT)
- Font = tkFont.Font(family="monaco", size=8)
- T_1 = ScrolledText(F_1, width=41, height=50, font=Font) # scrolled text created on the frame
- T_2 = ScrolledText(F_2, width=41, height=50, font=Font)
- T_3 = ScrolledText(F_3, width=41, height=50, font=Font)
- T_4 = ScrolledText(F_4, width=41, height=50, font=Font)
- T_1.pack(side=tk.RIGHT)
- T_2.pack(side=tk.RIGHT)
- T_3.pack(side=tk.RIGHT)
- T_4.pack(side=tk.RIGHT)
- T_1.insert(tk.END, "=" * 18 + "READY" + "=" * 18) # initialization reminder
- T_2.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_3.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_4.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_1.configure(state="disabled") # disable user input (just for display)
- T_2.configure(state="disabled")
- T_3.configure(state="disabled")
- T_4.configure(state="disabled")
- self.P.update({P_i: [T_1, T_2, T_3, T_4]})
- # ==============================================================================================================
-
- def update_prediction(self, p: Task):
- for i in range(len(self.prediction_table)): # delete if existed
- if self.prediction_table[i].term == p.term:
- del self.prediction_table[i]
- break
- P = p_value(p)
- added = False
- # large to small
- for i in range(len(self.prediction_table)):
- if P > p_value(self.prediction_table[i]):
- self.prediction_table = self.prediction_table[:i] + [p] + self.prediction_table[i:]
- added = True
- break
- if not added: # smallest
- self.prediction_table = self.prediction_table + [p]
- if len(self.prediction_table) > self.num_prediction:
- self.prediction_table = self.prediction_table[:-1]
-
- def combination(self, lst, start, num, tmp, cpds):
- """
- Compound utility function.
- """
- if num == 0:
- cpds.append(deepcopy(tmp))
- return
- elif len(lst) - start < num:
- return
- else:
- tmp.append(lst[start])
- self.combination(lst, start + 1, num - 1, tmp, cpds)
- self.combination(lst[:-1], start + 1, num, tmp, cpds)
-
- def concurrent_compound_generation(self, new_contents, origin = ""):
- """
- Each buffer will have a compound generation process, and this process is exactly the same. Though in some
- buffers, a part of the process is skipped due to blank inputs.
-
- For example, in event buffers, usually this step will be skipped since there are only one event at each time.
- """
- if new_contents is None:
- return
- for new_content in new_contents:
- self.slots[self.present].update_events(new_content)
- # check atomic anticipations
- self.slots[self.present].check_anticipation(self)
-
- # concurrent compounds generation
- compounds = []
- for i in range(len(self.slots[self.present].events)):
- self.combination(self.slots[self.present].events[:, 1], 0, i + 1, [], compounds)
- for each_compound in compounds:
- if len(each_compound) > 1:
- # term generation
- each_compound_term = [each.term for each in each_compound]
- term = Compound.ParallelEvents(*each_compound_term)
- # truth, using truth-induction function (TODO, may subject to change)
- truth = each_compound[0].truth
- for each in each_compound[1:]:
- truth = Truth_induction(truth, each.truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = each_compound[0].stamp
- for each in each_compound[1:]:
- stamp = Stamp_merge(stamp, each.stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = each_compound[0].budget
- for each in each_compound[1:]:
- budget = Budget_merge(budget, each.budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- task = Task(sentence, budget)
- self.slots[self.present].update_events(task)
- else:
- self.slots[self.present].update_events(each_compound[0])
- # check concurrent compound anticipations
- self.slots[self.present].check_anticipation(self)
-
- def historical_compound_generation(self, origin = ""):
- """
- Previously, this is achieved by a DP-like process, but currently it is achieved by exhaustion.
-
- It happens among all the present concurrent compound and all "previous candidates", like finding a sub-string.
- Note that one current event may not be included.
- """
- if self.slots[self.present].events is None:
- return
- for i in range(len(self.slots[self.present].events)):
- # there might be "None" in tmp_list
- tmp_list = [self.slots[i].candidate for i in range(self.present)] + [self.slots[self.present].events[i][1]]
- for j in range(1, 2 ** (self.present + 1)): # enumeration, actually this is a process finding sub-strings
- # a binary coding is used to find the sub-string
- j_binary_str = list(("{:0" + str(self.present + 1) + "b}").format(j))
- j_binary_boolean = [False if each == "0" else True for each in j_binary_str]
- cpd = []
- for k, each in enumerate(j_binary_boolean):
- if not each:
- cpd.append(1)
- elif tmp_list[k] is not None:
- cpd.append(tmp_list[k])
- else:
- cpd.append(1)
- # for example
- # tmp_list: [None, None, A, None, None, B, C]
- # i_binary_boolean: [False, False, True, True, True, True, False]
- # cpd: [1, 1, A, 1, 1, B, 1], remove the 1's at the beginning and ending
- while True:
- if len(cpd) != 0 and cpd[0] == 1:
- cpd = cpd[1:]
- else:
- break
- # cpd: [A, 1, 1, B, 1], or []
- if len(cpd) != 0:
- while True:
- if cpd[-1] == 1:
- cpd = cpd[:-1]
- else:
- break
- # cpd: [A, 1, 1, B], cpd is a list of tasks, merge adjacent intervals next
- cpd_term = []
- if len(cpd) != 0:
- interval = 0
- for k, each in enumerate(cpd):
- if each != 1:
- if interval != 0:
- cpd_term.append(Interval(interval))
- interval = 0
- cpd_term.append(each.term) # each isType Task
- else:
- interval += 1
- # cpd_term: [A.term, Interval(2), B.term] or [] TODO: ugly, but work :\
-
- if len(cpd_term) != 0:
- term = Compound.SequentialEvents(*cpd_term)
- truth = cpd[0].truth
- stamp = cpd[0].stamp
- budget = cpd[0].budget
- for each in cpd[1:]:
- if each != 1:
- # truth, using truth-induction function (TODO, may subject to change)
- truth = Truth_induction(truth, each.truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = Stamp_merge(stamp, each.stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = Budget_merge(budget, each.budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- task = Task(sentence, budget)
- self.slots[self.present].update_events(task)
-
- # checking historical events is moved to the end of local_evaluation
-
- def local_evaluation(self):
- # generate anticipation
- for each_prediction in self.prediction_table:
-
- # predictions may be like "(&/, A, +1) =/> B", the content of the subject will just be A
- # if it is "(&/, A, +1, B) =/> C", no need to change the subject
- interval = 0
- if isinstance(each_prediction.term.subject.terms[-1], Interval):
- subject = Compound.SequentialEvents(*each_prediction.term.subject.terms[:-1]) # condition
- interval = int(each_prediction.term.subject.terms[-1])
- else:
- subject = each_prediction.term.subject
-
- for each_event in self.slots[self.present].events:
- if subject.equal(each_event[1].term):
- # term generation
- term = each_prediction.term.predicate
- # truth, using truth-deduction function (TODO, may subject to change)
- truth = Truth_deduction(each_prediction.truth, each_event[1].truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = Stamp_merge(each_prediction.stamp, each_event[1].stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = Budget_merge(each_prediction.budget, each_event[1].budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- task = Task(sentence, budget)
- # anticipation generation
- anticipation = AnticipationMC(task, each_prediction)
- if interval <= self.present:
- self.slots[self.present + interval].update_anticipations(anticipation)
-
- # check anticipations with un-expectation handling (non-anticipation events)
- self.slots[self.present].check_anticipation(self, mode_unexpected=True)
-
- # unsatisfied anticipation handling
- for each_anticipation in self.slots[self.present].anticipations:
- if not self.slots[self.present].anticipations[each_anticipation].solved:
- self.slots[self.present].anticipations[each_anticipation].unsatisfied(self)
-
- def memory_based_evaluations(self):
- """
- Find whether a concept is already in the main memory. If so, merger the budget.
- """
- events_updates = []
-
- for each_event in self.slots[self.present].events:
- tmp = self.memory.concepts.take_by_key(each_event[1].term, remove=False)
- if tmp is not None:
- budget = Budget_merge(each_event[1].budget, tmp.budget)
- task = Task(each_event[1].sentence, budget)
- events_updates.append(task)
- for each_event in events_updates:
- self.slots[self.present].update_events(each_event)
-
- # find the highest concurrent compound, namely the candidate
- # sorting first
- if len(self.slots[self.present].events) != 0:
- self.slots[self.present].events = self.slots[self.present].events[
- np.argsort(self.slots[self.present].events[:, 2])]
- self.slots[self.present].candidate = self.slots[self.present].events[-1][1]
-
- def prediction_generation(self):
- # =/>
- if self.slots[self.present].candidate is not None:
- predicate = self.slots[self.present].candidate.term
- for i in range(self.present):
- if self.slots[i].candidate:
- # e.g., (E, +1) as the subject
- subject = Compound.SequentialEvents(self.slots[i].candidate.term, Interval(abs(self.present - i)))
- copula = Copula.PredictiveImplication # =/>
- term = Statement(subject, copula, predicate)
- # truth, using truth-induction function (TODO, may subject to change)
- truth = Truth_induction(self.slots[i].candidate.truth,
- self.slots[self.present].candidate.truth)
- # stamp, using stamp-merge function (TODO, may subject to change)
- stamp = Stamp_merge(self.slots[i].candidate.stamp,
- self.slots[self.present].candidate.stamp)
- # budget, using budget-merge function (TODO, may subject to change)
- budget = Budget_merge(self.slots[i].candidate.budget,
- self.slots[self.present].candidate.budget)
- # sentence composition
- sentence = Judgement(term, stamp, truth)
- # task generation
- prediction = Task(sentence, budget)
- self.update_prediction(prediction)
- return self.slots[self.present].candidate
-
- def reset(self):
- self.slots = [SlotMC(self.num_event, self.num_anticipation, self.num_operation) for _ in range(self.num_slot)]
- self.prediction_table = []
- self.contents_UI = []
- for i in range(self.num_slot):
- self.contents_UI.append({"historical_compound": [],
- "concurrent_compound": [],
- "anticipation": [],
- "prediction": []})
- for each in self.P:
- T_1, T_2, T_3, T_4 = self.P[each]
- T_1.configure(state="normal")
- T_2.configure(state="normal")
- T_3.configure(state="normal")
- T_4.configure(state="normal")
- T_1.delete("1.0", "end")
- T_2.delete("1.0", "end")
- T_3.delete("1.0", "end")
- T_4.delete("1.0", "end")
- T_1.insert(tk.END, "=" * 18 + "READY" + "=" * 18) # initialization reminder
- T_2.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_3.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_4.insert(tk.END, "=" * 18 + "READY" + "=" * 18)
- T_1.configure(state="disabled") # disable user input
- T_2.configure(state="disabled")
- T_3.configure(state="disabled")
- T_4.configure(state="disabled")
-
- def UI_roll(self):
- self.contents_UI = self.contents_UI[1:]
- self.contents_UI.append({"historical_compound": [],
- "concurrent_compound": [],
- "anticipation": [],
- "prediction": []})
-
- def UI_show_single_page(self, P_i, content_UI):
- T_1, T_2, T_3, T_4 = self.P[P_i][0], self.P[P_i][1], self.P[P_i][2], self.P[P_i][3]
-
- # add tags
- Font = tkFont.Font(family="monaco", size=8, weight="bold")
- T_1.tag_config("tag_1", font=Font) # for the word of each task
- T_1.tag_config("tag_2", foreground="red") # for the budget and truth-value
- T_2.tag_config("tag_1", font=Font)
- T_2.tag_config("tag_2", foreground="red")
- T_3.tag_config("tag_1", font=Font)
- T_3.tag_config("tag_2", foreground="red")
- T_4.tag_config("tag_1", font=Font)
- T_4.tag_config("tag_2", foreground="red")
-
- T_1.configure(state="normal") # enable inputting
- T_2.configure(state="normal")
- T_3.configure(state="normal")
- T_4.configure(state="normal")
- T_1.delete("1.0", "end") # delete old contents
- T_2.delete("1.0", "end")
- T_3.delete("1.0", "end")
- T_4.delete("1.0", "end")
-
- for each in content_UI["historical_compound"]:
- if each is not None and len(each) != 0:
- BT, word, end = each[0], each[1], each[2]
- T_1.insert(tk.END, BT, "tag_2")
- T_1.insert(tk.END, word, "tag_1")
- T_1.insert(tk.END, end)
-
- for each in content_UI["concurrent_compound"]:
- if each is not None and len(each) != 0:
- BT, word, end = each[0], each[1], each[2]
- T_2.insert(tk.END, BT, "tag_2")
- T_2.insert(tk.END, word, "tag_1")
- T_2.insert(tk.END, end)
-
- for each in content_UI["anticipation"]:
- if each is not None and len(each) != 0:
- BT, word, end = each[0], each[1], each[2]
- T_3.insert(tk.END, BT, "tag_2")
- T_3.insert(tk.END, word, "tag_1")
- T_3.insert(tk.END, end)
-
- for each in content_UI["prediction"]:
- if each is not None and len(each) != 0:
- BT, word, end = each[0], each[1], each[2]
- T_4.insert(tk.END, BT, "tag_2")
- T_4.insert(tk.END, word, "tag_1")
- T_4.insert(tk.END, end)
-
- T_1.configure(state="disabled") # disable user input
- T_2.configure(state="disabled")
- T_3.configure(state="disabled")
- T_4.configure(state="disabled")
-
- def UI_content_update(self):
- for i in range(self.num_slot):
- self.contents_UI[i].update({"historical_compound": [],
- "concurrent_compound": [UI_better_content(each[1]) for each in
- self.slots[i].events],
- "anticipation": [UI_better_content(self.slots[i].anticipations[each].t) for each
- in self.slots[i].anticipations],
- "prediction": [UI_better_content(each) for each in
- self.prediction_table]})
-
- def UI_show(self):
- for i, each in enumerate(self.P):
- self.UI_show_single_page(each, self.contents_UI[i])
diff --git a/pynars/NARS/DataStructures/MC/InternalBufferMC.py b/pynars/NARS/DataStructures/MC/InternalBufferMC.py
deleted file mode 100644
index 95ebb76..0000000
--- a/pynars/NARS/DataStructures/MC/InternalBufferMC.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import numpy as np
-
-from pynars.NARS.DataStructures import Memory
-from pynars.NARS.DataStructures.MC.InputBufferMC import InputBufferMC
-from pynars.NARS.DataStructures.MC.SlotMC import SlotMC
-
-
-class InternalBufferMC(InputBufferMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory: Memory, root_UI, UI_name):
- super(InternalBufferMC, self).__init__(num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory, root_UI, UI_name)
-
- def operation_processing_default(self):
- """
- A default processing of operations. "By default" means that an operation is used as an event, and they are
- combined used in compound generation.
-
- And the operation is not used specially in prediction generation as "procedural knowledge".
-
- Note that, in internal buffers, this "operation" means "mental operations" specifically.
- """
- self.slots[self.present].events = np.append(self.slots[self.present].events,
- self.slots[self.present - 1].operations)
-
- def step(self, new_contents, origin = ""):
- """
- Internal buffer and global buffer can have multiple inputs at the same time. And so they have contemporary and
- historical compound generations successively. But the input of the historical compound generation will be the
- highest concurrent input.
- """
- # remove the oldest slot and create a new one
- self.slots = self.slots[1:]
- self.slots.append(SlotMC(self.num_event, self.num_anticipation, self.num_operation))
-
- self.operation_processing_default() # default operation processing
- self.concurrent_compound_generation(new_contents, origin) # 1st step
- self.historical_compound_generation(origin) # 1st step
- self.local_evaluation() # 2nd step
- self.memory_based_evaluations() # 3rd step
- task_forward = self.prediction_generation() # 4th step
-
- # GUI
- # ==============================================================================================================
- self.UI_roll()
- self.UI_content_update()
- self.UI_show()
- # ==============================================================================================================
-
- return task_forward
diff --git a/pynars/NARS/DataStructures/MC/NarseseChannel.py b/pynars/NARS/DataStructures/MC/NarseseChannel.py
new file mode 100644
index 0000000..7330a21
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/NarseseChannel.py
@@ -0,0 +1,41 @@
+from pynars.NARS import Reasoner
+from pynars.NARS.DataStructures.MC.Buffer import Buffer
+from pynars.Narsese import parser
+from pynars.utils.Print import print_out, PrintType
+
+
+class NarseseChannel:
+
+ def __init__(self, size, N=1, D=0.9):
+ self.input_buffer = Buffer(size, N, D)
+
+ def channel_cycle(self, line, memory):
+ """
+ Print out: 1) what is input (to the channel, not to the memory or the reasoner);
+ 2) what the system wants to print.
+ """
+ if not line.isdigit():
+ try:
+ task = parser.parse(line)
+ print_out(PrintType.IN, task.sentence.repr(), *task.budget)
+ self.input_buffer.add([task], memory)
+        except Exception:
+ print_out(PrintType.ERROR, f"Invalid input! Failed to parse: {line}")
+ else:
+ print_out(PrintType.INFO, f'Run {int(line)} cycles.')
+
+ return self.input_buffer.select()
+
+ @staticmethod
+ def display(tasks):
+ for each in tasks:
+ print_out(PrintType.OUT, f"{each.sentence.repr()} {str(each.stamp)}", *each.budget)
+
+
+if __name__ == "__main__":
+ r = Reasoner(100, 100)
+ m = r.memory
+ c = NarseseChannel(10)
+ c.display([parser.parse(" X>.")])
+ print(c.channel_cycle(" B>", m))
+ print(c.channel_cycle(" B>.", m))
diff --git a/pynars/NARS/DataStructures/MC/OutputBuffer.py b/pynars/NARS/DataStructures/MC/OutputBuffer.py
new file mode 100644
index 0000000..beff2f9
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/OutputBuffer.py
@@ -0,0 +1,56 @@
+from pynars.Narsese import Compound
+
+
+class Reaction:
+
+ def __init__(self, condition, operation, goal):
+ self.condition = condition
+ self.operation = operation
+ self.goal = goal
+
+ def fire(self, task):
+ if task.term == self.condition:
+ return self.operation
+
+
+class OutputBuffer:
+ """
+ Output buffer is the bridge from NARS reasoner to all outside parts.
+ You may think the output buffer is the translator for the reasoner.
+ There are many "input buffers", but there is only one output buffer.
+ """
+
+ def __init__(self, Narsese_channel):
+ # usually, a Narsese channel is necessary for the system (for user inputs and display outputs of reasoning)
+ self.Narsese_channel = Narsese_channel
+
+ def buffer_cycle(self, tasks, sensorimotor_channels):
+ """
+ Get the derived tasks and send them to the corresponding channels as outputs.
+        It can be sent to the Narsese Channel for display in the console, or it can be sent to the corresponding
+ sensorimotor channels for operation execution.
+ """
+
+ # put all derived things to the Narsese channel to display
+ self.Narsese_channel.display(tasks)
+ # convert sub-goals and send to the corresponding channels (if any)
+ for each_task in tasks:
+ if each_task.is_goal:
+
+ if each_task.term.is_compound and each_task.term.components[-1].word[0] == "^":
+ for each_channel in sensorimotor_channels:
+ if sensorimotor_channels[each_channel].identify(each_task.term.components):
+
+ condition = None
+ operation = None
+
+ if each_task.term.connector == "Connector.ParallelEvents":
+ condition = Compound.ParallelEvents(*each_task.term.components[:-1])
+ operation = each_task.term.components[-1]
+                    elif each_task.term.connector == "Connector.SequentialEvents":
+ condition = Compound.SequentialEvents(*each_task.term.components[:-1])
+ operation = each_task.term.components[-1]
+
+ if condition is not None and operation is not None:
+ sensorimotor_channels[each_channel].add_reaction(Reaction(condition, operation,
+ each_task))
diff --git a/pynars/NARS/DataStructures/MC/OutputBufferMC.py b/pynars/NARS/DataStructures/MC/OutputBufferMC.py
deleted file mode 100644
index 0d35654..0000000
--- a/pynars/NARS/DataStructures/MC/OutputBufferMC.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import tkinter as tk
-from pynars.Narsese import Task, Compound, Interval, Term
-
-
-def UI_better_content(task: Task):
- budget = "$" + str(task.budget.priority)[:4] + ";" + str(task.budget.durability)[:4] + ";" + str(
- task.budget.quality)[:4] + "$ | "
- word = "".join(task.sentence.word.split(" ")) + "\n"
- end = "=" * 79 + "\n"
- word.replace("-->", "->")
- word.replace("==>", "=>")
- if task.truth is not None:
- truth = "% " + str(task.truth.f)[:4] + ";" + str(task.truth.c)[:4] + "%\n"
- return [budget + truth, word, end]
- else:
- return [budget + "\n", word, end]
-
-
-class OutputBufferMC:
-
- def __init__(self, T):
- self.agenda_length = 20
- self.operation_of_channel = {}
- self.channel_of_operation = {}
- self.agenda = {i: [] for i in range(self.agenda_length)}
-
- # GUI
- # ==============================================================================================================
- self.active_questions = []
- self.active_goals = []
- # self.active_quests = []
- self.T = T # T is the text box in the main UI
- self.shown_content = False
- # ==============================================================================================================
-
- def register_channel(self, channel): # register channels' operations
- tmp = set()
- for each_operation in channel.operations:
- tmp.add(each_operation)
- self.operation_of_channel.update({each_operation: channel})
- self.channel_of_operation.update({channel: tmp})
-
- def decompose(self, term: Term, agenda_pointer):
- # decompose complicated compound operations, including intervals
- ap = agenda_pointer
- if isinstance(term, Compound):
- for each_component in term.terms:
- if isinstance(each_component, Interval):
- ap += each_component.interval
- elif isinstance(each_component, Compound):
- self.decompose(each_component, ap)
- else:
- for each_operation in self.operation_of_channel:
- if each_component.equal(each_operation) and ap < self.agenda_length: # only store operations
- self.agenda[ap].append(each_component)
- break
- else:
- self.agenda[ap].append(term)
-
- def distribute_execute(self): # distribute the decomposed operations into corresponding channels
- for each_operation in self.agenda[0]:
- corresponding_channel = self.operation_of_channel[Term("^" + each_operation.word)]
- corresponding_channel.execute(Term("^" + each_operation.word)) # operation execution
- corresponding_channel.event_buffer.slots[corresponding_channel.event_buffer.present].update_operations(
- Term("^" + each_operation.word)) # operation execution record added
- self.agenda = {i: self.agenda[i + 1] for i in range(self.agenda_length - 1)}
- self.agenda.update({self.agenda_length - 1: []})
-
- def UI_show(self):
- self.T.configure(state="normal") # enable inputting
-
- # show active questions
- for each in self.active_questions:
- BT, word, _ = UI_better_content(each[0])
- if each[1] == "updated":
- self.T.insert(tk.END, "[Question updated]: " + BT)
- self.T.insert(tk.END, word, "tag_2_updated")
- self.shown_content = True
- elif each[1] == "initialized":
- self.T.insert(tk.END, "[Question found]: " + BT)
- self.T.insert(tk.END, word, "tag_2")
- self.shown_content = True
- elif each[1] == "derived":
- self.T.insert(tk.END, "[Question derived]: " + BT)
- self.T.insert(tk.END, word, "tag_2_updated")
- self.shown_content = True
- each[1] = ""
-
- # show active goals
- for each in self.active_goals:
- BT, word, _ = UI_better_content(each[0])
- AL = str(each[0].achieving_level())[:4] + "\n"
- if each[1] == "updated":
- self.T.insert(tk.END, "[Goal updated]: " + BT)
- self.T.insert(tk.END, "Achieving level: " + AL)
- self.T.insert(tk.END, word, "tag_2_updated")
- self.shown_content = True
- elif each[1] == "initialized":
- self.T.insert(tk.END, "[Goal found]: " + BT)
- self.T.insert(tk.END, "Achieving level: " + AL)
- self.T.insert(tk.END, word, "tag_2")
- self.shown_content = True
- elif each[1] == "derived":
- self.T.insert(tk.END, "[Goal derived]: " + BT)
- self.T.insert(tk.END, "Achieving level: " + AL)
- self.T.insert(tk.END, word, "tag_2_updated")
- self.shown_content = True
- each[1] = ""
-
- if self.shown_content:
- self.T.insert(tk.END, "=" * 79 + "\n")
- self.shown_content = False
- self.T.configure(state="disabled") # disable user input
-
- def step(self, task: Task):
- """
- This function is used to distribute "operations" from the internal buffer to the event buffer.
- One operation goal is firstly generated in the inference engine. After that, it will be forwarded to the
- internal buffer, and if this task is further forwarded to the global buffer, this task will be viewed as
- "executed". And it is also really executed, which might be reflected in the information gathered by the
- corresponding event buffer. And it is possible for the global buffer to generate "procedural knowledge".
-
- Since such operation is executed by the event buffer, it also needs to be "percepted" by the event buffer.
- And so in event buffers, it is also possible to generate such "procedural knowledge".
-
- In short, this function will execute the operation goal selected from the internal buffer and let the
- corresponding event buffer know.
- """
- # operation goal
- if task and task.is_goal:
- self.decompose(task.term, 0)
- self.distribute_execute()
-
- def reset(self):
- self.agenda = {i: [] for i in range(self.agenda_length)}
- self.T.configure(state="normal")
- self.T.delete("1.0", "end") # clear window
- self.T.configure(state="disabled")
- self.active_questions = []
- self.active_goals = []
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/NarseseChannel.py b/pynars/NARS/DataStructures/MC/SampleChannels/NarseseChannel.py
deleted file mode 100644
index d1df6dc..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/NarseseChannel.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-from pynars.Narsese import Term, parser
-
-
-class NarseseChannel(ChannelMC):
-
- def __init__(self, ID: str, num_slot, num_event, num_anticipation, num_prediction, memory):
- super(NarseseChannel, self).__init__(ID, num_slot, num_event, num_anticipation, num_prediction, memory)
- self.operations = [Term("^write")]
- self.read_cursor = 0
- self.count = 0
-
-
- def execute(self, term: Term):
- if term.equal(self.operations[0]):
- f = open("./playground.txt", "w")
- f.write("I am writing" + str(self.count))
- self.count += 1
- f.close()
-
- def information_gathering(self):
- f = open("./playground.txt", "r")
- lines = f.readlines()
- f.close()
- line = lines[self.read_cursor].rstrip('\n')
- line = line.split(" ")
- self.read_cursor += 1
- tasks = []
- for each in line:
- tasks.append(parser.parse("<(*, {Channel1}, " + each + ") --> ^see>."))
- return tasks
-
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel2.py b/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel2.py
deleted file mode 100644
index 3428b22..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel2.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from pynars.Narsese import Term, parser
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-
-
-# task generation utility function
-def task_generation_util(v, ID):
- if round(v) != 0:
- task = parser.parse(
- "<" + ID + " --> shape_" + str(round(v)) + ">. :|: %0.9;0.5%")
- return task
-
-
-class SampleChannel2(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory, env, root_UI, UI_name):
- super(SampleChannel2, self).__init__(num_slot, num_event, num_anticipation, num_operation, num_prediction,
- memory, root_UI, UI_name)
- # self.operations = [Term("^rotate"), Term("^zoom"), Term("^up"), Term("^down"), Term("^right"), Term("^left")]
- # rotation, zoom temporally disabled
- self.operations = [Term("^up"), Term("^down"), Term("^right"), Term("^left")]
- self.env = env
-
- def execute(self, term: Term):
- if term.equal(self.operations[0]):
- # self.env.rotate()
- pass
- elif term.equal(self.operations[1]):
- # self.env.zoom()
- pass
- elif term.equal(self.operations[2]):
- self.env.up()
- elif term.equal(self.operations[3]):
- self.env.down()
- elif term.equal(self.operations[4]):
- self.env.right()
- elif term.equal(self.operations[5]):
- self.env.left()
- else:
- return
-
- def information_gathering(self):
- ret = []
- mat = self.env.visual_signal()
- task_A, task_B, task_C, task_D = None, None, None, None
- if mat.shape[0] == 2:
- A = (mat[0, 0] + mat[0, 1]) / 2
- B = (mat[0, 0] + mat[1, 0]) / 2
- C = (mat[1, 0] + mat[1, 1]) / 2
- D = (mat[0, 1] + mat[1, 1]) / 2
- task_A = task_generation_util(A, "A")
- task_B = task_generation_util(B, "B")
- task_C = task_generation_util(C, "C")
- task_D = task_generation_util(D, "D")
- elif mat.shape[0] == 4:
- A = (sum(mat[0, :]) + mat[1, 1] + mat[1, 2]) / 6
- B = (sum(mat[:, 0]) + mat[1, 1] + mat[2, 1]) / 6
- C = (sum(mat[3, :]) + mat[2, 1] + mat[2, 2]) / 6
- D = (sum(mat[:, 3]) + mat[1, 2] + mat[2, 2]) / 6
- task_A = task_generation_util(A, "A")
- task_B = task_generation_util(B, "B")
- task_C = task_generation_util(C, "C")
- task_D = task_generation_util(D, "D")
- elif mat.shape[0] == 8:
- A = (sum(mat[0, :]) + sum(mat[1, 1:6]) + sum(mat[2, 2:5]) + mat[3, 3]) / 19
- B = (sum(mat[:, 0]) + sum(mat[1:6, 1]) + sum(mat[2:5, 2]) + mat[3, 3]) / 19
- C = (sum(mat[7, :]) + sum(mat[6, 1:6]) + sum(mat[5, 2:5]) + mat[3, 3]) / 19
- D = (sum(mat[:, 7]) + sum(mat[1:6, ]) + sum(mat[2:5, 2]) + mat[3, 3]) / 19
- task_A = task_generation_util(A, "A")
- task_B = task_generation_util(B, "B")
- task_C = task_generation_util(C, "C")
- task_D = task_generation_util(D, "D")
- if task_A:
- ret.append(task_A)
- if task_B:
- ret.append(task_B)
- if task_C:
- ret.append(task_C)
- if task_D:
- ret.append(task_D)
- # print(self.env.mask_position)
- # print(self.env.mask_size)
- # print(self.env.rotation)
- # print("==>")
- # ret.append(parser.parse(" X2>."))
- # ret.append(parser.parse(" X4>."))
- # ret.append(parser.parse(" X6>."))
- # ret.append(parser.parse(" X8>."))
-
- return ret
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel3.py b/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel3.py
deleted file mode 100644
index 8187eff..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/SampleChannel3.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from pynars.Narsese import Term, parser
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-
-
-class SampleChannel3(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_prediction, memory, env, root_UI, UI_name):
- super(SampleChannel3, self).__init__(num_slot, num_event, num_anticipation, num_prediction, memory, root_UI,
- UI_name)
- self.operations = []
- self.env = env
-
- def execute(self, term: Term):
- pass
-
- # def information_gathering(self):
- # ret = []
- # shapes_detected = self.env.check_shape()
- # for each in shapes_detected:
- # task = parser.parse(" " + each + ">. :|:")
- # # print(task)
- # # print("==>")
- # return ret
-
- def information_gathering(self):
- return self.env.check_shape()
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1.py b/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1.py
deleted file mode 100644
index e4cd6c7..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import numpy as np
-
-
-class SampleEnvironment1:
-
- def __init__(self):
- self.content = np.array([[2, 2, 1, 1, 0, 0, 0, 2],
- [2, 0, 1, 1, 1, 0, 0, 2],
- [0, 0, 0, 1, 0, 0, 0, 0],
- [0, 0, 0, 0, 0, 0, 0, 0],
- [0, 0, 0, 0, 0, 0, 0, 0],
- [0, 0, 0, 0, 0, 0, 0, 0],
- [0, 0, 0, 0, 0, 0, 0, 0],
- [2, 2, 0, 0, 0, 0, 0, 0]])
- self.grid_size = 8
- self.mask_size = 2
- self.step_length = 1
- self.mask_position = [0, 0] # left_top corner
- self.rotation = 0
- self.shapes = {"shape1": [[0, 2], [1, 3]]}
-
- def content_generation(self, n):
- self.grid_size = n
- self.content = np.zeros([n, n])
- anchor = [np.random.randint(n), np.random.randint(n)]
- # first block, block [1]
- self.content[anchor[0], (anchor[1] + 2) % n] = 1
- self.content[(anchor[0] + 1) % n, (anchor[1] + 2) % n] = 1
- self.content[anchor[0], (anchor[1] + 3) % n] = 1
- self.content[(anchor[0] + 1) % n, (anchor[1] + 2) % n] = 1
- # first block, block [2]
- self.content[(anchor[0] + 2) % n, anchor[1]] = 2
- self.content[(anchor[0] + 3) % n, anchor[1]] = 2
- self.content[(anchor[0] + 2) % n, (anchor[1] + 1) % n] = 2
- self.content[(anchor[0] + 3) % n, (anchor[1] + 1) % n] = 2
- # first block, block [3]
- self.content[(anchor[0] + 2) % n, (anchor[1] + 4) % n] = 3
- self.content[(anchor[0] + 2) % n, (anchor[1] + 5) % n] = 3
- self.content[(anchor[0] + 3) % n, (anchor[1] + 4) % n] = 3
- self.content[(anchor[0] + 3) % n, (anchor[1] + 5) % n] = 3
- # first block, block [4]
- self.content[(anchor[0] + 4) % n, (anchor[1] + 2) % n] = 4
- self.content[(anchor[0] + 4) % n, (anchor[1] + 3) % n] = 4
- self.content[(anchor[0] + 5) % n, (anchor[1] + 2) % n] = 4
- self.content[(anchor[0] + 5) % n, (anchor[1] + 3) % n] = 4
-
- def visualization(self):
- pass
-
- def visual_signal(self):
- if self.mask_position[0] + self.mask_size > self.grid_size and self.mask_position[1] + self.mask_size > \
- self.grid_size:
- A = self.content[self.mask_position[0]:self.grid_size, self.mask_position[1]:self.grid_size]
- B = self.content[0:self.grid_size - self.mask_position[0], 0:self.grid_size - self.mask_position[1]]
- C = self.content[self.mask_position[0]:self.grid_size, 0:self.grid_size - self.mask_position[1]]
- D = self.content[0:self.grid_size - self.mask_position[0], self.mask_position[1]:self.grid_size]
- return np.rot90(np.squeeze(np.array([[A, C], [D, B]])), self.rotation)
- elif self.mask_position[0] + self.mask_size > self.grid_size:
- A = self.content[self.mask_position[0]:self.grid_size,
- self.mask_position[1]:self.mask_position[1] + self.mask_size]
- B = self.content[0:self.grid_size - self.mask_position[0],
- self.mask_position[1]:self.mask_position[1] + self.mask_size]
- return np.rot90(np.squeeze(np.array([[A], [B]])), self.rotation)
- elif self.mask_position[1] + self.mask_size > self.grid_size:
- A = self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- self.mask_position[1]:self.grid_size]
- B = self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- 0:self.grid_size - self.mask_position[1]]
- return np.rot90(np.squeeze(np.array([A, B])), self.rotation)
- else:
- return np.rot90(np.squeeze(self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- self.mask_position[1]:self.mask_position[1] + self.mask_size]), self.rotation)
-
- def zoom(self):
- if self.mask_size == 2:
- self.mask_size = 4
- return
- if self.mask_size == 4:
- self.mask_size = 8
- return
- if self.mask_size == 8:
- self.mask_size = 2
- return
-
- def up(self):
- if self.mask_position[0] == 0:
- self.mask_position[0] = self.grid_size - 1
- else:
- self.mask_position[0] -= 1
-
- def down(self):
- if self.mask_position[0] == self.grid_size - 1:
- self.mask_position[0] = 0
- else:
- self.mask_position[0] += 1
-
- def right(self):
- if self.mask_position[1] == self.grid_size - 1:
- self.mask_position[1] = 0
- else:
- self.mask_position[1] += 1
-
- def left(self):
- if self.mask_position[1] == 0:
- self.mask_position[1] = self.grid_size - 1
- else:
- self.mask_position[1] -= 1
-
- def rotate(self):
- if self.rotation == 0:
- self.rotation = 1
- return
- if self.rotation == 1:
- self.rotation = 2
- return
- if self.rotation == 2:
- self.rotation = 3
- return
- if self.rotation == 3:
- self.rotation = 0
- return
-
- def check_shape(self):
- shapes_detected = []
- for each_shape in self.shapes:
- if self.mask_position[0] <= self.shapes[each_shape][0][0] \
- and self.mask_position[1] <= self.shapes[each_shape][0][1] \
- and self.mask_position[0] + self.mask_size >= self.shapes[each_shape][1][0] \
- and self.mask_position[1] + self.mask_size >= self.shapes[each_shape][1][1]:
- shapes_detected.append(each_shape)
- return shapes_detected
-
-
-# Env = SampleEnvironment1()
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1_1.py b/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1_1.py
deleted file mode 100644
index 4e18a11..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/SampleEnvironment1_1.py
+++ /dev/null
@@ -1,99 +0,0 @@
-import numpy as np
-
-
-class SampleEnvironment1_1:
-
- def __init__(self):
- self.content = None
- self.grid_size = None
- self.content_generation(8)
- self.mask_size = 2
- self.step_length = 1
- self.mask_position = [0, 0] # left_top corner
-
- def content_generation(self, n):
- self.grid_size = n
- self.content = np.zeros([n, n])
- anchor = [np.random.randint(n), np.random.randint(n)]
- # first block, block [1]
- self.content[anchor[0], (anchor[1] + 2) % n] = 1
- self.content[(anchor[0] + 1) % n, (anchor[1] + 2) % n] = 1
- self.content[anchor[0], (anchor[1] + 3) % n] = 1
- self.content[(anchor[0] + 1) % n, (anchor[1] + 2) % n] = 1
- # first block, block [2]
- self.content[(anchor[0] + 2) % n, anchor[1]] = 2
- self.content[(anchor[0] + 3) % n, anchor[1]] = 2
- self.content[(anchor[0] + 2) % n, (anchor[1] + 1) % n] = 2
- self.content[(anchor[0] + 3) % n, (anchor[1] + 1) % n] = 2
- # first block, block [3]
- self.content[(anchor[0] + 2) % n, (anchor[1] + 4) % n] = 3
- self.content[(anchor[0] + 2) % n, (anchor[1] + 5) % n] = 3
- self.content[(anchor[0] + 3) % n, (anchor[1] + 4) % n] = 3
- self.content[(anchor[0] + 3) % n, (anchor[1] + 5) % n] = 3
- # first block, block [4]
- self.content[(anchor[0] + 4) % n, (anchor[1] + 2) % n] = 4
- self.content[(anchor[0] + 4) % n, (anchor[1] + 3) % n] = 4
- self.content[(anchor[0] + 5) % n, (anchor[1] + 2) % n] = 4
- self.content[(anchor[0] + 5) % n, (anchor[1] + 3) % n] = 4
-
- def visualization(self):
- pass
-
- def visual_signal(self):
- if self.mask_position[0] + self.mask_size > self.grid_size and self.mask_position[1] + self.mask_size > \
- self.grid_size:
- A = self.content[self.mask_position[0]:self.grid_size, self.mask_position[1]:self.grid_size]
- B = self.content[0:self.grid_size - self.mask_position[0], 0:self.grid_size - self.mask_position[1]]
- C = self.content[self.mask_position[0]:self.grid_size, 0:self.grid_size - self.mask_position[1]]
- D = self.content[0:self.grid_size - self.mask_position[0], self.mask_position[1]:self.grid_size]
- return np.squeeze(np.array([[A, C], [D, B]]))
- elif self.mask_position[0] + self.mask_size > self.grid_size:
- A = self.content[self.mask_position[0]:self.grid_size,
- self.mask_position[1]:self.mask_position[1] + self.mask_size]
- B = self.content[0:self.grid_size - self.mask_position[0],
- self.mask_position[1]:self.mask_position[1] + self.mask_size]
- return np.squeeze(np.array([[A], [B]]))
- elif self.mask_position[1] + self.mask_size > self.grid_size:
- A = self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- self.mask_position[1]:self.grid_size]
- B = self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- 0:self.grid_size - self.mask_position[1]]
- return np.squeeze(np.array([A, B]))
- else:
- return np.squeeze(self.content[self.mask_position[0]:self.mask_position[0] + self.mask_size,
- self.mask_position[1]:self.mask_position[1] + self.mask_size])
-
- def up(self):
- if self.mask_position[0] == 0:
- self.mask_position[0] = self.grid_size - 1
- else:
- self.mask_position[0] -= 1
-
- def down(self):
- if self.mask_position[0] == self.grid_size - 1:
- self.mask_position[0] = 0
- else:
- self.mask_position[0] += 1
-
- def right(self):
- if self.mask_position[1] == self.grid_size - 1:
- self.mask_position[1] = 0
- else:
- self.mask_position[1] += 1
-
- def left(self):
- if self.mask_position[1] == 0:
- self.mask_position[1] = self.grid_size - 1
- else:
- self.mask_position[1] -= 1
-
- def check_shape(self):
- tmp = self.visual_signal()
- if tmp.all() == 1:
- return "block_1"
- elif tmp.all() == 2:
- return "block_2"
- elif tmp.all() == 3:
- return "block_3"
- elif tmp.all() == 4:
- return "block_4"
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorld.py b/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorld.py
deleted file mode 100644
index f401aa0..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorld.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import numpy as np
-
-
-class WumpusWorld:
-
- def __init__(self, size, num_gold, num_wumpus, num_pit):
- self.size = size
- self.num_gold = num_gold
- self.num_wumpus = num_wumpus
- self.num_pit = num_pit
- # world initialization
- self.world_brick = np.arange(size ** 2)
- np.random.shuffle(self.world_brick)
- self.world_brick = self.world_brick.reshape((size, size))
- self.world_gold = None
- self.world_wumpus = None
- self.world_pit = None
- self.position = None
- self.gold_picked = 0
- self.pos_gold = None
- # world generation
- self.generate_world()
- self.visualization()
-
- def generate_world(self):
- # generate a brand-new world
- self.world_wumpus = np.zeros((self.size, self.size))
- self.world_pit = np.zeros((self.size, self.size))
- self.world_gold = np.zeros((self.size, self.size))
- pos_gold = []
- pos_wumpus = []
- pos_pit = []
- while True:
- pos = tuple(np.random.randint(0, self.size - 1, (1, 2)).squeeze())
- if not (pos_gold + pos_wumpus + pos_pit).__contains__(pos):
- pos_gold.append(tuple(pos))
- if len(pos_gold) == self.num_gold:
- break
- while True:
- pos = tuple(np.random.randint(0, self.size - 1, (1, 2)).squeeze())
- if not (pos_gold + pos_wumpus + pos_pit).__contains__(pos):
- pos_wumpus.append(tuple(pos))
- if len(pos_wumpus) == self.num_wumpus:
- break
- while True:
- pos = tuple(np.random.randint(0, self.size - 1, (1, 2)).squeeze())
- if not (pos_gold + pos_wumpus + pos_pit).__contains__(pos):
- pos_pit.append(tuple(pos))
- if len(pos_pit) == self.num_pit:
- break
- while True:
- pos = tuple(np.random.randint(0, self.size - 1, (1, 2)).squeeze())
- if not (pos_gold + pos_wumpus + pos_pit).__contains__(pos):
- self.position = list(pos)
- break
- self.pos_gold = pos_gold
- for each in pos_gold:
- self.world_gold[each] = 2
- if each[0] != 0:
- self.world_gold[each[0] - 1, each[1]] = 1
- if each[0] != self.size - 1:
- self.world_gold[each[0] + 1, each[1]] = 1
- if each[1] != 0:
- self.world_gold[each[0], each[1] - 1] = 1
- if each[1] != self.size - 1:
- self.world_gold[each[0], each[1] + 1] = 1
- for each in pos_wumpus:
- self.world_wumpus[each] = 2
- if each[0] != 0:
- self.world_wumpus[each[0] - 1, each[1]] = 1
- if each[0] != self.size - 1:
- self.world_wumpus[each[0] + 1, each[1]] = 1
- if each[1] != 0:
- self.world_wumpus[each[0], each[1] - 1] = 1
- if each[1] != self.size - 1:
- self.world_wumpus[each[0], each[1] + 1] = 1
- for each in pos_pit:
- self.world_pit[each] = 2
- if each[0] != 0:
- self.world_pit[each[0] - 1, each[1]] = 1
- if each[0] != self.size - 1:
- self.world_pit[each[0] + 1, each[1]] = 1
- if each[1] != 0:
- self.world_pit[each[0], each[1] - 1] = 1
- if each[1] != self.size - 1:
- self.world_pit[each[0], each[1] + 1] = 1
-
- def up(self):
- if self.position[0] != 0:
- self.position[0] -= 1
-
- def down(self):
- if self.position[0] != self.size - 1:
- self.position[0] += 1
-
- def right(self):
- if self.position[1] != self.size - 1:
- self.position[1] += 1
-
- def left(self):
- if self.position[1] != 0:
- self.position[1] -= 1
-
- def pick(self):
- if self.world_gold[self.position] == 2:
- self.gold_picked += 1
- self.world_gold[self.position] = 0
- if self.position[0] != 0:
- self.world_gold[self.position[0] - 1, self.position[1]] = 0
- if self.position[0] != self.size - 1:
- self.world_gold[self.position[0] + 1, self.position[1]] = 0
- if self.position[1] != 0:
- self.world_gold[self.position[0], self.position[1] - 1] = 0
- if self.position[1] != self.size - 1:
- self.world_gold[self.position[0], self.position[1] + 1] = 0
-
- def visualization(self):
- print("Position")
- print(self.position)
- print("Brick")
- print(self.world_brick)
- print("Gold")
- print(self.world_gold)
- print("Wumpus")
- print(self.world_wumpus)
- print("Pit")
- print(self.world_pit)
- print("=" * 10)
-
- def shortest_path(self):
- if self.position[0] < self.pos_gold[0][0]:
- return "down"
- if self.position[1] < self.pos_gold[0][1]:
- return "right"
- if self.position[0] > self.pos_gold[0][0]:
- return "up"
- if self.position[1] > self.pos_gold[0][1]:
- return "left"
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldBodySenseChannel.py b/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldBodySenseChannel.py
deleted file mode 100644
index 6c8597f..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldBodySenseChannel.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-from pynars.Narsese import Term, parser
-
-
-# task generation utility function
-def task_generation_util(brick_ID):
- task = parser.parse(
- "$0.5;0.5;0.5$ <(*, {SELF}, " + str(brick_ID) + ") --> at>. :|: %0.9;0.5%")
- return task
-
-
-class WumpusWorldBodySenseChannel(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_prediction, memory, env, root_UI, UI_name):
- super(WumpusWorldBodySenseChannel, self).__init__(num_slot, num_event, num_anticipation, num_prediction, memory,
- root_UI, UI_name)
- self.operations = [Term("^up"), Term("^down"), Term("^right"), Term("^left")]
- self.env = env
-
- def execute(self, term: Term):
- if term.equal(self.operations[0]):
- self.env.up()
- elif term.equal(self.operations[1]):
- self.env.down()
- elif term.equal(self.operations[2]):
- self.env.right()
- elif term.equal(self.operations[3]):
- self.env.left()
-
- def information_gathering(self):
- return [task_generation_util(self.env.world_brick[self.env.position[0], self.env.position[1]])]
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldGoldChannel.py b/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldGoldChannel.py
deleted file mode 100644
index 4cfdb30..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldGoldChannel.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-from pynars.Narsese import Term, parser
-
-
-# task generation utility function
-def task_generation_util(visual_signal):
- task = parser.parse(
- "$0.5;0.5;0.5$ <(*, {SELF}, " + visual_signal + ") --> see>. :|: %0.9;0.5%")
- return task
-
-
-class WumpusWorldGoldChannel(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_prediction, memory, env, root_UI, UI_name):
- super(WumpusWorldGoldChannel, self).__init__(num_slot, num_event, num_anticipation, num_prediction, memory,
- root_UI, UI_name)
- self.operations = []
- self.env = env
-
- def execute(self, term: Term):
- pass
-
- def information_gathering(self):
- if self.env.world_gold[self.env.position[0], self.env.position[1]] == 2:
- return [task_generation_util("Gold")]
- elif self.env.world_gold[self.env.position[0], self.env.position[1]] == 1:
- return [task_generation_util("Glow")]
-
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldPitChannel.py b/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldPitChannel.py
deleted file mode 100644
index bc2fd80..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldPitChannel.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-from pynars.Narsese import Term, parser
-
-
-# task generation utility function
-def task_generation_util(audio_signal):
- task = parser.parse(
- "$0.5;0.5;0.5$ <(*, {SELF}, " + audio_signal + ") --> feel>. :|: %0.9;0.5%")
- return task
-
-
-class WumpusWorldPitChannel(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_prediction, memory, env, root_UI, UI_name):
- super(WumpusWorldPitChannel, self).__init__(num_slot, num_event, num_anticipation, num_prediction, memory,
- root_UI, UI_name)
- self.operations = []
- self.env = env
-
- def execute(self, term: Term):
- pass
-
- def information_gathering(self):
- if self.env.world_pit[self.env.position[0], self.env.position[1]] == 2:
- return [task_generation_util("Pit")]
- elif self.env.world_pit[self.env.position[0], self.env.position[1]] == 1:
- return [task_generation_util("Breeze")]
-
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldWumpusChannel.py b/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldWumpusChannel.py
deleted file mode 100644
index 1c8fdf6..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/WumpusWorldWumpusChannel.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from pynars.NARS.DataStructures.MC.ChannelMC import ChannelMC
-from pynars.Narsese import Term, parser
-
-
-# task generation utility function
-def task_generation_util(wumpus_signal):
- task = parser.parse(
- "$0.5;0.5;0.5$ <(*, {SELF}, " + wumpus_signal + ") --> smell>. :|: %0.9;0.5%")
- return task
-
-
-class WumpusWorldWumpusChannel(ChannelMC):
-
- def __init__(self, num_slot, num_event, num_anticipation, num_prediction, memory, env, root_UI, UI_name):
- super(WumpusWorldWumpusChannel, self).__init__(num_slot, num_event, num_anticipation, num_prediction, memory,
- root_UI, UI_name)
- self.operations = []
- self.env = env
-
- def execute(self, term: Term):
- pass
-
- def information_gathering(self):
- if self.env.world_wumpus[self.env.position[0], self.env.position[1]] == 2:
- return [task_generation_util("Wumpus")]
- elif self.env.world_wumpus[self.env.position[0], self.env.position[1]] == 1:
- return [task_generation_util("Stench")]
-
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/__init__.py b/pynars/NARS/DataStructures/MC/SampleChannels/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/pynars/NARS/DataStructures/MC/SampleChannels/playground.txt b/pynars/NARS/DataStructures/MC/SampleChannels/playground.txt
deleted file mode 100644
index 4792712..0000000
--- a/pynars/NARS/DataStructures/MC/SampleChannels/playground.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-1
-2 3 4 5 6 7 l k m
-3
-4
-5
-6
-7
-8
-9
-90
-0
--
-=
-=
-
-1
-2
-3
-4
-5
-6
-8
-
-23
-4
-56
-7
-4
-5
-7
-
-8
-8
-
-dfg’dfgsd
-
diff --git a/pynars/NARS/DataStructures/MC/SensorimotorChannel.py b/pynars/NARS/DataStructures/MC/SensorimotorChannel.py
new file mode 100644
index 0000000..1544313
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/SensorimotorChannel.py
@@ -0,0 +1,140 @@
+from pynars.NARS.DataStructures.MC.EventBuffer import EventBuffer
+from pynars.NARS.DataStructures.MC.Utils import PriorityQueue
+from pynars.Narsese import parser
+
+
+class SensorimotorChannel:
+
+ def __init__(self, ID, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications,
+ num_reactions, N=1):
+ self.ID = ID
+ """
+ The name "input buffer" might be a bit misleading, since there is no corresponding "output buffer" in a
+ channel, but this is the name in the conceptual design.
+ """
+ self.input_buffer = EventBuffer(num_slot, num_events, num_anticipations, num_operations,
+ num_predictive_implications, N)
+ self.reactions = PriorityQueue(num_reactions)
+ self.num_reactions = num_reactions
+ self.operations = {}
+ """
+ Vocabularies are the terms used by operations in this channel.
+ Keeping a record of them for unregistering operations.
+ And also to check whether some commands sent by the reasoner are consistent with this channel.
+ """
+ self.vocabulary = {}
+ self.function_vocabulary = {}
+
+ def register_operation(self, name, function, vocabulary):
+ # name := str
+ self.operations[name] = function
+ self.function_vocabulary[name] = vocabulary
+ for each in vocabulary:
+ self.vocabulary[each] = self.vocabulary.get(each, 0) + 1
+
+ def unregister_operation(self, name):
+ self.operations.pop(name)
+ for each in self.function_vocabulary[name]:
+ if each in self.vocabulary:
+ self.vocabulary[each] -= 1
+ if self.vocabulary[each] <= 0:
+ del self.vocabulary[each]
+
+ def information_gathering(self):
+ """
+ Define how this channel gathers information outside NARS, and how such information is turned into Narsese
+ sentences.
+ """
+ return []
+
+ def identify(self, terms):
+ """
+ Check whether terms can be produced from this channel. Return True if yes, False if no.
+ """
+ for each in terms:
+ if each.word not in self.vocabulary:
+ return False
+ return True
+
+ @staticmethod
+ def reaction_evaluation(reaction, memory):
+ """
+ Reactions are created by some goals. If the goal is still needed, the reaction is needed, and vice versa.
+ """
+
+ # cheating
+ # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # this means there are reactions with no corresponding goals
+ # ==============================================================================================================
+ if reaction.goal is None:
+ return 0.7
+ # ==============================================================================================================
+
+ concept = memory.take_by_key(reaction.goal.term, False)
+ if concept is not None:
+ return concept.budget.priority
+ return 0
+
+ def add_reaction(self, reaction):
+ self.reactions.push(reaction, reaction.goal.budget.priority)
+
+ def babbling(self):
+ return []
+
+ def channel_cycle(self, memory):
+ """
+ Get input information (Narsese or another form) from the outside environment (mostly via RPC).
+
+ 1. Get the inputs from the environment and check them against the reactions, apply the one with the highest
+ priority. If there are no applicable reactions, babbling if wanted.
+ Babbling is not a reaction.
+
+ It looks like reactions only consider atomic inputs, but this can be further extended by letting some
+ specific compositions be generated in advance.
+
+ 2. Everything is then sent to the input buffer for the next level.
+ """
+
+ reactions = PriorityQueue(self.num_reactions)
+ # re-evaluate all reactions at the beginning of each cycle
+ while len(self.reactions) != 0:
+ reaction, _ = self.reactions.pop()
+ reactions.push(reaction, self.reaction_evaluation(reaction, memory))
+ self.reactions = reactions
+
+ # get the input
+ inputs = self.information_gathering()
+
+ # if there are some operations
+ has_operation = False
+ if len(self.reactions) != 0:
+ reaction_to_use, _ = self.reactions.pop()
+ if inputs is not None:
+ for each_input in inputs:
+ tmp = reaction_to_use.fire(each_input)
+ if tmp is not None:
+ self.operations[tmp]()
+ has_operation = True
+ inputs.append(tmp)
+ break
+
+ # babbling
+ if not has_operation:
+ operation_from_babbling = self.babbling()
+ if operation_from_babbling is not None:
+ self.operations[operation_from_babbling]()
+ inputs.append(parser.parse(operation_from_babbling + "."))
+
+ print("input_from_environment", inputs)
+
+ # cheating
+ # since something is wrong with the reasoner, such that it cannot generate sub-goals correctly, I have to cheat.
+ # ==============================================================================================================
+ reactions, ret = self.input_buffer.buffer_cycle(inputs, memory)
+ for each in reactions:
+ self.reactions.push(each, self.reaction_evaluation(each, memory))
+ return ret
+ # ==============================================================================================================
+ # original
+ # return self.input_buffer.buffer_cycle(inputs, memory)
+ # ==============================================================================================================
diff --git a/pynars/NARS/DataStructures/MC/SlotMC.py b/pynars/NARS/DataStructures/MC/SlotMC.py
deleted file mode 100644
index 18b14f1..0000000
--- a/pynars/NARS/DataStructures/MC/SlotMC.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import numpy as np
-from pynars.Narsese import Task, Budget, Term, Judgement
-from pynars.NARS.DataStructures.MC import AnticipationMC, InputBufferMC
-
-
-# the priority value of events
-def p_value(t: Task):
- return t.budget.priority * t.truth.f / t.term.complexity + np.random.rand() / 20
-
-
-class SlotMC:
- """
- Each slot shall contain 3 parts:
- 1) events, including input events and generated compounds;
- 2) anticipations, events (including compounds) expected;
- 3) to-do operations, these operations will be executed AT THE END of each cycle.
- """
-
- def __init__(self, num_event, num_anticipation, num_operation):
- self.num_event = num_event
- self.num_anticipation = num_anticipation
- self.num_operation = num_operation
-
- self.events = np.array([])
- self.anticipations = {} # anticipations need no priorities, so a dictionary is enough, compared with a 3-tuple
- self.operations = []
-
- self.candidate = None
-
- def update_events(self, t: Task):
- """
- Update a single event in the current time slot.
- Updating has two meanings:
- 1) if there are no task with the same term, just add it;
- 2) if there is a task with the same term, REPLACE the old one with the new one.
- """
- word = t.term.word
- # delete if existed
- if len(self.events) != 0:
- self.events = np.delete(self.events, np.where(self.events[:, 0] == word), axis=0).reshape((-1, 3))
- # then just add
- if len(self.events) == 0:
- self.events = np.array([(word, t, p_value(t))])
- else:
- self.events = np.append(self.events, [(word, t, p_value(t))], 0)
- # delete if overwhelmed
- # NO NEED TO SORT the PQ after each updating, you may do it when there are no further updates
- if len(self.events) > self.num_event:
- self.events = np.delete(self.events, np.where(self.events[:, 2] == self.events[:, 2].min()),
- axis=0).reshape((-1, 3))
-
- def update_anticipations(self, a: AnticipationMC):
- """
- There might be duplicate anticipations. All are used for revision.
- When the space for anticipations is full, no further anticipations will be accepted.
- """
- if len(self.anticipations) < self.num_anticipation:
- word = a.t.term.word
- if word in self.anticipations and p_value(a.t) > p_value(self.anticipations[word].t):
- self.anticipations.update({word: a})
- else:
- self.anticipations.update({word: a})
-
- def update_operations(self, term: Term):
- if len(self.operations) < self.num_operation:
- (Judgement(term), Budget(0.9, 0.9, 0.5))
-
- def check_anticipation(self, buffer: InputBufferMC, mode_unexpected = False):
- """
- Unexpected event:= not an anticipation
- Satisfied anticipation will be revised in to an event, and so we can recognize these events that are not
- anticipations, which is called UNEXPECTED.
- We might need to check these anticipations several time, but we don't need to check unexpected events often,
- so we have this "mode_unexpected" as an option.
- """
- events_updates = [] # satisfied anticipations will be used for revision, [:Task]
- events_updates_unexpected = [] # unexpected events will be boosted, if needed
-
- for each_event in self.events:
- if each_event[0] in self.anticipations:
- events_updates.append(self.anticipations[each_event[0]].satisfied(buffer, each_event[1]))
- elif mode_unexpected:
- task = Task(each_event[1].sentence,
- Budget(min(0.99, each_event[1].budget.priority * 0.9), each_event[1].budget.durability,
- min(0.99, each_event[1].budget.quality * 0.9)))
- events_updates_unexpected.append(task)
-
- for each_event in events_updates:
- self.update_events(each_event)
- if mode_unexpected:
- for each_event_unexpected in events_updates_unexpected:
- self.update_events(each_event_unexpected)
-
- # unsatisfied anticipations will be handled in InputBufferMC.py
diff --git a/pynars/NARS/DataStructures/MC/Utils.py b/pynars/NARS/DataStructures/MC/Utils.py
new file mode 100644
index 0000000..eb50e25
--- /dev/null
+++ b/pynars/NARS/DataStructures/MC/Utils.py
@@ -0,0 +1,123 @@
+import random
+
+from pynars.NAL.Functions import Or
+
+
+class PriorityQueue:
+ """
+ It is not a heap; it is an array kept sorted by insertion sort, since we need to 1) access the largest item, 2) access the
+ smallest item, 3) access an item in the middle.
+ """
+
+ def __init__(self, size):
+ self.pq = []
+ self.size = size
+
+ def __len__(self):
+ return len(self.pq)
+
+ def push(self, item, value):
+ """
+ Add a new item, regardless of whether there are duplicates.
+ """
+ added = False
+ for i in range(len(self.pq)):
+ if value <= self.pq[i][0]:
+ self.pq = self.pq[:i] + [(value, item)] + self.pq[i:]
+ added = True
+ break
+ if not added:
+ self.pq = self.pq + [(value, item)]
+ if len(self.pq) > self.size:
+ self.pq = self.pq[1:]
+
+ def edit(self, item, value, identifier):
+ """
+ Replacement.
+ """
+ found = False
+ for i in range(len(self.pq)):
+ if identifier(self.pq[i][1]) == identifier(item):
+ self.pq.pop(i)
+ # self.pq = self.pq[:i - 1] + self.pq[i:]
+ found = True
+ break
+ if not found:
+ return
+ self.push(item, value)
+
+ def pop(self):
+ """
+ Pop the highest.
+ """
+ value, item = self.pq[-1]
+ self.pq = self.pq[:-1]
+ return item, value
+
+ def random_pop(self):
+ """
+ Based on the priority (not budget.priority), randomly pop one buffer task.
+ The higher the priority, the higher the probability to be popped.
+
+ This function is designed mainly for the prediction generation in the conceptual design.
+
+ It only gives the item, not the value.
+ """
+ for i in range(len(self.pq) - 1, -1, -1):
+ if random.random() < self.pq[i][0]:
+ ret = self.pq[i]
+ self.pq = self.pq[:i] + self.pq[i + 1:]
+ return ret[1]
+ return None
+
+ def show(self, identifier):
+ """
+ Show each item in the priority queue. Since it may contain items other than BufferTasks, you can design your own
+ identifier to show what you want to show.
+ """
+ for each in sorted(self.pq, key=lambda x: x[0]):
+ print(round(each[0], 3), "|", each[1].interval, "|", identifier(each[1]))
+ print("---")
+
+
+class BufferTask:
+ """
+ When a task is input to a buffer, its budget might be different from the original.
+ But this is affected by many factors, and each factor might be further revised.
+ Therefore, we store each factor independently; only when everything is decided is the final budget given.
+ """
+
+ def __init__(self, task):
+ self.task = task # the original task
+ # the influence of a channel, currently, all channels have parameter 1 and can't be changed
+ self.channel_parameter = 1
+ self.preprocess_effect = 1
+ self.is_component = 0
+ self.expiration_effect = 1
+
+ @property
+ def priority(self):
+ """
+ The priority of a BufferTask, which is not the budget of the corresponding Task.
+ """
+ return (self.task.budget.priority * self.channel_parameter * self.preprocess_effect * self.expiration_effect *
+ ((2 - self.is_component) / 2))
+
+
+def preprocessing(task, memory):
+ """
+ Check whether a task is already a concept in the memory. If not, its budget is greatly decreased (based on its
+ complexity). Else, its budget is the OR of its own budget and the existing concept's budget.
+ """
+ concept_in_memory = memory.take_by_key(task.term, False)
+ if concept_in_memory is None:
+ return 1 / (1 + task.term.complexity)
+ else:
+ return Or(task.budget.priority, concept_in_memory.budget.priority)
+
+
+def satisfaction_level(truth_1, truth_2):
+ """
+ Mainly used for checking whether an anticipation is satisfied.
+ """
+ return abs(truth_1.f - truth_2.f)