Skip to content

Commit

Permalink
Update/cleanup; added Queue class in server for better "instancing"; …
Browse files Browse the repository at this point in the history
…restructured files for Dockerfile
  • Loading branch information
ptmcg committed Apr 6, 2021
1 parent 79df2db commit b23090d
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 36 deletions.
60 changes: 60 additions & 0 deletions client/rq_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import requests
from rich import print


class RestQueue:
def __init__(self, host: str, port: int, queue_name: str):
self._host = host
self._port = port
self._queue_name = queue_name
self._queue_url = f"http://{host}:{port}/queues/{queue_name}"
self._session = requests.Session()

def create(self):
response = self._session.post(self._queue_url)
return response.status_code, response.json()

def push(self, value):
if value is None:
print(f"pushed value cannot be None")
return

url = f"{self._queue_url}/push"
response = self._session.post(url, params={"value": value})
if response.status_code == 404:
print(f"no such queue {self._queue_name!r}")

return response.json()

def pop(self):
url = f"{self._queue_url}/pop"
response = self._session.post(url)
if response.status_code == 404:
print(f"no such queue {self._queue_name!r}")
return

return response.json()

def list(self):
url = self._queue_url
response = self._session.get(url)
if response.status_code == 404:
print(f"no such queue {self._queue_name!r}")
return

return response.json()


if __name__ == '__main__':
qname = "test"
rq = RestQueue("localhost", 8000, qname)
print(rq.create())

print(rq.push(100))
print(rq.push("secret message"))
print(rq.push(3.14159))
print(rq.push(True))

print(rq.list())

print(rq.pop())
4 changes: 1 addition & 3 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ FROM python:3.9.2-alpine

WORKDIR /usr/src/app

COPY src .
COPY ../rest_queue/src .

RUN pip install --no-cache-dir -r requirements.txt

CMD [ "uvicorn", "--host", "0.0.0.0", "rest_queue:app" ]


107 changes: 74 additions & 33 deletions rest_queue/src/rest_queue.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,59 @@
#
# simple_rest_queue.py
# rest_queue.py
#
# A simple FastAPI web service that maintains multiple named FIFO queues
# A lightweight FastAPI web service that maintains multiple named FIFO queues
# that can be created, pushed, and popped using REST calls
#
# NOTE: this queue does NO persistence, so restarting the server will lose all
# items that are still in the queue.
#
from collections import deque
from typing import Dict

import fastapi


class Queue:
"""
A small container class to wrap a deque with push and pop methods,
and __iter__ and __len__ dunder methods.
"""
def __init__(self, name):
self._name = name
self._contents = deque()

def __iter__(self):
return iter(self._contents)

def __len__(self):
return len(self._contents)

def push(self, value):
self._contents.append(value)

def pop(self):
return self._contents.popleft()


# registry for REST-accessible queues
queues: Dict[str, deque] = {}
queues: Dict[str, Queue] = {}


app = fastapi.FastAPI()

"""
Implement these endpoints:
Create - POST http://host:port/queues/<queue_name>
Push - POST http://host:port/queues/<queue_name>/push
Pop - POST http://host:port/queues/<queue_name>/pop
List - GET http://host:port/queues/<queue_name>
Delete - DELETE http://host:port/queues/<queue_name>
List - GET http://host:port/queues
"""


# find HTTP statuses at http://httpstatuses.com
@app.post("/queues/{queue_name}", status_code=fastapi.status.HTTP_201_CREATED)
async def queue_new(queue_name: str):
"""
Expand All @@ -28,12 +66,30 @@ async def queue_new(queue_name: str):
detail=f"Queue {queue_name!r} already exists")

# create new deque and add to registry under the given name
queues[queue_name] = deque()
queues[queue_name] = Queue(queue_name)

return {"message": f"created {queue_name!r}"}


@app.get("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK)
@app.post("/queues/{queue_name}/push")
async def queue_push(queue_name: str, value: str):
"""
Push new value onto queue.
"""
if queue_name not in queues:
raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail=f"No such queue {queue_name!r}")

q = queues[queue_name]
q.push(value)

return {
"message": f"push to queue {queue_name!r}",
"remaining_queue_size": len(q),
}


@app.post("/queues/{queue_name}/pop")
async def queue_pop(queue_name: str):
"""
Pop oldest value from queue, or None if queue is empty.
Expand All @@ -44,35 +100,34 @@ async def queue_pop(queue_name: str):

q = queues[queue_name]
if q:
value = q.popleft()
value = q.pop()
else:
value = None

return {"message": f"get from queue {queue_name!r}",
return {"message": f"pop from queue {queue_name!r}",
"data": value,
"remaining_queue_size": len(q),
}


@app.put("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK)
async def queue_push(queue_name: str, value: str):
@app.get("/queues/{queue_name}")
async def queue_list(queue_name: str):
"""
Push new value onto queue.
List out contents of queue.
"""
if queue_name not in queues:
raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail=f"No such queue {queue_name!r}")

q = queues[queue_name]
q.append(value)

return {
"message": f"put to queue {queue_name!r}",
"remaining_queue_size": len(q),
"message": f"items in queue {queue_name!r}",
"items": list(q),
}


@app.delete("/queues/{queue_name}", status_code=fastapi.status.HTTP_200_OK)
@app.delete("/queues/{queue_name}")
async def queue_delete(queue_name: str,
safe_delete: bool = True):
"""
Expand All @@ -86,7 +141,10 @@ async def queue_delete(queue_name: str,

if safe_delete and q:
raise fastapi.HTTPException(status_code=fastapi.status.HTTP_409_CONFLICT,
detail=f"Queue {queue_name!r} not empty (contains {len(q)} items)")
detail={"message": f"Queue {queue_name!r} not empty",
"count": len(q),
}
)

# remove queue from registry
del queues[queue_name]
Expand All @@ -97,24 +155,7 @@ async def queue_delete(queue_name: str,
}


@app.get("/queues/{queue_name}/list", status_code=fastapi.status.HTTP_200_OK)
async def queue_list(queue_name: str):
"""
List out contents of queue.
"""
if queue_name not in queues:
raise fastapi.HTTPException(status_code=fastapi.status.HTTP_404_NOT_FOUND,
detail=f"No such queue {queue_name!r}")

q = queues[queue_name]

return {
"message": f"items in queue {queue_name!r}",
"items": list(q),
}


@app.get("/queues", status_code=fastapi.status.HTTP_200_OK)
@app.get("/queues")
async def queue_list_all():
"""
List out all queue names.
Expand Down

0 comments on commit b23090d

Please sign in to comment.