[hive] Optimize spark drop table test case (apache#4853)
xiaohongbo committed Jan 9, 2025
1 parent 4975635 commit 9462d04
Showing 1 changed file with 30 additions and 42 deletions.
@@ -18,18 +18,27 @@

 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.fs.Path
 import org.apache.paimon.hive.HiveMetastoreClient
-import org.apache.paimon.spark.PaimonHiveTestBase
-import org.apache.paimon.spark.catalog.WithPaimonCatalog
+import org.apache.paimon.spark.PaimonHiveTestBase.hiveUri
+import org.apache.paimon.spark.{PaimonHiveTestBase, SparkCatalog}
 import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.junit.jupiter.api.Assertions
 
 abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
 
+  protected val paimonHiveNoCacheCatalogName: String = "paimon_hive_no_cache"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.cache-enabled", "false")
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName", classOf[SparkCatalog].getName)
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.metastore", "hive")
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.warehouse", tempHiveDBDir.getCanonicalPath)
+      .set(s"spark.sql.catalog.$paimonHiveNoCacheCatalogName.uri", hiveUri)
+  }
+
   test("Paimon DDL with hive catalog: create database with location and comment") {
     Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
       catalogName =>
@@ -624,47 +633,26 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
   }
 
   test("Paimon DDL with hive catalog: drop table which location has been deleted") {
-    spark.close()
-
-    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName, paimonHiveNoCacheCatalogName).foreach {
       catalogName =>
-        Seq(false, true).foreach {
-          cacheEnabled =>
-            val customSpark = SparkSession
-              .builder()
-              .master("local[2]")
-              .config(sparkConf)
-              .config(s"spark.sql.catalog.$catalogName.cache-enabled", cacheEnabled)
-              .getOrCreate()
-
-            customSpark.sql(s"USE $catalogName")
-            customSpark.sql("CREATE DATABASE IF NOT EXISTS paimon_db")
-
-            withDatabase("paimon_db") {
-              customSpark.sql("USE paimon_db")
-              customSpark.sql("CREATE TABLE t USING paimon")
-              val currentCatalog = customSpark.sessionState.catalogManager.currentCatalog
-                .asInstanceOf[WithPaimonCatalog]
-                .paimonCatalog()
-              val table = currentCatalog
-                .getTable(Identifier.create("paimon_db", "t"))
-                .asInstanceOf[FileStoreTable]
-              table.fileIO().delete(table.location(), true)
-              if (catalogName.equals("paimon")) {
-                // Filesystem catalog determines whether a table exists based on table location
-                assert(customSpark.sql("SHOW TABLES").count() == 0)
-              } else {
-                // Hive catalog determines whether a table exists based on metadata in hms
-                assert(customSpark.sql("SHOW TABLES").count() == 1)
-              }
-              customSpark.sql("DROP TABLE IF EXISTS t")
-              assert(customSpark.sql("SHOW TABLES").count() == 0)
-            }
-
-            customSpark.close()
-        }
+        spark.sql(s"USE $catalogName")
+        withDatabase("paimon_db") {
+          spark.sql("CREATE DATABASE paimon_db")
+          spark.sql("USE paimon_db")
+          spark.sql("CREATE TABLE t USING paimon")
+          val table = loadTable("paimon_db", "t")
+          table.fileIO().delete(table.location(), true)
+          if (catalogName.equals("paimon")) {
+            // Filesystem catalog determines whether a table exists based on table location
+            assert(spark.sql("SHOW TABLES").count() == 0)
+          } else {
+            // Hive catalog determines whether a table exists based on metadata in hms
+            assert(spark.sql("SHOW TABLES").count() == 1)
+          }
+          spark.sql("DROP TABLE IF EXISTS t")
+          assert(spark.sql("SHOW TABLES").count() == 0)
+        }
     }
-    reset()
   }
 
   def getDatabaseProp(dbName: String, propertyName: String): String = {
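For context, here is a minimal standalone sketch of the catalog registration that the new sparkConf override performs, using the same configuration keys that appear in the diff above. The application name, warehouse path, and metastore URI are placeholder values for illustration, not taken from the commit:

import org.apache.spark.sql.SparkSession

object PaimonNoCacheCatalogExample {
  def main(args: Array[String]): Unit = {
    // Register an extra Paimon catalog with metadata caching disabled,
    // mirroring the sparkConf override added in this commit. The warehouse
    // path and metastore URI below are placeholders.
    val spark = SparkSession
      .builder()
      .appName("paimon-no-cache-catalog-example")
      .master("local[2]")
      .config("spark.sql.catalog.paimon_hive_no_cache", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon_hive_no_cache.metastore", "hive")
      .config("spark.sql.catalog.paimon_hive_no_cache.warehouse", "/tmp/paimon_warehouse")
      .config("spark.sql.catalog.paimon_hive_no_cache.uri", "thrift://localhost:9083")
      .config("spark.sql.catalog.paimon_hive_no_cache.cache-enabled", "false")
      .getOrCreate()

    // With cache-enabled=false, lookups against this catalog are not served
    // from a session-level cache, so a table whose files were deleted behind
    // the catalog's back is re-resolved on each SHOW TABLES / DROP TABLE.
    spark.sql("USE paimon_hive_no_cache")
    spark.sql("SHOW TABLES").show()

    spark.stop()
  }
}

Because the no-cache catalog is registered once in the shared session's configuration, the test can simply add paimonHiveNoCacheCatalogName to the list of catalog names it iterates over, rather than building and tearing down a dedicated SparkSession for each cache setting as the removed code did.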
