Workflows, multiple tasks depending on one another #1198

Open
nikita-krokosh opened this issue Sep 18, 2024 · 6 comments

Comments

@nikita-krokosh

Hi. Let's say I need a job to run only if 2 others (created before it) have completed successfully. AFAIU there's no support for this.

Can I extend the procrastinate job queue table myself and add an array column holding the list of required job IDs, so that I can check whether all of them have finished inside my task, either manually or with some decorator? In other words, is having extra columns supported, and will it leave the standard flows intact?

@ewjoachim
Member

ewjoachim commented Sep 18, 2024

AFAIU there's no support for this

I think you're right

is having extra columns supported and won't break standard flows?

I don't think it's a good idea, but I can't tell you exactly what will fail. I think you should rather do this in a separate table. You can add a foreign key to the jobs table, as long as you handle the cases where the job gets deleted (so ON DELETE CASCADE, I guess).
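As an illustration of the separate-table idea, here is a minimal sketch. It uses Python's built-in sqlite3 purely so that it runs without a server; it is not procrastinate's schema. The `jobs` table is a stand-in for the real jobs table, and `job_dependencies`, `depends_on_job_id`, `build_demo_db`, `dependencies_done` and the `'succeeded'` status value are all assumptions made for the example.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a stand-in jobs table and a side table."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is enabled.
    conn.execute("PRAGMA foreign_keys = ON")
    # Stand-in for the real jobs table (hypothetical, simplified columns).
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT NOT NULL)")
    # Dependencies live in their own table, so the jobs table stays untouched.
    conn.execute(
        """
        CREATE TABLE job_dependencies (
            job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
            depends_on_job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
            PRIMARY KEY (job_id, depends_on_job_id)
        )
        """
    )
    return conn


def dependencies_done(conn: sqlite3.Connection, job_id: int) -> bool:
    """Return True when no recorded dependency of job_id is still unfinished."""
    row = conn.execute(
        """
        SELECT COUNT(*)
        FROM job_dependencies d
        JOIN jobs j ON j.id = d.depends_on_job_id
        WHERE d.job_id = ? AND j.status != 'succeeded'
        """,
        (job_id,),
    ).fetchone()
    return row[0] == 0
```

One consequence of ON DELETE CASCADE worth keeping in mind: when a prerequisite job is deleted, its dependency row disappears with it, so in this sketch the dependent job would then look unblocked.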

@onlyann
Contributor

onlyann commented Sep 19, 2024

Assuming a task C needs to run when task A and B complete, introduce an orchestrator task O that defers task A and B, waits for both of them to complete and then defers task C.

Would that work in your scenario?

@ewjoachim
Member

Hm, I'm not sure the "Orchestrator task" is a concept I like. If the worker running it crashes, the whole set of tasks will be completely broken.

@onlyann
Contributor

onlyann commented Sep 19, 2024

It is not foolproof but might be good enough.
A variant of this (still imperfect) is to defer A and B, and then defer the task O that receives the job ids it should wait for. Task O waits for those job ids to complete before deferring task C.

This way, the task O can be retried if the worker crashes.
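A rough sketch of the waiting part of task O, under stated assumptions: `wait_for_jobs`, `get_status`, `max_polls` and the status names are hypothetical and not part of procrastinate's API. `get_status` stands for whatever lookup returns a job's current status for a given job ID.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

# Assumed status names; adjust to whatever the job table actually stores.
DONE = "succeeded"
FAILED = {"failed", "cancelled", "aborted"}


async def wait_for_jobs(
    job_ids: Iterable[int],
    get_status: Callable[[int], Awaitable[str]],
    poll_interval: float = 1.0,
    max_polls: int = 60,
) -> bool:
    """Return True once every job has succeeded, False as soon as one has failed.

    Raises TimeoutError after max_polls rounds so the caller can retry later.
    """
    pending = set(job_ids)
    for _ in range(max_polls):
        for job_id in list(pending):
            status = await get_status(job_id)
            if status in FAILED:
                return False
            if status == DONE:
                pending.discard(job_id)
        if not pending:
            return True
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"jobs still pending: {sorted(pending)}")
```

Task O would defer task C only when this returns True. Because O receives the job IDs as arguments and keeps no other state, a retried O can simply start polling again.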

@TkTech

TkTech commented Sep 20, 2024

So the way Chancy does this seems to work (1b+ workflows so far), and looking at the model for Procrastinate I don't see why something similar wouldn't be an option if there's no issue with adding more tables. It can be done in a way that avoids modifying the existing jobs and instead just builds on top of them.

Two tables are used, the first to track the workflows themselves:

CREATE TABLE {workflows} (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
)

And another to track each step inside of a workflow, which will be updated with the ID of the job once it's started (job_id becomes a bigserial for procrastinate, state becomes status, etc.):

CREATE TABLE {workflow_steps} (
    id SERIAL PRIMARY KEY,
    workflow_id UUID REFERENCES {workflows}(id)
        ON DELETE CASCADE,
    step_id TEXT NOT NULL,
    job_data JSON NOT NULL,
    dependencies JSON NOT NULL,
    job_id UUID,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
)

Periodically, incomplete workflows are picked up and processed:

while await self.sleep(self.polling_interval):
    await self.wait_for_leader(worker)
    workflows = await self.fetch_workflows(
        chancy,
        states=["pending", "running"],
        limit=self.max_workflows_per_run,
    )
    for workflow in workflows:
        await self.process_workflow(chancy, workflow)

Fetching the workflows and their steps can be done in a single quick query thanks to json_build_object:

SELECT
    w.id,
    w.name,
    w.state,
    w.created_at,
    w.updated_at,
    COALESCE(json_agg(
        json_build_object(
            'step_id', ws.step_id,
            'job_data', ws.job_data,
            'dependencies', ws.dependencies,
            'state', j.state,
            'job_id', ws.job_id
        )
    ), '[]'::json) as steps
FROM {workflows} w
LEFT JOIN {workflow_steps} ws ON w.id = ws.workflow_id
LEFT JOIN {jobs} j ON ws.job_id = j.id
WHERE (
    %(states)s::text[] IS NULL OR
    w.state = ANY(%(states)s::text[])
) AND (
    %(ids)s::uuid[] IS NULL OR
    w.id = ANY(%(ids)s::uuid[])
)
GROUP BY w.id, w.name, w.state
LIMIT {limit}

Progressing a workflow then becomes trivial, about 15 lines: https://github.com/TkTech/chancy/blob/main/chancy/plugins/workflow/__init__.py#L340. The job_data on each step contains the serialized job, which is inserted into the job table like any normal job once all of its dependencies are met.
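To make the dependency check concrete, here is a minimal sketch. It is not the linked Chancy code. It assumes each step is a dict shaped like the objects built by the query above (`step_id`, `dependencies`, `state`, `job_id`), that `dependencies` is a list of step IDs, and that a finished job reports the state `"succeeded"`. The helper names `ready_steps` and `workflow_state` and the returned state names are hypothetical.

```python
from typing import Any


def ready_steps(steps: list[dict[str, Any]]) -> list[str]:
    """Return the step_ids that have no job yet and whose dependencies all succeeded."""
    state_by_step = {step["step_id"]: step["state"] for step in steps}
    ready = []
    for step in steps:
        if step["job_id"] is not None:
            continue  # a job was already inserted for this step
        if all(state_by_step.get(dep) == "succeeded" for dep in step["dependencies"]):
            ready.append(step["step_id"])
    return ready


def workflow_state(steps: list[dict[str, Any]]) -> str:
    """Derive an overall workflow state from the states of its steps."""
    states = [step["state"] for step in steps]
    if any(state == "failed" for state in states):
        return "failed"
    if all(state == "succeeded" for state in states):
        return "completed"
    return "running"
```

For every step returned by `ready_steps`, the periodic task would insert the job described by `job_data` and store the new job's ID in `job_id`.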

Since procrastinate doesn't have a leadership node, we'd add a locked_at column to the workflow table, do a SELECT...FOR UPDATE with a timeout, and let every worker take a shot at periodically progressing workflows.
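The locked_at idea amounts to a lease that expires after a timeout. Below is an in-memory sketch of that rule only. In the database, the same check and update would run inside the transaction that holds the row lock from SELECT...FOR UPDATE. The `Workflow` class and `try_claim` are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Workflow:
    id: str
    locked_at: Optional[datetime] = None


def try_claim(workflow: Workflow, now: datetime, timeout: timedelta) -> bool:
    """Claim the workflow unless another worker holds an unexpired lease on it."""
    if workflow.locked_at is not None and now - workflow.locked_at < timeout:
        return False  # someone else claimed it recently
    workflow.locked_at = now  # take (or take over) the lease
    return True


if __name__ == "__main__":
    wf = Workflow(id="w1")
    start = datetime(2024, 9, 20, tzinfo=timezone.utc)
    print(try_claim(wf, start, timedelta(seconds=30)))
    print(try_claim(wf, start + timedelta(seconds=10), timedelta(seconds=30)))
```

The timeout is what lets another worker pick the workflow back up if the one that claimed it crashes before releasing it.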

This way jobs don't know they are part of a workflow, no persistent job is needed, just a periodic one, and the only relationship between the two is the job ID and state. Since this implements DAG-based workflows, it becomes easy to re-implement Celery's Chain and Group as well - 6 lines.
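To show why Chain and Group fall out of a DAG, here is a small sketch that expresses both as plain dependency mappings from a step ID to the step IDs it waits on. This is not Chancy's or Celery's API; the `chain` and `group` helpers and the `after` parameter are hypothetical.

```python
from typing import Iterable


def group(step_ids: Iterable[str], after: Iterable[str] = ()) -> dict[str, list[str]]:
    """Steps that may run in parallel, each waiting on every step in `after`."""
    after = list(after)
    return {step_id: list(after) for step_id in step_ids}


def chain(step_ids: Iterable[str], after: Iterable[str] = ()) -> dict[str, list[str]]:
    """Steps that run one after another, the first waiting on every step in `after`."""
    dependencies = {}
    previous = list(after)
    for step_id in step_ids:
        dependencies[step_id] = previous
        previous = [step_id]
    return dependencies
```

The case from the original question, C running after both A and B, would be `{**group(["A", "B"]), **chain(["C"], after=["A", "B"])}`.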

@nikita-krokosh
Author

nikita-krokosh commented Sep 23, 2024

Thanks for all the answers, folks. I'll try to investigate more and come up with something.
I was thinking about Celery too, but as far as I understand it also offers no easy way to make jobs scheduled from different places:

  • depend on a bunch of other jobs that were scheduled before.
  • maintain locking by unique ID to prevent some of them from running concurrently.

@ewjoachim changed the title from "Having one job depend on another (or several)." to "Workflows, multiple tasks depending on one another" on Dec 30, 2024