diff --git a/src/protocol/AProtocol.php b/src/protocol/AProtocol.php index d053adc..bb37276 100644 --- a/src/protocol/AProtocol.php +++ b/src/protocol/AProtocol.php @@ -27,6 +27,14 @@ abstract class AProtocol public ServerState $serverState; + /** + * Multiple RUN statements in transaction generates "streams" which are pulled or discarded + * We are keeping track of open streams to keep valid Server State + * @link https://www.neo4j.com/docs/bolt/current/bolt/message/#transaction + * @var int + */ + protected int $openStreams = 0; + /** * @throws UnpackException * @throws PackException @@ -141,8 +149,10 @@ public function getResponse(): Response foreach (($this->serverStateTransition ?? []) as $transition) { if ($transition[0] === $serverState && $transition[1] === $response->message && $transition[2] === $response->signature) { $this->serverState = $transition[3]; - if ($response->signature === Signature::SUCCESS && ($response->content['has_more'] ?? false)) - $this->serverState = ($serverState === ServerState::TX_READY || $serverState === ServerState::TX_STREAMING) ? ServerState::TX_STREAMING : ServerState::STREAMING; + if (in_array($response->message, [Message::PULL, Message::DISCARD], true) + && $response->signature === Signature::SUCCESS + && (($response->content['has_more'] ?? false) || $this->openStreams)) + $this->serverState = $this->serverState === ServerState::TX_READY ? ServerState::TX_STREAMING : ServerState::STREAMING; if ($transition[3] === ServerState::DEFUNCT) $this->connection->disconnect(); break; diff --git a/src/protocol/v3/CommitMessage.php b/src/protocol/v3/CommitMessage.php index e5c6978..cece762 100644 --- a/src/protocol/v3/CommitMessage.php +++ b/src/protocol/v3/CommitMessage.php @@ -3,7 +3,7 @@ namespace Bolt\protocol\v3; use Bolt\enum\Message; -use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; +use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; use Bolt\error\BoltException; trait CommitMessage @@ -21,4 +21,16 @@ public function commit(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $this->pipelinedMessages[] = Message::COMMIT; return $this; } + + /** + * Read COMMIT response + * @return iterable + * @throws BoltException + */ + protected function _commit(): iterable + { + $this->openStreams = 0; + $content = $this->read($signature); + yield new Response(Message::COMMIT, $signature, $content); + } } diff --git a/src/protocol/v3/RollbackMessage.php b/src/protocol/v3/RollbackMessage.php index b84133a..fa67d54 100644 --- a/src/protocol/v3/RollbackMessage.php +++ b/src/protocol/v3/RollbackMessage.php @@ -3,7 +3,7 @@ namespace Bolt\protocol\v3; use Bolt\enum\Message; -use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; +use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; use Bolt\error\BoltException; trait RollbackMessage @@ -21,4 +21,16 @@ public function rollback(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4 $this->pipelinedMessages[] = Message::ROLLBACK; return $this; } + + /** + * Read ROLLBACK response + * @return iterable + * @throws BoltException + */ + protected function _rollback(): iterable + { + $this->openStreams = 0; + $content = $this->read($signature); + yield new Response(Message::ROLLBACK, $signature, $content); + } } diff --git a/src/protocol/v3/RunMessage.php b/src/protocol/v3/RunMessage.php index b92ffdb..52b73d8 100644 --- a/src/protocol/v3/RunMessage.php +++ b/src/protocol/v3/RunMessage.php @@ -3,7 +3,7 @@ namespace Bolt\protocol\v3; use Bolt\enum\Message; -use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; +use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; use Bolt\error\BoltException; trait RunMessage @@ -26,4 +26,17 @@ public function run(string $query, array $parameters = [], array $extra = []): V $this->pipelinedMessages[] = Message::RUN; return $this; } + + /** + * Read RUN response + * @return iterable + * @throws BoltException + */ + protected function _run(): iterable + { + $content = $this->read($signature); + if (array_key_exists('qid', $content)) + $this->openStreams++; + yield new Response(Message::RUN, $signature, $content); + } } diff --git a/src/protocol/v4/DiscardMessage.php b/src/protocol/v4/DiscardMessage.php index dc92827..fdce66c 100644 --- a/src/protocol/v4/DiscardMessage.php +++ b/src/protocol/v4/DiscardMessage.php @@ -3,7 +3,8 @@ namespace Bolt\protocol\v4; use Bolt\enum\Message; -use Bolt\protocol\{V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; +use Bolt\enum\Signature; +use Bolt\protocol\{Response, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4}; use Bolt\error\BoltException; trait DiscardMessage @@ -24,4 +25,17 @@ public function discard(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2| $this->pipelinedMessages[] = Message::DISCARD; return $this; } + + /** + * Read DISCARD response + * @return iterable + * @throws BoltException + */ + protected function _discard(): iterable + { + $content = $this->read($signature); + if (!($content['has_more'] ?? false) && $this->openStreams) + $this->openStreams = $signature === Signature::SUCCESS ? $this->openStreams - 1 : 0; + yield new Response(Message::DISCARD, $signature, $content); + } } diff --git a/src/protocol/v4/PullMessage.php b/src/protocol/v4/PullMessage.php index 82be37b..39aa70d 100644 --- a/src/protocol/v4/PullMessage.php +++ b/src/protocol/v4/PullMessage.php @@ -26,14 +26,20 @@ public function pull(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_ } /** - * Read PULL response - * @return array + * Read PULL responses + * @return iterable * @throws BoltException */ protected function _pull(): iterable { do { $content = $this->read($signature); + if (!($content['has_more'] ?? false) && $this->openStreams) { + if ($signature === Signature::SUCCESS) + $this->openStreams--; + elseif ($signature === Signature::FAILURE) + $this->openStreams = 0; + } yield new Response(Message::PULL, $signature, $content); } while ($signature == Signature::RECORD); } diff --git a/src/protocol/v5_1/ServerStateTransition.php b/src/protocol/v5_1/ServerStateTransition.php index c9a211d..2e0edb4 100644 --- a/src/protocol/v5_1/ServerStateTransition.php +++ b/src/protocol/v5_1/ServerStateTransition.php @@ -22,10 +22,8 @@ trait ServerStateTransition [ServerState::READY, Message::ROUTE, Signature::SUCCESS, ServerState::READY], [ServerState::READY, Message::RESET, Signature::SUCCESS, ServerState::READY], [ServerState::READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT], - [ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::STREAMING], [ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::READY], [ServerState::STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED], - [ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::STREAMING], [ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::READY], [ServerState::STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED], [ServerState::STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY], @@ -40,10 +38,8 @@ trait ServerStateTransition [ServerState::TX_READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT], [ServerState::TX_STREAMING, Message::RUN, Signature::SUCCESS, ServerState::TX_STREAMING], [ServerState::TX_STREAMING, Message::RUN, Signature::FAILURE, ServerState::FAILED], - [ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_STREAMING], [ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_READY], [ServerState::TX_STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED], - [ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_STREAMING], [ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_READY], [ServerState::TX_STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED], [ServerState::TX_STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY],