Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim authored Jul 18, 2024
1 parent 3378d20 commit 236d7cc
Showing 1 changed file with 12 additions and 22 deletions.
34 changes: 12 additions & 22 deletions test/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,17 @@ defmodule ReplicationTest do
{:noreply, [reply], pid}
end

@impl true
@doc """
This is part of the "stream_continuation" test and handles the COPY :done
state. It will start another stream right away by starting the replication
slot.
"""
# This is part of the "stream_continuation" test and handles the COPY :done
# state. It will start another stream right away by starting the replication
# slot.
def handle_data(:done, %{pid: pid, test: "stream_continuation"}) do
send(pid, {:done, System.unique_integer([:monotonic])})
query = "START_REPLICATION SLOT postgrex_test LOGICAL 0/0 (proto_version '1', publication_names 'postgrex_example')"

{:stream, query, [], pid}
end

@impl true
@doc """
This is part of the "stream_continuation" test and handles the COPY results.
"""
# This is part of the "stream_continuation" test and handles the COPY results.
def handle_data(msg, %{pid: pid, test: "stream_continuation"} = s) do
send(pid, {msg, System.unique_integer([:monotonic])})
{:noreply, [], s}
Expand Down Expand Up @@ -102,11 +96,8 @@ defmodule ReplicationTest do
{:query, query, {from, pid}}
end

@impl true
@doc """
This is part of the "stream_continuation" test and handles call that
triggers that chain of events.
"""
# This is part of the "stream_continuation" test and handles call that
# triggers that chain of events.
def handle_call({:query, query, %{test: "stream_continuation", next_query: _} = opts}, from, pid) do
{:query, query, Map.merge(opts, %{from: from, pid: pid})}
end
Expand All @@ -128,10 +119,7 @@ defmodule ReplicationTest do
{:noreply, pid}
end

@impl true
@doc """
Handles the result of the "stream_continuation" query call. It is the results of the slot creation.
"""
# Handles the result of the "stream_continuation" query call. It is the results of the slot creation.
def handle_result(results, %{from: from, test: "stream_continuation", next_query: next_query} = s) do
Postgrex.ReplicationConnection.reply(from, {:ok, results})
{:stream, next_query, [], Map.delete(s, :next_query)}
Expand Down Expand Up @@ -334,16 +322,18 @@ defmodule ReplicationTest do

query = "CREATE_REPLICATION_SLOT postgrex_test TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
next_query = "COPY repl_test TO STDOUT"
PR.call(context.repl, {:query, query, %{test: "stream_continuation", next_query: next_query}})
assert_received {:connect, _}

PR.call(
context.repl,
{:query, query, %{test: "stream_continuation", next_query: next_query}}
)

assert_receive {"42\t42\n", i1}, @timeout
assert_receive {"1\t1\n", i2} when i1 < i2, @timeout
assert_receive {:done, i3} when i2 < i3, @timeout
# Prior to allowing one stream to start after another, this would fail
assert_receive <<?k, _::64, _::64, _>>, @timeout
end

end

defp start_replication(repl) do
Expand Down

0 comments on commit 236d7cc

Please sign in to comment.