es 8.13.0 support
shi-yuan committed Aug 18, 2024
1 parent d1d84a7 commit 26102a0
Showing 96 changed files with 5,771 additions and 192 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -5,7 +5,7 @@ jdk:

before_install:
- sudo rm -rf /var/lib/elasticsearch
- - curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.2-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
+ - curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.13.0-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
- sudo cp ./src/test/resources/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml
- sudo cat /etc/elasticsearch/elasticsearch.yml
- sudo java -version
8 changes: 4 additions & 4 deletions pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.nlpcn</groupId>
<artifactId>elasticsearch-sql</artifactId>
- <version>8.12.2.0</version>
+ <version>8.13.0.0</version>
<packaging>jar</packaging>
<description>Query elasticsearch using SQL</description>
<name>elasticsearch-sql</name>
@@ -44,7 +44,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<runSuite>**/MainTestSuite.class</runSuite>
<elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
- <elasticsearch.version>8.12.2</elasticsearch.version>
+ <elasticsearch.version>8.13.0</elasticsearch.version>
<elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
<druid.version>1.2.15</druid.version>
<guava.version>32.0.0-jre</guava.version>
@@ -162,8 +162,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
+ <source>16</source>
+ <target>16</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
@@ -1,6 +1,5 @@
package com.alibaba.druid.pool;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.plugin.nlpcn.client.ElasticsearchRestClient;

import javax.sql.DataSource;
42 changes: 42 additions & 0 deletions ParsedRefreshResponse.java
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentParser;

import java.util.Arrays;

import static org.elasticsearch.action.support.broadcast.BaseBroadcastResponse.declareBroadcastFields;

/**
* The response of a refresh action.
*/
public class ParsedRefreshResponse {

private static final ConstructingObjectParser<BroadcastResponse, Void> PARSER = new ConstructingObjectParser<>("refresh", true, arg -> {
BaseBroadcastResponse response = (BaseBroadcastResponse) arg[0];
return new BroadcastResponse(
response.getTotalShards(),
response.getSuccessfulShards(),
response.getFailedShards(),
Arrays.asList(response.getShardFailures())
);
});

static {
declareBroadcastFields(PARSER);
}

public static BroadcastResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}
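As a usage sketch that is not part of this commit: the parser above could be pointed at the raw JSON body of a low-level REST _refresh call roughly as follows. The class name, sample JSON, and surrounding plumbing are illustrative assumptions, not code from this repository.

import org.elasticsearch.action.admin.indices.refresh.ParsedRefreshResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;

public class RefreshParseSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative body of a successful POST <index>/_refresh.
        String json = "{\"_shards\":{\"total\":2,\"successful\":2,\"failed\":0}}";
        try (XContentParser parser = JsonXContent.jsonXContent
                .createParser(XContentParserConfiguration.EMPTY, json)) {
            BroadcastResponse refresh = ParsedRefreshResponse.fromXContent(parser);
            System.out.println("failed shards: " + refresh.getFailedShards());
        }
    }
}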
105 changes: 105 additions & 0 deletions ParsedBulkItemResponse.java
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;

/**
* Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
* of the relevant action, and if it has failed or not (with the failure message in case it failed).
*/
public class ParsedBulkItemResponse {

/**
* Reads a {@link BulkItemResponse} from a {@link XContentParser}.
*
* @param parser the {@link XContentParser}
* @param id the id to assign to the parsed {@link BulkItemResponse}. It is usually the index of
* the item in the {@link BulkResponse#getItems} array.
*/
public static BulkItemResponse fromXContent(XContentParser parser, int id) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);

XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);

String currentFieldName = parser.currentName();
token = parser.nextToken();

final OpType opType = OpType.fromString(currentFieldName);
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

DocWriteResponse.Builder builder = null;
CheckedConsumer<XContentParser, IOException> itemParser = null;

if (opType == OpType.INDEX || opType == OpType.CREATE) {
final IndexResponse.Builder indexResponseBuilder = new IndexResponse.Builder();
builder = indexResponseBuilder;
itemParser = (indexParser) -> IndexResponse.parseXContentFields(indexParser, indexResponseBuilder);

} else if (opType == OpType.UPDATE) {
final UpdateResponse.Builder updateResponseBuilder = new UpdateResponse.Builder();
builder = updateResponseBuilder;
itemParser = (updateParser) -> UpdateResponse.parseXContentFields(updateParser, updateResponseBuilder);

} else if (opType == OpType.DELETE) {
final DeleteResponse.Builder deleteResponseBuilder = new DeleteResponse.Builder();
builder = deleteResponseBuilder;
itemParser = (deleteParser) -> DeleteResponse.parseXContentFields(deleteParser, deleteResponseBuilder);
} else {
throwUnknownField(currentFieldName, parser);
}

RestStatus status = null;
ElasticsearchException exception = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
}

if (BulkItemResponse.ERROR.equals(currentFieldName)) {
if (token == XContentParser.Token.START_OBJECT) {
exception = ElasticsearchException.fromXContent(parser);
}
} else if (BulkItemResponse.STATUS.equals(currentFieldName)) {
if (token == XContentParser.Token.VALUE_NUMBER) {
status = RestStatus.fromCode(parser.intValue());
}
} else {
itemParser.accept(parser);
}
}

ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser);
token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser);

BulkItemResponse bulkItemResponse;
if (exception != null) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(builder.getShardId().getIndexName(), builder.getId(), exception, status);
bulkItemResponse = BulkItemResponse.failure(id, opType, failure);
} else {
bulkItemResponse = BulkItemResponse.success(id, opType, builder.build());
}
return bulkItemResponse;
}
}
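Usage note: the ParsedBulkResponse helper in the next file drives this parser, calling fromXContent once per element of the items array and passing the element's position (items.size() at the time of the call) as the id, in line with the javadoc above; an end-to-end sketch follows that file.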
62 changes: 62 additions & 0 deletions ParsedBulkResponse.java
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;

/**
* A response of a bulk execution. Holding a response for each item responding (in order) of the
* bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the
* failure message).
*/
public class ParsedBulkResponse {

public static BulkResponse fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

long took = -1L;
long ingestTook = BulkResponse.NO_INGEST_TOOK;
List<BulkItemResponse> items = new ArrayList<>();

String currentFieldName = parser.currentName();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (BulkResponse.TOOK.equals(currentFieldName)) {
took = parser.longValue();
} else if (BulkResponse.INGEST_TOOK.equals(currentFieldName)) {
ingestTook = parser.longValue();
} else if (BulkResponse.ERRORS.equals(currentFieldName) == false) {
throwUnknownField(currentFieldName, parser);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (BulkResponse.ITEMS.equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
items.add(ParsedBulkItemResponse.fromXContent(parser, items.size()));
}
} else {
throwUnknownField(currentFieldName, parser);
}
} else {
throwUnknownToken(token, parser);
}
}
return new BulkResponse(items.toArray(new BulkItemResponse[items.size()]), took, ingestTook);
}
}
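Again as a hedged sketch rather than code from the commit, a raw _bulk response body could be parsed like this; the class name and JSON literal are made up for illustration.

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.ParsedBulkResponse;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;

public class BulkParseSketch {
    public static void main(String[] args) throws Exception {
        // Minimal illustrative _bulk response with one successful index item.
        String json = """
                {"took":30,"errors":false,"items":[
                  {"index":{"_index":"test","_id":"1","_version":1,"result":"created",
                            "_shards":{"total":2,"successful":1,"failed":0},
                            "_seq_no":0,"_primary_term":1,"status":201}}]}""";
        try (XContentParser parser = JsonXContent.jsonXContent
                .createParser(XContentParserConfiguration.EMPTY, json)) {
            BulkResponse bulk = ParsedBulkResponse.fromXContent(parser);
            System.out.println("failures: " + bulk.hasFailures() + ", items: " + bulk.getItems().length);
        }
    }
}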
107 changes: 107 additions & 0 deletions ParsedMultiSearchResponse.java
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

/**
* A multi search response.
*/
public class ParsedMultiSearchResponse {

private static final ParseField RESPONSES = new ParseField(MultiSearchResponse.Fields.RESPONSES);
private static final ParseField TOOK_IN_MILLIS = new ParseField("took");
private static final Field REF_COUNTED_FIELD;

static {
try {
REF_COUNTED_FIELD = MultiSearchResponse.class.getDeclaredField("refCounted");
if (!REF_COUNTED_FIELD.isAccessible()) {
REF_COUNTED_FIELD.setAccessible(true);
}
} catch (NoSuchFieldException e) {
throw new IllegalStateException(e);
}
}

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<MultiSearchResponse, Void> PARSER = new ConstructingObjectParser<>(
"multi_search",
true,
a -> new MultiSearchResponse(((List<MultiSearchResponse.Item>) a[0]).toArray(new MultiSearchResponse.Item[0]), (long) a[1])
);
static {
PARSER.declareObjectArray(constructorArg(), (p, c) -> itemFromXContent(p), RESPONSES);
PARSER.declareLong(constructorArg(), TOOK_IN_MILLIS);
}

public static MultiSearchResponse fromXContext(XContentParser parser) {
return unpooled(PARSER.apply(parser, null));
}

private static MultiSearchResponse.Item itemFromXContent(XContentParser parser) throws IOException {
// This parsing logic is a bit tricky here, because the multi search response itself is tricky:
// 1) The json objects inside the responses array are either a search response or a serialized exception
// 2) Each response json object gets a status field injected that ElasticsearchException.failureFromXContent(...) does not parse,
// but SearchResponse.innerFromXContent(...) parses and then ignores. The status field is not needed to parse
// the response item. However in both cases this method does need to parse the 'status' field otherwise the parsing of
// the response item in the next json array element will fail due to parsing errors.

MultiSearchResponse.Item item = null;
String fieldName = null;

Token token = parser.nextToken();
assert token == Token.FIELD_NAME;
outer: for (; token != Token.END_OBJECT; token = parser.nextToken()) {
switch (token) {
case FIELD_NAME:
fieldName = parser.currentName();
if ("error".equals(fieldName)) {
item = new MultiSearchResponse.Item(null, ElasticsearchException.failureFromXContent(parser));
} else if ("status".equals(fieldName) == false) {
item = new MultiSearchResponse.Item(ParsedSearchResponse.innerFromXContent(parser), null);
break outer;
}
break;
case VALUE_NUMBER:
if ("status".equals(fieldName)) {
// Ignore the status value
}
break;
}
}
assert parser.currentToken() == Token.END_OBJECT;
return item;
}

private static MultiSearchResponse unpooled(MultiSearchResponse searchResponse) {
MultiSearchResponse.Item[] items = searchResponse.getResponses();
MultiSearchResponse.Item[] tempItems = Arrays.copyOf(items, items.length);
searchResponse.decRef();
System.arraycopy(tempItems, 0, items, 0, items.length);
try {
REF_COUNTED_FIELD.set(searchResponse, RefCounted.ALWAYS_REFERENCED);
} catch (IllegalAccessException e) {
throw new IllegalStateException(e);
}
return searchResponse;
}
}
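One more hedged sketch, not from the commit: parsing a raw _msearch response with the helper above. The sample uses the error branch so a full search response body does not have to be spelled out; names and JSON are illustrative.

import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.ParsedMultiSearchResponse;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;

public class MultiSearchParseSketch {
    public static void main(String[] args) throws Exception {
        // Minimal illustrative _msearch response whose single item is a failure.
        String json = """
                {"took":5,"responses":[
                  {"error":{"type":"index_not_found_exception",
                            "reason":"no such index [missing-index]"},"status":404}]}""";
        try (XContentParser parser = JsonXContent.jsonXContent
                .createParser(XContentParserConfiguration.EMPTY, json)) {
            MultiSearchResponse msearch = ParsedMultiSearchResponse.fromXContext(parser);
            System.out.println("first item failed: " + msearch.getResponses()[0].isFailure());
        }
    }
}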