Broker errors when processing the groupBy subquery #17598

Open
soullkk opened this issue Dec 31, 2024 · 2 comments
soullkk commented Dec 31, 2024

Broker errors when processing the groupBy subquery.

Affected Version

Apache Druid 28.0.1

Description

Please include as much detailed information about the problem as possible.

  • Cluster size
    3 nodes in cluster.
  • Configurations in use
    NA.
  • Steps to reproduce the problem
  1. The datasource structure:
name                 data type  column type  aggregator
ts                   long       timestamp
device_mac           string     dimension
device_name          string     dimension
status               long       measure      max
x_axis               double     measure      sum
y_axis               double     measure      sum
area_id              string     dimension
area_name            string     dimension
site_id              string     dimension
region_level_one     string     dimension
region_level_two     string     dimension
region_level_three   string     dimension
region_level_four    string     dimension
region_level_five    string     dimension
region_level_six     string     dimension
region_level_seven   string     dimension
region_level_eight   string     dimension
region_name          string     dimension
tenant_id            string     dimension
res_id               string     dimension
  2. The data is as follows:
{"ts":1735012987481,"device_mac":"10-c3-ab-2a-f7-50","device_name":"2102353KCNW0LA000284","area_id":"","area_name":"","site_id":"68ee80a3-6bff-4ee6-8157-21f88f1da20d","region_level_one":"","region_level_two":"","region_level_three":"","region_level_four":"","region_level_five":"","region_level_six":"","region_level_seven":"2843a7a0-93c2-47a1-9707-9d910707fd30","region_level_eight":"901bb356-7188-45b9-9d5f-2484122314f7","region_name":"","tenant_id":"default-organization-id","res_id":"3d7557f2-97de-4567-b61b-b5e15d005aac","count":1,"status":1,"x_axis":0.0,"y_axis":0.0}
{"ts":1735012987483,"device_mac":"10-c3-ab-2a-f7-50","device_name":"2102353KCNW0LA000284","area_id":"","area_name":"","site_id":"68ee80a3-6bff-4ee6-8157-21f88f1da20d","region_level_one":"","region_level_two":"","region_level_three":"","region_level_four":"","region_level_five":"","region_level_six":"","region_level_seven":"2843a7a0-93c2-47a1-9707-9d910707fd30","region_level_eight":"901bb356-7188-45b9-9d5f-2484122314f7","region_name":"","tenant_id":"default-organization-id","res_id":"3d7557f2-97de-4567-b61b-b5e15d005aac","count":6,"status":1,"x_axis":0.0,"y_axis":0.0}
{"ts":1735013047011,"device_mac":"1c-20-db-c9-07-20","device_name":"21500829322SK9604364","area_id":"","area_name":"","site_id":"68ee80a3-6bff-4ee6-8157-21f88f1da20d","region_level_one":"","region_level_two":"","region_level_three":"","region_level_four":"","region_level_five":"","region_level_six":"","region_level_seven":"2843a7a0-93c2-47a1-9707-9d910707fd30","region_level_eight":"901bb356-7188-45b9-9d5f-2484122314f7","region_name":"","tenant_id":"default-organization-id","res_id":"b97dfe70-f742-477a-93c1-dc1690d38063","count":1,"status":0,"x_axis":0.0,"y_axis":0.0}
{"ts":1735013047012,"device_mac":"1c-20-db-c9-07-20","device_name":"21500829322SK9604364","area_id":"","area_name":"","site_id":"68ee80a3-6bff-4ee6-8157-21f88f1da20d","region_level_one":"","region_level_two":"","region_level_three":"","region_level_four":"","region_level_five":"","region_level_six":"","region_level_seven":"2843a7a0-93c2-47a1-9707-9d910707fd30","region_level_eight":"901bb356-7188-45b9-9d5f-2484122314f7","region_name":"","tenant_id":"default-organization-id","res_id":"b97dfe70-f742-477a-93c1-dc1690d38063","count":1,"status":0,"x_axis":0.0,"y_axis":0.0}
{"ts":1735013047013,"device_mac":"1c-20-db-c9-07-20","device_name":"21500829322SK9604364","area_id":"","area_name":"","site_id":"68ee80a3-6bff-4ee6-8157-21f88f1da20d","region_level_one":"","region_level_two":"","region_level_three":"","region_level_four":"","region_level_five":"","region_level_six":"","region_level_seven":"2843a7a0-93c2-47a1-9707-9d910707fd30","region_level_eight":"901bb356-7188-45b9-9d5f-2484122314f7","region_name":"","tenant_id":"default-organization-id","res_id":"b97dfe70-f742-477a-93c1-dc1690d38063","count":6,"status":0,"x_axis":0.0,"y_axis":0.0}

  3. The native query is as follows:
{
    "queryType": "groupBy",
    "dataSource": {
        "type": "query",
        "query": {
            "queryType": "groupBy",
            "dataSource": "ODAEDATASET__DEFAULT_ci_campus_isac_csi_awareness__DEFAULT",
            "granularity": "all",
            "dimensions": [{
                    "type": "default",
                    "dimension": "device_mac",
                    "outputName": "device_mac",
                    "outputType": "STRING"
                }
            ],
            "limitSpec": {
                "type": "default"
            },
            "filter": {
                "type": "and",
                "fields": [{
                        "type": "selector",
                        "dimension": "tenant_id",
                        "value": "default-organization-id"
                    }, {
                        "type": "not",
                        "field": {
                            "type": "selector",
                            "dimension": "site_id",
                            "value": null
                        }
                    }
                ]
            },
            "aggregations": [{
                    "type": "longLast",
                    "name": "status",
                    "fieldName": "status"
                }
            ],
            "intervals": ["2024-12-24T04:00:00.000Z/2024-12-24T05:00:00.001Z"]
        }
    },
    "granularity": "all",
    "dimensions": [],
    "limitSpec": {
        "type": "default"
    },
    "aggregations": [{
            "type": "longSum",
            "name": "aCount",
            "expression": "case_searched((\"status\" == 1),1,0)"
        }
    ],
    "intervals": ["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],
    "context": {
        "timeout": 30000,
        "maxRowsQueuedForOrdering": 43796
    }
}
  • The error message or stack traces encountered. Providing more context, such as nearby log messages or even entire logs, can be helpful.

subQuery result:

[{
        "version": "v1",
        "timestamp": "2024-12-24T04:00:00.000Z",
        "event": {
            "device_mac": "10-c3-ab-2a-f7-50",
            "status": 1
        }
    }, {
        "version": "v1",
        "timestamp": "2024-12-24T04:00:00.000Z",
        "event": {
            "device_mac": "1c-20-db-c9-07-20",
            "status": 0
        }
    }
]

So when the longSum aggregator is applied to the subquery results, the expected result is:

[{
        "version": "v1",
        "timestamp": "1900-01-01T00:00:00.000Z",
        "event": {
            "aCount": 1
        }
    }
]

but actual result is:

[{
        "version": "v1",
        "timestamp": "1900-01-01T00:00:00.000Z",
        "event": {
            "aCount": 0
        }
    }
]
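
For reference, the expected value can be worked out by hand from the subquery result above. The following is a minimal sketch in plain Java, not Druid code: the class and method names are hypothetical, and the ternary only stands in for case_searched(("status" == 1),1,0) applied to the finalized longLast values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Druid: reproduces the outer aggregation by hand.
class ExpectedACount {
    // Stand-in for longSum over case_searched(("status" == 1), 1, 0).
    static long aCount(List<Long> finalizedStatuses) {
        long sum = 0;
        for (long status : finalizedStatuses) {
            sum += (status == 1L) ? 1 : 0;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Finalized "status" values from the subquery result: one row with 1, one row with 0.
        List<Long> statuses = Arrays.asList(1L, 0L);
        System.out.println(aCount(statuses)); // prints 1
    }
}
```

Only the first device contributes to the sum, which is why aCount should be 1 rather than 0.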
  • Any debugging that you have already done

GroupByQueryQueryToolChest.java
method: mergeGroupByResultsWithoutPushDown, lines 224-250

...
      final Sequence<ResultRow> subqueryResult = mergeGroupByResults(
          subquery,
          resource,
          runner,
          context
      );

      final Sequence<ResultRow> finalizingResults = finalizeSubqueryResults(subqueryResult, subquery);

      if (query.getSubtotalsSpec() != null) {
        return groupingEngine.processSubtotalsSpec(
            query,
            resource,
            groupingEngine.processSubqueryResult(subquery, query, resource, finalizingResults, false)
        );
      } else {
        return groupingEngine.applyPostProcessing(
            groupingEngine.processSubqueryResult(
                subquery,
                query,
                resource,
                finalizingResults,
                false
            ),
            query
        );
      }
...

The subqueryResult is:

{
      "device_mac": "10-c3-ab-2a-f7-50",
      "status": {lhs=1735012987483, rhs=1}
 }
 {
      "device_mac": "1c-20-db-c9-07-20",
      "status": {lhs=1735012987483, rhs=0}
 }

The finalizingResults is the same as subqueryResult, because subquery.context().isFinalize(false) == false. Why is finalize equal to false?
Because it was overridden to false in FinalizeResultsQueryRunner.java.

FinalizeResultsQueryRunner.java
method: run, lines 58-75

  public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
  {
    final Query<T> query = queryPlus.getQuery();
    final QueryContext queryContext = query.context();
    final boolean isBySegment = queryContext.isBySegment();
    final boolean shouldFinalize = queryContext.isFinalize(true);

    final Query<T> queryToRun;
    final Function<T, ?> finalizerFn;
    final MetricManipulationFn metricManipulationFn;

    if (shouldFinalize) {

      // ****************
      // finalize is overridden to false here
      queryToRun = query.withOverriddenContext(ImmutableMap.of(QueryContexts.FINALIZE_KEY, false));
      metricManipulationFn = MetricManipulatorFns.finalizing();
    } else {
      queryToRun = query;
      metricManipulationFn = MetricManipulatorFns.identity();
    }
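
The effect of that override can be sketched outside Druid. This is a simplified model under stated assumptions: the context is a plain Map, the class and method names are hypothetical, and the literal "finalize" is assumed to be the value of QueryContexts.FINALIZE_KEY. It only illustrates how a default of true at the runner turns into an explicit false for everything that reads the context afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the finalize flag; not the real QueryContext API.
class FinalizeFlagSketch {
    // Assumed value of QueryContexts.FINALIZE_KEY.
    static final String FINALIZE_KEY = "finalize";

    // Stand-in for queryContext.isFinalize(defaultValue): an absent key yields the default.
    static boolean isFinalize(Map<String, Object> context, boolean defaultValue) {
        Object value = context.get(FINALIZE_KEY);
        return value == null ? defaultValue : (Boolean) value;
    }

    // Stand-in for the override in run(): when finalization is wanted, the runner
    // finalizes the results itself and passes finalize=false to the query it runs.
    static Map<String, Object> contextSeenDownstream(Map<String, Object> context) {
        Map<String, Object> downstream = new HashMap<>(context);
        if (isFinalize(context, true)) {
            downstream.put(FINALIZE_KEY, false);
        }
        return downstream;
    }

    public static void main(String[] args) {
        Map<String, Object> userContext = new HashMap<>();
        userContext.put("timeout", 30000);

        Map<String, Object> downstream = contextSeenDownstream(userContext);
        System.out.println(isFinalize(userContext, true)); // prints true
        System.out.println(isFinalize(downstream, false)); // prints false
    }
}
```

If the subquery inherits this downstream context, a check such as isFinalize(false) on it returns false, which matches the behavior described above.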

So when the spilling grouper processes the subquery result, status cannot be parsed as a number:
currentValue is Pair{lhs=1735012987483, rhs=1} (the expected value is 1, which is what LongLastAggregator produces after finalization),
so the returned result is NullHandling.defaultLongValue() = 0.

        private Number getCurrentValueAsNumber()
        {
          final Object currentValue = getCurrentValue();
          if (currentValue instanceof StructuredData) {
            return Rows.objectToNumber(columnName, ((StructuredData) currentValue).getValue(), throwParseExceptions);
          }
          return Rows.objectToNumber(
              columnName,
              currentValue,
              throwParseExceptions
          );
        }
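
The fallback can be illustrated with a small stand-alone sketch. Everything here is hypothetical and only loosely modeled on the behavior described above: LastValue stands in for the unfinalized (timestamp, value) pair, and toLongOrDefault stands in for a number conversion that falls back to 0 when the value is not a Number. It is not the actual Rows.objectToNumber implementation.

```java
// Hypothetical illustration; none of these types are Druid classes.
class UnfinalizedValueSketch {
    // Stand-in for the intermediate pair that longLast carries before finalization.
    static final class LastValue {
        final long lhs; // timestamp of the last row
        final long rhs; // value of the last row

        LastValue(long lhs, long rhs) {
            this.lhs = lhs;
            this.rhs = rhs;
        }
    }

    // Assumed conversion: anything that is not a Number becomes the default 0.
    static long toLongOrDefault(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return 0L;
    }

    // Finalization keeps only the value part of the pair.
    static Object finalizeValue(Object value) {
        return value instanceof LastValue ? Long.valueOf(((LastValue) value).rhs) : value;
    }

    // Stand-in for case_searched(("status" == 1), 1, 0).
    static long caseSearched(Object status) {
        return toLongOrDefault(status) == 1L ? 1L : 0L;
    }

    public static void main(String[] args) {
        Object unfinalized = new LastValue(1735012987483L, 1L);
        System.out.println(caseSearched(unfinalized));                // prints 0
        System.out.println(caseSearched(finalizeValue(unfinalized))); // prints 1
    }
}
```

With the pair left unfinalized, the comparison against 1 never matches, so the outer sum stays at 0.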
  • My Solution

How can this be solved? I think that when the subquery can be performed, finalize needs to be set to true. My solution is as follows:
GroupByQueryQueryToolChest.java

  private Sequence<ResultRow> mergeGroupByResultsWithoutPushDown(
      GroupByQuery query,
      GroupByQueryResources resource,
      QueryRunner<ResultRow> runner,
      ResponseContext context
  )
  {
    // If there's a subquery, merge subquery results and then apply the aggregator

    final DataSource dataSource = query.getDataSource();

    if (dataSource instanceof QueryDataSource) {
      final GroupByQuery subquery;
      try {
        // Inject outer query context keys into subquery if they don't already exist in the subquery context.
        // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win.
        final Map<String, Object> subqueryContext = new TreeMap<>();
        if (query.getContext() != null) {
          for (Map.Entry<String, Object> entry : query.getContext().entrySet()) {
            if (entry.getValue() != null) {
              subqueryContext.put(entry.getKey(), entry.getValue());
            }
          }
        }
        if (((QueryDataSource) dataSource).getQuery().getContext() != null) {
          subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
        }
        subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);

        // My solution:
        // if the subquery can be performed, finalize should be set to true.
        if (canPerformSubquery(((QueryDataSource) dataSource).getQuery())) {
          subqueryContext.put(QueryContexts.FINALIZE_KEY, true);
        }

        subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext);
      }
      catch (ClassCastException e) {
        throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
      }

      final Sequence<ResultRow> subqueryResult = mergeGroupByResults(
          subquery,
          resource,
          runner,
          context
      );

      final Sequence<ResultRow> finalizingResults = finalizeSubqueryResults(subqueryResult, subquery);

soullkk added a commit to soullkk/druid that referenced this issue Jan 1, 2025
soullkk added a commit to soullkk/druid that referenced this issue Jan 1, 2025
soullkk added a commit to soullkk/druid that referenced this issue Jan 1, 2025

soullkk commented Jan 6, 2025

@kfaraz @gianm @abhishekagarwal87 Could someone suggest an approach to this problem? The solution I proposed breaks in some scenarios: 50 of the test cases fail with it.


soullkk commented Jan 22, 2025

@kfaraz @gianm @abhishekagarwal87 Any ideas? I tried to fix this issue, but my change breaks the test cases for HyperUniquesAggregatorFactory.
