
Commit

Merge branch '0.8.2-integration' to master 0.8.2 release
jackson-paul committed Feb 22, 2019
2 parents 7400035 + 610533d commit 16fc319
Showing 189 changed files with 9,249 additions and 2,581 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -0,0 +1 @@
+version.sbt merge=ours
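
The "ours" merge driver named here is not built into Git; it is conventionally enabled per clone with a one-line config (an assumption about local setup, not part of this commit):

    git config merge.ours.driver true

With the driver defined, merges into a branch always keep that branch's copy of version.sbt, so release version bumps don't conflict when integration branches are merged.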
670 changes: 296 additions & 374 deletions README.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions akka-bootstrapper/src/main/resources/reference.conf
@@ -8,6 +8,12 @@ akka-bootstrapper {

 # The base url of the seeds endpoint. The baseUrl and path put together will form the seeds endpoint, and be used to discover seeds if a cluster already exists. Ends with a slash.
 base-url = "http://localhost:8080/"
+
+# Number of times we will retry fetching information from seeds endpoint before resorting to creating a new cluster.
+retries = 5
+
+# Sleep time between retries
+sleep-between-retries = 10s
 }

 seed-discovery {
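
With the defaults above, a starting node polls the seeds endpoint up to 5 times, sleeping 10s between attempts, before it gives up and bootstraps a new cluster. A deployment that wants a longer discovery window can override these keys in its own config; the host and values below are hypothetical:

    akka-bootstrapper {
      http-seeds {
        base-url = "http://seeds.example.com:8080/"
        retries = 10
        sleep-between-retries = 5s
      }
    }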
@@ -24,6 +24,8 @@ final class AkkaBootstrapperSettings(val config: Config) extends StrictLogging {

   val seedsBaseUrl = bootstrapper.getString("http-seeds.base-url")
   val seedsPath = bootstrapper.getString("http-seeds.path")
+  val seedsHttpRetries = bootstrapper.getInt("http-seeds.retries")
+  val seedsHttpSleepBetweenRetries = bootstrapper.getDuration("http-seeds.sleep-between-retries")

   // used by simple dns srv and consul
   lazy val seedNodeCount: Integer = bootstrapper.getInt("dns-srv.seed-node-count")
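
A minimal standalone sketch of how the two new settings parse with Typesafe Config (the inline config simply mirrors the reference.conf defaults): getDuration returns a java.time.Duration, so "10s" becomes 10000 ms for the Thread.sleep call in the retry loop below.

    import com.typesafe.config.ConfigFactory

    val bootstrapper = ConfigFactory.parseString(
      """|http-seeds {
         |  retries = 5
         |  sleep-between-retries = 10s
         |}""".stripMargin)

    val retries = bootstrapper.getInt("http-seeds.retries")                   // 5
    val sleep = bootstrapper.getDuration("http-seeds.sleep-between-retries")  // PT10S
    println(s"$retries retries, ${sleep.toMillis} ms apart")                  // 5 retries, 10000 ms apart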
@@ -7,7 +7,7 @@ import scala.util.control.NonFatal
 import akka.actor.{Address, AddressFromURIString}
 import akka.cluster.Cluster
 import com.typesafe.scalalogging.StrictLogging
-import scalaj.http.Http
+import scalaj.http.{Http, HttpResponse}

/** Seed node strategy. Some implementations discover, some simply read from immutable config. */
abstract class ClusterSeedDiscovery(val cluster: Cluster,
@@ -24,40 +24,44 @@ abstract class ClusterSeedDiscovery(val cluster: Cluster,
   }

   @throws(classOf[DiscoveryTimeoutException])
+  //scalastyle:off null
   protected def discoverPeersForNewCluster: Seq[Address]

   protected def discoverExistingCluster: Seq[Address] = {

     val seedsEndpoint = settings.seedsBaseUrl + settings.seedsPath
+    var response: HttpResponse[String] = null
+    var retriesRemaining = settings.seedsHttpRetries
+    do {
+      try {
+        logger.info(s"Trying to fetch seeds from $seedsEndpoint ... $retriesRemaining retries remaining.")
+        response = Http(seedsEndpoint).timeout(2000, 2000).asString
+        logger.info(s"Seeds endpoint returned a ${response.code}. Response body was ${response.body}")
+      } catch {
+        case NonFatal(e) =>
+          logger.info(s"Seeds endpoint $seedsEndpoint failed. This is expected on cluster bootstrap", e)
+      }
+      retriesRemaining -= 1
+      if (retriesRemaining > 0) Thread.sleep(settings.seedsHttpSleepBetweenRetries.toMillis)
+    } while ((response == null || !response.is2xx) && retriesRemaining > 0)

-    logger.info("Checking seeds endpoint {} to see if cluster already exists", seedsEndpoint)
-
-    try {
-
-      val response = Http(seedsEndpoint).timeout(1000, 1000).asString
-      if (!response.is2xx) {
-        logger.info("Seeds endpoint returned a {}. Assuming cluster does not exist. Response body was {}",
-          response.code.toString, response.body)
-        Seq.empty[Address]
-      } else {
-        decode[ClusterMembershipHttpResponse](response.body) match {
-          case Right(membersResponse) =>
-            logger.info("Cluster exists. Response: {}", membersResponse)
-            membersResponse.members.sorted.map(a => AddressFromURIString.parse(a))
-          case Left(ex) =>
-            logger.error(s"Exception parsing JSON response ${response.body}, returning empty seeds", ex)
-            Seq.empty[Address]
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        logger.info("Seeds endpoint {} did not return the seeds. Cluster does not exist.", seedsEndpoint)
-        Seq.empty[Address]
-    }
+    if (response == null || !response.is2xx) {
+      logger.info(s"Giving up on discovering seeds after ${settings.seedsHttpRetries} retries. " +
+        s"Assuming cluster does not exist. ")
+      Seq.empty[Address]
+    } else {
+      decode[ClusterMembershipHttpResponse](response.body) match {
+        case Right(membersResponse) =>
+          logger.info("Cluster exists. Response: {}", membersResponse)
+          membersResponse.members.sorted.map(a => AddressFromURIString.parse(a))
+        case Left(ex) =>
+          logger.error(s"Exception parsing JSON response ${response.body}, returning empty seeds", ex)
+          Seq.empty[Address]
+      }
+    }
   }


object ClusterSeedDiscovery {
/** Seed node strategy. Some implementations discover, some simply read them. */
def apply(cluster: Cluster, settings: AkkaBootstrapperSettings): ClusterSeedDiscovery = {
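
The core of this change is a bounded poll-and-sleep loop. Stripped of scalaj-http and FiloDB types, the pattern looks like the sketch below; fetchOnce and the Option result are hypothetical stand-ins for the HTTP call and its 2xx check:

    import scala.util.control.NonFatal

    // Retry up to `retries` times, sleeping between attempts, until fetchOnce yields a value.
    def fetchWithRetries[A](retries: Int, sleepMillis: Long)(fetchOnce: => Option[A]): Option[A] = {
      var result: Option[A] = None
      var remaining = retries
      do {
        try result = fetchOnce
        catch { case NonFatal(e) => println(s"attempt failed: ${e.getMessage}") }
        remaining -= 1
        if (result.isEmpty && remaining > 0) Thread.sleep(sleepMillis)
      } while (result.isEmpty && remaining > 0)
      result
    }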
@@ -7,6 +7,7 @@ import filodb.cassandra.columnstore.CassandraColumnStore
 import filodb.cassandra.metastore.CassandraMetaStore
 import filodb.coordinator.StoreFactory
 import filodb.core.memstore.TimeSeriesMemStore
+import filodb.core.store.NullColumnStore

/**
* A StoreFactory for a TimeSeriesMemStore backed by a Cassandra ChunkSink for on-demand recovery/persistence
@@ -19,4 +20,19 @@ class CassandraTSStoreFactory(config: Config, sched: Scheduler) extends StoreFactory {
   val colStore = new CassandraColumnStore(config, sched)(sched)
   val metaStore = new CassandraMetaStore(config.getConfig("cassandra"))(sched)
   val memStore = new TimeSeriesMemStore(config, colStore, metaStore)(sched)
 }
+
+/**
+ * A StoreFactory for a TimeSeriesMemStore with Cassandra for metadata, but NullColumnStore for
+ * disabling write of chunks to a persistent column store. This can be used in environments
+ * where we would like to test in-memory aspects of the store and ignore persistence and
+ * on-demand-paging.
+ *
+ * @param config a Typesafe Config, not at the root but at the "filodb." level
+ * @param sched a Monix Scheduler, recommended to be the standard I/O pool, for scheduling asynchronous I/O
+ */
+class NonPersistentTSStoreFactory(config: Config, sched: Scheduler) extends StoreFactory {
+  val colStore = new NullColumnStore()(sched)
+  val metaStore = new CassandraMetaStore(config.getConfig("cassandra"))(sched)
+  val memStore = new TimeSeriesMemStore(config, colStore, metaStore)(sched)
+}
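
A sketch of constructing the new factory by hand; the config source and scheduler name are hypothetical, and in a real deployment the factory would be selected via configuration rather than instantiated directly:

    import com.typesafe.config.ConfigFactory
    import monix.execution.Scheduler

    // The scaladoc asks for the config subtree at the "filodb." level.
    val filoConfig = ConfigFactory.load().getConfig("filodb")
    val ioSched = Scheduler.io("filodb-io")

    val factory = new NonPersistentTSStoreFactory(filoConfig, ioSched)
    // Ingestion runs purely in memory: chunk writes at flush time hit the
    // NullColumnStore and are dropped, while dataset metadata still goes to Cassandra.
    val memStore = factory.memStore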
@@ -56,7 +56,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
logger.info(s"Starting CassandraColumnStore with config ${cassandraConfig.withoutPath("password")}")

private val writeParallelism = cassandraConfig.getInt("write-parallelism")
private val partitionListNumStripesPerShard = cassandraConfig.getInt("partition-list-num-stripes-per-shard")

val sinkStats = new ChunkSinkStats

@@ -2,11 +2,11 @@ package filodb.cassandra.metastore

 import scala.concurrent.{ExecutionContext, Future}

-import com.datastax.driver.core.Row
+import com.datastax.driver.core.{ConsistencyLevel, Row}
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}

 import filodb.cassandra.{FiloCassandraConnector, FiloSessionProvider}
 import filodb.core.IngestionKeys
+import filodb.core.downsample.DownsampleConfig
 import filodb.core.store.{IngestionConfig, StoreConfig}

/**
@@ -38,7 +38,8 @@ sealed class IngestionConfigTable(val config: Config, val sessionProvider: FiloSessionProvider
       ConfigFactory.parseString(row.getString(IngestionKeys.Resources)),
       row.getString("factoryclass"),
       sourceConf,
-      StoreConfig(sourceConf.getConfig("store")))
+      StoreConfig(sourceConf.getConfig("store")),
+      DownsampleConfig.downsampleConfigFromSource(sourceConf))
   }

def initialize(): Future[Response] = execCql(createCql)
@@ -54,10 +55,17 @@ sealed class IngestionConfigTable(val config: Config, val sessionProvider: FiloSessionProvider
     execStmt(insertCql.bind(state.ref.dataset, state.ref.database.getOrElse(""),
       state.resources.root.render(ConfigRenderOptions.concise),
       state.streamFactoryClass,
-      state.streamStoreConfig.root.render(ConfigRenderOptions.concise)), AlreadyExists)
+      state.sourceStoreConfig.root.render(ConfigRenderOptions.concise)), AlreadyExists)

+  // SELECT * with consistency ONE to let it succeed more often. This is a temporary workaround to rearranging
+  // the schema so we don't need to do a full table scan. It is justified because the ingestion config table
+  // almost never changes, this read is done only on new cluster startup, and the table is only written to with
+  // the setup command (which always runs well after cluster startup).
+  lazy val readAllCql = session.prepare(s"SELECT * FROM $tableString")
+    .setConsistencyLevel(ConsistencyLevel.ONE)
+
   def readAllConfigs(): Future[Seq[IngestionConfig]] =
-    session.executeAsync(s"SELECT * FROM $tableString")
+    session.executeAsync(readAllCql.bind())
       .toIterator.map(_.map(fromRow).toSeq)

   def deleteIngestionConfig(dataset: DatasetRef): Future[Response] =
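
The consistency-level change above is standard DataStax Java driver 3.x usage: a level set on a PreparedStatement is inherited by every statement bound from it. A standalone sketch, with hypothetical contact point, keyspace, and table name:

    import com.datastax.driver.core.{Cluster, ConsistencyLevel}

    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect()

    // Prepared once; bound statements inherit consistency ONE, so the read
    // can succeed when only a single replica is reachable.
    val readAll = session.prepare("SELECT * FROM filodb_admin.ingestion_config")
      .setConsistencyLevel(ConsistencyLevel.ONE)

    val rows = session.execute(readAll.bind())
    rows.forEach(row => println(row))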
@@ -40,7 +40,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
     val start = System.currentTimeMillis
     val stream = Observable.fromIterable(groupedRecords(dataset1, linearMultiSeries(startTs=start)))
     val flushStream = FlushStream.everyN(4, 50, stream.share)
-    memStore.ingestStream(dataset1.ref, 0, stream, scheduler, flushStream)(ex => throw ex).futureValue
+    memStore.ingestStream(dataset1.ref, 0, stream, scheduler, flushStream).futureValue

     // Two flushes and 3 chunksets have been flushed
     memStore.store.sinkStats.chunksetsWritten should be >= 3
