Dispatcher.php
TLDR
This file contains the Dispatcher class, which is responsible for dispatching commands to their appropriate handlers. It also includes methods for dispatching commands synchronously, creating batches of queueable jobs, chaining queueable jobs, and more.
Methods
dispatch
Dispatches a command to its appropriate handler. If a queue resolver has been configured and the command should be queued, it dispatches it to the queue. Otherwise, it dispatches it to the handler immediately.
dispatchSync
Dispatches a command to its appropriate handler in the current process. If the command should be queued and supports being queued on a specific connection, it dispatches it to the queue on the "sync" connection. Otherwise, it dispatches it to the handler immediately.
dispatchNow
Dispatches a command to its appropriate handler in the current process without using the synchronous queue. If the command uses both the InteractsWithQueue and Queueable traits and no job has been set on it yet, a SyncJob is attached to it first. Then, it determines the callback to be executed: when a handler is passed in or registered for the command, the handler's handle (or __invoke) method receives the command; otherwise the command's own handle (or __invoke) method is called through the container. Finally, it passes the command through the pipeline with the configured pipes and executes the callback.
findBatch
Attempts to find a batch with the given ID by using the BatchRepository from the container.
batch
Creates a new batch of queueable jobs. Accepts a collection or an array of jobs and wraps them in a PendingBatch
.
chain
Creates a new chain of queueable jobs. Accepts a collection or an array of jobs, prepares any nested batches, and returns a PendingChain
.
hasCommandHandler
Determines if the given command has a registered handler in the handlers
array.
getCommandHandler
Retrieves the handler for a command from the container if it has been registered.
commandShouldBeQueued
Determines if the given command should be queued by checking if it implements the ShouldQueue
interface.
dispatchToQueue
Dispatches a command to its appropriate handler behind a queue. Resolves the queue by calling the queue resolver closure with the command's connection (or null when none is set) and throws a RuntimeException if the result is not an implementation of the Queue interface. If the command has a queue method, it calls that method passing the queue and the command. Otherwise, it pushes the command to the queue by calling the pushCommandToQueue method.
pushCommandToQueue
Pushes the command onto the given queue instance. If the command has both a queue
and delay
property, it schedules the command to be executed on the specified queue after the specified delay. If the command has only a queue
property, it pushes the command to that queue. If the command has only a delay
property, it schedules the command to be executed after the specified delay. Otherwise, it simply pushes the command to the queue.
dispatchAfterResponse
Dispatches a command to its appropriate handler after the current process has finished (e.g., after the response has been sent). This is achieved by registering a termination callback with the container that dispatches the command synchronously.
pipeThrough
Sets the pipes through which commands should be piped before dispatching.
map
Maps a command to a handler by merging the specified map
array with the handlers
array.
<?php
namespace Illuminate\Bus;
use Closure;
use Illuminate\Contracts\Bus\QueueingDispatcher;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\Queue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\PendingChain;
use Illuminate\Pipeline\Pipeline;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Jobs\SyncJob;
use Illuminate\Support\Collection;
use RuntimeException;
class Dispatcher implements QueueingDispatcher
{
    /**
     * The container implementation.
     *
     * Used to resolve command handlers and the batch repository, to invoke
     * self-handling commands, and to register terminating callbacks.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * The pipeline instance for the bus.
     *
     * @var \Illuminate\Pipeline\Pipeline
     */
    protected $pipeline;

    /**
     * The pipes to send commands through before dispatching.
     *
     * @var array
     */
    protected $pipes = [];

    /**
     * The command to handler mapping for non-self-handling commands.
     *
     * Keyed by the command's class name; each value is resolved through the container.
     *
     * @var array
     */
    protected $handlers = [];

    /**
     * The queue resolver callback.
     *
     * Invoked with the command's connection (or null) and expected to return a Queue instance.
     *
     * @var \Closure|null
     */
    protected $queueResolver;

    /**
     * Create a new command dispatcher instance.
     *
     * @param  \Illuminate\Contracts\Container\Container  $container
     * @param  \Closure|null  $queueResolver
     * @return void
     */
    public function __construct(Container $container, ?Closure $queueResolver = null)
    {
        // The nullable type is spelled out explicitly: relying on "= null" to
        // make a typed parameter nullable is deprecated as of PHP 8.4.
        $this->container = $container;
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }

    /**
     * Dispatch a command to its appropriate handler.
     *
     * The command is queued only when a queue resolver is set and the command
     * should be queued; otherwise it is handled immediately.
     *
     * @param  mixed  $command
     * @return mixed
     */
    public function dispatch($command)
    {
        return $this->queueResolver && $this->commandShouldBeQueued($command)
            ? $this->dispatchToQueue($command)
            : $this->dispatchNow($command);
    }

    /**
     * Dispatch a command to its appropriate handler in the current process.
     *
     * Queueable jobs will be dispatched to the "sync" queue.
     *
     * @param  mixed  $command
     * @param  mixed  $handler
     * @return mixed
     */
    public function dispatchSync($command, $handler = null)
    {
        // Only commands that can be re-targeted via onConnection() go through
        // the queue; note that the explicit $handler is not used on that path.
        if ($this->queueResolver &&
            $this->commandShouldBeQueued($command) &&
            method_exists($command, 'onConnection')) {
            return $this->dispatchToQueue($command->onConnection('sync'));
        }

        return $this->dispatchNow($command, $handler);
    }

    /**
     * Dispatch a command to its appropriate handler in the current process without using the synchronous queue.
     *
     * @param  mixed  $command
     * @param  mixed  $handler
     * @return mixed
     */
    public function dispatchNow($command, $handler = null)
    {
        $uses = class_uses_recursive($command);

        // Commands using both traits get a SyncJob (with an empty JSON payload)
        // assigned when they do not already carry a job.
        if (in_array(InteractsWithQueue::class, $uses, true) &&
            in_array(Queueable::class, $uses, true) &&
            ! $command->job) {
            $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
        }

        // An explicitly passed handler wins; otherwise fall back to a mapped
        // handler, and finally to the command handling itself.
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';

                return $handler->{$method}($command);
            };
        } else {
            $callback = function ($command) {
                $method = method_exists($command, 'handle') ? 'handle' : '__invoke';

                // Called through the container rather than invoked directly.
                return $this->container->call([$command, $method]);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }

    /**
     * Attempt to find the batch with the given ID.
     *
     * The lookup is delegated to the BatchRepository resolved from the container.
     *
     * @param  string  $batchId
     * @return \Illuminate\Bus\Batch|null
     */
    public function findBatch(string $batchId)
    {
        return $this->container->make(BatchRepository::class)->find($batchId);
    }

    /**
     * Create a new batch of queueable jobs.
     *
     * @param  \Illuminate\Support\Collection|array|mixed  $jobs
     * @return \Illuminate\Bus\PendingBatch
     */
    public function batch($jobs)
    {
        return new PendingBatch($this->container, Collection::wrap($jobs));
    }

    /**
     * Create a new chain of queueable jobs.
     *
     * @param  \Illuminate\Support\Collection|array  $jobs
     * @return \Illuminate\Foundation\Bus\PendingChain
     */
    public function chain($jobs)
    {
        $jobs = Collection::wrap($jobs);
        $jobs = ChainedBatch::prepareNestedBatches($jobs);

        // The first job is removed from the collection and passed on its own;
        // the remaining jobs are passed as the second argument.
        return new PendingChain($jobs->shift(), $jobs->toArray());
    }

    /**
     * Determine if the given command has a handler.
     *
     * Handlers are looked up by the command's exact class name.
     *
     * @param  mixed  $command
     * @return bool
     */
    public function hasCommandHandler($command)
    {
        return array_key_exists(get_class($command), $this->handlers);
    }

    /**
     * Retrieve the handler for a command.
     *
     * Returns false when no handler has been mapped for the command's class.
     *
     * @param  mixed  $command
     * @return bool|mixed
     */
    public function getCommandHandler($command)
    {
        if ($this->hasCommandHandler($command)) {
            return $this->container->make($this->handlers[get_class($command)]);
        }

        return false;
    }

    /**
     * Determine if the given command should be queued.
     *
     * @param  mixed  $command
     * @return bool
     */
    protected function commandShouldBeQueued($command)
    {
        return $command instanceof ShouldQueue;
    }

    /**
     * Dispatch a command to its appropriate handler behind a queue.
     *
     * @param  mixed  $command
     * @return mixed
     *
     * @throws \RuntimeException  When the resolver does not return a Queue implementation.
     */
    public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        // A command that defines its own queue() method takes over the push.
        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

    /**
     * Push the command onto the given queue instance.
     *
     * The queue method used depends on which of the command's "queue" and
     * "delay" properties are set (isset() treats null as not set).
     *
     * @param  \Illuminate\Contracts\Queue\Queue  $queue
     * @param  mixed  $command
     * @return mixed
     */
    protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }

    /**
     * Dispatch a command to its appropriate handler after the current process.
     *
     * Registers a terminating callback on the container that runs dispatchSync().
     *
     * @param  mixed  $command
     * @param  mixed  $handler
     * @return void
     */
    public function dispatchAfterResponse($command, $handler = null)
    {
        $this->container->terminating(function () use ($command, $handler) {
            $this->dispatchSync($command, $handler);
        });
    }

    /**
     * Set the pipes through which commands should be piped before dispatching.
     *
     * Replaces any previously configured pipes.
     *
     * @param  array  $pipes
     * @return $this
     */
    public function pipeThrough(array $pipes)
    {
        $this->pipes = $pipes;

        return $this;
    }

    /**
     * Map a command to a handler.
     *
     * The given map is merged into the existing one; for string keys, entries
     * in $map override existing entries with the same key.
     *
     * @param  array  $map
     * @return $this
     */
    public function map(array $map)
    {
        $this->handlers = array_merge($this->handlers, $map);

        return $this;
    }
}