‪TYPO3CMS  ‪main
ConsumeMessagesCommand.php
Go to the documentation of this file.
1 <?php
2 
3 declare(strict_types=1);
4 
5 /*
6  * This file is part of the TYPO3 CMS project.
7  *
8  * It is free software; you can redistribute it and/or modify it under
9  * the terms of the GNU General Public License, either version 2
10  * of the License, or any later version.
11  *
12  * For the full copyright and license information, please read the
13  * LICENSE.txt file that was distributed with this source code.
14  *
15  * The TYPO3 project - inspiring people to share!
16  */
17 
19 
20 use Psr\EventDispatcher\EventDispatcherInterface;
21 use Symfony\Component\Console\Attribute\AsCommand;
22 use Symfony\Component\Console\Command\Command;
23 use Symfony\Component\Console\Completion\CompletionInput;
24 use Symfony\Component\Console\Completion\CompletionSuggestions;
25 use Symfony\Component\Console\Exception\RuntimeException;
26 use Symfony\Component\Console\Input\InputArgument;
27 use Symfony\Component\Console\Input\InputInterface;
28 use Symfony\Component\Console\Input\InputOption;
29 use Symfony\Component\Console\Logger\ConsoleLogger;
30 use Symfony\Component\Console\Output\ConsoleOutputInterface;
31 use Symfony\Component\Console\Output\OutputInterface;
32 use Symfony\Component\Console\Question\ChoiceQuestion;
33 use Symfony\Component\Console\Style\SymfonyStyle;
34 use Symfony\Component\DependencyInjection\ServiceLocator;
35 use Symfony\Component\Messenger\MessageBusInterface;
36 use Symfony\Component\Messenger\Worker;
38 
/**
 * Console command "messenger:consume".
 *
 * Resolves the requested receivers from the receiver locator and runs a
 * Symfony Messenger Worker that dispatches received messages to the message bus.
 */
#[AsCommand('messenger:consume', 'Consume messages')]
class ConsumeMessagesCommand extends Command
{
    /**
     * @param MessageBusInterface $messageBus Bus handed to the worker
     * @param ServiceLocator $receiverLocator Locator used to look up receivers by name
     * @param StopWorkerOnTimeLimitListener $stopWorkerOnTimeLimitListener Queried after the worker returns to pick the exit code
     * @param EventDispatcherInterface $eventDispatcher Dispatcher handed to the worker
     * @param array $receiverNames Known receiver names (used for defaults, prompts, error hints and completion)
     * @param array $busIds Values suggested when completing a "bus" option
     */
    public function __construct(
        private readonly MessageBusInterface $messageBus,
        private readonly ServiceLocator $receiverLocator,
        private readonly StopWorkerOnTimeLimitListener $stopWorkerOnTimeLimitListener,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly array $receiverNames = [],
        private readonly array $busIds = [],
    ) {
        parent::__construct();
    }

    /**
     * Declares the "receivers" argument, the "sleep", "queues" and
     * "exit-code-on-limit" options, and the help text.
     */
    protected function configure(): void
    {
        // With exactly one known receiver it becomes the default argument value.
        $defaultReceiverName = count($this->receiverNames) === 1 ? current($this->receiverNames) : null;

        $this
            ->setDefinition(
                [
                    new InputArgument(
                        'receivers',
                        InputArgument::IS_ARRAY,
                        'Names of the receivers/transports to consume in order of priority',
                        // Strict null check: a receiver literally named "0" is still a valid default.
                        $defaultReceiverName !== null ? [$defaultReceiverName] : []
                    ),
                    new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
                    new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
                    new InputOption('exit-code-on-limit', null, InputOption::VALUE_REQUIRED, 'Exit code when limits are reached', 0),
                ]
            )
            ->setHelp(
                <<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.

 <info>php %command.full_name% <receiver-name></info>

To receive from multiple transports, pass each name:

 <info>php %command.full_name% receiver1 receiver2</info>

Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):

 <info>php %command.full_name% <receiver-name> --queues=fasttrack</info>

EOF
            );
    }

    /**
     * Asks for the receivers to consume when none were given and receiver
     * names are known, then verifies that at least one receiver is set.
     *
     * @throws RuntimeException if no receiver is available after the prompt
     */
    protected function interact(InputInterface $input, OutputInterface $output): void
    {
        $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

        if ($this->receiverNames && !$input->getArgument('receivers')) {
            $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);

            $io->writeln('Choose which receivers you want to consume messages from in order of priority.');
            if (count($this->receiverNames) > 1) {
                $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
            }

            $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
            $question->setMultiselect(true);

            $input->setArgument('receivers', $io->askQuestion($question));
        }

        $this->assertReceiversGiven($input);
    }

    /**
     * Resolves the requested receivers and runs the worker until it stops.
     *
     * @return int the "exit-code-on-limit" option value if the time limit
     *             listener reports that it stopped the worker, 0 otherwise
     * @throws RuntimeException if no receiver was given or a receiver name is unknown to the locator
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $exitCodeOnLimit = (int)($input->getOption('exit-code-on-limit'));

        // interact() is not invoked for non-interactive runs, so the check has to
        // be repeated here; otherwise a worker without receivers would be started.
        $this->assertReceiversGiven($input);

        $receivers = [];
        $receiverNames = $input->getArgument('receivers');
        foreach ($receiverNames as $receiverName) {
            if (!$this->receiverLocator->has($receiverName)) {
                $message = sprintf('The receiver "%s" does not exist.', $receiverName);
                if ($this->receiverNames) {
                    $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
                }

                throw new RuntimeException($message, 1605305002);
            }

            $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
        }

        $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
        $io->success(sprintf('Consuming messages from transport%s "%s".', count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));
        $io->comment('Quit the worker with CONTROL-C.');

        if ($output->getVerbosity() < OutputInterface::VERBOSITY_VERBOSE) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        // No rate limiters are configured by this command, hence the empty array.
        $worker = new Worker($receivers, $this->messageBus, $this->eventDispatcher, new ConsoleLogger($output), []);
        $options = [
            // The option is given in seconds and scaled by 10^6 for the worker
            // (presumably microseconds - confirm against Worker::run()).
            'sleep' => $input->getOption('sleep') * 1000000,
        ];
        $queues = $input->getOption('queues');
        if ($queues) {
            $options['queues'] = $queues;
        }
        $worker->run($options);

        return $this->stopWorkerOnTimeLimitListener->hasStopped() ? $exitCodeOnLimit : 0;
    }

    /**
     * Provides shell completion values.
     *
     * For the "receivers" argument, suggests the known receiver names that have
     * not been entered yet (the value currently being completed is not counted
     * as entered).
     */
    public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
    {
        if ($input->mustSuggestArgumentValuesFor('receivers')) {
            $suggestions->suggestValues(array_diff($this->receiverNames, array_diff($input->getArgument('receivers'), [$input->getCompletionValue()])));

            return;
        }

        // NOTE(review): configure() does not define a "bus" option, so this branch
        // looks unreachable for this command as written - confirm before relying on it.
        if ($input->mustSuggestOptionValuesFor('bus')) {
            $suggestions->suggestValues($this->busIds);
        }
    }

    /**
     * Aborts when the "receivers" argument is empty.
     *
     * @throws RuntimeException if no receiver name is set on the input
     */
    private function assertReceiversGiven(InputInterface $input): void
    {
        if (!$input->getArgument('receivers')) {
            throw new RuntimeException('Please pass at least one receiver.', 1605305001);
        }
    }
}
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand
Definition: ConsumeMessagesCommand.php:44
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand\interact
‪interact(InputInterface $input, OutputInterface $output)
Definition: ConsumeMessagesCommand.php:92
‪$output
‪$output
Definition: annotationChecker.php:114
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand\complete
‪complete(CompletionInput $input, CompletionSuggestions $suggestions)
Definition: ConsumeMessagesCommand.php:156
‪TYPO3\CMS\Core\Messenger\EventListener\StopWorkerOnTimeLimitListener
Definition: StopWorkerOnTimeLimitListener.php:32
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand\configure
‪configure()
Definition: ConsumeMessagesCommand.php:56
‪TYPO3\CMS\Core\Command
Definition: CacheFlushCommand.php:18
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand\__construct
‪__construct(private readonly MessageBusInterface $messageBus, private readonly ServiceLocator $receiverLocator, private readonly StopWorkerOnTimeLimitListener $stopWorkerOnTimeLimitListener, private readonly EventDispatcherInterface $eventDispatcher, private readonly array $receiverNames=[], private readonly array $busIds=[],)
Definition: ConsumeMessagesCommand.php:45
‪TYPO3\CMS\Core\Command\ConsumeMessagesCommand\execute
‪execute(InputInterface $input, OutputInterface $output)
Definition: ConsumeMessagesCommand.php:115