Skip to content

Commit

Permalink
Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 4, 2024
1 parent cbd03c2 commit ff3352e
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 76 deletions.
17 changes: 6 additions & 11 deletions crates/arroyo-controller/src/job_controller/job_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn get_metric_name(name: &str) -> Option<MetricName> {

#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct TaskKey {
pub operator_id: u32,
pub node_id: u32,
pub subtask_idx: u32,
}

Expand All @@ -46,7 +46,7 @@ impl JobMetrics {
for i in 0..program.graph[op].parallelism {
tasks.insert(
TaskKey {
operator_id: op.index() as u32,
node_id: op.index() as u32,
subtask_idx: i as u32,
},
TaskMetrics::new(),
Expand All @@ -60,23 +60,18 @@ impl JobMetrics {
}
}

pub async fn update(
&self,
operator_id: u32,
subtask_idx: u32,
values: &HashMap<MetricName, u64>,
) {
pub async fn update(&self, node_id: u32, subtask_idx: u32, values: &HashMap<MetricName, u64>) {
let now = SystemTime::now();

let key = TaskKey {
operator_id,
node_id,
subtask_idx,
};
let mut tasks = self.tasks.write().await;
let Some(task) = tasks.get_mut(&key) else {
warn!(
"Task not found for operator_id: {}, subtask_idx: {}",
operator_id, subtask_idx
node_id, subtask_idx
);
return;
};
Expand Down Expand Up @@ -106,7 +101,7 @@ impl JobMetrics {
HashMap::new();

for (k, v) in self.tasks.read().await.iter() {
let op = metric_groups.entry(k.operator_id).or_default();
let op = metric_groups.entry(k.node_id).or_default();

for (metric, rate) in &v.rates {
op.entry(*metric).or_default().push(SubtaskMetrics {
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};

use arroyo_types::{
ChainInfo, TaskInfo, BATCHES_RECV, BATCHES_SENT, BYTES_RECV, BYTES_SENT,
DESERIALIZATION_ERRORS, MESSAGES_RECV, MESSAGES_SENT,
ChainInfo, BATCHES_RECV, BATCHES_SENT, BYTES_RECV, BYTES_SENT, DESERIALIZATION_ERRORS,
MESSAGES_RECV, MESSAGES_SENT,
};
use lazy_static::lazy_static;
use prometheus::{
Expand Down Expand Up @@ -42,7 +42,7 @@ pub fn histogram_for_task(

lazy_static! {
pub static ref TASK_METRIC_LABELS: Vec<&'static str> =
vec!["operator_id", "subtask_idx", "operator_name"];
vec!["node_id", "subtask_idx", "operator_name"];
pub static ref MESSAGE_RECV_COUNTER: IntCounterVec = register_int_counter_vec!(
MESSAGES_RECV,
"Count of messages received by this subtask",
Expand Down
48 changes: 32 additions & 16 deletions crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,24 +545,40 @@ impl ChainedOperator {
control_message: &ControlMessage,
shutdown_after_commit: bool,
) -> bool {
for (op, ctx) in self.iter_mut() {
match control_message {
ControlMessage::Checkpoint(_) => {
error!("shouldn't receive checkpoint")
}
ControlMessage::Stop { .. } => {
error!("shouldn't receive stop")
}
ControlMessage::Commit { epoch, commit_data } => {
op.handle_commit(*epoch, &commit_data, ctx).await;
return shutdown_after_commit;
}
ControlMessage::LoadCompacted { compacted } => {
ctx.load_compacted(compacted).await;
}
ControlMessage::NoOp => {}
match control_message {
ControlMessage::Checkpoint(_) => {
error!("shouldn't receive checkpoint")
}
ControlMessage::Stop { .. } => {
error!("shouldn't receive stop")
}
ControlMessage::Commit { epoch, commit_data } => {
assert!(
self.next.is_none(),
"can only commit sinks, which cannot be chained"
);
self.operator
.handle_commit(*epoch, &commit_data, &mut self.context)
.await;
return shutdown_after_commit;
}
ControlMessage::LoadCompacted { compacted } => {
self.iter_mut()
.find(|(_, ctx)| ctx.task_info.operator_id == compacted.operator_id)
.unwrap_or_else(|| {
panic!(
"could not load compacted data for unknown operator '{}'",
compacted.operator_id
)
})
.1
.load_compacted(compacted)
.await;
}
ControlMessage::NoOp => {}
}

for (op, ctx) in self.iter_mut() {}
false
}

Expand Down
5 changes: 2 additions & 3 deletions crates/arroyo-state/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, GaugeVec};

lazy_static! {
pub static ref WORKER_LABELS_NAMES: Vec<&'static str> = vec!["operator_id", "task_id"];
pub static ref WORKER_LABELS_NAMES: Vec<&'static str> = vec!["node_id", "task_id"];
pub static ref CURRENT_FILES_GAUGE: GaugeVec = register_gauge_vec!(
"arroyo_worker_current_files",
"Number of parquet files in the checkpoint",
&WORKER_LABELS_NAMES
)
.unwrap();
pub static ref TABLE_LABELS_NAMES: Vec<&'static str> =
vec!["operator_id", "task_id", "table_char"];
pub static ref TABLE_LABELS_NAMES: Vec<&'static str> = vec!["node_id", "task_id", "table_char"];
pub static ref TABLE_SIZE_GAUGE: GaugeVec = register_gauge_vec!(
"arroyo_worker_table_size_keys",
"Number of keys in the table",
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl WorkerGrpc for WorkerServer {
}
}

return Ok(Response::new(LoadCompactedDataRes {}));
Ok(Response::new(LoadCompactedDataRes {}))
}

async fn stop_execution(
Expand Down
4 changes: 2 additions & 2 deletions webui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"preview": "vite preview",
"format": "npx prettier --write src/ && npx eslint --fix --ext .js,.jsx,.ts,.tsx src",
"check": "npx prettier --check src/ && npx eslint --ext .js,.jsx,.ts,.tsx src",
"openapi": "cargo build --package arroyo-openapi && npx openapi-typescript $(pwd)/../target/api-spec.json --output $(pwd)/src/gen/api-types.ts"
"openapi": "cargo build --package arroyo-openapi && pnpm exec openapi-typescript $(pwd)/../target/api-spec.json --output $(pwd)/src/gen/api-types.ts"
},
"dependencies": {
"@babel/core": "^7.26.0",
Expand Down Expand Up @@ -47,7 +47,7 @@
"monaco-editor": "^0.34.1",
"monaco-sql-languages": "^0.9.5",
"openapi-fetch": "^0.6.2",
"openapi-typescript": "^6.7.6",
"openapi-typescript": "=6.2.8",
"prop-types": "^15.8.1",
"react": "^18.3.1",
"react-dom": "^18.3.1",
Expand Down
10 changes: 5 additions & 5 deletions webui/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions webui/post

This file was deleted.

6 changes: 0 additions & 6 deletions webui/pre

This file was deleted.

23 changes: 18 additions & 5 deletions webui/src/components/OperatorDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import { components } from '../gen/api-types';
export interface OperatorDetailProps {
pipelineId: string;
jobId: string;
operatorId: string;
nodeId: number;
}

const OperatorDetail: React.FC<OperatorDetailProps> = ({ pipelineId, jobId, operatorId }) => {
const OperatorDetail: React.FC<OperatorDetailProps> = ({ pipelineId, jobId, nodeId }) => {
const { pipeline } = usePipeline(pipelineId);
const { operatorMetricGroups, operatorMetricGroupsLoading, operatorMetricGroupsError } =
useJobMetrics(pipelineId, jobId);
Expand All @@ -39,8 +39,8 @@ const OperatorDetail: React.FC<OperatorDetailProps> = ({ pipelineId, jobId, oper
return <Loading size={'lg'} />;
}

const node = pipeline.graph.nodes.find(n => n.nodeId == operatorId);
const operatorMetricGroup = operatorMetricGroups.find(o => o.operatorId == operatorId);
const node = pipeline.graph.nodes.find(n => n.nodeId == nodeId);
const operatorMetricGroup = operatorMetricGroups.find(o => o.nodeId == nodeId);

if (!operatorMetricGroup) {
return <Loading size={'lg'} />;
Expand Down Expand Up @@ -107,7 +107,20 @@ const OperatorDetail: React.FC<OperatorDetailProps> = ({ pipelineId, jobId, oper
</Box>
</HStack>
<Box marginTop="10px">Backpressure: {backpressureBadge}</Box>
<Box marginTop="10px">{node?.operator}</Box>
<Box marginTop="10px">
<Box
display={'inline-block'}
border={'1px solid #aaa'}
px={1.5}
fontSize={12}
rounded={'full'}
mr={2}
title={'ID of this node'}
>
{node?.nodeId}
</Box>
{node?.description}
</Box>
<Box marginTop="10px" fontFamily="monaco,ubuntu mono,fixed-width">
<Code>{Math.round(msgRecv)} eps</Code> rx
<Code marginLeft="20px">{Math.round(msgSent)} eps</Code> tx
Expand Down
15 changes: 10 additions & 5 deletions webui/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export interface components {
createdAt: number;
definition: string;
description?: string | null;
dylibUrl: string;
dylibUrl?: string | null;
id: string;
language: components["schemas"]["UdfLanguage"];
name: string;
Expand Down Expand Up @@ -355,7 +355,8 @@ export interface components {
};
OperatorMetricGroup: {
metricGroups: (components["schemas"]["MetricGroup"])[];
operatorId: string;
/** Format: int32 */
nodeId: number;
};
OperatorMetricGroupCollection: {
data: (components["schemas"]["OperatorMetricGroup"])[];
Expand Down Expand Up @@ -396,10 +397,12 @@ export interface components {
hasMore: boolean;
};
PipelineEdge: {
destId: string;
/** Format: int32 */
destId: number;
edgeType: string;
keyType: string;
srcId: string;
/** Format: int32 */
srcId: number;
valueType: string;
};
PipelineGraph: {
Expand All @@ -408,7 +411,8 @@ export interface components {
};
PipelineNode: {
description: string;
nodeId: string;
/** Format: int32 */
nodeId: number;
operator: string;
/** Format: int32 */
parallelism: number;
Expand Down Expand Up @@ -469,6 +473,7 @@ export interface components {
SourceField: {
fieldName: string;
fieldType: components["schemas"]["SourceFieldType"];
metadataKey?: string | null;
nullable: boolean;
};
SourceFieldType: {
Expand Down
1 change: 1 addition & 0 deletions webui/src/routes/connections/ConfluentSchemaEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export function ConfluentSchemaEditor({
}) {
let formatEl = null;

// @ts-ignore
if (state.schema!.format!.protobuf !== undefined) {
formatEl = (
<Box maxW={'lg'}>
Expand Down
6 changes: 3 additions & 3 deletions webui/src/routes/pipelines/PipelineDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import { vs2015 } from 'react-syntax-highlighter/dist/esm/styles/hljs';
import { useNavbar } from '../../App';

export function PipelineDetails() {
const [activeOperator, setActiveOperator] = useState<string | undefined>(undefined);
const [activeOperator, setActiveOperator] = useState<number | undefined>(undefined);
const {
isOpen: configModalOpen,
onOpen: onConfigModalOpen,
Expand Down Expand Up @@ -103,9 +103,9 @@ export function PipelineDetails() {
}

let operatorDetail = undefined;
if (activeOperator) {
if (activeOperator != undefined) {
operatorDetail = (
<OperatorDetail pipelineId={pipeline.id} jobId={job.id} operatorId={activeOperator} />
<OperatorDetail pipelineId={pipeline.id} jobId={job.id} nodeId={activeOperator} />
);
}

Expand Down
Loading

0 comments on commit ff3352e

Please sign in to comment.