Skip to content

Commit

Permalink
Merge pull request #1629 from Logflare/staging
Browse files Browse the repository at this point in the history
Release v1.3.20
  • Loading branch information
chasers authored Jul 17, 2023
2 parents 958a2f5 + d57761a commit 77dbab8
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 8 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.19
1.3.20
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ config :logflare, LogflareWeb.Endpoint,
pubsub_server: Logflare.PubSub,
live_view: [signing_salt: "Fvo_-oQi4bjPfQLh"]

config :logflare, Logflare.PubSub, pool_size: 10

# Configures Elixir's Logger
config :logger,
handle_otp_reports: true,
Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ defmodule Logflare.Application do
grpc_port = Application.get_env(:grpc, :port)
ssl = Application.get_env(:logflare, :ssl)
grpc_creds = if ssl, do: GRPC.Credential.new(ssl: ssl)
pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]

[
{Task.Supervisor, name: Logflare.TaskSupervisor},
{Cluster.Supervisor, [topologies, [name: Logflare.ClusterSupervisor]]},
get_goth_child_spec(),
Logflare.Repo,
{Phoenix.PubSub, name: Logflare.PubSub, pool_size: 10},
{Phoenix.PubSub, name: Logflare.PubSub, pool_size: pool_size},
# supervisor(LogflareTelemetry.Supervisor, []),
# Context Caches
ContextCache,
Expand Down
6 changes: 5 additions & 1 deletion lib/logflare/pubsub_rates/buffers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Logflare.PubSubRates.Buffers do

use GenServer

@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(args \\ []) do
GenServer.start_link(
__MODULE__,
Expand All @@ -16,7 +18,9 @@ defmodule Logflare.PubSubRates.Buffers do
end

def init(state) do
PubSub.subscribe(Logflare.PubSub, "buffers")
for shard <- 1..@pool_size do
PubSub.subscribe(Logflare.PubSub, "buffers:shard-#{shard}")
end

{:ok, state}
end
Expand Down
6 changes: 5 additions & 1 deletion lib/logflare/pubsub_rates/inserts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Logflare.PubSubRates.Inserts do

use GenServer

@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(args \\ []) do
GenServer.start_link(
__MODULE__,
Expand All @@ -16,7 +18,9 @@ defmodule Logflare.PubSubRates.Inserts do
end

def init(state) do
PubSub.subscribe(Logflare.PubSub, "inserts")
for shard <- 1..@pool_size do
PubSub.subscribe(Logflare.PubSub, "inserts:shard-#{shard}")
end

{:ok, state}
end
Expand Down
6 changes: 5 additions & 1 deletion lib/logflare/pubsub_rates/rates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Logflare.PubSubRates.Rates do

use GenServer

@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(args \\ []) do
GenServer.start_link(
__MODULE__,
Expand All @@ -16,7 +18,9 @@ defmodule Logflare.PubSubRates.Rates do
end

def init(state) do
PubSub.subscribe(Logflare.PubSub, "rates")
for shard <- 1..@pool_size do
PubSub.subscribe(Logflare.PubSub, "rates:shard-#{shard}")
end

{:ok, state}
end
Expand Down
5 changes: 4 additions & 1 deletion lib/logflare/source/bigquery/buffer_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Logflare.Source.BigQuery.BufferCounter do

@broadcast_every 5_000
@max_buffer_len 5_000
@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(%RLS{source_id: source_id}) when is_atom(source_id) do
GenServer.start_link(
Expand Down Expand Up @@ -178,9 +179,11 @@ defmodule Logflare.Source.BigQuery.BufferCounter do
defp broadcast_buffer(state) do
local_buffer = %{Node.self() => %{len: state.len}}

shard = :erlang.phash2(state.source_id, @pool_size)

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"buffers",
"buffers:shard-#{shard}",
{:buffers, state.source_id, local_buffer}
)

Expand Down
4 changes: 3 additions & 1 deletion lib/logflare/source/rate_counter_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Logflare.Source.RateCounterServer do

@default_bucket_width 60
@ets_table_name :rate_counters
@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

use TypedStruct

Expand Down Expand Up @@ -252,11 +253,12 @@ defmodule Logflare.Source.RateCounterServer do
end

def broadcast(%RCS{} = state) do
shard = :erlang.phash2(state.source_id, @pool_size)
local_rates = %{Node.self() => state_to_external(state)}

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"rates",
"rates:shard-#{shard}",
{:rates, state.source_id, local_rates}
)

Expand Down
5 changes: 4 additions & 1 deletion lib/logflare/source/recent_logs_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule Logflare.Source.RecentLogsServer do

@touch_timer :timer.minutes(45)
@broadcast_every 500
@pool_size Application.compile_env(:logflare, Logflare.PubSub)[:pool_size]

def start_link(%__MODULE__{source_id: source_id} = rls) when is_atom(source_id) do
GenServer.start_link(__MODULE__, rls, name: source_id)
Expand Down Expand Up @@ -246,9 +247,11 @@ defmodule Logflare.Source.RecentLogsServer do

inserts_payload = %{Node.self() => %{node_inserts: current_inserts, bq_inserts: bq_inserts}}

shard = :erlang.phash2(state.source_id, @pool_size)

Phoenix.PubSub.broadcast(
Logflare.PubSub,
"inserts",
"inserts:shard-#{shard}",
{:inserts, state.source_id, inserts_payload}
)
end
Expand Down

0 comments on commit 77dbab8

Please sign in to comment.