diff --git a/Modules/IntentSpecification2WorkflowGenerator/api/api_main.py b/Modules/IntentSpecification2WorkflowGenerator/api/api_main.py index 91e21584..34fec948 100644 --- a/Modules/IntentSpecification2WorkflowGenerator/api/api_main.py +++ b/Modules/IntentSpecification2WorkflowGenerator/api/api_main.py @@ -63,6 +63,7 @@ def convert_strings_to_uris(obj): @app.post('/logical_planner') def run_logical_planner(): plan_ids = request.json.get('plan_ids', '') + print(plan_ids) intent_json = request.json.get('intent_graph', '') algorithm_implementations = request.json.get('algorithm_implementations', '') ontology = Graph().parse(data=request.json.get('ontology', ''), format='turtle') @@ -71,14 +72,15 @@ def run_logical_planner(): algorithm_implementations_uris = convert_strings_to_uris(algorithm_implementations) intent = Graph().parse(data=intent_json, format='turtle') - intent.print() impls = [impl for alg, impls in algorithm_implementations_uris.items() if str(alg) in plan_ids for impl in impls] workflow_plans = workflow_planner(ontology, impls, intent) + workflow_plans[0].serialize(destination='C:\\Users\\marc.maynou\\Desktop\\NextiaJD\\wplan.rdf', format='xml') logical_plans = logical_planner(ontology, workflow_plans) + workflow_plans[0].serialize(destination='C:\\Users\\marc.maynou\\Desktop\\NextiaJD\\lplan.rdf', format='xml') return logical_plans @@ -203,38 +205,32 @@ def download_proactive(): load_dataset_task = bucket.create_Import_Data_task(import_from="PA:USER_FILE", file_path=data_product_name + ".csv", file_delimiter=";", label_column=label_column) proactive_job.addTask(load_dataset_task) + # remove_nulls = bucket.create_Fill_NaNs_task(0) + # split_data_task.addDependency(load_dataset_task) + # proactive_job.addTask(remove_nulls) + split_data_task = bucket.create_Split_Data_task() split_data_task.addDependency(load_dataset_task) proactive_job.addTask(split_data_task) # Model depends on the layout, the rest is the same - scale_task = bucket.create_Scale_Data_task() model_task = bucket.create_Support_Vector_Machines_task() for key in layout: if "decision_tree_predictor" in key: model_task = bucket.create_Random_Forest_task() break - random_forest_task = bucket.create_Random_Forest_task() - proactive_job.addTask(random_forest_task) + proactive_job.addTask(model_task) train_model_task = bucket.create_Train_Model_task() train_model_task.addDependency(split_data_task) train_model_task.addDependency(model_task) proactive_job.addTask(train_model_task) - download_model_task = bucket.create_Download_Model_task() - download_model_task.addDependency(train_model_task) - proactive_job.addTask(download_model_task) - predict_model_task = bucket.create_Predict_Model_task() predict_model_task.addDependency(split_data_task) predict_model_task.addDependency(train_model_task) proactive_job.addTask(predict_model_task) - preview_results_task = bucket.create_Preview_Results_task() - preview_results_task.addDependency(predict_model_task) - proactive_job.addTask(preview_results_task) - gateway.saveJob2XML(proactive_job, os.path.abspath(r'api/temp_files/extremexp_test_workflow.xml')) finally: diff --git a/Modules/NextiaBS/src/main/java/edu/upc/essi/dtim/nextiabs/implementations/CSVBootstrap.java b/Modules/NextiaBS/src/main/java/edu/upc/essi/dtim/nextiabs/implementations/CSVBootstrap.java index a702ffaf..914ae654 100644 --- a/Modules/NextiaBS/src/main/java/edu/upc/essi/dtim/nextiabs/implementations/CSVBootstrap.java +++ 
b/Modules/NextiaBS/src/main/java/edu/upc/essi/dtim/nextiabs/implementations/CSVBootstrap.java @@ -12,13 +12,9 @@ import edu.upc.essi.dtim.nextiabs.bootstrap.BootstrapResult; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.Arrays; -import java.util.Objects; +import java.io.*; import java.util.stream.Collectors; import static edu.upc.essi.dtim.nextiabs.utils.DF_MMtoRDFS.productionRulesDataframe_to_RDFS; @@ -44,17 +40,13 @@ public Graph bootstrapSchema(Boolean generateMetadata) { G_target = (LocalGraph) CoreGraphFactory.createGraphInstance("local"); // setPrefixes(); - BufferedReader br; - try { - br = new BufferedReader(new FileReader(path)); - } catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - CSVParser parser; try { - parser = CSVParser.parse(br, CSVFormat.DEFAULT.withFirstRecordAsHeader()); + char delimiter = detectDelimiter(path); // Detect the delimiter of the file + BufferedReader br = new BufferedReader(new FileReader(path)); + parser = CSVParser.parse(br, CSVFormat.DEFAULT.withFirstRecordAsHeader().withDelimiter(delimiter)); } catch (IOException e) { + e.printStackTrace(); throw new RuntimeException(e); } @@ -68,7 +60,7 @@ public Graph bootstrapSchema(Boolean generateMetadata) { G_target.addTriple(createIRI(h2),DataFrame_MM.hasDataType,DataFrame_MM.String); }); - String select = parser.getHeaderNames().stream().map(a -> a + " AS " + reformatName(a)).collect(Collectors.joining(", ")); + String select = parser.getHeaderNames().stream().map(a -> "\"" + a + "\" AS " + reformatName(a)).collect(Collectors.joining(", ")); wrapper = "SELECT " + select + " FROM `" + name + "`"; //TODO: implement metadata @@ -95,6 +87,30 @@ public void generateMetadata(){ G_target.addTripleLiteral(ds, DataSourceVocabulary.HAS_WRAPPER.getURI(), wrapper); } + private char detectDelimiter(String path) throws IOException { + char[] delimiters = {';', ',', '\t'}; + BufferedReader br = new BufferedReader(new FileReader(path)); + + for (char delimiter : delimiters) { + // Parse the CSV file with the current candidate delimiter + CSVFormat csvFormat = CSVFormat.DEFAULT.withDelimiter(delimiter); + CSVParser csvParser = new CSVParser(br, csvFormat); + + Iterable<CSVRecord> records = csvParser.getRecords(); // Get the first record + if (records.iterator().hasNext()) { + CSVRecord firstRecord = records.iterator().next(); + // If the record contains more than 1 column, we assume it's the correct delimiter + if (firstRecord.size() > 1) { + csvParser.close(); + return delimiter; + } + } + csvParser.close(); // Close the parser + br = new BufferedReader(new FileReader(path)); // Reset the reader to start from the beginning of the file + } + return ','; // Default to ',' if no delimiter is detected + } + @Override public Graph bootstrapSchema() { return bootstrapSchema(false); diff --git a/Modules/NextiaJD/build.gradle b/Modules/NextiaJD/build.gradle index 38b53220..58dac557 100644 --- a/Modules/NextiaJD/build.gradle +++ b/Modules/NextiaJD/build.gradle @@ -1,5 +1,6 @@ plugins { id 'java' + id 'com.github.johnrengelman.shadow' version '7.1.2' } group = 'edu.upc.essi.dtim' @@ -22,11 +23,20 @@ dependencies { implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1' implementation group: 'org.jpmml', name: 'pmml-evaluator', version: '1.6.4' implementation group: 'commons-io', name:
'commons-io', version: '2.15.0' + implementation group: 'com.opencsv', name: 'opencsv', version: '5.9' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1' } +shadowJar { + zip64(true) + manifest { + attributes 'Main-Class': 'edu.upc.essi.dtim.NextiaJD.Main' + } +} + test { useJUnitPlatform() -} \ No newline at end of file +} + diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/Main.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/Main.java index a57ed043..1e476e73 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/Main.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/Main.java @@ -1,17 +1,32 @@ package edu.upc.essi.dtim.NextiaJD; +import com.opencsv.CSVWriter; import edu.upc.essi.dtim.NextiaJD.predictQuality.PredictQuality; import edu.upc.essi.dtim.NextiaJD.predictQuality.Profile; import edu.upc.essi.dtim.NextiaJD.utils.DuckDB; +import org.apache.commons.lang3.tuple.Pair; +import com.opencsv.CSVReader; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; import java.sql.Connection; +import java.sql.SQLException; +import java.util.LinkedList; +import java.util.List; import static edu.upc.essi.dtim.NextiaJD.predictQuality.Profile.generateAllProfilesOfAllDataInAFolder; public class Main { public static void main(String[] args) { -// Connection conn = DuckDB.getConnection(); -// Profile p = new Profile(conn); + Connection conn = null; + try { + conn = DuckDB.getConnection(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + Profile p = new Profile(conn); // // if pathToStoreProfile is left blank (i.e. "") the profile will not be stored in disk // // if resultingProfileName is left blank (i.e. 
"") the profile file name will be the same as the original csv // p.createProfile("D:\\Work\\TFM\\Others\\eo_xx.csv", "D:\\Work\\Profiles", "test2"); @@ -22,21 +37,232 @@ public static void main(String[] args) { // System.out.println(cq.calculateQualityDiscrete("D:\\Work\\TFM\\Others\\eo_xx.csv", "D:\\Work\\TFM\\Others\\eo_xx.csv", "ein", "ein")); // System.out.println(cq.calculateQualityContinuous("D:\\Work\\TFM\\Others\\eo_xx.csv", "D:\\Work\\TFM\\Others\\eo_xx.csv", "ein", "ein")); -// PredictQuality pq = new PredictQuality(conn); +// PredictQuality pq = new PredictQuality(); // pq.predictQuality("D:\\Projects\\Files\\eo4_profile.json", "D:\\Projects\\Files\\eo_xx_2_profile.json", "NAME", "NAME"); +// pq.calculateDistancesAttVsFolder("dummy_value", "file_1_profile.csv", "C:\\Users\\marc.maynou\\Desktop\\scalability\\linearity\\1_gb\\profiles"); +// pq.calculateDistancesForAllProfilesInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\profiles_copy", "C:\\Users\\marc.maynou\\Desktop"); + +// pq.predictQuality("C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv", "C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv", +// "AcquisitionID", "AcquisitionID"); + +// santos(); +// tus(); +// santosBig(); +// nextiaJD(); +// tusBig(); +// scalability(); + d3l(); + +// p.createProfile("C:\\Work\\NextiaJD\\nextia\\datasets\\worldcitiespop.csv", "C:\\Work\\NextiaJD\\nextia"); + +// p.createProfile(args[0], args[1]); + +// try { +// generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\D3L\\benchmark", "C:\\Work\\NextiaJD\\other_datasets\\D3L\\profiles_short"); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } + + } + + public static void scalability() { + try { + PredictQuality pq = new PredictQuality(); + + for (int i = 1; i <= 20; ++i) { + pq.calculateDistancesAttVsFolder("dummy_value", "file_" + i + "_profile.csv", "C:\\Users\\marc.maynou\\Desktop\\scalability\\sizes\\size_100_kb\\profiles"); + System.out.println("Query column " + i + " out of " + 20); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void santos() { + try { +// generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\datalake", "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\profiles"); + PredictQuality pq = new PredictQuality(); + + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\santos_small_benchmark_groundtruth.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[1]; + String attribute = line[4]; // the attribute name is the same for the two columns + if (!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + int counter = 1; + + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft().replace(".csv", "_profile.csv"), "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\profiles_short"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + counter++; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void tus() { try { -// 
generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\datasets", "C:\\Work\\NextiaJD\\datasets\\profilesCSV"); +// generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\tus\\tus_small\\csvfiles", "C:\\Work\\NextiaJD\\other_datasets\\tus\\tus_small\\profiles"); Connection conn = DuckDB.getConnection(); - Profile p = new Profile(conn); - PredictQuality pq = new PredictQuality(conn); + PredictQuality pq = new PredictQuality(); -// p.createProfile("C:\\Work\\NextiaJD\\datasets\\us_companies_copy.csv", "C:\\Work\\NextiaJD"); + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\other_datasets\\tus_small\\TUS_benchmark_relabeled_groundtruth.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[1]; + String attribute = line[4]; // the attribute name is the same for the two columns + if (!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } - pq.calculateDistancesForAllProfilesInAFolder("C:\\Work\\NextiaJD\\datasets\\profilesCSV", "C:\\Work\\NextiaJD"); + int counter = 1; -// pq.predictQuality("C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv", "C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv", -// "AcquisitionID", "AcquisitionID"); + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft().replace(".csv", "_profile.csv"), "C:\\Work\\NextiaJD\\other_datasets\\tus_small\\profiles_short"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + counter++; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void santosBig() { + try { +// generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\datalake", "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\xd"); + PredictQuality pq = new PredictQuality(); + + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\santos_big_benchmark_groundtruth.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[0]; + String attribute = line[2]; + if (!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + int counter = 1; + + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft().replace(".csv", "_profile.csv"), "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\profiles_short"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + ++counter; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void nextiaJD() { + try { + PredictQuality pq = new PredictQuality(); + + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\nextia\\ground_truth_validate.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[0]; + String attribute = line[1]; + if 
(!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + int counter = 1; + + + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft().replace(".csv", "_profile.csv"), "C:\\Work\\NextiaJD\\nextia\\profiles"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + ++counter; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void tusBig() { + try { + PredictQuality pq = new PredictQuality(); + + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\other_datasets\\tus_big\\TUS_large_candidate_queries_sample.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[0]; + String attribute = line[1]; + if (!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + int counter = 0; + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft().replace(".csv", "_profile.csv"), "C:\\Work\\NextiaJD\\other_datasets\\tus_big\\profiles_short"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + ++counter; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void d3l() { + try { +// generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\datalake", "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\profiles"); + PredictQuality pq = new PredictQuality(); + + List> listOfQueryColumns = new LinkedList<>(); + try (CSVReader reader = new CSVReader(new FileReader("C:\\Work\\NextiaJD\\other_datasets\\D3L\\d3l_ground_truth_sample.csv"))) { + String[] headerLine = reader.readNext(); + String[] line; + while ((line = reader.readNext()) != null) { + String dataset = line[1]; + String attribute = line[2]; + if (!listOfQueryColumns.contains(Pair.of(dataset, attribute))) { + listOfQueryColumns.add(Pair.of(dataset, attribute)); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + int counter = 1; + + for (Pair pair: listOfQueryColumns) { + pq.calculateDistancesAttVsFolder(pair.getRight(), pair.getLeft() + "_profile.csv", "C:\\Work\\NextiaJD\\other_datasets\\D3L\\profiles_short"); + System.out.println("Query column " + counter + " out of " + listOfQueryColumns.size()); + counter++; + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/discovery/Discovery.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/discovery/Discovery.java index 2ca5d9bb..45237a3e 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/discovery/Discovery.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/discovery/Discovery.java @@ -76,7 +76,7 @@ public JSONArray createProfile(String path, String pathToStoreProfile) throws SQ @Override public double predictJoinQuality(String path1, String path2, String att1, String att2) throws SQLException, IOException, ParseException, SAXException, JAXBException { Connection conn = DuckDB.getConnection(); - PredictQuality pq = new PredictQuality(conn); + PredictQuality pq = new 
PredictQuality(); return pq.predictQuality(path1, path2, att1, att2); } diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/PredictQuality.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/PredictQuality.java index 12a8bfa5..c95be37d 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/PredictQuality.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/PredictQuality.java @@ -3,15 +3,14 @@ import org.apache.commons.text.similarity.LevenshteinDistance; import java.io.*; -import java.sql.Connection; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static edu.upc.essi.dtim.NextiaJD.utils.Utils.readCSVFile; -import static edu.upc.essi.dtim.NextiaJD.utils.Utils.readJSONFile; public class PredictQuality { - - Connection conn; LinkedList<String> metricsToNormalize = new LinkedList<>(Arrays.asList( "cardinality", "entropy", "frequency_avg", "frequency_min", "frequency_max", "frequency_sd", "len_max_word", "len_min_word", "len_avg_word", "words_cnt_max", "words_cnt_min", "words_cnt_avg", @@ -39,7 +38,7 @@ public class PredictQuality { Map.entry("first_word", 3), Map.entry("last_word", 3) ); - public PredictQuality(Connection conn) {this.conn = conn;} + public PredictQuality() {} public double predictQuality(String path1, String path2, String att1, String att2) { // LinkedList<Map<String, Object>> profiles1 = readJSONFile(path1); @@ -53,10 +52,6 @@ public double predictQuality(String path1, String path2, String att1, String att normalizeProfile(profiles1); normalizeProfile(profiles2); - // Initialize the distances map with the cardinalities -// Map<String, Object> distances = getCardinalityProportion(profiles1, att1, profiles2, att2); - Map<String, Object> distances = new HashMap<>(); - // Get the profiles that we need (that is, get only the two profiles corresponding to the two attributes to compare) Map<String, Object> profile1 = new HashMap<>(); Map<String, Object> profile2 = new HashMap<>(); @@ -68,23 +63,28 @@ public double predictQuality(String path1, String path2, String att1, String att } // Calculate the distances - distances.putAll(calculateDistances(profile1, profile2)); + Map<String, Object> distances = calculateDistances(profile1, profile2); + if (distances.isEmpty()) { + throw new RuntimeException(); + } // writeDistances("C:\\Projects\\ODIN", distances, true); return predictQualityThroughModel(distances); } private void writeDistances(String distancesFilePath, Map<String, Object> distances, Boolean writeHeader) throws IOException { - File file = new File(distancesFilePath + "\\distances.csv"); + File file = new File(distancesFilePath.replace("/", "_").replace(": ","_")); + boolean fileExists = file.exists(); Writer writer = new FileWriter(file, true); - if (writeHeader) { + if (!fileExists) { for (String key: distances.keySet()) { writer.write(key); writer.write(","); } writer.write("\n"); } + for (String key: distances.keySet()) { writer.write(String.valueOf(distances.get(key))); writer.write(","); @@ -93,22 +93,17 @@ private void writeDistances(String distancesFilePath, Map distan writer.flush(); } -// private Map<String, Object> getCardinalityProportion(LinkedList<Map<String, Object>> profiles1, String att1, LinkedList<Map<String, Object>> profiles2, String att2) { -// Map<String, Object> distances = new HashMap<>(); -// -// double cardinality1 = 0.0; -// double cardinality2 = 0.0; -// for (Map<String, Object> profile: profiles1) { -// if (profile.get("attribute_name").equals(att1)) cardinality1 = Double.parseDouble(String.valueOf(profile.get("cardinality")));
-// } -// for (Map<String, Object> profile: profiles2) { -// if (profile.get("attribute_name").equals(att2)) cardinality2 = Double.parseDouble(String.valueOf(profile.get("cardinality"))); -// } -// distances.put("original_cardinality", cardinality1); -// distances.put("original_cardinality_2", cardinality2); -// -// return distances; -// } + private void writeHeader(String distancesFilePath, Map<String, Object> distances) throws IOException { + File file = new File(distancesFilePath.replace("/", "_")); + Writer writer = new FileWriter(file, true); + + for (String key: distances.keySet()) { + writer.write(key); + writer.write(","); + } + writer.write("\n"); + writer.flush(); + } private double predictQualityThroughModel(Map<String, Object> distances) { return 0.0; @@ -116,34 +111,40 @@ private double predictQualityThroughModel(Map distances) { private Map<String, Object> calculateDistances(Map<String, Object> profile1, Map<String, Object> profile2) { Map<String, Object> distances = new HashMap<>(); - for (String feature: profile1.keySet()) { - if (distancePattern.get(feature) == 0) { // subtraction for most numeric values, such as cardinality - double value = objectToDouble(profile1.get(feature)) - objectToDouble(profile2.get(feature)); - distances.put(feature, value); - } - else if (distancePattern.get(feature) == 1) { // containment for arrays, such as the most common words - List<String> elementsList1 = Arrays.asList(((String) profile1.get(feature)).replaceAll("\\[|\\]|\\s", "").split(",")); - List<String> elementsList2 = Arrays.asList(((String) profile2.get(feature)).replaceAll("\\[|\\]|\\s", "").split(",")); - LinkedList<String> listValues1 = new LinkedList<>(elementsList1); - LinkedList<String> listValues2 = new LinkedList<>(elementsList2); - - double numberOfContainedValues = 0.0; - for (String value: listValues1) { - if (listValues2.contains(value)) numberOfContainedValues += 1; + try { + for (String feature: profile1.keySet()) { + if (distancePattern.get(feature) == 0) { // subtraction for most numeric values, such as cardinality + double value = objectToDouble(profile1.get(feature)) - objectToDouble(profile2.get(feature)); + distances.put(feature, value); + } + else if (distancePattern.get(feature) == 1) { // containment for arrays, such as the most common words + List<String> elementsList1 = Arrays.asList(((String) profile1.get(feature)).replaceAll("\\[|\\]|\\s", "").split(",")); + List<String> elementsList2 = Arrays.asList(((String) profile2.get(feature)).replaceAll("\\[|\\]|\\s", "").split(",")); + LinkedList<String> listValues1 = new LinkedList<>(elementsList1); + LinkedList<String> listValues2 = new LinkedList<>(elementsList2); + + double numberOfContainedValues = 0.0; + for (String value: listValues1) { + if (listValues2.contains(value)) numberOfContainedValues += 1; + } + distances.put(feature, numberOfContainedValues/listValues1.size()); + } + else if (distancePattern.get(feature) == 2) { // add both values, such as the two datasets names + distances.put(feature, profile1.get(feature)); + distances.put(feature + "_2", profile2.get(feature)); + } + else if (distancePattern.get(feature) == 3) { // levenshtein distance, such as for the first words + distances.put(feature, Double.valueOf(LevenshteinDistance.getDefaultInstance() + .apply((CharSequence) profile1.get(feature), (CharSequence) profile2.get(feature)))); } - distances.put(feature, numberOfContainedValues/listValues1.size()); - } - else if (distancePattern.get(feature) == 2) { // add both values, such as the two datasets names - distances.put(feature, profile1.get(feature)); - distances.put(feature + "_2", profile2.get(feature)); - } - else if (distancePattern.get(feature) == 3) { //
levenshtein distance, such as for the first words - distances.put(feature, Double.valueOf(LevenshteinDistance.getDefaultInstance() - .apply((CharSequence) profile1.get(feature), (CharSequence) profile2.get(feature)))); } + distances.putAll(calculateBinaryFeatures(profile1,profile2)); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (distances.size() != 40 && distances.size() != 61) { // 61 for full + distances = new HashMap<>(); } - - distances.putAll(calculateBinaryFeatures(profile1,profile2)); return distances; } @@ -193,11 +194,14 @@ private void normalizeProfile(LinkedList> profile) { } } } - //writeJSON(profile, "", "/home/marc/Escritorio/Files/Profiles", "normalized_profile"); } private double objectToDouble(Object o) { - return Double.parseDouble(String.valueOf(o)); + try { + return Double.parseDouble(String.valueOf(o)); + } catch (Exception e) { + return 0; + } } public void calculateDistancesForAllProfilesInAFolder(String path, String distancesPath) { @@ -206,9 +210,9 @@ public void calculateDistancesForAllProfilesInAFolder(String path, String distan assert files != null; for (int i = 0; i < files.length; ++i) { + System.out.println("Started iteration " + i); for (int j = 0; j< files.length; ++j) { if (j > i) { - System.out.println("Dataset 1: " + (i+1) + "/" + (files.length - 1) + " " + files[i] + " || Dataset 2: " + (j-i) + "/" + (files.length - i - 1) + " " + files[j]); LinkedList> profiles1 = readCSVFile(String.valueOf(files[i])); LinkedList> profiles2 = readCSVFile(String.valueOf(files[j])); @@ -222,24 +226,88 @@ public void calculateDistancesForAllProfilesInAFolder(String path, String distan // we get the profiles of both attributes and calculate the distances. for (Map profile1: profiles1) { for (Map profile2: profiles2) { - Map distances = new HashMap<>(); - double cardinality1 = Double.parseDouble(String.valueOf(profile1.get("cardinality"))); - double cardinality2 = Double.parseDouble(String.valueOf(profile2.get("cardinality"))); - distances.put("K", Math.min(cardinality1, cardinality2)/Math.max(cardinality1, cardinality2)); - distances.put("cardinalityRaw", cardinality1); - distances.put("cardinalityRaw_2", cardinality2); - distances.putAll(calculateDistances(profile1, profile2)); + Map distances = calculateDistances(profile1, profile2); + if (!distances.isEmpty()) { + try { + writeDistances(distancesPath + "\\distances.csv", distances, writeHeader); + writeHeader = false; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + } + } + } + } + + public void calculateDistancesAttVsFolder(String attribute, String dataset, String path) { + File[] files = (new File (path)).listFiles(File::isFile); + + LinkedList> profilesDataset = readCSVFile(path + "\\" + dataset); + profilesDataset.removeAll(Collections.singleton(null)); + normalizeProfile(profilesDataset); + profilesDataset.forEach(prof -> System.out.println(prof.get("attribute_name"))); + Map queryProfile = profilesDataset.stream().filter(prof -> prof.get("attribute_name").equals("\"" + attribute.trim() + "\"")).findFirst().orElse(null); + assert queryProfile != null; + assert files != null; + + ExecutorService executor = Executors.newFixedThreadPool(8); + + for (int i = 0; i < files.length; ++i) { + int finalI = i; + String fileDatasetName = String.valueOf(files[finalI]).substring(String.valueOf(files[finalI]).lastIndexOf("\\") + 1); + if (i == 0) { + // Filter out the same dataset + if (!fileDatasetName.equals(dataset)) { + LinkedList> dataLakeProfiles = 
readCSVFile(String.valueOf(files[finalI])); + for (Map dataLakeProfile : dataLakeProfiles) { + Map distances = calculateDistances(queryProfile, dataLakeProfile); + if (!distances.isEmpty()) { try { - writeDistances(distancesPath, distances, writeHeader); - writeHeader = false; + File folder = new File(path + "\\distances"); // Create folder + folder.mkdirs(); + + String distancesPath = path + "\\distances\\" + "distances_" + dataset.replace(".csv", "") + "_" + attribute + ".csv"; + writeDistances(distancesPath, distances, false); } catch (IOException e) { throw new RuntimeException(e); } } } - } } + else { + executor.submit(() -> { + // Filter out the same dataset +// if (!fileDatasetName.equals(dataset)) { + LinkedList> dataLakeProfiles = readCSVFile(String.valueOf(files[finalI])); + for (Map dataLakeProfile : dataLakeProfiles) { + Map distances = calculateDistances(queryProfile, dataLakeProfile); + if (!distances.isEmpty()) { + try { + File folder = new File(path + "\\distances"); // Create folder + folder.mkdirs(); + + String distancesPath = path + "\\distances\\" + "distances_" + dataset.replace(".csv", "") + "_" + attribute + ".csv"; + writeDistances(distancesPath, distances, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +// } + }); + } + } + + executor.shutdown(); + try { + executor.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } } diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/Profile.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/Profile.java index 3c0def3d..8c572c5d 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/Profile.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/predictQuality/Profile.java @@ -3,15 +3,19 @@ import edu.upc.essi.dtim.NextiaJD.utils.DuckDB; import org.json.simple.JSONArray; +import java.io.BufferedWriter; import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.nio.charset.*; +import java.io.IOException; +import java.util.List; import static edu.upc.essi.dtim.NextiaJD.predictQuality.FeatureGeneration.*; import static edu.upc.essi.dtim.NextiaJD.utils.Utils.*; @@ -19,25 +23,58 @@ public class Profile { Connection conn; - String tableName = "temptTable"; + String tableName; public Profile(Connection conn) { this.conn = conn; + this.tableName = "temptTable"; + } + + public Profile(Connection conn, String tableName) { + this.conn = conn; + this.tableName = tableName; } public JSONArray createProfile(String dataPath, String pathToStoreProfile) { try { // Create table from file and preprocess the data (trim and lowercase) Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, all_varchar=True)"); + try { + stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, sample_size=100, ignore_errors=true)"); + } catch (SQLException e) { + if (e.toString().contains("Invalid unicode (byte sequence mismatch) detected in CSV file")) { + try { + // Read all lines 
from the file with Latin-1 encoding + List lines = Files.readAllLines(Paths.get(dataPath), StandardCharsets.ISO_8859_1); + + // Write all lines back to the same file with UTF-8 encoding + try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(dataPath), StandardCharsets.UTF_8)) { + for (String line : lines) { + writer.write(line); + writer.newLine(); + } + } + stmt = conn.createStatement(); + stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, sample_size=100, ignore_errors=true)"); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + if (e.toString().contains("CSV File not supported for multithreading")) { + stmt = conn.createStatement(); + stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, sample_size=100, ignore_errors=True, parallel=false)"); + } + } preprocessing(conn, tableName); // Generate the profile of the table: for each column, its profile is generated and added to the features variable LinkedList> features = new LinkedList<>(); ResultSet rs = stmt.executeQuery("DESCRIBE \"" + tableName + "\""); while (rs.next()) { - // We only generate the profile if the column has some value (i.e. if it is only null values, we do not create the profile) - if (getNumberOfValues(conn, tableName, rs.getString(1)) != 0.0) { + ResultSet rs2 = conn.createStatement().executeQuery("SELECT \"" + rs.getString(1) + "\" FROM \"" + tableName + "\" LIMIT 0"); + ResultSetMetaData rsmd = rs2.getMetaData(); + // We only generate the profile if the column is VARCHAR and has some value (i.e. if it is only null values, we do not create the profile) + if (rsmd.getColumnTypeName(1).equals("VARCHAR") && getNumberOfValues(conn, tableName, rs.getString(1)) != 0.0) { features.add(createProfileOfColumn(rs.getString(1))); } } @@ -48,7 +85,7 @@ public JSONArray createProfile(String dataPath, String pathToStoreProfile) { } // Write the profile in a CSV/JSON file - if (!pathToStoreProfile.isEmpty()) { + if (!pathToStoreProfile.isEmpty() && !features.isEmpty()) { writeCSV(features, dataPath, pathToStoreProfile); // writeJSON(features, dataPath, pathToStoreProfile); } @@ -57,11 +94,23 @@ public JSONArray createProfile(String dataPath, String pathToStoreProfile) { JSONArray json = new JSONArray(); json.addAll(features); stmt.execute("DROP TABLE \"" + tableName + "\""); + stmt.close(); + conn.close(); + return json; } catch (SQLException e) { - throw new RuntimeException(e); + System.out.println("SKIPPED " + dataPath); + System.out.println(e); + // We have to remove the table if it has been created. 
Otherwise, the remaining profiles will not be generated + try { + Statement stmt = conn.createStatement(); + stmt.execute("DROP TABLE IF EXISTS \"" + tableName + "\""); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } } + return null; } public Map createProfileOfColumn(String column) throws SQLException { @@ -70,7 +119,7 @@ public Map createProfileOfColumn(String column) throws SQLExcepti addValueDistributionFeatures(column, columnFeatures); addSyntacticFeatures(column, columnFeatures); addOtherFeatures(column, columnFeatures); - columnFeatures.put("attribute_name", column); // Add name of the column + columnFeatures.put("attribute_name", "\"" + column + "\""); // Add name of the column return columnFeatures; } @@ -111,18 +160,18 @@ public void addValueDistributionFeatures(String column, Map colu } public void addSyntacticFeatures(String column, Map columnFeatures) throws SQLException { - Map newFeatures = generateDatatypes(conn, tableName, column); - columnFeatures.put("datatype", newFeatures.get("datatype")); - columnFeatures.put("specific_type", newFeatures.get("specific_type")); - - String[] datatypeLabels = {"pct_numeric", "pct_alphanumeric", "pct_alphabetic", "pct_non_alphanumeric", "pct_date_time", "pct_unknown"}; - String[] specificDatatypeLabels = {"pct_phones", "pct_email", "pct_url", "pct_ip", "pct_username", "pct_phrases", "pct_general", - "pct_date", "pct_time", "pct_date_time_specific", "pct_others"}; // Other = not determined - - for (String datatypeLabel : datatypeLabels) columnFeatures.put(datatypeLabel, newFeatures.get(datatypeLabel)); - for (String specificDatatypeLabel : specificDatatypeLabels) columnFeatures.put(specificDatatypeLabel, newFeatures.get(specificDatatypeLabel)); - - newFeatures = generateLengths(conn, tableName, column); +// Map newFeatures = generateDatatypes(conn, tableName, column); +// columnFeatures.put("datatype", newFeatures.get("datatype")); +// columnFeatures.put("specific_type", newFeatures.get("specific_type")); +// +// String[] datatypeLabels = {"pct_numeric", "pct_alphanumeric", "pct_alphabetic", "pct_non_alphanumeric", "pct_date_time", "pct_unknown"}; +// String[] specificDatatypeLabels = {"pct_phones", "pct_email", "pct_url", "pct_ip", "pct_username", "pct_phrases", "pct_general", +// "pct_date", "pct_time", "pct_date_time_specific", "pct_others"}; // Other = not determined +// +// for (String datatypeLabel : datatypeLabels) columnFeatures.put(datatypeLabel, newFeatures.get(datatypeLabel)); +// for (String specificDatatypeLabel : specificDatatypeLabels) columnFeatures.put(specificDatatypeLabel, newFeatures.get(specificDatatypeLabel)); + + Map newFeatures = generateLengths(conn, tableName, column); columnFeatures.put("len_max_word", newFeatures.get("len_max_word")); columnFeatures.put("len_min_word", newFeatures.get("len_min_word")); columnFeatures.put("len_avg_word", newFeatures.get("len_avg_word")); @@ -148,19 +197,34 @@ public void addOtherFeatures(String column, Map columnFeatures) } public static void generateAllProfilesOfAllDataInAFolder(String path, String pathToStore) throws Exception { - Connection conn = DuckDB.getConnection(); Files.createDirectories(Path.of(pathToStore)); // Path of the folder that contains the files to obtain profiles from (we get only the files) - File[] files = (new File (path)).listFiles(File::isFile); + File[] files = (new File(path)).listFiles(File::isFile); assert files != null; int counter = 1; - for (File file: files) { - System.out.println("File " + counter + " out of " + files.length 
+ ": " + file); - Profile p = new Profile(conn); - p.createProfile(String.valueOf(file), pathToStore); + + // Define the thread pool + int parallelism = 4; // Number of available processors + ExecutorService executor = Executors.newFixedThreadPool(parallelism); + + for (File file : files) { + int fileNumber = counter; + executor.submit(() -> { + try { + Connection conn = DuckDB.getConnection(); + Profile p = new Profile(conn, "table" + fileNumber); + p.createProfile(String.valueOf(file), pathToStore); + System.out.println("File " + fileNumber + " out of " + files.length + ": " + file); + } catch (Exception e) { + e.printStackTrace(); + } + }); counter++; } + + // Shutdown the executor when all tasks are completed + executor.shutdown(); } } diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/DuckDB.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/DuckDB.java index f07608e7..f8fb1a88 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/DuckDB.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/DuckDB.java @@ -13,5 +13,6 @@ public static Connection getConnection() throws SQLException { throw new RuntimeException("DuckDB driver not found"); } return DriverManager.getConnection("jdbc:duckdb:"); +// return DriverManager.getConnection("jdbc:duckdb:" + "C:\\Projects\\database"); } } diff --git a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/Utils.java b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/Utils.java index a27b593b..951b12cb 100644 --- a/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/Utils.java +++ b/Modules/NextiaJD/src/main/java/edu/upc/essi/dtim/NextiaJD/utils/Utils.java @@ -39,7 +39,7 @@ public static void preprocessing(Connection conn, String tableName) throws SQLEx "SET \"" + column + "\" = " + "CASE " + "WHEN \"" + column + "\" IN ('', ' ') THEN NULL " + - "ELSE LOWER(TRIM(REPLACE(REPLACE(\"" + column + "\", '\n', ' '), ';', ','))) " + + "ELSE LOWER(TRIM(REPLACE(REPLACE(\"" + column + "\", '\n', '_'), ';', ','))) " + "END"); } } diff --git a/api/src/main/java/edu/upc/essi/dtim/odin/nextiaInterfaces/nextiaQR/qrModuleImpl.java b/api/src/main/java/edu/upc/essi/dtim/odin/nextiaInterfaces/nextiaQR/qrModuleImpl.java index 7eb8dc89..44b780d6 100644 --- a/api/src/main/java/edu/upc/essi/dtim/odin/nextiaInterfaces/nextiaQR/qrModuleImpl.java +++ b/api/src/main/java/edu/upc/essi/dtim/odin/nextiaInterfaces/nextiaQR/qrModuleImpl.java @@ -37,7 +37,11 @@ public QueryResult makeQuery(IntegratedGraphJenaImpl integratedGraph, List { if (logPlan.id === this.removeLastPart(key)) { logPlan.plans.push(plan)