Commit e068554: Removing remaining database triggers

Signed-off-by: Andrea Lamparelli <[email protected]>
lampajr committed Jan 10, 2025
1 parent 241a084 commit e068554
Showing 8 changed files with 279 additions and 74 deletions.
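The remaining triggers on the run, dataset, schema and label tables are dropped (changesets 128-130); their bookkeeping now happens explicitly in the service layer, e.g. RunServiceImpl.updateRunSchemas, updateDatasetSchemas and clearRunAndDatasetSchemas, invoked on upload, un-trash and schema updates.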
@@ -169,8 +169,8 @@ WHERE ds.id IN (SELECT id FROM ids)
LEFT JOIN label_values lv ON dataset.id = lv.dataset_id
LEFT JOIN label ON label.id = label_id
""";

//@formatter:on

@Inject
EntityManager em;

@@ -95,25 +95,34 @@ public class RunServiceImpl implements RunService {

//@formatter:off
private static final String FIND_AUTOCOMPLETE = """
SELECT * FROM (
SELECT DISTINCT jsonb_object_keys(q) AS key
FROM run, jsonb_path_query(run.data, ? ::jsonpath) q
WHERE jsonb_typeof(q) = 'object') AS keys
WHERE keys.key LIKE CONCAT(?, '%');
""";
private static final String FIND_RUNS_WITH_URI = """
SELECT id, testid
FROM run
WHERE NOT trashed
AND (data->>'$schema' = ?1
OR (CASE
WHEN jsonb_typeof(data) = 'object' THEN ?1 IN (SELECT values.value->>'$schema' FROM jsonb_each(data) as values)
WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)->>'$schema')
ELSE false
END)
OR (metadata IS NOT NULL AND ?1 IN (SELECT jsonb_array_elements(metadata)->>'$schema'))
)
""";

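// Rebuilds the dataset_schemas rows for one dataset: it extracts each element's
// '$schema' URI from the dataset's JSON array together with the element position
// ("index") and joins it against the schema table. This replaces the dataset
// insert trigger (ds_after_insert) dropped in changeset 128 below.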
private static final String UPDATE_DATASET_SCHEMAS = """
WITH uris AS (
SELECT jsonb_array_elements(ds.data)->>'$schema' AS uri FROM dataset ds WHERE ds.id = ?1
), indexed as (
SELECT uri, row_number() over () - 1 as index FROM uris
) INSERT INTO dataset_schemas(dataset_id, uri, index, schema_id)
SELECT ?1 as dataset_id, indexed.uri, indexed.index, schema.id FROM indexed JOIN schema ON schema.uri = indexed.uri;
""";
//@formatter:on
private static final String[] CONDITION_SELECT_TERMINAL = { "==", "!=", "<>", "<", "<=", ">", ">=", " " };
private static final String CHANGE_ACCESS = "UPDATE run SET owner = ?, access = ? WHERE id = ?";
@@ -188,46 +197,71 @@ void onNewOrUpdatedSchema(int schemaId) {
log.errorf("Cannot process schema add/update: cannot load schema %d", schemaId);
return;
}
processNewOrUpdatedSchema(schema);
}

@Transactional
void processNewOrUpdatedSchema(SchemaDAO schema) {
// we don't have to care about races with new runs
clearRunAndDatasetSchemas(schema.id);
findRunsWithUri(schema.uri, (runId, testId) -> {
log.debugf("Recalculate Datasets for run %d - schema %d (%s) changed", runId, schema.id, schema.uri);
onNewOrUpdatedSchemaForRun(runId, schema.id);
});
}

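// Streams the matching runs with a forward-only cursor and a fetch size of 100 so
// the whole result set is never materialized; try-with-resources closes the cursor.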
void findRunsWithUri(String uri, BiConsumer<Integer, Integer> consumer) {
try (ScrollableResults<RunFromUri> results = session.createNativeQuery(FIND_RUNS_WITH_URI, Tuple.class)
.setParameter(1, uri)
.setTupleTransformer((tuple, aliases) -> {
RunFromUri r = new RunFromUri();
r.id = (int) tuple[0];
r.testId = (int) tuple[1];
return r;
})
.setFetchSize(100)
.scroll(ScrollMode.FORWARD_ONLY)) {
while (results.next()) {
RunFromUri r = results.get();
consumer.accept(r.id, r.testId);
}
}
}

/**
* Keep the run_schemas table up to date with the schemas associated to the run,
* clear stale validation errors for the updated schema, and queue a recalculation
* of the run. The recalculation is not required when creating a new run, as the
* datasets are created by the upload process itself; it is required when an
* existing Schema is updated.
* @param runId id of the run
* @param schemaId id of the schema
*/
@WithRoles(extras = Roles.HORREUM_SYSTEM)
@Transactional
void onNewOrUpdatedSchemaForRun(int runId, int schemaId) {
em.createNativeQuery("SELECT update_run_schemas(?1)::text").setParameter(1, runId).getSingleResult();
//clear validation error tables by schemaId
updateRunSchemas(runId);

// clear validation error tables by schemaId
em.createNativeQuery("DELETE FROM dataset_validationerrors WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();
em.createNativeQuery("DELETE FROM run_validationerrors WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();

Util.registerTxSynchronization(tm, txStatus -> mediator.queueRunRecalculation(runId));
}

@Transactional
void updateRunSchemas(int runId) {
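// update_run_schemas() is the stored function formerly fired by the run triggers
// dropped in changesets 128 and 129; it is now called explicitly.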
em.createNativeQuery("SELECT update_run_schemas(?1)::text").setParameter(1, runId).getSingleResult();
}

@Transactional
public void updateDatasetSchemas(int datasetId) {
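// rebuild the dataset_schemas associations using the UPDATE_DATASET_SCHEMAS query above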
em.createNativeQuery(UPDATE_DATASET_SCHEMAS).setParameter(1, datasetId).executeUpdate();
}

@Transactional
void clearRunAndDatasetSchemas(int schemaId) {
// clear old run and dataset schemas associations
em.createNativeQuery("DELETE FROM run_schemas WHERE schemaid = ?1")
.setParameter(1, schemaId).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schemaId).executeUpdate();
}

@PermitAll
@@ -336,13 +370,13 @@ public JsonNode getMetadata(int id, String schemaUri) {
@Override
// TODO: it would be nicer to use @FormParams but fetchival on client side doesn't support that
public void updateAccess(int id, String owner, Access access) {
int updatedRecords = RunDAO.update("owner = ?1, access = ?2 WHERE id = ?3", owner, access, id);
if (updatedRecords != 1) {
throw ServiceException.serverError("Access change failed (missing permissions?)");
}

// propagate the same change to all datasets belonging to the run
DatasetDAO.update("owner = ?1, access = ?2 WHERE run.id = ?3", owner, access, id);
}

@RolesAllowed(Roles.UPLOADER)
@@ -670,6 +704,7 @@ public RunPersistence addAuthenticated(RunDAO run, TestDAO test) {
}
log.debugf("Upload flushed, run ID %d", run.id);

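// with the run triggers gone, run_schemas must be refreshed explicitly once the upload is flushed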
updateRunSchemas(run.id);
mediator.newRun(RunMapper.from(run));
List<Integer> datasetIds = transform(run.id, false);
if (mediator.testMode())
@@ -991,6 +1026,7 @@ private void trashInternal(int id, boolean trashed) {
run.trashed = false;
run.persistAndFlush();
transform(id, true);
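// re-create the schema associations, formerly handled by the rs_after_run_untrash trigger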
updateRunSchemas(run.id);
} else
throw ServiceException.badRequest("Not possible to un-trash a run that's not referenced to a Test");
}
@@ -1017,7 +1053,8 @@ public void updateDescription(int id, String description) {
throw ServiceException.notFound("Run not found: " + id);
}
run.description = description;
run.persistAndFlush();
// propagate the same change to all datasets belonging to the run
DatasetDAO.update("description = ?1 WHERE run.id = ?2", description, run.id);
}

@RolesAllowed(Roles.TESTER)
@@ -1071,7 +1108,7 @@ public Map<Integer, String> updateSchema(int id, String path, String schemaUri)
.distinct()
.collect(
Collectors.toMap(
tuple -> ((Integer) tuple.get("key")).intValue(),
tuple -> (Integer) tuple.get("key"),
tuple -> ((String) tuple.get("value"))));

em.flush();
@@ -1377,6 +1414,9 @@ List<Integer> transform(int runId, boolean isRecalculation) {
*/
private Integer createDataset(DatasetDAO ds, boolean isRecalculation) {
ds.persistAndFlush();
// re-create the dataset_schemas associations
updateDatasetSchemas(ds.id);

if (isRecalculation) {
try {
Dataset.EventNew event = new Dataset.EventNew(DatasetMapper.from(ds), true);
@@ -71,8 +71,8 @@
public class SchemaServiceImpl implements SchemaService {
private static final Logger log = Logger.getLogger(SchemaServiceImpl.class);

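// recursively fetches a schema plus every schema it (transitively) references via "$ref"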
//@formatter:off
private static final String FETCH_SCHEMAS_RECURSIVE =
"""
WITH RECURSIVE refs(uri) AS
(
@@ -86,7 +86,7 @@ SELECT substring(jsonb_path_query(schema, '$.**.\"$ref\" ? (! (@ starts with \"#
FROM schema
INNER JOIN refs ON schema.uri = refs.uri
""";
//@formatter:on

private static final JsonSchemaFactory JSON_SCHEMA_FACTORY = new JsonSchemaFactory.Builder()
.defaultMetaSchemaIri(JsonMetaSchema.getV4().getIri())
@@ -160,13 +160,6 @@ public Integer add(Schema schemaDTO) {
em.flush();
if (!Objects.equals(schema.uri, existing.uri) ||
Objects.equals(schema.schema, existing.schema)) {
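// run_schemas / dataset_schemas cleanup and re-creation is now handled by
// newOrUpdatedSchema (see clearRunAndDatasetSchemas in RunServiceImpl)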
newOrUpdatedSchema(schema);
}
} else {
@@ -710,7 +703,7 @@ public Integer addOrUpdateLabel(int schemaId, Label labelDTO) {
}
existing.name = label.name;

// when we clear extractors we should also delete label_values
em.createNativeQuery(
"DELETE FROM dataset_view WHERE dataset_id IN (SELECT dataset_id FROM label_values WHERE label_id = ?1)")
.setParameter(1, existing.id).executeUpdate();
Expand Down Expand Up @@ -865,7 +858,7 @@ public void importSchema(ObjectNode node) {
}

boolean newSchema = true;
SchemaDAO schema;
if (importSchema.id != null) {
//first check if this schema exists
schema = SchemaDAO.findById(importSchema.id);
Expand Down Expand Up @@ -917,6 +910,7 @@ public void importSchema(ObjectNode node) {
//let's wrap flush in a try/catch, if we get any role issues at commit we can give a sane msg
try {
em.flush();
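// recompute run/dataset schema associations explicitly, since the DB triggers no longer do it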
newOrUpdatedSchema(schema);
} catch (Exception e) {
throw ServiceException.serverError("Failed to persist Schema: " + e.getMessage());
}
44 changes: 44 additions & 0 deletions horreum-backend/src/main/resources/db/changeLog.xml
@@ -4796,4 +4796,48 @@
$$ LANGUAGE plpgsql;
</sql>
</changeSet>
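<!-- changesets 128-130: the remaining triggers are superseded by explicit calls
     in the service layer (updateRunSchemas, updateDatasetSchemas, clearRunAndDatasetSchemas) -->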
<changeSet id="128" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS rs_after_run_untrash ON run;
DROP TRIGGER IF EXISTS rs_after_run_update ON run;
DROP TRIGGER IF EXISTS before_schema_update ON schema;
DROP TRIGGER IF EXISTS ds_after_insert ON dataset;

-- drop functions
DROP FUNCTION rs_after_run_update;
DROP FUNCTION before_schema_update_func;
DROP FUNCTION ds_after_dataset_insert_func;
</sql>
</changeSet>
<changeSet id="129" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS after_run_update_non_data ON run;
DROP TRIGGER IF EXISTS delete_run_validations ON run;

-- drop functions
DROP FUNCTION after_run_update_non_data_func;
DROP FUNCTION delete_run_validations;
</sql>
</changeSet>
<changeSet id="130" author="lampajr">
<validCheckSum>ANY</validCheckSum>
<sql>
-- drop triggers
DROP TRIGGER IF EXISTS lv_before_update ON label;
DROP TRIGGER IF EXISTS lv_after_update ON label;
DROP TRIGGER IF EXISTS recalc_labels ON label_recalc_queue;

-- drop functions
DROP FUNCTION lv_before_label_update_func;
DROP FUNCTION lv_after_label_update_func;
DROP FUNCTION recalc_label_values;

-- drop table as no longer used
DROP TABLE label_recalc_queue;
</sql>
</changeSet>
</databaseChangeLog>
@@ -404,7 +404,7 @@ public void testMissingRules(TestInfo info) throws InterruptedException {
em.clear();

pollMissingDataRuleResultsByDataset(thirdEvent.datasetId, 1);
trashRun(thirdRunId, test.id, true);
pollMissingDataRuleResultsByDataset(thirdEvent.datasetId, 0);

alertingService.checkMissingDataset();
@@ -237,16 +237,6 @@ public static Test createExampleTest(String testName, Integer datastoreID) {
return test;
}

public static String getAccessToken(String userName, String... groups) {
return Jwt.preferredUserName(userName)
.groups(new HashSet<>(Arrays.asList(groups)))
@@ -616,10 +606,12 @@ protected ArrayNode jsonArray(String... items) {
return array;
}

protected BlockingQueue<Integer> trashRun(int runId, Integer testId, boolean trashed) throws InterruptedException {
BlockingQueue<Integer> trashedQueue = serviceMediator.getEventQueue(AsyncEventChannels.RUN_TRASHED, testId);
jsonRequest().post("/api/run/" + runId + "/trash").then().statusCode(204);
assertEquals(runId, trashedQueue.poll(10, TimeUnit.SECONDS));
jsonRequest().post("/api/run/" + runId + "/trash?isTrashed=" + trashed).then().statusCode(204);
if (trashed) {
assertEquals(runId, trashedQueue.poll(10, TimeUnit.SECONDS));
}
return trashedQueue;
}
