diff --git a/src/main.py b/src/main.py index 244b943..b190eb6 100644 --- a/src/main.py +++ b/src/main.py @@ -331,6 +331,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient, tasks: collections.abc.MutableSet[asyncio.Task], send_info: Submission, gauges: collections.abc.Mapping[str, Metrics], + *, + retry_knative: bool, ): """Package and send the fanned-out messages to Prompt Processing. @@ -346,6 +348,8 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient, The data and address to submit. gauges : mapping [`str`, `Metrics`] A mapping from instrument name to metrics for that instrument. + retry_knative : `bool` + Whether or not Knative requests can be retried. """ try: attributes = { @@ -372,6 +376,7 @@ def dispatch_fanned_out_messages(client: httpx.AsyncClient, headers, body, str(info), + retry=retry_knative, ) ) tasks.add(task) @@ -389,22 +394,28 @@ async def knative_request( headers: dict[str, str], body: bytes, info: str, + *, + retry: bool, ) -> None: """Makes knative http request. Parameters ---------- - client: `httpx.AsyncClient` + in_process_requests_gauge : `prometheus_client.Gauge` + A gauge to be updated with the start and end of the request. + client : `httpx.AsyncClient` The async httpx client. knative_serving_url : `string` The url for the knative instance. - headers: dict[`str,'str'] + headers : dict[`str,'str'] The headers to pass to knative. - body: `bytes` + body : `bytes` The next visit message body. - info: `str` + info : `str` Information such as some fields of the next visit message to identify this request and to log with. + retry : `bool` + Whether or not requests can be retried. """ with in_process_requests_gauge.track_inprogress(): result = await client.post( @@ -418,8 +429,12 @@ async def knative_request( f"nextVisit {info} status code {result.status_code} for initial request {result.content}" ) - ''' - if result.status_code == 502 or result.status_code == 503: + if retry and result.status_code == 503: + if 'Retry-After' in result.headers: + delay = int(result.headers['Retry-After']) + logging.info("Waiting %d seconds before retrying nextVisit %s...", delay, info) + await asyncio.sleep(delay) + logging.info( f"retry after status code {result.status_code} for nextVisit {info}" ) @@ -430,9 +445,9 @@ async def knative_request( timeout=None, ) logging.info( - f"nextVisit {info} retried request {retry_result.content}" + f"nextVisit {info} status code {retry_result.status_code} for " + f"retried request {retry_result.content}" ) - ''' async def main() -> None: @@ -446,6 +461,7 @@ async def main() -> None: expire = float(os.environ["MESSAGE_EXPIRATION"]) kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] max_outgoing = int(os.environ["MAX_FAN_OUT_MESSAGES"]) + retry_knative = os.environ["RETRY_KNATIVE_REQUESTS"].lower() == "true" # kafka auth sasl_username = os.environ["SASL_USERNAME"] @@ -513,7 +529,8 @@ async def main() -> None: gauges, hsc_upload_detectors, ) - dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges) + dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges, + retry_knative=retry_knative) except UnsupportedMessageError: logging.exception("Could not process message, continuing.") finally: