diff --git a/akka-sample-sharding-java/build.sbt b/akka-sample-sharding-java/build.sbt index e1aa2095..5b4f7420 100644 --- a/akka-sample-sharding-java/build.sbt +++ b/akka-sample-sharding-java/build.sbt @@ -20,6 +20,7 @@ val `akka-sample-sharding-java` = project javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "org.scalatest" %% "scalatest" % "3.0.7" % Test ), diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java index 7d6a7178..8d722d8b 100644 --- a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java +++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java @@ -32,10 +32,12 @@ public String toString() { public static class GetTemperature implements Command { public final int deviceId; + public final ActorRef replyTo; @JsonCreator - public GetTemperature(int deviceId) { + public GetTemperature(int deviceId, ActorRef replyTo) { this.deviceId = deviceId; + this.replyTo = replyTo; } } @@ -77,13 +79,18 @@ private void receiveRecordTemperature(RecordTemperature cmd) { } private void receiveGetTemperature(GetTemperature cmd) { + Temperature reply; if (temperatures.isEmpty()) { - getSender().tell(new Temperature(cmd.deviceId, Double.NaN, - Double.NaN, 0), getSelf()); + reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0); } else { - getSender().tell(new Temperature(cmd.deviceId, average(temperatures), - temperatures.get(temperatures.size() - 1), temperatures.size()), getSelf()); + reply = new Temperature(cmd.deviceId, average(temperatures), + temperatures.get(temperatures.size() - 1), temperatures.size()); } + + if (cmd.replyTo == null) + getSender().tell(reply, getSelf()); + else + cmd.replyTo.tell(reply, getSelf()); } private double sum(List values) { diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java index aba56b6d..72d4e00a 100644 --- a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java +++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java @@ -2,6 +2,7 @@ import java.time.Duration; import java.util.Random; +import java.util.concurrent.CompletionStage; import akka.actor.AbstractActorWithTimers; @@ -13,6 +14,8 @@ import akka.cluster.sharding.ClusterSharding; import akka.cluster.sharding.ClusterShardingSettings; import akka.cluster.sharding.ShardRegion; +import akka.cluster.sharding.typed.ShardingEnvelope; +import akka.pattern.Patterns; public class Devices extends AbstractActorWithTimers { @@ -31,7 +34,13 @@ public String entityId(Object message) { return String.valueOf(((Device.RecordTemperature) message).deviceId); else if (message instanceof Device.GetTemperature) return String.valueOf(((Device.GetTemperature) message).deviceId); - else + else if (message instanceof ShardingEnvelope) { + ShardingEnvelope envelope = (ShardingEnvelope) message; + if (envelope.message() instanceof Device.RecordTemperature) + return String.valueOf(((Device.RecordTemperature) envelope.message()).deviceId); + else + return null; + } else return null; } }; @@ -84,8 +93,15 @@ private void receiveUpdateDevice() { } private void receiveReadTemperatures() { - for (int id = 0; id < numberOfDevices; id++) { - deviceRegion.tell(new Device.GetTemperature(id), getSelf()); + for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) { + if (deviceId >= 40) { + final int id = deviceId; + CompletionStage reply = Patterns.askWithReplyTo(deviceRegion, replyTo -> + new Device.GetTemperature(id, replyTo), Duration.ofSeconds(3)); + Patterns.pipe(reply, getContext().getDispatcher()).to(getSelf()); + } else { + deviceRegion.tell(new Device.GetTemperature(deviceId, getSelf()), getSelf()); + } } } diff --git a/akka-sample-sharding-java/src/main/resources/application.conf b/akka-sample-sharding-java/src/main/resources/application.conf index 903f3afc..8af5c0b9 100644 --- a/akka-sample-sharding-java/src/main/resources/application.conf +++ b/akka-sample-sharding-java/src/main/resources/application.conf @@ -27,3 +27,6 @@ akka { } } + +akka.cluster.sharding.number-of-shards = 100 + diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt index 32145afe..f18acdef 100644 --- a/akka-sample-sharding-scala/build.sbt +++ b/akka-sample-sharding-scala/build.sbt @@ -1,7 +1,7 @@ import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm -val akkaVersion = "2.6.0-M4" +val akkaVersion = "2.6.0" lazy val `akka-sample-sharding-scala` = project .in(file(".")) @@ -20,8 +20,9 @@ lazy val `akka-sample-sharding-scala` = project javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, - "org.scalatest" %% "scalatest" % "3.0.7" % Test + "org.scalatest" %% "scalatest" % "3.0.8" % Test ), mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"), // disable parallel tests diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf index fc64a627..193471a5 100644 --- a/akka-sample-sharding-scala/src/main/resources/application.conf +++ b/akka-sample-sharding-scala/src/main/resources/application.conf @@ -20,10 +20,9 @@ akka { seed-nodes = [ "akka://ShardingSystem@127.0.0.1:2551", "akka://ShardingSystem@127.0.0.1:2552"] - - # auto downing is NOT safe for production deployments. - # you may want to use it during development, read more about it in the docs. - auto-down-unreachable-after = 10s } } + +akka.cluster.sharding.number-of-shards = 100 + diff --git a/akka-sample-sharding-scala/src/main/resources/logback.xml b/akka-sample-sharding-scala/src/main/resources/logback.xml new file mode 100644 index 00000000..cbf0f479 --- /dev/null +++ b/akka-sample-sharding-scala/src/main/resources/logback.xml @@ -0,0 +1,18 @@ + + + + [%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n + + + + + 1024 + true + + + + + + + + diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala index 19b0488b..610f5f40 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala @@ -13,7 +13,7 @@ object Device { case class RecordTemperature(deviceId: Int, temperature: Double) extends Command - case class GetTemperature(deviceId: Int) extends Command + case class GetTemperature(deviceId: Int, replyTo: ActorRef) extends Command case class Temperature(deviceId: Int, average: Double, @@ -38,13 +38,17 @@ class Device extends Actor with ActorLogging { ) context.become(counting(temperatures)) - case GetTemperature(id) => + case GetTemperature(id, replyTo) => val reply = if (values.isEmpty) Temperature(id, Double.NaN, Double.NaN, 0) else Temperature(id, average(values), values.last, values.size) - sender() ! reply + + if (replyTo == null) + sender() ! reply + else + replyTo ! reply } diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala index 6cbe3182..9d97f929 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala @@ -5,6 +5,10 @@ import scala.util.Random import akka.actor._ import akka.cluster.sharding._ +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.pattern.extended.ask // note extended.ask +import akka.pattern.pipe +import akka.util.Timeout object Devices { // Update a random device @@ -21,7 +25,9 @@ class Devices extends Actor with ActorLogging with Timers { private val extractEntityId: ShardRegion.ExtractEntityId = { case msg @ Device.RecordTemperature(id, _) => (id.toString, msg) - case msg @ Device.GetTemperature(id) => (id.toString, msg) + case msg @ Device.GetTemperature(id, _) => (id.toString, msg) + case ShardingEnvelope(_, msg @ Device.RecordTemperature(id, _)) => + (id.toString, msg) } private val numberOfShards = 100 @@ -29,11 +35,15 @@ class Devices extends Actor with ActorLogging with Timers { private val extractShardId: ShardRegion.ExtractShardId = { case Device.RecordTemperature(id, _) => (math.abs(id.hashCode) % numberOfShards).toString - case Device.GetTemperature(id) => + case Device.GetTemperature(id, _) => (math.abs(id.hashCode) % numberOfShards).toString // Needed if you want to use 'remember entities': case ShardRegion.StartEntity(id) => (math.abs(id.hashCode) % numberOfShards).toString + case ShardingEnvelope(_, Device.RecordTemperature(id, _)) => + (math.abs(id.hashCode) % numberOfShards).toString + case ShardingEnvelope(_, Device.GetTemperature(id, _)) => + (math.abs(id.hashCode) % numberOfShards).toString } val deviceRegion: ActorRef = ClusterSharding(context.system).start( @@ -64,7 +74,14 @@ class Devices extends Actor with ActorLogging with Timers { case ReadTemperatures => (0 to numberOfDevices).foreach { deviceId => - deviceRegion ! Device.GetTemperature(deviceId) + if (deviceId >= 40) { + import context.dispatcher + implicit val timeout = Timeout(3.seconds) + deviceRegion + .ask(replyTo => Device.GetTemperature(deviceId, replyTo)) + .pipeTo(self) + } else + deviceRegion ! Device.GetTemperature(deviceId, self) } case temp: Device.Temperature => diff --git a/akka-sample-sharding-typed-java/.gitignore b/akka-sample-sharding-typed-java/.gitignore new file mode 100644 index 00000000..b0814a06 --- /dev/null +++ b/akka-sample-sharding-typed-java/.gitignore @@ -0,0 +1,18 @@ +*# +*.iml +*.ipr +*.iws +*.pyc +*.tm.epoch +*.vim +*-shim.sbt +.idea/ +/project/plugins/project +project/boot +target/ +/logs +.cache +.classpath +.project +.settings +native/ diff --git a/akka-sample-sharding-typed-java/COPYING b/akka-sample-sharding-typed-java/COPYING new file mode 100644 index 00000000..0e259d42 --- /dev/null +++ b/akka-sample-sharding-typed-java/COPYING @@ -0,0 +1,121 @@ +Creative Commons Legal Code + +CC0 1.0 Universal + + CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE + LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN + ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS + INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES + REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS + PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM + THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED + HEREUNDER. + +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator +and subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). + +Certain owners wish to permanently relinquish those rights to a Work for +the purpose of contributing to a commons of creative, cultural and +scientific works ("Commons") that the public can reliably and without fear +of later claims of infringement build upon, modify, incorporate in other +works, reuse and redistribute as freely as possible in any form whatsoever +and for any purposes, including without limitation commercial purposes. +These owners may contribute to the Commons to promote the ideal of a free +culture and the further production of creative, cultural and scientific +works, or to gain reputation or greater distribution for their Work in +part through the use and efforts of others. + +For these and/or other purposes and motivations, and without any +expectation of additional consideration or compensation, the person +associating CC0 with a Work (the "Affirmer"), to the extent that he or she +is an owner of Copyright and Related Rights in the Work, voluntarily +elects to apply CC0 to the Work and publicly distribute the Work under its +terms, with knowledge of his or her Copyright and Related Rights in the +Work and the meaning and intended legal effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not +limited to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, + communicate, and translate a Work; + ii. moral rights retained by the original author(s) and/or performer(s); +iii. publicity and privacy rights pertaining to a person's image or + likeness depicted in a Work; + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + v. rights protecting the extraction, dissemination, use and reuse of data + in a Work; + vi. database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation + thereof, including any amended or successor version of such + directive); and +vii. other similar, equivalent or corresponding rights throughout the + world based on applicable law or treaty, and any national + implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention +of, applicable law, Affirmer hereby overtly, fully, permanently, +irrevocably and unconditionally waives, abandons, and surrenders all of +Affirmer's Copyright and Related Rights and associated claims and causes +of action, whether now known or unknown (including existing as well as +future claims and causes of action), in the Work (i) in all territories +worldwide, (ii) for the maximum duration provided by applicable law or +treaty (including future time extensions), (iii) in any current or future +medium and for any number of copies, and (iv) for any purpose whatsoever, +including without limitation commercial, advertising or promotional +purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each +member of the public at large and to the detriment of Affirmer's heirs and +successors, fully intending that such Waiver shall not be subject to +revocation, rescission, cancellation, termination, or any other legal or +equitable action to disrupt the quiet enjoyment of the Work by the public +as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. Should any part of the Waiver for any reason +be judged legally invalid or ineffective under applicable law, then the +Waiver shall be preserved to the maximum extent permitted taking into +account Affirmer's express Statement of Purpose. In addition, to the +extent the Waiver is so judged Affirmer hereby grants to each affected +person a royalty-free, non transferable, non sublicensable, non exclusive, +irrevocable and unconditional license to exercise Affirmer's Copyright and +Related Rights in the Work (i) in all territories worldwide, (ii) for the +maximum duration provided by applicable law or treaty (including future +time extensions), (iii) in any current or future medium and for any number +of copies, and (iv) for any purpose whatsoever, including without +limitation commercial, advertising or promotional purposes (the +"License"). The License shall be deemed effective as of the date CC0 was +applied by Affirmer to the Work. Should any part of the License for any +reason be judged legally invalid or ineffective under applicable law, such +partial invalidity or ineffectiveness shall not invalidate the remainder +of the License, and in such case Affirmer hereby affirms that he or she +will not (i) exercise any of his or her remaining Copyright and Related +Rights in the Work or (ii) assert any associated claims and causes of +action with respect to the Work, in either case contrary to Affirmer's +express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + b. Affirmer offers the Work as-is and makes no representations or + warranties of any kind concerning the Work, express, implied, + statutory or otherwise, including without limitation warranties of + title, merchantability, fitness for a particular purpose, non + infringement, or the absence of latent or other defects, accuracy, or + the present or absence of errors, whether or not discoverable, all to + the greatest extent permissible under applicable law. + c. Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without + limitation any person's Copyright and Related Rights in the Work. + Further, Affirmer disclaims responsibility for obtaining any necessary + consents, permissions or other rights required for any use of the + Work. + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to + this CC0 or use of the Work. diff --git a/akka-sample-sharding-typed-java/LICENSE b/akka-sample-sharding-typed-java/LICENSE new file mode 100644 index 00000000..4239f09e --- /dev/null +++ b/akka-sample-sharding-typed-java/LICENSE @@ -0,0 +1,10 @@ +Akka sample by Lightbend + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Template has waived all copyright and related or neighboring +rights to this Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/akka-sample-sharding-typed-java/README.md b/akka-sample-sharding-typed-java/README.md new file mode 100644 index 00000000..ff10cd6a --- /dev/null +++ b/akka-sample-sharding-typed-java/README.md @@ -0,0 +1,39 @@ +This tutorial contains a sample illustrating [Akka Cluster Sharding](http://doc.akka.io/docs/akka/current/java/cluster-sharding.html#an-example). + +## Example overview + +First of all, make sure the correct settings in [application.conf](src/main/resources/application.conf) are set as described in the akka-sample-cluster tutorial. + +Open [ShardingApp.java](src/main/java/sample/sharding/ShardingApp.java). + +This small program starts an ActorSystem with Cluster Sharding enabled. It joins the cluster and starts a `Devices` actor. This actor starts the infrastructure to shard `Device` instances and starts sending messages to arbitrary devices. + +To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is not already started. + +`ShardingApp` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows. + +In the first terminal window, start the first seed node with the following command: + + sbt "runMain sample.sharding.ShardingApp 2551" + +2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'. + +You'll see a log message when `Devices` sends a message to record the current temperature, and for each of those you'll see a log message from the `Device` showing the action taken and the new average temperature. + +In the second terminal window, start the second seed node with the following command: + + sbt "runMain sample.sharding.ShardingApp 2552" + +2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined. + +Some of the devices that were originally on the `ActorSystem` on port 2551 will be migrated to the newly joined `ActorSystem` on port 2552. The migration is straightforward: the old actor is stopped and a fresh actor is started on the newly created `ActorSystem`. Notice this means the average is reset: if you want your state to be persisted you'll need to take care of this yourself. For this reason Cluster Sharding and Akka Persistence are such a popular combination. + +Start another node in the third terminal window with the following command: + + sbt "runMain sample.sharding.ShardingApp 0" + +Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows. + +Start even more nodes in the same way, if you like. + +Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals. diff --git a/akka-sample-sharding-typed-java/build.sbt b/akka-sample-sharding-typed-java/build.sbt new file mode 100644 index 00000000..71085fe7 --- /dev/null +++ b/akka-sample-sharding-typed-java/build.sbt @@ -0,0 +1,32 @@ +val akkaVersion = "2.6.0-M4" + +val `akka-sample-sharding-typed-java` = project + .in(file(".")) + .settings( + organization := "com.typesafe.akka.samples", + scalaVersion := "2.12.8", + scalacOptions in Compile ++= Seq( + "-deprecation", + "-feature", + "-unchecked", + "-Xlog-reflective-calls", + "-Xlint" + ), + javacOptions in Compile ++= Seq( + "-parameters", + "-Xlint:unchecked", + "-Xlint:deprecation" + ), + javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"), + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, + "org.scalatest" %% "scalatest" % "3.0.7" % Test + ), + mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"), + // disable parallel tests + parallelExecution in Test := false, + licenses := Seq( + ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")) + ) + ) diff --git a/akka-sample-sharding-typed-java/project/build.properties b/akka-sample-sharding-typed-java/project/build.properties new file mode 100644 index 00000000..c0bab049 --- /dev/null +++ b/akka-sample-sharding-typed-java/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.8 diff --git a/akka-sample-sharding-typed-java/project/plugins.sbt b/akka-sample-sharding-typed-java/project/plugins.sbt new file mode 100644 index 00000000..a7f83b3d --- /dev/null +++ b/akka-sample-sharding-typed-java/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0") diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java new file mode 100644 index 00000000..4a7a8676 --- /dev/null +++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Device.java @@ -0,0 +1,130 @@ +package sample.sharding; + +import java.util.ArrayList; +import java.util.List; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import com.fasterxml.jackson.annotation.JsonCreator; + +public class Device extends AbstractBehavior { + + public static final EntityTypeKey TYPE_KEY = EntityTypeKey.create(Command.class, "Device"); + + public static void init(ActorSystem system) { + // If the original hashing function was using + // `(math.abs(id.hashCode) % numberOfShards).toString` + // the default HashCodeMessageExtractor in Typed can be used. + // That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`. + ClusterSharding.get(system).init(Entity.of(TYPE_KEY, context -> Device.create())); + } + + public interface Command extends Message {} + + public static class RecordTemperature implements Command { + public final int deviceId; + public final double temperature; + + public RecordTemperature(int deviceId, double temperature) { + this.deviceId = deviceId; + this.temperature = temperature; + } + + @Override + public String toString() { + return "RecordTemperature(" + deviceId + ", " + temperature + ")"; + } + } + + public static class GetTemperature implements Command { + public final int deviceId; + public final ActorRef replyTo; + + @JsonCreator + public GetTemperature(int deviceId, ActorRef replyTo) { + this.deviceId = deviceId; + this.replyTo = replyTo; + } + } + + public static class Temperature implements Message { + public final int deviceId; + public final double average; + public final double latest; + public final int readings; + + public Temperature(int deviceId, double average, double latest, int readings) { + this.deviceId = deviceId; + this.average = average; + this.latest = latest; + this.readings = readings; + } + + @Override + public String toString() { + return "Temperature(" + deviceId + ", " + average + ", " + latest+ ", " + readings + ")"; + } + } + + public static Behavior create() { + return Behaviors.setup(Device::new); + } + + private final ActorContext context; + private List temperatures = new ArrayList(); + + private Device(ActorContext context) { + this.context = context; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(RecordTemperature.class, this::receiveRecordTemperature) + .onMessage(GetTemperature.class, this::receiveGetTemperature) + .build(); + } + + private Behavior receiveRecordTemperature(RecordTemperature cmd) { + temperatures.add(cmd.temperature); + context.getLog().info("Recording temperature {} for device {}, average is {} after {} readings", + cmd.temperature, cmd.deviceId, average(temperatures), temperatures.size()); + return this; + } + + private Behavior receiveGetTemperature(GetTemperature cmd) { + Temperature reply; + if (temperatures.isEmpty()) { + reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0); + } else { + reply = new Temperature(cmd.deviceId, average(temperatures), + temperatures.get(temperatures.size() - 1), temperatures.size()); + } + + cmd.replyTo.tell(reply); + return this; + } + + private double sum(List values) { + double result = 0.0; + for (double d : values) { + result += d; + } + return result; + } + + private double average(List values) { + if (values.isEmpty()) + return Double.NaN; + else + return sum(values) / values.size(); + } +} diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java new file mode 100644 index 00000000..5fe32d12 --- /dev/null +++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Devices.java @@ -0,0 +1,106 @@ +package sample.sharding; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityRef; + +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.CompletionStage; + +public class Devices extends AbstractBehavior { + + public interface Command {} + + public enum UpdateDevice implements Command { + INSTANCE + } + + public enum ReadTemperatures implements Command { + INSTANCE + } + + private static class GetTemperatureReply implements Command { + final Device.Temperature temp; + + private GetTemperatureReply(Device.Temperature temp) { + this.temp = temp; + } + } + + public static Behavior create() { + return Behaviors.setup(context -> + Behaviors.withTimers(timers -> { + Device.init(context.getSystem()); + + timers.startTimerWithFixedDelay(UpdateDevice.INSTANCE, UpdateDevice.INSTANCE, Duration.ofSeconds(1)); + timers.startTimerWithFixedDelay(ReadTemperatures.INSTANCE, ReadTemperatures.INSTANCE, Duration.ofSeconds(15)); + + return new Devices(context); + })); + } + + private final ActorContext context; + private final ClusterSharding sharding; + private final ActorRef temperatureAdapter; + private final Random random = new Random(); + + private final int numberOfDevices = 50; + + public Devices(ActorContext context) { + this.context = context; + + this.sharding = ClusterSharding.get(context.getSystem()); + + this.temperatureAdapter = + context.messageAdapter(Device.Temperature.class, GetTemperatureReply::new); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(UpdateDevice.class, m -> receiveUpdateDevice()) + .onMessage(ReadTemperatures.class, m -> receiveReadTemperatures()) + .onMessage(GetTemperatureReply.class, this::receiveTemperature) + .build(); + } + + private Behavior receiveUpdateDevice() { + int deviceId = random.nextInt(numberOfDevices); + double temperature = 5 + 30 * random.nextDouble(); + Device.RecordTemperature msg = new Device.RecordTemperature(deviceId, temperature); + context.getLog().info("Sending {}", msg); + entityRefFor(deviceId).tell(msg); + return this; + } + + private Behavior receiveReadTemperatures() { + for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) { + entityRefFor(deviceId).tell(new Device.GetTemperature(deviceId, temperatureAdapter)); + } + return this; + } + + private EntityRef entityRefFor(int deviceId) { + return sharding.entityRefFor(Device.TYPE_KEY, String.valueOf(deviceId)); + } + + private Behavior receiveTemperature(GetTemperatureReply reply) { + Device.Temperature temp = reply.temp; + if (temp.readings > 0) + context.getLog().info( + "Temperature of device {} is {} with average {} after {} readings", + temp.deviceId, + temp.latest, + temp.average, + temp.readings + ); + return this; + } + +} diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java new file mode 100644 index 00000000..e3e9fa78 --- /dev/null +++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/Message.java @@ -0,0 +1,7 @@ +package sample.sharding; + +/** + * Marker interface for actor messages that are serialized. + */ +public interface Message { +} diff --git a/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java new file mode 100644 index 00000000..34121409 --- /dev/null +++ b/akka-sample-sharding-typed-java/src/main/java/sample/sharding/ShardingApp.java @@ -0,0 +1,27 @@ +package sample.sharding; + +import akka.actor.typed.ActorSystem; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class ShardingApp { + + public static void main(String[] args) { + if (args.length == 0) + startup(new String[] { "2551", "2552", "0" }); + else + startup(args); + } + + public static void startup(String[] ports) { + for (String port : ports) { + // Override the configuration of the port + Config config = ConfigFactory.parseString( + "akka.remote.artery.canonical.port=" + port).withFallback( + ConfigFactory.load()); + + // Create an Akka system + ActorSystem.create(Devices.create(), "ShardingSystem", config); + } + } +} diff --git a/akka-sample-sharding-typed-java/src/main/resources/application.conf b/akka-sample-sharding-typed-java/src/main/resources/application.conf new file mode 100644 index 00000000..0eef9963 --- /dev/null +++ b/akka-sample-sharding-typed-java/src/main/resources/application.conf @@ -0,0 +1,32 @@ +akka { + loglevel = INFO + + actor { + provider = "cluster" + + serialization-bindings { + "sample.sharding.Message" = jackson-cbor + } + } + + # For the sample, just bind to loopback and do not allow access from the network + # the port is overridden by the logic in main class + remote.artery { + canonical.port = 0 + canonical.hostname = 127.0.0.1 + } + + cluster { + seed-nodes = [ + "akka://ShardingSystem@127.0.0.1:2551", + "akka://ShardingSystem@127.0.0.1:2552"] + + # auto downing is NOT safe for production deployments. + # you may want to use it during development, read more about it in the docs. + auto-down-unreachable-after = 10s + } + + sharding.number-of-shards = 100 + +} + diff --git a/akka-sample-sharding-typed-scala/LICENSE b/akka-sample-sharding-typed-scala/LICENSE new file mode 100644 index 00000000..4239f09e --- /dev/null +++ b/akka-sample-sharding-typed-scala/LICENSE @@ -0,0 +1,10 @@ +Akka sample by Lightbend + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Template has waived all copyright and related or neighboring +rights to this Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/akka-sample-sharding-typed-scala/README.md b/akka-sample-sharding-typed-scala/README.md new file mode 100644 index 00000000..bbb52b6a --- /dev/null +++ b/akka-sample-sharding-typed-scala/README.md @@ -0,0 +1,39 @@ +This tutorial contains a sample illustrating [Akka Cluster Sharding](http://doc.akka.io/docs/akka/current/scala/cluster-sharding.html#an-example). + +## Example overview + +First of all, make sure the correct settings in [application.conf](src/main/resources/application.conf) are set as described in the akka-sample-cluster tutorial. + +Open [ShardingApp.scala](src/main/scala/sample/sharding/ShardingApp.scala). + +This small program starts an ActorSystem with Cluster Sharding enabled. It joins the cluster and starts a `Devices` actor. This actor starts the infrastructure to shard `Device` instances and starts sending messages to arbitrary devices. + +To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is not already started. + +`ShardingApp` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows. + +In the first terminal window, start the first seed node with the following command: + + sbt "runMain sample.sharding.ShardingApp 2551" + +2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'. + +You'll see a log message when `Devices` sends a message to record the current temperature, and for each of those you'll see a log message from the `Device` showing the action taken and the new average temperature. + +In the second terminal window, start the second seed node with the following command: + + sbt "runMain sample.sharding.ShardingApp 2552" + +2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined. + +Some of the devices that were originally on the `ActorSystem` on port 2551 will be migrated to the newly joined `ActorSystem` on port 2552. The migration is straightforward: the old actor is stopped and a fresh actor is started on the newly created `ActorSystem`. Notice this means the average is reset: if you want your state to be persisted you'll need to take care of this yourself. For this reason Cluster Sharding and Akka Persistence are such a popular combination. + +Start another node in the third terminal window with the following command: + + sbt "runMain sample.sharding.ShardingApp 0" + +Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows. + +Start even more nodes in the same way, if you like. + +Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals. diff --git a/akka-sample-sharding-typed-scala/build.sbt b/akka-sample-sharding-typed-scala/build.sbt new file mode 100644 index 00000000..55e116bd --- /dev/null +++ b/akka-sample-sharding-typed-scala/build.sbt @@ -0,0 +1,33 @@ +import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings +import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm + +val akkaVersion = "2.6.0-M4" + +lazy val `akka-sample-sharding-typed-scala` = project + .in(file(".")) + .settings(multiJvmSettings: _*) + .settings( + organization := "com.typesafe.akka.samples", + scalaVersion := "2.12.8", + scalacOptions in Compile ++= Seq( + "-deprecation", + "-feature", + "-unchecked", + "-Xlog-reflective-calls", + "-Xlint" + ), + javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), + javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"), + libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, + "org.scalatest" %% "scalatest" % "3.0.7" % Test + ), + mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"), + // disable parallel tests + parallelExecution in Test := false, + licenses := Seq( + ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")) + ) + ) + .configs(MultiJvm) diff --git a/akka-sample-sharding-typed-scala/project/build.properties b/akka-sample-sharding-typed-scala/project/build.properties new file mode 100644 index 00000000..c0bab049 --- /dev/null +++ b/akka-sample-sharding-typed-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.8 diff --git a/akka-sample-sharding-typed-scala/project/plugins.sbt b/akka-sample-sharding-typed-scala/project/plugins.sbt new file mode 100644 index 00000000..2d026355 --- /dev/null +++ b/akka-sample-sharding-typed-scala/project/plugins.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0") +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0") diff --git a/akka-sample-sharding-typed-scala/src/main/resources/application.conf b/akka-sample-sharding-typed-scala/src/main/resources/application.conf new file mode 100644 index 00000000..c5e13e4c --- /dev/null +++ b/akka-sample-sharding-typed-scala/src/main/resources/application.conf @@ -0,0 +1,31 @@ +akka { + loglevel = INFO + + actor { + provider = "cluster" + + serialization-bindings { + "sample.sharding.Message" = jackson-cbor + } + } + + # For the sample, just bind to loopback and do not allow access from the network + # the port is overridden by the logic in main class + remote.artery { + canonical.port = 0 + canonical.hostname = 127.0.0.1 + } + + cluster { + seed-nodes = [ + "akka://ShardingSystem@127.0.0.1:2551", + "akka://ShardingSystem@127.0.0.1:2552"] + + # auto downing is NOT safe for production deployments. + # you may want to use it during development, read more about it in the docs. + auto-down-unreachable-after = 10s + + sharding.number-of-shards = 100 + } + +} diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala new file mode 100644 index 00000000..162e43e0 --- /dev/null +++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala @@ -0,0 +1,71 @@ +package sample.sharding + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + +/** + * This is just an example: cluster sharding would be overkill for just keeping a small amount of data, + * but becomes useful when you have a collection of 'heavy' actors (in terms of processing or state) + * so you need to distribute them across several nodes. + */ +object Device { + val TypeKey = EntityTypeKey[Device.Command]("Device") + + def init(system: ActorSystem[_]): Unit = { + + // If the original hashing function was using + // `(math.abs(id.hashCode) % numberOfShards).toString` + // the default HashCodeMessageExtractor in Typed can be used. + // That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`. + ClusterSharding(system).init(Entity(TypeKey, _ => Device())) + } + + sealed trait Command extends Message + + case class RecordTemperature(deviceId: Int, temperature: Double) + extends Command + + case class GetTemperature(deviceId: Int, replyTo: ActorRef[Temperature]) + extends Command + + case class Temperature(deviceId: Int, + average: Double, + latest: Double, + readings: Int) + extends Message + + def apply(): Behavior[Command] = + counting(Vector.empty) + + private def counting(values: Vector[Double]): Behavior[Command] = { + Behaviors.receive { (context, cmd) => + cmd match { + case RecordTemperature(id, temp) => + val temperatures = values :+ temp + context.log.info( + s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " + + s"${temperatures.size} readings" + ) + counting(temperatures) + + case GetTemperature(id, replyTo) => + val reply = + if (values.isEmpty) + Temperature(id, Double.NaN, Double.NaN, 0) + else + Temperature(id, average(values), values.last, values.size) + replyTo ! reply + Behaviors.same + } + } + } + + private def average(values: Vector[Double]): Double = + if (values.isEmpty) Double.NaN + else values.sum / values.size +} diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala new file mode 100644 index 00000000..60c353b5 --- /dev/null +++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala @@ -0,0 +1,70 @@ +package sample.sharding + +import scala.concurrent.duration._ +import scala.util.Random + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.sharding.typed.scaladsl.ClusterSharding + +object Devices { + sealed trait Command + + private case object UpdateDevice extends Command + + private case object ReadTemperatures extends Command + + private case class GetTemperatureReply(temp: Device.Temperature) + extends Command + + def apply(): Behavior[Command] = { + Behaviors.setup { context => + Device.init(context.system) + val sharding = ClusterSharding(context.system) + + Behaviors.withTimers { timers => + val random = new Random() + val numberOfDevices = 50 + + timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second) + timers.startTimerWithFixedDelay( + ReadTemperatures, + ReadTemperatures, + 15.seconds + ) + + val temperatureAdapter = + context.messageAdapter[Device.Temperature](GetTemperatureReply(_)) + + Behaviors.receiveMessage { + case UpdateDevice => + val deviceId = random.nextInt(numberOfDevices) + val temperature = 5 + 30 * random.nextDouble() + val msg = Device.RecordTemperature(deviceId, temperature) + context.log.info(s"Sending $msg") + sharding.entityRefFor(Device.TypeKey, deviceId.toString) ! msg + Behaviors.same + + case ReadTemperatures => + (0 to numberOfDevices).foreach { deviceId => + val entityRef = + sharding.entityRefFor(Device.TypeKey, deviceId.toString) + entityRef ! Device.GetTemperature(deviceId, temperatureAdapter) + } + Behaviors.same + + case GetTemperatureReply(temp: Device.Temperature) => + if (temp.readings > 0) + context.log.info( + "Temperature of device {} is {} with average {} after {} readings", + temp.deviceId, + temp.latest, + temp.average, + temp.readings + ) + Behaviors.same + } + } + } + } +} diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala new file mode 100644 index 00000000..84baedcb --- /dev/null +++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2019 Lightbend Inc. + */ +package sample.sharding + +/** + * Marker interface for actor messages that are serialized. + */ +trait Message diff --git a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala new file mode 100644 index 00000000..cff9352e --- /dev/null +++ b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala @@ -0,0 +1,29 @@ +package sample.sharding + +import akka.actor.typed.ActorSystem +import com.typesafe.config.ConfigFactory + +object ShardingApp { + def main(args: Array[String]): Unit = { + if (args.isEmpty) + startup(Seq("2551", "2552", "0")) + else + startup(args) + } + + def startup(ports: Seq[String]): Unit = { + // In a production application you wouldn't typically start multiple ActorSystem instances in the + // same JVM, here we do it to easily demonstrate these ActorSytems (which would be in separate JVM's) + // talking to each other. + ports foreach { port => + // Override the configuration of the port + val config = ConfigFactory + .parseString("akka.remote.artery.canonical.port=" + port) + .withFallback(ConfigFactory.load()) + + // Create an Akka system, with Devices actor that starts the sharding and sends random messages + ActorSystem(Devices(), "ShardingSystem", config) + } + } + +}