master

laravel/framework

Last updated at: 29/12/2023 09:20

DynamoBatchRepository.php

TLDR

This file, DynamoBatchRepository.php, is part of the Illuminate\Bus package in the laravel/framework repository. It implements the BatchRepository interface and provides the functionality for storing and managing job batches in a DynamoDB table.

Methods

get

Retrieves a list of batches for the current application from the DynamoDB table, in descending ID order, up to an optional limit and optionally restricted to batches whose ID is lower than a given "before" ID.

find

Retrieves information about an existing batch from the DynamoDB table based on the batch ID.

store

Stores a new pending batch in the DynamoDB table.

incrementTotalJobs

Increments both the total number of jobs and the number of pending jobs of a batch in the DynamoDB table by the given amount.

decrementPendingJobs

Decrements the total number of pending jobs for a batch in the DynamoDB table.

incrementFailedJobs

Increments the total number of failed jobs for a batch in the DynamoDB table and appends the failed job's ID to the batch's list of failed job IDs.

markAsFinished

Updates the finished_at attribute of a batch in the DynamoDB table to mark it as finished.

cancel

Updates the cancelled_at and finished_at attributes of a batch in the DynamoDB table to mark it as cancelled.

delete

Deletes a batch from the DynamoDB table based on the batch ID.

transaction

Executes the given Closure within a storage-specific transaction. (This implementation does not open a transaction; it simply invokes the Closure and returns its result.)

rollBack

Rolls back the last database transaction for the connection. (This method does nothing in this implementation.)

createAwsDynamoTable

Creates the underlying DynamoDB table for storing batch information.

deleteAwsDynamoTable

Deletes the underlying DynamoDB table.

getDynamoClient

Returns the underlying DynamoDB client instance.

getTable

Returns the name of the table that contains the batch records.

Classes

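DynamoBatchRepository: the DynamoDB-backed implementation of the BatchRepository interface described above.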

<?php

namespace Illuminate\Bus;

use Aws\DynamoDb\DynamoDbClient;
use Aws\DynamoDb\Marshaler;
use Carbon\CarbonImmutable;
use Closure;
use Illuminate\Support\Str;

/**
 * Batch repository that persists job batch records in an AWS DynamoDB table.
 *
 * Every record is addressed by a composite key made of the application name
 * (hash key) and the batch ID (range key). When a time-to-live is configured,
 * each write that updates a batch also refreshes the record's expiry attribute.
 */
class DynamoBatchRepository implements BatchRepository
{
    /**
     * The batch factory instance.
     *
     * @var \Illuminate\Bus\BatchFactory
     */
    protected $factory;

    /**
     * The DynamoDB client instance.
     *
     * @var \Aws\DynamoDb\DynamoDbClient
     */
    protected $dynamoDbClient;

    /**
     * The application name.
     *
     * @var string
     */
    protected $applicationName;

    /**
     * The table to use to store batch information.
     *
     * @var string
     */
    protected $table;

    /**
     * The time-to-live value for batch records, in seconds, or null when records should not expire.
     *
     * @var int|null
     */
    protected $ttl;

    /**
     * The name of the time-to-live attribute for batch records.
     *
     * @var string|null
     */
    protected $ttlAttribute;

    /**
     * The DynamoDB marshaler instance.
     *
     * @var \Aws\DynamoDb\Marshaler
     */
    protected $marshaler;

    /**
     * Create a new batch repository instance.
     *
     * @param  \Illuminate\Bus\BatchFactory  $factory
     * @param  \Aws\DynamoDb\DynamoDbClient  $dynamoDbClient
     * @param  string  $applicationName
     * @param  string  $table
     * @param  int|null  $ttl
     * @param  string|null  $ttlAttribute
     */
    public function __construct(
        BatchFactory $factory,
        DynamoDbClient $dynamoDbClient,
        string $applicationName,
        string $table,
        ?int $ttl,
        ?string $ttlAttribute
    ) {
        $this->factory = $factory;
        $this->dynamoDbClient = $dynamoDbClient;
        $this->applicationName = $applicationName;
        $this->table = $table;
        $this->ttl = $ttl;
        $this->ttlAttribute = $ttlAttribute;
        $this->marshaler = new Marshaler;
    }

    /**
     * Retrieve a list of batches.
     *
     * Batches are queried for the current application in descending order of
     * their ID. When "$before" is given, only batches whose ID is lower than
     * that value are returned.
     *
     * @param  int  $limit
     * @param  mixed  $before
     * @return \Illuminate\Bus\Batch[]
     */
    public function get($limit = 50, $before = null)
    {
        $condition = 'application = :application';

        $values = [':application' => ['S' => $this->applicationName]];

        // The ":id" placeholder is only sent when the condition references it,
        // because DynamoDB rejects expression attribute values that are unused.
        if ($before) {
            $condition .= ' AND id < :id';

            $values[':id'] = ['S' => $before];
        }

        $result = $this->dynamoDbClient->query([
            'TableName' => $this->table,
            'KeyConditionExpression' => $condition,
            'ExpressionAttributeValues' => $values,
            'Limit' => $limit,
            'ScanIndexForward' => false,
        ]);

        return array_map(
            fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
            $result['Items']
        );
    }

    /**
     * Retrieve information about an existing batch.
     *
     * @param  string  $batchId
     * @return \Illuminate\Bus\Batch|null
     */
    public function find(string $batchId)
    {
        if ($batchId === '') {
            return null;
        }

        $request = [
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
        ];

        $b = $this->dynamoDbClient->getItem($request);

        if (! isset($b['Item'])) {
            // If we didn't find it via a standard read, attempt consistent read...
            $b = $this->dynamoDbClient->getItem($request + ['ConsistentRead' => true]);

            if (! isset($b['Item'])) {
                return null;
            }
        }

        $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);

        return $batch ? $this->toBatch($batch) : null;
    }

    /**
     * Store a new pending batch.
     *
     * The record is written with zeroed job counters and then read back
     * through "find" so the caller receives a hydrated Batch instance.
     *
     * @param  \Illuminate\Bus\PendingBatch  $batch
     * @return \Illuminate\Bus\Batch
     */
    public function store(PendingBatch $batch)
    {
        $id = (string) Str::orderedUuid();

        $attributes = [
            'id' => $id,
            'name' => $batch->name,
            'total_jobs' => 0,
            'pending_jobs' => 0,
            'failed_jobs' => 0,
            'failed_job_ids' => [],
            'options' => $this->serialize($batch->options ?? []),
            'created_at' => time(),
            'cancelled_at' => null,
            'finished_at' => null,
        ];

        if (! is_null($this->ttl)) {
            $attributes[$this->ttlAttribute] = time() + $this->ttl;
        }

        $this->dynamoDbClient->putItem([
            'TableName' => $this->table,
            'Item' => $this->marshaler->marshalItem(
                array_merge(['application' => $this->applicationName], $attributes)
            ),
        ]);

        return $this->find($id);
    }

    /**
     * Increment the total number of jobs within the batch.
     *
     * Both the total and the pending job counters are raised by the amount.
     *
     * @param  string  $batchId
     * @param  int  $amount
     * @return void
     */
    public function incrementTotalJobs(string $batchId, int $amount)
    {
        $this->updateBatch(
            $batchId,
            'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val',
            [':val' => ['N' => "$amount"]],
            'ALL_NEW'
        );
    }

    /**
     * Decrement the total number of pending jobs for the batch.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function decrementPendingJobs(string $batchId, string $jobId)
    {
        $batch = $this->updateBatch(
            $batchId,
            'SET pending_jobs = pending_jobs - :inc',
            [':inc' => ['N' => '1']],
            'ALL_NEW'
        );

        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Increment the total number of failed jobs for the batch.
     *
     * The failed job's ID is also appended to the batch's "failed_job_ids" list.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function incrementFailedJobs(string $batchId, string $jobId)
    {
        $batch = $this->updateBatch(
            $batchId,
            'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)',
            [
                // list_append expects a list operand, so the ID is marshaled as a one-element list.
                ':jobId' => $this->marshaler->marshalValue([$jobId]),
                ':inc' => ['N' => '1'],
            ],
            'ALL_NEW'
        );

        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Mark the batch that has the given ID as finished.
     *
     * @param  string  $batchId
     * @return void
     */
    public function markAsFinished(string $batchId)
    {
        $this->updateBatch(
            $batchId,
            'SET finished_at = :timestamp',
            [':timestamp' => ['N' => (string) time()]]
        );
    }

    /**
     * Cancel the batch that has the given ID.
     *
     * Both "cancelled_at" and "finished_at" are set to the current timestamp.
     *
     * @param  string  $batchId
     * @return void
     */
    public function cancel(string $batchId)
    {
        $this->updateBatch(
            $batchId,
            'SET cancelled_at = :timestamp, finished_at = :timestamp',
            [':timestamp' => ['N' => (string) time()]]
        );
    }

    /**
     * Delete the batch that has the given ID.
     *
     * @param  string  $batchId
     * @return void
     */
    public function delete(string $batchId)
    {
        $this->dynamoDbClient->deleteItem([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
        ]);
    }

    /**
     * Execute the given Closure within a storage specific transaction.
     *
     * No transaction is opened by this repository; the callback is simply
     * invoked and its result returned.
     *
     * @param  \Closure  $callback
     * @return mixed
     */
    public function transaction(Closure $callback)
    {
        return $callback();
    }

    /**
     * Rollback the last database transaction for the connection.
     *
     * This is intentionally a no-op because "transaction" never opens one.
     *
     * @return void
     */
    public function rollBack()
    {
    }

    /**
     * Convert the given raw batch to a Batch object.
     *
     * @param  object  $batch
     * @return \Illuminate\Bus\Batch
     */
    protected function toBatch($batch)
    {
        return $this->factory->make(
            $this,
            $batch->id,
            $batch->name,
            (int) $batch->total_jobs,
            (int) $batch->pending_jobs,
            (int) $batch->failed_jobs,
            $batch->failed_job_ids,
            $this->unserialize($batch->options) ?? [],
            CarbonImmutable::createFromTimestamp($batch->created_at),
            // Empty timestamps are passed through untouched rather than converted to a date.
            $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at) : $batch->cancelled_at,
            $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at) : $batch->finished_at
        );
    }

    /**
     * Create the underlying DynamoDB table.
     *
     * The table uses "application" as its hash key and "id" as its range key.
     * When a time-to-live is configured, expiry is enabled on the TTL attribute.
     *
     * @return void
     */
    public function createAwsDynamoTable(): void
    {
        $definition = [
            'TableName' => $this->table,
            'AttributeDefinitions' => [
                [
                    'AttributeName' => 'application',
                    'AttributeType' => 'S',
                ],
                [
                    'AttributeName' => 'id',
                    'AttributeType' => 'S',
                ],
            ],
            'KeySchema' => [
                [
                    'AttributeName' => 'application',
                    'KeyType' => 'HASH',
                ],
                [
                    'AttributeName' => 'id',
                    'KeyType' => 'RANGE',
                ],
            ],
            'BillingMode' => 'PAY_PER_REQUEST',
        ];

        $this->dynamoDbClient->createTable($definition);

        // NOTE(review): the TTL update is issued immediately after table creation;
        // confirm whether the table must first reach the ACTIVE state.
        if (! is_null($this->ttl)) {
            $this->dynamoDbClient->updateTimeToLive([
                'TableName' => $this->table,
                'TimeToLiveSpecification' => [
                    'AttributeName' => $this->ttlAttribute,
                    'Enabled' => true,
                ],
            ]);
        }
    }

    /**
     * Delete the underlying DynamoDB table.
     *
     * @return void
     */
    public function deleteAwsDynamoTable(): void
    {
        $this->dynamoDbClient->deleteTable([
            'TableName' => $this->table,
        ]);
    }

    /**
     * Get the composite primary key that identifies the given batch.
     *
     * @param  string  $batchId
     * @return array
     */
    protected function batchKey(string $batchId): array
    {
        return [
            'application' => ['S' => $this->applicationName],
            'id' => ['S' => $batchId],
        ];
    }

    /**
     * Apply the given "SET" update expression to the batch with the given ID.
     *
     * When a time-to-live is configured (including a value of zero), the
     * record's expiry attribute is refreshed as part of the same update. The
     * TTL clause, its attribute name and its value are always added together
     * so the request never carries unused expression attributes.
     *
     * @param  string  $batchId
     * @param  string  $update
     * @param  array  $values
     * @param  string|null  $returnValues
     * @return mixed  The result returned by the DynamoDB client's "updateItem" call.
     */
    protected function updateBatch(string $batchId, string $update, array $values, ?string $returnValues = null)
    {
        $expiry = $this->getExpiryTime();

        $names = $this->ttlExpressionAttributeName();

        if (! is_null($this->ttl) && ! is_null($expiry) && $names !== []) {
            $update .= ", #{$this->ttlAttribute} = :ttl";

            $values[':ttl'] = ['N' => $expiry];
        } else {
            $names = [];
        }

        // Empty entries (no attribute names, no return values) are dropped from the request.
        return $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => $values,
            'ExpressionAttributeNames' => $names,
            'ReturnValues' => $returnValues,
        ]));
    }

    /**
     * Get the expiry time based on the configured time-to-live.
     *
     * @return string|null
     */
    protected function getExpiryTime(): ?string
    {
        return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
    }

    /**
     * Get the expression attribute name for the time-to-live attribute.
     *
     * @return array
     */
    protected function ttlExpressionAttributeName(): array
    {
        return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
    }

    /**
     * Serialize the given value.
     *
     * @param  mixed  $value
     * @return string
     */
    protected function serialize($value)
    {
        return serialize($value);
    }

    /**
     * Unserialize the given value.
     *
     * NOTE(review): this calls PHP's unserialize() on data read back from the
     * table; it assumes the stored "options" were written by "serialize" above
     * and that the table contents are trusted — confirm for your deployment.
     *
     * @param  string  $serialized
     * @return mixed
     */
    protected function unserialize($serialized)
    {
        return unserialize($serialized);
    }

    /**
     * Get the underlying DynamoDB client instance.
     *
     * @return \Aws\DynamoDb\DynamoDbClient
     */
    public function getDynamoClient(): DynamoDbClient
    {
        return $this->dynamoDbClient;
    }

    /**
     * Get the name of the table that contains the batch records.
     *
     * @return string
     */
    public function getTable(): string
    {
        return $this->table;
    }
}