Skip to content

Commit

Permalink
chaining optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 2, 2024
1 parent c04bc2e commit f11a90d
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 33 deletions.
9 changes: 7 additions & 2 deletions crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,11 @@ impl JobController {
.map(|(id, w)| (*id, w.connect.clone()))
.collect();
let program = self.model.program.clone();
let operator_indices: Arc<HashMap<_, _>> = Arc::new(program.graph
.node_indices()
.map(|idx| (program.graph[idx].node_id, idx.index() as u32))
.collect());


self.model.metric_update_task = Some(tokio::spawn(async move {
let mut metrics: HashMap<(u32, u32), HashMap<MetricName, u64>> = HashMap::new();
Expand All @@ -682,12 +687,12 @@ impl JobController {
.into_iter()
.filter_map(|f| Some((get_metric_name(&f.name?)?, f.metric)))
.flat_map(|(metric, values)| {
let program = program.clone();
let operator_indices = operator_indices.clone();
values.into_iter().filter_map(move |m| {
let subtask_idx =
u32::from_str(find_label(&m.label, "subtask_idx")?).ok()?;
let operator_idx =
program.operator_index(u32::from_str(find_label(&m.label, "node_id")?).ok()?)?;
*operator_indices.get(&u32::from_str(find_label(&m.label, "node_id")?).ok()?)?;
let value = m
.counter
.map(|c| c.value)
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(clippy::comparison_chain)]

pub mod logical;
pub mod optimizers;

use arroyo_rpc::config::{config, DefaultSink};
use arroyo_rpc::grpc::api;
Expand Down
37 changes: 8 additions & 29 deletions crates/arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use arroyo_rpc::api_types::pipelines::{PipelineEdge, PipelineGraph, PipelineNode
use arroyo_rpc::df::ArroyoSchema;
use arroyo_rpc::grpc::api;
use arroyo_rpc::grpc::api::{
ArrowProgram, ArrowProgramConfig, ChainedOperator, ConnectorOp, EdgeType,
ArrowProgram, ArrowProgramConfig, ConnectorOp, EdgeType,
};
use datafusion_proto::generated::datafusion;
use petgraph::dot::Dot;
use petgraph::graph::DiGraph;
use petgraph::prelude::EdgeRef;
Expand All @@ -25,6 +24,7 @@ use std::hash::Hasher;
use std::str::FromStr;
use std::sync::Arc;
use strum::{Display, EnumString};
use crate::optimizers::Optimizer;

#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumString, Display)]
pub enum OperatorName {
Expand Down Expand Up @@ -136,27 +136,23 @@ impl TryFrom<LogicalProgram> for PipelineGraph {
pub struct LogicalEdge {
pub edge_type: LogicalEdgeType,
pub schema: ArroyoSchema,
pub projection: Option<Vec<usize>>,
}

impl LogicalEdge {
pub fn new(
edge_type: LogicalEdgeType,
schema: ArroyoSchema,
projection: Option<Vec<usize>>,
) -> Self {
LogicalEdge {
edge_type,
schema,
projection,
}
}

pub fn project_all(edge_type: LogicalEdgeType, schema: ArroyoSchema) -> Self {
LogicalEdge {
edge_type,
schema,
projection: None,
}
}
}
Expand All @@ -170,8 +166,8 @@ pub struct ChainedLogicalOperator {

#[derive(Clone, Debug)]
pub struct OperatorChain {
operators: Vec<ChainedLogicalOperator>,
edges: Vec<ArroyoSchema>,
pub(crate) operators: Vec<ChainedLogicalOperator>,
pub(crate) edges: Vec<ArroyoSchema>,
}

impl OperatorChain {
Expand Down Expand Up @@ -283,22 +279,19 @@ pub struct ProgramConfig {
pub struct LogicalProgram {
pub graph: LogicalGraph,
pub program_config: ProgramConfig,
pub operator_indices: HashMap<u32, u32>,
}

impl LogicalProgram {
pub fn new(graph: LogicalGraph, program_config: ProgramConfig) -> Self {
let operator_indices = graph
.node_indices()
.map(|idx| (graph[idx].node_id, idx.index() as u32))
.collect();

Self {
graph,
program_config,
operator_indices,
}
}

pub fn optimize(&mut self, optimizer: &dyn Optimizer) {
optimizer.optimize(&mut self.graph);
}

pub fn update_parallelism(&mut self, overrides: &HashMap<u32, usize>) {
for node in self.graph.node_weights_mut() {
Expand Down Expand Up @@ -341,10 +334,6 @@ impl LogicalProgram {
.collect()
}

pub fn operator_index(&self, id: u32) -> Option<u32> {
self.operator_indices.get(&id).cloned()
}

pub fn tasks_per_operator(&self) -> HashMap<String, usize> {
let mut tasks_per_operator = HashMap::new();
for node in self.graph.node_weights() {
Expand Down Expand Up @@ -455,11 +444,6 @@ impl TryFrom<ArrowProgram> for LogicalProgram {
LogicalEdge {
edge_type: edge.edge_type().into(),
schema: schema.clone().try_into()?,
projection: if edge.projection.is_empty() {
None
} else {
Some(edge.projection.iter().map(|p| *p as usize).collect())
},
},
);
}
Expand Down Expand Up @@ -644,11 +628,6 @@ impl From<LogicalProgram> for ArrowProgram {
target: target.index() as i32,
schema: Some(edge.schema.clone().into()),
edge_type: edge_type as i32,
projection: edge
.projection
.as_ref()
.map(|p| p.iter().map(|v| *v as u32).collect())
.unwrap_or_default(),
}
})
.collect();
Expand Down
123 changes: 123 additions & 0 deletions crates/arroyo-datastream/src/optimizers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::collections::HashSet;
use petgraph::prelude::*;
use crate::logical::{LogicalEdgeType, LogicalGraph};

/// A transformation pass over a [`LogicalGraph`] that rewrites the graph in
/// place (e.g. merging nodes or rewiring edges) without changing the
/// semantics of the pipeline it describes.
pub trait Optimizer {
    /// Mutates `plan` in place, applying this optimization.
    fn optimize(&self, plan: &mut LogicalGraph);
}

/// Optimizer that fuses linear runs of Forward-connected operators with equal
/// parallelism into a single chained node, reducing the number of tasks and
/// inter-operator boundaries at runtime.
pub struct ChainingOptimizer {
}


impl Optimizer for ChainingOptimizer {
    /// Merges linear runs of Forward-connected operators into single chained
    /// nodes.
    ///
    /// A successor is absorbed into its predecessor's chain only when all of
    /// the following hold:
    ///   * the connecting edge is `LogicalEdgeType::Forward`,
    ///   * both nodes have the same parallelism,
    ///   * the predecessor has exactly one outgoing edge and the successor
    ///     has exactly one incoming edge (no fan-out / fan-in),
    ///   * the chain head is neither a source nor a sink, and the successor
    ///     is not a sink.
    fn optimize(&self, plan: &mut LogicalGraph) {
        let node_indices: Vec<NodeIndex> = plan.node_indices().collect();
        let mut removed_nodes = HashSet::new();

        for &node_idx in &node_indices {
            // Already absorbed into an earlier chain.
            if removed_nodes.contains(&node_idx) {
                continue;
            }

            let mut current_node = match plan.node_weight(node_idx) {
                Some(node) => node.clone(),
                None => continue,
            };

            // sources and sinks can't be chained
            if current_node.operator_chain.is_source() || current_node.operator_chain.is_sink() {
                continue;
            }

            // Walk forward from this node, greedily extending the chain while
            // the chaining conditions hold.
            let mut chain = vec![node_idx];
            let mut next_node_idx = node_idx;

            loop {
                let mut successors = plan
                    .edges_directed(next_node_idx, Outgoing)
                    .collect::<Vec<_>>();

                // chaining requires exactly one outgoing edge...
                if successors.len() != 1 {
                    break;
                }

                let edge = successors.remove(0);

                // ...and it must be a Forward edge (shuffles can't be fused)
                if edge.weight().edge_type != LogicalEdgeType::Forward {
                    break;
                }

                let successor_idx = edge.target();

                if removed_nodes.contains(&successor_idx) {
                    break;
                }

                let successor_node = match plan.node_weight(successor_idx) {
                    Some(node) => node.clone(),
                    None => break,
                };

                // skip if parallelism doesn't match or successor is a sink
                if current_node.parallelism != successor_node.parallelism
                    || successor_node.operator_chain.is_sink()
                {
                    break;
                }

                // skip successors with multiple predecessors (fan-in)
                if plan.edges_directed(successor_idx, Incoming).count() > 1 {
                    break;
                }

                chain.push(successor_idx);
                next_node_idx = successor_idx;
            }

            if chain.len() > 1 {
                // Fold every downstream chain member into the head node.
                for i in 1..chain.len() {
                    let node_to_merge_idx = chain[i];
                    let node_to_merge = plan.node_weight(node_to_merge_idx).unwrap().clone();

                    current_node.description = format!(
                        "{} -> {}",
                        current_node.description, node_to_merge.description
                    );

                    current_node
                        .operator_chain
                        .operators
                        .extend(node_to_merge.operator_chain.operators.clone());

                    // the intra-chain edge's schema becomes an internal edge of
                    // the merged operator chain
                    if let Some(edge_idx) = plan.find_edge(chain[i - 1], node_to_merge_idx) {
                        let edge = plan.edge_weight(edge_idx).unwrap();
                        current_node.operator_chain.edges.push(edge.schema.clone());
                    }

                    removed_nodes.insert(node_to_merge_idx);
                }

                plan[node_idx] = current_node;

                // Reattach the chain tail's outgoing edges to the merged head
                // so downstream nodes remain connected.
                let last_node_idx = *chain.last().unwrap();
                let outgoing_edges: Vec<_> = plan
                    .edges_directed(last_node_idx, Outgoing)
                    .map(|e| (e.id(), e.target(), e.weight().clone()))
                    .collect();

                for (edge_id, target_idx, edge_weight) in outgoing_edges {
                    plan.remove_edge(edge_id);
                    plan.add_edge(node_idx, target_idx, edge_weight);
                }
            }
        }

        // BUGFIX: `DiGraph::remove_node` swaps the highest-index node into the
        // removed slot, invalidating every stored `NodeIndex` greater than the
        // removed one. Iterating the HashSet in arbitrary order could therefore
        // delete the wrong (possibly live) nodes. Removing in descending index
        // order keeps all not-yet-removed (smaller) indices valid.
        let mut to_remove: Vec<_> = removed_nodes.into_iter().collect();
        to_remove.sort_unstable_by_key(|idx| std::cmp::Reverse(idx.index()));
        for node_idx in to_remove {
            plan.remove_node(node_idx);
        }
    }
}
8 changes: 7 additions & 1 deletion crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ use std::{collections::HashMap, sync::Arc};
use syn::Item;
use tracing::{debug, info, warn};
use unicase::UniCase;
use arroyo_datastream::optimizers::ChainingOptimizer;
use arroyo_rpc::config::config;

const DEFAULT_IDLE_TIME: Option<Duration> = Some(Duration::from_secs(5 * 60));
pub const ASYNC_RESULT_FIELD: &str = "__async_result";
Expand Down Expand Up @@ -804,13 +806,17 @@ pub async fn parse_and_get_arrow_program(
}
let graph = plan_to_graph_visitor.into_graph();

let program = LogicalProgram::new(
let mut program = LogicalProgram::new(
graph,
ProgramConfig {
udf_dylibs: schema_provider.dylib_udfs.clone(),
python_udfs: schema_provider.python_udfs.clone(),
},
);

if arroyo_rpc::config::config().pipeline.enable_chaining {
program.optimize(&ChainingOptimizer{});
}

Ok(CompiledSql {
program,
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-rpc/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ worker-heartbeat-timeout = "30s"
healthy-duration = "2m"
worker-startup-time = "10m"
task-startup-time = "2m"
enable-chaining = false

[pipeline.compaction]
enabled = false
Expand Down
1 change: 0 additions & 1 deletion crates/arroyo-rpc/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,4 @@ message ArrowEdge {
int32 target = 2;
ArroyoSchema schema = 4;
EdgeType edge_type = 5;
repeated uint32 projection = 6;
}
3 changes: 3 additions & 0 deletions crates/arroyo-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ pub struct PipelineConfig {
/// Default sink, for when none is specified
#[serde(default)]
pub default_sink: DefaultSink,

/// Whether to enable operator chaining
pub enable_chaining: bool,

pub compaction: CompactionConfig,
}
Expand Down

0 comments on commit f11a90d

Please sign in to comment.