Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Add Cassandra database storage back end
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Zimmer committed Jun 22, 2022
1 parent 710750c commit 5e46b8a
Show file tree
Hide file tree
Showing 28 changed files with 1,254 additions and 451 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.title="bandwhichd-server"
LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics"
LABEL org.opencontainers.image.version="0.4.0"
LABEL org.opencontainers.image.version="0.5.0"
USER guest
ENTRYPOINT ["/opt/java/openjdk/bin/java"]
CMD ["-jar", "/opt/bandwhichd-server.jar"]
EXPOSE 8080
HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \
CMD wget --spider http://localhost:8080/v1/health || exit 1
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.4.0.jar /opt/bandwhichd-server.jar
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.0.jar /opt/bandwhichd-server.jar
17 changes: 16 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,26 @@ lazy val root = (project in file("."))
.settings(
organization := "de.neuland-bfi",
name := "bandwhichd-server",
version := "0.4.0",
version := "0.5.0",
scalaVersion := "3.1.2",
Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala",
Test / scalaSource := baseDirectory.value / "src" / "test" / "scala",
Test / fork := true,
ThisBuild / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith "module-info.class" =>
MergeStrategy.discard
case PathList(ps @ _*)
if ps.last endsWith "io.netty.versions.properties" =>
MergeStrategy.discard
case path =>
val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
oldStrategy(path)
},
libraryDependencies += "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.8" % "test",
libraryDependencies += "co.fs2" %% "fs2-core" % "3.2.8",
libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "3.2.8",
libraryDependencies += "com.comcast" %% "ip4s-core" % "3.1.3",
libraryDependencies += "com.datastax.oss" % "java-driver-core" % "4.14.1",
libraryDependencies += "io.circe" %% "circe-core" % "0.14.2",
libraryDependencies += "io.circe" %% "circe-parser" % "0.14.2",
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.3.12",
Expand Down
22 changes: 21 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
# bandwhichd-server depends on cassandra.
# docker-compose does not detect if cassandra is up properly.
# Additionally, the keyspace needs to be created manually for
# now, because it is not part of the automated migrations.
#
# 1. Start cassandra
# docker-compose up --detach cassandra
# 2. Create keyspace (it takes a while until cassandra is ready and accepts connections)
# docker-compose exec -- cassandra cqlsh --execute="create keyspace if not exists bandwhichd with replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"
# 3. Start bandwhichd-server
# docker-compose up --build --detach bandwhichd-server
# 4. Follow logs
# docker-compose logs --follow

services:
main:
cassandra:
image: cassandra:4.0.4
ports:
- 9042:9042
bandwhichd-server:
build:
context: .
command:
- -Xmx1g
- -jar
- /opt/bandwhichd-server.jar
environment:
CONTACT_POINTS: "cassandra:9042"
ports:
- 8080:8080
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import de.neuland.bandwhichd.server.domain.measurement.Timing.{
Timeframe,
Timestamp
}
import de.neuland.bandwhichd.server.lib.time.ZonedInterval
import de.neuland.bandwhichd.server.lib.time.Interval
import io.circe.{Decoder, DecodingFailure, HCursor}

import java.time.{Duration, ZonedDateTime}
Expand Down Expand Up @@ -40,8 +40,8 @@ object DomainDecoders {

val timeframeDecoder: Decoder[Timeframe] =
(c: HCursor) =>
c.as[ZonedInterval](
de.neuland.bandwhichd.server.lib.time.circe.Decoder.zonedIntervalDecoder
c.as[Interval](
de.neuland.bandwhichd.server.lib.time.circe.Decoder.intervalDecoder
).map(Timeframe.apply)

val timestampDecoder: Decoder[Timestamp] =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package de.neuland.bandwhichd.server.adapter.in.v1.health

import cats.effect.Async
import de.neuland.bandwhichd.server.adapter.out.measurement.MeasurementInMemoryRepository
import de.neuland.bandwhichd.server.lib.health.jvm.JvmMemoryUtilization
import de.neuland.bandwhichd.server.lib.health.{Check, Health}
import fs2.Pure
Expand All @@ -10,9 +9,7 @@ import org.http4s.dsl.Http4sDsl
import org.http4s.implicits.*
import org.http4s.{Entity, HttpRoutes, Response, Status}

class HealthController[F[_]: Async](
private val measurementInMemoryRepository: MeasurementInMemoryRepository[F]
) extends Http4sDsl[F] {
class HealthController[F[_]: Async] extends Http4sDsl[F] {

val routes: HttpRoutes[F] =
HttpRoutes.of[F] { case GET -> Root / "v1" / "health" =>
Expand All @@ -21,8 +18,7 @@ class HealthController[F[_]: Async](

private val currentHealth = Health(
Seq[Check](
JvmMemoryUtilization.current,
measurementInMemoryRepository
JvmMemoryUtilization.current
)
)

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package de.neuland.bandwhichd.server.adapter.out

import cats.implicits.*
import cats.effect.Async
import com.datastax.oss.driver.api.core.cql.{SimpleStatement, Statement}
import de.neuland.bandwhichd.server.boot.Configuration
import de.neuland.bandwhichd.server.lib.cassandra.CassandraContext

class CassandraMigration[F[_]: Async](
private val cassandraContext: CassandraContext[F]
) {
def migrate(configuration: Configuration): F[Unit] =
for {
_ <- createCidrType(configuration)
_ <- createMeasurementNetworkConfigurationInterfaceType(configuration)
_ <- createMeasurementNetworkConfigurationOpenSocketType(configuration)
_ <- createMeasurementNetworkUtilizationConnectionType(configuration)
_ <- createMeasurementsTable(configuration)
} yield ()

private def createMeasurementsTable(
configuration: Configuration
): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder(
"""create table if not exists measurements (
| agent_id uuid,
| timestamp timestamp,
| end_timestamp timestamp,
| measurement_type ascii,
| network_configuration_machine_id uuid,
| network_configuration_hostname text,
| network_configuration_interfaces frozen<list<frozen<measurement_network_configuration_interface>>>,
| network_configuration_open_sockets frozen<list<frozen<measurement_network_configuration_open_socket>>>,
| network_utilization_connections frozen<list<frozen<measurement_network_utilization_connection>>>,
| primary key ((agent_id), timestamp, measurement_type),
|) with clustering order by (timestamp asc)""".stripMargin
)
.setKeyspace(configuration.measurementsKeyspace)
.build()
)

private def createMeasurementNetworkConfigurationInterfaceType(
configuration: Configuration
): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder(
"""create type if not exists measurement_network_configuration_interface (
| name text,
| is_up boolean,
| networks frozen<list<frozen<cidr>>>,
|)""".stripMargin
)
.setKeyspace(configuration.measurementsKeyspace)
.build()
)

private def createMeasurementNetworkConfigurationOpenSocketType(
configuration: Configuration
): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder(
"""create type if not exists measurement_network_configuration_open_socket (
| socket text,
| protocol ascii,
| maybe_process_name text,
|)""".stripMargin
)
.setKeyspace(configuration.measurementsKeyspace)
.build()
)

private def createMeasurementNetworkUtilizationConnectionType(
configuration: Configuration
): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder(
"""create type if not exists measurement_network_utilization_connection (
| interface_name text,
| local_socket text,
| remote_socket text,
| protocol ascii,
| received bigint,
| sent bigint,
|)""".stripMargin
)
.setKeyspace(configuration.measurementsKeyspace)
.build()
)

private def createCidrType(
configuration: Configuration
): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder(
"""create type if not exists cidr (
| address inet,
| prefix_bits smallint,
|)""".stripMargin
)
.setKeyspace(configuration.measurementsKeyspace)
.build()
)
}
Loading

0 comments on commit 5e46b8a

Please sign in to comment.