diff --git a/src/Akka.CQRS.Pricing.Actors/MatchAggregator.cs b/src/Akka.CQRS.Pricing.Actors/MatchAggregator.cs index bf5c52f8..5272009c 100644 --- a/src/Akka.CQRS.Pricing.Actors/MatchAggregator.cs +++ b/src/Akka.CQRS.Pricing.Actors/MatchAggregator.cs @@ -88,9 +88,7 @@ private void Recovers() /// Recovery has completed successfully. /// protected override void OnReplaySuccess() - { - _publishPricesTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(10), - TimeSpan.FromSeconds(10), Self, PublishEvents.Instance, ActorRefs.NoSender); + { // setup subscription to TradeEventPublisher Self.Tell(DoSubscribe.Instance); @@ -263,6 +261,9 @@ private void Commands() protected override void PreStart() { + _publishPricesTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(10), Self, PublishEvents.Instance, ActorRefs.NoSender); + _log.Info("Starting..."); base.PreStart(); } diff --git a/src/Akka.CQRS.Pricing.Service/Program.cs b/src/Akka.CQRS.Pricing.Service/Program.cs index 8a7b5665..0d9373d2 100644 --- a/src/Akka.CQRS.Pricing.Service/Program.cs +++ b/src/Akka.CQRS.Pricing.Service/Program.cs @@ -32,11 +32,15 @@ static int Main(string[] args) var sharding = ClusterSharding.Get(actorSystem); + var shardRegion = sharding.Start("priceAggregator", s => Props.Create(() => new MatchAggregator(s)), ClusterShardingSettings.Create(actorSystem), new StockShardMsgRouter()); + var priceInitiatorActor = actorSystem.ActorOf(ClusterSingletonManager.Props(Props.Create(() => new PriceInitiatorActor(shardRegion)), + ClusterSingletonManagerSettings.Create(actorSystem).WithRole("pricing-engine").WithSingletonName("priceInitiator")), "priceInitiator"); + var clientHandler = actorSystem.ActorOf(Props.Create(() => new ClientHandlerActor(shardRegion)), "subscriptions");