From b23090d6db38103272fb3c1789462c89d67d5159 Mon Sep 17 00:00:00 2001 From: ptmcg Date: Tue, 6 Apr 2021 09:37:56 -0500 Subject: [PATCH] Update/cleanup; added Queue class in server for better "instancing"; restructured files for Dockerfile --- client/rq_client.py | 60 ++++++++++++++++++++ docker/Dockerfile | 4 +- rest_queue/src/rest_queue.py | 107 ++++++++++++++++++++++++----------- 3 files changed, 135 insertions(+), 36 deletions(-) diff --git a/client/rq_client.py b/client/rq_client.py index e69de29..49c72e3 100644 --- a/client/rq_client.py +++ b/client/rq_client.py @@ -0,0 +1,60 @@ +import requests +from rich import print + + +class RestQueue: + def __init__(self, host: str, port: int, queue_name: str): + self._host = host + self._port = port + self._queue_name = queue_name + self._queue_url = f"http://{host}:{port}/queues/{queue_name}" + self._session = requests.Session() + + def create(self): + response = self._session.post(self._queue_url) + return response.status_code, response.json() + + def push(self, value): + if value is None: + print(f"pushed value cannot be None") + return + + url = f"{self._queue_url}/push" + response = self._session.post(url, params={"value": value}) + if response.status_code == 404: + print(f"no such queue {self._queue_name!r}") + + return response.json() + + def pop(self): + url = f"{self._queue_url}/pop" + response = self._session.post(url) + if response.status_code == 404: + print(f"no such queue {self._queue_name!r}") + return + + return response.json() + + def list(self): + url = self._queue_url + response = self._session.get(url) + if response.status_code == 404: + print(f"no such queue {self._queue_name!r}") + return + + return response.json() + + +if __name__ == '__main__': + qname = "test" + rq = RestQueue("localhost", 8000, qname) + print(rq.create()) + + print(rq.push(100)) + print(rq.push("secret message")) + print(rq.push(3.14159)) + print(rq.push(True)) + + print(rq.list()) + + print(rq.pop()) diff --git a/docker/Dockerfile b/docker/Dockerfile index d74e97b..4fbae50 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,10 +2,8 @@ FROM python:3.9.2-alpine WORKDIR /usr/src/app -COPY src . +COPY ../rest_queue/src . RUN pip install --no-cache-dir -r requirements.txt CMD [ "uvicorn", "--host", "0.0.0.0", "rest_queue:app" ] - - diff --git a/rest_queue/src/rest_queue.py b/rest_queue/src/rest_queue.py index 22a22f7..d959bf7 100644 --- a/rest_queue/src/rest_queue.py +++ b/rest_queue/src/rest_queue.py @@ -1,21 +1,59 @@ # -# simple_rest_queue.py +# rest_queue.py # -# A simple FastAPI web service that maintains multiple named FIFO queues +# A lightweight FastAPI web service that maintains multiple named FIFO queues # that can be created, pushed, and popped using REST calls # +# NOTE: this queue does NO persistence, so restarting the server will lose all +# items that are still in the queue. +# from collections import deque from typing import Dict import fastapi + +class Queue: + """ + A small container class to wrap a deque with push and pop methods, + and __iter__ and __len__ dunder methods. + """ + def __init__(self, name): + self._name = name + self._contents = deque() + + def __iter__(self): + return iter(self._contents) + + def __len__(self): + return len(self._contents) + + def push(self, value): + self._contents.append(value) + + def pop(self): + return self._contents.popleft() + + # registry for REST-accessible queues -queues: Dict[str, deque] = {} +queues: Dict[str, Queue] = {} app = fastapi.FastAPI() +""" +Implement these endpoints: +Create - POST http://host:port/queues/ +Push - POST http://host:port/queues//push +Pop - POST http://host:port/queues//pop +List - GET http://host:port/queues/ +Delete - DELETE http://host:port/queues/ +List - GET http://host:port/queues +""" + + +# find HTTP statuses at http://httpstatuses.com @app.post("/queues/{queue_name}", status_code=fastapi.status.HTTP_201_CREATED) async def queue_new(queue_name: str): """ @@ -28,12 +66,30 @@ async def queue_new(queue_name: str): detail=f"Queue {queue_name!r} already exists") # create new deque and add to registry under the given name - queues[queue_name] = deque() + queues[queue_name] = Queue(queue_name) return {"message": f"created {queue_name!r}"} -@app.get("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK) +@app.post("/queues/{queue_name}/push") +async def queue_push(queue_name: str, value: str): + """ + Push new value onto queue. + """ + if queue_name not in queues: + raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND, + detail=f"No such queue {queue_name!r}") + + q = queues[queue_name] + q.push(value) + + return { + "message": f"push to queue {queue_name!r}", + "remaining_queue_size": len(q), + } + + +@app.post("/queues/{queue_name}/pop") async def queue_pop(queue_name: str): """ Pop oldest value from queue, or None if queue is empty. @@ -44,35 +100,34 @@ async def queue_pop(queue_name: str): q = queues[queue_name] if q: - value = q.popleft() + value = q.pop() else: value = None - return {"message": f"get from queue {queue_name!r}", + return {"message": f"pop from queue {queue_name!r}", "data": value, "remaining_queue_size": len(q), } -@app.put("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK) -async def queue_push(queue_name: str, value: str): +@app.get("/queues/{queue_name}") +async def queue_list(queue_name: str): """ - Push new value onto queue. + List out contents of queue. """ if queue_name not in queues: raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND, detail=f"No such queue {queue_name!r}") q = queues[queue_name] - q.append(value) return { - "message": f"put to queue {queue_name!r}", - "remaining_queue_size": len(q), + "message": f"items in queue {queue_name!r}", + "items": list(q), } -@app.delete("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK) +@app.delete("/queues/{queue_name}") async def queue_delete(queue_name: str, safe_delete: bool = True): """ @@ -86,7 +141,10 @@ async def queue_delete(queue_name: str, if safe_delete and q: raise fastapi.HTTPException(status_code=fastapi.status.HTTP_409_CONFLICT, - detail=f"Queue {queue_name!r} not empty (contains {len(q)} items)") + detail={"message": f"Queue {queue_name!r} not empty", + "count": len(q), + } + ) # remove queue from registry del queues[queue_name] @@ -97,24 +155,7 @@ async def queue_delete(queue_name: str, } -@app.get("/queues/{queue_name}/list", status_code=fastapi.status.HTTP_200_OK) -async def queue_list(queue_name: str): - """ - List out contents of queue. - """ - if queue_name not in queues: - raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND, - detail=f"No such queue {queue_name!r}") - - q = queues[queue_name] - - return { - "message": f"items in queue {queue_name!r}", - "items": list(q), - } - - -@app.get("/queues", status_code=fastapi.status.HTTP_200_OK) +@app.get("/queues") async def queue_list_all(): """ List out all queue names.