Skip to content

Commit

Permalink
Add support for custom TTLs (#741)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Sep 16, 2024
1 parent 3fb69df commit 4f8dc2e
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 20 deletions.
4 changes: 4 additions & 0 deletions crates/arroyo-planner/src/extension/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use datafusion_proto::generated::datafusion::PhysicalPlanNode;
use datafusion_proto::physical_plan::AsExecutionPlan;
use prost::Message;
use std::sync::Arc;
use std::time::Duration;

pub(crate) const JOIN_NODE_NAME: &str = "JoinNode";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct JoinExtension {
pub(crate) rewritten_join: LogicalPlan,
pub(crate) is_instant: bool,
pub(crate) ttl: Option<Duration>,
}

impl ArroyoExtension for JoinExtension {
Expand Down Expand Up @@ -55,6 +57,7 @@ impl ArroyoExtension for JoinExtension {
right_schema: Some(right_schema.as_ref().clone().into()),
output_schema: Some(self.output_schema().into()),
join_plan: physical_plan_node.encode_to_vec(),
ttl_micros: self.ttl.map(|t| t.as_micros() as u64),
};

let logical_node = LogicalNode {
Expand Down Expand Up @@ -105,6 +108,7 @@ impl UserDefinedLogicalNodeCore for JoinExtension {
Ok(Self {
rewritten_join: inputs[0].clone(),
is_instant: self.is_instant,
ttl: self.ttl,
})
}
}
10 changes: 8 additions & 2 deletions crates/arroyo-planner/src/extension/updating_aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arroyo_datastream::logical::{LogicalEdge, LogicalEdgeType, LogicalNode, OperatorName};
use arroyo_rpc::{df::ArroyoSchema, grpc::api::UpdatingAggregateOperator, TIMESTAMP_FIELD};
use datafusion::common::{plan_err, DFSchemaRef, Result, TableReference};
use datafusion::logical_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_proto::protobuf::{physical_plan_node::PhysicalPlanType, PhysicalPlanNode};
use std::sync::Arc;
use std::time::Duration;

use crate::builder::{NamedNode, SplitPlanOutput};

Expand All @@ -21,13 +21,15 @@ pub(crate) struct UpdatingAggregateExtension {
pub(crate) key_fields: Vec<usize>,
pub(crate) final_calculation: LogicalPlan,
pub(crate) timestamp_qualifier: Option<TableReference>,
pub(crate) ttl: Duration,
}

impl UpdatingAggregateExtension {
pub fn new(
aggregate: LogicalPlan,
key_fields: Vec<usize>,
timestamp_qualifier: Option<TableReference>,
ttl: Duration,
) -> Self {
let final_calculation = LogicalPlan::Extension(Extension {
node: Arc::new(IsRetractExtension::new(
Expand All @@ -40,6 +42,7 @@ impl UpdatingAggregateExtension {
key_fields,
final_calculation,
timestamp_qualifier,
ttl,
}
}
}
Expand Down Expand Up @@ -74,6 +77,7 @@ impl UserDefinedLogicalNodeCore for UpdatingAggregateExtension {
inputs[0].clone(),
self.key_fields.clone(),
self.timestamp_qualifier.clone(),
self.ttl,
))
}
}
Expand Down Expand Up @@ -164,7 +168,9 @@ impl ArroyoExtension for UpdatingAggregateExtension {
.pipeline
.update_aggregate_flush_interval
.as_micros() as u64,
ttl_micros: self.ttl.as_micros() as u64,
};

let node = LogicalNode {
operator_id: format!("updating_aggregate_{}", index),
description: "UpdatingAggregate".to_string(),
Expand Down
69 changes: 68 additions & 1 deletion crates/arroyo-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion::prelude::{create_udf, SessionConfig};

use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::{planner::ContextProvider, TableReference};
use datafusion::sql::{planner::ContextProvider, sqlparser, TableReference};

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{
Expand All @@ -58,6 +58,7 @@ use crate::json::register_json_functions;
use crate::rewriters::{SourceMetadataVisitor, TimeWindowUdfChecker, UnnestRewriter};

use crate::udafs::EmptyUdaf;
use arrow::compute::kernels::cast_utils::parse_interval_day_time;
use arroyo_datastream::logical::LogicalProgram;
use arroyo_operator::connector::Connection;
use arroyo_rpc::df::ArroyoSchema;
Expand All @@ -71,6 +72,7 @@ use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr;
use datafusion::logical_expr::expr_rewriter::FunctionRewrite;
use datafusion::logical_expr::planner::ExprPlanner;
use datafusion::sql::sqlparser::ast::{OneOrManyWithParens, Statement};
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, sync::Arc};
use syn::Item;
Expand All @@ -86,6 +88,19 @@ pub struct CompiledSql {
pub connection_ids: Vec<i64>,
}

#[derive(Clone)]
pub struct PlanningOptions {
ttl: Duration,
}

impl Default for PlanningOptions {
fn default() -> Self {
Self {
ttl: Duration::from_secs(24 * 60 * 60),
}
}
}

#[derive(Clone, Default)]
pub struct ArroyoSchemaProvider {
pub source_defs: HashMap<String, String>,
Expand All @@ -100,6 +115,7 @@ pub struct ArroyoSchemaProvider {
pub python_udfs: HashMap<String, PythonUdfConfig>,
pub function_rewriters: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
pub expr_planners: Vec<Arc<dyn ExprPlanner>>,
pub planning_options: PlanningOptions,
}

pub fn register_functions(registry: &mut dyn FunctionRegistry) {
Expand Down Expand Up @@ -572,6 +588,53 @@ pub fn rewrite_plan(
Ok(rewritten_plan.data)
}

fn try_handle_set_variable(
statement: &Statement,
schema_provider: &mut ArroyoSchemaProvider,
) -> Result<bool> {
if let Statement::SetVariable {
variables, value, ..
} = statement
{
let OneOrManyWithParens::One(opt) = variables else {
return plan_err!("invalid syntax for `SET` call");
};

if opt.to_string() != "updating_ttl" {
return plan_err!(
"invalid option '{}'; supported options are 'updating_ttl'",
opt
);
}

if value.len() != 1 {
return plan_err!("invalid `SET updating_ttl` call; expected exactly one expression");
}

let sqlparser::ast::Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) =
value.first().unwrap()
else {
return plan_err!(
"invalid `SET updating_ttl`; expected a singly-quoted string argument"
);
};

let interval = parse_interval_day_time(s).map_err(|_| {
DataFusionError::Plan(format!(
"could not parse '{}' as an interval in `SET updating_ttl` statement",
s
))
})?;

schema_provider.planning_options.ttl =
Duration::from_secs(interval.days as u64 * 24 * 60 * 60)
+ Duration::from_millis(interval.milliseconds as u64);
return Ok(true);
}

Ok(false)
}

pub async fn parse_and_get_arrow_program(
query: String,
mut schema_provider: ArroyoSchemaProvider,
Expand All @@ -593,6 +656,10 @@ pub async fn parse_and_get_arrow_program(

let mut inserts = vec![];
for statement in Parser::parse_sql(&dialect, &query)? {
if try_handle_set_variable(&statement, &mut schema_provider)? {
continue;
}

if let Some(table) =
Table::try_from_statement(&statement, &schema_provider, &session_state)?
{
Expand Down
22 changes: 15 additions & 7 deletions crates/arroyo-planner/src/plan/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::extension::key_calculation::KeyCalculationExtension;
use crate::extension::updating_aggregate::UpdatingAggregateExtension;
use crate::plan::WindowDetectingVisitor;
use crate::{
fields_with_qualifiers, find_window, schema_from_df_fields_with_metadata, DFField,
WindowBehavior,
fields_with_qualifiers, find_window, schema_from_df_fields_with_metadata, ArroyoSchemaProvider,
DFField, WindowBehavior,
};
use arroyo_rpc::{IS_RETRACT_FIELD, TIMESTAMP_FIELD};
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
Expand All @@ -15,16 +15,18 @@ use datafusion::logical_expr::{aggregate_function, Aggregate, Expr, Extension, L
use std::sync::Arc;
use tracing::debug;

#[derive(Debug, Default)]
pub struct AggregateRewriter {}
pub struct AggregateRewriter<'a> {
pub schema_provider: &'a ArroyoSchemaProvider,
}

impl AggregateRewriter {
impl<'a> AggregateRewriter<'a> {
pub fn rewrite_non_windowed_aggregate(
input: Arc<LogicalPlan>,
mut key_fields: Vec<DFField>,
group_expr: Vec<Expr>,
mut aggr_expr: Vec<Expr>,
schema: Arc<DFSchema>,
schema_provider: &ArroyoSchemaProvider,
) -> Result<Transformed<LogicalPlan>> {
if input
.schema()
Expand Down Expand Up @@ -112,6 +114,7 @@ impl AggregateRewriter {
LogicalPlan::Aggregate(aggregate),
(0..key_count).collect(),
column.relation,
schema_provider.planning_options.ttl,
);
let final_plan = LogicalPlan::Extension(Extension {
node: Arc::new(updating_aggregate_extension),
Expand All @@ -120,7 +123,7 @@ impl AggregateRewriter {
}
}

impl TreeNodeRewriter for AggregateRewriter {
impl<'a> TreeNodeRewriter for AggregateRewriter<'a> {
type Node = LogicalPlan;

fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
Expand Down Expand Up @@ -218,7 +221,12 @@ impl TreeNodeRewriter for AggregateRewriter {
}
(false, false) => {
return Self::rewrite_non_windowed_aggregate(
input, key_fields, group_expr, aggr_expr, schema,
input,
key_fields,
group_expr,
aggr_expr,
schema,
self.schema_provider,
);
}
};
Expand Down
12 changes: 8 additions & 4 deletions crates/arroyo-planner/src/plan/join.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::extension::join::JoinExtension;
use crate::extension::key_calculation::KeyCalculationExtension;
use crate::plan::WindowDetectingVisitor;
use crate::{fields_with_qualifiers, schema_from_df_fields_with_metadata};
use crate::{fields_with_qualifiers, schema_from_df_fields_with_metadata, ArroyoSchemaProvider};
use arroyo_datastream::WindowType;
use arroyo_rpc::IS_RETRACT_FIELD;
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
Expand All @@ -17,9 +17,11 @@ use datafusion::logical_expr::{
use datafusion::prelude::coalesce;
use std::sync::Arc;

pub(crate) struct JoinRewriter {}
pub(crate) struct JoinRewriter<'a> {
pub schema_provider: &'a ArroyoSchemaProvider,
}

impl JoinRewriter {
impl<'a> JoinRewriter<'a> {
fn check_join_windowing(join: &Join) -> Result<bool> {
let left_window = WindowDetectingVisitor::get_window(&join.left)?;
let right_window = WindowDetectingVisitor::get_window(&join.right)?;
Expand Down Expand Up @@ -187,7 +189,7 @@ impl JoinRewriter {
}
}

impl TreeNodeRewriter for JoinRewriter {
impl<'a> TreeNodeRewriter for JoinRewriter<'a> {
type Node = LogicalPlan;

fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
Expand Down Expand Up @@ -240,6 +242,8 @@ impl TreeNodeRewriter for JoinRewriter {
let join_extension = JoinExtension {
rewritten_join: final_logical_plan,
is_instant,
// only non-instant (updating) joins have a TTL
ttl: (!is_instant).then_some(self.schema_provider.planning_options.ttl),
};

Ok(Transformed::yes(LogicalPlan::Extension(Extension {
Expand Down
10 changes: 8 additions & 2 deletions crates/arroyo-planner/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,16 @@ impl<'a> TreeNodeRewriter for ArroyoRewriter<'a> {
return AsyncUdfRewriter::new(self.schema_provider).f_up(node);
}
LogicalPlan::Aggregate(aggregate) => {
return AggregateRewriter {}.f_up(LogicalPlan::Aggregate(aggregate));
return AggregateRewriter {
schema_provider: self.schema_provider,
}
.f_up(LogicalPlan::Aggregate(aggregate));
}
LogicalPlan::Join(join) => {
return JoinRewriter {}.f_up(LogicalPlan::Join(join));
return JoinRewriter {
schema_provider: self.schema_provider,
}
.f_up(LogicalPlan::Join(join));
}
LogicalPlan::TableScan(table_scan) => {
return SourceRewriter {
Expand Down
19 changes: 19 additions & 0 deletions crates/arroyo-planner/src/test/queries/custom_ttls.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE TABLE mastodon (
value TEXT
) WITH (
connector = 'sse',
format = 'raw_string',
endpoint = 'http://mastodon.arroyo.dev/api/v1/streaming/public',
events = 'update'
);

CREATE VIEW tags AS (
SELECT tag FROM (
SELECT extract_json_string(value, '$.tags[*].name') AS tag
FROM mastodon)
WHERE tag is not null
);

set updating_ttl = '5 seconds';

select count(distinct tag) from tags;
2 changes: 2 additions & 0 deletions crates/arroyo-rpc/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ message JoinOperator {
ArroyoSchema right_schema = 3;
ArroyoSchema output_schema = 4;
bytes join_plan = 5;
optional uint64 ttl_micros = 6;
}

message WindowFunctionOperator {
Expand Down Expand Up @@ -100,6 +101,7 @@ message UpdatingAggregateOperator {
bytes combine_plan = 6;
bytes final_aggregation_plan = 7;
uint64 flush_interval_micros = 8;
uint64 ttl_micros = 9;
}

message WasmUdfs {
Expand Down
16 changes: 14 additions & 2 deletions crates/arroyo-worker/src/arrow/join_with_expiration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::{physical_plan::AsExecutionPlan, protobuf::PhysicalPlanNode};
use futures::StreamExt;
use prost::Message;
use tracing::warn;

pub struct JoinWithExpiration {
left_expiration: Duration,
Expand Down Expand Up @@ -217,9 +218,20 @@ impl OperatorConstructor for JoinWithExpirationConstructor {
let left_schema = left_input_schema.schema_without_keys()?;
let right_schema = right_input_schema.schema_without_keys()?;

let mut ttl = Duration::from_micros(
config
.ttl_micros
.expect("ttl must be set for non-instant join"),
);

if ttl == Duration::ZERO {
warn!("TTL was not set for join with expiration");
ttl = Duration::from_secs(24 * 60 * 60);
}

Ok(OperatorNode::from_operator(Box::new(JoinWithExpiration {
left_expiration: Duration::from_secs(3600),
right_expiration: Duration::from_secs(3600),
left_expiration: ttl,
right_expiration: ttl,
left_input_schema,
right_input_schema,
left_schema,
Expand Down
Loading

0 comments on commit 4f8dc2e

Please sign in to comment.