Skip to content

Commit

Permalink
feat(cubesql): Add filter flattening rule
Browse files Browse the repository at this point in the history
  • Loading branch information
mcheshkov committed Jan 28, 2025
1 parent 50bdbe7 commit a95fe52
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 5 deletions.
240 changes: 239 additions & 1 deletion rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
compile::rewrite::{
cube_scan_wrapper, filter,
cube_scan_wrapper, filter, rewrite,
rewriter::{CubeEGraph, CubeRewrite},
rules::wrapper::WrapperRules,
subquery, transforming_rewrite, wrapped_select, wrapped_select_aggr_expr_empty_tail,
Expand Down Expand Up @@ -431,6 +431,244 @@ impl WrapperRules {
)]);
}

/// Registers the rewrite that flattens an outer `Filter` into the `WrappedSelect` below it.
///
/// A single rule, `wrapper-merge-filter-with-inner-wrapped-select`, is appended to `rules`.
///
/// The searcher matches a `Filter` whose input is a non-finalized `CubeScanWrapper` around a
/// `Projection`-typed `WrappedSelect` that:
/// * has empty projection, subqueries, group, aggregate, window, filter, having and order
///   expression lists,
/// * has no limit, no offset and no alias, and is not distinct,
/// * has both `WrappedSelectPushToCube` and `WrappedSelectUngroupedScan` set to `true`.
///
/// The applier produces the same `WrappedSelect` (without the outer `Filter`), with
/// `?filter_expr` placed in its filter slot under a `wrapper_pushdown_replacer`.
///
/// Every replacer in both patterns uses one and the same replacer context, with push-to-cube
/// enabled, in-projection disabled and ungrouped scan enabled.
pub fn filter_merge_rules(&self, rules: &mut Vec<CubeRewrite>) {
    // Shared replacer context for both sides of the rule.
    // The ungrouped scan flag is pinned to `true` on the searcher side as well: the applier
    // always emits `WrapperReplacerContextUngroupedScan:true`, so matching an arbitrary
    // `?ungrouped_scan` would let the rule silently change the flag of the matched context.
    let context = || {
        wrapper_replacer_context(
            "?alias_to_cube",
            "WrapperReplacerContextPushToCube:true",
            "WrapperReplacerContextInProjection:false",
            "?cube_members",
            "?grouped_subqueries",
            "WrapperReplacerContextUngroupedScan:true",
        )
    };

    rules.extend(vec![rewrite(
        "wrapper-merge-filter-with-inner-wrapped-select",
        // Input is not a finished wrapper_pullup_replacer, but a WrappedSelect just before pullup.
        // After pullup, the replacer would disable push to cube, because any node on top would
        // have a WrappedSelect in `from`, so there would be no CubeScan to push to.
        // Instead, this rule tries to catch `from` before pulling up, and merges the outer Filter
        // into the inner WrappedSelect.
        filter(
            "?filter_expr",
            cube_scan_wrapper(
                wrapped_select(
                    "WrappedSelectSelectType:Projection",
                    wrapper_pullup_replacer(
                        wrapped_select_projection_expr_empty_tail(),
                        context(),
                    ),
                    wrapper_pullup_replacer(
                        wrapped_select_subqueries_empty_tail(),
                        context(),
                    ),
                    wrapper_pullup_replacer(
                        wrapped_select_group_expr_empty_tail(),
                        context(),
                    ),
                    wrapper_pullup_replacer(
                        wrapped_select_aggr_expr_empty_tail(),
                        context(),
                    ),
                    wrapper_pullup_replacer(
                        wrapped_select_window_expr_empty_tail(),
                        context(),
                    ),
                    wrapper_pullup_replacer("?inner_from", context()),
                    wrapper_pullup_replacer("?inner_joins", context()),
                    // Inner filter must be empty: the outer filter simply takes its place
                    wrapper_pullup_replacer(
                        wrapped_select_filter_expr_empty_tail(),
                        context(),
                    ),
                    wrapped_select_having_expr_empty_tail(),
                    // Inner must not have limit and offset, because filtering does not commute
                    // with them
                    "WrappedSelectLimit:None",
                    "WrappedSelectOffset:None",
                    wrapper_pullup_replacer(
                        wrapped_select_order_expr_empty_tail(),
                        context(),
                    ),
                    "WrappedSelectAlias:None",
                    "WrappedSelectDistinct:false",
                    "WrappedSelectPushToCube:true",
                    "WrappedSelectUngroupedScan:true",
                ),
                "CubeScanWrapperFinalized:false",
            ),
        ),
        cube_scan_wrapper(
            wrapped_select(
                "WrappedSelectSelectType:Projection",
                wrapper_pullup_replacer(
                    wrapped_select_projection_expr_empty_tail(),
                    context(),
                ),
                wrapper_pullup_replacer(
                    wrapped_select_subqueries_empty_tail(),
                    context(),
                ),
                wrapper_pullup_replacer(
                    wrapped_select_group_expr_empty_tail(),
                    context(),
                ),
                wrapper_pullup_replacer(
                    wrapped_select_aggr_expr_empty_tail(),
                    context(),
                ),
                wrapper_pullup_replacer(
                    wrapped_select_window_expr_empty_tail(),
                    context(),
                ),
                wrapper_pullup_replacer("?inner_from", context()),
                wrapper_pullup_replacer("?inner_joins", context()),
                // The only difference from the matched WrappedSelect: the outer filter
                // expression goes into the filter slot, under a pushdown replacer
                wrapper_pushdown_replacer("?filter_expr", context()),
                wrapped_select_having_expr_empty_tail(),
                "WrappedSelectLimit:None",
                "WrappedSelectOffset:None",
                wrapper_pullup_replacer(
                    wrapped_select_order_expr_empty_tail(),
                    context(),
                ),
                "WrappedSelectAlias:None",
                "WrappedSelectDistinct:false",
                "WrappedSelectPushToCube:true",
                "WrappedSelectUngroupedScan:true",
            ),
            "CubeScanWrapperFinalized:false",
        ),
    )]);
}

fn transform_filter(
&self,
push_to_cube_var: &'static str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl RewriteRules for WrapperRules {
self.limit_rules(&mut rules);
self.filter_rules(&mut rules);
self.filter_rules_subquery(&mut rules);
self.filter_merge_rules(&mut rules);
self.subquery_rules(&mut rules);
self.order_rules(&mut rules);
self.window_rules(&mut rules);
Expand Down
20 changes: 16 additions & 4 deletions rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,8 @@ JOIN
GROUP BY customer_gender
ORDER BY mme_inner__ DESC
LIMIT 20) AS anon_1 ON customer_gender = customer_gender__
-- filters here are not supported without filter flattening in wrapper
-- TODO enable it when ready
-- WHERE order_date >= TO_TIMESTAMP('2022-09-16 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
-- AND order_date < TO_TIMESTAMP('2024-09-16 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
WHERE order_date >= TO_TIMESTAMP('2022-09-16 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
AND order_date < TO_TIMESTAMP('2024-09-16 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
GROUP BY DATE_TRUNC('week', order_date)
ORDER BY avgPrice DESC
LIMIT 1000
Expand Down Expand Up @@ -424,6 +422,7 @@ LIMIT 1000
let subquery = &wrapped_sql_node.request.subquery_joins.unwrap()[0];

assert!(!subquery.sql.contains("ungrouped"));
// Inner order
let re = Regex::new(
r#""order":\s*\[\s*\[\s*"KibanaSampleDataEcommerce.avgPrice",\s*"desc"\s*\]\s*\]"#,
)
Expand All @@ -435,6 +434,19 @@ LIMIT 1000
r#"${KibanaSampleDataEcommerce.customer_gender} = \"anon_1\".\"customer_gender_\""#
));

// Outer filter
assert_eq!(wrapped_sql_node.request.segments.as_ref().unwrap().len(), 1);
assert!(
wrapped_sql_node.request.segments.as_ref().unwrap()[0].contains(
r#"${KibanaSampleDataEcommerce.order_date} >= timestamptz '2022-09-16T00:00:00.000Z'"#
)
);
assert!(
wrapped_sql_node.request.segments.as_ref().unwrap()[0].contains(
r#"${KibanaSampleDataEcommerce.order_date} < timestamptz '2024-09-16T00:00:00.000Z'"#
)
);

// Measure from top aggregation
assert!(wrapped_sql_node
.wrapped_sql
Expand Down

0 comments on commit a95fe52

Please sign in to comment.