Skip to content
This repository has been archived by the owner on Jan 17, 2022. It is now read-only.

Commit

Permalink
feat(messenger): Add Symfony Messenger integration
Browse files Browse the repository at this point in the history
Asynchronously dispatch messages using Symfony Messanger API and Swoole inter-process communication (Swoole task) without serialization.

Notice: Symfony messenger `messenger:consume-messages` command is not supported. Dispatched messages are handled in task worker processes of Swoole server.

Usage:

1. Install `symfony/messenger` package via composer
2. Enable task workers inside configuration

    Example:

    ```yaml
    # config/packages/swoole.yaml
    swoole:
        http_server:
            ...
            settings:
                ...
                task_worker_count: auto
    ```
3. Configure swoole messenger transport

   Example:

   ```yaml
   # config/packages/messenger.yaml
   framework:
       messenger:
           transports:
               swoole: swoole://task
           routing:
               '*': swoole
   ```
4. (optional) Follow official [symfony messenger guide](https://symfony.com/doc/current/messenger.html) to define message object and its handler

Relates to #4
  • Loading branch information
k911 committed May 8, 2019
1 parent 3997d23 commit 4ffe654
Show file tree
Hide file tree
Showing 24 changed files with 792 additions and 103 deletions.
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
"swoole/ide-helper": "^4.3",
"symfony/debug": "^4.2",
"symfony/framework-bundle": "^4.2.4",
"symfony/messenger": "^4.2",
"symfony/monolog-bridge": "^4.2",
"symfony/monolog-bundle": "^3.3",
"symfony/serializer": "^4.2",
"symfony/twig-bundle": "^4.2",
"symfony/var-dumper": "^4.2",
"symfony/yaml": "^4.2"
Expand Down
356 changes: 253 additions & 103 deletions composer.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public function getConfigTreeBuilder(): TreeBuilder
->integerNode('reactor_count')
->min(1)
->end()
->scalarNode('task_worker_count')
->defaultNull()
->end()
->end()
->end() // settings
->end()
Expand Down
22 changes: 22 additions & 0 deletions src/Bridge/Symfony/Bundle/DependencyInjection/SwooleExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@
use K911\Swoole\Bridge\Symfony\HttpFoundation\RequestFactoryInterface;
use K911\Swoole\Bridge\Symfony\HttpFoundation\TrustAllProxiesRequestHandler;
use K911\Swoole\Bridge\Symfony\HttpKernel\DebugHttpKernelRequestHandler;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportFactory;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportHandler;
use K911\Swoole\Server\Config\Socket;
use K911\Swoole\Server\Config\Sockets;
use K911\Swoole\Server\Configurator\ConfiguratorInterface;
use K911\Swoole\Server\HttpServer;
use K911\Swoole\Server\HttpServerConfiguration;
use K911\Swoole\Server\RequestHandler\AdvancedStaticFilesServer;
use K911\Swoole\Server\RequestHandler\RequestHandlerInterface;
use K911\Swoole\Server\Runtime\BootableInterface;
use K911\Swoole\Server\Runtime\HMR\HotModuleReloaderInterface;
use K911\Swoole\Server\Runtime\HMR\InotifyHMR;
use K911\Swoole\Server\TaskHandler\TaskHandlerInterface;
use K911\Swoole\Server\WorkerHandler\HMRWorkerStartHandler;
use K911\Swoole\Server\WorkerHandler\WorkerStartHandlerInterface;
use Symfony\Component\Config\FileLocator;
Expand All @@ -28,6 +32,8 @@
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;

final class SwooleExtension extends Extension implements PrependExtensionInterface
{
Expand Down Expand Up @@ -58,6 +64,10 @@ public function load(array $configs, ContainerBuilder $container): void
$config = $this->processConfiguration($configuration, $configs);

$this->registerHttpServer($config['http_server'], $container);

if (\interface_exists(TransportFactoryInterface::class)) {
$this->registerSwooleServerTransportConfiguration($container);
}
}

/**
Expand All @@ -78,6 +88,18 @@ private function registerHttpServer(array $config, ContainerBuilder $container):
$this->registerHttpServerConfiguration($config, $container);
}

private function registerSwooleServerTransportConfiguration(ContainerBuilder $container): void
{
$container->register(SwooleServerTaskTransportFactory::class)
->addTag('messenger.transport_factory')
->addArgument(new Reference(HttpServer::class));

$container->register(SwooleServerTaskTransportHandler::class)
->addArgument(new Reference(MessageBusInterface::class))
->addArgument(new Reference(SwooleServerTaskTransportHandler::class.'.inner'))
->setDecoratedService(TaskHandlerInterface::class, null, -10);
}

private function registerHttpServerConfiguration(array $config, ContainerBuilder $container): void
{
[
Expand Down
10 changes: 10 additions & 0 deletions src/Bridge/Symfony/Bundle/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ services:
'K911\Swoole\Server\LifecycleHandler\ServerManagerStopHandlerInterface':
class: K911\Swoole\Server\LifecycleHandler\NoOpServerManagerStopHandler

'K911\Swoole\Server\TaskHandler\TaskHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskHandler

'K911\Swoole\Server\TaskHandler\TaskFinishedHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskFinishedHandler

'K911\Swoole\Server\Api\ApiServerClientFactory':

'K911\Swoole\Server\Api\ApiServerClient':
Expand Down Expand Up @@ -78,6 +84,10 @@ services:

'K911\Swoole\Server\Configurator\WithWorkerStartHandler':

'K911\Swoole\Server\Configurator\WithTaskHandler':

'K911\Swoole\Server\Configurator\WithTaskFinishedHandler':

'K911\Swoole\Server\Configurator\CallableChainConfiguratorFactory':

'K911\Swoole\Server\Api\WithApiServerConfiguration':
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger\Exception;

final class ReceiverNotAvailableException extends \RuntimeException
{
public static function make(): self
{
throw new self('Swoole Server Task transport does not support Receiver. Messages sent via Swoole Server Task transport are dispatched inside task worker processes.');
}
}
28 changes: 28 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskReceiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Bridge\Symfony\Messenger\Exception\ReceiverNotAvailableException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class SwooleServerTaskReceiver implements ReceiverInterface
{
/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
// noop
throw ReceiverNotAvailableException::make();
}

/**
* {@inheritdoc}
*/
public function stop(): void
{
// noop
}
}
29 changes: 29 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Server\HttpServer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

final class SwooleServerTaskSender implements SenderInterface
{
private $httpServer;

public function __construct(HttpServer $httpServer)
{
$this->httpServer = $httpServer;
}

/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$this->httpServer->dispatchTask($envelope);

return $envelope;
}
}
45 changes: 45 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskTransport.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\TransportInterface;

final class SwooleServerTaskTransport implements TransportInterface
{
private $receiver;
private $sender;

public function __construct(SwooleServerTaskReceiver $receiver, SwooleServerTaskSender $sender)
{
$this->receiver = $receiver;
$this->sender = $sender;
}

/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
dump($handler);
$this->receiver->receive($handler);
}

/**
* {@inheritdoc}
*/
public function stop(): void
{
$this->receiver->stop();
}

/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return $this->sender->send($envelope);
}
}
35 changes: 35 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Server\HttpServer;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

final class SwooleServerTaskTransportFactory implements TransportFactoryInterface
{
private $server;

public function __construct(HttpServer $server)
{
$this->server = $server;
}

public function createTransport(string $dsn, array $options): TransportInterface
{
return new SwooleServerTaskTransport(
new SwooleServerTaskReceiver(),
new SwooleServerTaskSender($this->server)
);
}

/**
* {@inheritdoc}
*/
public function supports(string $dsn, array $options): bool
{
return 0 === \mb_strpos($dsn, 'swoole://task');
}
}
35 changes: 35 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskTransportHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use Assert\Assertion;
use K911\Swoole\Server\TaskHandler\TaskHandlerInterface;
use Swoole\Server;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

final class SwooleServerTaskTransportHandler implements TaskHandlerInterface
{
private $bus;
private $decorated;

public function __construct(MessageBusInterface $bus, ?TaskHandlerInterface $decorated = null)
{
$this->bus = $bus;
$this->decorated = $decorated;
}

public function handle(Server $server, int $taskId, int $fromId, $data): void
{
Assertion::isInstanceOf($data, Envelope::class);
/* @var $data Envelope */
$this->bus->dispatch($data->with(new ReceivedStamp()));

if ($this->decorated instanceof TaskHandlerInterface) {
$this->decorated->handle($server, $taskId, $fromId, $data);
}
}
}
31 changes: 31 additions & 0 deletions src/Server/Configurator/WithTaskFinishedHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Server\Configurator;

use K911\Swoole\Server\HttpServerConfiguration;
use K911\Swoole\Server\TaskHandler\TaskFinishedHandlerInterface;
use Swoole\Http\Server;

final class WithTaskFinishedHandler implements ConfiguratorInterface
{
private $handler;
private $configuration;

public function __construct(TaskFinishedHandlerInterface $handler, HttpServerConfiguration $configuration)
{
$this->handler = $handler;
$this->configuration = $configuration;
}

/**
* {@inheritdoc}
*/
public function configure(Server $server): void
{
if ($this->configuration->getTaskWorkerCount() > 0) {
$server->on('finish', [$this->handler, 'handle']);
}
}
}
31 changes: 31 additions & 0 deletions src/Server/Configurator/WithTaskHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Server\Configurator;

use K911\Swoole\Server\HttpServerConfiguration;
use K911\Swoole\Server\TaskHandler\TaskHandlerInterface;
use Swoole\Http\Server;

final class WithTaskHandler implements ConfiguratorInterface
{
private $handler;
private $configuration;

public function __construct(TaskHandlerInterface $handler, HttpServerConfiguration $configuration)
{
$this->handler = $handler;
$this->configuration = $configuration;
}

/**
* {@inheritdoc}
*/
public function configure(Server $server): void
{
if ($this->configuration->getTaskWorkerCount() > 0) {
$server->on('task', [$this->handler, 'handle']);
}
}
}
5 changes: 5 additions & 0 deletions src/Server/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public function getServer(): Server
return $this->server;
}

public function dispatchTask($data): void
{
$this->getServer()->task($data);
}

/**
* @return Listener[]
*/
Expand Down
Loading

0 comments on commit 4ffe654

Please sign in to comment.