MonitorCommand.php
TLDR
The provided file is MonitorCommand.php
located in the Illuminate\Queue\Console
namespace. It is a command class that monitors the size of specified queues and dispatches events when certain conditions are met.
Methods
handle
This method executes the console command. It parses the specified queues, displays their sizes, and dispatches events if the queue sizes exceed the maximum threshold.
parseQueues
This method parses the specified queues into an array of connections and queues. It uses the Factory
instance to get the size of each queue and determines the status based on the maximum threshold.
displaySizes
This method displays the sizes and statuses of the queues in the console. It uses the twoColumnDetail
method to format the output.
dispatchEvents
This method dispatches events for queues that exceed the maximum threshold. It creates a QueueBusy
event object and fires the event using the Dispatcher
instance.
Classes
None
<?php
namespace Illuminate\Queue\Console;
use Illuminate\Console\Command;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Factory;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Collection;
use Symfony\Component\Console\Attribute\AsCommand;
#[AsCommand(name: 'queue:monitor')]
class MonitorCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:monitor
{queues : The names of the queues to monitor}
{--max=1000 : The maximum number of jobs that can be on the queue before an event is dispatched}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Monitor the size of the specified queues';
/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;
/**
* The events dispatcher instance.
*
* @var \Illuminate\Contracts\Events\Dispatcher
*/
protected $events;
/**
* Create a new queue monitor command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @return void
*/
public function __construct(Factory $manager, Dispatcher $events)
{
parent::__construct();
$this->manager = $manager;
$this->events = $events;
}
/**
* Execute the console command.
*
* @return void
*/
public function handle()
{
$queues = $this->parseQueues($this->argument('queues'));
$this->displaySizes($queues);
$this->dispatchEvents($queues);
}
/**
* Parse the queues into an array of the connections and queues.
*
* @param string $queues
* @return \Illuminate\Support\Collection
*/
protected function parseQueues($queues)
{
return collect(explode(',', $queues))->map(function ($queue) {
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);
if (! isset($queue)) {
$queue = $connection;
$connection = $this->laravel['config']['queue.default'];
}
return [
'connection' => $connection,
'queue' => $queue,
'size' => $size = $this->manager->connection($connection)->size($queue),
'status' => $size >= $this->option('max') ? '<fg=yellow;options=bold>ALERT</>' : '<fg=green;options=bold>OK</>',
];
});
}
/**
* Display the queue sizes in the console.
*
* @param \Illuminate\Support\Collection $queues
* @return void
*/
protected function displaySizes(Collection $queues)
{
$this->newLine();
$this->components->twoColumnDetail('<fg=gray>Queue name</>', '<fg=gray>Size / Status</>');
$queues->each(function ($queue) {
$name = '['.$queue['connection'].'] '.$queue['queue'];
$status = '['.$queue['size'].'] '.$queue['status'];
$this->components->twoColumnDetail($name, $status);
});
$this->newLine();
}
/**
* Fire the monitoring events.
*
* @param \Illuminate\Support\Collection $queues
* @return void
*/
protected function dispatchEvents(Collection $queues)
{
foreach ($queues as $queue) {
if ($queue['status'] == '<fg=green;options=bold>OK</>') {
continue;
}
$this->events->dispatch(
new QueueBusy(
$queue['connection'],
$queue['queue'],
$queue['size'],
)
);
}
}
}