Ciężko uwierzyć, ale opcja --all w komendzie messenger:consume nie jest dostępna aż do Symfony 7.1 — w starszych wersjach konieczne jest podanie jawnej listy transportów przy wywoływaniu komendy. Ale jeśli potrzebujesz tej opcji, to można ją w łatwy sposób przenieść do 6.4!

Okoliczności

W GetResponse przygotowujemy obecnie nowe podejście dla workerów AMQP, ponieważ to, które mamy, nie jest już odpowiednie. Zdecydowaliśmy się użyć Symfony Messenger z automatycznym skalowaniem opartym o KEDA. W zwykłej aplikacji Symfony, mając kontrolę nad instancją AMQP, bardzo łatwo wszystko skonfigurować, jednak w naszym przypadku najtrudniejsze jest to, że musimy dopasować się do istniejącej infrastruktury, a nasza aplikacja nie jest właścicielem serwera AMQP (jest tylko jednym z konsumentów). Oznacza to, że nie możemy używać automatycznej konfiguracji, kolejek dead letter, ani opóźnionych wiadomości i musimy wprowadzić transporty z pełną kompatybilnością wsteczną dla payloadów wiadomości. Trzeba było wprowadzić kilka niestandardowych rozwiązań, ale już jesteśmy prawie u celu!

Tak czy inaczej, ponieważ nasz serwer AMQP korzysta z wielu exchange’ów, Symfony Messenger musi być skonfigurowany z wieloma transportami. To prowadzi nas do jednej małej niedogodności: musimy albo określić listę transportów podczas wykonywania polecenia messenger:consume, albo pozostawić ją pustą i interaktywnie wybrać transport z listy wyświetlanej w CLI.

Ale chcemy konsumować wszystkie transporty, przynajmniej lokalnie lub w instancjach testowych (na produkcji zgrupujemy transporty i kolejki, aby można je było skalować). Jak to osiągnąć?

--all: opcja Schrödingera

Na szczęście kilka miesięcy temu został zmerdżowany pull request, który wprowadził opcję --all do komendy messenger:consume. Ale są z tym 2 problemy: ze względu na okres zamrożenia funkcji w wersji 7.0, należało go przenieść do wersji 7.1 (która w momencie pisania tego tekstu nie została jeszcze wydana), ale nawet gdyby ta zmiana pojawiła się nieco wcześniej, nasza aplikacja nadal korzysta z Symfony 6.4 🤷‍♂️. Jest to coś, nad czym musimy w pewnym momencie popracować, ale nie stanie się to teraz.

Ostatecznie wygląda na to, że jest to dla nas opcja Schrödingera – jest, ale jednocześnie nie jest dostępna.

Implementacja --all w Symfony 6.4

Zrobiłem dzisiaj mały proof of concept i wygląda na to, że możliwe jest łatwe przeniesienie opcji --all do Symfony 6.4. Aby wprowadzić tę opcję w poleceniu messenger:consume musimy rozszerzyć wbudowane polecenie Symfony:

<?php

declare(strict_types=1);

namespace Codito\App\Cli;

use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand as SymfonyConsumeMessagesCommand;

/**
 * This is only a proxy to original `messenger:consume` command, with `--all` option ported from Symfony 7.1,
 * command's FQCN is overridden dynamically in compiler pass ({@see OverrideMessengerConsumeCommandCompilerPass}).
 *
 * @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
 */
#[AsCommand(name: 'messenger:consume', description: 'Wrapper for Symfony\'s command that adds `--all` option')]
class ConsumeMessagesCommand extends SymfonyConsumeMessagesCommand
{
    /** @var list<string> */
    private array $transportNames = [];

    /**
     * @param list<string> $transportNames
     */
    public function setTransportNames(array $transportNames): void
    {
        $this->transportNames = $transportNames;
    }

    protected function configure(): void
    {
        parent::configure();

        $this->addOption('all', 'a', InputOption::VALUE_NONE, 'Consume messages from all receivers');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        if ($input->getOption('all')) {
            if ([] === $this->transportNames) {
                throw new \RuntimeException('There are no transports configured');
            }

            $input->setArgument('receivers', $this->transportNames);
        }

        return parent::execute($input, $output);
    }

    protected function interact(InputInterface $input, OutputInterface $output): void
    {
        if ($input->getOption('all')) {
            return;
        }

        parent::interact($input, $output);
    }
}

OK, ale to polecenie jest już zarejestrowane w aplikacji Symfony, ponieważ framework bundle udostępnia je od razu po zainstalowaniu komponentu Messenger. Jak wskazać Symfony, aby używało naszej wersji komendy? Compiler pass na ratunek!

<?php

declare(strict_types=1);

namespace Codito\App\DependencyInjection\Compiler;

use Codito\App\Cli\ConsumeMessagesCommand;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

/**
 * This compiler pass is responsible for registering enhanced version of `messenger:consume` command.
 *
 * @see https://github.com/symfony/symfony/pull/52411 @TODO Remove this workaround when Symfony 7.1 is used
 */
class OverrideMessengerConsumeCommandCompilerPass implements CompilerPassInterface
{
    public function process(ContainerBuilder $container)
    {
        $symfonyConsumeCommandDefinition = $container->getDefinition('console.command.messenger_consume_messages');
        $symfonyConsumeCommandDefinition->setClass(ConsumeMessagesCommand::class);

        /** @uses ConsumeMessagesCommand::setTransportNames() */
        $symfonyConsumeCommandDefinition->addMethodCall(
            'setTransportNames',
            [$symfonyConsumeCommandDefinition->getArgument(4)]
        );
    }
}

Ten compiler pass robi dwie rzeczy:

  • Zastępuje parametr class w definicji usługi console.command.messenger_consume_messages, pod którą komenda messenger:consume jest zarejestrowana w kontenerze DI. Oznacza to, że gdy komenda jest wykonywana i powiązana z nią usługa zostanie pobrana z kontenera DI, Symfony zainicjalizuje ją przy użyciu naszej rozszerzonej klasy.
  • Rejestruje wywołanie metody w naszej niestandardowej usłudze Codito\App\Cli\ConsumeMessagesCommand, co zaskutkuje wstrzyknięciem listy skonfigurowanych transportów, abyśmy mogli automatycznie z niej skorzystać, gdy została użyta opcja --all. Wartość, która zostanie wstrzyknięta do komendy, jest pobierana z oryginalnej definicji komendy (i jest określana w jeszcze innym przebiegu kompilatora).

Aby to zadziałało, musimy oczywiście zarejestrować compiler pass w naszym kernelu:

<?php

declare(strict_types=1);

namespace Codito\App;

use Codito\App\DependencyInjection\Compiler\OverrideMessengerConsumeCommandCompilerPass;
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait;
use Symfony\Component\Config\Loader\LoaderInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel;

class Kernel extends SymfonyKernel
{
    use MicroKernelTrait;

    /**
     * Used in {@see MicroKernelTrait::registerContainerConfiguration()}
     */
    protected function configureContainer(ContainerBuilder $container, LoaderInterface $loader): void
    {
        $loader->load($this->getConfigDir() . '/services.yml');

        $container->addCompilerPass(new OverrideMessengerConsumeCommandCompilerPass());
    }
}

Dzięki temu zabiegowi, uruchamiając bin/console messenger:consume --all, automatycznie stosujemy kompletną listę transportów i natychmiast pobieramy wiadomości z nich wszystkich:


 [OK] Consuming messages from transports "poc1, poc2, poc3".

 // The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.       

 // Quit the worker with CONTROL-C.                                                                                     

 // Re-run the command with a -vv option to see logs about consumed messages

Podsumowanie

Kontener DI w Symfony jest niezwykle elastyczny. Fakt, że pracujesz z definicjami, a nie z konkretnymi instancjami usług, pozwala Ci dynamicznie podłączyć się do procesu kompilacji i robić co tylko chcesz — jedynym ograniczeniem jest Twoja wyobraźnia!

Pamiętaj, że proces kompilacji kontenera DI składa się z wielu etapów, więc zachowaj ostrożność podczas rejestracji Twojego compiler passa. Kolejność ma znaczenie, zarówno pod względem etapów kompilacji, jak i priorytetów w ramach każdego kroku.