
Commit

- Changing the way we compare dataframes: use DataFrame APIs.
sonal committed Sep 27, 2019
1 parent 6a12a57 commit 609714d
Showing 4 changed files with 58 additions and 55 deletions.
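In essence, the validation utility now caches both result DataFrames, writes the Spark result to CSV and reads it back, and compares the two sides with DataFrame APIs: except() in both directions, sorted on the first column, with mismatches that differ only in decimal formatting treated as ignorable. A minimal sketch of that comparison idea follows (compareResults is an illustrative name, not part of the diff; the actual logic lives in SnappyTestUtils.assertQuery below):

import org.apache.spark.sql.{DataFrame, Row}

// Sketch only: compare a Snappy result against the reference Spark result
// using DataFrame set operations instead of hand-rolled row diffing.
def compareResults(snappyDF: DataFrame, sparkDF: DataFrame): (Array[Row], Array[Row]) = {
  val sortCol = sparkDF.columns(0)
  // Rows the Snappy result is missing relative to the Spark result.
  val missing = sparkDF.except(snappyDF).sort(sortCol).collect()
  // Rows the Snappy result contains that the Spark result does not.
  val unexpected = snappyDF.except(sparkDF).sort(sortCol).collect()
  (missing, unexpected)
}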
@@ -375,15 +375,14 @@ protected void executeSnappyStreamingJob(Vector jobClassNames, String logFileNam
Log.getLogWriter().info("JobID is : " + jobID);
SnappyBB.getBB().getSharedMap().put(appName, jobID);
for (int j = 0; j < 3; j++) {
if (!getJobStatus(jobID)) {
throw new TestException("Got Exception while executing streaming job. Please check " +
"the job status output.");
}
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException ie) {
}
getJobStatus(jobID);
}
if(!checkJobStatus(jobID)){
throw new TestException("Got Exception while executing streaming job. Please check " +
"the job status output.");
}
}
}
@@ -596,7 +595,7 @@ public boolean getJobStatus(String jobID){
return false;
break;
}
} try { Thread.sleep(10*1000);} catch(InterruptedException ie) { }
}
} catch (IOException ie){
Log.getLogWriter().info("Got exception while accessing current dir");
}
@@ -970,8 +970,6 @@ public void performUpdate() {
if (stmt.toUpperCase().contains("SELECT"))
getAndExecuteSelect(dConn,stmt,true);
Log.getLogWriter().info("Executing " + stmt + " on derby.");
if (stmt.toUpperCase().contains("SELECT"))
getAndExecuteSelect(dConn, stmt, true);
int derbyRows = dConn.createStatement().executeUpdate(stmt);
Log.getLogWriter().info("Updated " + derbyRows + " rows in derby.");
if (numRows != derbyRows) {
@@ -1021,8 +1019,6 @@ public void performDelete() {
if (stmt.toUpperCase().contains("SELECT"))
getAndExecuteSelect(dConn,stmt,true);
Log.getLogWriter().info("Executing " + stmt + " on derby.");
if (stmt.toUpperCase().contains("SELECT"))
getAndExecuteSelect(dConn, stmt, true);
int derbyRows = dConn.createStatement().executeUpdate(stmt);
Log.getLogWriter().info("Deleted " + derbyRows + " rows in derby.");
if (numRows != derbyRows) {
72 changes: 47 additions & 25 deletions dtests/src/test/scala/io/snappydata/hydra/SnappyTestUtils.scala
@@ -82,7 +82,7 @@ object SnappyTestUtils {
pw: PrintWriter, sqlContext: SQLContext): Boolean = {
var validationFailed = false
var snappyDF: DataFrame = null
snappyDF = snc.sql(sqlString)
snappyDF = snc.sql(sqlString).cache()
val snappyDFCount = snappyDF.count
// scalastyle:off println
pw.println(s"\n${logTime} Executing Query $queryNum ...")
@@ -99,7 +99,7 @@ object SnappyTestUtils {
}
var fullRSValidationFailed: Boolean = false
if (validateFullResultSet) {
val sparkDF = sqlContext.sql(sqlString)
val sparkDF = sqlContext.sql(sqlString).cache()
val sparkDFCount = sparkDF.count()
if(snappyDFCount != sparkDFCount) {
pw.println(s"Count difference observed in snappy and spark resultset for query " +
@@ -125,18 +125,16 @@ object SnappyTestUtils {
def assertQuery(snc: SnappyContext, snappyDF: DataFrame, sparkDF: DataFrame, queryNum: String,
pw: PrintWriter): Boolean = {
var fullRSValidationFailed = false
val snappyQueryFileName = s"Snappy_${queryNum}"
val snappyDest: String = getQueryResultDir("snappyResults") +
File.separator + snappyQueryFileName

val snappyResFileName = s"Snappy_${queryNum}"
val snappyDest: String = getQueryResultDir("snappyResults") + File.separator + snappyResFileName
// scalastyle:off println
// pw.println(s"Snappy query results are at : ${snappyDest}")
val snappyFile: File = new java.io.File(snappyDest)

val sparkQueryFileName = s"Spark_${queryNum}"
val sparkDest: String = getQueryResultDir("sparkResults") + File.separator +
sparkQueryFileName
// pw.println(s"Spark query results are at : ${sparkDest}")
val sparkResFileName = s"Spark_${queryNum}"
val sparkDest: String = getQueryResultDir("sparkResults") + File.separator + sparkResFileName
val sparkFile: File = new java.io.File(sparkDest)

try {
if (!snappyFile.exists()) {
// val snap_col1 = snappyDF.schema.fieldNames(0)
@@ -145,27 +143,52 @@ object SnappyTestUtils {
writeToFile(snappyDF.repartition((1)), snappyDest, snc)
pw.println(s"${logTime} Snappy result collected in : ${snappyDest}")
}

if (!sparkFile.exists()) {
// val col1 = sparkDF.schema.fieldNames(0)
// val col = sparkDF.schema.fieldNames.filter(!_.equals(col1)).toSeq
// sparkDF.repartition(1).sortWithinPartitions(col1, col: _*)
writeToFile(sparkDF.repartition(1), sparkDest, snc)
pw.println(s"${logTime} Spark result collected in : ${sparkDest}")
}
val missingDF = sparkDF.except(snappyDF).collectAsList()
val unexpectedDF = snappyDF.except(sparkDF).collectAsList()
if(missingDF.size() > 0 || unexpectedDF.size() > 0) {
fullRSValidationFailed = true
pw.println("Found mismatch in resultset")
if(missingDF.size() > 0) {
pw.println(s"The following ${missingDF.size} rows were missing in snappyDF:\n ")
for(i <- 0 to missingDF.size())
pw.println(missingDF.get(i))
val expectedFile = sparkFile.listFiles.filter(_.getName.endsWith(".csv"))
val sparkDF2 = snc.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("inferSchema", "false")
.option("nullValue", "NULL")
.option("maxCharsPerColumn", "4096")
.load(s"${expectedFile}")

val missingDF: Array[Row] = sparkDF2.except(snappyDF).sort(sparkDF2.columns(0)).collect()
val unexpectedDF: Array[Row] = snappyDF.except(sparkDF2).sort(sparkDF2.columns(0)).collect()

val aStr = new StringBuilder
if(missingDF.length > 0 || unexpectedDF.length > 0) {
pw.println(s"Found mismatch in resultset for query ${queryNum}... ")
if(missingDF.length > 0) {
aStr.append(s"The following ${missingDF.size} rows were missing in snappyDF:\n ")
for(i <- 0 to missingDF.size)
aStr.append(missingDF(i) + "\n")
}
if(unexpectedDF.size() > 0) {
pw.println(s"The following ${unexpectedDF.size} rows were unexpected in snappyDF:\n")
for(i <- 0 to unexpectedDF.size())
pw.println(unexpectedDF.get(i))
if(unexpectedDF.length > 0) {
aStr.append(s"The following ${unexpectedDF.size} rows were unexpected in snappyDF:\n")
for(i <- 0 to unexpectedDF.size)
aStr.append(unexpectedDF(i) + "\n")
}

// check if the mismatch is due to decimal, and can be ignored
if (unexpectedDF.length == missingDF.length) {
for (i <- 0 until missingDF.size) {
if (!isIgnorable(missingDF(i).toString, unexpectedDF(i).toString)) {
fullRSValidationFailed = true
}
}
pw.println("This mismatch can be ignored.")
aStr.setLength(0) // data mismatch can be ignored
}
if(aStr.length > 0) {
pw.println(aStr)
fullRSValidationFailed = true
}
}
// fullRSValidationFailed
@@ -182,7 +205,6 @@ object SnappyTestUtils {
fullRSValidationFailed
}


def dataTypeConverter(row: Row): Row = {
val md = row.toSeq.map {
// case d: Double => "%18.1f".format(d).trim().toDouble
@@ -216,7 +238,7 @@ object SnappyTestUtils {
})
sb.toString()
}).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option(
"header", false).save(dest)
"header", true).save(dest)
}

/*
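The new ignorable-mismatch handling relies on a helper, isIgnorable, whose implementation is not part of this diff. As a rough, hypothetical sketch of what such a check could do (the name isIgnorableSketch, the naive comma split, and the 0.01 tolerance are assumptions, not taken from the repository): treat two stringified rows as equivalent when every field matches exactly or both parse as numbers that differ only within a small rounding tolerance.

// Hypothetical sketch only; the actual isIgnorable helper is defined elsewhere in the repo.
def isIgnorableSketch(missingRow: String, unexpectedRow: String, tolerance: Double = 0.01): Boolean = {
  // Row.toString renders as "[v1,v2,...]"; split both rows into fields (naive split, fine for simple rows).
  val a = missingRow.stripPrefix("[").stripSuffix("]").split(",").map(_.trim)
  val b = unexpectedRow.stripPrefix("[").stripSuffix("]").split(",").map(_.trim)
  a.length == b.length && a.zip(b).forall { case (x, y) =>
    x == y || {
      // Tolerate numeric fields that differ only by rounding.
      val dx = scala.util.Try(x.toDouble).toOption
      val dy = scala.util.Try(y.toDouble).toOption
      dx.isDefined && dy.isDefined && math.abs(dx.get - dy.get) <= tolerance
    }
  }
}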
26 changes: 6 additions & 20 deletions dtests/src/test/scala/io/snappydata/hydra/spva/SPVATestUtil.scala
@@ -29,34 +29,26 @@ object SPVATestUtil {
def createAndLoadReplicatedTables(snc: SnappyContext): Unit = {

snc.sql(SPVAQueries.patients_table)
SPVAQueries.patients(snc).write.insertInto("patient")

snc.sql(SPVAQueries.encounters_table)
SPVAQueries.encounters(snc).write.insertInto("encounters")

snc.sql(SPVAQueries.allergies_table)
SPVAQueries.allergies(snc).write.insertInto("allergies")

snc.sql(SPVAQueries.careplans_table)
SPVAQueries.careplans(snc).write.insertInto("careplans")

snc.sql(SPVAQueries.conditions_table)
SPVAQueries.conditions(snc).write.insertInto("conditions")

snc.sql(SPVAQueries.imaging_studies_table)
SPVAQueries.imaging_studies(snc).write.insertInto("imaging_studies")

snc.sql(SPVAQueries.immunizations_table)
SPVAQueries.immunizations(snc).write.insertInto("immunizations")

snc.sql(SPVAQueries.medications_table)
SPVAQueries.medications(snc).write.insertInto("medications")

snc.sql(SPVAQueries.observations_table)
SPVAQueries.observations(snc).write.insertInto("observations")

snc.sql(SPVAQueries.procedures_table)
SPVAQueries.procedures(snc).write.insertInto("procedures")

loadTables(snc)
}

def createAndLoadPartitionedTables(snc: SnappyContext): Unit = {
@@ -91,16 +83,7 @@ object SPVATestUtil {
" colocate_with 'PATIENTS', buckets '12', redundancy '1', PERSISTENT 'sync', " +
" EVICTION_BY 'LRUHEAPPERCENT')")

SPVAQueries.patients(snc).write.insertInto("patients")
SPVAQueries.encounters(snc).write.insertInto("encounters")
SPVAQueries.allergies(snc).write.insertInto("allergies")
SPVAQueries.careplans(snc).write.insertInto("careplans")
SPVAQueries.conditions(snc).write.insertInto("conditions")
SPVAQueries.imaging_studies(snc).write.insertInto("imaging_studies")
SPVAQueries.immunizations(snc).write.insertInto("immunizations")
SPVAQueries.medications(snc).write.insertInto("medications")
SPVAQueries.observations(snc).write.insertInto("observations")
SPVAQueries.procedures(snc).write.insertInto("procedures")
loadTables(snc)
}

def createAndLoadColumnTables(snc: SnappyContext): Unit = {
@@ -134,7 +117,10 @@ object SPVATestUtil {
snc.sql(SPVAQueries.procedures_table + " using column options(PARTITION_BY 'PATIENT', " +
" colocate_with 'PATIENTS', buckets '12', redundancy '1', PERSISTENT 'sync', " +
" EVICTION_BY 'LRUHEAPPERCENT')")
loadTables(snc)
}

def loadTables(snc: SnappyContext): Unit = {
SPVAQueries.patients(snc).write.insertInto("patients")
SPVAQueries.encounters(snc).write.insertInto("encounters")
SPVAQueries.allergies(snc).write.insertInto("allergies")
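The SPVATestUtil change itself is a plain refactor: the per-table insertInto calls that were repeated in each createAndLoad* method move into a single loadTables helper, which every method now calls. The hunk above is truncated after the first three inserts; judging from the removed lines, the full helper presumably covers the same ten tables:

// Reconstructed from the removed insertInto calls above; the tail of the actual
// helper is cut off in this diff view, so treat this as a best guess. It lives
// inside object SPVATestUtil and uses that file's existing imports.
def loadTables(snc: SnappyContext): Unit = {
  SPVAQueries.patients(snc).write.insertInto("patients")
  SPVAQueries.encounters(snc).write.insertInto("encounters")
  SPVAQueries.allergies(snc).write.insertInto("allergies")
  SPVAQueries.careplans(snc).write.insertInto("careplans")
  SPVAQueries.conditions(snc).write.insertInto("conditions")
  SPVAQueries.imaging_studies(snc).write.insertInto("imaging_studies")
  SPVAQueries.immunizations(snc).write.insertInto("immunizations")
  SPVAQueries.medications(snc).write.insertInto("medications")
  SPVAQueries.observations(snc).write.insertInto("observations")
  SPVAQueries.procedures(snc).write.insertInto("procedures")
}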
