Skip to content

Commit

Permalink
CELE-119 feat: Replace curl with niquests
Browse files Browse the repository at this point in the history
  • Loading branch information
afonsobspinto committed Jan 9, 2025
1 parent e095d56 commit ef841ec
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
3 changes: 2 additions & 1 deletion applications/visualizer/backend/api/decorators/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def line_generator():
yield line

# Return a streaming response that sends data asynchronously
return StreamingHttpResponse(line_generator(), content_type="text/plain")
return StreamingHttpResponse(line_generator(), content_type="text/plain", headers={"Content-Encoding": "identity"})


return wrapper
35 changes: 17 additions & 18 deletions ingestion/ingestion/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path

from google.api_core.exceptions import PreconditionFailed
from google.cloud import storage
from pydantic import ValidationError
from tqdm import tqdm
import niquests

from ingestion.cli import ask, type_directory, type_file
from ingestion.em_metadata import EMMetadata, Tile
Expand Down Expand Up @@ -494,25 +494,24 @@ def trigger_populate_db(args):
)
return

# Add `stdbuf` to ensure curl is unbuffered
command = [
"stdbuf",
"-oL", # Force line buffering
"curl",
"-N", # Disable buffering in curl
"-u",
f"{client_id}:{private_key_id}",
f"{api_url}",
]

with subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
) as proc:
session = niquests.Session(resolver="doh+google://", multiplexed=True)

with session.get(
api_url, auth=(client_id, private_key_id), stream=True, timeout=None
) as response:
if response.status_code != 200:
print(
f"Error: Received status code {response.status_code}",
file=sys.stderr,
)
return
try:
for line in proc.stdout:
print(line, end="", flush=True) # Real-time output
for line in response.iter_lines(decode_unicode=True):
if line:
if isinstance(line, bytes):
line = line.decode("utf-8")
print(line, flush=True)
except KeyboardInterrupt:
proc.terminate()
print("\nStreaming interrupted by user.", file=sys.stderr)
except Exception as e:
print(f"An error occurred: {e}", file=sys.stderr)
Expand Down

0 comments on commit ef841ec

Please sign in to comment.