From 68b5582b27facc07ad130d96004bcac292fae837 Mon Sep 17 00:00:00 2001 From: Ziinc Date: Wed, 11 Dec 2024 17:39:27 +0800 Subject: [PATCH] perf: reduce broadway memory footprint (#2278) * perf: reduce broadway memory footprint * chore: fix failing tests * chore: increase min-ready --- cloudbuild/staging/deploy.yaml | 2 +- .../backends/adaptor/postgres_adaptor/pipeline.ex | 4 +++- lib/logflare/backends/adaptor/webhook_adaptor.ex | 4 ++-- lib/logflare/backends/buffer_producer.ex | 8 +++++--- lib/logflare/source/bigquery/pipeline.ex | 4 ++-- test/logflare/backends/buffer_producer_test.exs | 12 +++++++++--- test/logflare/backends/loki_adaptor_test.exs | 1 + 7 files changed, 23 insertions(+), 12 deletions(-) diff --git a/cloudbuild/staging/deploy.yaml b/cloudbuild/staging/deploy.yaml index b1bcf1ccf..7eb70504b 100644 --- a/cloudbuild/staging/deploy.yaml +++ b/cloudbuild/staging/deploy.yaml @@ -43,7 +43,7 @@ steps: - --type=proactive - --max-surge=1 - --max-unavailable=0 - - --min-ready=60 + - --min-ready=180 - --minimal-action=replace - --most-disruptive-allowed-action=replace - --replacement-method=substitute diff --git a/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex b/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex index 79c498cea..ec5a052e1 100644 --- a/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex +++ b/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex @@ -15,7 +15,9 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.Pipeline do Broadway.start_link(__MODULE__, name: adaptor_state.pipeline_name, producer: [ - module: {BufferProducer, [source: adaptor_state.source, backend: adaptor_state.backend]}, + module: + {BufferProducer, + [source_id: adaptor_state.source.id, backend_id: adaptor_state.backend.id]}, transformer: {__MODULE__, :transform, []}, concurrency: 1 ], diff --git a/lib/logflare/backends/adaptor/webhook_adaptor.ex b/lib/logflare/backends/adaptor/webhook_adaptor.ex index a4c35fe58..320118ee2 100644 --- a/lib/logflare/backends/adaptor/webhook_adaptor.ex +++ b/lib/logflare/backends/adaptor/webhook_adaptor.ex @@ -105,8 +105,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do module: {BufferProducer, [ - backend: args.backend, - source: args.source + backend_id: Map.get(args.backend || %{}, :id), + source_id: args.source.id ]}, transformer: {__MODULE__, :transform, []}, concurrency: 1 diff --git a/lib/logflare/backends/buffer_producer.ex b/lib/logflare/backends/buffer_producer.ex index efec15432..34d6e50df 100644 --- a/lib/logflare/backends/buffer_producer.ex +++ b/lib/logflare/backends/buffer_producer.ex @@ -15,12 +15,14 @@ defmodule Logflare.Backends.BufferProducer do @impl GenStage def init(opts) do + source = Sources.Cache.get_by_id(opts[:source_id]) + state = %{ demand: 0, # TODO: broadcast by id instead. - source_id: opts[:source].id, - source_token: opts[:source].token, - backend_id: Map.get(opts[:backend] || %{}, :id), + source_id: opts[:source_id], + source_token: source.token, + backend_id: opts[:backend_id], # discard logging backoff last_discard_log_dt: nil, interval: Keyword.get(opts, :interval, @default_interval) diff --git a/lib/logflare/source/bigquery/pipeline.ex b/lib/logflare/source/bigquery/pipeline.ex index ba3901560..fece6ae25 100644 --- a/lib/logflare/source/bigquery/pipeline.ex +++ b/lib/logflare/source/bigquery/pipeline.ex @@ -45,8 +45,8 @@ defmodule Logflare.Source.BigQuery.Pipeline do module: {BufferProducer, [ - source: source, - backend: backend + source_id: source.id, + backend_id: backend.id ]}, transformer: {__MODULE__, :transform, []} ], diff --git a/test/logflare/backends/buffer_producer_test.exs b/test/logflare/backends/buffer_producer_test.exs index 7c0a4e96a..45df60edb 100644 --- a/test/logflare/backends/buffer_producer_test.exs +++ b/test/logflare/backends/buffer_producer_test.exs @@ -16,7 +16,10 @@ defmodule Logflare.Backends.BufferProducerTest do source = insert(:source, user: user) le = build(:log_event, source: source) - buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source}) + + buffer_producer_pid = + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id}) + sid_bid_pid = {source.id, nil, buffer_producer_pid} :timer.sleep(100) :ok = IngestEventQueue.add_to_table(sid_bid_pid, [le]) @@ -37,7 +40,10 @@ defmodule Logflare.Backends.BufferProducerTest do startup_key = {source.id, nil, nil} IngestEventQueue.upsert_tid(startup_key) :ok = IngestEventQueue.add_to_table(startup_key, [le]) - buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source}) + + buffer_producer_pid = + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id}) + sid_bid_pid = {source.id, nil, buffer_producer_pid} GenStage.stream([{buffer_producer_pid, max_demand: 1}]) @@ -53,7 +59,7 @@ defmodule Logflare.Backends.BufferProducerTest do source = insert(:source, user: user) pid = - start_supervised!({BufferProducer, backend: nil, source: source, buffer_size: 10}) + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id, buffer_size: 10}) le = build(:log_event) items = for _ <- 1..100, do: le diff --git a/test/logflare/backends/loki_adaptor_test.exs b/test/logflare/backends/loki_adaptor_test.exs index b53c2091e..f26807ff6 100644 --- a/test/logflare/backends/loki_adaptor_test.exs +++ b/test/logflare/backends/loki_adaptor_test.exs @@ -52,6 +52,7 @@ defmodule Logflare.Backends.Adaptor.LokiAdaptorTest do describe "logs ingestion" do setup do + insert(:plan) user = insert(:user) source = insert(:source, user: user)