diff --git a/karton/core/karton.py b/karton/core/karton.py index 5d819b9..1414331 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -346,6 +346,84 @@ def loop(self) -> None: if task: self.internal_process(task) + def loop_crashed_tasks(self) -> None: + """ + Blocking loop that consumes crashed tasks. + It's preferable for debugging issueses occured with your service. + + Consumer.loop_crashed_tasks is different from Consumer.loop: + - It does not rely on `karton.queue`. It finds crashed doc in `karton.task`. + So RUN ONLY ONE REPLICA to avoid race condition + and large resource consumption. + - It does not rely in task_timeout. + - It does not register new binds. + - It does not shut down another instances on binds / version mismatch. + - It does not listen queue in a traditional way / it dows not subsribe. + It looks for tasks in `CRASHED` state. + - It does not increment `TASK_CRASHED` metrics. + - It reimplements `Cunsumer.internal_process` in simplified way. + + :meta private: + """ + self.log.info("Service %s started in crash-consume mode", self.identity) + + with self.graceful_killer(): + while not self.shutdown: + + task: Task + for task in self.backend.iter_all_tasks(parse_resources=False): + if task.headers["receiver"] == self.identity: + break + else: + self.log.warning( + "Crashed task for consumer %s not found." + "Sleeping and trying again..." % (self.identity,) + ) + time.sleep(5) + continue + + self.current_task = task + self.log_handler.set_task(self.current_task) + + exception_str = None + + try: + self.log.info("Received new task - %s", self.current_task.uid) + # self.backend.set_task_status(self.current_task, TaskState.STARTED) + + self._run_pre_hooks() + + saved_exception = None + try: + self.process(self.current_task) + except Exception as exc: + saved_exception = exc + raise + finally: + self._run_post_hooks(saved_exception) + + self.log.info("Task done - %s", self.current_task.uid) + except Exception: + exc_info = sys.exc_info() + exception_str = traceback.format_exception(*exc_info) + self.log.exception( + "Failed to process task - %s", self.current_task.uid + ) + finally: + self.backend.increment_metrics( + KartonMetrics.TASK_CONSUMED, self.identity + ) + + task_state = TaskState.FINISHED + + # report the task status as crashed + # if an exception was caught while processing + if exception_str is not None: + task_state = TaskState.CRASHED + self.current_task.error = exception_str + + self.backend.set_task_status(self.current_task, task_state) + class LogConsumer(KartonServiceBase): """