Mock a SQS queue with moto

Below is simple example code to mock the aws sqs api. This can be useful for development so you dont have to actually manage a real SQS queue / connectivity / IAM permissions until you’re ready.

app.py

A simple function to write a message …


This content originally appeared on DEV Community and was authored by drewmullen

Below is simple example code to mock the aws sqs api. This can be useful for development so you dont have to actually manage a real SQS queue / connectivity / IAM permissions until you're ready.

app.py

A simple function to write a message to a preexisting queue

QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
    sqs = boto3.client('sqs', region_name = 'us-east-1')
    r = sqs.send_message(
        MessageBody = json.dumps(data),
        QueueUrl = QUEUE_URL
    )

conftest.py

Pytest fixtures to mock up the aws sqs API. aws_credentials() also ensures that your pytest functions will not actually write to aws resources.

REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
    # setup
    with mock_sqs():
        yield boto3.client('sqs', region_name=REGION)
    # teardown

test_sqs.py

An example test function. Create a queue using the mock client from conftest.py (notice sqs_client parameter matches the conftest function name sqs_client), invoke your python module function app.write_message(). Validate the returned message matches what you sent

def test_write_message(sqs_client):
    queue = sqs_client.create_queue(QueueName='test-msg-sender')
    queue_url = queue['QueueUrl']
    # override function global URL variable
    app.QUEUE_URL = queue_url
    expected_msg = str({'msg':f'this is a test'})
    app.write_message(expected_msg)
    sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)

    assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg

Extra

In case you wanted to see my file structure:

├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
    ├── __init__.py
    ├── conftest.py
    └── unit
        ├── __init__.py
        └── test_sqs.py

Thank you

kudos to Arunkumar Muralidharan who got me started


This content originally appeared on DEV Community and was authored by drewmullen


Print Share Comment Cite Upload Translate Updates
APA

drewmullen | Sciencx (2021-03-10T14:39:47+00:00) Mock a SQS queue with moto. Retrieved from https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/

MLA
" » Mock a SQS queue with moto." drewmullen | Sciencx - Wednesday March 10, 2021, https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/
HARVARD
drewmullen | Sciencx Wednesday March 10, 2021 » Mock a SQS queue with moto., viewed ,<https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/>
VANCOUVER
drewmullen | Sciencx - » Mock a SQS queue with moto. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/
CHICAGO
" » Mock a SQS queue with moto." drewmullen | Sciencx - Accessed . https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/
IEEE
" » Mock a SQS queue with moto." drewmullen | Sciencx [Online]. Available: https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/. [Accessed: ]
rf:citation
» Mock a SQS queue with moto | drewmullen | Sciencx | https://www.scien.cx/2021/03/10/mock-a-sqs-queue-with-moto/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.