

Last updated at: 29/12/2023 09:23



The DatabaseQueue.php file is a part of the Illuminate Queue package in the Laravel framework. It provides a database driver implementation for the Laravel queue system.


size($queue = null)

Returns the size of the queue.

push($job, $data = '', $queue = null)

Pushes a new job onto the queue.

pushRaw($payload, $queue = null, array $options = [])

Pushes a raw payload onto the queue.

later($delay, $job, $data = '', $queue = null)

Pushes a new job onto the queue after (n) seconds.

bulk($jobs, $data = '', $queue = null)

Pushes an array of jobs onto the queue.

release($queue, $job, $delay)

Release a reserved job back onto the queue after (n) seconds.

pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)

Pushes a raw payload to the database with a given delay of (n) seconds.

buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)

Creates an array to insert for the given job.

pop($queue = null)

Pops the next job off of the queue.


getNextAvailableJob($queue)

Gets the next available job for the queue.


getLockForPopping()

Gets the lock required for popping the next job.


isAvailable($query)

Modifies the query to check for available jobs.


isReservedButExpired($query)

Modifies the query to check for jobs that are reserved but have expired.

marshalJob($queue, $job)

Marshals the reserved job into a DatabaseJob instance.


markJobAsReserved($job)

Marks the given job record as reserved and increments its attempt count.

deleteReserved($queue, $id)

Deletes a reserved job from the queue.

deleteAndRelease($queue, $job, $delay)

Deletes a reserved job from the reserved queue and releases it.


clear($queue)

Deletes all of the jobs from the queue.


getQueue($queue)

Gets the queue or returns the default.


getDatabase()

Gets the underlying database instance.


namespace Illuminate\Queue;

use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use PDO;

class DatabaseQueue extends Queue implements QueueContract, ClearableQueue
{
    /**
     * The database connection instance.
     *
     * @var \Illuminate\Database\Connection
     */
    protected $database;

    /**
     * The database table that holds the jobs.
     *
     * @var string
     */
    protected $table;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    protected $default;

    /**
     * The expiration time of a job, in seconds.
     *
     * A job whose "reserved_at" timestamp is at least this many seconds
     * old is treated as expired and becomes eligible for popping again.
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * Create a new database queue instance.
     *
     * @param  \Illuminate\Database\Connection  $database
     * @param  string  $table
     * @param  string  $default
     * @param  int  $retryAfter
     * @param  bool  $dispatchAfterCommit
     * @return void
     */
    public function __construct(Connection $database,
                                $table,
                                $default = 'default',
                                $retryAfter = 60,
                                $dispatchAfterCommit = false)
    {
        $this->table = $table;
        $this->default = $default;
        $this->database = $database;
        $this->retryAfter = $retryAfter;
        $this->dispatchAfterCommit = $dispatchAfterCommit;
    }

    /**
     * Get the size of the queue.
     *
     * Counts the rows in the jobs table that belong to the given queue,
     * or to the default queue when none is given.
     *
     * @param  string|null  $queue
     * @return int
     */
    public function size($queue = null)
    {
        return $this->database->table($this->table)
                    ->where('queue', $this->getQueue($queue))
                    ->count();
    }

    /**
     * Push a new job onto the queue.
     *
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            null, // no delay: the job is available immediately
            function ($payload, $queue) {
                return $this->pushToDatabase($queue, $payload);
            }
        );
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options  Accepted for signature compatibility; not used by this driver.
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        return $this->pushToDatabase($queue, $payload);
    }

    /**
     * Push a new job onto the queue after (n) seconds.
     *
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            $delay,
            function ($payload, $queue, $delay) {
                return $this->pushToDatabase($queue, $payload, $delay);
            }
        );
    }

    /**
     * Push an array of jobs onto the queue.
     *
     * All records are written with a single insert statement.
     *
     * @param  array  $jobs
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);

        // Computed once so every job without its own delay shares the same timestamp.
        $now = $this->availableAt();

        return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
            function ($job) use ($queue, $data, $now) {
                return $this->buildDatabaseRecord(
                    $queue,
                    $this->createPayload($job, $this->getQueue($queue), $data),
                    isset($job->delay) ? $this->availableAt($job->delay) : $now
                );
            }
        )->all());
    }

    /**
     * Release a reserved job back onto the queue after (n) seconds.
     *
     * The job's payload and current attempt count are carried over to the new record.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @param  int  $delay
     * @return mixed
     */
    public function release($queue, $job, $delay)
    {
        return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
    }

    /**
     * Push a raw payload to the database with a given delay of (n) seconds.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  int  $attempts
     * @return mixed
     */
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
            $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
        ));
    }

    /**
     * Create an array to insert for the given job.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  int  $availableAt
     * @param  int  $attempts
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        return [
            'queue' => $queue,
            'attempts' => $attempts,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
            'payload' => $payload,
        ];
    }

    /**
     * Pop the next job off of the queue.
     *
     * The lookup and the reservation run inside one database transaction.
     * Returns null when no job is available.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     *
     * @throws \Throwable
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        return $this->database->transaction(function () use ($queue) {
            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);
            }
        });
    }

    /**
     * Get the next available job for the queue.
     *
     * A job qualifies when it is either unreserved and due, or reserved
     * but expired. The lowest ID is selected first.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
     */
    protected function getNextAvailableJob($queue)
    {
        $job = $this->database->table($this->table)
                    ->lock($this->getLockForPopping())
                    ->where('queue', $this->getQueue($queue))
                    ->where(function ($query) {
                        $this->isAvailable($query);
                        $this->isReservedButExpired($query);
                    })
                    ->orderBy('id', 'asc')
                    ->first();

        return $job ? new DatabaseJobRecord((object) $job) : null;
    }

    /**
     * Get the lock required for popping the next job.
     *
     * Returns a raw lock clause for engines and versions matched below,
     * or true to request the query builder's default lock.
     *
     * @return string|bool
     */
    protected function getLockForPopping()
    {
        $databaseEngine = $this->database->getPdo()->getAttribute(PDO::ATTR_DRIVER_NAME);
        $databaseVersion = $this->database->getConfig('version') ?? $this->database->getPdo()->getAttribute(PDO::ATTR_SERVER_VERSION);

        // The version string, not the driver name, is what identifies MariaDB
        // and Vitess/PlanetScale here; normalise it to a bare version number.
        if (Str::of($databaseVersion)->contains('MariaDB')) {
            $databaseEngine = 'mariadb';
            $databaseVersion = Str::before(Str::after($databaseVersion, '5.5.5-'), '-');
        } elseif (Str::of($databaseVersion)->contains(['vitess', 'PlanetScale'])) {
            $databaseEngine = 'vitess';
            $databaseVersion = Str::before($databaseVersion, '-');
        }

        if (($databaseEngine === 'mysql' && version_compare($databaseVersion, '8.0.1', '>=')) ||
            ($databaseEngine === 'mariadb' && version_compare($databaseVersion, '10.6.0', '>=')) ||
            ($databaseEngine === 'pgsql' && version_compare($databaseVersion, '9.5', '>='))) {
            return 'FOR UPDATE SKIP LOCKED';
        }

        if ($databaseEngine === 'sqlsrv') {
            return 'with(rowlock,updlock,readpast)';
        }

        return true;
    }

    /**
     * Modify the query to check for available jobs.
     *
     * Matches jobs that are not reserved and whose "available_at"
     * time is not in the future.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
     */
    protected function isAvailable($query)
    {
        $query->where(function ($query) {
            $query->whereNull('reserved_at')
                  ->where('available_at', '<=', $this->currentTime());
        });
    }

    /**
     * Modify the query to check for jobs that are reserved but have expired.
     *
     * Adds an OR condition matching jobs reserved at least
     * "retryAfter" seconds ago.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
     */
    protected function isReservedButExpired($query)
    {
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

        $query->orWhere(function ($query) use ($expiration) {
            $query->where('reserved_at', '<=', $expiration);
        });
    }

    /**
     * Marshal the reserved job into a DatabaseJob instance.
     *
     * The job is marked as reserved before it is wrapped.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @return \Illuminate\Queue\Jobs\DatabaseJob
     */
    protected function marshalJob($queue, $job)
    {
        $job = $this->markJobAsReserved($job);

        return new DatabaseJob(
            $this->container, $this, $job, $this->connectionName, $queue
        );
    }

    /**
     * Mark the given job as reserved.
     *
     * Writes the values returned by the record's touch() and increment()
     * calls to the "reserved_at" and "attempts" columns.
     *
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord
     */
    protected function markJobAsReserved($job)
    {
        $this->database->table($this->table)->where('id', $job->id)->update([
            'reserved_at' => $job->touch(),
            'attempts' => $job->increment(),
        ]);

        return $job;
    }

    /**
     * Delete a reserved job from the queue.
     *
     * The row is locked for update inside a transaction and only deleted if it still exists.
     *
     * @param  string  $queue
     * @param  string  $id
     * @return void
     *
     * @throws \Throwable
     */
    public function deleteReserved($queue, $id)
    {
        $this->database->transaction(function () use ($id) {
            if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
                $this->database->table($this->table)->where('id', $id)->delete();
            }
        });
    }

    /**
     * Delete a reserved job from the reserved queue and release it.
     *
     * Deletion and re-insertion share one transaction. The release
     * happens even when the original row is no longer present.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJob  $job
     * @param  int  $delay
     * @return void
     */
    public function deleteAndRelease($queue, $job, $delay)
    {
        $this->database->transaction(function () use ($queue, $job, $delay) {
            if ($this->database->table($this->table)->lockForUpdate()->find($job->getJobId())) {
                $this->database->table($this->table)->where('id', $job->getJobId())->delete();
            }

            $this->release($queue, $job->getJobRecord(), $delay);
        });
    }

    /**
     * Delete all of the jobs from the queue.
     *
     * @param  string  $queue
     * @return int
     */
    public function clear($queue)
    {
        return $this->database->table($this->table)
                    ->where('queue', $this->getQueue($queue))
                    ->delete();
    }

    /**
     * Get the queue or return the default.
     *
     * Any falsy value (null, empty string) resolves to the default queue name.
     *
     * @param  string|null  $queue
     * @return string
     */
    public function getQueue($queue)
    {
        return $queue ?: $this->default;
    }

    /**
     * Get the underlying database instance.
     *
     * @return \Illuminate\Database\Connection
     */
    public function getDatabase()
    {
        return $this->database;
    }
}