Amazon has announced their new FIFO SQS service and I'd like to use it in Laravel Queue to solve some concurrency issues.
I've created several new queues and changed the configurations. However, I got a MissingParameter error which says
The request must contain the parameter MessageGroupId.
So I modified the file vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php
public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')),
    ]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);
    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue),
        'MessageBody' => $payload,
        'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')),
    ])->get('MessageId');
}
I'm using APP_ENV as the group ID (there is only a single message queue, so the value doesn't matter much; I just want everything to be FIFO).
But I'm still getting the same error message. How could I fix it? Any help would be appreciated.
(By the way, where does the SDK define sendMessage? I can find a stub for it, but not the actual implementation.)
To create an Amazon SQS queue (console): open the Amazon SQS console at https://console.aws.amazon.com/sqs/ and choose Create queue. For Type, the Standard queue type is set by default. To create a FIFO queue, choose FIFO.
You can't convert an existing standard queue into a FIFO queue. To make the move, you must either create a new FIFO queue for your application or delete your existing standard queue and recreate it as a FIFO queue.
In FIFO queues, messages are ordered based on message group ID. If multiple hosts (or different threads on the same host) send messages with the same message group ID to a FIFO queue, Amazon SQS stores the messages in the order in which they arrive for processing.
To determine whether a queue is FIFO, you can check whether QueueName ends with the .fifo suffix. The ContentBasedDeduplication attribute returns whether content-based deduplication is enabled for the queue. For more information, see Exactly-once processing in the Amazon SQS Developer Guide.
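The suffix check described above can be sketched in a few lines of PHP. The function name isFifoQueue is hypothetical (it is not part of Laravel or the AWS SDK), and the sketch assumes it is given either a bare queue name or a queue URL whose last path segment is the queue name.

```php
<?php

/**
 * Returns true when the queue name (or the last segment of a queue URL)
 * ends with the ".fifo" suffix.
 * Hypothetical helper: not part of Laravel or the AWS SDK.
 */
function isFifoQueue(string $queue): bool
{
    // Drop any trailing slash, then keep only the last path segment.
    $name = basename(rtrim($queue, '/'));

    return substr($name, -5) === '.fifo';
}

var_dump(isFifoQueue('https://sqs.us-east-1.amazonaws.com/123456789012/jobs.fifo')); // bool(true)
var_dump(isFifoQueue('jobs')); // bool(false)
```

This only inspects the string; it does not call SQS, so it cannot tell whether the queue actually exists.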
I want to point out to others who might stumble across the same issue that, although editing SqsQueue.php works, the change will be overwritten by the next composer install or composer update. An alternative is to implement a new Illuminate\Queue\Connectors\ConnectorInterface for SQS FIFO and register it with Laravel's queue manager.
My approach is as follows:
1. Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.
2. Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector and establishes a connection using SqsFifoQueue.
3. Create a new SqsFifoServiceProvider that registers the SqsFifoConnector with Laravel's queue manager.
4. Add SqsFifoServiceProvider to your config/app.php.
5. Update config/queue.php to use the new SQS FIFO Queue driver.

Example:
Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.
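<?php

class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
{
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $response = $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            // SQS only preserves order within a message group. uniqid() puts
            // every message in its own group; use one fixed value instead if
            // all jobs must be processed strictly in order.
            'MessageGroupId' => uniqid(),
            // Required unless content-based deduplication is enabled on the queue.
            'MessageDeduplicationId' => uniqid(),
        ]);

        return $response->get('MessageId');
    }
}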
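The class above uses uniqid() for both IDs, which satisfies the required parameters but gives every message its own group. If the goal is the one in the question (everything strictly FIFO), the parameters could be built along the lines of the sketch below. The helper name buildFifoMessage and the 'default' group ID are assumptions made for illustration; only the array keys come from the SQS sendMessage call shown above.

```php
<?php

/**
 * Builds the parameter array for a sendMessage call on a FIFO queue.
 * Hypothetical helper; the 'default' group ID is an arbitrary assumption.
 */
function buildFifoMessage(string $queueUrl, string $payload, string $groupId = 'default'): array
{
    return [
        'QueueUrl' => $queueUrl,
        'MessageBody' => $payload,
        // One shared group ID keeps all messages in a single ordered stream.
        'MessageGroupId' => $groupId,
        // Hashing the body gives identical payloads the same deduplication ID,
        // so a repeat sent within the deduplication interval is dropped.
        'MessageDeduplicationId' => hash('sha256', $payload),
    ];
}

$params = buildFifoMessage('my_queue_url', '{"job":"SendEmail"}');
echo $params['MessageGroupId'], PHP_EOL;                 // default
echo strlen($params['MessageDeduplicationId']), PHP_EOL; // 64
```

The trade-off is that a single group ID serializes all consumers, and a content hash will suppress a payload that is intentionally sent twice in quick succession; uniqid() avoids the latter at the cost of no deduplication at all.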
Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector that would establish a connection using SqsFifoQueue.
<?php

use Aws\Sqs\SqsClient;
use Illuminate\Support\Arr;

class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
{
    public function connect(array $config)
    {
        $config = $this->getDefaultConfiguration($config);

        if ($config['key'] && $config['secret']) {
            $config['credentials'] = Arr::only($config, ['key', 'secret']);
        }

        return new SqsFifoQueue(
            new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
        );
    }
}
Create a new SqsFifoServiceProvider that registers the SqsFifoConnector to Laravel's queue manager.
<?php

class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
{
    public function register()
    {
        // Register the 'sqsfifo' driver once the queue manager has been resolved.
        $this->app->afterResolving('queue', function ($manager) {
            $manager->addConnector('sqsfifo', function () {
                return new SqsFifoConnector;
            });
        });
    }
}
Add SqsFifoServiceProvider to your config/app.php.
<?php

return [
    'providers' => [
        ...
        SqsFifoServiceProvider::class,
    ],
];
Update config/queue.php to use the new SQS FIFO Queue driver.
<?php

return [
    'default' => 'sqsfifo',
    'connections' => [
        'sqsfifo' => [
            'driver' => 'sqsfifo',
            'key' => 'my_key',
            'secret' => 'my_secret',
            'queue' => 'my_queue_url',
            'region' => 'my_sqs_region',
        ],
    ],
];
Your queue connection should now support SQS FIFO queues.
Shameless plug: While working on the steps above I've created a laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.