diff --git a/.gitignore b/.gitignore
index 6c260a02..f70ebc68 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ cloud-harness/
 .vscode/
 node_modules
 secret.json
+data/
\ No newline at end of file
diff --git a/applications/visualizer/backend/.gitignore b/applications/visualizer/backend/.gitignore
index b1213fd3..86bc28dd 100644
--- a/applications/visualizer/backend/.gitignore
+++ b/applications/visualizer/backend/.gitignore
@@ -406,4 +406,7 @@ poetry.toml
 
 # LSP config files
 pyrightconfig.json
-# End of https://www.toptal.com/developers/gitignore/api/node,python,django
\ No newline at end of file
+# End of https://www.toptal.com/developers/gitignore/api/node,python,django
+
+
+static/
\ No newline at end of file
diff --git a/applications/visualizer/backend/api/api.py b/applications/visualizer/backend/api/api.py
index eae959e2..96c67b50 100644
--- a/applications/visualizer/backend/api/api.py
+++ b/applications/visualizer/backend/api/api.py
@@ -1,12 +1,18 @@
+from io import StringIO
+import sys
 from collections import defaultdict
 from typing import Iterable, Optional
 
 from ninja import NinjaAPI, Router, Query, Schema
 from ninja.pagination import paginate, PageNumberPagination
+from ninja.errors import HttpError
+
 from django.shortcuts import aget_object_or_404
 from django.db.models import Q
 from django.db.models.manager import BaseManager
 from django.conf import settings
+from django.core.management import call_command
+
 
 
 from .utils import get_dataset_viewer_config, to_list
@@ -16,7 +22,9 @@
     Neuron as NeuronModel,
     Connection as ConnectionModel,
 )
+from .decorators.streaming import with_stdout_streaming
 from .services.connectivity import query_nematode_connections
+from .authenticators.basic_auth_super_user import basic_auth_superuser
 
 
 class ErrorMessage(Schema):
@@ -237,6 +245,23 @@ def get_connections(
     #     )
 
 
+## Ingestion
+
+
+@api.get("/populate_db", auth=basic_auth_superuser, tags=["ingestion"])
+@with_stdout_streaming
+def populate_db(request):
+    try:
+        print("Starting DB population...\n")
+        call_command("migrate")
+        call_command("populatedb")
+    except Exception as e:
+        raise HttpError(500, f"DB population failed: {e}")
+
+
+## Healthcheck
+
+
 @api.get("/live", tags=["healthcheck"])
 async def live(request):
     """Test if application is healthy"""
diff --git a/applications/visualizer/backend/api/authenticators/__init__.py b/applications/visualizer/backend/api/authenticators/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/applications/visualizer/backend/api/authenticators/basic_auth_super_user.py b/applications/visualizer/backend/api/authenticators/basic_auth_super_user.py
new file mode 100644
index 00000000..daf6b50b
--- /dev/null
+++ b/applications/visualizer/backend/api/authenticators/basic_auth_super_user.py
@@ -0,0 +1,14 @@
+from ninja.security import HttpBasicAuth
+from django.contrib.auth import authenticate as django_authenticate
+
+
+class BasicAuthSuperUser(HttpBasicAuth):
+    def authenticate(self, request, username, password):
+        # Authenticate user with Django's built-in authenticate function
+        user = django_authenticate(request, username=username, password=password)
+        if user and user.is_superuser:  # Ensure the user is a superuser
+            return user
+        return None
+
+
+basic_auth_superuser = BasicAuthSuperUser()
diff --git a/applications/visualizer/backend/api/decorators/__init__.py b/applications/visualizer/backend/api/decorators/__init__.py
new file mode 100644
index 00000000..e69de29b
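Note on the `populate_db` endpoint and `BasicAuthSuperUser` above: the route only accepts Django superusers, so one must already exist in the backend's database. A minimal setup sketch, run inside `python manage.py shell` (the credentials below are placeholders, not part of this patch):

```python
# Sketch: create a superuser that basic_auth_superuser will accept.
# Username/email/password are placeholders.
from django.contrib.auth import get_user_model

User = get_user_model()
User.objects.create_superuser(
    username="admin",
    email="admin@example.org",
    password="change-me",  # use a real secret outside of local testing
)
```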
diff --git a/applications/visualizer/backend/api/decorators/streaming.py b/applications/visualizer/backend/api/decorators/streaming.py
new file mode 100644
index 00000000..63a270c8
--- /dev/null
+++ b/applications/visualizer/backend/api/decorators/streaming.py
@@ -0,0 +1,63 @@
+import asyncio
+import sys
+import threading
+from queue import Queue
+from functools import wraps
+from django.http import StreamingHttpResponse
+
+
+def with_stdout_streaming(func):
+    """
+    A decorator that:
+    - Runs the decorated function in a separate thread,
+    - Captures anything it prints to stdout,
+    - Streams that output asynchronously line-by-line as it's produced.
+    """
+
+    @wraps(func)
+    def wrapper(request, *args, **kwargs):
+        q = Queue()
+
+        def run_func():
+            # Redirect sys.stdout
+            old_stdout = sys.stdout
+
+            class QueueWriter:
+                def write(self, data):
+                    if data:
+                        q.put(data)
+
+                def flush(self):
+                    pass  # For compatibility with print
+
+            sys.stdout = QueueWriter()
+
+            try:
+                func(request, *args, **kwargs)
+            except Exception as e:
+                q.put(f"Error: {e}\n")
+            finally:
+                # Signal completion
+                q.put(None)
+                sys.stdout = old_stdout
+
+        # Run the function in a background thread
+        t = threading.Thread(target=run_func)
+        t.start()
+
+        # Async generator to yield lines from the queue
+        async def line_generator():
+            while True:
+                line = await asyncio.to_thread(q.get)
+                if line is None:  # End signal
+                    break
+                yield line
+
+        # Return a streaming response that sends data asynchronously
+        return StreamingHttpResponse(
+            line_generator(),
+            content_type="text/plain",
+            headers={"Content-Encoding": "identity"},
+        )
+
+    return wrapper
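For context, a usage sketch of `with_stdout_streaming` (the route and its output are hypothetical): anything the wrapped view prints is intercepted by `QueueWriter`, pushed onto the queue from the worker thread, and yielded to the client by `line_generator` as it arrives:

```python
# Sketch: opting a view into stdout streaming; assumes the `api` NinjaAPI
# instance from api.py. The endpoint name and prints are illustrative only.
import time

@api.get("/demo_stream")
@with_stdout_streaming
def demo_stream(request):
    for step in range(3):
        print(f"step {step} done")  # captured and streamed line-by-line
        time.sleep(1)               # simulates slow work between prints
```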
✨" def add_flags(parser: ArgumentParser): @@ -108,6 +112,18 @@ def env_or(name: str, default: str) -> str: ), ) + parser.add_argument( + "--populate-db", + action="store_true", + help="Trigger DB population via the API endpoint", + ) + + parser.add_argument( + "--populate-db-url", + default="https://celegans.dev.metacell.us/api/populate_db", + help="The API URL to trigger DB population", + ) + def add_add_dataset_flags(parser: ArgumentParser): parser.add_argument( @@ -460,6 +476,47 @@ def upload_em_tiles( pbar.close() +def trigger_populate_db(args): + try: + api_url = args.populate_db_url + + # Load service account credentials from gcp_credentials + with open(args.gcp_credentials, "r") as f: + gcp_creds = json.load(f) + + client_id = gcp_creds.get("client_id") + private_key_id = gcp_creds.get("private_key_id") + + if not client_id or not private_key_id: + print( + "Error: Could not extract client_id or private_key_id from gcp_credentials", + file=sys.stderr, + ) + return + + session = niquests.Session(resolver="doh+google://", multiplexed=True) + + with session.get( + api_url, auth=(client_id, private_key_id), stream=True, timeout=None + ) as response: + if response.status_code != 200: + print( + f"Error: Received status code {response.status_code}", + file=sys.stderr, + ) + return + try: + for line in response.iter_lines(decode_unicode=True): + if line: + if isinstance(line, bytes): + line = line.decode("utf-8") + print(line, flush=True) + except KeyboardInterrupt: + print("\nStreaming interrupted by user.", file=sys.stderr) + except Exception as e: + print(f"An error occurred: {e}", file=sys.stderr) + + def ingest_cmd(args: Namespace): """Runs the ingestion command.""" @@ -471,7 +528,7 @@ def ingest_cmd(args: Namespace): bucket = storage_client.get_bucket(args.gcp_bucket) rs = RemoteStorage(bucket, dry_run=args.dry_run) - dataset_id = args.id + dataset_id = getattr(args, "id", None) overwrite = args.overwrite if args.prune: @@ -485,29 +542,35 @@ def ingest_cmd(args: Namespace): elif dry_run: logger.info(f"skipped prunning files from the bucket") - if args.data: - validate_and_upload_data(dataset_id, args.data, rs, overwrite=overwrite) - elif dry_run: - logger.warning(f"skipping neurons data validation and upload") + if dataset_id: + if args.data: + validate_and_upload_data(dataset_id, args.data, rs, overwrite=overwrite) + elif dry_run: + logger.warning(f"skipping neurons data validation and upload") - if args.segmentation: - upload_segmentations(dataset_id, args.segmentation, rs, overwrite=overwrite) - elif dry_run: - logger.warning("skipping segmentation upload: flag not set") + if args.segmentation: + upload_segmentations(dataset_id, args.segmentation, rs, overwrite=overwrite) + elif dry_run: + logger.warning("skipping segmentation upload: flag not set") - if args.synapses: - upload_synapses(dataset_id, args.synapses, rs, overwrite=overwrite) - elif dry_run: - logger.warning("skipping synapses upload: flag not set") + if args.synapses: + upload_synapses(dataset_id, args.synapses, rs, overwrite=overwrite) + elif dry_run: + logger.warning("skipping synapses upload: flag not set") - if paths := getattr(args, "3d"): - upload_3d(dataset_id, paths, rs, overwrite=overwrite) - elif dry_run: - logger.warning("skipping 3D files upload: flag not set") + if paths := getattr(args, "3d"): + upload_3d(dataset_id, paths, rs, overwrite=overwrite) + elif dry_run: + logger.warning("skipping 3D files upload: flag not set") + + if args.em: + upload_em_tiles(dataset_id, args.em, rs, overwrite=overwrite) + 
diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml
index 6eece16f..89b2c11d 100755
--- a/ingestion/pyproject.toml
+++ b/ingestion/pyproject.toml
@@ -17,19 +17,18 @@ maintainers = [
     { name = "Diogo Correia", email = "diogo@metacell.us" },
 ]
 dependencies = [
-    "pydantic==2.8.2",
-    "json_source_map==1.0.5",
-    "crc32c==2.7.post1",
     "colorama==0.4.6",
-    "google-cloud-storage==2.18.2",
-    "tqdm==4.66.5",
-    "pillow==10.4.0",
-
-    # extraction dependencies
+    "crc32c==2.7.post1",
     "diplib==3.5.1",
     "geojson==3.1.0",
     "geojson-rewind==1.1.0",
+    "google-cloud-storage==2.18.2",
+    "json_source_map==1.0.5",
+    "niquests==3.7.2",
     "numpy==2.0.0",
+    "pillow==10.4.0",
+    "pydantic==2.8.2",
+    "tqdm==4.66.5",
 ]
 
 [project.optional-dependencies]
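On the new `niquests==3.7.2` pin: it is a requests-compatible HTTP client, presumably chosen because `trigger_populate_db` uses its DNS-over-HTTPS resolver and HTTP/2 multiplexing options. A minimal sketch of that session setup (the URL is a placeholder):

```python
# Sketch: the niquests session features used by trigger_populate_db.
import niquests

session = niquests.Session(
    resolver="doh+google://",  # resolve hostnames via DNS-over-HTTPS
    multiplexed=True,          # enable HTTP/2 connection multiplexing
)
resp = session.get("https://example.org/health")  # placeholder URL
print(resp.status_code)
```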