From 9e00282232bf1b77de86995e25dc4c3f6b8ad486 Mon Sep 17 00:00:00 2001 From: Michel Zimmer Date: Fri, 24 Jun 2022 00:08:58 +0200 Subject: [PATCH] Fix accidental Cassandra tombstone creation when recording measurements --- Dockerfile | 4 +- build.sbt | 2 +- docker-compose.yml | 2 + .../MeasurementCassandraCodecs.scala | 188 +++++++++--------- 4 files changed, 98 insertions(+), 98 deletions(-) diff --git a/Dockerfile b/Dockerfile index 24f1588..a6b0e04 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,11 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH" LABEL org.opencontainers.image.licenses="Apache-2.0" LABEL org.opencontainers.image.title="bandwhichd-server" LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics" -LABEL org.opencontainers.image.version="0.5.0" +LABEL org.opencontainers.image.version="0.5.1" USER guest ENTRYPOINT ["/opt/java/openjdk/bin/java"] CMD ["-jar", "/opt/bandwhichd-server.jar"] EXPOSE 8080 HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \ CMD wget --spider http://localhost:8080/v1/health || exit 1 -COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.0.jar /opt/bandwhichd-server.jar \ No newline at end of file +COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.1.jar /opt/bandwhichd-server.jar \ No newline at end of file diff --git a/build.sbt b/build.sbt index b62244b..bbeef60 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ lazy val root = (project in file(".")) .settings( organization := "de.neuland-bfi", name := "bandwhichd-server", - version := "0.5.0", + version := "0.5.1", scalaVersion := "3.1.2", Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala", Test / scalaSource := baseDirectory.value / "src" / "test" / "scala", diff --git a/docker-compose.yml b/docker-compose.yml index f8e10fe..f81c3aa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,8 @@ services: ports: - 9042:9042 bandwhichd-server: + depends_on: + - cassandra build: context: . command: diff --git a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraCodecs.scala b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraCodecs.scala index b7973ee..9371efc 100644 --- a/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraCodecs.scala +++ b/src/main/scala/de/neuland/bandwhichd/server/adapter/out/measurement/MeasurementCassandraCodecs.scala @@ -16,127 +16,125 @@ import java.util.UUID import scala.util.Try object MeasurementCassandraCodecs { - given Encoder[Measurement[Timing]] = - (measurement: Measurement[Timing]) => - measurement match - case nc: Measurement.NetworkConfiguration => - Encoder[Measurement.NetworkConfiguration].mapJson( - _.mapObject( - _.add( - "measurement_type", - Json.fromString("network_configuration") - ) - ) - )(nc) - case nu: Measurement.NetworkUtilization => - Encoder[Measurement.NetworkUtilization].mapJson( - _.mapObject( - _.add( - "measurement_type", - Json.fromString("network_utilization") - ) - ) - )(nu) - - given Decoder[Measurement[Timing]] = - (c: HCursor) => { - val measurementTypeCursor = c.downField("measurement_type") - for { - measurementType <- measurementTypeCursor.as[String] - measurement <- { - measurementType match - case "network_configuration" => - c.as[Measurement.NetworkConfiguration] - case "network_utilization" => - c.as[Measurement.NetworkUtilization] - case _ => - Left( - DecodingFailure( - s"invalid measurement type $measurementType", - measurementTypeCursor.history - ) - ) - } - } yield measurement - } - - given Codec[Measurement.NetworkConfiguration] = - Codec.forProduct6( + given Codec[Measurement[Timing]] = + Codec.forProduct9( "agent_id", "timestamp", + "end_timestamp", + "measurement_type", "network_configuration_machine_id", "network_configuration_hostname", "network_configuration_interfaces", - "network_configuration_open_sockets" + "network_configuration_open_sockets", + "network_utilization_connections" )( ( agentId: AgentId, timestamp: Timing.Timestamp, + endTimestamp: Timing.Timestamp, + measurementType: String, machinedId: MachineId, hostname: Hostname, interfaces: Seq[Interface], - openSockets: Seq[OpenSocket] + openSockets: Seq[OpenSocket], + connections: Seq[Connection] ) => - Measurement.NetworkConfiguration( - agentId = agentId, - timing = timestamp, - machineId = machinedId, - hostname = hostname, - interfaces = interfaces, - openSockets = openSockets + measurementType match + case "network_configuration" => + Measurement.NetworkConfiguration( + agentId = agentId, + timing = timestamp, + machineId = machinedId, + hostname = hostname, + interfaces = interfaces, + openSockets = openSockets + ) + case "network_utilization" => + Measurement.NetworkUtilization( + agentId = agentId, + timing = Timing.Timeframe( + Interval( + start = timestamp.instant, + stop = endTimestamp.instant + ) + ), + connections = connections + ) + case _ => + throw DecodingFailure( + s"invalid measurement type $measurementType", + List(CursorOp.DownField("measurement_type")) + ) + )(_ match + case Measurement.NetworkConfiguration( + agentId, + timing, + machineId, + hostname, + interfaces, + openSockets + ) => + ( + agentId, + timing, + Timing.Timestamp(Instant.EPOCH), + "network_configuration", + machineId, + hostname, + interfaces, + openSockets, + Seq.empty[Connection] + ) + case Measurement.NetworkUtilization( + agentId, + timing, + connections + ) => + ( + agentId, + Timing.Timestamp(timing.value.normalizedStart), + Timing.Timestamp(timing.value.normalizedStop), + "network_utilization", + MachineId(new UUID(0, 0)), + Hostname.fromString("a").get, + Seq.empty[Interface], + Seq.empty[OpenSocket], + connections ) - )(nc => - ( - nc.agentId, - nc.timing, - nc.machineId, - nc.hostname, - nc.interfaces, - nc.openSockets - ) ) given Codec[Interface] = - Codec.forProduct3("name", "is_up", "networks")(Interface.apply)(interface => + Codec.forProduct3( + "name", + "is_up", + "networks" + )(Interface.apply)(interface => (interface.name, interface.isUp, interface.networks) ) given Codec[OpenSocket] = - Codec.forProduct3("socket", "protocol", "maybe_process_name")( - OpenSocket.apply - )(openSocket => - (openSocket.socket, openSocket.protocol, openSocket.maybeProcessName) - ) - - given Codec[Measurement.NetworkUtilization] = - Codec.forProduct4( - "agent_id", - "timestamp", - "end_timestamp", - "network_utilization_connections" + Codec.forProduct3( + "socket", + "protocol", + "maybe_process_name" )( ( - agentId: AgentId, - timestamp: Timing.Timestamp, - endTimestamp: Timing.Timestamp, - connections: Seq[Connection] + socket: SocketAddress[Host], + protocol: Protocol, + processNameValue: String ) => - Measurement.NetworkUtilization( - agentId = agentId, - timing = Timing.Timeframe( - Interval( - start = timestamp.instant, - stop = endTimestamp.instant - ) - ), - connections = connections + OpenSocket( + socket = socket, + protocol = protocol, + maybeProcessName = + if (processNameValue.nonEmpty) Option(ProcessName(processNameValue)) + else None ) - )(nu => + )(openSocket => ( - nu.agentId, - Timing.Timestamp(nu.timing.value.normalizedStart), - Timing.Timestamp(nu.timing.value.normalizedStop), - nu.connections + openSocket.socket, + openSocket.protocol, + openSocket.maybeProcessName.fold("")(_.value) ) )