Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/loop for crashed tasks #266

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,84 @@ def loop(self) -> None:
if task:
self.internal_process(task)

def loop_crashed_tasks(self) -> None:
"""
Blocking loop that consumes crashed tasks.
It's preferable for debugging issueses occured with your service.

Consumer.loop_crashed_tasks is different from Consumer.loop:
- It does not rely on `karton.queue`. It finds crashed doc in `karton.task`.
So RUN ONLY ONE REPLICA to avoid race condition
and large resource consumption.
- It does not rely in task_timeout.
- It does not register new binds.
- It does not shut down another instances on binds / version mismatch.
- It does not listen queue in a traditional way / it dows not subsribe.
It looks for tasks in `CRASHED` state.
- It does not increment `TASK_CRASHED` metrics.
- It reimplements `Cunsumer.internal_process` in simplified way.

:meta private:
"""
self.log.info("Service %s started in crash-consume mode", self.identity)

with self.graceful_killer():
while not self.shutdown:

task: Task
for task in self.backend.iter_all_tasks(parse_resources=False):
if task.headers["receiver"] == self.identity:
break
else:
self.log.warning(
"Crashed task for consumer %s not found."
"Sleeping and trying again..." % (self.identity,)
)
time.sleep(5)
continue

self.current_task = task
self.log_handler.set_task(self.current_task)

exception_str = None

try:
self.log.info("Received new task - %s", self.current_task.uid)
# self.backend.set_task_status(self.current_task, TaskState.STARTED)

self._run_pre_hooks()

saved_exception = None
try:
self.process(self.current_task)
except Exception as exc:
saved_exception = exc
raise
finally:
self._run_post_hooks(saved_exception)

self.log.info("Task done - %s", self.current_task.uid)
except Exception:
exc_info = sys.exc_info()
exception_str = traceback.format_exception(*exc_info)
self.log.exception(
"Failed to process task - %s", self.current_task.uid
)
finally:
self.backend.increment_metrics(
KartonMetrics.TASK_CONSUMED, self.identity
)

task_state = TaskState.FINISHED

# report the task status as crashed
# if an exception was caught while processing
if exception_str is not None:
task_state = TaskState.CRASHED
self.current_task.error = exception_str

self.backend.set_task_status(self.current_task, task_state)


class LogConsumer(KartonServiceBase):
"""
Expand Down
Loading