Skip to content

Commit

Permalink
[FLINK-35472] Improve tests for Elasticsearch 8 connector
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 committed May 28, 2024
1 parent 50327f8 commit cadc370
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,59 +27,29 @@
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import org.junit.jupiter.api.TestTemplate;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

/** Integration tests for {@link Elasticsearch8AsyncSink}. */
@Testcontainers
public class Elasticsearch8AsyncSinkITCase extends ElasticsearchSinkBaseITCase {
private static boolean failed;

@BeforeEach
void setUp() {
this.client = getRestClient();
failed = false;
}

@AfterEach
void shutdown() throws IOException {
if (client != null) {
client.close();
}
}

@Test
@TestTemplate
public void testWriteToElasticsearch() throws Exception {
String index = "test-write-to-elasticsearch";

try (StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1)) {

env.setRestartStrategy(RestartStrategies.noRestart());

final Elasticsearch8AsyncSink<DummyData> sink =
Elasticsearch8AsyncSinkBuilder.<DummyData>builder()
.setMaxBatchSize(5)
.setHosts(
new HttpHost(
ES_CONTAINER.getHost(),
ES_CONTAINER.getFirstMappedPort()))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
.build();
Elasticsearch8AsyncSink<DummyData> sink = getSinkForDummyData(index);

env.fromElements("first", "second", "third", "fourth", "fifth")
.map(
Expand All @@ -90,32 +60,17 @@ public void testWriteToElasticsearch() throws Exception {
env.execute();
}

assertIdsAreWritten(client, index, new String[] {"first_v1_index", "second_v1_index"});
assertIdsAreWritten(index, new String[] {"first_v1_index", "second_v1_index"});
}

@Test
@TestTemplate
public void testRecovery() throws Exception {
String index = "test-recovery";

try (final StreamExecutionEnvironment env = new LocalStreamEnvironment()) {

env.enableCheckpointing(100L);

final Elasticsearch8AsyncSink<DummyData> sink =
Elasticsearch8AsyncSinkBuilder.<DummyData>builder()
.setMaxBatchSize(5)
.setHosts(
new HttpHost(
ES_CONTAINER.getHost(),
ES_CONTAINER.getFirstMappedPort()))
.setElementConverter(
(element, ctx) ->
new IndexOperation.Builder<>()
.index(index)
.id(element.getId())
.document(element)
.build())
.build();
final Elasticsearch8AsyncSink<DummyData> sink = getSinkForDummyData(index);

env.fromElements("first", "second", "third", "fourth", "fifth")
.map(
Expand Down

This file was deleted.

Loading

0 comments on commit cadc370

Please sign in to comment.