diff --git a/VERSION b/VERSION index f23616f6c..c0ff51de6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.3.27 \ No newline at end of file +1.3.28 \ No newline at end of file diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index b8533923c..15cfa57e4 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -59,6 +59,8 @@ defmodule Logflare.Application do {DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.RecentLogsSup}, {DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor}, + {DynamicSupervisor, + strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor}, {Registry, name: Logflare.Backends.SourceRegistry, keys: :unique}, {Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate}, {Registry, name: Logflare.CounterRegistry, keys: :unique} @@ -140,6 +142,8 @@ defmodule Logflare.Application do {DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.RecentLogsSup}, {DynamicSupervisor, strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor}, + {DynamicSupervisor, + strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor}, {Registry, name: Logflare.Backends.SourceRegistry, keys: :unique}, {Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate}, {Registry, name: Logflare.CounterRegistry, keys: :unique} @@ -190,9 +194,9 @@ defmodule Logflare.Application do if SingleTenant.supabase_mode?() do SingleTenant.create_supabase_sources() SingleTenant.create_supabase_endpoints() + SingleTenant.ensure_supabase_sources_started() unless SingleTenant.postgres_backend?() do - SingleTenant.ensure_supabase_sources_started() # buffer time for all sources to init and create tables # in case of latency. 
:timer.sleep(3_000) diff --git a/lib/logflare/backends/adaptor/postgres_adaptor.ex b/lib/logflare/backends/adaptor/postgres_adaptor.ex index 4114ef056..99a3e772d 100644 --- a/lib/logflare/backends/adaptor/postgres_adaptor.ex +++ b/lib/logflare/backends/adaptor/postgres_adaptor.ex @@ -41,7 +41,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do {:ok, _} <- Registry.register(SourceDispatcher, source_id, {__MODULE__, :ingest}), {:ok, buffer_pid} <- MemoryBuffer.start_link([]), repository_module <- create_repo(source_backend), - :ok <- connect_to_repo(source_backend), + :ok <- connected?(source_backend), :ok <- create_log_events_table(source_backend) do state = %__MODULE__{ buffer_module: MemoryBuffer, @@ -90,7 +90,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do @impl true def execute_query(%SourceBackend{} = source_backend, %Ecto.Query{} = query) do mod = create_repo(source_backend) - :ok = connect_to_repo(source_backend) + :ok = connected?(source_backend) result = mod.all(query) {:ok, result} end @@ -98,10 +98,18 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do def execute_query(%SourceBackend{} = source_backend, query_string) when is_binary(query_string), do: execute_query(source_backend, {query_string, []}) - def execute_query(%SourceBackend{} = source_backend, {query_string, params}) + def execute_query(%SourceBackend{config: config} = source_backend, {query_string, params}) when is_binary(query_string) and is_list(params) do mod = create_repo(source_backend) - :ok = connect_to_repo(source_backend) + :ok = connected?(source_backend) + + # explicitly set search path + schema = Map.get(config, "schema") || Map.get(config, :schema) + + if schema do + Ecto.Adapters.SQL.query!(mod, "SET search_path=#{schema}") + end + result = Ecto.Adapters.SQL.query!(mod, query_string, params) rows = @@ -113,8 +121,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do end # expose PgRepo functions + defdelegate connected?(source_backend), to: PgRepo defdelegate create_repo(source_backend), to: PgRepo - defdelegate connect_to_repo(source_backend), to: PgRepo defdelegate table_name(source_or_source_backend), to: PgRepo defdelegate create_log_events_table(source_backend), to: PgRepo defdelegate create_log_events_table(source_backend, override_migrations), to: PgRepo diff --git a/lib/logflare/backends/adaptor/postgres_adaptor/pg_repo.ex b/lib/logflare/backends/adaptor/postgres_adaptor/pg_repo.ex index c74cef235..056eabd4a 100644 --- a/lib/logflare/backends/adaptor/postgres_adaptor/pg_repo.ex +++ b/lib/logflare/backends/adaptor/postgres_adaptor/pg_repo.ex @@ -5,14 +5,16 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do Using the Source Backend source id we create a new Ecto.Repo with which we will be able to connect to the configured PSQL URL, run migrations and insert data.
""" - alias Logflare.Backends.Adaptor.PostgresAdaptor.Repo.Migrations.AddLogEvents - alias Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor + use GenServer + alias Logflare.Backends.Adaptor.PostgresAdaptor alias Logflare.Backends.Adaptor.PostgresAdaptor.PgLogEvent + alias Logflare.Backends.Adaptor.PostgresAdaptor.Repo.Migrations.AddLogEvents + alias Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor + alias Logflare.Backends.Adaptor.PostgresAdaptor.Supervisor alias Logflare.Backends.SourceBackend - alias Logflare.Source alias Logflare.LogEvent - + alias Logflare.Source require Logger @ast (quote do @@ -26,28 +28,14 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do Requires `:source` to be preloaded. """ @spec create_repo(SourceBackend.t()) :: atom() - - def create_repo(%SourceBackend{source: %Source{}} = source_backend) do + def create_repo(%SourceBackend{source: %Source{}, config: config} = source_backend) do name = get_repo_module(source_backend) - case Code.ensure_compiled(name) do - {:module, _} -> nil - _ -> {:module, _, _, _} = Module.create(name, @ast, Macro.Env.location(__ENV__)) - end - - migration_table = migrations_table_name(source_backend) + unless Process.whereis(name) do + child_spec = {__MODULE__, %{repo_module_name: name, config: config}} - schema = Map.get(source_backend.config, "schema") - - after_connect = - if schema do - {Postgrex, :query!, ["set search_path=#{schema}", []]} - end - - Application.put_env(:logflare, name, - migration_source: migration_table, - after_connect: after_connect - ) + {:ok, _} = DynamicSupervisor.start_child(PgRepoSupervisor, child_spec) + end name end @@ -65,33 +53,24 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do @doc """ Connects to a given postgres. Requires `:source` to be preloaded. """ - @spec connect_to_repo(SourceBackend.t()) :: :ok - def connect_to_repo(%SourceBackend{config: config} = source_backend) do - repo = get_repo_module(source_backend) - - unless Process.whereis(repo) do - pool_size = - Keyword.get(Application.get_env(:logflare, :postgres_backend_adapter), :pool_size, 10) - - opts = [ - {:url, config["url"] || config.url}, - {:name, repo}, - {:pool_size, pool_size} - ] + @spec connected?(SourceBackend.t()) :: :ok | {:error, :not_connected} + def connected?(%SourceBackend{} = source_backend) do + repo_module_name = get_repo_module(source_backend) + connected?(repo_module_name, 5) + end - {:ok, _} = DynamicSupervisor.start_child(Supervisor, repo.child_spec(opts)) - schema = Map.get(source_backend.config, "schema") || Map.get(source_backend.config, :schema) + defp connected?(_repo_module_name, 0), do: {:error, :not_connected} + defp connected?(_repo_module_name, :ok), do: :ok - if schema do - query = """ - CREATE SCHEMA IF NOT EXISTS #{schema} - """ + defp connected?(repo_module_name, acc) do + case GenServer.call(via(repo_module_name), :connected?) 
do + :ok -> + connected?(repo_module_name, :ok) - {:ok, _} = Ecto.Adapters.SQL.query(repo, query, []) - end + _ -> + :timer.sleep(500) + connected?(repo_module_name, acc - 1) end - - :ok end @doc """ @@ -100,18 +79,12 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do @spec create_log_events_table(SourceBackend.t(), list() | nil) :: :ok | {:error, :failed_migration} def create_log_events_table(source_backend, override_migrations \\ nil) do - repository_module = get_repo_module(source_backend) + repo_module_name = get_repo_module(source_backend) migrations = if override_migrations, do: override_migrations, else: migrations(source_backend) schema = Map.get(source_backend.config, "schema") || Map.get(source_backend.config, :schema) + opts = [all: true, prefix: schema, migrations: migrations] - opts = [all: true, prefix: schema] - Ecto.Migrator.run(repository_module, migrations, :up, opts) - - :ok - rescue - e in Postgrex.Error -> - Logger.error("Error creating log_events table: #{inspect(e)}") - {:error, :failed_migration} + GenServer.call(via(repo_module_name), {:run_migrations, source_backend, opts}) end @doc """ @@ -139,10 +112,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do """ @spec rollback_migrations(SourceBackend.t()) :: :ok def rollback_migrations(source_backend) do - repository_module = create_repo(source_backend) - Ecto.Migrator.run(repository_module, migrations(source_backend), :down, all: true) - - :ok + repo_module_name = get_repo_module(source_backend) + GenServer.call(via(repo_module_name), {:rollback_migrations_table, source_backend}) end @doc """ @@ -150,10 +121,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do """ @spec drop_migrations_table(SourceBackend.t()) :: :ok def drop_migrations_table(source_backend) do - repository_module = create_repo(source_backend) - migrations_table = migrations_table_name(source_backend) - Ecto.Adapters.SQL.query!(repository_module, "DROP TABLE IF EXISTS #{migrations_table}") - :ok + repo_module_name = get_repo_module(source_backend) + GenServer.call(via(repo_module_name), {:drop_migrations_table, source_backend}) end @doc """ @@ -198,4 +167,106 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.PgRepo do repo.insert(changeset) end + + ## Genserver calls + defp via(repo_module_name) do + {:via, Registry, {Logflare.Backends.SourceRegistry, repo_module_name}} + end + + def start_link(%{repo_module_name: repo_module_name} = state), + do: GenServer.start_link(__MODULE__, state, name: via(repo_module_name)) + + def init(state), do: {:ok, state, {:continue, :generate_repo_module}} + + def handle_continue(:generate_repo_module, %{repo_module_name: repo_module_name} = state) do + case Code.ensure_compiled(repo_module_name) do + {:module, _} -> nil + _ -> {:module, _, _, _} = Module.create(repo_module_name, @ast, Macro.Env.location(__ENV__)) + end + + {:noreply, state, {:continue, :connect_repo}} + end + + def handle_continue(:connect_repo, state) do + %{config: config, repo_module_name: repo_module_name} = state + + pool_size = + Keyword.get(Application.get_env(:logflare, :postgres_backend_adapter), :pool_size, 10) + + schema = Map.get(config, "schema") || Map.get(config, :schema) + repo_opts = [url: config["url"] || config.url, name: repo_module_name, pool_size: pool_size] + + unless Process.whereis(repo_module_name) do + {:ok, _} = DynamicSupervisor.start_child(Supervisor, repo_module_name.child_spec(repo_opts)) + + if schema do + query = """ + CREATE SCHEMA IF NOT EXISTS #{schema} + """ + + 
{:ok, _} = Ecto.Adapters.SQL.query(repo_module_name, query, []) + Application.put_env(:logflare, repo_module_name, after_connect: after_connect(state)) + end + end + + {:noreply, state} + end + + def handle_call(:connected?, _, %{repo_module_name: repo_module_name} = state) do + %Postgrex.Result{} = Ecto.Adapters.SQL.query!(repo_module_name, "SELECT 1") + {:reply, :ok, state} + rescue + _ -> {:reply, :error, state} + end + + def handle_call({:run_migrations, %SourceBackend{} = source_backend, opts}, _, state) do + %{repo_module_name: repo_module_name} = state + + migrations_table_name = migrations_table_name(source_backend) + {migrations, opts} = Keyword.pop!(opts, :migrations) + + Application.put_env(:logflare, repo_module_name, + migration_source: migrations_table_name, + after_connect: after_connect(state) + ) + + Ecto.Migrator.run(repo_module_name, migrations, :up, opts) + + {:reply, :ok, state} + rescue + e in Postgrex.Error -> + Logger.error("Error creating log_events table: #{inspect(e)}") + {:reply, {:error, :failed_migration}, state} + end + + def handle_call({:drop_migrations_table, %SourceBackend{} = source_backend}, _, state) do + %{repo_module_name: repo_module_name} = state + migrations_table = migrations_table_name(source_backend) + Ecto.Adapters.SQL.query!(repo_module_name, "DROP TABLE IF EXISTS #{migrations_table}") + + {:reply, :ok, state} + rescue + e in Postgrex.Error -> + Logger.error("Error dropping migrations table: #{inspect(e)}") + {:reply, {:error, :failed_migration}, state} + end + + def handle_call({:rollback_migrations_table, %SourceBackend{} = source_backend}, _, state) do + repository_module = create_repo(source_backend) + Ecto.Migrator.run(repository_module, migrations(source_backend), :down, all: true) + + {:reply, :ok, state} + rescue + e in Postgrex.Error -> + Logger.error("Error rolling back migrations: #{inspect(e)}") + {:reply, {:error, :failed_migration}, state} + end + + defp after_connect(%{config: %{schema: schema}}), do: after_connect(schema) + defp after_connect(%{config: %{"schema" => schema}}), do: after_connect(schema) + + defp after_connect(schema) when is_binary(schema), + do: {Postgrex, :query!, ["set search_path=#{schema}", []]} + + defp after_connect(_), do: nil end diff --git a/lib/logflare/endpoints.ex b/lib/logflare/endpoints.ex index 66ced3f8e..e137556dc 100644 --- a/lib/logflare/endpoints.ex +++ b/lib/logflare/endpoints.ex @@ -168,7 +168,10 @@ defmodule Logflare.Endpoints do {endpoint, query_string} = if SingleTenant.supabase_mode?() and SingleTenant.postgres_backend_adapter_opts() != nil do # translate the query - {:ok, q} = Logflare.Sql.translate(:bq_sql, :pg_sql, transformed_query) + schema_prefix = Keyword.get(SingleTenant.postgres_backend_adapter_opts(), :schema) + + {:ok, q} = Logflare.Sql.translate(:bq_sql, :pg_sql, transformed_query, schema_prefix) + {Map.put(endpoint_query, :language, :pg_sql), q} else {endpoint_query, transformed_query} @@ -254,7 +257,8 @@ defmodule Logflare.Endpoints do Map.get(params, parameter) end - with {:ok, rows} <- PostgresAdaptor.execute_query(source_backend, {transformed_query, args}) do + with {:ok, rows} <- + PostgresAdaptor.execute_query(source_backend, {transformed_query, args}) do {:ok, %{rows: rows}} end end diff --git a/lib/logflare/single_tenant.ex b/lib/logflare/single_tenant.ex index e7539d95c..539fa25db 100644 --- a/lib/logflare/single_tenant.ex +++ b/lib/logflare/single_tenant.ex @@ -51,42 +51,45 @@ defmodule Logflare.SingleTenant do "postgREST.logs.prod", "pgbouncer.logs.prod" ] -
@endpoint_params [ - %{ - name: "logs.all", - query: - Application.app_dir(:logflare, "priv/supabase/endpoints/logs.all.sql") |> File.read!(), - sandboxable: true, - max_limit: 1000, - language: :bq_sql, - enable_auth: true, - cache_duration_seconds: 0 - }, - %{ - name: "usage.api-counts", - query: - Application.app_dir(:logflare, "priv/supabase/endpoints/usage.api-counts.sql") - |> File.read!(), - sandboxable: true, - max_limit: 1000, - language: :bq_sql, - enable_auth: true, - cache_duration_seconds: 900, - proactive_requerying_seconds: 300 - }, - %{ - name: "functions.invocation-stats", - query: - Application.app_dir(:logflare, "priv/supabase/endpoints/functions.invocation-stats.sql") - |> File.read!(), - sandboxable: true, - max_limit: 1000, - language: :bq_sql, - enable_auth: true, - cache_duration_seconds: 900, - proactive_requerying_seconds: 300 - } - ] + + defp endpoint_params do + [ + %{ + name: "logs.all", + query: + Application.app_dir(:logflare, "priv/supabase/endpoints/logs.all.sql") |> File.read!(), + sandboxable: true, + max_limit: 1000, + language: :bq_sql, + enable_auth: true, + cache_duration_seconds: 0 + }, + %{ + name: "usage.api-counts", + query: + Application.app_dir(:logflare, "priv/supabase/endpoints/usage.api-counts.sql") + |> File.read!(), + sandboxable: true, + max_limit: 1000, + language: :bq_sql, + enable_auth: true, + cache_duration_seconds: 900, + proactive_requerying_seconds: 300 + }, + %{ + name: "functions.invocation-stats", + query: + Application.app_dir(:logflare, "priv/supabase/endpoints/functions.invocation-stats.sql") + |> File.read!(), + sandboxable: true, + max_limit: 1000, + language: :bq_sql, + enable_auth: true, + cache_duration_seconds: 900, + proactive_requerying_seconds: 300 + } + ] + end @doc """ Retrieves the default user @@ -191,7 +194,7 @@ defmodule Logflare.SingleTenant do if count == 0 do endpoints = - for params <- @endpoint_params do + for params <- endpoint_params() do {:ok, endpoint} = Endpoints.create_query(user, params) endpoint end diff --git a/lib/logflare/sql.ex b/lib/logflare/sql.ex index 150ff6ae2..88ac34d87 100644 --- a/lib/logflare/sql.ex +++ b/lib/logflare/sql.ex @@ -686,7 +686,7 @@ defmodule Logflare.Sql do end) end - def translate(:bq_sql, :pg_sql, query) when is_binary(query) do + def translate(:bq_sql, :pg_sql, query, schema_prefix \\ nil) when is_binary(query) do {:ok, stmts} = Parser.parse("bigquery", query) for ast <- stmts do @@ -702,13 +702,21 @@ defmodule Logflare.Sql do ast |> Parser.to_string() + # explicitly set the schema prefix of the table + replacement_pattern = + if schema_prefix do + ~s|"#{schema_prefix}"."log_events_\\g{2}"| + else + "\"log_events_\\g{2}\"" + end + converted = query_string |> bq_to_pg_convert_parameters(params) # TODO: remove once sqlparser-rs bug is fixed # parser for postgres adds parenthesis to the end for postgres |> String.replace(~r/current\_timestamp\(\)/im, "current_timestamp") - |> String.replace(~r/\"([\w\_\-]*\.[\w\_\-]+)\.([\w_]{36})"/im, "\"log_events_\\g{2}\"") + |> String.replace(~r/\"([\w\_\-]*\.[\w\_\-]+)\.([\w_]{36})"/im, replacement_pattern) {:ok, converted} end) @@ -722,7 +730,7 @@ defmodule Logflare.Sql do |> Map.to_list() |> Enum.sort_by(fn {i, _v} -> i end, :asc) |> Enum.reduce(string, fn {index, param}, acc -> - Regex.replace(~r/@#{param}(?!:\s|$)/, acc, "$#{index}", global: false) + Regex.replace(~r/@#{param}(?!:\s|$)/, acc, "$#{index}::text", global: false) end) end @@ -783,12 +791,19 @@ defmodule Logflare.Sql do }} "timestamp_trunc" -> - to_trunc = get_in(v, 
["args", Access.at(0)]) + to_trunc = get_in(v, ["args", Access.at(0), "Unnamed", "Expr"]) interval_type = get_in(v, ["args", Access.at(1), "Unnamed", "Expr", "Identifier", "value"]) |> String.downcase() + field_arg = + if is_timestamp_identifier?(to_trunc) do + at_time_zone(to_trunc) + else + to_trunc + end + {k, %{ v @@ -796,7 +811,9 @@ defmodule Logflare.Sql do %{ "Unnamed" => %{"Expr" => %{"Value" => %{"SingleQuotedString" => interval_type}}} }, - bq_to_pg_convert_functions(to_trunc) + %{ + "Unnamed" => %{"Expr" => field_arg} + } ], "name" => [%{"quote_style" => nil, "value" => "date_trunc"}] }} @@ -820,6 +837,35 @@ defmodule Logflare.Sql do defp bq_to_pg_convert_functions(kv), do: kv + # handle timestamp references in binary operations + defp pg_traverse_final_pass( + {"BinaryOp" = k, + %{ + "left" => left, + "right" => right, + "op" => operator + } = v} + ) do + [left, right] = + for expr <- [left, right] do + cond do + is_timestamp_identifier?(expr) -> + at_time_zone(expr) + + is_identifier?(expr) and operator == "Eq" -> + # wrap with a cast + expr + |> cast_to_jsonb() + |> jsonb_to_text() + + true -> + expr + end + end + + {k, %{v | "left" => left, "right" => right} |> pg_traverse_final_pass()} + end + # convert backticks to double quotes defp pg_traverse_final_pass({"quote_style" = k, "`"}), do: {k, "\""} # drop cross join unnest @@ -908,9 +954,11 @@ defmodule Logflare.Sql do cte_aliases: cte_aliases, cte_from_aliases: cte_from_aliases, in_cte_tables_tree: false, - in_cast: false, + in_function_or_cast: false, + in_projection_tree: false, from_table_aliases: [], - from_table_values: [] + from_table_values: [], + in_binaryop: false }) |> then(fn ast when joins != [] -> @@ -923,6 +971,50 @@ defmodule Logflare.Sql do defp convert_keys_to_json_query(identifiers, data, base \\ "body") + # convert body.timestamp from unix microsecond to postgres timestamp + defp convert_keys_to_json_query( + %{"CompoundIdentifier" => [%{"value" => "timestamp"}]}, + %{ + in_cte_tables_tree: in_cte_tables_tree, + cte_aliases: cte_aliases, + in_projection_tree: false + } = _data, + [ + table, + "body" + ] + ) + when cte_aliases == %{} or in_cte_tables_tree == true do + at_time_zone(%{ + "Nested" => %{ + "JsonAccess" => %{ + "left" => %{ + "CompoundIdentifier" => [ + %{"quote_style" => nil, "value" => table}, + %{"quote_style" => nil, "value" => "body"} + ] + }, + "operator" => "LongArrow", + "right" => %{"Value" => %{"SingleQuotedString" => "timestamp"}} + } + } + }) + end + + defp convert_keys_to_json_query(%{"Identifier" => %{"value" => "timestamp"}}, _data, "body") do + at_time_zone(%{ + "Nested" => %{ + "JsonAccess" => %{ + "left" => %{ + "Identifier" => %{"quote_style" => nil, "value" => "body"} + }, + "operator" => "LongArrow", + "right" => %{"Value" => %{"SingleQuotedString" => "timestamp"}} + } + } + }) + end + defp convert_keys_to_json_query( %{"CompoundIdentifier" => [%{"value" => key}]}, data, @@ -937,7 +1029,8 @@ defmodule Logflare.Sql do %{"quote_style" => nil, "value" => field} ] }, - "operator" => if(data.in_cast, do: "LongArrow", else: "Arrow"), + "operator" => + if(data.in_function_or_cast or data.in_binaryop, do: "LongArrow", else: "Arrow"), "right" => %{"Value" => %{"SingleQuotedString" => key}} } } @@ -953,25 +1046,55 @@ defmodule Logflare.Sql do "Nested" => %{ "JsonAccess" => %{ "left" => %{"Identifier" => %{"quote_style" => nil, "value" => base}}, - "operator" => if(data.in_cast, do: "LongArrow", else: "Arrow"), + "operator" => + if(data.in_function_or_cast or data.in_binaryop, do: 
"LongArrow", else: "Arrow"), "right" => %{"Value" => %{"SingleQuotedString" => key}} } } } end + # handle cross join aliases when there are different base field names as compared to what is referenced + defp convert_keys_to_json_query( + %{"CompoundIdentifier" => [%{"value" => _join_alias}, %{"value" => key} | _]}, + data, + {base, arr_path} + ) do + str_path = Enum.join(arr_path, ",") + path = "{#{str_path},#{key}}" + + %{ + "Nested" => %{ + "JsonAccess" => %{ + "left" => %{"Identifier" => %{"quote_style" => nil, "value" => base}}, + "operator" => + if(data.in_function_or_cast or data.in_binaryop, + do: "HashLongArrow", + else: "HashArrow" + ), + "right" => %{"Value" => %{"SingleQuotedString" => path}} + } + } + } + end + defp convert_keys_to_json_query( %{"CompoundIdentifier" => [%{"value" => join_alias}, %{"value" => key} | _]}, data, base ) do - path = "{#{data.alias_path_mappings[join_alias]},#{key}}" + str_path = Enum.join(data.alias_path_mappings[join_alias], ",") + path = "{#{str_path},#{key}}" %{ "Nested" => %{ "JsonAccess" => %{ "left" => %{"Identifier" => %{"quote_style" => nil, "value" => base}}, - "operator" => if(data.in_cast, do: "HashLongArrow", else: "HashArrow"), + "operator" => + if(data.in_function_or_cast or data.in_binaryop, + do: "HashLongArrow", + else: "HashArrow" + ), "right" => %{"Value" => %{"SingleQuotedString" => path}} } } @@ -987,7 +1110,8 @@ defmodule Logflare.Sql do "Nested" => %{ "JsonAccess" => %{ "left" => %{"Identifier" => %{"quote_style" => nil, "value" => base}}, - "operator" => if(data.in_cast, do: "LongArrow", else: "Arrow"), + "operator" => + if(data.in_function_or_cast or data.in_binaryop, do: "LongArrow", else: "Arrow"), "right" => %{"Value" => %{"SingleQuotedString" => name}} } } @@ -1020,7 +1144,19 @@ defmodule Logflare.Sql do get_in(from, ["relation", "Table", "alias", "name", "value"]) end) - for from <- from_list, + for from <- from_list do + Enum.reduce(from["joins"] || [], %{}, fn + %{ + "relation" => %{ + "UNNEST" => %{ + "array_expr" => %{"Identifier" => %{"value" => identifier_val}}, + "alias" => %{"name" => %{"value" => alias_name}} + } + } + }, + acc -> + Map.put(acc, alias_name, [identifier_val]) + %{ "relation" => %{ "UNNEST" => %{ @@ -1028,19 +1164,32 @@ defmodule Logflare.Sql do "alias" => %{"name" => %{"value" => alias_name}} } } - } <- from["joins"] || [], - into: %{} do - arr_path = for i <- identifiers, value = i["value"], value not in table_aliases, do: value + }, + acc -> + arr_path = + for i <- identifiers, value = i["value"], value not in table_aliases do + if is_map_key(acc, value), do: acc[value], else: [value] + end + |> List.flatten() - str_path = Enum.join(arr_path, ",") - {alias_name, str_path} + Map.put(acc, alias_name, arr_path) + end) end + |> Enum.reduce(%{}, fn mappings, acc -> Map.merge(acc, mappings) end) + end + + defp traverse_convert_identifiers({"BinaryOp" = k, v}, data) do + {k, traverse_convert_identifiers(v, Map.put(data, :in_binaryop, true))} end defp traverse_convert_identifiers({"cte_tables" = k, v}, data) do {k, traverse_convert_identifiers(v, Map.put(data, :in_cte_tables_tree, true))} end + defp traverse_convert_identifiers({"projection" = k, v}, data) do + {k, traverse_convert_identifiers(v, Map.put(data, :in_projection_tree, true))} + end + # handle top level queries defp traverse_convert_identifiers( {"Query" = k, %{"body" => %{"Select" => %{"from" => [_ | _] = from_list}}} = v}, @@ -1062,10 +1211,13 @@ defmodule Logflare.Sql do value_map["value"] end + alias_path_mappings = 
get_bq_alias_path_mappings(%{"Query" => v}) + data = Map.merge(data, %{ from_table_aliases: aliases, - from_table_values: values + from_table_values: values, + alias_path_mappings: alias_path_mappings }) {k, traverse_convert_identifiers(v, data)} @@ -1108,8 +1260,8 @@ defmodule Logflare.Sql do {k, traverse_convert_identifiers(v, data)} end - defp traverse_convert_identifiers({"Cast" = k, v}, data) do - {k, traverse_convert_identifiers(v, Map.put(data, :in_cast, true))} + defp traverse_convert_identifiers({k, v}, data) when k in ["Function", "Cast"] do + {k, traverse_convert_identifiers(v, Map.put(data, :in_function_or_cast, true))} end # auto set the column alias if not set @@ -1132,7 +1284,31 @@ defmodule Logflare.Sql do {"CompoundIdentifier" = k, [%{"value" => head_val}, tail] = v}, data ) do + # dbg({v, data}) cond do + is_map_key(data.alias_path_mappings, head_val) and + length(data.alias_path_mappings[head_val || []]) > 1 -> + # referencing a cross join unnest + # pop first path part and use it as the base + # with a cross join unnest(metadata) as m + # with a cross join unnest(m.request) as request + # reference of request.status_code gets converted to: + # metadata -> 'request, status_code' + # base is set to the first item of the path (full json path is metadata.request.status_code) + + # pop the first + [base | arr_path] = data.alias_path_mappings[head_val] + + convert_keys_to_json_query(%{k => v}, data, {base, arr_path}) + |> Map.to_list() + |> List.first() + + # outside of a cte, referencing table alias + # preserve as is + head_val in data.from_table_aliases and data.in_cte_tables_tree == false and + data.cte_aliases != %{} -> + {k, v} + # first OR condition: outside of cte and non-cte # second OR condition: inside a cte head_val in data.from_table_aliases or @@ -1207,4 +1383,71 @@ defmodule Logflare.Sql do |> Map.to_list() |> List.first() end + + defp is_identifier?(identifier), + do: is_map_key(identifier, "CompoundIdentifier") or is_map_key(identifier, "Identifier") + + defp is_timestamp_identifier?(%{"Identifier" => %{"value" => "timestamp"}}), do: true + + defp is_timestamp_identifier?(%{"CompoundIdentifier" => [_head, %{"value" => "timestamp"}]}), + do: true + + defp is_timestamp_identifier?(_), do: false + + defp at_time_zone(identifier) do + %{ + "Nested" => %{ + "AtTimeZone" => %{ + "time_zone" => "UTC", + "timestamp" => %{ + "Function" => %{ + "args" => [ + %{ + "Unnamed" => %{ + "Expr" => %{ + "BinaryOp" => %{ + "left" => %{ + "Cast" => %{ + "data_type" => %{"BigInt" => nil}, + "expr" => identifier + } + }, + "op" => "Divide", + "right" => %{"Value" => %{"Number" => ["1000000.0", false]}} + } + } + } + } + ], + "distinct" => false, + "name" => [%{"quote_style" => nil, "value" => "to_timestamp"}], + "over" => nil, + "special" => false + } + } + } + } + } + end + + defp cast_to_jsonb(identifier) do + %{ + "Cast" => %{ + "data_type" => %{"Custom" => [[%{"quote_style" => nil, "value" => "jsonb"}], []]}, + "expr" => identifier + } + } + end + + defp jsonb_to_text(jsonb) do + %{ + "Nested" => %{ + "JsonAccess" => %{ + "left" => jsonb, + "operator" => "HashLongArrow", + "right" => %{"Value" => %{"SingleQuotedString" => "{}"}} + } + } + } + end end diff --git a/priv/supabase/endpoints/logs.all.sql b/priv/supabase/endpoints/logs.all.sql index 9c9f34403..9d97a75ba 100644 --- a/priv/supabase/endpoints/logs.all.sql +++ b/priv/supabase/endpoints/logs.all.sql @@ -21,8 +21,8 @@ where -- order of the where clauses matters -- project then timestamp then everything else t.project = 
@project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc @@ -39,8 +39,8 @@ where -- order of the where clauses matters -- project then timestamp then everything else t.project = @project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc ), @@ -54,8 +54,8 @@ select from retention, `deno-relay-logs` as t cross join unnest(t.metadata) as m where - CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END and m.project_ref = @project AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc @@ -73,8 +73,8 @@ where -- order of the where clauses matters -- project then timestamp then everything else m.project_ref = @project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc ), @@ -92,8 +92,8 @@ where -- project then timestamp then everything else -- m.project = @project t.project = @project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = 
'' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc ), @@ -108,8 +108,8 @@ from retention, `realtime.logs.prod` as t cross join unnest(t.metadata) as m where m.project = @project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc ), @@ -124,8 +124,8 @@ from retention, `storage.logs.prod.2` as t cross join unnest(t.metadata) as m where m.project = @project - AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + AND CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc ), @@ -139,8 +139,8 @@ select from retention, `postgREST.logs.prod` as t cross join unnest(t.metadata) as m where - CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND t.project = @project AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc @@ -155,8 +155,8 @@ select from retention, `pgbouncer.logs.prod` as t cross join unnest(t.metadata) as m where - CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > @iso_timestamp_start END - AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= @iso_timestamp_end END + CASE WHEN COALESCE(@iso_timestamp_start, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) > cast(@iso_timestamp_start as timestamp) END + AND CASE WHEN COALESCE(@iso_timestamp_end, '') = '' THEN TRUE ELSE cast(t.timestamp as timestamp) <= cast(@iso_timestamp_end as timestamp) END AND t.project = @project AND cast(t.timestamp as timestamp) > retention.date order by cast(t.timestamp as timestamp) desc diff --git a/test/logflare/backends/postgres_adaptor_test.exs b/test/logflare/backends/postgres_adaptor_test.exs index e86aec903..7670e8fb0 100644 --- a/test/logflare/backends/postgres_adaptor_test.exs +++ b/test/logflare/backends/postgres_adaptor_test.exs @@ -71,17 
+71,20 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptorTest do end describe "repo module" do - test "create_repo/1 creates a new Ecto.Repo for given source_backend", %{ + test "connected?/1 true when repo is connected", %{source_backend: source_backend} do + PostgresAdaptor.create_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) + end + + test "create_repo/1 creates a new Ecto.Repo for given source_backend and connects", %{ source_backend: source_backend } do repo = PostgresAdaptor.create_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) assert Keyword.get(repo.__info__(:attributes), :behaviour) == [Ecto.Repo] - env = Application.get_env(:logflare, repo) # module name should have a prefix assert "Elixir.Logflare.Repo.Postgres.Adaptor" <> _ = Atom.to_string(repo) - - assert env[:migration_source] == PostgresAdaptor.migrations_table_name(source_backend) end test "custom schema", %{source: source, postgres_url: url} do @@ -93,7 +96,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptorTest do source_backend = insert(:source_backend, type: :postgres, source: source, config: config) PostgresAdaptor.create_repo(source_backend) - assert :ok = PostgresAdaptor.connect_to_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) assert {:ok, [%{"schema_name" => "my_schema"}]} = PostgresAdaptor.execute_query( @@ -111,7 +114,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptorTest do source_backend: source_backend } do repo = PostgresAdaptor.create_repo(source_backend) - assert :ok = PostgresAdaptor.connect_to_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) assert :ok = PostgresAdaptor.create_log_events_table(source_backend) query = from(l in PostgresAdaptor.table_name(source_backend), select: l.body) assert repo.all(query) == [] @@ -119,7 +122,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptorTest do test "handle migration errors", %{source_backend: source_backend} do PostgresAdaptor.create_repo(source_backend) - assert :ok = PostgresAdaptor.connect_to_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) bad_migrations = [{0, BadMigration}] assert capture_log(fn -> diff --git a/test/logflare/endpoints_test.exs b/test/logflare/endpoints_test.exs index 2d6a99c65..b0116172f 100644 --- a/test/logflare/endpoints_test.exs +++ b/test/logflare/endpoints_test.exs @@ -221,7 +221,7 @@ defmodule Logflare.EndpointsTest do ) PostgresAdaptor.create_repo(source_backend) - PostgresAdaptor.connect_to_repo(source_backend) + assert :ok = PostgresAdaptor.connected?(source_backend) PostgresAdaptor.create_log_events_table(source_backend) on_exit(fn -> diff --git a/test/logflare/sql_test.exs b/test/logflare/sql_test.exs index e014cf9a0..93d7b935d 100644 --- a/test/logflare/sql_test.exs +++ b/test/logflare/sql_test.exs @@ -451,7 +451,7 @@ defmodule Logflare.SqlTest do test "countif into count-filter" do bq_query = "select countif(test = 1) from my_table" - pg_query = ~s|select count(*) filter (where (body -> 'test') = 1) from my_table| + pg_query = ~s|select count(*) filter (where (body ->> 'test') = 1) from my_table| {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) end @@ -478,7 +478,7 @@ defmodule Logflare.SqlTest do assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) end - test "timestamp_trunc" do + test "timestamp_trunc without a 
field reference" do bq_query = "select timestamp_trunc(current_timestamp(), day) as t" pg_query = ~s|select date_trunc('day', current_timestamp) as t| {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) @@ -514,7 +514,7 @@ defmodule Logflare.SqlTest do ), b as ( select 'btest' as other from a, my_table t - where a.col = (t.body -> 'my_col') + where (a.col::jsonb #>> '{}' ) = (t.body ->> 'my_col') ) select a.col as col from a """ @@ -564,7 +564,7 @@ defmodule Logflare.SqlTest do with a as ( select 'test' as col from my_table t - order by cast(t.timestamp as timestamp) desc + order by cast(t.my_col as timestamp) desc ) select a.col from a """ @@ -572,7 +572,7 @@ defmodule Logflare.SqlTest do with a as ( select 'test' as col from my_table t - order by cast( (t.body ->> 'timestamp') as timestamp) desc + order by cast( (t.body ->> 'my_col') as timestamp) desc ) select a.col as col from a """ @@ -585,7 +585,7 @@ defmodule Logflare.SqlTest do with a as ( select 'test' as col from my_table t - order by cast(t.timestamp as timestamp) desc + order by cast(t.my_col as timestamp) desc ) select 'tester' as col """ @@ -593,7 +593,7 @@ defmodule Logflare.SqlTest do with a as ( select 'test' as col from my_table t - order by cast( (t.body ->> 'timestamp') as timestamp) desc + order by cast( (t.body ->> 'my_col') as timestamp) desc ) select 'tester' as col """ @@ -615,7 +615,7 @@ defmodule Logflare.SqlTest do with a as ( select 'test' as col from my_table t - where (body #> '{metadata,project}') = '123' + where (body #>> '{metadata,project}') = '123' ) select a.col as col from a """ @@ -637,7 +637,7 @@ defmodule Logflare.SqlTest do with c as (select '123' as val), a as ( select 'test' as col from c, my_table t - where (body #> '{metadata,project}') = '123' + where (body #>> '{metadata,project}') = '123' ) select a.col as col from a """ @@ -653,10 +653,26 @@ defmodule Logflare.SqlTest do assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) end + test "field references within a DATE_TRUNC() are converted to ->> syntax for string casting" do + bq_query = ~s|select DATE_TRUNC('day', col) as date from my_table| + pg_query = ~s|select DATE_TRUNC('day', (body ->> 'col')) as date from my_table| + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + + test "field references in left-right operators are converted to ->> syntax" do + bq_query = ~s|select t.id = 'test' as value from my_table t| + pg_query = ~s|select (t.body ->> 'id') = 'test' as value from my_table t| + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + test "order by json query" do - bq_query = ~s|select id from my_source t order by t.timestamp| + bq_query = ~s|select id from my_source t order by t.my_col| - pg_query = ~s|select (body -> 'id') as id from my_source t order by (t.body -> 'timestamp')| + pg_query = ~s|select (body -> 'id') as id from my_source t order by (t.body -> 'my_col')| {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) @@ -669,7 +685,8 @@ defmodule Logflare.SqlTest do bq_query = ~s|select @test as arg1, @test_another as arg2, coalesce(@test, '') > @test as arg_copy| - pg_query = ~s|select $1 as arg1, $2 as arg2, coalesce($3, '') > $4 as arg_copy| + pg_query = + 
~s|select $1::text as arg1, $2::text as arg2, coalesce($3::text, '') > $4::text as arg_copy| {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) @@ -696,6 +713,120 @@ defmodule Logflare.SqlTest do assert translated =~ ~s("log_events_b658a216_0aef_427e_bae8_9dfc68aad6dd") end + test "custom schema prefixing" do + input = + "SELECT body, event_message, timestamp FROM `.1_prod.b658a216_0aef_427e_bae8_9dfc68aad6dd`" + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, input) + assert translated =~ ~s("log_events_b658a216_0aef_427e_bae8_9dfc68aad6dd") + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, input, "my_schema") + assert translated =~ ~s("my_schema"."log_events_b658a216_0aef_427e_bae8_9dfc68aad6dd") + end + + test "unix microsecond timestamp handling" do + bq_query = ~s|select t.timestamp as ts from my_table t| + + pg_query = ~s|select (t.body -> 'timestamp') as ts from my_table t| + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + + # only convert if not in projection + bq_query = ~s|select t.id as id from my_table t where t.timestamp is not null| + + pg_query = + ~s|select (t.body -> 'id') as id from my_table t where (to_timestamp( (t.body ->> 'timestamp')::bigint / 1000000.0) AT TIME ZONE 'UTC') is not null| + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + + test "CTE translation with cross join" do + bq_query = ~s""" + with edge_logs as ( + select t.timestamp, t.id, t.event_message, t.metadata + from `cloudflare.logs.prod` t + cross join unnest(metadata) as m + ) + select id, timestamp, event_message, request.method, request.path, response.status_code + from edge_logs + cross join unnest(metadata) as m + cross join unnest(m.request) as request + cross join unnest(m.response) as response + """ + + pg_query = ~s""" + with edge_logs as ( + select + (t.body -> 'timestamp') as timestamp, + (t.body -> 'id') as id, + (t.body -> 'event_message') AS event_message, + (t.body -> 'metadata') as metadata + from "cloudflare.logs.prod" t + ) + SELECT + id AS id, + timestamp AS timestamp, + event_message AS event_message, + ( metadata #> '{request,method}') AS method, + ( metadata #> '{request,path}') AS path, + ( metadata #> '{response,status_code}') AS status_code + FROM edge_logs + """ + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + + test "special handling of timestamp field and date_trunc : " do + bq_query = ~s""" + with edge_logs as (select t.timestamp from `cloudflare.logs.prod` t) + select timestamp_trunc(t.timestamp, day) as timestamp from edge_logs t + """ + + pg_query = ~s""" + with edge_logs as ( select (t.body -> 'timestamp') as timestamp from "cloudflare.logs.prod" t ) + SELECT date_trunc('day', (to_timestamp( t.timestamp::bigint / 1000000.0) AT TIME ZONE 'UTC') ) AS timestamp FROM edge_logs t + """ + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + + test "special handling of timestamp field for binary ops" do + bq_query = ~s""" + with edge_logs as (select t.timestamp from `cloudflare.logs.prod` t) + select t.timestamp as 
timestamp from edge_logs t + where t.timestamp > '2023-08-05T09:00:00.000Z' + """ + + pg_query = ~s""" + with edge_logs as ( select (t.body -> 'timestamp') as timestamp from "cloudflare.logs.prod" t ) + SELECT t.timestamp AS timestamp FROM edge_logs t + where (to_timestamp(CAST(t.timestamp AS BIGINT) / 1000000.0) AT TIME ZONE 'UTC') > '2023-08-05T09:00:00.000Z' + """ + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + + test "fields in binary op are cast to text only when equal" do + bq_query = ~s""" + with edge_logs as (select t.id from `cloudflare.logs.prod` t) + select t.id as id from edge_logs t + where t.id = '123' and t.id > 123 + """ + + pg_query = ~s""" + with edge_logs as ( select (t.body -> 'id') as id from "cloudflare.logs.prod" t ) + SELECT t.id AS id FROM edge_logs t + where (cast(t.id as jsonb) #>> '{}') = '123' and t.id > 123 + """ + + {:ok, translated} = Sql.translate(:bq_sql, :pg_sql, bq_query) + assert Sql.Parser.parse("postgres", translated) == Sql.Parser.parse("postgres", pg_query) + end + # functions metrics # test "APPROX_QUANTILES is translated" # tes "offset() and indexing is translated" diff --git a/test/logflare_web/controllers/endpoints_controller_test.exs b/test/logflare_web/controllers/endpoints_controller_test.exs index 695c02a48..17fb470b6 100644 --- a/test/logflare_web/controllers/endpoints_controller_test.exs +++ b/test/logflare_web/controllers/endpoints_controller_test.exs @@ -1,6 +1,8 @@ defmodule LogflareWeb.EndpointsControllerTest do use LogflareWeb.ConnCase alias Logflare.SingleTenant + alias Logflare.Backends + alias Logflare.Source describe "query" do setup :set_mimic_global @@ -152,7 +154,12 @@ defmodule LogflareWeb.EndpointsControllerTest do end describe "single tenant supabase mode" do - TestUtils.setup_single_tenant(seed_user: true, supabase_mode: true, backend_type: :postgres) + TestUtils.setup_single_tenant( + seed_user: true, + supabase_mode: true, + backend_type: :postgres, + pg_schema: "my_schema" + ) setup do SingleTenant.create_supabase_sources() @@ -171,16 +178,112 @@ defmodule LogflareWeb.EndpointsControllerTest do assert conn.halted == false end - test "GET a basic sandboxed query with fromt able", %{conn: conn, user: user} do + test "GET a basic sandboxed query with fromt able", %{conn: initial_conn, user: user} do + for source <- Logflare.Repo.all(Source) do + Backends.ingest_logs( + [%{"event_message" => "some message", "project" => "default"}], + source + ) + end + + :timer.sleep(2000) + + params = %{ + iso_timestamp_start: + DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(), + project: "default", + project_tier: "ENTERPRISE", + sql: "select timestamp, event_message, metadata from edge_logs" + } + conn = - conn + initial_conn |> put_req_header("x-api-key", user.api_key) - |> get( - ~p"/endpoints/query/logs.all?#{%{sql: ~s(select 'hello' as world from edge_logs)}}" - ) + |> get(~p"/endpoints/query/logs.all?#{params}") + + assert [%{"event_message" => "some message", "timestamp" => timestamp}] = + json_response(conn, 200)["result"] + + # render as unix microsecond + assert inspect(timestamp) |> String.length() == 16 + assert "16" <> _ = inspect(timestamp) + assert conn.halted == false + + # test a logs ui query + params = %{ + iso_timestamp_start: + DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(), + project: "default", + project_tier: "ENTERPRISE", + sql: + "select id, timestamp, 
event_message, request.method, request.path, response.status_code from edge_logs cross join unnest(metadata) as m cross join unnest(m.request) as request cross join unnest(m.response) as response limit 100 " + } + + conn = + initial_conn + |> put_req_header("x-api-key", user.api_key) + |> get(~p"/endpoints/query/logs.all?#{params}") + + assert [%{"event_message" => "some message", "id" => log_id}] = + json_response(conn, 200)["result"] + + assert conn.halted == false + + # different project filter + params = %{ + iso_timestamp_start: + DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(), + project: "other", + project_tier: "ENTERPRISE", + sql: + "select id, timestamp, event_message, request.method, request.path, response.status_code from edge_logs cross join unnest(metadata) as m cross join unnest(m.request) as request cross join unnest(m.response) as response limit 100 " + } + + conn = + initial_conn + |> put_req_header("x-api-key", user.api_key) + |> get(~p"/endpoints/query/logs.all?#{params}") assert [] = json_response(conn, 200)["result"] assert conn.halted == false + + # log chart sql + params = %{ + iso_timestamp_start: + DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(), + project: "default", + project_tier: "ENTERPRISE", + sql: + "\nSELECT\n-- event-chart\n timestamp_trunc(t.timestamp, minute) as timestamp,\n count(t.timestamp) as count\nFROM\n edge_logs t\n cross join unnest(t.metadata) as metadata \n cross join unnest(metadata.request) as request \n cross join unnest(metadata.response) as response\n where t.timestamp > '2023-08-05T09:00:00.000Z'\nGROUP BY\ntimestamp\nORDER BY\n timestamp ASC\n" + } + + conn = + initial_conn + |> put_req_header("x-api-key", user.api_key) + |> get(~p"/endpoints/query/logs.all?#{params}") + + assert [%{"count" => 1}] = json_response(conn, 200)["result"] + assert conn.halted == false + + # log chart sql + params = %{ + iso_timestamp_start: + DateTime.utc_now() |> DateTime.add(-3, :day) |> DateTime.to_iso8601(), + project: "default", + project_tier: "ENTERPRISE", + sql: + "select id, timestamp, event_message, metadata from edge_logs where id = '#{log_id}' limit 1" + } + + conn = + initial_conn + |> put_req_header("x-api-key", user.api_key) + |> get(~p"/endpoints/query/logs.all?#{params}") + + assert [%{"event_message" => "some message", "id" => ^log_id}] = + json_response(conn, 200)["result"] + + assert conn.halted == false end end end diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index 00532dec4..82f1d68fb 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -24,7 +24,8 @@ defmodule Logflare.TestUtils do seed_user: false, supabase_mode: false, bigquery_project_id: random_string(), - backend_type: :bigquery + backend_type: :bigquery, + pg_schema: nil }) quote do @@ -58,7 +59,7 @@ defmodule Logflare.TestUtils do end end - defp setup_single_tenant_backend(%{backend_type: :postgres}) do + defp setup_single_tenant_backend(%{backend_type: :postgres, pg_schema: schema}) do quote do setup do %{username: username, password: password, database: database, hostname: hostname} = @@ -66,7 +67,12 @@ defmodule Logflare.TestUtils do url = "postgresql://#{username}:#{password}@#{hostname}/#{database}" previous = Application.get_env(:logflare, :postgres_backend_adapter) - Application.put_env(:logflare, :postgres_backend_adapter, url: url) + + Application.put_env(:logflare, :postgres_backend_adapter, + url: url, + schema: unquote(schema) + ) + on_exit(fn -> Application.put_env(:logflare, 
:postgres_backend_adapter, previous) end) :ok end
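
For orientation, a minimal usage sketch of the reworked PostgresAdaptor/PgRepo lifecycle from the diff above, mirroring the updated adaptor tests; it assumes `source_backend` is a `%SourceBackend{}` of type `:postgres` with `:source` preloaded and a reachable `url` (plus optional `schema`) in its config:

# Sketch only (not part of the diff) — the adaptor call sequence after this change.
alias Logflare.Backends.Adaptor.PostgresAdaptor

# create_repo/1 now starts a PgRepo GenServer under PgRepoSupervisor; the repo
# module is defined and connected from the GenServer's handle_continue callbacks.
PostgresAdaptor.create_repo(source_backend)

# connected?/1 replaces connect_to_repo/1: it polls the PgRepo process
# (up to 5 attempts, 500 ms apart) until a `SELECT 1` succeeds.
:ok = PostgresAdaptor.connected?(source_backend)

# Migrations, rollbacks and migration-table drops are now GenServer calls as well.
:ok = PostgresAdaptor.create_log_events_table(source_backend)

# String queries additionally set the search_path when a "schema" is configured.
{:ok, _rows} = PostgresAdaptor.execute_query(source_backend, "SELECT 1 AS one")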
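
`Logflare.Sql.translate/4` gains an optional `schema_prefix` argument (passed by `Logflare.Endpoints` from the configured `:schema`); a sketch of its effect, using the illustrative table name from the tests above:

# Sketch only (not part of the diff) — schema-prefixing of translated table names.
bq_sql =
  "SELECT body, event_message, timestamp FROM `.1_prod.b658a216_0aef_427e_bae8_9dfc68aad6dd`"

# Without a prefix, the translated table reference stays unqualified:
{:ok, _pg_sql} = Logflare.Sql.translate(:bq_sql, :pg_sql, bq_sql)
#=> ... FROM "log_events_b658a216_0aef_427e_bae8_9dfc68aad6dd" ...

# With a prefix, it is schema-qualified:
{:ok, _pg_sql} = Logflare.Sql.translate(:bq_sql, :pg_sql, bq_sql, "my_schema")
#=> ... FROM "my_schema"."log_events_b658a216_0aef_427e_bae8_9dfc68aad6dd" ...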
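
The unix-microsecond `timestamp` handling can be summarised the same way: outside the projection, `timestamp` references are rewritten to a UTC `to_timestamp` expression, as in the "unix microsecond timestamp handling" test above:

# Sketch only (not part of the diff) — body->'timestamp' is stored as unix
# microseconds, so filters are wrapped in to_timestamp(.../1000000.0) AT TIME ZONE 'UTC'.
bq_sql = "select t.id as id from my_table t where t.timestamp is not null"
{:ok, pg_sql} = Logflare.Sql.translate(:bq_sql, :pg_sql, bq_sql)
IO.puts(pg_sql)
# pg_sql is equivalent to:
#   select (t.body -> 'id') as id from my_table t
#   where (to_timestamp((t.body ->> 'timestamp')::bigint / 1000000.0)
#          AT TIME ZONE 'UTC') is not null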