Skip to content

Commit

Permalink
CC-8804: Integration tests for upsert/delete (#16)
Browse files Browse the repository at this point in the history
* GH-264: Add embedded integration test for upsert/delete

Also fixes a bug in the schema retriever logic where key schemas were not being
reported to schema retrievers, and improves shutdown logic so that tasks can
stop gracefully when requested by the framework.

* GH-264: Clean up shutdown logic, make logs easier to read

* GC-264: Retain prior shutdown behavior when upsert/delete is not enabled

* GC-264: Refactor merge query construction logic

* GC-264: Fix infinite recursion bug in SchemaRetriever interface
  • Loading branch information
C0urante authored Jun 29, 2020
1 parent d66018f commit 7563971
Show file tree
Hide file tree
Showing 19 changed files with 1,457 additions and 352 deletions.
37 changes: 33 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,41 @@ adjusting flags given to the Avro Console Producer and tweaking the config setti

## Integration Testing the Connector

There is a legacy Docker-based integration test for the connector, and newer integration tests that
programmatically instantiate an embedded Connect cluster.

### Embedded integration tests

Currently these tests only verify the connector's upsert/delete feature. They should eventually
replace all of the existing Docker-based tests.

#### Configuring the tests

You must supply the following environment variables in order to run the tests:

- `$KCBQ_TEST_PROJECT`: The name of the BigQuery project to use for the test
- `$KCBQ_TEST_DATASET`: The name of the BigQuery dataset to use for the test
- `$KCBQ_TEST_KEYFILE`: The key (either file or raw contents) used to authenticate with BigQuery
during the test

Additionally, the `$KCBQ_TEST_KEYSOURCE` variable can be supplied to specify whether the value of
`$KCBQ_TEST_KEYFILE` are a path to a key file (if set to `FILE`) or the raw contents of a key file
(if set to `JSON`). The default is `FILE`.

#### Running the Integration Tests

```bash
./gradlew embeddedIntegrationTest
```

### Docker-based tests

> **NOTE**: You must have [Docker] installed and running on your machine in order to run integration
tests for the connector.

This all takes place in the `kcbq-connector` directory.

### How Integration Testing Works
#### How Integration Testing Works

Integration tests run by creating [Docker] instances for [Zookeeper], [Kafka], [Schema Registry],
and the BigQuery Connector itself, then verifying the results using a [JUnit] test.
Expand All @@ -148,7 +177,7 @@ The project and dataset they write to, as well as the specific JSON key file the
specified by command-line flag, environment variable, or configuration file — the exact details of
each can be found by running the integration test script with the `-?` flag.

### Data Corruption Concerns
#### Data Corruption Concerns

In order to ensure the validity of each test, any table that will be written to in the course of
integration testing is preemptively deleted before the connector is run. This will only be an issue
Expand All @@ -161,7 +190,7 @@ tests will corrupt any existing data that is already on your machine, and there
free up any of your ports that might currently be in use by real instances of the programs that are
faked in the process of testing.

### Running the Integration Tests
#### Running the Integration Tests

Running the series of integration tests is easy:

Expand All @@ -176,7 +205,7 @@ the `--help` flag.
> **NOTE:** You must have a recent version of [boot2docker], [Docker Machine], [Docker], etc.
installed. Older versions will hang when cleaning containers, and linking doesn't work properly.

### Adding New Integration Tests
#### Adding New Integration Tests

Adding an integration test is a little more involved, and consists of two major steps: specifying
Avro data to be sent to Kafka, and specifying via JUnit test how to verify that such data made
Expand Down
28 changes: 27 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ project.ext {
ioConfluentVersion = '5.5.0'
junitVersion = '4.12'
kafkaVersion = '2.5.0'
kafkaScalaVersion = '2.12' // For integration testing only
mockitoVersion = '3.2.4'
slf4jVersion = '1.6.1'
}
Expand Down Expand Up @@ -153,6 +154,26 @@ project(':kcbq-connector') {
}
}

test {
useJUnit {
// Exclude embedded integration tests from normal testing since they require BigQuery
// credentials and can take a while
excludeCategories 'org.apache.kafka.test.IntegrationTest'
}
}

task embeddedIntegrationTest(type: Test) {
useJUnit {
includeCategories 'org.apache.kafka.test.IntegrationTest'
}

// Enable logging for integration tests
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
}
}

task integrationTestPrep() {
dependsOn 'integrationTestTablePrep'
dependsOn 'integrationTestBucketPrep'
Expand Down Expand Up @@ -226,7 +247,12 @@ project(':kcbq-connector') {
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion",
"org.mockito:mockito-inline:$mockitoVersion",
"org.apache.kafka:connect-api:$kafkaVersion"
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion",
"org.apache.kafka:kafka_$kafkaScalaVersion:$kafkaVersion:test",
"org.apache.kafka:kafka-clients:$kafkaVersion:test",
"org.apache.kafka:connect-api:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion",
"org.apache.kafka:connect-runtime:$kafkaVersion:test",
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface SchemaRetriever {
* {@link org.apache.kafka.connect.sink.SinkConnector#start(Map)} method.
* @param properties The configuration settings of the connector.
*/
public void configure(Map<String, String> properties);
void configure(Map<String, String> properties);

/**
* Retrieve the most current schema for the given topic.
Expand All @@ -25,13 +25,30 @@ public interface SchemaRetriever {
* @param schemaType The type of kafka schema, either "value" or "key".
* @return The Schema for the given table.
*/
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);
Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);

/**
* Set the last seen schema for a given topic
* Set the last seen schema for a given topic.
* @param table The table that will be created.
* @param topic The topic to retrieve a schema for.
* @param schema The last seen Kafka Connect Schema
*/
public void setLastSeenSchema(TableId table, String topic, Schema schema);
void setLastSeenSchema(TableId table, String topic, Schema schema);

/**
* Set the last seen schema for a given topic and record type.
* In order to preserve backwards compatibility, will invoke
* {@link #setLastSeenSchema(TableId, String, Schema)} by default if the schema is for a record
* value, and otherwise be a no-op.
* @param table The table that will be created.
* @param topic The topic to retrieve a schema for.
* @param schema The last seen Kafka Connect Schema.
* @param schemaType The type of the schema (key or value).
* @since 1.7.0
*/
default void setLastSeenSchema(TableId table, String topic, Schema schema, KafkaSchemaRecordType schemaType) {
if (KafkaSchemaRecordType.VALUE.equals(schemaType)) {
setLastSeenSchema(table, topic, schema);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;

/**
* A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery
* {@link RowToInsert RowToInserts} and subsequently write them to BigQuery.
Expand Down Expand Up @@ -133,6 +136,11 @@ public BigQuerySinkTask(BigQuery testBigQuery, SchemaRetriever schemaRetriever,

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (upsertDelete) {
throw new ConnectException("This connector cannot perform upsert/delete on older versions of "
+ "the Connect framework; please upgrade to version 0.10.2.0 or later");
}

try {
executor.awaitCurrentTasks();
} catch (InterruptedException err) {
Expand Down Expand Up @@ -164,6 +172,11 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

TableId baseTableId = topicsToBaseTableIds.get(record.topic());
if (upsertDelete) {

// Notify the schema retriever of the schema for the destination table (it will be notified
// for the intermediate table in the put() loop)
recordLastSeenSchemas(baseTableId, record);

TableId intermediateTableId = mergeBatches.intermediateTableFor(baseTableId);
// If upsert/delete is enabled, we want to stream into a non-partitioned intermediate table
return new PartitionedTableId.Builder(intermediateTableId).build();
Expand Down Expand Up @@ -225,6 +238,7 @@ private Map<String, Object> getUpsertDeleteRow(SinkRecord record, TableId table)

result.put(MergeQueries.INTERMEDIATE_TABLE_KEY_FIELD_NAME, convertedKey);
result.put(MergeQueries.INTERMEDIATE_TABLE_VALUE_FIELD_NAME, convertedValue);
result.put(MergeQueries.INTERMEDIATE_TABLE_ITERATION_FIELD_NAME, totalBatchSize);
if (usePartitionDecorator && useMessageTimeDatePartitioning) {
if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
throw new ConnectException(
Expand Down Expand Up @@ -262,6 +276,15 @@ private String getRowId(SinkRecord record) {
record.kafkaOffset());
}

private void recordLastSeenSchemas(TableId table, SinkRecord record) {
if (schemaRetriever != null) {
schemaRetriever.setLastSeenSchema(
table, record.topic(), record.keySchema(), KafkaSchemaRecordType.KEY);
schemaRetriever.setLastSeenSchema(
table, record.topic(), record.valueSchema(), KafkaSchemaRecordType.VALUE);
}
}

@Override
public void put(Collection<SinkRecord> records) {
if (upsertDelete) {
Expand All @@ -277,11 +300,7 @@ public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
if (record.value() != null || config.getBoolean(config.DELETE_ENABLED_CONFIG)) {
PartitionedTableId table = getRecordTable(record);
if (schemaRetriever != null) {
schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
record.topic(),
record.valueSchema());
}
recordLastSeenSchemas(table.getBaseTableId(), record);

if (!tableWriterBuilders.containsKey(table)) {
TableWriterBuilder tableWriterBuilder;
Expand Down Expand Up @@ -515,33 +534,36 @@ private void maybeStartMergeFlushTask() {

@Override
public void stop() {
maybeStopExecutor(loadExecutor, "load executor");
maybeStopExecutor(executor, "table write executor");
if (upsertDelete) {
mergeBatches.intermediateTables().forEach(table -> {
logger.debug("Deleting {}", intTable(table));
getBigQuery().delete(table);
});
}

logger.trace("task.stop()");
}

private void maybeStopExecutor(ExecutorService executor, String executorName) {
if (executor == null) {
return;
}

try {
if (upsertDelete) {
mergeBatches.intermediateTables().forEach(table -> {
logger.debug("Deleting intermediate table {}", table);
getBigQuery().delete(table);
});
}
} finally {
try {
logger.trace("Forcibly shutting down {}", executorName);
executor.shutdownNow();
} else {
logger.trace("Requesting shutdown for {}", executorName);
executor.shutdown();
executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
if (loadExecutor != null) {
try {
logger.info("Attempting to shut down load executor.");
loadExecutor.shutdown();
loadExecutor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
logger.warn("Could not shut down load executor within {}s.",
EXECUTOR_SHUTDOWN_TIMEOUT_SEC);
}
}
} catch (InterruptedException ex) {
logger.warn("{} active threads are still executing tasks {}s after shutdown is signaled.",
executor.getActiveCount(), EXECUTOR_SHUTDOWN_TIMEOUT_SEC);
} finally {
logger.trace("task.stop()");
}
logger.trace("Awaiting termination of {}", executorName);
executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS);
logger.trace("Shut down {} successfully", executorName);
} catch (Exception e) {
logger.warn("Failed to shut down {}", executorName, e);
}
}

Expand Down
Loading

0 comments on commit 7563971

Please sign in to comment.