Skip to content

Commit

Permalink
Add host projector (#32)
Browse files Browse the repository at this point in the history
* Chore: change Repo test port

* Add commanded_ecto_projections to the mix

* Add Host read model

* Add and wire-up host projector

* Add projectors subscription migration (auto-generated)

* Add projectors test helper

* Add host projector test

* Add after update pub sub callback

* Chore: add event_store.reset mix task
  • Loading branch information
fabriziosestito committed Dec 30, 2021
1 parent ba7b1d1 commit 404365d
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: tronto_test
ports:
- 5432:5432
- 5433:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ config :tronto, Tronto.Repo,
password: "postgres",
database: "tronto_test#{System.get_env("MIX_TEST_PARTITION")}",
hostname: "localhost",
port: 5433,
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: 10

Expand Down
3 changes: 2 additions & 1 deletion lib/tronto/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule Tronto.Application do
# Start the Endpoint (http/https)
TrontoWeb.Endpoint,
Tronto.Commanded,
Tronto.Scheduler
Tronto.Scheduler,
Tronto.Monitoring.ProjectorsSupervisor
# Start a worker by calling: Tronto.Worker.start_link(arg)
# {Tronto.Worker, arg}
]
Expand Down
58 changes: 58 additions & 0 deletions lib/tronto/monitoring/projectors/host_projector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Tronto.Monitoring.HostProjector do
@moduledoc """
Host projector
"""

use Commanded.Projections.Ecto,
application: Tronto.Commanded,
repo: Tronto.Repo,
name: "host_projector"

alias Tronto.Monitoring.Domain.Events.HostRegistered
alias Tronto.Monitoring.HostReadModel

project(
%HostRegistered{
id_host: id,
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
},
fn multi ->
changeset =
%HostReadModel{}
|> HostReadModel.changeset(%{
id: id,
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
})

Ecto.Multi.insert(multi, :host, changeset)
end
)

@impl true
def after_update(
%HostRegistered{
id_host: id,
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
},
_,
_
) do
Phoenix.PubSub.broadcast(
Tronto.PubSub,
"hosts",
{:host_registered,
%{
id: id,
hostname: hostname,
ip_addresses: ip_addresses,
agent_version: agent_version
}}
)
end
end
18 changes: 18 additions & 0 deletions lib/tronto/monitoring/projectors/projectors_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Tronto.Monitoring.ProjectorsSupervisor do
@moduledoc false

use Supervisor

def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

@impl true
def init(_init_arg) do
children = [
Tronto.Monitoring.HostProjector
]

Supervisor.init(children, strategy: :one_for_one)
end
end
24 changes: 24 additions & 0 deletions lib/tronto/monitoring/read_models/host_read_model.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Tronto.Monitoring.HostReadModel do
@moduledoc """
Host read model
"""

use Ecto.Schema

import Ecto.Changeset

@type t :: %__MODULE__{}

@derive {Jason.Encoder, except: [:__meta__, :__struct__]}
@primary_key {:id, :binary_id, autogenerate: false}
schema "hosts" do
field :hostname, :string
field :ip_addresses, {:array, :string}
field :agent_version, :string
end

@spec changeset(t() | Ecto.Changeset.t(), map) :: Ecto.Changeset.t()
def changeset(host, attrs) do
cast(host, attrs, __MODULE__.__schema__(:fields))
end
end
8 changes: 7 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ defmodule Tronto.MixProject do

# Specifies which paths to compile per environment.
defp elixirc_paths(:test),
do: ["lib", "test/support", "deps/commanded/test/support/aggregate_case.ex"]
do: [
"lib",
"test/support",
"deps/commanded/test/support/aggregate_case.ex"
]

defp elixirc_paths(_), do: ["lib"]

Expand All @@ -41,6 +45,7 @@ defmodule Tronto.MixProject do
defp deps do
[
{:commanded, "~> 1.3"},
{:commanded_ecto_projections, "~> 1.2"},
{:commanded_eventstore_adapter, "~> 1.2"},
{:credo, "~> 1.6", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false},
Expand Down Expand Up @@ -89,6 +94,7 @@ defmodule Tronto.MixProject do
"ecto.setup": ["ecto.create", "ecto.migrate", "run priv/repo/seeds.exs"],
"ecto.reset": ["ecto.drop", "ecto.setup"],
"event_store.setup": ["event_store.create", "event_store.init"],
"event_store.reset": ["event_store.drop", "event_store.setup"],
test: ["ecto.create --quiet", "ecto.migrate --quiet", "test"],
"assets.deploy": [
# "cmd --cd assets npm run deploy",
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"castore": {:hex, :castore, "0.1.13", "ccf3ab251ffaebc4319f41d788ce59a6ab3f42b6c27e598ad838ffecee0b04f9", [:mix], [], "hexpm", "a14a7eecfec7e20385493dbb92b0d12c5d77ecfd6307de10102d58c94e8c49c0"},
"commanded": {:hex, :commanded, "1.3.1", "d18a73bface68c04cbbda69647604a3cc1918fbdf8af4a784fc3a3a30ca34a13", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "9bd03ef6fc05e3a8fb4d0808f13a2106688e60ee4b2bdb78cf7e63a6788c9faf"},
"commanded_ecto_projections": {:hex, :commanded_ecto_projections, "1.2.1", "ad1e274d2458a4dab268deb95601e0fb644ab141fb14995f540c66de1788731f", [:mix], [{:commanded, "~> 1.2", [hex: :commanded, repo: "hexpm", optional: false]}, {:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.5", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "461a1e3489c56f6e7564f54e4200a41afcab5618fdf396e0c00269613328f00c"},
"commanded_eventstore_adapter": {:hex, :commanded_eventstore_adapter, "1.2.0", "a311247d70ce775b2d4b5484d09dbefd27911025ac7deb3e5b79b79f243e6fb1", [:mix], [{:commanded, "~> 1.2", [hex: :commanded, repo: "hexpm", optional: false]}, {:eventstore, "~> 1.1", [hex: :eventstore, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "b2cce46dfccf400f3956322a72997e591c0f833d0c20557e1074c7609564dae2"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
Expand Down
12 changes: 12 additions & 0 deletions priv/repo/migrations/20211228124832_create_hosts.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Tronto.Repo.Migrations.CreateHosts do
use Ecto.Migration

def change do
create table(:hosts, primary_key: false) do
add :id, :uuid, primary_key: true
add :hostname, :string
add :ip_addresses, {:array, :string}
add :agent_version, :string
end
end
end
12 changes: 12 additions & 0 deletions priv/repo/migrations/20211228125531_create_projection_versions.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Tronto.Repo.Migrations.CreateProjectionVersions do
use Ecto.Migration

def change do
create table(:projection_versions, primary_key: false) do
add(:projection_name, :text, primary_key: true)
add(:last_seen_event_number, :bigint)

timestamps(type: :naive_datetime_usec)
end
end
end
29 changes: 29 additions & 0 deletions test/support/helpers/projector_test_helper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Tronto.ProjectorTestHelper do
@moduledoc """
This module contains helper functions for testing projectors
"""

def project(projector, event, projection_name) do
:ok =
projector.handle(event, %{
event_number: next_event_number(projector, projection_name),
handler_name: projection_name
})
end

defp next_event_number(projector, projection_name),
do: last_seen_event_number(projector, projection_name) + 1

defp last_seen_event_number(projector, projection_name) do
projector
|> Module.concat(ProjectionVersion)
|> Tronto.Repo.get(projection_name)
|> case do
nil ->
0

projection_version ->
Map.get(projection_version, :last_seen_event_number)
end
end
end
33 changes: 33 additions & 0 deletions test/tronto/projectors/host_projector_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Tronto.Monitoring.HostProjectorTest do
use ExUnit.Case
use Tronto.DataCase

alias Tronto.Monitoring.{
HostProjector,
HostReadModel
}

alias Tronto.Monitoring.Domain.Events.HostRegistered

alias Tronto.ProjectorTestHelper
alias Tronto.Repo

@moduletag :integration

test "should project a new host when HostRegistered event is received" do
event = %HostRegistered{
id_host: Faker.UUID.v4(),
hostname: Faker.StarWars.character(),
ip_addresses: [Faker.Internet.ip_v4_address()],
agent_version: Faker.StarWars.planet()
}

ProjectorTestHelper.project(HostProjector, event, "host_projector")
host_projection = Repo.get!(HostReadModel, event.id_host)

assert event.id_host == host_projection.id
assert event.hostname == host_projection.hostname
assert event.ip_addresses == host_projection.ip_addresses
assert event.agent_version == host_projection.agent_version
end
end

0 comments on commit 404365d

Please sign in to comment.