Ciężko uwierzyć, ale opcja --all
w komendzie messenger:consume
nie jest dostępna aż do Symfony 7.1 — w starszych wersjach konieczne jest podanie jawnej listy transportów przy wywoływaniu komendy. Ale jeśli potrzebujesz tej opcji, to można ją w łatwy sposób przenieść do 6.4!
Okoliczności
W GetResponse przygotowujemy obecnie nowe podejście dla workerów AMQP, ponieważ to, które mamy, nie jest już odpowiednie. Zdecydowaliśmy się użyć Symfony Messenger z automatycznym skalowaniem opartym o KEDA. W zwykłej aplikacji Symfony, mając kontrolę nad instancją AMQP, bardzo łatwo wszystko skonfigurować, jednak w naszym przypadku najtrudniejsze jest to, że musimy dopasować się do istniejącej infrastruktury, a nasza aplikacja nie jest właścicielem serwera AMQP (jest tylko jednym z konsumentów). Oznacza to, że nie możemy używać automatycznej konfiguracji, kolejek dead letter, ani opóźnionych wiadomości i musimy wprowadzić transporty z pełną kompatybilnością wsteczną dla payloadów wiadomości. Trzeba było wprowadzić kilka niestandardowych rozwiązań, ale już jesteśmy prawie u celu!
Tak czy inaczej, ponieważ nasz serwer AMQP korzysta z wielu exchange’ów, Symfony Messenger musi być skonfigurowany z wieloma transportami. To prowadzi nas do jednej małej niedogodności: musimy albo określić listę transportów podczas wykonywania polecenia messenger:consume
, albo pozostawić ją pustą i interaktywnie wybrać transport z listy wyświetlanej w CLI.
Ale chcemy konsumować wszystkie transporty, przynajmniej lokalnie lub w instancjach testowych (na produkcji zgrupujemy transporty i kolejki, aby można je było skalować). Jak to osiągnąć?
--all
: opcja Schrödingera
Na szczęście kilka miesięcy temu został zmerdżowany pull request, który wprowadził opcję --all
do komendy messenger:consume
. Ale są z tym 2 problemy: ze względu na okres zamrożenia funkcji w wersji 7.0, należało go przenieść do wersji 7.1 (która w momencie pisania tego tekstu nie została jeszcze wydana), ale nawet gdyby ta zmiana pojawiła się nieco wcześniej, nasza aplikacja nadal korzysta z Symfony 6.4 🤷♂️. Jest to coś, nad czym musimy w pewnym momencie popracować, ale nie stanie się to teraz.
Ostatecznie wygląda na to, że jest to dla nas opcja Schrödingera – jest, ale jednocześnie nie jest dostępna.
Implementacja --all
w Symfony 6.4
Zrobiłem dzisiaj mały proof of concept i wygląda na to, że możliwe jest łatwe przeniesienie opcji --all
do Symfony 6.4. Aby wprowadzić tę opcję w poleceniu messenger:consume
musimy rozszerzyć wbudowane polecenie Symfony:
<?php
declare(strict_types=1);
namespace Codito\App\Cli;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand as SymfonyConsumeMessagesCommand;
/**
* This is only a proxy to original `messenger:consume` command, with `--all` option ported from Symfony 7.1,
* command's FQCN is overridden dynamically in compiler pass ({@see OverrideMessengerConsumeCommandCompilerPass}).
*
* @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
*/
#[AsCommand(name: 'messenger:consume', description: 'Wrapper for Symfony\'s command that adds `--all` option')]
class ConsumeMessagesCommand extends SymfonyConsumeMessagesCommand
{
/** @var list<string> */
private array $transportNames = [];
/**
* @param list<string> $transportNames
*/
public function setTransportNames(array $transportNames): void
{
$this->transportNames = $transportNames;
}
protected function configure(): void
{
parent::configure();
$this->addOption('all', 'a', InputOption::VALUE_NONE, 'Consume messages from all receivers');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
if ($input->getOption('all')) {
if ([] === $this->transportNames) {
throw new \RuntimeException('There are no transports configured');
}
$input->setArgument('receivers', $this->transportNames);
}
return parent::execute($input, $output);
}
protected function interact(InputInterface $input, OutputInterface $output): void
{
if ($input->getOption('all')) {
return;
}
parent::interact($input, $output);
}
}
OK, ale to polecenie jest już zarejestrowane w aplikacji Symfony, ponieważ framework bundle udostępnia je od razu po zainstalowaniu komponentu Messenger. Jak wskazać Symfony, aby używało naszej wersji komendy? Compiler pass na ratunek!
<?php
declare(strict_types=1);
namespace Codito\App\DependencyInjection\Compiler;
use Codito\App\Cli\ConsumeMessagesCommand;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
/**
* This compiler pass is responsible for registering enhanced version of `messenger:consume` command.
*
* @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
*/
class OverrideMessengerConsumeCommandCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container)
{
$symfonyConsumeCommandDefinition = $container->getDefinition('console.command.messenger_consume_messages');
$symfonyConsumeCommandDefinition->setClass(ConsumeMessagesCommand::class);
/** @uses ConsumeMessagesCommand::setTransportNames() */
$symfonyConsumeCommandDefinition->addMethodCall(
'setTransportNames',
[$symfonyConsumeCommandDefinition->getArgument(4)]
);
}
}
Ten compiler pass robi dwie rzeczy:
- Zastępuje parametr
class
w definicji usługiconsole.command.messenger_consume_messages
, pod którą komendamessenger:consume
jest zarejestrowana w kontenerze DI. Oznacza to, że gdy komenda jest wykonywana i powiązana z nią usługa zostanie pobrana z kontenera DI, Symfony zainicjalizuje ją przy użyciu naszej rozszerzonej klasy. - Rejestruje wywołanie metody w naszej niestandardowej usłudze
Codito\App\Cli\ConsumeMessagesCommand
, co zaskutkuje wstrzyknięciem listy skonfigurowanych transportów, abyśmy mogli automatycznie z niej skorzystać, gdy została użyta opcja--all
. Wartość, która zostanie wstrzyknięta do komendy, jest pobierana z oryginalnej definicji komendy (i jest określana w jeszcze innym przebiegu kompilatora).
Aby to zadziałało, musimy oczywiście zarejestrować compiler pass w naszym kernelu:
<?php
declare(strict_types=1);
namespace Codito\App;
use Codito\App\DependencyInjection\Compiler\OverrideMessengerConsumeCommandCompilerPass;
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait;
use Symfony\Component\Config\Loader\LoaderInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel;
class Kernel extends SymfonyKernel
{
use MicroKernelTrait;
/**
* Used in {@see MicroKernelTrait::registerContainerConfiguration()}
*/
protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
{
$loader->load($this->getConfigDir() . '/services.yml');
$container->addCompilerPass(new OverrideMessengerConsumeCommandCompilerPass());
}
}
Dzięki temu zabiegowi, uruchamiając bin/console messenger:consume --all
, automatycznie stosujemy kompletną listę transportów i natychmiast pobieramy wiadomości z nich wszystkich:
[OK] Consuming messages from transports "poc1, poc2, poc3".
// The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
// Quit the worker with CONTROL-C.
// Re-run the command with a -vv option to see logs about consumed messages
Podsumowanie
Kontener DI w Symfony jest niezwykle elastyczny. Fakt, że pracujesz z definicjami, a nie z konkretnymi instancjami usług, pozwala Ci dynamicznie podłączyć się do procesu kompilacji i robić co tylko chcesz — jedynym ograniczeniem jest Twoja wyobraźnia!
Pamiętaj, że proces kompilacji kontenera DI składa się z wielu etapów, więc zachowaj ostrożność podczas rejestracji Twojego compiler passa. Kolejność ma znaczenie, zarówno pod względem etapów kompilacji, jak i priorytetów w ramach każdego kroku.