Skip to content

Commit

Permalink
Merge pull request #250 from Avishka-Shamendra/master
Browse files Browse the repository at this point in the history
Add System Property to Allow Null Parameters with CUD Functions
  • Loading branch information
AnuGayan authored Mar 26, 2024
2 parents cdd6fed + 9daf936 commit 6d96f6a
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@
"perform CUD operations.",
defaultValue = "false",
possibleParameters = {"true", "false"}
),
@SystemParameter(
name = "allow.null.params.with.CUD",
description = "When set to 'true', this parameter allows the RDBMS CUD function to accept" +
"parameters with NULL values. " +
"When set to 'false', NULL parameters will not be allowed with CUD functions.",
defaultValue = "false",
possibleParameters = {"true", "false"}
)
},
returnAttributes = {
Expand Down Expand Up @@ -208,6 +216,7 @@ public class CUDStreamProcessor extends StreamProcessor<State> {
private List<Attribute> attributeList = new ArrayList<>();
private String transactionCorrelationId;
private boolean enableCudOperationAutocommit = true;
private boolean allowNullValuedParams;

@Override
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition,
Expand All @@ -216,6 +225,8 @@ protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefi
boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
boolean performCUDOps = Boolean.parseBoolean(
configReader.readConfig("perform.CUD.operations", "false"));
allowNullValuedParams = Boolean.parseBoolean(
configReader.readConfig("allow.null.params.with.CUD", "false"));
if (!performCUDOps) {
throw new SiddhiAppValidationException("Performing CUD operations through " +
"rdbms cud function is disabled. This is configured through system parameter, " +
Expand Down Expand Up @@ -330,9 +341,14 @@ protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processo
}
for (int i = 0; i < this.expressionExecutors.size(); i++) {
ExpressionExecutor attributeExpressionExecutor = this.expressionExecutors.get(i);
Object value = attributeExpressionExecutor.execute(event);
if (value == null && !allowNullValuedParams) {
throw new SiddhiAppValidationException("Null values have been detected " +
"in the parameters passed to the CUD function. " +
"CUD functions do not permit null parameters.");
}
RDBMSStreamProcessorUtil.populateStatementWithSingleElement(stmt, i + 1,
attributeExpressionExecutor.getReturnType(),
attributeExpressionExecutor.execute(event));
attributeExpressionExecutor.getReturnType(), value);
}
}
stmt.addBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ public static HikariDataSource getDataSourceService(String dataSourceName) {
*/
public static void populateStatementWithSingleElement(PreparedStatement stmt, int ordinal, Attribute.Type type,
Object value) throws SQLException {
// Handle 'null' valued params separately
if (value == null) {
stmt.setObject(ordinal, null);
return;
}
switch (type) {
case BOOL:
stmt.setBoolean(ordinal, (Boolean) value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,168 @@ public void receive(Event[] events) {
siddhiAppRuntime.shutdown();
((HikariDataSource) dataSource).close();
}


@Test()
public void rdbmsCUDNullParamTest1() throws InterruptedException {
log.info("rdbmsCUDNullParamTest1 - Test allow.null.params.with.CUD property behavior. " +
"When property is 'true', parametrized cud function with " +
"null value should be accepted");

String databaseType = System.getenv("DATABASE_TYPE");
if (databaseType == null) {
databaseType = RDBMSTableTestUtils.TestType.H2.toString();
}
RDBMSTableTestUtils.TestType type = RDBMSTableTestUtils.TestType.valueOf(databaseType);

YAMLConfigManager yamlConfigManager = new YAMLConfigManager(
"extensions: \n" +
" - extension: \n" +
" namespace: rdbms\n" +
" name: cud\n" +
" properties:\n" +
" allow.null.params.with.CUD: true\n" +
" perform.CUD.operations: true");

SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setConfigManager(yamlConfigManager);

DataSource dataSource = RDBMSTableTestUtils.initDataSource();
siddhiManager.setDataSource("TEST_DATASOURCE", dataSource);

String definitions = "" +
"define stream InsertStream(symbol string, price float, volume long);\n" +
"\n" +
"@Store(type=\"rdbms\", jdbc.url=\"" + url + "\", jdbc.driver.name=\"" + driverClassName + "\"," +
"username=\"" + user + "\", password=\"" + password + "\", pool.properties=\"maximumPoolSize:1\")" +
"define table " + TABLE_NAME + " (symbol string, price float, volume long); " +
"\n";

String parameterizedSqlQuery;
boolean isOracle11 = false;
if (type.equals(RDBMSTableTestUtils.TestType.ORACLE)) {
parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?)";
isOracle11 = Boolean.parseBoolean(System.getenv("IS_ORACLE_11"));
} else {
parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?);";
}
if (!type.equals(RDBMSTableTestUtils.TestType.ORACLE)) {
parameterizedSqlQuery = parameterizedSqlQuery.concat(";");
}

String parameterizedCud = "" +
"from InsertStream#rdbms:cud(\"TEST_DATASOURCE\", \"" + parameterizedSqlQuery +
"\", symbol, price, volume) " +
"select numRecords " +
"insert into OutputStream ;" +
"\n";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(definitions + parameterizedCud);
InputHandler insertStream = siddhiAppRuntime.getInputHandler("InsertStream");
siddhiAppRuntime.start();

siddhiAppRuntime.addCallback("OutputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
for (Event event : events) {
isEventArrived = true;
eventCount.incrementAndGet();
actualData.add(event.getData());
}
}
});

insertStream.send(new Object[]{"X", 1.0f, 30L});
SiddhiTestHelper.waitForEvents(2000, 1, eventCount, 60000);
siddhiAppRuntime.shutdown();
((HikariDataSource) dataSource).close();

Assert.assertTrue(isEventArrived, "Event Not Arrived");
Assert.assertEquals(eventCount.get(), 1, "Event count did not match");

if (isOracle11) {
Assert.assertEquals(actualData.get(0)[0], -2);
} else {
Assert.assertEquals(actualData.get(0)[0], 1);
}
}

@Test()
public void rdbmsCUDNullParamTest2() throws InterruptedException {
log.info("rdbmsCUDNullParamTest2 - Test allow.null.params.with.CUD property behavior. " +
"When property is 'false', parametrized cud functions with " +
"null value should be rejected");

String databaseType = System.getenv("DATABASE_TYPE");
if (databaseType == null) {
databaseType = RDBMSTableTestUtils.TestType.H2.toString();
}
RDBMSTableTestUtils.TestType type = RDBMSTableTestUtils.TestType.valueOf(databaseType);

YAMLConfigManager yamlConfigManager = new YAMLConfigManager(
"extensions: \n" +
" - extension: \n" +
" namespace: rdbms\n" +
" name: cud\n" +
" properties:\n" +
" perform.CUD.operations: true");

SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setConfigManager(yamlConfigManager);

DataSource dataSource = RDBMSTableTestUtils.initDataSource();
siddhiManager.setDataSource("TEST_DATASOURCE", dataSource);

String definitions = "" +
"define stream InsertStream(symbol string, price float, volume long);\n" +
"\n" +
"@Store(type=\"rdbms\", jdbc.url=\"" + url + "\", jdbc.driver.name=\"" + driverClassName + "\"," +
"username=\"" + user + "\", password=\"" + password + "\", pool.properties=\"maximumPoolSize:1\")" +
"define table " + TABLE_NAME + " (symbol string, price float, volume long); " +
"\n";

String parameterizedSqlQuery;
if (type.equals(RDBMSTableTestUtils.TestType.ORACLE)) {
parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?)";
} else {
parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?);";
}
if (!type.equals(RDBMSTableTestUtils.TestType.ORACLE)) {
parameterizedSqlQuery = parameterizedSqlQuery.concat(";");
}

String parameterizedCud = "" +
"from InsertStream#rdbms:cud(\"TEST_DATASOURCE\", \"" + parameterizedSqlQuery +
"\", symbol, price, volume) " +
"select numRecords " +
"insert into OutputStream ;" +
"\n";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(definitions + parameterizedCud);
InputHandler insertStream = siddhiAppRuntime.getInputHandler("InsertStream");
siddhiAppRuntime.start();

siddhiAppRuntime.addCallback("OutputStream", new StreamCallback() {
@Override
public void receive(Event[] events) {
EventPrinter.print(events);
for (Event event : events) {
isEventArrived = true;
eventCount.incrementAndGet();
actualData.add(event.getData());
}
}
});

insertStream.send(new Object[]{"Y", 1.0f, null});
SiddhiTestHelper.waitForEvents(2000, 1, eventCount, 6000);
siddhiAppRuntime.shutdown();
((HikariDataSource) dataSource).close();

// Event should not arrive and 0 events should be received
Assert.assertFalse(isEventArrived, "Event Arrived");
Assert.assertEquals(eventCount.get(), 0, "Event count did not match");
}

}

0 comments on commit 6d96f6a

Please sign in to comment.