diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/README.rst b/funcx_endpoint/funcx_endpoint/mock_broker/README.rst deleted file mode 100644 index 9bc236d39..000000000 --- a/funcx_endpoint/funcx_endpoint/mock_broker/README.rst +++ /dev/null @@ -1,53 +0,0 @@ -Notes -===== - - -We want the mock_broker to be hosting a REST service. This service will have the following routes: - -/register ---------- - -This route expects a POST with a json payload that identifies the endpoint info and responds with a -json response. - -For eg: - -POST payload:: - - { - 'python_v': '3.6', - 'os': 'Linux', - 'hname': 'borgmachine2', - 'username': 'yadu', - 'funcx_v': '0.0.1' - } - - -Response payload:: - - { - 'endpoint_id': endpoint_id, - 'task_url': 'tcp://55.77.66.22:50001', - 'result_url': 'tcp://55.77.66.22:50002', - 'command_port': 'tcp://55.77.66.22:50003' - } - - - - -Architecture and Notes ----------------------- - -The endpoint registers and receives the information - -``` - TaskQ ResultQ - | | -REST /register--> Forwarder----->Executor Client - ^ | ^ - | | | - | v | - | +-------------> Interchange -User ----> Endpoint ----| - +--> Provider -``` diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/__init__.py b/funcx_endpoint/funcx_endpoint/mock_broker/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/forwarder.py b/funcx_endpoint/funcx_endpoint/mock_broker/forwarder.py deleted file mode 100644 index 3e0bbd430..000000000 --- a/funcx_endpoint/funcx_endpoint/mock_broker/forwarder.py +++ /dev/null @@ -1,191 +0,0 @@ -import logging -from functools import partial -import uuid -import os -import queue -from multiprocessing import Queue - -from multiprocessing import Process -from funcx import set_file_logger - - -def double(x): - return x * 2 - - -def failer(x): - return x / 0 - - -class Forwarder(Process): - """ Forwards tasks/results between the executor and the queues - - Tasks_Q Results_Q - | ^ - | | - V | - Executors - - Todo : We need to clarify what constitutes a task that comes down - the task pipe. Does it already have the code fragment? Or does that need to be sorted - out from some DB ? - """ - - def __init__(self, task_q, result_q, executor, endpoint_id, - logdir="forwarder", logging_level=logging.INFO): - """ - Params: - task_q : A queue object - Any queue object that has get primitives. This must be a thread-safe queue. - - result_q : A queue object - Any queue object that has put primitives. This must be a thread-safe queue. - - executor: Executor object - Executor to which tasks are to be forwarded - - endpoint_id: str - Usually a uuid4 as string that identifies the executor - - logdir: str - Path to logdir - - logging_level : int - Logging level as defined in the logging module. Default: logging.INFO (20) - - """ - super().__init__() - self.logdir = logdir - os.makedirs(self.logdir, exist_ok=True) - - global logger - logger = set_file_logger(os.path.join(self.logdir, "forwarder.{}.log".format(endpoint_id)), - level=logging_level) - - logger.info("Initializing forwarder for endpoint:{}".format(endpoint_id)) - self.task_q = task_q - self.result_q = result_q - self.executor = executor - self.endpoint_id = endpoint_id - self.internal_q = Queue() - self.client_ports = None - - def handle_app_update(self, task_id, future): - """ Triggered when the executor sees a task complete. - - This can be further optimized at the executor level, where we trigger this - or a similar function when we see a results item inbound from the interchange. - """ - logger.debug("[RESULTS] Updating result") - try: - res = future.result() - self.result_q.put(task_id, res) - except Exception: - logger.debug("Task:{} failed".format(task_id)) - # Todo : Since we caught an exception, we should wrap it here, and send it - # back onto the results queue. - else: - logger.debug("Task:{} succeeded".format(task_id)) - - def run(self): - """ Process entry point. - """ - logger.info("[TASKS] Loop starting") - logger.info("[TASKS] Executor: {}".format(self.executor)) - - try: - self.task_q.connect() - self.result_q.connect() - except Exception: - logger.exception("Connecting to the queues have failed") - - self.executor.start() - conn_info = self.executor.connection_info - self.internal_q.put(conn_info) - logger.info("[TASKS] Endpoint connection info: {}".format(conn_info)) - - while True: - try: - task = self.task_q.get(timeout=10) - logger.debug("[TASKS] Not doing {}".format(task)) - except queue.Empty: - # This exception catching isn't very general, - # Essentially any timeout exception should be caught and ignored - logger.debug("[TASKS] Waiting for tasks") - pass - else: - # TODO: We are piping down a mock task. This needs to be fixed. - task_id = str(uuid.uuid4()) - args = [5] - kwargs = {} - fu = self.executor.submit(double, *args, **kwargs) - fu.add_done_callback(partial(self.handle_app_update, task_id)) - - logger.info("[TASKS] Terminating self due to user requested kill") - return - - @property - def connection_info(self): - """Get the client ports to which the interchange must connect to - """ - - if not self.client_ports: - self.client_ports = self.internal_q.get() - - return self.client_ports - - -def spawn_forwarder(address, - executor=None, - task_q=None, - result_q=None, - endpoint_id=uuid.uuid4(), - logging_level=logging.INFO): - """ Spawns a forwarder and returns the forwarder process for tracking. - - Parameters - ---------- - - address : str - IP Address to which the endpoint must connect - - executor : Executor object. Optional - Executor object to be instantiated. - - task_q : Queue object - Queue object matching funcx.queues.base.FuncxQueue interface - - logging_level : int - Logging level as defined in the logging module. Default: logging.INFO (20) - - endpoint_id : uuid string - Endpoint id for which the forwarder is being spawned. - - Returns: - A Forwarder object - """ - from funcx_endpoint.queues import RedisQueue - from funcx_endpoint.executors import HighThroughputExecutor as HTEX - from parsl.providers import LocalProvider - from parsl.channels import LocalChannel - - task_q = RedisQueue('task', '127.0.0.1') - result_q = RedisQueue('result', '127.0.0.1') - - if not executor: - executor = HTEX(label='htex', - provider=LocalProvider( - channel=LocalChannel), - address=address) - - fw = Forwarder(task_q, result_q, executor, - "Endpoint_{}".format(endpoint_id), - logging_level=logging_level) - fw.start() - return fw - - -if __name__ == "__main__": - - pass - # test() diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/mock_broker.py b/funcx_endpoint/funcx_endpoint/mock_broker/mock_broker.py deleted file mode 100644 index 4a1a1ef8f..000000000 --- a/funcx_endpoint/funcx_endpoint/mock_broker/mock_broker.py +++ /dev/null @@ -1,75 +0,0 @@ -""" The broker service - -This REST service fields incoming registration requests from endpoints, -creates an appropriate forwarder to which the endpoint can connect up. -""" - - -import bottle -from bottle import post, run, request, route -import argparse -import json -import uuid -import sys - -from funcx_endpoint.mock_broker.forwarder import Forwarder, spawn_forwarder - - -@post('/register') -def register(): - """ Register an endpoint request - - 1. Start an executor client object corresponding to the endpoint - 2. Pass connection info back as a json response. - """ - - print("Request: ", request) - print("foo: ", request.app.ep_mapping) - print(json.load(request.body)) - endpoint_details = json.load(request.body) - print(endpoint_details) - - # Here we want to start an executor client. - # Make sure to not put anything into the client, until after an interchange has - # connected to avoid clogging up the pipe. Submits will block if the client has - # no endpoint connected. - endpoint_id = str(uuid.uuid4()) - fw = spawn_forwarder(request.app.address, endpoint_id=endpoint_id) - connection_info = fw.connection_info - ret_package = {'endpoint_id': endpoint_id} - ret_package.update(connection_info) - print("Ret_package : ", ret_package) - - print("Ep_id: ", endpoint_id) - request.app.ep_mapping[endpoint_id] = ret_package - return ret_package - - -@route('/list_mappings') -def list_mappings(): - return request.app.ep_mapping - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--port", default=8088, - help="Port at which the service will listen on") - parser.add_argument("-a", "--address", default='127.0.0.1', - help="Address at which the service is running") - parser.add_argument("-c", "--config", default=None, - help="Config file") - parser.add_argument("-d", "--debug", action='store_true', - help="Enables debug logging") - - args = parser.parse_args() - - app = bottle.default_app() - app.address = args.address - app.ep_mapping = {} - - try: - run(host='localhost', app=app, port=int(args.port), debug=True) - except Exception as e: - # This doesn't do anything - print("Caught exception : {}".format(e)) - exit(-1) diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/mock_tester.py b/funcx_endpoint/funcx_endpoint/mock_broker/mock_tester.py deleted file mode 100644 index 07c59f39c..000000000 --- a/funcx_endpoint/funcx_endpoint/mock_broker/mock_tester.py +++ /dev/null @@ -1,32 +0,0 @@ -import argparse -import requests -import funcx -import sys -import platform -import getpass - - -def test(address): - r = requests.post(address + '/register', - json={'python_v': "{}.{}".format(sys.version_info.major, - sys.version_info.minor), - 'os': platform.system(), - 'hname': platform.node(), - 'username': getpass.getuser(), - 'funcx_v': str(funcx.__version__) - } - ) - print("Status code :", r.status_code) - print("Json : ", r.json()) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--port", default=8088, - help="Port at which the service will listen on") - parser.add_argument("-d", "--debug", action='store_true', - help="Enables debug logging") - - args = parser.parse_args() - - test("http://0.0.0.0:{}".format(args.port)) diff --git a/funcx_endpoint/funcx_endpoint/mock_broker/test.py b/funcx_endpoint/funcx_endpoint/mock_broker/test.py deleted file mode 100644 index e73b6206d..000000000 --- a/funcx_endpoint/funcx_endpoint/mock_broker/test.py +++ /dev/null @@ -1,69 +0,0 @@ -from funcx_endpoint.executors import HighThroughputExecutor as HTEX -from parsl.providers import LocalProvider -from parsl.channels import LocalChannel -import parsl -import time -parsl.set_stream_logger() - - -def double(x): - return x * 2 - - -def fail(x): - return x / 0 - - -def test_1(): - - x = HTEX(label='htex', - provider=LocalProvider( - channel=LocalChannel), - address="127.0.0.1", - ) - task_p, result_p, command_p = x.start() - print(task_p, result_p, command_p) - print("Executor initialized : ", x) - - args = [2] - kwargs = {} - f1 = x.submit(double, *args, **kwargs) - print("Sent task with :", f1) - args = [2] - kwargs = {} - f2 = x.submit(fail, *args, **kwargs) - - print("hi") - while True: - stop = input("Stop ? (y/n)") - if stop == "y": - break - - print("F1: {}, f2: {}".format(f1.done(), f2.done())) - x.shutdown() - - -def test_2(): - - from funcx_endpoint.executors.high_throughput.executor import executor_starter - - htex = HTEX(label='htex', - provider=LocalProvider( - channel=LocalChannel), - address="127.0.0.1") - print("Foo") - executor_starter(htex, "forwarder", "ep_01") - print("Here") - - -def test_3(): - from funcx_endpoint.mock_broker.forwarder import Forwarder, spawn_forwarder - fw = spawn_forwarder("127.0.0.1", endpoint_id="0001") - print("Spawned forwarder") - time.sleep(120) - print("Terminating") - fw.terminate() - - -if __name__ == '__main__': - test_3() diff --git a/funcx_endpoint/funcx_endpoint/queues/__init__.py b/funcx_endpoint/funcx_endpoint/queues/__init__.py deleted file mode 100644 index 102a72285..000000000 --- a/funcx_endpoint/funcx_endpoint/queues/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from funcx_endpoint.queues.redis.redis_q import RedisQueue - -__all__ = ['RedisQueue'] diff --git a/funcx_endpoint/funcx_endpoint/queues/base.py b/funcx_endpoint/funcx_endpoint/queues/base.py deleted file mode 100644 index 875933845..000000000 --- a/funcx_endpoint/funcx_endpoint/queues/base.py +++ /dev/null @@ -1,50 +0,0 @@ -from abc import ABCMeta, abstractmethod, abstractproperty -from funcx.utils.errors import FuncxError - - -class NotConnected(FuncxError): - """ Queue is not connected/active - """ - - def __init__(self, queue): - self.queue = queue - - def __repr__(self): - return "Queue {} is not connected. Cannot execute queue operations".format(self.queue) - - -class FuncxQueue(metaclass=ABCMeta): - """ Queue interface required by the Forwarder - - This is a metaclass that only enforces concrete implementations of - functionality by the child classes. - """ - - @abstractmethod - def connect(self, *args, **kwargs): - """ Connects and creates the queue. - The queue is not active until this is called - """ - pass - - @abstractmethod - def get(self, *args, **kwargs): - """ Get an item from the Queue - """ - pass - - @abstractmethod - def put(self, *args, **kwargs): - """ Put an item into the Queue - """ - pass - - @abstractproperty - def is_connected(self): - """ Returns the connected status of the queue. - - Returns - ------- - Bool - """ - pass diff --git a/funcx_endpoint/funcx_endpoint/queues/redis/__init__.py b/funcx_endpoint/funcx_endpoint/queues/redis/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/funcx_endpoint/funcx_endpoint/queues/redis/redis_q.py b/funcx_endpoint/funcx_endpoint/queues/redis/redis_q.py deleted file mode 100644 index 3bf267b3c..000000000 --- a/funcx_endpoint/funcx_endpoint/queues/redis/redis_q.py +++ /dev/null @@ -1,98 +0,0 @@ -import redis -import json - -from funcx_endpoint.queues.base import NotConnected, FuncxQueue - - -class RedisQueue(FuncxQueue): - """ A basic redis queue - - The queue only connects when the `connect` method is called to avoid - issues with passing an object across processes. - - Parameters - ---------- - - hostname : str - Hostname of the redis server - - port : int - Port at which the redis server can be reached. Default: 6379 - - """ - - def __init__(self, prefix, hostname, port=6379): - """ Initialize - """ - self.hostname = hostname - self.port = port - self.redis_client = None - self.prefix = prefix - - def connect(self): - """ Connects to the Redis server - """ - try: - if not self.redis_client: - self.redis_client = redis.StrictRedis(host=self.hostname, port=self.port, decode_responses=True) - except redis.exceptions.ConnectionError: - print("ConnectionError while trying to connect to Redis@{}:{}".format(self.hostname, - self.port)) - - raise - - def get(self, timeout=1): - """ Get an item from the redis queue - - Parameters - ---------- - timeout : int - Timeout for the blocking get in seconds - """ - try: - task_list, task_id = self.redis_client.blpop(f'{self.prefix}_list', timeout=timeout) - jtask_info = self.redis_client.get(f'{self.prefix}:{task_id}') - task_info = json.loads(jtask_info) - except AttributeError: - raise NotConnected(self) - except redis.exceptions.ConnectionError: - print(f"ConnectionError while trying to connect to Redis@{self.hostname}:{self.port}") - raise - - return task_id, task_info - - def put(self, key, payload): - """ Put's the key:payload into a dict and pushes the key onto a queue - Parameters - ---------- - key : str - The task_id to be pushed - - payload : dict - Dict of task information to be stored - """ - try: - self.redis_client.set(f'{self.prefix}:{key}', json.dumps(payload)) - self.redis_client.rpush(f'{self.prefix}_list', key) - except AttributeError: - raise NotConnected(self) - except redis.exceptions.ConnectionError: - print("ConnectionError while trying to connect to Redis@{}:{}".format(self.hostname, - self.port)) - raise - - @property - def is_connected(self): - return self.redis_client is not None - - -def test(): - rq = RedisQueue('task', '127.0.0.1') - rq.connect() - rq.put("01", {'a': 1, 'b': 2}) - res = rq.get(timeout=1) - print("Result : ", res) - - -if __name__ == '__main__': - test() diff --git a/funcx_endpoint/tests/integration/test_redis.py b/funcx_endpoint/tests/integration/test_redis.py deleted file mode 100644 index b17645259..000000000 --- a/funcx_endpoint/tests/integration/test_redis.py +++ /dev/null @@ -1,70 +0,0 @@ -import argparse -from funcx.serialize import FuncXSerializer -from funcx_endpoint.queues import RedisQueue -import time - - -def slow_double(i, duration=0): - import time - time.sleep(duration) - return i * 4 - - -def test(endpoint_id=None, tasks=10, duration=1, hostname=None, port=None): - tasks_rq = RedisQueue(f'task_{endpoint_id}', hostname) - results_rq = RedisQueue('results', hostname) - fxs = FuncXSerializer() - - ser_code = fxs.serialize(slow_double) - fn_code = fxs.pack_buffers([ser_code]) - - tasks_rq.connect() - results_rq.connect() - - while True: - try: - _ = results_rq.get(timeout=1) - except Exception: - print("No more results left") - break - - start = time.time() - for i in range(tasks): - ser_args = fxs.serialize([i]) - ser_kwargs = fxs.serialize({'duration': duration}) - input_data = fxs.pack_buffers([ser_args, ser_kwargs]) - payload = fn_code + input_data - container_id = "odd" if i % 2 else "even" - tasks_rq.put(f"0{i};{container_id}", payload) - - d1 = time.time() - start - print("Time to launch {} tasks: {:8.3f} s".format(tasks, d1)) - - print(f"Launched {tasks} tasks") - for _i in range(tasks): - _ = results_rq.get(timeout=300) - # print("Result : ", res) - - delta = time.time() - start - print("Time to complete {} tasks: {:8.3f} s".format(tasks, delta)) - print("Throughput : {:8.3f} Tasks/s".format(tasks / delta)) - return delta - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("-r", "--redis_hostname", required=True, - help="Hostname of the Redis server") - parser.add_argument("-e", "--endpoint_id", required=True, - help="Endpoint_id") - parser.add_argument("-d", "--duration", required=True, - help="Duration of the tasks") - parser.add_argument("-c", "--count", required=True, - help="Number of tasks") - - args = parser.parse_args() - - test(endpoint_id=args.endpoint_id, - hostname=args.redis_hostname, - duration=int(args.duration), - tasks=int(args.count))