My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling

Introduction

My only encounter with a message broker service until this point was when I read about Amazon MQ in preparation for my AWS Solutions Architect Associate examination. Imagine my utter confusion when I read the project for stage 3…


This content originally appeared on DEV Community and was authored by Ravencodess

Introduction

My only encounter with a message broker service until this point was when I read about Amazon MQ in preparation for my AWS Solutions Architect Associate examination. Imagine my utter confusion when I read the project for stage 3.
In this stage of my HNG journey, I was required to build a project that combines the power of FastAPI and Celery to create a robust messaging system. This system not only handles user requests efficiently but also leverages asynchronous task processing for seamless email sending. Throughout this article, I'll walk you through the key components and steps involved in developing this application, highlighting the integration of RabbitMQ as the message broker and utilizing SMTP for email delivery. By the end, you'll have a comprehensive understanding of how to build and deploy a similar system.

Here are the full requirements for this task:

  • Install RabbitMQ and Celery on your local machine.
  • Set up a Python application with the following functionalities: An endpoint that can accept two parameters: ?sendmail and ?talktome.
  • Endpoint Functionalities: ?sendmail: When this parameter is passed, the system should send an email using SMTP to the specified email address. (e.g., ?sendmail=mail@example.com). Use RabbitMQ/Celery to queue the email sending task. Ensure the email-sending script retrieves and executes tasks from the queue. ?talktome: When this parameter is passed, the system should log the current time to /var/log/messaging_system.log.
  • Nginx Configuration: Configure Nginx to serve your Python application. Ensure proper routing of requests to the application.
  • Endpoint Access: Use ngrok or a similar tool to expose your local application endpoint for external access.

Prerequisites

  • Python 3.8+
  • SMTP Email Account

Let's Get Started

Step 1

RabbitMQ and Celery Local Installation
For this project, I'll be using a Vagrant box on my laptop running Ubuntu 22.04 OS, but you can use any linux flavor you want.
We were instructed not to run RabbitMQ as a Docker container, so I ran the debian installation script from the official docs

#!/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQs main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Community mirror of Cloudsmith: modern Erlang repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Community mirror of Cloudsmith: RabbitMQ repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

# another mirror for redundancy
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

## Provides RabbitMQ
##
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main

# another mirror for redundancy
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

To confirm if RabbitMQ is installed correctly, run this command

sudo rabbitmqctl status

Rabbitmq installation

Next, we need to install Celery. Before that, let's create a Python virtual environment (venv) to better handle pip dependencies across our local machine.
Ensure the venv package is installed then create a venv

sudo apt install python3.10-venv
python3 -m venv env
# start up the virtual environment
source env/bin/activate

We can now observe an (env) block before our command prompt

env creation

To install celery, run

pip install celery
# confirm celery installation
celery --version

Step 2

Python Application Setup
Before we start writing our python application, let's understand celery and RabbitMQ and the role they will be playing in our application.

RabbitMQ is a message broker software that facilitates communication between different applications or components within a system. It acts as an intermediary that can receive messages from producers (senders) and deliver them to consumers (receivers). In the context of this application, Celery acts as both the producer and the consumer of the messages sent to the message broker.
Celery is an open-source distributed task queue system for Python, designed to handle asynchronous and scheduled tasks in a reliable and scalable manner. It enables you to run tasks concurrently. We send tasks to the queue as a producer using the .delay() method and then a celery worker consumes and executes the task in a celery worker process.

Let's use a short demo to see how Celery and RabbitMQ work together.

Create two files, main.py and tasks.py
tasks.py

from celery import Celery
from time import sleep

celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//"
)

@celery.task
def reverse(text):
    sleep(5)
    print(text[::-1])


def echo(text):
    print(text)

reverse.delay('first function')

echo('second function')

This script initializes a celery object that uses RabbitMQ as the broker, the string "pyamqp://guest:guest@localhost:5672//"
connects celery to RabbitMQ running locally on port 5672 using the default login username and password called guest.The script then calls two functions, reverse() and echo() but sent the reverse function to celery using the .delay() method . The first function takes in a string, sleeps for 5 seconds and then return the reversed string. The second function just repeats the string passed into it.

Run this script using

python3 tasks.py

Immediately the string passed into the second function is printed to the terminal and the first function doesn't return anything to the terminal

Second function

To see the outcome of the first function, open a new terminal, P.S. Ensure you enter the same virtual environment as the previous terminal.

celery -A tasks worker --loglevel=info

A celery worker starts up, connects to RabbitMQ, waits for 5 seconds then prints out the reversed string passed into the first function

first function

Now we've observed how celery enables asynchronous programming by utilizing task queues, let's proceed with the python application. We would later see how to make use of a backend with celery so as to get the output of the consumed task in the celery worker process and use it in our application.

We need to first install the necessary dependencies for this application like FastAPI for building APIs and uvicorn for running our backend server.

 pip install fastapi uvicorn

Next we need to make sure our log file exists and the correct permissions are set in order not to encounter any permission errors. My user is vagrant, so I'll give vagrant ownership of the file.

sudo touch /var/log/messaging_system.log
sudo chown vagrant:vagrant /var/log/messaging_system.log

create a .env file that contains your email address and smtp_password. If you are using gmail, you can get your password using app_passwords

.env

email_sender=your_email
email_password=your_password

Let's write our email sending application.

tasks.py

from celery import Celery
from dotenv import load_dotenv
import os
from email.message import EmailMessage
import smtplib

# Load environment variables from a .env file
load_dotenv()

# Initialize a Celery instance with a broker and backend
celery = Celery(
    "tasks",
    broker="pyamqp://guest:guest@localhost:5672//",
    backend="rpc://"
)

# Define a Celery task to send an email
@celery.task
def send_mail(email):
    # Retrieve email sender and password from environment variables
    email_sender = os.getenv("email_sender")
    email_password = os.getenv("email_password")
    # Set the recipient email address
    email_receiver = email

    # Define the email subject and body
    subject = "Hello From HNG"
    body = "This is a test email"

    # Create an EmailMessage object and set its content
    em = EmailMessage()
    em["From"] = email_sender
    em["To"] = email_receiver
    em["Subject"] = subject
    em.set_content(body)

    try:
        # Establish a secure connection to the SMTP server
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
            # Log in to the SMTP server using the senders credentials
            server.login(email_sender, email_password)
            # Send the email
            server.sendmail(email_sender, email_receiver, em.as_string())
        # Return a success message if the email is sent
        return f"Email sent to {email_receiver}"
    except Exception as e:
        # Return an error message if the email fails to send
        return f"Failed to send email to {email_receiver}: {e}"

This code:

  • Uses the backend="rpc://" in Celery configuration which refers to the result backend. This setting determines where Celery stores the results of tasks after they are executed. The rpc:// backend in Celery uses RabbitMQ to store task results. This is implemented using RabbitMQ's Remote Procedure Call (RPC) mechanism.
  • Uses celery tasks to create a send_mail asynchronous function that composes a test email which utilizes pythons' smtplib library to send secure emails to the recipient.
  • The function takes in a recipient email as a parameter which would passed into it from main.py

main.py

from fastapi import FastAPI
from typing import Optional
from datetime import datetime
from tasks import send_mail
from fastapi.responses import PlainTextResponse

# Initialize the FastAPI application
app = FastAPI()

# Function to log events to a file
def logger(event):
    # Open the log file in append mode
    with open("/var/log/messaging_system.log", "a") as log_file:
        # Write the event with a timestamp to the log file
        log_file.write(f"{datetime.now()}: {event}\n")

# Define a GET endpoint at the root URL
@app.get("/")
def test(sendmail: Optional[str] = None, talktome: Optional[str] = None):
    # Initialize the response dictionary
    response = {}

    # If 'sendmail' query parameter is provided
    if sendmail is not None:
        # If 'sendmail' is an empty string
        if sendmail == "":
            response["sendmail"] = "no mail provided"
        else:
            # Log the sending mail action
            logger(f"Sending mail to {sendmail}  .....")
            # Call the asynchronous send_mail task
            result = send_mail.delay(sendmail)
            # Log the result of the send_mail task
            logger(f"{result.get()}")
        # Indicate that the action has been logged
        response["sendmail"] = "logged action to /logs"

    # If 'talktome' query parameter is provided
    if talktome is not None:
        # Log the talktome message
        logger(talktome)
        # Indicate that the action has been logged
        response["talktome"] = "logged action to /logs"

    # If neither 'sendmail' nor 'talktome' query parameters are provided
    if sendmail is None and talktome is None:
        response["Default"] = "no parameters provided"

    # Return the response dictionary
    return response

# Define a GET endpoint to retrieve logs
@app.get("/logs")
def logs():
    # Open the log file in read mode
    with open("/var/log/messaging_system.log", "r") as log_file:
        # Read the entire content of the log file
        log_content = log_file.read()
        # Return the log content as plain text response
        return PlainTextResponse(content=log_content)

This code:

  • Imports the necessary dependencies needed for the script
  • Creates a logging function that prints the current time and event to the log file we created previously
  • Initializes FastAPI
  • Creates two endpoints, one for the root / and one for the logs /logs. The / endpoint uses the send_mail function imported from tasks.py and calls it asynchronously using .delay(), but because we are using a backend in tasks.py we can retrieve the results of the asynchronous function using the .get() method. The ?sendmail query parameter sends a mail to the provided email address. If ?talktome query parameter is passed, It prints the current time to the log file.

Step 3

Creating systemd services for celery and uvicron

Our application is ready and we can start celery using

celery -A tasks worker --loglevel=info

Start uvicorn for our backend application server using

uvicorn main:app --reload

P.S if you wish to access the application from the browser add the --host 0.0.0.0 to the uvicorn command

But this method will require us to open and maintain multiple terminal sessions. Instead let's run the applications as systemd services in the background.
Open /etc/systemd/system/uvicorn.service and /etc/systemd/system/celery.service with sudo permissions.

/etc/systemd/system/uvicorn.service

[Unit]
Description=Uvicorn instance to serve FastAPI application
After=network.target

[Service]
User=vagrant
Group=www-data
WorkingDirectory=/home/vagrant/message-broker-stage-3/app
ExecStart=/home/vagrant/.local/bin/uvicorn main:app --reload --host 0.0.0.0 --port 8000

[Install]
WantedBy=multi-user.target

/etc/systemd/system/celery.service

[Unit]
Description=Celery Service
After=network.target

[Service]
User=vagrant
Group=www-data
WorkingDirectory=/home/vagrant/messaging-stage-3/app
ExecStart=/home/vagrant/.local/bin/celery -A tasks worker --loglevel=info

[Install]
WantedBy=multi-user.target

Ensure you input the correct user, file path and location for the bin directory of your uvicorn and celery binaries

you can then start and enable the two services in the background using

sudo systemctl start uvicorn.service
sudo systemctl enable uvicorn.service

sudo systemctl start celery.service
sudo systemctl enable celery.service

Ensure they are running properly by using

sudo systemctl status uvicorn.service
sudo systemctl status celery.service

Uvicorn service

Celery service

Step 4

Configure Nginx reverse proxy

In this step, we will use Nginx to route localhost to our application running on port 8000.

If you don't have nginx installed. You can install using

sudo apt update
sudo apt install nginx nginx-core

Open the


 and replace the default `http{}` block

**nginx.conf**



```javascript
http {
    # Basic HTTP settings
    server_tokens off;
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;


    server {
        listen 80;
        server_name localhost;

        location / {
            proxy_pass http://localhost:8000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

    }
}

Check if the configuration is correct and restart Nginx using

sudo nginx -t
sudo systemctl restart nginx.service

Running curl localhost?talktome should respond with a JSON message informing you that the action has been logged to /logs.

?talktome

Step 5

Use Ngrok to expose api
The final step requires us to use ngrok to expose our API securely to the public.
Set up an ngrok free account and obtain a auth token by following the official ngrok docs

Finally we can expose our API running on localhost:80 using

ngrok http 80

Ngrok

Test the email functionality by adding ?sendmail=your_email

Mail test

Mail Received

You can also test the ?talktome query parameter access the logs using /logs path.

Conclusion

We have successfully developed a Python application that utilizes RabbitMQ and Celery as a message broker and task queue executor. This application deploys a public API endpoint using ngrok, providing functionality for sending emails and printing logs.

By integrating these technologies, we have created a robust and scalable messaging system. This project not only demonstrates the power of asynchronous task processing with Celery and RabbitMQ but also highlights the simplicity and efficiency of using FastAPI for building web applications. Deploying the API with ngrok allows for quick public exposure and testing, making it an excellent tool for development and debugging.

In conclusion, this project serves as a solid foundation for building more complex systems that require reliable message handling and task execution. Whether you are looking to expand its capabilities or use it as a learning experience, this messaging system showcases the potential of modern Python web development.

The tasks are getting longer and harder as the stages go by. I can't imagine what is in store in stage 4, But one thing is certain, I'll give it the best I've got.
If you made it this far, Thank you, Thank you, Thank you. ♥
Until next time.
Happy Programming 🚀


This content originally appeared on DEV Community and was authored by Ravencodess


Print Share Comment Cite Upload Translate Updates
APA

Ravencodess | Sciencx (2024-07-15T00:34:33+00:00) My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling. Retrieved from https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/

MLA
" » My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling." Ravencodess | Sciencx - Monday July 15, 2024, https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/
HARVARD
Ravencodess | Sciencx Monday July 15, 2024 » My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling., viewed ,<https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/>
VANCOUVER
Ravencodess | Sciencx - » My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/
CHICAGO
" » My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling." Ravencodess | Sciencx - Accessed . https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/
IEEE
" » My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling." Ravencodess | Sciencx [Online]. Available: https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/. [Accessed: ]
rf:citation
» My HNG Journey. Stage Three: Building a Robust Messaging System with FastAPI and RabbitMQ/Celery for Asynchronous Email Handling | Ravencodess | Sciencx | https://www.scien.cx/2024/07/15/my-hng-journey-stage-three-building-a-robust-messaging-system-with-fastapi-and-rabbitmq-celery-for-asynchronous-email-handling/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.