Skip to content

Commit

Permalink
Use an inner join to prevent controller from loading partially-constr…
Browse files Browse the repository at this point in the history
…ucted jobs (#681)
  • Loading branch information
mwylde authored Jul 9, 2024
1 parent 5417ce2 commit 42c480c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
14 changes: 7 additions & 7 deletions crates/arroyo-api/queries/api_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ VALUES (:pub_id, :organization_id, :created_by, :name, :type, :textual_repr, :ud
SELECT pipelines.id, pipelines.pub_id, name, type, textual_repr, udfs, program, checkpoint_interval_micros, stop, pipelines.created_at, state, parallelism_overrides, ttl_micros
FROM pipelines
INNER JOIN job_configs on pipelines.id = job_configs.pipeline_id
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
WHERE pipelines.organization_id = :organization_id
AND pipelines.pub_id IS NOT NULL
AND ttl_micros IS NULL
Expand All @@ -146,7 +146,7 @@ LIMIT cast(:limit as integer);
SELECT pipelines.id, pipelines.pub_id, name, type, textual_repr, udfs, program, checkpoint_interval_micros, stop, pipelines.created_at, state, parallelism_overrides, ttl_micros
FROM pipelines
INNER JOIN job_configs on pipelines.id = job_configs.pipeline_id
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
WHERE pipelines.pub_id = :pub_id AND pipelines.organization_id = :organization_id;

--! get_pipeline_id
Expand Down Expand Up @@ -196,31 +196,31 @@ INSERT INTO job_statuses (pub_id, id, organization_id) VALUES (:pub_id, :id, :or
--! get_jobs: (start_time?, finish_time?, state?, tasks?, textual_repr?, failure_message?, run_id?, udfs)
SELECT job_configs.id as id, pipeline_name, stop, textual_repr, start_time, finish_time, state, tasks, pipeline_id, failure_message, run_id, udfs
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN pipelines ON pipeline_id = pipelines.id
WHERE job_configs.organization_id = :organization_id AND ttl_micros IS NULL
ORDER BY COALESCE(job_configs.updated_at, job_configs.created_at) DESC;

--! get_pipeline_jobs : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?)
SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id
WHERE job_configs.organization_id = :organization_id AND pipelines.pub_id = :pub_id
ORDER BY job_configs.created_at DESC;

--! get_all_jobs : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?)
SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id
WHERE job_configs.organization_id = :organization_id AND ttl_micros IS NULL
ORDER BY job_configs.created_at DESC;

--! get_pipeline_job : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?)
SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id
WHERE job_configs.organization_id = :organization_id AND job_configs.id = :job_id
ORDER BY job_configs.created_at DESC;
Expand All @@ -229,7 +229,7 @@ ORDER BY job_configs.created_at DESC;
--! get_job_details: (start_time?, finish_time?, state?, tasks?, textual_repr?, udfs, failure_message?, run_id?)
SELECT pipeline_name, stop, parallelism_overrides, state, start_time, finish_time, tasks, textual_repr, program, pipeline_id, udfs, failure_message, run_id
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN job_statuses ON job_configs.id = job_statuses.id
INNER JOIN pipelines ON pipeline_id = pipelines.id
WHERE job_configs.organization_id = :organization_id AND job_configs.id = :job_id;

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-controller/queries/controller_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SELECT
s.restart_nonce as status_restart_nonce,
restart_mode
FROM job_configs c
LEFT JOIN job_statuses s ON c.id = s.id;
INNER JOIN job_statuses s ON c.id = s.id;

--! update_job_status (start_time?, finish_time?, tasks?, failure_message?, pipeline_path?, wasm_path?)
UPDATE job_statuses
Expand Down

0 comments on commit 42c480c

Please sign in to comment.