diff --git a/.env b/.env index 0f959db..50b51ec 100644 --- a/.env +++ b/.env @@ -1,6 +1,7 @@ # Port to run memas FLASK_RUN_PORT=8010 +MEMAS_CONF_FILE=memas-config.yml # Password for the 'elastic' user (at least 6 characters) ELASTIC_PASSWORD=abc123 diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..a63f80b --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[FORMAT] +max-line-length=120 \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f630afa..85522fc 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -31,7 +31,7 @@ To stop docker execution, run Control+C in the terminal you are running `docker If you want to clean your local docker images, run ```bash -docker compose down --volumes +docker compose --profile dev down --volumes ``` FYI you may need to run `sysctl -w vm.max_map_count=262144` if you get an error when trying to start elasticsearch. @@ -46,20 +46,20 @@ docker compose up If this is your first time initializing the MeMaS server, after `docker compose up` and wait till the dependencies are fully started, run `source setup-env.sh`, then ```bash -flask --app 'memas.app:create_app(config_filename="memas-config.yml", first_init=True)' run +flask --app 'memas.app:create_app(first_init=True)' run ``` This will run for a while then exit. Upon exit, your MeMaS is properly setup. **NOTE: Only run this phase when you are working with a clean set of docker dependencies, aka a fresh start or after `docker compose down --volumes`.** After MeMaS is properly initialized, run `source setup-env.sh`, then: ```bash -flask --app 'memas.app:create_app(config_filename="memas-config.yml")' run +flask --app 'memas.app:create_app' run ``` to start the memas server. And to run the app with wsgi server, run ```bash -gunicorn -w 1 -k eventlet 'memas.app:create_app(config_filename="memas-config.yml")' +gunicorn -w 1 -k eventlet 'memas.app:create_app' ``` note `-w` sets the number of worker threads. diff --git a/Dockerfile b/Dockerfile index 2c3abc6..ff7cad3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ COPY --chmod=0755 memas-docker/init.sh ./init.sh # Copy in the default config ARG conf_file=memas-config.yml -ENV conf_file=${conf_file} +ENV MEMAS_CONF_FILE=${conf_file} COPY memas-docker/${conf_file} ./memas/${conf_file} # TODO: provide way to use custom configs in docker compose @@ -34,4 +34,4 @@ ENV PYTHONPATH "$PYTHONPATH:memas" EXPOSE 8010 -CMD gunicorn -b :8010 -w 1 -k eventlet "memas.app:create_app(config_filename=\"${conf_file}\")" +CMD gunicorn -b :8010 -w 1 -k eventlet "memas.app:create_app()" diff --git a/docker-compose.yml b/docker-compose.yml index b7b70a2..9e701e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,6 +38,29 @@ services: # command: ./wait-for-it.sh milvus-standalone:19530 -t 300 -- gunicorn -w 1 -k eventlet 'memas.app:create_app(config_filename="memas-config.yml")' profiles: ["dev"] + memas-worker: + build: + context: . + image: memas:latest + container_name: memas-worker + depends_on: + memas-init: + condition: service_completed_successfully + env_file: + - .env + volumes: + - memas_data:/memas + command: celery --app memas.make_celery worker --loglevel INFO + profiles: ["dev"] + + redis: + image: redis + container_name: redis + ports: + - 6379:6379 + volumes: + - redis_data:/data + scylla: image: scylladb/scylla container_name: scylla @@ -63,6 +86,11 @@ services: volumes: - etcd_data:/etcd command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + healthcheck: + test: ["CMD", "etcdctl", "endpoint", "health"] + interval: 30s + timeout: 20s + retries: 3 minio: container_name: milvus-minio @@ -71,8 +99,8 @@ services: MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin volumes: - - minio_data:/data - command: minio server /data + - minio_data:/minio_data + command: minio server /minio_data --console-address ":9001" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] interval: 30s @@ -81,7 +109,7 @@ services: milvus: container_name: milvus-standalone - image: milvusdb/milvus:v2.2.8 + image: milvusdb/milvus:v2.3.2 command: ["milvus", "run", "standalone"] environment: ETCD_ENDPOINTS: etcd:2379 @@ -117,6 +145,9 @@ volumes: memas_data: driver: local + redis_data: + driver: local + esdata01: driver: local diff --git a/integration-tests/conftest.py b/integration-tests/conftest.py index 5da0b42..7530be3 100644 --- a/integration-tests/conftest.py +++ b/integration-tests/conftest.py @@ -6,8 +6,8 @@ from pymilvus import connections as milvus_connection from elasticsearch import Elasticsearch from flask import Flask, Config +from memas import context_manager from memas.app import create_app -from memas.context_manager import ctx, EnvironmentConstants, read_env from memas.storage_driver import corpus_doc_store, corpus_vector_store @@ -23,7 +23,7 @@ def clean_resources(): config = Config(os.getcwd() + "/memas/") config.from_file(CONFIG_PATH, load=yaml.safe_load) - constants = EnvironmentConstants(config) + constants = context_manager.EnvironmentConstants(config) try: connection.setup([constants.cassandra_ip], "system", protocol_version=4) @@ -54,9 +54,9 @@ def create_test_app(): # first init to setup with pytest.raises(SystemExit): # Note that first init should exit after initializing. So we need to catch and verify - create_app(CONFIG_PATH, first_init=True) + create_app(config_filename=CONFIG_PATH, first_init=True) - app = create_app(CONFIG_PATH) + app = create_app(config_filename=CONFIG_PATH) return app @@ -72,4 +72,10 @@ def test_client(): @pytest.fixture def es_client(): with app.app_context(): - yield ctx.es + yield context_manager.ctx.es + + +@pytest.fixture +def ctx(): + with app.app_context(): + yield context_manager.ctx diff --git a/integration-tests/corpus/test_basic_corpus.py b/integration-tests/corpus/test_basic_corpus.py index 896766b..318eb63 100644 --- a/integration-tests/corpus/test_basic_corpus.py +++ b/integration-tests/corpus/test_basic_corpus.py @@ -1,16 +1,17 @@ import numpy as np import uuid import time -from memas.context_manager import ctx from memas.corpus import basic_corpus -from memas.interface.corpus import Citation +from memas.interface.corpus import Citation, CorpusInfo, CorpusType corpus_name = "test corpus1" -def test_save_then_search_one_corpus(es_client): - test_corpus = basic_corpus.BasicCorpus( - uuid.uuid4(), corpus_name, ctx.corpus_metadata, ctx.corpus_doc, ctx.corpus_vec) +def test_save_then_search_one_corpus(ctx): + namespace_id = uuid.uuid4() + corpus_id = uuid.uuid4() + corpus_info = CorpusInfo("test_corpus_save_search", namespace_id, corpus_id, CorpusType.CONVERSATION) + test_corpus = basic_corpus.BasicCorpus(corpus_info, ctx.corpus_metadata, ctx.corpus_doc, ctx.corpus_vec) text1 = "The sun is high. California sunshine is great. " text2 = "I picked up my phone and then dropped it again. I cant seem to get a good grip on things these days. It persists into my everyday tasks" @@ -26,3 +27,27 @@ def test_save_then_search_one_corpus(es_client): # print(output) assert "sunshine" in output[1][0] assert "weather" in output[0][0] + + +def test_delete_all_content(ctx): + namespace_id = uuid.uuid4() + corpus_id = uuid.uuid4() + corpus_info = CorpusInfo("test_delete_all_content", namespace_id, corpus_id, CorpusType.CONVERSATION) + test_corpus = basic_corpus.BasicCorpus(corpus_info, ctx.corpus_metadata, ctx.corpus_doc, ctx.corpus_vec) + + text1 = "The sun is high. California sunshine is great. " + text2 = "I picked up my phone and then dropped it again. I cant seem to get a good grip on things these days. It persists into my everyday tasks" + text3 = "The weather is great today, but I worry that tomorrow it won't be. My umbrella is in the repair shop." + + assert test_corpus.store_and_index(text1, Citation("www.docsource1", "SSSdoc1", "", "doc1")) + assert test_corpus.store_and_index(text2, Citation("were.docsource2", "SSSdoc2", "", "doc2")) + assert test_corpus.store_and_index(text3, Citation("docsource3.ai", "SSSdoc3", "", "doc3")) + + time.sleep(1) + output = test_corpus.search("It is sunny") + assert "sunshine" in output[1][0] + + test_corpus.delete_all_content() + time.sleep(1) + output = test_corpus.search("It is sunny") + assert output == [] diff --git a/integration-tests/integ-test-config.yml b/integration-tests/integ-test-config.yml index dd2814f..3ed9332 100644 --- a/integration-tests/integ-test-config.yml +++ b/integration-tests/integ-test-config.yml @@ -11,3 +11,8 @@ ELASTICSEARCH: MILVUS: ip: "127.0.0.1" port: 19530 + +CELERY: + broker_url: "redis://localhost" + result_backend: "redis://localhost" + task_ignore_result: True diff --git a/integration-tests/storage_driver/test_corpus_doc_metadata.py b/integration-tests/storage_driver/test_corpus_doc_metadata.py index d7be4f2..a0261dd 100644 --- a/integration-tests/storage_driver/test_corpus_doc_metadata.py +++ b/integration-tests/storage_driver/test_corpus_doc_metadata.py @@ -1,5 +1,7 @@ +import pytest import uuid from memas.interface.corpus import Citation +from memas.interface.exceptions import DocumentMetadataNotFound from memas.storage_driver.corpus_doc_metadata import CorpusDocumentMetadataStoreImpl @@ -17,3 +19,15 @@ def test_insert_and_get(): citation = Citation("google.com", "test google", "just a simple test", "test") metadata.insert_document_metadata(corpus_id, document_id, 1, citation) assert metadata.get_document_citation(corpus_id, document_id) == citation + + +def test_delete_corpus(): + corpus_id = uuid.uuid4() + document_id = uuid.uuid4() + + citation = Citation("google.com", "test google", "just a simple test", "test") + metadata.insert_document_metadata(corpus_id, document_id, 1, citation) + metadata.delete_corpus(corpus_id) + + with pytest.raises(DocumentMetadataNotFound): + metadata.get_document_citation(corpus_id, document_id) diff --git a/integration-tests/storage_driver/test_corpus_doc_store.py b/integration-tests/storage_driver/test_corpus_doc_store.py index d7f5274..99fd3b0 100644 --- a/integration-tests/storage_driver/test_corpus_doc_store.py +++ b/integration-tests/storage_driver/test_corpus_doc_store.py @@ -7,13 +7,13 @@ import time -def test_init(es_client: Elasticsearch): - doc_store = corpus_doc_store.ESDocumentStore(es_client) +def test_init(ctx: ContextManager): + doc_store = corpus_doc_store.ESDocumentStore(ctx.es) doc_store.init() -def test_save_then_search(es_client): - doc_store = corpus_doc_store.ESDocumentStore(es_client) +def test_save_then_search(ctx: ContextManager): + doc_store = corpus_doc_store.ESDocumentStore(ctx.es) doc_store.init() corpus_id1 = uuid.uuid1() diff --git a/integration-tests/storage_driver/test_corpus_vector_store.py b/integration-tests/storage_driver/test_corpus_vector_store.py index cb14c5e..4e8e2de 100644 --- a/integration-tests/storage_driver/test_corpus_vector_store.py +++ b/integration-tests/storage_driver/test_corpus_vector_store.py @@ -12,7 +12,7 @@ def test_init(): store.init() -def test_save_then_search2(): +def test_save_then_search(): print("before UIDs") corpus_id1 = uuid.uuid4() corpus_id2 = uuid.uuid4() @@ -46,3 +46,27 @@ def test_save_then_search2(): # Test that the document stored in the other corpus isn't a result assert document_id3 not in {t[1].document_id for t in result} + + +def test_delete_corpus(): + corpus_id1 = uuid.uuid4() + document_id0 = uuid.uuid4() + document_id1 = uuid.uuid4() + document_id2 = uuid.uuid4() + + best_match_str = "The sun is high. California sunshine is great. " + + store.save_documents([DocumentEntity(corpus_id1, document_id0, "doc0", + "Before This is a runon sentence meant to test the logic of the splitting capabilites but that is only the start, there is nothing that can break this sentecne up other than some handy logic even in the worst case, too bad I only know how to use commas")]) + store.save_documents([DocumentEntity(corpus_id1, document_id1, "doc1", + "The sun is high! California sunshine is great. Did you catch my quest? Oh oh! lol")]) + store.save_documents([DocumentEntity(corpus_id1, document_id2, "doc2", + "I picked up my phone and then dropped it again")]) + time.sleep(1) + + store.delete_corpus(corpus_id1) + time.sleep(1) + + result = store.search_corpora([corpus_id1], "How's the weather today?") + + assert len(result) == 0 diff --git a/integration-tests/storage_driver/test_memas_metadata.py b/integration-tests/storage_driver/test_memas_metadata.py index 5eb115b..47e74b9 100644 --- a/integration-tests/storage_driver/test_memas_metadata.py +++ b/integration-tests/storage_driver/test_memas_metadata.py @@ -1,6 +1,7 @@ import pytest -from memas.interface.corpus import CorpusType -from memas.interface.exceptions import NamespaceExistsException +import uuid +from memas.interface.corpus import CorpusInfo, CorpusType +from memas.interface.exceptions import IllegalStateException, NamespaceExistsException, NamespaceDoesNotExistException from memas.interface.namespace import ROOT_ID from memas.storage_driver.memas_metadata import MemasMetadataStoreImpl @@ -56,4 +57,58 @@ def test_get_query_corpora(): corpus_id2 = metadata.create_conversation_corpus("test_get_query_corpora:convo1") corpora = metadata.get_query_corpora("test_get_query_corpora") - assert corpora == {corpus_id1, corpus_id2} + assert corpora == { + CorpusInfo("test_get_query_corpora:knowledge1", namespace_id, corpus_id1, CorpusType.CONVERSATION), + CorpusInfo("test_get_query_corpora:convo1", namespace_id, corpus_id2, CorpusType.CONVERSATION) + } + + +def test_initiate_delete_corpus(): + metadata.init() + + namespace_id = metadata.create_namespace("test_initiate_delete_corpus") + corpus_pathname = "test_initiate_delete_corpus:convo1" + corpus_id = metadata.create_conversation_corpus(corpus_pathname) + + corpus_info = metadata.get_corpus_info(corpus_pathname) + assert corpus_info.corpus_id == corpus_id + + metadata.initiate_delete_corpus(namespace_id, corpus_id, corpus_pathname) + + # since it's not a full delete, we'll still be able to query metadata given the ids + corpus_info = metadata.get_corpus_info_by_id(namespace_id, corpus_id) + assert corpus_info.corpus_id == corpus_id + + with pytest.raises(NamespaceDoesNotExistException): + corpus_info = metadata.get_corpus_info(corpus_pathname) + + +def test_initiate_delete_wrong_corpus(): + metadata.init() + + namespace_id = metadata.create_namespace("initiate_delete_wrong_corpus") + corpus_pathname = "initiate_delete_wrong_corpus:convo1" + corpus_id = metadata.create_conversation_corpus(corpus_pathname) + + corpus_info = metadata.get_corpus_info(corpus_pathname) + assert corpus_info.corpus_id == corpus_id + + with pytest.raises(NamespaceDoesNotExistException): + metadata.initiate_delete_corpus(namespace_id, uuid.uuid4(), corpus_pathname) + + +def test_finish_delete_corpus(): + metadata.init() + + namespace_id = metadata.create_namespace("test_finish_delete_corpus") + corpus_pathname = "test_finish_delete_corpus:convo1" + corpus_id = metadata.create_conversation_corpus(corpus_pathname) + + corpus_info = metadata.get_corpus_info_by_id(namespace_id, corpus_id) + assert corpus_info.corpus_id == corpus_id + + metadata.initiate_delete_corpus(namespace_id, corpus_id, corpus_pathname) + metadata.finish_delete_corpus(namespace_id, corpus_id) + + with pytest.raises(IllegalStateException): + metadata.get_corpus_info_by_id(namespace_id, corpus_id) diff --git a/integration-tests/test_celery_worker.py b/integration-tests/test_celery_worker.py new file mode 100644 index 0000000..ebacc25 --- /dev/null +++ b/integration-tests/test_celery_worker.py @@ -0,0 +1,15 @@ +import pytest +from unittest import mock +import memas.celery_worker +from memas.interface.exceptions import NamespaceDoesNotExistException + + +@mock.patch("memas.celery_worker.time.sleep") +def test_delete_corpus(mock_sleep, ctx): + namespace_id = ctx.memas_metadata.create_namespace("celery_delete_corpus") + corpus_pathname = "celery_delete_corpus:corpus1" + corpus_id = ctx.memas_metadata.create_conversation_corpus(corpus_pathname) + memas.celery_worker.delete_corpus(namespace_id, corpus_id, corpus_pathname) + + with pytest.raises(NamespaceDoesNotExistException): + ctx.memas_metadata.get_corpus_ids_by_name(corpus_pathname) diff --git a/integration-tests/test_controlplane.py b/integration-tests/test_controlplane.py index c68515c..b1bbc0e 100644 --- a/integration-tests/test_controlplane.py +++ b/integration-tests/test_controlplane.py @@ -1,3 +1,4 @@ +from unittest import mock def test_create_user(test_client): @@ -22,3 +23,12 @@ def test_create_corpus_existing(test_client): resp = test_client.post("/cp/create_corpus", json={"corpus_pathname": "create_user_2:create_corpus_2"}) assert resp.status_code == 400 assert resp.json["error_code"] == "namespace_exists" + + +@mock.patch("memas.celery_worker.delete_corpus") +def test_delete_corpus(mock_delete_corpus, test_client): + test_client.post("/cp/create_corpus", json={"corpus_pathname": "create_user_2:delete_corpus"}) + + resp = test_client.post("/cp/delete_corpus", json={"corpus_pathname": "create_user_2:delete_corpus"}) + assert resp.status_code == 200 + assert mock_delete_corpus.has_called() diff --git a/memas-docker/init.sh b/memas-docker/init.sh index ae61126..8e6be92 100644 --- a/memas-docker/init.sh +++ b/memas-docker/init.sh @@ -25,5 +25,5 @@ if [ ! -e /memas/first-init.lock ] then # If initialization succeeded, create the lock file, and write our current version to it # FIXME: is running flask instead of gunicorn a security concern? Gunicorn keeps on trying to restart the worker thread despite we're intentionally exiting - flask --app "memas.app:create_app(config_filename=\"$conf_file\", first_init=True)" run && touch /memas/first-init.lock; echo $version > /memas/first-init.lock + flask --app "memas.app:create_app(first_init=True)" run && touch /memas/first-init.lock; echo $version > /memas/first-init.lock fi diff --git a/memas-docker/memas-config.yml b/memas-docker/memas-config.yml index 1574691..16681b9 100644 --- a/memas-docker/memas-config.yml +++ b/memas-docker/memas-config.yml @@ -11,3 +11,8 @@ ELASTICSEARCH: MILVUS: ip: "milvus-standalone" port: 19530 + +CELERY: + broker_url: "redis://redis" + result_backend: "redis://redis" + task_ignore_result: True diff --git a/memas/app.py b/memas/app.py index 0f03bdd..2a38b2f 100644 --- a/memas/app.py +++ b/memas/app.py @@ -1,19 +1,36 @@ +import sys import traceback import yaml +from celery import Celery, Task from flask import Flask -from memas.context_manager import ContextManager +from memas.context_manager import read_env, ContextManager from memas.interface.exceptions import MemasException -def create_app(config_filename, *, first_init=False): +def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app + + +def create_app(*, config_filename=None, first_init=False): app = Flask(__name__) + if config_filename is None: + config_filename = read_env("MEMAS_CONF_FILE") app.config.from_file(config_filename, load=yaml.safe_load) app.ctx: ContextManager = ContextManager(app.config) if first_init: app.ctx.first_init() app.logger.info("Finished first time initialization") - exit(0) + sys.exit(0) app.ctx.init() @@ -23,6 +40,8 @@ def handle_memas_exception(e: MemasException): # app.logger.info(traceback.format_exc()) return e.return_obj(), e.status_code.value + celery_init_app(app) + from memas.dataplane import dataplane from memas.controlplane import controlplane app.register_blueprint(dataplane) diff --git a/memas/celery_worker.py b/memas/celery_worker.py new file mode 100644 index 0000000..3245439 --- /dev/null +++ b/memas/celery_worker.py @@ -0,0 +1,39 @@ +import time +from uuid import UUID +from celery import shared_task +from celery.utils.log import get_task_logger +from memas.context_manager import ctx +from memas.interface.exceptions import NamespaceDoesNotExistException + + +logger = get_task_logger(__name__) + + +@shared_task(ignore_result=True) +def delete_corpus(parent_id: UUID, corpus_id: UUID, corpus_pathname: str): + logger.info( + f"celery delete corpus for [corpus_pathname={corpus_pathname}] [parent_id={parent_id}] [corpus_id={corpus_id}]") + + # Sleep for x seconds to avoid a race condition with the flask's delete_corpus handler + time.sleep(3) + corpus_exists = True + + try: + # It is expected the namespace doesn't exist, since the original delete_corpus api should have deleted it. + corpus_info = ctx.memas_metadata.get_corpus_info(corpus_pathname) + except NamespaceDoesNotExistException: + logger.debug(f"initiate_delete_corpus was successful originally") + corpus_info = ctx.memas_metadata.get_corpus_info_by_id(parent_id, corpus_id) + corpus_exists = False + + # When the get_corpus_ids_by_name doesn't fail, initiate the delete again, in case the delete earlier was interrupted + if corpus_exists: + ctx.memas_metadata.initiate_delete_corpus(parent_id, corpus_id, corpus_pathname) + logger.warn(f"Corpus deletion failed but recovered [corpus_id={corpus_id}] [corpus_pathname={corpus_pathname}]") + + # Then delete the content within the corpus + corpus = ctx.corpus_provider.get_corpus_by_info(corpus_info) + corpus.delete_all_content() + + # finally delete the metadata + ctx.memas_metadata.finish_delete_corpus(parent_id, corpus_id) diff --git a/memas/context_manager.py b/memas/context_manager.py index f3284fe..add7d7d 100644 --- a/memas/context_manager.py +++ b/memas/context_manager.py @@ -122,7 +122,8 @@ def init_datastores(self) -> None: self.corpus_vec.init() self.corpus_doc.init() - self.corpus_provider = CorpusProvider(self.corpus_metadata, self.corpus_doc, self.corpus_vec) + self.corpus_provider = CorpusProvider( + self.memas_metadata, self.corpus_metadata, self.corpus_doc, self.corpus_vec) def init(self) -> None: self.init_clients() diff --git a/memas/controlplane.py b/memas/controlplane.py index 53b0ab2..eaa5ee0 100644 --- a/memas/controlplane.py +++ b/memas/controlplane.py @@ -1,4 +1,5 @@ from flask import Blueprint, current_app, request +import memas.celery_worker as worker from memas.context_manager import ctx from memas.interface.corpus import CorpusType @@ -30,3 +31,21 @@ def create_corpus(): current_app.logger.error(f"Corpus type not supported [corpus_type={corpus_type}]") raise NotImplementedError(f"Corpus Type '{corpus_type}' not supported") return {"success": True} + + +@controlplane.route('/delete_corpus', methods=["POST"]) +def delete_corpus(): + corpus_pathname = request.json["corpus_pathname"] + + current_app.logger.info(f"Delete corpus [corpus_pathname=\"{corpus_pathname}\"]") + + # Get ids will raise an exception if the pathname is not found + parent_id, corpus_id = ctx.memas_metadata.get_corpus_ids_by_name(corpus_pathname) + + # The order is important here, first queue up the delete job + worker.delete_corpus.delay(parent_id, corpus_id, corpus_pathname) + + # Now initiate the delete, so the corpus can't be accessed by users + ctx.memas_metadata.initiate_delete_corpus(parent_id, corpus_id, corpus_pathname) + + return {"success": True} diff --git a/memas/corpus/basic_corpus.py b/memas/corpus/basic_corpus.py index e80d24b..db03d93 100644 --- a/memas/corpus/basic_corpus.py +++ b/memas/corpus/basic_corpus.py @@ -1,7 +1,7 @@ # from search_redirect import SearchSettings import logging import uuid -from memas.interface.corpus import Corpus, CorpusFactory +from memas.interface.corpus import Corpus, CorpusInfo, CorpusFactory from memas.interface.corpus import Citation from memas.interface.storage_driver import CorpusDocumentMetadataStore, CorpusDocumentStore, CorpusVectorStore, DocumentEntity from memas.interface.exceptions import SentenceLengthOverflowException @@ -15,8 +15,8 @@ class BasicCorpus(Corpus): - def __init__(self, corpus_id: uuid.UUID, corpus_name: str, metadata_store: CorpusDocumentMetadataStore, doc_store: CorpusDocumentStore, vec_store: CorpusVectorStore): - super().__init__(corpus_id, corpus_name) + def __init__(self, corpus_info: CorpusInfo, metadata_store: CorpusDocumentMetadataStore, doc_store: CorpusDocumentStore, vec_store: CorpusVectorStore): + super().__init__(corpus_info) self.metadata_store: CorpusDocumentMetadataStore = metadata_store self.doc_store: CorpusDocumentStore = doc_store self.vec_store: CorpusVectorStore = vec_store @@ -101,6 +101,12 @@ def search(self, clue: str) -> list[tuple[float, str, Citation]]: return results + def delete_all_content(self): + # TODO: parallelize + self.metadata_store.delete_corpus(self.corpus_id) + self.doc_store.delete_corpus(self.corpus_id) + self.vec_store.delete_corpus(self.corpus_id) + class BasicCorpusFactory(CorpusFactory): def __init__(self, metadata_store: CorpusDocumentMetadataStore, doc_store: CorpusDocumentStore, vec_store: CorpusVectorStore) -> None: @@ -109,6 +115,5 @@ def __init__(self, metadata_store: CorpusDocumentMetadataStore, doc_store: Corpu self.doc_store: CorpusDocumentStore = doc_store self.vec_store: CorpusVectorStore = vec_store - def produce(self, corpus_id: uuid.UUID): - # TODO: Maybe change the Corpus Name Parameter - return BasicCorpus(corpus_id, "BasicCorpus", self.metadata_store, self.doc_store, self.vec_store) + def produce(self, corpus_info: CorpusInfo): + return BasicCorpus(corpus_info, self.metadata_store, self.doc_store, self.vec_store) diff --git a/memas/corpus/corpus_provider.py b/memas/corpus/corpus_provider.py index c8fab86..99ee5fc 100644 --- a/memas/corpus/corpus_provider.py +++ b/memas/corpus/corpus_provider.py @@ -1,31 +1,46 @@ import logging -from uuid import UUID from memas.corpus.basic_corpus import BasicCorpusFactory -from memas.interface.corpus import Corpus, CorpusFactory, CorpusType -from memas.interface.storage_driver import CorpusDocumentMetadataStore, CorpusDocumentStore, CorpusVectorStore +from memas.interface.corpus import Corpus, CorpusFactory, CorpusInfo, CorpusType +from memas.interface.storage_driver import CorpusDocumentMetadataStore, CorpusDocumentStore, CorpusVectorStore, MemasMetadataStore _log = logging.getLogger(__name__) class CorpusProvider: - def __init__(self, metadata_store: CorpusDocumentMetadataStore, doc_store: CorpusDocumentStore, vec_store: CorpusVectorStore) -> None: + def __init__(self, + memas_metadata_store: MemasMetadataStore, + doc_metadata_store: CorpusDocumentMetadataStore, + doc_store: CorpusDocumentStore, + vec_store: CorpusVectorStore + ) -> None: + self.memas_metadata_store: MemasMetadataStore = memas_metadata_store + self.factory_dict: dict[CorpusType, CorpusFactory] = dict() - basic_corpus_factory = BasicCorpusFactory(metadata_store, doc_store, vec_store) + basic_corpus_factory = BasicCorpusFactory(doc_metadata_store, doc_store, vec_store) self.factory_dict[CorpusType.CONVERSATION] = basic_corpus_factory self.factory_dict[CorpusType.KNOWLEDGE] = basic_corpus_factory - def get_corpus(self, corpus_id: UUID, *, corpus_type: CorpusType, namespace_id: UUID = None) -> Corpus: - """Gets the Corpus class based on the corpus_id + def get_corpus_by_name(self, corpus_pathname: str) -> Corpus: + """Gets the Corpus class based on the corpus_pathname + + Args: + corpus_pathname (str): corpus pathname + + Returns: + Corpus: Corpus object for searching + """ + corpus_info = self.memas_metadata_store.get_corpus_info(corpus_pathname) + return self.get_corpus_by_info(corpus_info) + + def get_corpus_by_info(self, corpus_info: CorpusInfo) -> Corpus: + """Gets the Corpus class based on the CorpusInfo Args: - corpus_id (UUID): corpus_id - corpus_type (CorpusType): type of the corpus, this is necessary unless a namespace_id is provided - namespace_id (UUID): namespace_id of the corpus, this is necessary when a corpus type is not provided, - since it's needed to find the corpus type. + corpus_info (CorpusInfo): corpus info object Returns: - Corpus: _description_ + Corpus: Corpus object for searching """ - return self.factory_dict[corpus_type].produce(corpus_id) + return self.factory_dict[corpus_info.corpus_type].produce(corpus_info) diff --git a/memas/dataplane.py b/memas/dataplane.py index 0abc07b..893765f 100644 --- a/memas/dataplane.py +++ b/memas/dataplane.py @@ -14,13 +14,12 @@ def recall(): current_app.logger.info(f"Recalling [namespace_pathname=\"{namespace_pathname}\"]") - corpus_ids = ctx.memas_metadata.get_query_corpora(namespace_pathname) + corpus_infos = ctx.memas_metadata.get_query_corpora(namespace_pathname) - current_app.logger.debug(f"Querying corpuses: {corpus_ids}") + current_app.logger.debug(f"Querying corpuses: {corpus_infos}") search_results: list[tuple[str, Citation]] = [] - for corpus_id in corpus_ids: - # TODO: either provide corpus_type or namespace_pathname - corpus: Corpus = ctx.corpus_provider.get_corpus(corpus_id, corpus_type=CorpusType.KNOWLEDGE) + for corpus_info in corpus_infos: + corpus: Corpus = ctx.corpus_provider.get_corpus_by_info(corpus_info) search_results.extend(corpus.search(clue=clue)) # Combine the results and only take the top ones @@ -47,9 +46,7 @@ def memorize(): description=raw_citation.get("description", ""), document_name=document_name) - corpus_info = ctx.memas_metadata.get_corpus_info(corpus_pathname) - - corpus: Corpus = ctx.corpus_provider.get_corpus(corpus_info.corpus_id, corpus_type=corpus_info.corpus_type) + corpus: Corpus = ctx.corpus_provider.get_corpus_by_name(corpus_pathname) success = corpus.store_and_index(document, citation) current_app.logger.info(f"Memorize finished [success={success}]") diff --git a/memas/encoder/openai_ada_encoder.py b/memas/encoder/openai_ada_encoder.py index af9059e..c92d73b 100644 --- a/memas/encoder/openai_ada_encoder.py +++ b/memas/encoder/openai_ada_encoder.py @@ -3,7 +3,7 @@ from memas.interface.encoder import TextEncoder -ADA_MODEL="text-embedding-ada-002" +ADA_MODEL = "text-embedding-ada-002" class ADATextEncoder(TextEncoder): @@ -15,8 +15,8 @@ def init(self): pass def embed(self, text: str) -> np.ndarray: - return np.array(openai.Embedding.create(input = [text], model=ADA_MODEL)['data'][0]['embedding']) + return np.array(openai.Embedding.create(input=[text], model=ADA_MODEL)['data'][0]['embedding']) def embed_multiple(self, text_list: list[str]) -> list[np.ndarray]: - embeddings = openai.Embedding.create(input = text_list, model=ADA_MODEL)['data'] + embeddings = openai.Embedding.create(input=text_list, model=ADA_MODEL)['data'] return [np.array(resp['embedding']) for resp in embeddings] diff --git a/memas/interface/corpus.py b/memas/interface/corpus.py index cdb00bd..29c5af0 100644 --- a/memas/interface/corpus.py +++ b/memas/interface/corpus.py @@ -20,18 +20,22 @@ class Citation: @dataclass class CorpusInfo: corpus_pathname: str + namespace_id: UUID corpus_id: UUID corpus_type: CorpusType + def __hash__(self) -> int: + return hash((self.namespace_id, self.corpus_id, self.corpus_pathname)) + class Corpus(ABC): """ - Corpus interface used to hide the different implementations + Corpus interface used to access data within the corpus, and hide the different implementations """ - def __init__(self, corpus_id: UUID, corpus_name: str): - self.corpus_id: UUID = corpus_id - self.corpus_name: str = corpus_name + def __init__(self, corpus_info: CorpusInfo): + self.corpus_id: UUID = corpus_info.corpus_id + self.corpus_info: CorpusInfo = corpus_info @abstractmethod def store_and_index(self, document: str, citation: Citation) -> bool: @@ -56,9 +60,14 @@ def search(self, clue: str) -> list[tuple[float, str, Citation]]: list[tuple[str, Citation]]: a list of (document, citation) pairs """ + @abstractmethod + def delete_all_content(self): + """Deletes all data in the corpus + """ + class CorpusFactory(ABC): @abstractmethod - def produce(self, corpus_id: UUID): + def produce(self, corpus_info: CorpusInfo): # FIXME: do we want to pass in any arguments? pass diff --git a/memas/interface/exceptions.py b/memas/interface/exceptions.py index a631cb0..19fcd61 100644 --- a/memas/interface/exceptions.py +++ b/memas/interface/exceptions.py @@ -1,4 +1,5 @@ from enum import Enum +from uuid import UUID class MemasInternalException(Exception): @@ -29,6 +30,7 @@ class ErrorCode(Enum): NamespaceExists = "namespace_exists" NamespaceDoesNotExist = "namespace_does_not_exist" NamespaceIllegalName = "namespace_illegal_name" + NamespaceDeleting = "namespace_deleting" class MemasException(Exception): @@ -65,4 +67,9 @@ def __init__(self, pathname: str, additional_details: str = None) -> None: # TODO: properly specify this exception type class SentenceLengthOverflowException(Exception): def __init__(self, sentence_len: int) -> None: - super().__init__("Sentence length is {len} which exceededs limit".format(len=sentence_len)) + super().__init__(f"Sentence length is {sentence_len} which exceededs limit") + + +class DocumentMetadataNotFound(IllegalStateException): + def __init__(self, corpus_id: UUID, document_id: UUID) -> None: + super().__init__(f"Document metadata not found for [corpus_id={corpus_id}] [document_id={document_id}]") diff --git a/memas/interface/namespace.py b/memas/interface/namespace.py index c3e3dcf..1abb89e 100644 --- a/memas/interface/namespace.py +++ b/memas/interface/namespace.py @@ -1,6 +1,10 @@ +import logging +import re from typing import Final from uuid import UUID -import re + +_log = logging.getLogger(__name__) + ROOT_ID: Final[UUID] = UUID("0" * 32) ROOT_NAME: Final[str] = "" @@ -13,18 +17,15 @@ CORPUS_SEPARATOR: Final[str] = ":" -def is_pathname_format_valid(pathname: str) -> bool: - tokens = pathname.split(CORPUS_SEPARATOR) - if len(tokens) < 2: - namespace_pathname = pathname - elif len(tokens) == 2: - if not is_name_format_valid(tokens[1]): - return False - namespace_pathname = tokens[0] - else: +def mangle_corpus_pathname(parent_pathname: str, corpus_name: str) -> str: + return parent_pathname + CORPUS_SEPARATOR + corpus_name + + +def is_namespace_pathname_valid(pathname: str) -> bool: + if CORPUS_SEPARATOR in pathname: return False - tokens = namespace_pathname.split(NAMESPACE_SEPARATOR) + tokens = pathname.split(NAMESPACE_SEPARATOR) # We allow only the root namespace to be empty string. # This needs to be specially handled since is_name_format_valid won't allow empty names if len(tokens) == 1 and tokens[0] == ROOT_NAME: @@ -32,11 +33,19 @@ def is_pathname_format_valid(pathname: str) -> bool: for segment in tokens: if not is_name_format_valid(segment): - print(f"Segment \"{segment}\" not valid") + _log.info(f"Segment \"{segment}\" not valid") return False return True +def is_corpus_pathname_valid(pathname: str) -> bool: + tokens = pathname.split(CORPUS_SEPARATOR) + if len(tokens) != 2: + return False + + return is_name_format_valid(tokens[1]) and is_namespace_pathname_valid(tokens[0]) + + NAME_REGEX: re.Pattern = re.compile(r"[A-Za-z0-9_]+") diff --git a/memas/interface/storage_driver.py b/memas/interface/storage_driver.py index dcab8f7..b75a994 100644 --- a/memas/interface/storage_driver.py +++ b/memas/interface/storage_driver.py @@ -24,9 +24,31 @@ def first_init(self): class MemasMetadataStore(StorageDriver): """ Memas metadata store providing basic namespace level operations for managing - the namespace related data + the namespace related metadata """ + @abstractmethod + def get_namespace_ids_by_name(self, fullname: str) -> tuple[UUID, UUID]: + """Retrieve the parent and child namespace_id from the pathname + + Args: + fullname (str): the full pathname + + Returns: + tuple[UUID, UUID]: (parent_id, child_id) pair + """ + + @abstractmethod + def get_corpus_ids_by_name(self, fullname: str) -> tuple[UUID, UUID]: + """Retrieve the parent namespace_id and the child corpus id from the pathname + + Args: + fullname (str): the full pathname + + Returns: + tuple[UUID, UUID]: (parent_id, child_id) pair + """ + @abstractmethod def create_namespace(self, namespace_pathname: str, *, parent_id: UUID = None) -> UUID: """Create a namespace based on the pathname @@ -66,6 +88,7 @@ def create_knowledge_corpus(self, corpus_pathname: str, *, parent_id: UUID = Non UUID: uuid of the just created corpus """ + @abstractmethod def get_corpus_info(self, corpus_pathname: str) -> CorpusInfo: """Gets the corpus info using the corpus pathname @@ -76,14 +99,47 @@ def get_corpus_info(self, corpus_pathname: str) -> CorpusInfo: CorpusInfo: Corpus Info object """ - def get_query_corpora(self, namespace_pathname: str) -> set[UUID]: - """Retrieves the set of corpus ids this namespace user should query + @abstractmethod + def get_corpus_info_by_id(self, namespace_id: UUID, corpus_id: UUID) -> CorpusInfo: + """Gets the corpus info using the corpus id and namespace id + + Args: + namespace_id (UUID): parent namespace id of the corpus + corpus_id (UUID): corpus id of the corpus + + Returns: + CorpusInfo: Corpus Info object + """ + + @abstractmethod + def get_query_corpora(self, namespace_pathname: str) -> set[CorpusInfo]: + """Retrieves the set of CorpusInfo objects this namespace user should query Args: namespace_pathname (str): the pathname of the querying user Returns: - set[UUID]: set of corpus ids + set[CorpusInfo]: set of CorpusInfo objects + """ + + @abstractmethod + def initiate_delete_corpus(self, parent_id: UUID, corpus_id: UUID, corpus_pathname: str): + """Initiate corpus deletion. This will free up the pathname, while marking the corpus as deleted. + Note that the parent and corpus ids are needed when recovering an interrupted delete + + Args: + parent_id (UUID): parent namespace id of the corpus + corpus_id (UUID): child corpus id of the corpus + corpus_pathname (str): the full pathname of the corpus + """ + + @abstractmethod + def finish_delete_corpus(self, parent_id: UUID, corpus_id: UUID): + """Finish corpus deletion. This will fully delete a corpus' metadata. + + Args: + parent_id (UUID): parent namespace id of the corpus + corpus_id (UUID): corpus id of the corpus """ @@ -98,6 +154,7 @@ def insert_document_metadata(self, corpus_id: UUID, document_id: UUID, num_segme Args: corpus_id (UUID): corpus id document_id (UUID): document id + num_segments (int): number of segments the document is stored in citation (Citation): citation object Returns: @@ -116,6 +173,14 @@ def get_document_citation(self, corpus_id: UUID, document_id: UUID) -> Citation: Citation: Citation object of the document """ + @abstractmethod + def delete_corpus(self, corpus_id: UUID): + """Deletes all citations under a corpus + + Args: + corpus_id (UUID): corpus id + """ + @dataclass class DocumentEntity: @@ -141,6 +206,14 @@ def save_documents(self, chunk_id_doc_pairs: list[str, DocumentEntity]) -> bool: bool: success or not """ + @abstractmethod + def delete_corpus(self, corpus_id: UUID): + """delete all documents under a corpus + + Args: + corpus_id (UUID): corpus id + """ + @abstractmethod def search_corpora(self, corpus_ids: list[UUID], clue: str) -> list[tuple[float, DocumentEntity]]: """Search set of corpora using a clue @@ -171,6 +244,14 @@ def save_documents(self, doc_entities: list[DocumentEntity]) -> bool: doc_entity (DocumentEntity): Document Entity object """ + @abstractmethod + def delete_corpus(self, corpus_id: UUID): + """delete all vectors under a corpus + + Args: + corpus_id (UUID): the corpus id + """ + @abstractmethod def search_corpora(self, corpus_ids: list[UUID], clue: str) -> list[tuple[float, DocumentEntity, int, int]]: """Search set of corpora using a clue diff --git a/memas/make_celery.py b/memas/make_celery.py new file mode 100644 index 0000000..c173de4 --- /dev/null +++ b/memas/make_celery.py @@ -0,0 +1,5 @@ +from app import create_app + +# This is the necessary entry point for initiating the celery worker +flask_app = create_app() +celery_app = flask_app.extensions["celery"] diff --git a/memas/memas-config.yml b/memas/memas-config.yml index 2e864e5..ca83699 100644 --- a/memas/memas-config.yml +++ b/memas/memas-config.yml @@ -11,3 +11,8 @@ ELASTICSEARCH: MILVUS: ip: "127.0.0.1" port: 19530 + +CELERY: + broker_url: "redis://localhost" + result_backend: "redis://localhost" + task_ignore_result: True diff --git a/memas/storage_driver/corpus_doc_metadata.py b/memas/storage_driver/corpus_doc_metadata.py index 66e2b7d..73d74cd 100644 --- a/memas/storage_driver/corpus_doc_metadata.py +++ b/memas/storage_driver/corpus_doc_metadata.py @@ -3,7 +3,9 @@ from uuid import UUID from cassandra.cqlengine import columns, management from cassandra.cqlengine.models import Model +from cassandra.cqlengine.query import DoesNotExist from memas.interface.corpus import Citation +from memas.interface.exceptions import DocumentMetadataNotFound from memas.interface.storage_driver import CorpusDocumentMetadataStore @@ -17,7 +19,6 @@ class DocumentMetadata(Model): document_name = columns.Text() source_name = columns.Text() source_uri = columns.Text() - corpus_name = columns.Text() description = columns.Text() segment_count = columns.Integer() added_at = columns.DateTime(required=True) @@ -54,7 +55,7 @@ def insert_document_metadata(self, corpus_id: UUID, document_id: UUID, num_segme added_at=datetime.now()) return True - def get_document_citation(self, corpus_id: UUID, document_id: UUID) -> Citation: + def get_document_citation(self, corpus_id: UUID, document_id: UUID) -> [Citation | None]: """Retrieves the document citation Args: @@ -65,8 +66,11 @@ def get_document_citation(self, corpus_id: UUID, document_id: UUID) -> Citation: Citation: Citation object of the document """ _log.debug(f"Retrieving document citation for [corpus_id={corpus_id.hex}] [document_id={document_id.hex}]") - result = DocumentMetadata.get( - corpus_id=corpus_id, document_id=document_id) + try: + result = DocumentMetadata.get(corpus_id=corpus_id, document_id=document_id) + except DoesNotExist as e: + _log.error(f"Document citation not found for [corpus_id={corpus_id.hex}] [document_id={document_id.hex}]") + raise DocumentMetadataNotFound(corpus_id, document_id) from e return Citation(source_uri=result.source_uri, source_name=result.source_name, description=result.description, @@ -85,5 +89,9 @@ def get_document_segment_count(self, corpus_id: UUID, document_id: UUID) -> int: _log.debug(f"Retrieving document segment count for [corpus_id={corpus_id.hex}] [document_id={document_id.hex}]") return DocumentMetadata.get(corpus_id=corpus_id, document_id=document_id).segment_count + def delete_corpus(self, corpus_id: UUID): + # This is surprisingly efficient based on: https://stackoverflow.com/a/43341522 + DocumentMetadata.filter(corpus_id=corpus_id).delete() + SINGLETON: CorpusDocumentMetadataStore = CorpusDocumentMetadataStoreImpl() diff --git a/memas/storage_driver/corpus_doc_store.py b/memas/storage_driver/corpus_doc_store.py index 246966f..55cee71 100644 --- a/memas/storage_driver/corpus_doc_store.py +++ b/memas/storage_driver/corpus_doc_store.py @@ -93,3 +93,18 @@ def search_corpora(self, corpus_ids: list[UUID], clue: str) -> list[tuple[float, hit["_id"][:32]), document_name=data[NAME_FIELD], document=data[DOC_FIELD]))) return result + + def delete_corpus(self, corpus_id: UUID): + _log.debug(f"Deleting documents for [corpus_id={corpus_id}]") + delete_query = { + "bool": { + "filter": [ + {"term": {CORPUS_FIELD: corpus_id.hex}} + ] + } + } + response = self.es.delete_by_query(index=self.es_index, query=delete_query) + # TODO: add retry/error handling + assert not response["timed_out"] + _log.debug( + f"Corpus Metadata Deletion took {response['took']}ms, total {response['total']} entries, deleted {response['deleted']} entries.") diff --git a/memas/storage_driver/corpus_vector_store.py b/memas/storage_driver/corpus_vector_store.py index 28cf4da..fc1fcd2 100644 --- a/memas/storage_driver/corpus_vector_store.py +++ b/memas/storage_driver/corpus_vector_store.py @@ -1,11 +1,8 @@ from dataclasses import dataclass import logging -import numpy as np -import re -import time import uuid +import numpy as np from pymilvus import ( - connections, FieldSchema, CollectionSchema, DataType, @@ -44,7 +41,15 @@ class MilvusSentenceObject: end_index: int def to_data(self): - return [[self.composite_id], [self.corpus_id], [self.document_name], [self.text_preview], self.embedding, [self.start_index], [self.end_index]] + return [ + [self.composite_id], + [self.corpus_id], + [self.document_name], + [self.text_preview], + self.embedding, + [self.start_index], + [self.end_index] + ] def hash_sentence_id(document_id: uuid.UUID, sentence: str) -> uuid.UUID: @@ -52,7 +57,8 @@ def hash_sentence_id(document_id: uuid.UUID, sentence: str) -> uuid.UUID: def convert_batch(objects: list[MilvusSentenceObject]): - composite_ids, corpus_ids, document_names, text_previews, embeddings, start_indices, end_indices = [], [], [], [], [], [], [] + composite_ids, corpus_ids, document_names = [], [], [] + text_previews, embeddings, start_indices, end_indices = [], [], [], [] for obj in objects: composite_ids.append(obj.composite_id) corpus_ids.append(obj.corpus_id) @@ -155,3 +161,6 @@ def save_documents(self, doc_entities: list[DocumentEntity]) -> bool: insert_count = insert_count + self.collection.insert(convert_batch(objects)).insert_count return insert_count == sentence_count + + def delete_corpus(self, corpus_id: uuid.UUID): + self.collection.delete(f"{CORPUS_FIELD} == \"{corpus_id.hex}\"") diff --git a/memas/storage_driver/memas_metadata.py b/memas/storage_driver/memas_metadata.py index b74c2a8..e4833c3 100644 --- a/memas/storage_driver/memas_metadata.py +++ b/memas/storage_driver/memas_metadata.py @@ -1,14 +1,30 @@ from datetime import datetime +from enum import Enum import logging from typing import Final import uuid +from uuid import UUID +from cassandra import ConsistencyLevel from cassandra.cqlengine import columns, management from cassandra.cqlengine.models import Model from cassandra.cqlengine.query import BatchQuery, DoesNotExist, LWTException from memas.interface import corpus -from memas.interface.corpus import CorpusType -from memas.interface.exceptions import IllegalNameException, NamespaceDoesNotExistException, NamespaceExistsException -from memas.interface.namespace import ROOT_ID, ROOT_NAME, NAMESPACE_SEPARATOR, CORPUS_SEPARATOR, is_pathname_format_valid, is_name_format_valid +from memas.interface.corpus import CorpusInfo, CorpusType +from memas.interface.exceptions import ( + IllegalNameException, + IllegalStateException, + NamespaceDoesNotExistException, + NamespaceExistsException +) +from memas.interface.namespace import ( + ROOT_ID, + ROOT_NAME, + NAMESPACE_SEPARATOR, + CORPUS_SEPARATOR, + is_corpus_pathname_valid, + is_namespace_pathname_valid, + mangle_corpus_pathname +) from memas.interface.storage_driver import MemasMetadataStore @@ -28,6 +44,10 @@ READ_AND_WRITE: Final[int] = STD_READ_PERMISSION & STD_WRITE_PERMISSION +class NamespaceStatus(Enum): + DELETING = "deleting" + + class NamespaceNameToId(Model): """ Maps full pathnames to ids, for both corpus and namespace. @@ -52,12 +72,14 @@ class NamespaceInfo(Model): parent_id = columns.UUID(partition_key=True) namespace_id = columns.UUID(primary_key=True) - parent_path = columns.Ascii(required=True, max_length=MAX_PATH_LENGTH - MAX_SEGMENT_LENGTH, min_length=0) + parent_pathname = columns.Ascii(required=True, max_length=MAX_PATH_LENGTH - MAX_SEGMENT_LENGTH, min_length=0) namespace_name = columns.Ascii(required=True, max_length=MAX_NS_NAME_LENGTH) # Default set of (shared) corpora that is queried. This won't include the direct child corpora # of this namespace, since those will always be queried unless specified otherwise. - query_default_corpus = columns.Set(columns.UUID, default=set()) + # This set stores strings that are the concatenation of "{namespace_id}:{corpus_id}" + query_default_corpus = columns.Set(columns.Ascii, default=set()) created_at = columns.DateTime(required=True) + status = columns.Ascii() class CorpusInfo(Model): @@ -67,10 +89,12 @@ class CorpusInfo(Model): parent_id = columns.UUID(partition_key=True) corpus_id = columns.UUID(primary_key=True) + parent_pathname = columns.Ascii(required=True, max_length=MAX_PATH_LENGTH - MAX_SEGMENT_LENGTH, min_length=0) corpus_name = columns.Ascii(required=True, max_length=MAX_CORPUS_NAME_LENGTH) corpus_type = columns.Ascii(required=True) permissions = columns.Integer(required=True) created_at = columns.DateTime(required=True) + status = columns.Ascii() def split_namespace_pathname(pathname: str) -> tuple[str, str]: @@ -82,6 +106,8 @@ def split_namespace_pathname(pathname: str) -> tuple[str, str]: Returns: tuple[str, str]: (parent_pathname, namespace_name) pair """ + if not is_namespace_pathname_valid(pathname): + raise IllegalNameException(pathname) tokens = pathname.rsplit(NAMESPACE_SEPARATOR, 1) if len(tokens) == 1: return (ROOT_NAME, pathname) @@ -97,9 +123,10 @@ def split_corpus_pathname(corpus_pathname: str) -> tuple[str, str]: Returns: tuple[str, str]: (parent_pathname, corpus_name) pair """ - tokens = corpus_pathname.split(CORPUS_SEPARATOR) - if len(tokens) != 2: + if not is_corpus_pathname_valid(corpus_pathname): raise IllegalNameException(corpus_pathname) + + tokens = corpus_pathname.split(CORPUS_SEPARATOR) return (tokens[0], tokens[1]) @@ -124,12 +151,14 @@ def _get_id_by_name(self, fullname: str) -> uuid.UUID: raise NamespaceDoesNotExistException(fullname) from e return result.id - def _get_ids_by_name(self, fullname: str) -> tuple[uuid.UUID, uuid.UUID]: - if CORPUS_SEPARATOR in fullname: - parent_pathname, child_name = split_corpus_pathname(fullname) - else: - parent_pathname, child_name = split_namespace_pathname(fullname) + def get_corpus_ids_by_name(self, fullname: str) -> tuple[uuid.UUID, uuid.UUID]: + parent_pathname, _ = split_corpus_pathname(fullname) + child_id = self._get_id_by_name(fullname=fullname) + parent_id = self._get_id_by_name(fullname=parent_pathname) + return (parent_id, child_id) + def get_namespace_ids_by_name(self, fullname: str) -> tuple[uuid.UUID, uuid.UUID]: + parent_pathname, _ = split_namespace_pathname(fullname) child_id = self._get_id_by_name(fullname=fullname) parent_id = self._get_id_by_name(fullname=parent_pathname) return (parent_id, child_id) @@ -139,8 +168,6 @@ def create_namespace(self, namespace_pathname: str, *, parent_id: uuid.UUID = No if namespace_pathname == ROOT_NAME: raise NamespaceExistsException(namespace_pathname, "\"\" is reserved for the root namespace!") - if not is_pathname_format_valid(namespace_pathname): - raise IllegalNameException(namespace_pathname) parent_pathname, child_name = split_namespace_pathname(namespace_pathname) if parent_id is None: @@ -162,7 +189,7 @@ def create_namespace(self, namespace_pathname: str, *, parent_id: uuid.UUID = No with BatchQuery() as batch_query: NamespaceInfo.batch(batch_query).create( parent_id=parent_id, namespace_id=namespace_id, - parent_path=parent_pathname, namespace_name=child_name, created_at=now) + parent_pathname=parent_pathname, namespace_name=child_name, created_at=now) NamespaceParent.batch(batch_query).create(child_id=namespace_id, parent_id=parent_id) return namespace_id @@ -171,9 +198,6 @@ def create_corpus(self, corpus_pathname: str, corpus_type: CorpusType, permissio _log.debug(f"Creating corpus for [corpus_pathname=\"{corpus_pathname}\"] [corpus_type={corpus_type}]") parent_pathname, corpus_name = split_corpus_pathname(corpus_pathname) - if not is_pathname_format_valid(parent_pathname) or not is_name_format_valid(corpus_name): - raise IllegalNameException(corpus_pathname) - if parent_id is None: parent_id = self._get_id_by_name(parent_pathname) @@ -188,7 +212,7 @@ def create_corpus(self, corpus_pathname: str, corpus_type: CorpusType, permissio # FIXME: Same issue with namespace id colliding. with BatchQuery() as batch_query: CorpusInfo.batch(batch_query).create( - parent_id=parent_id, corpus_id=corpus_id, corpus_name=corpus_name, + parent_id=parent_id, corpus_id=corpus_id, parent_pathname=parent_pathname, corpus_name=corpus_name, corpus_type=corpus_type.value, permissions=permissions, created_at=now) NamespaceParent.batch(batch_query).create(child_id=corpus_id, parent_id=parent_id) return corpus_id @@ -201,19 +225,73 @@ def create_knowledge_corpus(self, corpus_pathname: str, *, parent_id: uuid.UUID return self.create_corpus(corpus_pathname=corpus_pathname, corpus_type=CorpusType.KNOWLEDGE, permissions=STD_READ_PERMISSION, parent_id=parent_id) - def get_query_corpora(self, namespace_pathname: str) -> set[uuid.UUID]: - parent_id, namespace_id = self._get_ids_by_name(namespace_pathname) - namespace_result = NamespaceInfo.get(parent_id=parent_id, namespace_id=namespace_id) - query_corpora = set(namespace_result.query_default_corpus) + def initiate_delete_corpus(self, parent_id: UUID, corpus_id: UUID, corpus_pathname: str): + _log.debug(f"Initiating delete corpus for [corpus_pathname=\"{corpus_pathname}\"] [corpus_id={corpus_id.hex}]") + try: + NamespaceNameToId.objects(fullname=corpus_pathname).consistency( + ConsistencyLevel.QUORUM).iff(id=corpus_id).delete() + except LWTException as e: + _log.info(f"Corpus already deleted [corpus_pathname=\"{corpus_pathname}\"] [corpus_id={corpus_id.hex}]") + raise NamespaceDoesNotExistException("corpus_pathname") from e + + try: + CorpusInfo.objects(parent_id=parent_id, corpus_id=corpus_id).if_exists().update( + status=NamespaceStatus.DELETING.value) + except LWTException as e: + _log.error(f"CorpusInfo already deleted [parent_id=\"{parent_id.hex}\"] [corpus_id={corpus_id.hex}]") + raise IllegalStateException("CorpusInfo already deleted but NamespaceNameToId wasn't") from e + + def finish_delete_corpus(self, namespace_id: UUID, corpus_id: UUID): + # truly delete the remaining metadata of the corpus. Note that the NamespaceNameToId entry should be deleted prior to this call + with BatchQuery() as batch_query: + CorpusInfo.batch(batch_query).filter(parent_id=namespace_id, corpus_id=corpus_id).delete() + NamespaceParent.batch(batch_query).filter(child_id=corpus_id).delete() - for corpora_result in CorpusInfo.filter(parent_id=namespace_id).all(): - query_corpora.add(corpora_result.corpus_id) - return query_corpora + def get_query_corpora(self, namespace_pathname: str) -> set[corpus.CorpusInfo]: + parent_id, namespace_id = self.get_namespace_ids_by_name(namespace_pathname) + namespace_result = NamespaceInfo.get(parent_id=parent_id, namespace_id=namespace_id) + query_corpuses = set() + deleted_corpuses = set() + for composite_id in set(namespace_result.query_default_corpus): + # the composite id is a composition of "{namespace_id}:{corpus_id}" + namespace_hex, corpus_hex = composite_id.split(":") + namespace_id = uuid.UUID(namespace_hex) + corpus_id = uuid.UUID(corpus_hex) + try: + corpus_info = self.get_corpus_info_by_id(namespace_id, corpus_id) + except IllegalStateException as err: + deleted_corpuses.add(composite_id) + query_corpuses.add(corpus_info) + # for corpuses not found, this means they were deleted. Update the set to remove this + if deleted_corpuses: + new_query_set = set(namespace_result.query_default_corpus).difference(deleted_corpuses) + NamespaceInfo.objects(parent_id=parent_id, namespace_id=namespace_id).update( + query_default_corpus=new_query_set) + + for result in CorpusInfo.filter(parent_id=namespace_id).all(): + corpus_pathname = mangle_corpus_pathname(result.parent_pathname, result.corpus_name) + corpus_info = corpus.CorpusInfo(corpus_pathname=corpus_pathname, namespace_id=result.parent_id, + corpus_id=result.corpus_id, corpus_type=CorpusType(result.corpus_type)) + query_corpuses.add(corpus_info) + return query_corpuses def get_corpus_info(self, corpus_pathname: str) -> corpus.CorpusInfo: - parent_id, corpus_id = self._get_ids_by_name(corpus_pathname) - result = CorpusInfo.get(parent_id=parent_id, corpus_id=corpus_id) - return corpus.CorpusInfo(corpus_pathname=corpus_pathname, corpus_id=corpus_id, corpus_type=CorpusType(result.corpus_type)) + parent_id, corpus_id = self.get_corpus_ids_by_name(corpus_pathname) + corpus_info = self.get_corpus_info_by_id(parent_id, corpus_id) + if corpus_info.corpus_pathname != corpus_pathname: + raise IllegalStateException( + f"Inconsistent CorpusInfo vs NamespacePathname, \"{corpus_info.corpus_pathname}\"!=\"{corpus_pathname}\"") + return corpus_info + + def get_corpus_info_by_id(self, namespace_id: UUID, corpus_id: UUID) -> corpus.CorpusInfo: + try: + corpus_info = CorpusInfo.get(parent_id=namespace_id, corpus_id=corpus_id) + except DoesNotExist as e: + # TODO: Maybe free up the name in this case? + raise IllegalStateException("Corpus creation or delete incomplete") + + corpus_pathname = mangle_corpus_pathname(corpus_info.parent_pathname, corpus_info.corpus_name) + return corpus.CorpusInfo(corpus_pathname=corpus_pathname, namespace_id=namespace_id, corpus_id=corpus_id, corpus_type=CorpusType(corpus_info.corpus_type)) SINGLETON: MemasMetadataStore = MemasMetadataStoreImpl() diff --git a/requirements.txt b/requirements.txt index 8c8154b..d5add66 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ flask==2.3.2 +celery[redis] tensorflow==2.12.0 tensorflow_hub -pymilvus==2.2.8 +pymilvus==2.3.2 elasticsearch==8.8.0 scylla-driver==3.26.2 nltk diff --git a/test-requirements.txt b/test-requirements.txt index b488f4d..8d6e46e 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,3 @@ pytest pylint -autopep8 +autopep8>=2.0.4 diff --git a/tests/interface/test_corpus.py b/tests/interface/test_corpus.py new file mode 100644 index 0000000..af87aba --- /dev/null +++ b/tests/interface/test_corpus.py @@ -0,0 +1,13 @@ +import uuid +from memas.interface.corpus import CorpusInfo, CorpusType + + +def test_corpus_info_hashable(): + namespace_id1 = uuid.uuid4() + corpus_id1 = uuid.uuid4() + corpus_info1 = CorpusInfo("test1", namespace_id1, corpus_id1, CorpusType.CONVERSATION) + + try: + s = {corpus_info1} + except TypeError: + assert False, "CorpusInfo is not hashable" diff --git a/tests/interface/test_namespace.py b/tests/interface/test_namespace.py index 8275f6e..d9d2564 100644 --- a/tests/interface/test_namespace.py +++ b/tests/interface/test_namespace.py @@ -1,26 +1,54 @@ import pytest -from memas.interface.namespace import is_pathname_format_valid, is_name_format_valid +from memas.interface.namespace import is_namespace_pathname_valid, is_corpus_pathname_valid, is_name_format_valid, mangle_corpus_pathname + + +@pytest.mark.parametrize("parent_pathname, corpus_name, expected_pathname", [ + ("aaa.bbb", "ccc", "aaa.bbb:ccc"), +]) +def test_mangle_corpus_pathname(parent_pathname, corpus_name, expected_pathname): + assert mangle_corpus_pathname(parent_pathname, corpus_name) == expected_pathname @pytest.mark.parametrize("pathname", [ "aaa.bbb.ccc", # most standard namespace - "a.bb:ccc", # most standard corpus - ":corpus", # root level corpus "", # The empty string namespace is the reserved root namespace "c", # Single character names are allowed "AFD.my.namespace" # capital case is allowed ]) -def test_pathname_format_valid(pathname): - assert is_pathname_format_valid(pathname) +def test_namespace_pathname_format_valid(pathname): + assert is_namespace_pathname_valid(pathname) + + +@pytest.mark.parametrize("pathname", [ + "a.bb:ccc", # most standard corpus + ":corpus", # root level corpus +]) +def test_corpus_pathname_format_valid(pathname): + assert is_corpus_pathname_valid(pathname) + + +@pytest.mark.parametrize("pathname", [ + "x y", # No spaces allowed + "this,is\"also;bad", # no weird separators + "a..b", # only the root namespace can be empty + "a.bb:ccc", # most standard corpus + ":corpus", # root level corpus +]) +def test_namespace_pathname_format_invalid(pathname): + assert not is_namespace_pathname_valid(pathname) @pytest.mark.parametrize("pathname", [ "x y", # No spaces allowed "this,is\"also;bad", # no weird separators "a..b", # only the root namespace can be empty + "aaa.bbb.ccc", # most standard namespace + "", # The empty string namespace is the reserved root namespace + "c", # Single character names are allowed + "AFD.my.namespace" # capital case is allowed ]) -def test_pathname_format_invalid(pathname): - assert not is_pathname_format_valid(pathname) +def test_corpus_pathname_format_invalid(pathname): + assert not is_corpus_pathname_valid(pathname) @pytest.mark.parametrize("name", [