Parallelize NextiaJD
marc-maynou committed Apr 7, 2024
1 parent 492f6f5 commit cc47386
Showing 4 changed files with 38 additions and 20 deletions.
11 changes: 10 additions & 1 deletion Modules/NextiaJD/build.gradle
@@ -1,5 +1,6 @@
 plugins {
     id 'java'
+    id 'com.github.johnrengelman.shadow' version '7.1.2'
 }

group = 'edu.upc.essi.dtim'
@@ -28,6 +29,14 @@ dependencies {
     testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
 }

+shadowJar {
+    zip64(true)
+    manifest {
+        attributes 'Main-Class': 'edu.upc.essi.dtim.NextiaJD.Main'
+    }
+}
+
 test {
     useJUnitPlatform()
 }
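With the shadow plugin applied, running gradlew shadowJar bundles the module and all of its runtime dependencies into a single executable jar; the plugin's default output name carries an -all suffix, which matches the NextiaJD-all.jar invocation in Main.java below. zip64(true) opts the archive into the ZIP64 format, needed once the bundled entries exceed the classic zip limit of 65,535 files, and the Main-Class manifest attribute is what makes the jar launchable with a plain java -jar.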

Main.java
@@ -44,14 +44,18 @@ public static void main(String[] args) {
         // pq.predictQuality("C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv", "C:\\Work\\NextiaJD\\datasets\\profilesCSV\\acquisitions_profile.csv",
         //         "AcquisitionID", "AcquisitionID");

         // santos();
         // tus();
-        santosBig();
-        // p.createProfile("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\datalake\\gp-prescribing---september-2018.csv",
-        //         "C:\\Work");
+        // santosBig();
+        // p.createProfile("D:\\real_data_lake_benchmark\\datalake\\HMRC_WMI_headcount_and_payroll_data_June_2015.csv", "D:\\");
+
+        p.createProfile(args[0], args[1]);

     }

+    // java -jar NextiaJD-all.jar %%F "C:\Projects\real_data_lake_benchmark\profiles"
     public static void santos() {
         try {
             // generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\datalake", "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_small\\profiles");
@@ -115,7 +119,7 @@ public static void tus() {

     public static void santosBig() {
         try {
-            generateAllProfilesOfAllDataInAFolder("C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\datalake", "C:\\Work\\NextiaJD\\other_datasets\\santos_benchmark_big\\profiles");
+            generateAllProfilesOfAllDataInAFolder("C:\\Projects\\real_data_lake_benchmark\\datalake", "C:\\Projects\\real_data_lake_benchmark\\profiles");
             // Connection conn = DuckDB.getConnection();
             // Profile p = new Profile(conn);
             // PredictQuality pq = new PredictQuality(conn);
Profile.java
@@ -34,7 +34,7 @@ public JSONArray createProfile(String dataPath, String pathToStoreProfile) {
         try {
             // Create table from file and preprocess the data (trim and lowercase)
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, sample_size=-1, ignore_errors=True)");
+            stmt.execute("CREATE TABLE \"" + tableName + "\" AS SELECT * FROM read_csv_auto('" + dataPath + "', header=True, sample_size=1000, ignore_errors=True)");
             preprocessing(conn, tableName);

             // Generate the profile of the table: for each column, its profile is generated and added to the features variable
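The sample_size change is the likely motivation for this hunk: it controls how many rows DuckDB's CSV sniffer inspects when inferring column types. With -1 the sniffer scans the entire file, which is accurate but expensive when profiling thousands of data-lake files; 1000 limits sniffing to the first thousand rows, trading some inference accuracy for much faster table creation. A self-contained sketch of the same call (not part of the commit; it assumes the duckdb_jdbc driver on the classpath, and the data.csv path is a placeholder):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CsvSniffingDemo {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
             Statement stmt = conn.createStatement()) {
            // sample_size=1000: infer each column's type from the first 1000 rows;
            // sample_size=-1 would read the whole file before settling on types.
            stmt.execute("CREATE TABLE t AS SELECT * FROM read_csv_auto('data.csv', "
                    + "header=True, sample_size=1000, ignore_errors=True)");
        }
    }
}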
@@ -64,10 +64,12 @@ public JSONArray createProfile(String dataPath, String pathToStoreProfile) {
             JSONArray json = new JSONArray();
             json.addAll(features);
             stmt.execute("DROP TABLE \"" + tableName + "\"");
+
             return json;

         } catch (SQLException e) {
-            System.out.println("SKIPPED");
+            System.out.println("SKIPPED " + dataPath);
+            System.out.println(e);
             // We have to remove the table if it has been created. Otherwise, the remaining profiles will not be generated
             try {
                 Statement stmt = conn.createStatement();
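The surrounding comment explains why this catch block matters: if the CREATE TABLE fails partway, a leftover table would block later profiles that reuse the name. A minimal sketch of that cleanup (not from the commit; DuckDB's DROP TABLE IF EXISTS makes it safe to run whether or not the CREATE got far enough to create anything):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class ProfileCleanup {
    // Best-effort removal of the working table after a failed profiling run.
    static void dropIfExists(Connection conn, String tableName) {
        try (Statement stmt = conn.createStatement()) {
            // IF EXISTS: succeeds even when the CREATE TABLE never completed
            stmt.execute("DROP TABLE IF EXISTS \"" + tableName + "\"");
        } catch (SQLException e) {
            System.out.println("Cleanup failed for " + tableName + ": " + e);
        }
    }
}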
@@ -166,30 +168,32 @@ public static void generateAllProfilesOfAllDataInAFolder(String path, String pathToStore
         Files.createDirectories(Path.of(pathToStore));

         // Path of the folder that contains the files to obtain profiles from (we get only the files)
-        File[] files = (new File (path)).listFiles(File::isFile);
+        File[] files = (new File(path)).listFiles(File::isFile);
         assert files != null;

         int counter = 1;

-        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        // Creating a fixed-size thread pool
+        ExecutorService executor = Executors.newFixedThreadPool(8);

-        for (File file: files) {
-            // System.out.println("File " + counter + " out of " + files.length + ": " + file);
-            final int counterIteration = counter;
+        for (File file : files) {
+            final int counterIter = counter;
             executor.submit(() -> {
-                Connection conn = null;
                 try {
-                    conn = DuckDB.getConnection();
-                } catch (SQLException e) {
-                    throw new RuntimeException(e);
+                    System.out.println("File " + counterIter + " out of " + files.length + ": " + file + " || " + "Processor: " + Thread.currentThread().getName());
+                    if (counterIter > 10000 && counterIter <= 11000) {
+                        Connection conn = DuckDB.getConnection();
+                        Profile p = new Profile(conn, "table" + counterIter);
+                        p.createProfile(String.valueOf(file), pathToStore);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
                 }
-                Profile p = new Profile(conn, "table" + counterIteration);
-                p.createProfile(String.valueOf(file), pathToStore);
-                System.out.println("Finsihed " + counterIteration);
             });
             counter++;
         }

+        // Shutdown the executor after all tasks are submitted
         executor.shutdown();
     }
 }
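Two details of the new loop are easy to miss. The counterIter window (greater than 10000, at most 11000) gates each run to a fixed 1000-file slice of the lake, which reads like manual batching across separate executions. And executor.shutdown() only stops new submissions and returns immediately; the queued tasks keep running on the pool's non-daemon threads. If a caller ever needs to block until every profile is actually written, the standard pattern is awaitTermination, sketched here as a generic illustration (the task body and timeout are placeholders, not code from the commit):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitAllTasks {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        for (int i = 1; i <= 100; i++) {
            final int task = i;
            executor.submit(() ->
                    System.out.println("task " + task + " on " + Thread.currentThread().getName()));
        }
        executor.shutdown();                                  // stop accepting new tasks
        if (!executor.awaitTermination(1, TimeUnit.HOURS)) {  // block until the queue drains
            executor.shutdownNow();                           // timed out: interrupt stragglers
        }
        System.out.println("all tasks finished");
    }
}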
DuckDB.java
@@ -13,5 +13,6 @@ public static Connection getConnection() throws SQLException {
             throw new RuntimeException("DuckDB driver not found");
         }
         return DriverManager.getConnection("jdbc:duckdb:");
+        // return DriverManager.getConnection("jdbc:duckdb:" + "C:\\Projects\\database");
     }
 }
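For context on the two connection strings: jdbc:duckdb: with no path opens a fresh private in-memory database on every call, which is why each worker thread in the parallel loop can profile files without contending on shared state, while the commented-out variant points at a persistent file-backed database. A sketch of the two modes (not from the commit; it assumes the duckdb_jdbc artifact, whose DuckDBConnection exposes duplicate() for handing additional threads their own handle to the same database):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.duckdb.DuckDBConnection;

public class DuckDBModes {
    public static void main(String[] args) throws SQLException {
        // No path: a private in-memory database, discarded when the connection closes.
        try (Connection inMemory = DriverManager.getConnection("jdbc:duckdb:");
             Statement stmt = inMemory.createStatement()) {
            stmt.execute("SELECT 1");
        }

        // With a path (placeholder location): tables persist on disk across runs.
        try (DuckDBConnection fileBacked =
                     (DuckDBConnection) DriverManager.getConnection("jdbc:duckdb:C:\\Projects\\database");
             // duplicate() gives a second handle to the same database for another thread.
             Connection worker = fileBacked.duplicate();
             Statement stmt = worker.createStatement()) {
            stmt.execute("SELECT 42");
        }
    }
}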
