Skip to content

Commit

Permalink
Add verbose pipeline parameter to output each processor's execution d…
Browse files Browse the repository at this point in the history
…etails (opensearch-project#16843)

* Add verbose pipeline parameter to output each processor's execution details

Signed-off-by: Junwei Dai <[email protected]>

* add change log

Signed-off-by: Junwei Dai <[email protected]>

# Conflicts:
#	CHANGELOG.md

* Refactor ProcessorExecutionDetail to improve field handling

Signed-off-by: Junwei Dai <[email protected]>

* Fix ITtest Fail

Signed-off-by: Junwei Dai <[email protected]>

* Add more unit test

Signed-off-by: Junwei Dai <[email protected]>

* resolve comments

Signed-off-by: Junwei Dai <[email protected]>

* 1.add todo to change version.current
2.use exist xcontentUtil to read
3.move processor excution key to ProcessorExecutionDetail

Signed-off-by: Junwei Dai <[email protected]>

* refactor code

Signed-off-by: Junwei Dai <[email protected]>

* refactor code based on the comment

Signed-off-by: Junwei Dai <[email protected]>

* refactor code based on the comment

Signed-off-by: Junwei Dai <[email protected]>

* 1.add javadoc
2.refactor error message

Signed-off-by: Junwei Dai <[email protected]>

* change error message

Signed-off-by: Junwei Dai <[email protected]>

* 1.Added wrappers for tracking execution details of search processors.
2.Removed redundant logic for cleaner and simpler implementation.

Signed-off-by: Junwei Dai <[email protected]>

* change version to 3.0.0

Signed-off-by: Junwei Dai <[email protected]>

* fix unit test

Signed-off-by: Junwei Dai <[email protected]>

* fix unit test

Signed-off-by: Junwei Dai <[email protected]>

* addressed comments 1. removed unnecessary log

Signed-off-by: Junwei Dai <[email protected]>

* addressed comments

Signed-off-by: Junwei Dai <[email protected]>

* revise comment to opensearch.api

Signed-off-by: Junwei Dai <[email protected]>

* removed unused logger and comment

Signed-off-by: Junwei Dai <[email protected]>

* removed unnecessary try catch block. add more comment

Signed-off-by: Junwei Dai <[email protected]>

* addressed comments

Signed-off-by: Junwei Dai <[email protected]>

* remove wrong unit test

Signed-off-by: Junwei Dai <[email protected]>

---------

Signed-off-by: Junwei Dai <[email protected]>
Co-authored-by: Junwei Dai <[email protected]>
(cherry picked from commit e15f712)
Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
junweid62 and Junwei Dai committed Jan 23, 2025
1 parent 380949f commit 46d14a5
Show file tree
Hide file tree
Showing 21 changed files with 1,251 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
- Introduce framework for auxiliary transports and an experimental gRPC transport plugin ([#16534](https://github.com/opensearch-project/OpenSearch/pull/16534))
- Support searching from doc_value using termQueryCaseInsensitive/termQuery in flat_object/keyword field([#16974](https://github.com/opensearch-project/OpenSearch/pull/16974/))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand All @@ -74,6 +75,7 @@
import java.util.function.Supplier;

import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand Down Expand Up @@ -402,6 +404,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
List<SearchExtBuilder> extBuilders = new ArrayList<>();
List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -525,6 +528,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
extBuilders.add(searchExtBuilder);
}
}
} else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
while ((token = parser.nextToken()) != Token.END_ARRAY) {
ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser);
processorResult.add(detail);
}
} else {
parser.skipChildren();
}
Expand All @@ -538,7 +546,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
terminatedEarly,
profile,
numReducePhases,
extBuilders
extBuilders,
processorResult
);
return new SearchResponse(
searchResponseSections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;
Expand All @@ -65,7 +66,7 @@
public class SearchResponseSections implements ToXContentFragment {

public static final ParseField EXT_FIELD = new ParseField("ext");

public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results");
protected final SearchHits hits;
protected final Aggregations aggregations;
protected final Suggest suggest;
Expand All @@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment {
protected final Boolean terminatedEarly;
protected final int numReducePhases;
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
protected final List<ProcessorExecutionDetail> processorResult = new ArrayList<>();

public SearchResponseSections(
SearchHits hits,
Expand All @@ -84,7 +86,17 @@ public SearchResponseSections(
SearchProfileShardResults profileResults,
int numReducePhases
) {
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public SearchResponseSections(
Expand All @@ -95,7 +107,8 @@ public SearchResponseSections(
Boolean terminatedEarly,
SearchProfileShardResults profileResults,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilders
List<SearchExtBuilder> searchExtBuilders,
List<ProcessorExecutionDetail> processorResult
) {
this.hits = hits;
this.aggregations = aggregations;
Expand All @@ -104,6 +117,7 @@ public SearchResponseSections(
this.timedOut = timedOut;
this.terminatedEarly = terminatedEarly;
this.numReducePhases = numReducePhases;
this.processorResult.addAll(processorResult);
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
}

Expand Down Expand Up @@ -166,13 +180,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
}
builder.endObject();
}

if (!processorResult.isEmpty()) {
builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult);
}
return builder;
}

public List<SearchExtBuilder> getSearchExtBuilders() {
return Collections.unmodifiableList(this.searchExtBuilders);
}

public List<ProcessorExecutionDetail> getProcessorResult() {
return processorResult;
}

protected void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
if (request.hasParam("timeout")) {
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
}
if (request.hasParam("verbose_pipeline")) {
searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false));
}
if (request.hasParam("terminate_after")) {
int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER);
if (terminateAfter < 0) {
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchHits.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -166,6 +167,22 @@ public SearchHit[] getHits() {
return this.hits;
}

/**
* Creates a deep copy of this SearchHits instance.
*
* @return a deep copy of the current SearchHits object
* @throws IOException if an I/O exception occurs during serialization or deserialization
*/
public SearchHits deepCopy() throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
this.writeTo(out);

try (StreamInput in = out.bytes().streamInput()) {
return new SearchHits(in);
}
}
}

/**
* Return the hit as the provided position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
Expand Down Expand Up @@ -227,6 +228,8 @@ public static HighlightBuilder highlight() {

private String searchPipeline;

private boolean verbosePipeline = false;

/**
* Constructs a new search source builder.
*/
Expand Down Expand Up @@ -311,6 +314,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
searchPipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
verbosePipeline = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -402,6 +408,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(searchPipeline);
}
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeBoolean(verbosePipeline);
}
}

/**
Expand Down Expand Up @@ -1159,6 +1168,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) {
return this;
}

/**
* Enables or disables verbose mode for the search pipeline.
*
* When verbose mode is enabled, detailed information about each processor
* in the search pipeline is included in the search response. This includes
* the processor name, execution status, input, output, and time taken for processing.
*
* This parameter is primarily intended for debugging purposes, allowing users
* to track how data flows and transforms through the search pipeline.
*
*/
public SearchSourceBuilder verbosePipeline(Boolean verbosePipeline) {
this.verbosePipeline = verbosePipeline;
return this;
}

public Boolean verbosePipeline() {
return verbosePipeline;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1257,6 +1286,7 @@ private SearchSourceBuilder shallowCopy(
rewrittenBuilder.derivedFieldsObject = derivedFieldsObject;
rewrittenBuilder.derivedFields = derivedFields;
rewrittenBuilder.searchPipeline = searchPipeline;
rewrittenBuilder.verbosePipeline = verbosePipeline;
return rewrittenBuilder;
}

Expand Down Expand Up @@ -1326,6 +1356,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
profile = parser.booleanValue();
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipeline = parser.text();
} else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
verbosePipeline = parser.booleanValue();
} else {
throw new ParsingException(
parser.getTokenLocation(),
Expand Down Expand Up @@ -1659,6 +1691,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(SEARCH_PIPELINE.getPreferredName(), searchPipeline);
}

if (verbosePipeline) {
builder.field(VERBOSE_SEARCH_PIPELINE.getPreferredName(), verbosePipeline);
}

return builder;
}

Expand Down Expand Up @@ -1937,7 +1973,8 @@ public int hashCode() {
pointInTimeBuilder,
derivedFieldsObject,
derivedFields,
searchPipeline
searchPipeline,
verbosePipeline
);
}

Expand Down Expand Up @@ -1983,7 +2020,8 @@ public boolean equals(Object obj) {
&& Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder)
&& Objects.equals(derivedFieldsObject, other.derivedFieldsObject)
&& Objects.equals(derivedFields, other.derivedFields)
&& Objects.equals(searchPipeline, other.searchPipeline);
&& Objects.equals(searchPipeline, other.searchPipeline)
&& Objects.equals(verbosePipeline, other.verbosePipeline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.suggest.Suggest;

Expand Down Expand Up @@ -73,7 +74,17 @@ public InternalSearchResponse(
Boolean terminatedEarly,
int numReducePhases
) {
this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList());
this(
hits,
aggregations,
suggest,
profileResults,
timedOut,
terminatedEarly,
numReducePhases,
Collections.emptyList(),
Collections.emptyList()
);
}

public InternalSearchResponse(
Expand All @@ -84,9 +95,20 @@ public InternalSearchResponse(
boolean timedOut,
Boolean terminatedEarly,
int numReducePhases,
List<SearchExtBuilder> searchExtBuilderList
List<SearchExtBuilder> searchExtBuilderList,
List<ProcessorExecutionDetail> processorResult
) {
super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
super(
hits,
aggregations,
suggest,
timedOut,
terminatedEarly,
profileResults,
numReducePhases,
searchExtBuilderList,
processorResult
);
}

public InternalSearchResponse(StreamInput in) throws IOException {
Expand All @@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
in.readOptionalBoolean(),
in.readOptionalWriteable(SearchProfileShardResults::new),
in.readVInt(),
readSearchExtBuildersOnOrAfter(in)
readSearchExtBuildersOnOrAfter(in),
readProcessorResultOnOrAfter(in)
);
}

Expand All @@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(profileResults);
out.writeVInt(numReducePhases);
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
writeProcessorResultOnOrAfter(out, processorResult);
}

private static List<SearchExtBuilder> readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
Expand All @@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
out.writeNamedWriteableList(searchExtBuilders);
}
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
return (in.getVersion().onOrAfter(Version.V_2_19_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
out.writeList(processorResult);
}
}

}
Loading

0 comments on commit 46d14a5

Please sign in to comment.