diff --git a/crates/arroyo-controller/src/job_controller/job_metrics.rs b/crates/arroyo-controller/src/job_controller/job_metrics.rs index 4368b86ac..6087b551c 100644 --- a/crates/arroyo-controller/src/job_controller/job_metrics.rs +++ b/crates/arroyo-controller/src/job_controller/job_metrics.rs @@ -29,7 +29,7 @@ pub fn get_metric_name(name: &str) -> Option { #[derive(Copy, Clone, Eq, PartialEq, Hash)] pub struct TaskKey { - pub operator_id: u32, + pub node_id: u32, pub subtask_idx: u32, } @@ -46,7 +46,7 @@ impl JobMetrics { for i in 0..program.graph[op].parallelism { tasks.insert( TaskKey { - operator_id: op.index() as u32, + node_id: op.index() as u32, subtask_idx: i as u32, }, TaskMetrics::new(), @@ -60,23 +60,18 @@ impl JobMetrics { } } - pub async fn update( - &self, - operator_id: u32, - subtask_idx: u32, - values: &HashMap, - ) { + pub async fn update(&self, node_id: u32, subtask_idx: u32, values: &HashMap) { let now = SystemTime::now(); let key = TaskKey { - operator_id, + node_id, subtask_idx, }; let mut tasks = self.tasks.write().await; let Some(task) = tasks.get_mut(&key) else { warn!( "Task not found for operator_id: {}, subtask_idx: {}", - operator_id, subtask_idx + node_id, subtask_idx ); return; }; @@ -106,7 +101,7 @@ impl JobMetrics { HashMap::new(); for (k, v) in self.tasks.read().await.iter() { - let op = metric_groups.entry(k.operator_id).or_default(); + let op = metric_groups.entry(k.node_id).or_default(); for (metric, rate) in &v.rates { op.entry(*metric).or_default().push(SubtaskMetrics { diff --git a/crates/arroyo-metrics/src/lib.rs b/crates/arroyo-metrics/src/lib.rs index 0dc180481..54b0d8f43 100644 --- a/crates/arroyo-metrics/src/lib.rs +++ b/crates/arroyo-metrics/src/lib.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::sync::{Arc, OnceLock, RwLock}; use arroyo_types::{ - ChainInfo, TaskInfo, BATCHES_RECV, BATCHES_SENT, BYTES_RECV, BYTES_SENT, - DESERIALIZATION_ERRORS, MESSAGES_RECV, MESSAGES_SENT, + ChainInfo, BATCHES_RECV, BATCHES_SENT, BYTES_RECV, BYTES_SENT, DESERIALIZATION_ERRORS, + MESSAGES_RECV, MESSAGES_SENT, }; use lazy_static::lazy_static; use prometheus::{ @@ -42,7 +42,7 @@ pub fn histogram_for_task( lazy_static! { pub static ref TASK_METRIC_LABELS: Vec<&'static str> = - vec!["operator_id", "subtask_idx", "operator_name"]; + vec!["node_id", "subtask_idx", "operator_name"]; pub static ref MESSAGE_RECV_COUNTER: IntCounterVec = register_int_counter_vec!( MESSAGES_RECV, "Count of messages received by this subtask", diff --git a/crates/arroyo-operator/src/operator.rs b/crates/arroyo-operator/src/operator.rs index d349da594..51152d7f0 100644 --- a/crates/arroyo-operator/src/operator.rs +++ b/crates/arroyo-operator/src/operator.rs @@ -545,24 +545,40 @@ impl ChainedOperator { control_message: &ControlMessage, shutdown_after_commit: bool, ) -> bool { - for (op, ctx) in self.iter_mut() { - match control_message { - ControlMessage::Checkpoint(_) => { - error!("shouldn't receive checkpoint") - } - ControlMessage::Stop { .. } => { - error!("shouldn't receive stop") - } - ControlMessage::Commit { epoch, commit_data } => { - op.handle_commit(*epoch, &commit_data, ctx).await; - return shutdown_after_commit; - } - ControlMessage::LoadCompacted { compacted } => { - ctx.load_compacted(compacted).await; - } - ControlMessage::NoOp => {} + match control_message { + ControlMessage::Checkpoint(_) => { + error!("shouldn't receive checkpoint") + } + ControlMessage::Stop { .. } => { + error!("shouldn't receive stop") + } + ControlMessage::Commit { epoch, commit_data } => { + assert!( + self.next.is_none(), + "can only commit sinks, which cannot be chained" + ); + self.operator + .handle_commit(*epoch, &commit_data, &mut self.context) + .await; + return shutdown_after_commit; } + ControlMessage::LoadCompacted { compacted } => { + self.iter_mut() + .find(|(_, ctx)| ctx.task_info.operator_id == compacted.operator_id) + .unwrap_or_else(|| { + panic!( + "could not load compacted data for unknown operator '{}'", + compacted.operator_id + ) + }) + .1 + .load_compacted(compacted) + .await; + } + ControlMessage::NoOp => {} } + + for (op, ctx) in self.iter_mut() {} false } diff --git a/crates/arroyo-state/src/metrics.rs b/crates/arroyo-state/src/metrics.rs index 0816d9e6e..793dca9eb 100644 --- a/crates/arroyo-state/src/metrics.rs +++ b/crates/arroyo-state/src/metrics.rs @@ -2,15 +2,14 @@ use lazy_static::lazy_static; use prometheus::{register_gauge_vec, GaugeVec}; lazy_static! { - pub static ref WORKER_LABELS_NAMES: Vec<&'static str> = vec!["operator_id", "task_id"]; + pub static ref WORKER_LABELS_NAMES: Vec<&'static str> = vec!["node_id", "task_id"]; pub static ref CURRENT_FILES_GAUGE: GaugeVec = register_gauge_vec!( "arroyo_worker_current_files", "Number of parquet files in the checkpoint", &WORKER_LABELS_NAMES ) .unwrap(); - pub static ref TABLE_LABELS_NAMES: Vec<&'static str> = - vec!["operator_id", "task_id", "table_char"]; + pub static ref TABLE_LABELS_NAMES: Vec<&'static str> = vec!["node_id", "task_id", "table_char"]; pub static ref TABLE_SIZE_GAUGE: GaugeVec = register_gauge_vec!( "arroyo_worker_table_size_keys", "Number of keys in the table", diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index 9ffbb0b17..86dd5b23a 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -634,7 +634,7 @@ impl WorkerGrpc for WorkerServer { } } - return Ok(Response::new(LoadCompactedDataRes {})); + Ok(Response::new(LoadCompactedDataRes {})) } async fn stop_execution( diff --git a/webui/package.json b/webui/package.json index efec76f4f..b8382e68a 100644 --- a/webui/package.json +++ b/webui/package.json @@ -9,7 +9,7 @@ "preview": "vite preview", "format": "npx prettier --write src/ && npx eslint --fix --ext .js,.jsx,.ts,.tsx src", "check": "npx prettier --check src/ && npx eslint --ext .js,.jsx,.ts,.tsx src", - "openapi": "cargo build --package arroyo-openapi && npx openapi-typescript $(pwd)/../target/api-spec.json --output $(pwd)/src/gen/api-types.ts" + "openapi": "cargo build --package arroyo-openapi && pnpm exec openapi-typescript $(pwd)/../target/api-spec.json --output $(pwd)/src/gen/api-types.ts" }, "dependencies": { "@babel/core": "^7.26.0", @@ -47,7 +47,7 @@ "monaco-editor": "^0.34.1", "monaco-sql-languages": "^0.9.5", "openapi-fetch": "^0.6.2", - "openapi-typescript": "^6.7.6", + "openapi-typescript": "=6.2.8", "prop-types": "^15.8.1", "react": "^18.3.1", "react-dom": "^18.3.1", diff --git a/webui/pnpm-lock.yaml b/webui/pnpm-lock.yaml index df8c069f7..6d399b5aa 100644 --- a/webui/pnpm-lock.yaml +++ b/webui/pnpm-lock.yaml @@ -114,8 +114,8 @@ importers: specifier: ^0.6.2 version: 0.6.2 openapi-typescript: - specifier: ^6.7.6 - version: 6.7.6 + specifier: '=6.2.8' + version: 6.2.8 prop-types: specifier: ^15.8.1 version: 15.8.1 @@ -2830,8 +2830,8 @@ packages: openapi-fetch@0.6.2: resolution: {integrity: sha512-Faj29Kzh7oCbt1bz6vAGNKtRJlV/GolOQTx87eYUnfCK7eVXdN9jQVojroc7tcJ5OQgyhbeOqD7LS/8UtGBnMQ==} - openapi-typescript@6.7.6: - resolution: {integrity: sha512-c/hfooPx+RBIOPM09GSxABOZhYPblDoyaGhqBkD/59vtpN21jEuWKDlM0KYTvqJVlSYjKs0tBcIdeXKChlSPtw==} + openapi-typescript@6.2.8: + resolution: {integrity: sha512-yA+y5MHiu6cjmtsGfNLavzVuvGCKzjL3H+exgHDPK6bnp6ZVFibtAiafenNSRDWL0x+7Sw/VPv5SbaqiPLW46w==} hasBin: true optionator@0.9.4: @@ -6712,7 +6712,7 @@ snapshots: openapi-fetch@0.6.2: {} - openapi-typescript@6.7.6: + openapi-typescript@6.2.8: dependencies: ansi-colors: 4.1.3 fast-glob: 3.3.2 diff --git a/webui/post b/webui/post deleted file mode 100644 index a185aa4db..000000000 --- a/webui/post +++ /dev/null @@ -1,6 +0,0 @@ -Projection: COUNT(UInt8(1)) AS count, SUM(price) AS price_sum, AVG(price) AS avg_price, MIN(price) AS min_price, MAX(price) AS max_price, SUM(price / price) AS has_price, SUM(price * auction), MIN(price - auction), MAX(price % auction), AVG(price >> auction) - Aggregate: groupBy=[[hop(IntervalDayTime("2000"), IntervalDayTime("10000"))]], aggr=[[COUNT(UInt8(1)), SUM(price), AVG(price), MIN(price), MAX(price), SUM(price / price), SUM(price * auction), MIN(price - auction), MAX(price % auction), AVG(price >> auction)]] - Projection: auction, price - Projection: (nexmark_thousand.bid)[auction] AS auction, (nexmark_thousand.bid)[price] AS price - TableScan: nexmark_thousand - diff --git a/webui/pre b/webui/pre deleted file mode 100644 index a185aa4db..000000000 --- a/webui/pre +++ /dev/null @@ -1,6 +0,0 @@ -Projection: COUNT(UInt8(1)) AS count, SUM(price) AS price_sum, AVG(price) AS avg_price, MIN(price) AS min_price, MAX(price) AS max_price, SUM(price / price) AS has_price, SUM(price * auction), MIN(price - auction), MAX(price % auction), AVG(price >> auction) - Aggregate: groupBy=[[hop(IntervalDayTime("2000"), IntervalDayTime("10000"))]], aggr=[[COUNT(UInt8(1)), SUM(price), AVG(price), MIN(price), MAX(price), SUM(price / price), SUM(price * auction), MIN(price - auction), MAX(price % auction), AVG(price >> auction)]] - Projection: auction, price - Projection: (nexmark_thousand.bid)[auction] AS auction, (nexmark_thousand.bid)[price] AS price - TableScan: nexmark_thousand - diff --git a/webui/src/components/OperatorDetail.tsx b/webui/src/components/OperatorDetail.tsx index 8e2cd7052..c288848e8 100644 --- a/webui/src/components/OperatorDetail.tsx +++ b/webui/src/components/OperatorDetail.tsx @@ -18,10 +18,10 @@ import { components } from '../gen/api-types'; export interface OperatorDetailProps { pipelineId: string; jobId: string; - operatorId: string; + nodeId: number; } -const OperatorDetail: React.FC = ({ pipelineId, jobId, operatorId }) => { +const OperatorDetail: React.FC = ({ pipelineId, jobId, nodeId }) => { const { pipeline } = usePipeline(pipelineId); const { operatorMetricGroups, operatorMetricGroupsLoading, operatorMetricGroupsError } = useJobMetrics(pipelineId, jobId); @@ -39,8 +39,8 @@ const OperatorDetail: React.FC = ({ pipelineId, jobId, oper return ; } - const node = pipeline.graph.nodes.find(n => n.nodeId == operatorId); - const operatorMetricGroup = operatorMetricGroups.find(o => o.operatorId == operatorId); + const node = pipeline.graph.nodes.find(n => n.nodeId == nodeId); + const operatorMetricGroup = operatorMetricGroups.find(o => o.nodeId == nodeId); if (!operatorMetricGroup) { return ; @@ -107,7 +107,20 @@ const OperatorDetail: React.FC = ({ pipelineId, jobId, oper Backpressure: {backpressureBadge} - {node?.operator} + + + {node?.nodeId} + + {node?.description} + {Math.round(msgRecv)} eps rx {Math.round(msgSent)} eps tx diff --git a/webui/src/gen/api-types.ts b/webui/src/gen/api-types.ts index 114cc2622..b7eb96f3e 100644 --- a/webui/src/gen/api-types.ts +++ b/webui/src/gen/api-types.ts @@ -272,7 +272,7 @@ export interface components { createdAt: number; definition: string; description?: string | null; - dylibUrl: string; + dylibUrl?: string | null; id: string; language: components["schemas"]["UdfLanguage"]; name: string; @@ -355,7 +355,8 @@ export interface components { }; OperatorMetricGroup: { metricGroups: (components["schemas"]["MetricGroup"])[]; - operatorId: string; + /** Format: int32 */ + nodeId: number; }; OperatorMetricGroupCollection: { data: (components["schemas"]["OperatorMetricGroup"])[]; @@ -396,10 +397,12 @@ export interface components { hasMore: boolean; }; PipelineEdge: { - destId: string; + /** Format: int32 */ + destId: number; edgeType: string; keyType: string; - srcId: string; + /** Format: int32 */ + srcId: number; valueType: string; }; PipelineGraph: { @@ -408,7 +411,8 @@ export interface components { }; PipelineNode: { description: string; - nodeId: string; + /** Format: int32 */ + nodeId: number; operator: string; /** Format: int32 */ parallelism: number; @@ -469,6 +473,7 @@ export interface components { SourceField: { fieldName: string; fieldType: components["schemas"]["SourceFieldType"]; + metadataKey?: string | null; nullable: boolean; }; SourceFieldType: { diff --git a/webui/src/routes/connections/ConfluentSchemaEditor.tsx b/webui/src/routes/connections/ConfluentSchemaEditor.tsx index 70f751f8e..23ff528f9 100644 --- a/webui/src/routes/connections/ConfluentSchemaEditor.tsx +++ b/webui/src/routes/connections/ConfluentSchemaEditor.tsx @@ -22,6 +22,7 @@ export function ConfluentSchemaEditor({ }) { let formatEl = null; + // @ts-ignore if (state.schema!.format!.protobuf !== undefined) { formatEl = ( diff --git a/webui/src/routes/pipelines/PipelineDetails.tsx b/webui/src/routes/pipelines/PipelineDetails.tsx index 470293af2..368e47615 100644 --- a/webui/src/routes/pipelines/PipelineDetails.tsx +++ b/webui/src/routes/pipelines/PipelineDetails.tsx @@ -53,7 +53,7 @@ import { vs2015 } from 'react-syntax-highlighter/dist/esm/styles/hljs'; import { useNavbar } from '../../App'; export function PipelineDetails() { - const [activeOperator, setActiveOperator] = useState(undefined); + const [activeOperator, setActiveOperator] = useState(undefined); const { isOpen: configModalOpen, onOpen: onConfigModalOpen, @@ -103,9 +103,9 @@ export function PipelineDetails() { } let operatorDetail = undefined; - if (activeOperator) { + if (activeOperator != undefined) { operatorDetail = ( - + ); } diff --git a/webui/src/routes/pipelines/PipelineGraph.tsx b/webui/src/routes/pipelines/PipelineGraph.tsx index aa12cd408..a566b5332 100644 --- a/webui/src/routes/pipelines/PipelineGraph.tsx +++ b/webui/src/routes/pipelines/PipelineGraph.tsx @@ -10,7 +10,7 @@ function PipelineGraphNode({ }: { data: { node: PipelineNode; - setActiveOperator: (op: string) => void; + setActiveOperator: (op: number) => void; isActive: boolean; operatorBackpressure: number; }; @@ -47,15 +47,15 @@ export function PipelineGraphViewer({ }: { graph: PipelineGraph; operatorMetricGroups?: OperatorMetricGroup[]; - setActiveOperator: (op: string) => void; - activeOperator?: string; + setActiveOperator: (node: number) => void; + activeOperator?: number; }) { const nodeTypes = useMemo(() => ({ pipelineNode: PipelineGraphNode }), []); const nodes = graph.nodes.map(node => { let backpressure = 0; if (operatorMetricGroups && operatorMetricGroups.length > 0) { - const operatorMetricGroup = operatorMetricGroups.find(o => o.operatorId == node.nodeId); + const operatorMetricGroup = operatorMetricGroups.find(o => o.nodeId == node.nodeId); if (operatorMetricGroup) { const metricGroups = operatorMetricGroup.metricGroups; const backpressureMetrics = metricGroups.find(m => m.name == 'backpressure'); @@ -64,12 +64,15 @@ export function PipelineGraphViewer({ } return { - id: node.nodeId, + id: String(node.nodeId), type: 'pipelineNode', data: { label: node.description, node: node, - setActiveOperator: setActiveOperator, + setActiveOperator: () => { + console.log(node); + return setActiveOperator(node.nodeId); + }, isActive: node.nodeId == activeOperator, operatorBackpressure: backpressure, }, @@ -87,8 +90,8 @@ export function PipelineGraphViewer({ const edges = graph.edges.map(edge => { return { id: `${edge.srcId}-${edge.destId}`, - source: edge.srcId, - target: edge.destId, + source: String(edge.srcId), + target: String(edge.destId), type: 'step', }; }); @@ -99,8 +102,8 @@ export function PipelineGraphViewer({ return {}; }); - nodes.forEach(node => g.setNode(node.id, node)); - edges.forEach(edge => g.setEdge(edge.source, edge.target)); + nodes.forEach(node => g.setNode(String(node.id), node)); + edges.forEach(edge => g.setEdge(String(edge.source), String(edge.target))); dagre.layout(g);