How to Integrate Apache Pulsar with FastAPI

Photo by Varun Yadav on Unsplash

How to Integrate Apache Pulsar with FastAPI

This is an article about how to set up Apache Pulsar, produce messages and consume them using FastAPI.

This article will not show all the features of Apache Pulsar, or FastAPI.

Apache Pulsar and FastAPI are two powerful technologies that when combined can help you build extremely fast and scalable real-time APIs. Apache Pulsar is an open-source distributed streaming platform that acts as a broker between producers and consumers of data streams. FastAPI is a modern, fast (high-performance), web framework for building APIs with Python. In this article, we will see how to integrate Apache Pulsar with FastAPI to leverage the event streaming capability of Pulsar to power your FastAPI applications.

Requirements

  • Java Runtime Environment (64-bit)

  • Python installed

  • Pip installed

  • WSL installed for Windows users

Apache Pulsar

Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. Originally developed by Yahoo, Pulsar is under the stewardship of the Apache Software Foundation.

Key features of Pulsar are listed below:

  • Native support for multiple clusters in a Pulsar instance, with seamless geo-replication of messages across clusters.

  • Very low publish and end-to-end latency.

  • Seamless scalability to over a million topics.

  • A simple client API with bindings for Java, Go, Python and C++.

  • Multiple subscription types (exclusive, shared, and failover) for topics.

  • Guaranteed message delivery with persistent message storage provided by Apache BookKeeper. A serverless lightweight computing framework Pulsar Functions offers the capability for stream-native data processing.

  • A serverless connector framework Pulsar IO, which is built on Pulsar Functions, makes it easier to move data in and out of Apache Pulsar.

  • Tiered Storage offloads data from hot/warm storage to cold/long-term storage (such as S3 and GCS) when the data is aging out.

Installation

To install Apache Pulsar, we go to this page, download the binary file, and unzip it.

Or execute the following commands:

wget https://archive.apache.org/dist/pulsar/pulsar-3.0.0/apache-pulsar-3.0.0-bin.tar.gz
tar xvfz apache-pulsar-3.0.0-bin.tar.gz
cd apache-pulsar-3.0.0
ls -1F

Windows users should execute the commands above using WSL.

To start a Pulsar cluster, we run this command to start a standalone Pulsar cluster:

bin/pulsar standalone

After we started our Pulsar cluster, we create two files, a producer and a consumer.

As the Pulsar terminology in the documentation says, a producer is a process that publishes messages to a Pulsar topic. A process that establishes a subscription to a Pulsar topic and processes messages published to that topic by producers.

A topic is a named channel used to pass messages published by producers to consumers who process those messages.

producer.py

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

In this code, we import the Apache Pulsar client library. Then, we connect to a Pulsar broker running in the localhost:6650. We create a producer, with 'my-topic" as the topic. Then, we create a For loop to send 10 messages to the topic we define. Finally, we close the connection.

consumer.py

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)

client.close()

In the consumer, we import the Apache Pulsar client library and create a connection to the Pulsar broker running locally on port 6650. We subscribe to the 'my-topic' topic with the subscription name 'my-subscription'.

We create an infinite loop that will continuously receive and process messages. If successful, it acknowledges the message. If an exception occurs, it negatively acknowledges the message.

Then, we close the Pulsar client and connection.

We run in different terminals the producer.py file and the consumer.py file.

This is the producer.py file terminal output:

And this is the consumer.py file terminal output:

Creating a FastAPI server

Now, we are going to create a server that sends log information to the consumer.

Every time a request is made to the server, the server will send the logs to the "logs" topic.

main.py

from fastapi import FastAPI, Request
from datetime import datetime
import pulsar 

app = FastAPI()

@app.get("/")
async def root(request: Request):

    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]
    message = {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }

    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer('my-topic')

    producer.send(str(message).encode("utf-8"))
    client.close()
    return {"message": message}

if __name__ == "__main__":
    app.run(debug=True)

In the main.py file, we import the FastAPI, Request, datetime and pulsar libraries.

We create a FastAPI app and define a root endpoint / that receives a Request object. And extract various information from the request like IP address, URL, method, headers etc. Then, we create a message dictionary with the extracted request information.

We create a connection to a Pulsar broker, create a producer and send the message dictionary to the 'my-topic' topic.

And finally, this code returns the message dictionary in the response.

We convert the Python dictionary to a string to avoid this message error:

This happens because the content must be a bytes object. We convert the dictionary to a string to be able to encode it.

Here is the documentation of the producer in the Python client.

If we navigate to locahost:8000 with a web browser, we should see this response.

We could use an HTTP client, and we should receive the same response.

We run the consumer.py file in another terminal. And should receive the following response:

Conclusion

Integrating Apache Pulsar with FastAPI enables you to build high-performance real-time APIs. Pulsar handles the complexities of event streaming and event storage while FastAPI provides a simple yet powerful framework to build the APIs. This combination allows you to decouple the event producer from the API consumer.

Thank you for taking the time to read this article.

If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything; please leave a comment or contact me through Twitter, or LinkedIn.

The source code is here.

Resources

Run a standalone Pulsar cluster locally

Set up Python client

Pulsar Terminology

Use Python client

Python client documentation

Produce and consume messages