Creating an SQS Queue with NodeJs SDK

Introductory Lab

Sat, 10 Oct 2020

Overview

This introductory Lab provides a basic overview of creating queues, processing messages and understanding the lifecycle of AWS SQS. We are going to use NodeJs SDK. All the code used in the lab can be found at the ordev gitlab repo.

Amazon Simple Queue Service s a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. SQS eliminates the complexity and overhead associated with managing and operating message oriented middleware, and empowers developers to focus on differentiating work.

Topics covered

  • Understanding Message LifeCycle
  • Creating the Queue
  • Generating Messages to the queue
  • Process and delete SQS Messages

Time needed:

  • 1h

Understanding Message LifeCycle

unsplash.com

  1. A producer (component 1) sends message A to a queue, and the message is distributed across the Amazon SQS servers redundantly.

  2. When a consumer (component 2) is ready to process messages, it consumes messages from the queue, and message A is returned. While message A is being processed, it remains in the queue and isn’t returned to subsequent receive requests for the duration of the visibility timeout.

  3. The consumer (component 2) deletes message A from the queue to prevent the message from being received and processed again when the visibility timeout expires.

Task 1: Creating the Queue

  • Step 1: Creation of simple code to call createQueue in aws sqs.
var AWS = require('aws-sdk');
const dotenv = require('dotenv');
dotenv.config();

AWS.config.region = `${process.env.AWS_REGION}`;

var sqs = new AWS.SQS();
var params =
    {
        QueueName: `${process.env.NAME_QUEUE}`, /* required */
        Attributes: {
            ReceiveMessageWaitTimeSeconds: '20',
            VisibilityTimeout: '60'
        }
    };

sqs.createQueue(params, function (err, data) {
    if (err)
        console.log(err, err.stack);
    else
        console.log('Successfully created SQS queue URL ' + data.QueueUrl);
});
  • After executing our code , we go to the aws console and the queue should be created with the parameters specified ReceiveMessageWaitTimeSeconds and Visibilitytimeout.

unsplash.com

Task 2: Generating Messages to the queue

  • Step 1: In order to access properly to the queue we need the queue Url. For this reason it’s needed to implement getQueueUrl in our SimpleQueueGenerator class:
/**
 * @returns {Promise<*>} The url of the Queue
 */
var params =
    {
        QueueName: `${process.env.NAME_QUEUE}`
    };
async function getQueueUrl() {
    const response = await sqs.getQueueUrl(params).promise();
    console.log(`Response arrived for get queue url:  ${util.inspect(response)}`);
    return response.QueueUrl;
}
  • Step 2: Implementation of createMessages method to autogenerate n Dummy messages in the queue.
/**
 * Generates n dummy messages in SQS 
 * @param nMessages - Number of Messages to generate in SQS
 * @returns {Promise<void>}
 */
async function createMessages(nMessages) {

    let queueUrl = await getQueueUrl();

    let messages = [];
    for (let i = 0; i < nMessages; i++) {
        messages[i] = 'Auto-generated message ' + i + ' for ORCDEV queue - ' + Date.now();
    }

    for (const message of messages) {
        console.log("Sending message: " + message);
        params = {
            MessageBody: message,
            QueueUrl: queueUrl
        };

        await sqs.sendMessage(params, function (err, data) {
            if (err) console.log(err, err.stack);
            else console.log(data);
        });
    }
    
}
  • Response from the queue:
Response arrived for get queue url:  { ResponseMetadata: { RequestId: 'f01341f7-4098-59f8-a80a-19e0ecf3cfb8' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Sending message: Auto-generated message 0 for ORCDEV queue - 1604168235021
Sending message: Auto-generated message 1 for ORCDEV queue - 1604168235021
Sending message: Auto-generated message 2 for ORCDEV queue - 1604168235021
Sending message: Auto-generated message 3 for ORCDEV queue - 1604168235021
Sending message: Auto-generated message 4 for ORCDEV queue - 1604168235021
{ ResponseMetadata: { RequestId: '388daf5c-b517-5105-a374-f74b197cbb7b' },
  MD5OfMessageBody: 'd626675cd85e79f6a0176db597ba5314',
  MessageId: 'e020b02b-72b9-4916-9020-76ffb9f31844' }
{ ResponseMetadata: { RequestId: '10b0ec91-112a-584c-996f-b486bd4124d1' },
  MD5OfMessageBody: '5e80b95602ddb56c48c71187fb6e69b6',
  MessageId: '47dbd952-f1f4-410b-ac12-5b663ba6ae8b' }
{ ResponseMetadata: { RequestId: 'd1735edf-b0f6-5f61-9fc5-06c81c9da5bd' },
  MD5OfMessageBody: 'd35ecabdebdf07667f37ddf1465d1ad5',
  MessageId: 'b644a71b-5463-40cc-9985-2ad0ead6cd02' }
{ ResponseMetadata: { RequestId: '47e7dcfe-33af-5dd6-b0e2-84e99fc4e14b' },
  MD5OfMessageBody: '34cffb9f6142be0fcbac6aba34c8ed8b',
  MessageId: '53ad0b4a-6614-494e-a3a0-2638fc55d947' }
{ ResponseMetadata: { RequestId: 'd23e2172-2f20-5e5f-9945-7965834b0680' },
  MD5OfMessageBody: 'e86f72022ebbc4d4b14c571061a93afb',
  MessageId: '437762d5-810a-4539-9d7a-27c6fbe64615' }

Task 3: Processing Messages in the Queue

  • Step 1: To understand this task and the lifecycle of the queue in this step we’re only retrieving 3 messages from the queue, then in the console we’re going to check that there is 3 messages not available in the queue, and during the VisibilityTimeout time (aka 60 seconds) it will remain unaccessible.
async function receiveMessages(){
    var params =
        {
            QueueUrl: await getQueueUrl(),
            WaitTimeSeconds: '20',
            VisibilityTimeout: '60'
        };

    sqs.receiveMessage(params, function (err, data) {
        if(err)
            console.log(err, err.stack);
        else{
            if((typeof data.messages !== undefined) &&  (data.Messages.length !== 0)){
                console.log('Received ' + data.messagesessages.length + ' Messages from SQS queue.');
                console.log('Message: ' + data.messagesessages[0].Body + ' Messages from SQS queue.');
            }else{
                console.log('SQS queue empty...');
            }
        }
    })
}
  • Response from the queue:
Response arrived for get queue url:  { ResponseMetadata: { RequestId: '3bcb850b-95cc-5da8-96eb-dd1c2dc2e853' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Received 1 Messages from SQS queue.
Message: Auto-generated message 0 for ORCDEV queue - 1604167330958 Messages from SQS queue.
amatore@amatore-PROX15-AMD:~/Projects/lab_aws-sqs-sns$ node SimpleQueueConsumer.js 
Response arrived for get queue url:  { ResponseMetadata: { RequestId: '23632667-af97-584a-a65a-7e701786c6bd' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Received 1 Messages from SQS queue.
Message: Auto-generated message 1 for ORCDEV queue - 1604168235021 Messages from SQS queue.
amatore@amatore-PROX15-AMD:~/Projects/lab_aws-sqs-sns$ node SimpleQueueConsumer.js 
Response arrived for get queue url:  { ResponseMetadata: { RequestId: 'e843a699-5045-50d7-bb70-fe8ee85614ed' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Received 1 Messages from SQS queue.
Message: Auto-generated message 3 for ORCDEV queue - 1604168235021 Messages from SQS queue.
  • Status of the queue during the next 60 seconds of visibility Timeout:

unsplash.com

  • Step 2: Implementation of processMessages to delete messages from the queue permanently.
async function processMessages(messagesSQS){
    for (const item of messagesSQS){
        var params =
            {
                QueueUrl: await getQueueUrl(),
                ReceiptHandle: item.ReceiptHandle
            };

        await sqs.deleteMessage(params, function (err, data) {
            if(err)
                console.log(err, err.stack);
            else{
                console.log('Deleted Message RequestId ' + JSON.stringify(data.ResponseMetadata.RequestId) + ' Messages from SQS queue.');
            }

        })
    }
}
async function receiveMessages(){
    var params =
        {
            QueueUrl: await getQueueUrl(),
            WaitTimeSeconds: '20',
            VisibilityTimeout: '60'
        };

    sqs.receiveMessage(params, function (err, data) {
        if(err)
            console.log(err, err.stack);
        else{
            if((typeof data.messages !== undefined) &&  (data.Messages.length !== 0)){
                console.log('Received ' + data.messages.length + ' Messages from SQS queue.');
                processMessages(data.messages);
            }else{
                console.log('SQS queue empty...');
            }
        }
    })
}
  • After executing processMessages method we receive this response.
Response arrived for get queue url:  { ResponseMetadata: { RequestId: '7a02329b-b2e9-5d9a-8bc4-6a8c0ef530c6' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Received 1 Messages from SQS queue.
Response arrived for get queue url:  { ResponseMetadata: { RequestId: 'bd019dc3-d6e0-5815-9e94-c5d46afdbf2e' },
  QueueUrl:
   'https://sqs.eu-west-3.amazonaws.com/849659930690/orc-sqs-lab' }
Deleted Message RequestId "cad93063-54f3-5fd6-be45-6874e8eebbc1" Messages from SQS queue.
  • Status of the queue after 1 message has been processed/deleted permanently:

unsplash.com

Loading...

written by Senior Orc Developer