Skip to content

Commit

Permalink
Changes to all integration tests to move the common logic of creating…
Browse files Browse the repository at this point in the history
… connecting to a separate function.
  • Loading branch information
subkanthi committed Jan 27, 2025
1 parent b8285ef commit a5a2bb4
Show file tree
Hide file tree
Showing 59 changed files with 186 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLParserService;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
Expand All @@ -17,7 +16,6 @@
import com.altinity.clickhouse.sink.connector.executor.DebeziumOffsetManagement;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.model.DBCredentials;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.config.CommonConnectorConfig;
Expand Down Expand Up @@ -198,7 +196,8 @@ private String getDatabaseName(SourceRecord sr) {
private BaseDbWriter createWriter(ClickHouseSinkConnectorConfig config, String databaseName) {
DBCredentials dbCredentials = parseDBConfiguration(config);
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1", dbCredentials.getUserName(), dbCredentials.getPassword(), config);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, dbCredentials.getUserName(), dbCredentials.getPassword(),
databaseName, config);
return new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(), databaseName, dbCredentials.getUserName(), dbCredentials.getPassword(), config, conn);
}

Expand Down Expand Up @@ -384,18 +383,19 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
DBCredentials dbCredentials = parseDBConfiguration(config);

String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
"system");
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config);
BaseDbWriter.SYSTEM_DB);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
BaseDbWriter.SYSTEM_DB, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);

Pair<String, String> tableNameDatabaseName = getDebeziumOffsetStorageDatabaseName(props);
String databaseName = tableNameDatabaseName.getRight();

String createDbQuery = String.format("create database if not exists %s", databaseName);
log.info("CREATING DEBEZIUM STORAGE Database: " + createDbQuery);
writer.executeQuery(createDbQuery);
writer.executeSystemQuery(createDbQuery);

break;
} catch (Exception e) {
Expand Down Expand Up @@ -423,13 +423,14 @@ private void createSchemaHistoryTable(ClickHouseSinkConnectorConfig config, Prop
DBCredentials dbCredentials = parseDBConfiguration(config);
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
"system");
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);

try {
writer.executeQuery(createSchemaHistoryTable);
writer.executeSystemQuery(createSchemaHistoryTable);
} catch(Exception e) {
log.error("Error creating schema history table", e);
}
Expand All @@ -450,7 +451,8 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config

String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
"system");
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
dbCredentials.getUserName(), dbCredentials.getPassword(), BaseDbWriter.SYSTEM_DB, config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
Expand All @@ -463,7 +465,7 @@ private void createViewForShowReplicaStatus(ClickHouseSinkConnectorConfig config
// Remove quotes.
formattedView = formattedView.replace("\"", "");
try {
writer.executeQuery(formattedView);
writer.executeSystemQuery(formattedView);
} catch(Exception e) {
log.error("**** Error creating VIEW **** " + formattedView);
}
Expand Down Expand Up @@ -540,8 +542,9 @@ public String getDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Pro
log.error("**** Connection to ClickHouse is not established, re-initiating ****");
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
databaseName);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
dbCredentials.getUserName(), dbCredentials.getPassword(), config);
Connection conn = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME,
dbCredentials.getUserName(), dbCredentials.getPassword(),
BaseDbWriter.SYSTEM_DB, config);
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void deleteOffsetStorageRow(String offsetKey,

// String connectorName = config.getString("connector.name");
String debeziumStorageStatusQuery = String.format("delete from %s where offset_key='%s'" , tableName, offsetKey);
writer.executeQuery(debeziumStorageStatusQuery);
writer.executeSystemQuery(debeziumStorageStatusQuery);
}

/**
Expand All @@ -67,7 +67,7 @@ public void deleteSchemaHistoryTable(String offsetKey,

String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
writer.executeSystemQuery(debeziumStorageStatusQuery);
}
/**
* Function to get the latest timestamp of the record in the table
Expand All @@ -81,7 +81,7 @@ public String getDebeziumLatestRecordTimestamp(Properties props, BaseDbWriter wr
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());

String debeziumLatestRecordTimestampQuery = String.format("select max(record_insert_ts) from %s" , tableName);
return writer.executeQuery(debeziumLatestRecordTimestampQuery);
return writer.executeSystemQuery(debeziumLatestRecordTimestampQuery);
}

public String getDebeziumStorageStatusQuery(
Expand All @@ -92,7 +92,7 @@ public String getDebeziumStorageStatusQuery(
String offsetKey = getOffsetKey(props);
// String connectorName = config.getString("connector.name");
String debeziumStorageStatusQuery = String.format("select offset_val from %s where offset_key='%s'" , tableName, offsetKey);
return writer.executeQuery(debeziumStorageStatusQuery);
return writer.executeSystemQuery(debeziumStorageStatusQuery);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,8 @@ public void testDecoderBufsPlugin() throws Exception {
Thread.sleep(10000);//
Thread.sleep(50000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);

Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,8 @@ public void testPgOutputPlugin() throws Exception {


// Create connection.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
Connection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, conn);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);

Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;

import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.*;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

import java.util.HashMap;
public class ITCommon {
static public Connection connectToMySQL(MySQLContainer mySqlContainer) {
Connection conn = null;
Expand Down Expand Up @@ -196,4 +199,18 @@ static public Properties getDebeziumPropertiesForSchemaOnly(MySQLContainer mySql
props.setProperty("replica.status.view", "CREATE VIEW IF NOT EXISTS %s.show_replica_status AS SELECT now() - fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS seconds_behind_source, toDateTime(fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')), 'UTC') AS utc_time, fromUnixTimestamp(JSONExtractUInt(offset_val, 'ts_sec')) AS local_time FROM %s settings final=1");
return props;
}


static public BaseDbWriter getDBWriter(ClickHouseContainer clickHouseContainer) {

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees");
Connection connection = BaseDbWriter.createConnection(jdbcUrl, BaseDbWriter.DATABASE_CLIENT_NAME, clickHouseContainer.getUsername(),
clickHouseContainer.getPassword(), BaseDbWriter.SYSTEM_DB, new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);

return writer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,7 @@ public void testMultipleDatabases() throws Exception {
conn.close();

// Create connection to clickhouse and validate if the tables are replicated.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system");
Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
// query clickhouse connection and get data for test_table1 and test_table2

BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);

ResultSet rs = writer.executeQueryWithResultSet("SELECT * FROM employees.audience");
// Validate the data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ public void testMySQLGeneratedColumns() throws Exception {
conn.prepareStatement("insert into contacts(first_name, last_name, email) values('John', 'Doe', '[email protected]')").execute();
Thread.sleep(20000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);
Map<String, String> columnsToDataTypeMap = writer.getColumnsDataTypesForTable("contacts");

Assert.assertTrue(columnsToDataTypeMap.get("id").equalsIgnoreCase("Int32"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ public void testMultipleDatabases() throws Exception {
conn.close();

// Create connection to clickhouse and validate if the tables are replicated.
String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system");
Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"system", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);
// query clickhouse connection and get data for test_table1 and test_table2


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,8 @@ public void testDecoderBufsPlugin() throws Exception {
Thread.sleep(10000);//
Thread.sleep(50000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);

Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumOffsetStorage;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.*;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -115,13 +113,8 @@ public void testDecoderBufsPlugin() throws Exception {
Thread.sleep(10000);//
Thread.sleep(50000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
Connection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer);

Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Expand Down
Loading

0 comments on commit a5a2bb4

Please sign in to comment.