BeanstalkdQueue.php
TLDR
This file, BeanstalkdQueue.php
, is part of the Illuminate\Queue namespace and contains the definition of the BeanstalkdQueue
class. This class implements the Queue
and QueueContract
interfaces and is responsible for handling interactions with the Beanstalkd queue system. It provides methods for pushing and processing jobs in the queue.
__construct
The constructor method of the BeanstalkdQueue
class initializes the instance variables with the provided values.
size
This method returns the size of the queue, optionally for a specific queue.
push
Pushes a new job onto the queue. The job parameter specifies the class name of the job, the data parameter contains the job data, and the queue parameter optionally specifies the queue to push the job onto.
pushRaw
Pushes a raw payload onto the queue. This method allows the user to push arbitrary payloads onto the queue.
later
Pushes a new job onto the queue after a specified delay. The delay parameter can be provided as a DateTime, a DateInterval, or an integer representing the delay in seconds.
bulk
Pushes an array of jobs onto the queue. The jobs parameter is an array of job objects, the data parameter contains the job data, and the queue parameter optionally specifies the queue to push the jobs onto.
pop
Pops the next job off of the queue. The queue parameter optionally specifies the queue to pop the job from.
deleteMessage
Deletes a message from the Beanstalk queue. The queue parameter specifies the queue, and the id parameter specifies the message ID.
getQueue
Gets the queue or returns the default queue. The queue parameter specifies the queue to get, or if it is not provided, the default queue is returned.
getPheanstalk
Gets the underlying Pheanstalk instance.
<?php
namespace Illuminate\Queue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Jobs\BeanstalkdJob;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\JobId;
use Pheanstalk\Values\TubeName;
class BeanstalkdQueue extends Queue implements QueueContract
{
/**
* The Pheanstalk instance.
*
* @var \Pheanstalk\Contract\PheanstalkManagerInterface&\Pheanstalk\Contract\PheanstalkPublisherInterface&\Pheanstalk\Contract\PheanstalkSubscriberInterface
*/
protected $pheanstalk;
/**
* The name of the default tube.
*
* @var string
*/
protected $default;
/**
* The "time to run" for all pushed jobs.
*
* @var int
*/
protected $timeToRun;
/**
* The maximum number of seconds to block for a job.
*
* @var int
*/
protected $blockFor;
/**
* Create a new Beanstalkd queue instance.
*
* @param \Pheanstalk\Contract\PheanstalkManagerInterface&\Pheanstalk\Contract\PheanstalkPublisherInterface&\Pheanstalk\Contract\PheanstalkSubscriberInterface $pheanstalk
* @param string $default
* @param int $timeToRun
* @param int $blockFor
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct($pheanstalk,
$default,
$timeToRun,
$blockFor = 0,
$dispatchAfterCommit = false)
{
$this->default = $default;
$this->blockFor = $blockFor;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}
/**
* Get the size of the queue.
*
* @param string|null $queue
* @return int
*/
public function size($queue = null)
{
return (int) $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady;
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string|null $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->pheanstalk->useTube(new TubeName($this->getQueue($queue)));
return $this->pheanstalk->put(
$payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun
);
}
/**
* Push a new job onto the queue after (n) seconds.
*
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string|null $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
$this->pheanstalk->useTube(new TubeName($this->getQueue($queue)));
return $this->pheanstalk->put(
$payload,
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
}
);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string|null $queue
* @return void
*/
public function bulk($jobs, $data = '', $queue = null)
{
foreach ((array) $jobs as $job) {
if (isset($job->delay)) {
$this->later($job->delay, $job, $data, $queue);
} else {
$this->push($job, $data, $queue);
}
}
}
/**
* Pop the next job off of the queue.
*
* @param string|null $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$this->pheanstalk->watch(new TubeName($queue));
$job = $this->pheanstalk->reserveWithTimeout($this->blockFor);
if ($job instanceof JobIdInterface) {
return new BeanstalkdJob(
$this->container, $this->pheanstalk, $job, $this->connectionName, $queue
);
}
}
/**
* Delete a message from the Beanstalk queue.
*
* @param string $queue
* @param string|int $id
* @return void
*/
public function deleteMessage($queue, $id)
{
$this->pheanstalk->useTube(new TubeName($this->getQueue($queue)));
$this->pheanstalk->delete(new Job(new JobId($id), ''));
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
return $queue ?: $this->default;
}
/**
* Get the underlying Pheanstalk instance.
*
* @return \Pheanstalk\Contract\PheanstalkManagerInterface&\Pheanstalk\Contract\PheanstalkPublisherInterface&\Pheanstalk\Contract\PheanstalkSubscriberInterface
*/
public function getPheanstalk()
{
return $this->pheanstalk;
}
}