Merge branch 'master' of https://github.com/SnappyDataInc/snappydata into v2connector-catalog
PradeepSurale committed Feb 18, 2019
2 parents 7d16d12 + 568b7e1 commit 799d4ab
Showing 65 changed files with 2,988 additions and 237 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -47,22 +47,22 @@ SnappyData artifacts are hosted in Maven Central. You can add a Maven dependency
```
groupId: io.snappydata
artifactId: snappydata-core_2.11
version: 1.0.1
version: 1.0.2.1
groupId: io.snappydata
artifactId: snappydata-cluster_2.11
version: 1.0.1
version: 1.0.2.1
```

**Using SBT Dependency**

If you are using SBT, add this line to your **build.sbt** for core SnappyData artifacts:

`libraryDependencies += "io.snappydata" % "snappydata-core_2.11" % "1.0.1"`
`libraryDependencies += "io.snappydata" % "snappydata-core_2.11" % "1.0.2.1"`

For additions related to SnappyData cluster, use:

`libraryDependencies += "io.snappydata" % "snappydata-cluster_2.11" % "1.0.1"`
`libraryDependencies += "io.snappydata" % "snappydata-cluster_2.11" % "1.0.2.1"`

You can find more specific SnappyData artifacts [here](http://mvnrepository.com/artifact/io.snappydata)

2 changes: 1 addition & 1 deletion build.gradle
@@ -70,7 +70,7 @@ allprojects {
apply plugin: "build-time-tracker"

group = 'io.snappydata'
version = '1.0.2.1'
version = '1.0.2.2'

// apply compiler options
tasks.withType(JavaCompile) {
12 changes: 10 additions & 2 deletions cluster/sbin/snappy-leads.sh
@@ -30,10 +30,18 @@ sbin="$(dirname "$(absPath "$0")")"
. "$SNAPPY_HOME/bin/load-spark-env.sh"
. "$SNAPPY_HOME/bin/load-snappy-env.sh"

CONF_DIR_OPT=
# Check if --config is passed as an argument. It is an optional parameter.
if [ "$1" == "--config" ]
then
CONF_DIR=$2
CONF_DIR_OPT="--config $CONF_DIR"
shift 2
fi

# Launch the slaves
if echo $@ | grep -qw start; then
"$sbin/snappy-nodes.sh" lead cd "$SNAPPY_HOME" \; "$sbin/snappy-lead.sh" "$@" $LEAD_STARTUP_OPTIONS
"$sbin/snappy-nodes.sh" lead $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-lead.sh" "$@" $LEAD_STARTUP_OPTIONS
else
"$sbin/snappy-nodes.sh" lead cd "$SNAPPY_HOME" \; "$sbin/snappy-lead.sh" "$@"
"$sbin/snappy-nodes.sh" lead $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-lead.sh" "$@"
fi
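
For context, a usage sketch of the new optional flag (the conf directory path is hypothetical; the same pattern applies to snappy-locators.sh shown next): because the script only inspects `$1` for it, `--config` has to be the first argument.

```
# Run from the SnappyData product directory; the conf path is a hypothetical example.
# --config must come first; the remaining arguments are passed through unchanged.
sbin/snappy-leads.sh --config /opt/snappydata/conf-cluster1 start
sbin/snappy-leads.sh --config /opt/snappydata/conf-cluster1 stop
```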
13 changes: 11 additions & 2 deletions cluster/sbin/snappy-locators.sh
@@ -31,9 +31,18 @@ sbin="$(dirname "$(absPath "$0")")"
. "$SNAPPY_HOME/bin/load-spark-env.sh"
. "$SNAPPY_HOME/bin/load-snappy-env.sh"

CONF_DIR_OPT=
# Check if --config is passed as an argument. It is an optional parameter.
if [ "$1" == "--config" ]
then
CONF_DIR=$2
CONF_DIR_OPT="--config $CONF_DIR"
shift 2
fi

# Launch the slaves
if echo $@ | grep -qw start; then
"$sbin/snappy-nodes.sh" locator cd "$SNAPPY_HOME" \; "$sbin/snappy-locator.sh" "$@" $LOCATOR_STARTUP_OPTIONS $ENCRYPT_PASSWORD_OPTIONS
"$sbin/snappy-nodes.sh" locator $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-locator.sh" "$@" $LOCATOR_STARTUP_OPTIONS $ENCRYPT_PASSWORD_OPTIONS
else
"$sbin/snappy-nodes.sh" locator cd "$SNAPPY_HOME" \; "$sbin/snappy-locator.sh" "$@"
"$sbin/snappy-nodes.sh" locator $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-locator.sh" "$@"
fi
11 changes: 9 additions & 2 deletions cluster/sbin/snappy-servers.sh
@@ -41,9 +41,16 @@ elif [ "$1" = "-fg" -o "$1" = "--foreground" ]; then
shift
fi

# Check for conf dir specification
CONF_DIR_OPT=
if [ "$1" = "--config" ]; then
CONF_DIR_OPT="--config $2"
shift 2
fi

# Launch the slaves
if echo $@ | grep -qw start; then
"$sbin/snappy-nodes.sh" server $BACKGROUND cd "$SNAPPY_HOME" \; "$sbin/snappy-server.sh" "$@" $SERVER_STARTUP_OPTIONS
"$sbin/snappy-nodes.sh" server $BACKGROUND $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-server.sh" "$@" $SERVER_STARTUP_OPTIONS
else
"$sbin/snappy-nodes.sh" server $BACKGROUND cd "$SNAPPY_HOME" \; "$sbin/snappy-server.sh" "$@"
"$sbin/snappy-nodes.sh" server $BACKGROUND $CONF_DIR_OPT cd "$SNAPPY_HOME" \; "$sbin/snappy-server.sh" "$@"
fi
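
A usage sketch for the server script, again with a hypothetical conf directory: the optional `-bg`/`-fg` flag is consumed from `$1` before the `--config` check, so when both are given the background/foreground flag comes first.

```
# Hypothetical conf directory; -bg/-fg (if used) precedes --config,
# because the script shifts that flag off $1 before checking for --config.
sbin/snappy-servers.sh -bg --config /opt/snappydata/conf-cluster1 start
```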
15 changes: 12 additions & 3 deletions cluster/sbin/snappy-start-all.sh
@@ -36,6 +36,7 @@ fi

BACKGROUND=-bg
clustermode=
CONF_DIR_ARG=

while (( "$#" )); do
param="$1"
@@ -47,6 +48,14 @@ while (( "$#" )); do
-fg | --foreground)
BACKGROUND=-fg
;;
-conf | --config)
conf_dir="$2"
if [ ! -d $conf_dir ] ; then
echo "Conf directory $conf_dir does not exist"
exit 1
fi
CONF_DIR_ARG="--config $conf_dir"
shift ;;
rowstore)
clustermode="rowstore"
;;
@@ -58,12 +67,12 @@ done


# Start Locators
"$sbin"/snappy-locators.sh start $clustermode "$@"
"$sbin"/snappy-locators.sh $CONF_DIR_ARG start $clustermode "$@"

# Start Servers
"$sbin"/snappy-servers.sh $BACKGROUND start $clustermode "$@"
"$sbin"/snappy-servers.sh $BACKGROUND $CONF_DIR_ARG start $clustermode "$@"

# Start Leads
if [ "$clustermode" != "rowstore" ]; then
"$sbin"/snappy-leads.sh start
"$sbin"/snappy-leads.sh $CONF_DIR_ARG start
fi
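
Putting it together, a start-up sketch with a hypothetical conf directory: `snappy-start-all.sh` now accepts `-conf` or `--config`, verifies the directory exists, and forwards it to the component scripts.

```
# Hypothetical conf directory; it must exist or the script exits with an error.
# The directory is forwarded as --config to snappy-locators.sh, snappy-servers.sh
# and (unless running in rowstore mode) snappy-leads.sh.
sbin/snappy-start-all.sh --config /opt/snappydata/conf-cluster1
```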
15 changes: 12 additions & 3 deletions cluster/sbin/snappy-stop-all.sh
@@ -32,6 +32,7 @@ sbin="$(dirname "$(absPath "$0")")"

BACKGROUND=-fg
clustermode=
CONF_DIR_ARG=

while (( "$#" )); do
param="$1"
@@ -43,6 +44,14 @@ while (( "$#" )); do
-fg | --foreground)
BACKGROUND=-fg
;;
-conf | --config)
conf_dir="$2"
if [ ! -d $conf_dir ] ; then
echo "Conf directory $conf_dir does not exists"
exit 1
fi
CONF_DIR_ARG="--config $conf_dir"
shift ;;
rowstore)
clustermode="rowstore"
;;
@@ -54,11 +63,11 @@ done

# Stop Leads
if [ "$clustermode" != "rowstore" ]; then
"$sbin"/snappy-leads.sh stop
"$sbin"/snappy-leads.sh $CONF_DIR_ARG stop
fi

# Stop Servers
"$sbin"/snappy-servers.sh $BACKGROUND stop
"$sbin"/snappy-servers.sh $BACKGROUND $CONF_DIR_ARG stop

# Stop locators
"$sbin"/snappy-locators.sh stop
"$sbin"/snappy-locators.sh $CONF_DIR_ARG stop
@@ -21,6 +21,7 @@ import java.util.Properties

import scala.language.postfixOps
import scala.sys.process._
import scala.util.Random

import com.gemstone.gemfire.internal.shared.NativeCalls
import com.pivotal.gemfirexd.internal.engine.Misc
@@ -70,6 +71,7 @@ abstract class ClusterManagerTestBase(s: String)
bootProps.setProperty("critical-heap-percentage", "95")
bootProps.setProperty("gemfirexd.max-lock-wait", "60000")
bootProps.setProperty("member-timeout", "5000")
bootProps.setProperty("snappydata.sql.planCaching", random.nextBoolean().toString)

// reduce startup time
// sysProps.setProperty("p2p.discoveryTimeout", "1000")
@@ -112,6 +114,9 @@

override def beforeClass(): Unit = {
super.beforeClass()
val logger = LoggerFactory.getLogger(getClass)
logger.info("Boot properties:" + bootProps)

doSetUp()
val locNetPort = locatorNetPort
val locNetProps = locatorNetProps
@@ -251,6 +256,7 @@ abstract class ClusterManagerTestBase(s: String)
object ClusterManagerTestBase extends Logging {
final def locatorPort: Int = DistributedTestBase.getDUnitLocatorPort
final lazy val locPort: Int = locatorPort
private val random = new Random()

/* SparkContext is initialized on the lead node and hence,
this can be used only by jobs running on Lead node */
@@ -548,6 +548,9 @@ object SplitSnappyClusterDUnitTest
.set("spark.testing.reservedMemory", "0")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("snappydata.connection", connectionURL)
.set("snapptdata.sql.planCaching", random.nextBoolean().toString)

logInfo("Spark conf:" + conf.getAll.toString)

val sc = SparkContext.getOrCreate(conf)
// sc.setLogLevel("DEBUG")
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.gemstone.gemfire.DataSerializer
import com.gemstone.gemfire.cache.CacheClosedException
import com.gemstone.gemfire.internal.shared.{ClientSharedUtils, Version}
import com.gemstone.gemfire.internal.{ByteArrayDataInput, InternalDataSerializer}
import com.pivotal.gemfirexd.Attribute
@@ -36,7 +37,7 @@ import com.pivotal.gemfirexd.internal.iapi.types.{DataValueDescriptor, SQLChar}
import com.pivotal.gemfirexd.internal.impl.sql.execute.ValueRow
import com.pivotal.gemfirexd.internal.shared.common.StoredFormatIds
import com.pivotal.gemfirexd.internal.snappy.{LeadNodeExecutionContext, SparkSQLExecute}
import io.snappydata.{Constant, QueryHint}
import io.snappydata.{Constant, Property, QueryHint}

import org.apache.spark.serializer.{KryoSerializerPool, StructTypeSerializer}
import org.apache.spark.sql.catalyst.expressions
@@ -78,7 +79,7 @@ class SparkSQLExecuteImpl(val sql: String,

session.setPreparedQuery(preparePhase = false, pvs)

private[this] val df = session.sql(sql)
private[this] val df = Utils.sqlInternal(session, sql)

private[this] val thresholdListener = Misc.getMemStore.thresholdListener()

@@ -481,7 +482,12 @@ object SnappySessionPerConnection {
val session = connectionIdMap.get(connectionID)
if (session != null) session
else {
val session = SnappyContext().snappySession
val session = SnappyContext.globalSparkContext match {
// use a CancelException to force failover by client to another lead if available
case null => throw new CacheClosedException("No SparkContext ...")
case sc => new SnappySession(sc)
}
Property.PlanCaching.set(session.sessionState.conf, true)
val oldSession = connectionIdMap.putIfAbsent(connectionID, session)
if (oldSession == null) session else oldSession
}
30 changes: 30 additions & 0 deletions cluster/src/test/scala/org/apache/spark/sql/NorthWindTest.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql

import io.snappydata.Property.PlanCaching
import io.snappydata.SnappyFunSuite
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

@@ -57,6 +58,35 @@ class NorthWindTest
validatePartitionedColumnTableQueries(snc)
}

// enable if transformations are supported in plan-caching.
test("SNAP-2451"){
val planCaching = PlanCaching.get(snc.sessionState.conf)
PlanCaching.set(snc.sessionState.conf, false)
try {
createAndLoadColumnTables(snc)

val df1 = snc.sql("SELECT ShipCountry, Sum(Order_Details.UnitPrice * Quantity * Discount)" +
" AS ProductSales FROM Orders INNER JOIN Order_Details ON" +
" Orders.OrderID = Order_Details.OrderID" +
" where orders.OrderID > 11000 GROUP BY ShipCountry")

val result1 = df1.repartition(1).collect()
assert(result1.length == 22)


val df2 = snc.sql("SELECT ShipCountry, Sum(Order_Details.UnitPrice * Quantity * Discount)" +
" AS ProductSales FROM Orders INNER JOIN Order_Details ON" +
" Orders.OrderID = Order_Details.OrderID" +
" where orders.OrderID > 11070 GROUP BY ShipCountry")

val result2 = df2.repartition(1).collect()
assert(result2.length == 7)
} finally {
PlanCaching.set(snc.sessionState.conf, planCaching)
}

}

test("Test colocated tables queries") {
createAndLoadColocatedTables(snc)
validateColocatedTableQueries(snc)
@@ -35,6 +35,7 @@

package org.apache.spark.sql

import io.snappydata.Property.PlanCaching
import io.snappydata.{Property, SnappyFunSuite}
import org.scalatest.Matchers._

Expand Down Expand Up @@ -433,6 +434,9 @@ class SnappySQLQuerySuite extends SnappyFunSuite {

test("Push down TPCH Q19") {
session.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
session.sql("set snappydata.sql.planCaching=true").collect()
val planCaching = PlanCaching.get(snc.sessionState.conf)
PlanCaching.set(snc.sessionState.conf, true)
try {
// this loop exists because initial implementation had a problem
// in RefParamLiteral.hashCode() that caused it to fail once in 2-3 runs
@@ -441,6 +445,7 @@
}
} finally {
session.sql(s"set spark.sql.autoBroadcastJoinThreshold=${10L * 1024 * 1024}")
PlanCaching.set(snc.sessionState.conf, planCaching)
}
}

@@ -469,7 +474,6 @@ class SnappySQLQuerySuite extends SnappyFunSuite {
| +- SubqueryAlias CT2
| +- Relation[ID#0,DATA#0] ColumnFormatRelation[APP.CT2]
|""".stripMargin

assert(idPattern.replaceAllIn(ds.queryExecution.analyzed.treeString, "#0") === expectedTree)
assert(ds.collect() === Array(Row(100L, "data100")))

@@ -19,12 +19,15 @@ package org.apache.spark.sql.execution.benchmark
import java.sql.{Date, DriverManager, Timestamp}
import java.time.{ZoneId, ZonedDateTime}

import scala.util.Random

import com.typesafe.config.Config
import io.snappydata.SnappyFunSuite
import org.scalatest.Assertions

import org.apache.spark.memory.SnappyUnifiedMemoryManager
import org.apache.spark.sql._
import org.apache.spark.sql.collection.Utils
import org.apache.spark.sql.execution.benchmark.TAQTest.CreateOp
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{Decimal, DecimalType, StringType, StructField, StructType}
@@ -133,7 +136,7 @@ class TAQTestJob extends SnappySQLJob with Logging {
for (_ <- 1 to numRuns) {
val start = System.nanoTime()
for (_ <- 1 to numIters) {
session.sql("select * from citi_order where id=1000 " +
Utils.sqlInternal(session, "select * from citi_order where id=1000 " +
"--GEMFIREXD-PROPERTIES executionEngine=Spark").collectInternal()
}
val end = System.nanoTime()
@@ -267,6 +270,8 @@ object TAQTest extends Logging with Assertions {
System.runFinalization()
}

private val random = new Random()

def newSparkConf(addOn: SparkConf => SparkConf = null): SparkConf = {
val cores = math.min(16, Runtime.getRuntime.availableProcessors())
val conf = new SparkConf()
@@ -277,8 +282,9 @@
conf.set("snappydata.store.memory-size", "1200m")
}
conf.set("spark.memory.manager", classOf[SnappyUnifiedMemoryManager].getName)
conf.set("spark.serializer", "org.apache.spark.serializer.PooledKryoSerializer")
conf.set("spark.closure.serializer", "org.apache.spark.serializer.PooledKryoSerializer")
.set("spark.serializer", "org.apache.spark.serializer.PooledKryoSerializer")
.set("spark.closure.serializer", "org.apache.spark.serializer.PooledKryoSerializer")
.set("snappydata.sql.planCaching", random.nextBoolean().toString)
if (addOn != null) {
addOn(conf)
}