Skip to content

Commit

Permalink
Merge pull request #1668 from Logflare/staging
Browse files Browse the repository at this point in the history
Release 1.3.28
  • Loading branch information
Ziinc authored Aug 6, 2023
2 parents 89ed66c + 3776d08 commit 6e3f538
Show file tree
Hide file tree
Showing 13 changed files with 753 additions and 177 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.27
1.3.28
6 changes: 5 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ defmodule Logflare.Application do
{DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.RecentLogsSup},
{DynamicSupervisor,
strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor},
{DynamicSupervisor,
strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor},
{Registry, name: Logflare.Backends.SourceRegistry, keys: :unique},
{Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate},
{Registry, name: Logflare.CounterRegistry, keys: :unique}
Expand Down Expand Up @@ -140,6 +142,8 @@ defmodule Logflare.Application do
{DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.RecentLogsSup},
{DynamicSupervisor,
strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor},
{DynamicSupervisor,
strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor},
{Registry, name: Logflare.Backends.SourceRegistry, keys: :unique},
{Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate},
{Registry, name: Logflare.CounterRegistry, keys: :unique}
Expand Down Expand Up @@ -190,9 +194,9 @@ defmodule Logflare.Application do
if SingleTenant.supabase_mode?() do
SingleTenant.create_supabase_sources()
SingleTenant.create_supabase_endpoints()
SingleTenant.ensure_supabase_sources_started()

unless SingleTenant.postgres_backend?() do
SingleTenant.ensure_supabase_sources_started()
# buffer time for all sources to init and create tables
# in case of latency.
:timer.sleep(3_000)
Expand Down
18 changes: 13 additions & 5 deletions lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
{:ok, _} <- Registry.register(SourceDispatcher, source_id, {__MODULE__, :ingest}),
{:ok, buffer_pid} <- MemoryBuffer.start_link([]),
repository_module <- create_repo(source_backend),
:ok <- connect_to_repo(source_backend),
:ok <- connected?(source_backend),
:ok <- create_log_events_table(source_backend) do
state = %__MODULE__{
buffer_module: MemoryBuffer,
Expand Down Expand Up @@ -90,18 +90,26 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
@impl true
def execute_query(%SourceBackend{} = source_backend, %Ecto.Query{} = query) do
mod = create_repo(source_backend)
:ok = connect_to_repo(source_backend)
:ok = connected?(source_backend)
result = mod.all(query)
{:ok, result}
end

def execute_query(%SourceBackend{} = source_backend, query_string) when is_binary(query_string),
do: execute_query(source_backend, {query_string, []})

def execute_query(%SourceBackend{} = source_backend, {query_string, params})
def execute_query(%SourceBackend{config: config} = source_backend, {query_string, params})
when is_binary(query_string) and is_list(params) do
mod = create_repo(source_backend)
:ok = connect_to_repo(source_backend)
:ok = connected?(source_backend)

# explicitly set search path
schema = Map.get(config, "schema") || Map.get(config, :schema)

if schema do
Ecto.Adapters.SQL.query!(mod, "SET search_path=#{schema}")
end

result = Ecto.Adapters.SQL.query!(mod, query_string, params)

rows =
Expand All @@ -113,8 +121,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
end

# expose PgRepo functions
defdelegate connected?(source_backend), to: PgRepo
defdelegate create_repo(source_backend), to: PgRepo
defdelegate connect_to_repo(source_backend), to: PgRepo
defdelegate table_name(source_or_source_backend), to: PgRepo
defdelegate create_log_events_table(source_backend), to: PgRepo
defdelegate create_log_events_table(source_backend, override_migrations), to: PgRepo
Expand Down
197 changes: 134 additions & 63 deletions lib/logflare/backends/adaptor/postgres_adaptor/pg_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do
Using the Source Backend source id we create a new Ecto.Repo which whom we will
be able to connect to the configured PSQL URL, run migrations and insert data.
"""
alias Logflare.Backends.Adaptor.PostgresAdaptor.Repo.Migrations.AddLogEvents
alias Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor
use GenServer

alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.Backends.Adaptor.PostgresAdaptor.PgLogEvent
alias Logflare.Backends.Adaptor.PostgresAdaptor.Repo.Migrations.AddLogEvents
alias Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor
alias Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor
alias Logflare.Backends.SourceBackend
alias Logflare.Source
alias Logflare.LogEvent

alias Logflare.Source
require Logger

@ast (quote do
Expand All @@ -26,28 +28,14 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do
Requires `:source` to be preloaded.
"""
@spec create_repo(SourceBackend.t()) :: atom()

def create_repo(%SourceBackend{source: %Source{}} = source_backend) do
def create_repo(%SourceBackend{source: %Source{}, config: config} = source_backend) do
name = get_repo_module(source_backend)

case Code.ensure_compiled(name) do
{:module, _} -> nil
_ -> {:module, _, _, _} = Module.create(name, @ast, Macro.Env.location(__ENV__))
end

migration_table = migrations_table_name(source_backend)
unless Process.whereis(name) do
child_spec = {__MODULE__, %{repo_module_name: name, config: config}}

schema = Map.get(source_backend.config, "schema")

after_connect =
if schema do
{Postgrex, :query!, ["set search_path=#{schema}", []]}
end

Application.put_env(:logflare, name,
migration_source: migration_table,
after_connect: after_connect
)
{:ok, _} = DynamicSupervisor.start_child(PgRepoSupervisor, child_spec)
end

name
end
Expand All @@ -65,33 +53,24 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do
@doc """
Connects to a given postgres. Requires `:source` to be preloaded.
"""
@spec connect_to_repo(SourceBackend.t()) :: :ok
def connect_to_repo(%SourceBackend{config: config} = source_backend) do
repo = get_repo_module(source_backend)

unless Process.whereis(repo) do
pool_size =
Keyword.get(Application.get_env(:logflare, :postgres_backend_adapter), :pool_size, 10)

opts = [
{:url, config["url"] || config.url},
{:name, repo},
{:pool_size, pool_size}
]
@spec connected?(SourceBackend.t()) :: :ok | {:error, :not_connected}
def connected?(%SourceBackend{} = source_backend) do
repo_module_name = get_repo_module(source_backend)
connected?(repo_module_name, 5)
end

{:ok, _} = DynamicSupervisor.start_child(Supervisor, repo.child_spec(opts))
schema = Map.get(source_backend.config, "schema") || Map.get(source_backend.config, :schema)
defp connected?(_repo_module_name, 0), do: {:error, :not_connected}
defp connected?(_repo_module_name, :ok), do: :ok

if schema do
query = """
CREATE SCHEMA IF NOT EXISTS #{schema}
"""
defp connected?(repo_module_name, acc) do
case GenServer.call(via(repo_module_name), :connected?) do
:ok ->
connected?(repo_module_name, :ok)

{:ok, _} = Ecto.Adapters.SQL.query(repo, query, [])
end
_ ->
:timer.sleep(500)
connected?(repo_module_name, acc - 1)
end

:ok
end

@doc """
Expand All @@ -100,18 +79,12 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do
@spec create_log_events_table(SourceBackend.t(), list() | nil) ::
:ok | {:error, :failed_migration}
def create_log_events_table(source_backend, override_migrations \\ nil) do
repository_module = get_repo_module(source_backend)
repo_module_name = get_repo_module(source_backend)
migrations = if override_migrations, do: override_migrations, else: migrations(source_backend)
schema = Map.get(source_backend.config, "schema") || Map.get(source_backend.config, :schema)
opts = [all: true, prefix: schema, migrations: migrations]

opts = [all: true, prefix: schema]
Ecto.Migrator.run(repository_module, migrations, :up, opts)

:ok
rescue
e in Postgrex.Error ->
Logger.error("Error creating log_events table: #{inspect(e)}")
{:error, :failed_migration}
GenServer.call(via(repo_module_name), {:run_migrations, source_backend, opts})
end

@doc """
Expand Down Expand Up @@ -139,21 +112,17 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do
"""
@spec rollback_migrations(SourceBackend.t()) :: :ok
def rollback_migrations(source_backend) do
repository_module = create_repo(source_backend)
Ecto.Migrator.run(repository_module, migrations(source_backend), :down, all: true)

:ok
repo_module_name = get_repo_module(source_backend)
GenServer.call(via(repo_module_name), {:rollback_migrations_table, source_backend})
end

@doc """
Drops the migration table
"""
@spec drop_migrations_table(SourceBackend.t()) :: :ok
def drop_migrations_table(source_backend) do
repository_module = create_repo(source_backend)
migrations_table = migrations_table_name(source_backend)
Ecto.Adapters.SQL.query!(repository_module, "DROP TABLE IF EXISTS #{migrations_table}")
:ok
repo_module_name = get_repo_module(source_backend)
GenServer.call(via(repo_module_name), {:drop_migrations_table, source_backend})
end

@doc """
Expand Down Expand Up @@ -198,4 +167,106 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do

repo.insert(changeset)
end

## Genserver calls
# Registry-backed process name for the companion GenServer of a repo module.
defp via(repo_mod),
  do: {:via, Registry, {Logflare.Backends.SourceRegistry, repo_mod}}

# Starts the per-repo GenServer, registered under the repo module's via-tuple.
def start_link(%{repo_module_name: repo_mod} = init_state) do
  GenServer.start_link(__MODULE__, init_state, name: via(repo_mod))
end

def init(state), do: {:ok, state, {:continue, :generate_repo_module}}

# Defines the Ecto.Repo module (from @ast) at runtime unless it is already
# compiled, then chains into the :connect_repo continue step.
def handle_continue(:generate_repo_module, %{repo_module_name: repo_mod} = state) do
  unless match?({:module, _}, Code.ensure_compiled(repo_mod)) do
    {:module, _, _, _} = Module.create(repo_mod, @ast, Macro.Env.location(__ENV__))
  end

  {:noreply, state, {:continue, :connect_repo}}
end

# Starts the backing Ecto repo under the adaptor Supervisor (if not already
# running) and ensures the configured schema exists. Runs after
# :generate_repo_module, so the repo module is guaranteed to be defined.
def handle_continue(:connect_repo, state) do
  %{config: config, repo_module_name: repo_module_name} = state

  # Connection pool size from app config; defaults to 10.
  pool_size =
    Keyword.get(Application.get_env(:logflare, :postgres_backend_adapter), :pool_size, 10)

  # Backend config may carry string or atom keys depending on its origin.
  schema = Map.get(config, "schema") || Map.get(config, :schema)
  repo_opts = [url: config["url"] || config.url, name: repo_module_name, pool_size: pool_size]

  # `name: repo_module_name` registers the repo locally, so whereis/1 is the
  # idempotency guard against double-starting it.
  unless Process.whereis(repo_module_name) do
    {:ok, _} = DynamicSupervisor.start_child(Supervisor, repo_module_name.child_spec(repo_opts))

    if schema do
      # NOTE(review): schema is interpolated into raw SQL — assumed to be
      # operator-provided backend config, not untrusted user input; confirm
      # upstream validation.
      query = """
      CREATE SCHEMA IF NOT EXISTS #{schema}
      """

      {:ok, _} = Ecto.Adapters.SQL.query(repo_module_name, query, [])
      # NOTE(review): after_connect is written to the app env only after the
      # repo has started, so connections opened before this line won't run the
      # search_path hook — verify this ordering is intended.
      Application.put_env(:logflare, repo_module_name, after_connect: after_connect(state))
    end
  end

  {:noreply, state}
end

# Health probe: replies :ok when a round-trip "SELECT 1" succeeds against the
# repo, :error when the query (or the result match) raises.
def handle_call(:connected?, _from, %{repo_module_name: repo_mod} = state) do
  reply =
    try do
      %Postgrex.Result{} = Ecto.Adapters.SQL.query!(repo_mod, "SELECT 1")
      :ok
    rescue
      _ -> :error
    end

  {:reply, reply, state}
end

# Runs the given :migrations (popped from opts) upward against this server's
# repo, using a per-source migrations table. Replies :ok on success, or logs
# and replies {:error, :failed_migration} on a Postgrex error.
def handle_call({:run_migrations, %SourceBackend{} = source_backend, opts}, _, state) do
  %{repo_module_name: repo_module_name} = state

  migrations_table_name = migrations_table_name(source_backend)
  # Remaining opts (e.g. :all, :prefix) are forwarded to Ecto.Migrator.run/4.
  {migrations, opts} = Keyword.pop!(opts, :migrations)

  # Must be set before Migrator.run so the repo reads the per-source
  # migration_source and search_path hook.
  # NOTE(review): put_env replaces the whole env entry for this repo module —
  # confirm no other keys set elsewhere need to survive here.
  Application.put_env(:logflare, repo_module_name,
    migration_source: migrations_table_name,
    after_connect: after_connect(state)
  )

  Ecto.Migrator.run(repo_module_name, migrations, :up, opts)

  {:reply, :ok, state}
rescue
  e in Postgrex.Error ->
    Logger.error("Error creating log_events table: #{inspect(e)}")
    {:reply, {:error, :failed_migration}, state}
end

# Drops this source's migrations bookkeeping table. Replies :ok on success;
# logs and replies {:error, :failed_migration} on a Postgrex error (tag kept
# as-is for caller compatibility).
def handle_call({:drop_migrations_table, %SourceBackend{} = source_backend}, _, state) do
  %{repo_module_name: repo_module_name} = state
  # NOTE(review): table name is derived by migrations_table_name/1, assumed
  # internally generated (not user input) — confirm before trusting the
  # interpolation below.
  migrations_table = migrations_table_name(source_backend)
  Ecto.Adapters.SQL.query!(repo_module_name, "DROP TABLE IF EXISTS #{migrations_table}")

  {:reply, :ok, state}
rescue
  e in Postgrex.Error ->
    # Fixed copy-pasted log message: this clause drops the migrations table,
    # it does not create the log_events table.
    Logger.error("Error dropping migrations table: #{inspect(e)}")
    {:reply, {:error, :failed_migration}, state}
end

# Rolls back all migrations for this source. Replies :ok on success; logs and
# replies {:error, :failed_migration} on a Postgrex error.
def handle_call({:rollback_migrations_table, %SourceBackend{} = source_backend}, _, state) do
  # Use the repo module this server already owns (consistent with the
  # :drop_migrations_table clause) instead of re-entering create_repo/1,
  # which may attempt to start this very GenServer again from inside itself.
  %{repo_module_name: repo_module_name} = state
  Ecto.Migrator.run(repo_module_name, migrations(source_backend), :down, all: true)

  {:reply, :ok, state}
rescue
  e in Postgrex.Error ->
    # Fixed copy-pasted log message: this clause rolls migrations back.
    Logger.error("Error rolling back migrations: #{inspect(e)}")
    {:reply, {:error, :failed_migration}, state}
end

# Builds the Postgrex after_connect MFA that pins search_path to the backend's
# configured schema. Accepts the server state (atom- or string-keyed config)
# or a bare schema string; returns nil when no schema is configured.
defp after_connect(%{config: %{schema: configured_schema}}) do
  after_connect(configured_schema)
end

defp after_connect(%{config: %{"schema" => configured_schema}}) do
  after_connect(configured_schema)
end

defp after_connect(configured_schema) when is_binary(configured_schema) do
  {Postgrex, :query!, ["set search_path=#{configured_schema}", []]}
end

defp after_connect(_), do: nil
end
8 changes: 6 additions & 2 deletions lib/logflare/endpoints.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ defmodule Logflare.Endpoints do
{endpoint, query_string} =
if SingleTenant.supabase_mode?() and SingleTenant.postgres_backend_adapter_opts() != nil do
# translate the query
{:ok, q} = Logflare.Sql.translate(:bq_sql, :pg_sql, transformed_query)
schema_prefix = Keyword.get(SingleTenant.postgres_backend_adapter_opts(), :schema)

{:ok, q} = Logflare.Sql.translate(:bq_sql, :pg_sql, transformed_query, schema_prefix)

{Map.put(endpoint_query, :language, :pg_sql), q}
else
{endpoint_query, transformed_query}
Expand Down Expand Up @@ -254,7 +257,8 @@ defmodule Logflare.Endpoints do
Map.get(params, parameter)
end

with {:ok, rows} <- PostgresAdaptor.execute_query(source_backend, {transformed_query, args}) do
with {:ok, rows} <-
PostgresAdaptor.execute_query(source_backend, {transformed_query, args}) do
{:ok, %{rows: rows}}
end
end
Expand Down
Loading

0 comments on commit 6e3f538

Please sign in to comment.