How to Send Logs to a Telegram Bot Using RabbitMQ and FastAPI

Photo by sydney Rae on Unsplash

How to Send Logs to a Telegram Bot Using RabbitMQ and FastAPI

Logs are a valuable source of information for monitoring website traffic and troubleshooting problems. However, managing logs can be a daunting task, especially if you have a large website with a lot of traffic.

One way to simplify log management is to use a message broker like RabbitMQ. RabbitMQ is a lightweight, reliable messaging broker that can be used to decouple applications and distribute messages between different systems.

In this article, we will build a visitor tracker that sends logs to a Telegram bot using RabbitMQ and FastAPI. Also, the visitor tracker uses a Postgres database to store the logs, but you can skip that part if you want and build just API with its middleware. There will be two separate projects, the visitor tracker and the telegram bot. And we will connect them using RabbitMQ.

By the end of this article, you will have a working system that can send logs to a Telegram bot in real-time. This will allow you to monitor your website traffic and troubleshoot problems more easily.

Event-Driven Architecture

As this article in Amazon explains that event-driven architecture uses events to trigger and communicate between decoupled services and is common in modern applications built with microservices. An event is a state change, or an update, like an item being placed in a shopping cart on an e-commerce website. Events can either carry the state (the item purchased, its price, and a delivery address) or events can be identifiers (a notification that an order was shipped).

Event-driven architectures have three key components: event producers, event routers, and event consumers. A producer publishes an event to the router, which filters and pushes the events to consumers. Producer services and consumer services are decoupled, which allows them to be scaled, updated, and deployed independently.

Also, this article in RedHat says that event-driven architecture may be based on either a pub/sub model or an event stream model.

Pub/sub model

This is a messaging infrastructure based on subscriptions to an event stream. With this model, after an event occurs, or is published, it is sent to subscribers that need to be informed.

Event streaming model

With an event streaming model, events are written to a log. Event consumers don’t subscribe to an event stream. Instead, they can read from any part of the stream and can join the stream at any time.

In the case of this example, we are going to build a microservice with FastAPI and Postgres, that tracks the requests of the API, then, every log will be sent to a RabbitMQ queue and finally, to a Telegram bot. The FastaAPI server will be the producer and the Telegram bot the consumer.

Requirements

  • Python installed

  • Basic Python knowledge

  • Pip installed

  • Postgres installed

  • Ngrok installed or any tunneling software.

  • RabbitMQ installed (Download from here)

Building the Visitor Tracker

First, we create a directory for this application.

mkdir visitor_tracker
cd
#Windows users
py -m venv venv
cd venv/Scripts
./activate

#Linux
python3 -m venv venv
source venv/bin/activate

We install all the dependencies we need.

pip install fastapi psycopg2-binary python-dotenv

Inside the visitor_tracker directory, we create a new file, main.py.

main.py

from fastapi import FastAPI, Request
from datetime import datetime

app = FastAPI()

@app.get("/")
def index(request: Request):
    """Track website visitor."""
    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]

    return {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }



if __name__ == "__main__":
    app.run(debug=True)

In this file, we define a FastAPI application that tracks website visitors. The index() route is the only route defined in the application. This route takes a Request object as an input and returns a dictionary of information about the website visitor. The information that is tracked includes the visitor's IP address, request URL, request port, request path, request method, request time, browser type, and operating system.

We navigate to "localhost:8000", and we should see this response in our window.

But we got that data from the endpoint, we don't want to write this code for every endpoint. So, we are going to build a middleware that captures that data from every request and for every endpoint.

We create a new file, tracker.py.

from fastapi import Request
from datetime import datetime


def visitor_tracker(request: Request):
    ip_address = request.client.host
    request_url = request.url._url
    request_port = request.url.port
    request_path = request.url.path
    request_method = request.method
    request_time = datetime.now()
    browser_type = request.headers["User-Agent"]
    operating_system = request.headers["Accept"]

    return {
        "ip_address": ip_address,
        "request_url": request_url,
        "request_port": request_port,
        "request_path": request_path,
        "request_method": request_method,
        "request_time": request_time,
        "browser_type": browser_type,
        "operating_system": operating_system,
    }

The visitor_tracker() function is used to track a single website visitor. The function takes a Request object as an input and returns a dictionary of information about the website visitor. The information that is tracked includes the visitor's IP address, request URL, request port, request path, request method, request time, browser type, and operating system.

Then, we go to main.py file and create a middleware.

from fastapi import FastAPI, Request
from datetime import datetime
from tracker import visitor_tracker

app = FastAPI()


@app.middleware("tracker")
async def tracker(request: Request, call_next):
    tracker = visitor_tracker(request)
    response = await call_next(request)

    return response


@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }



if __name__ == "__main__":
    app.run(debug=True)

The middleware function is called for every request that is made to the application. The middleware function first calls the visitor_tracker() function to track the website visitor. Then, the middleware function calls the call_next() function to process the request.

This is the data we want the API stores in the database and sends to the broker(RabbitMQ).

Now, we have to create a database to store it.

We have to create a database.

On our command line, we run the following command:

CREATE DATABASE logs_db;

init_db.py

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

def get_db_connection():
    conn = psycopg2.connect(
        dbname = "logs_db",
        user = "postgres",
        password = PASSWORD
    )
    return conn

conn = get_db_connection()
cur = conn.cursor()

cur.execute('DROP TABLE IF EXISTS logs;')
cur.execute('CREATE TABLE logs (id serial PRIMARY KEY,'
                                 'ip_address varchar (150) NOT NULL,'
                                 'request_url varchar (50) NOT NULL,'
                                 'request_port integer NOT NULL,'
                                 'request_path varchar (50) NOT NULL,'
                                 'request_method varchar (50) NOT NULL,'
                                 'browser_type varchar (150) NOT NULL,'
                                 'operating_system varchar (150) NOT NULL,'
                                 'request_time timestamp (50) NOT NULL,'

                                 'date_added date DEFAULT CURRENT_TIMESTAMP);'
                                 )

cur.execute('INSERT INTO logs (ip_address,'
                                 'request_url,'
                                 'request_port,'
                                 'request_path,'
                                 'request_method,'
                                 'browser_type,'
                                 'operating_system,'
                                 'request_time)'
                                 'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)',
            ('127.0.0.1',
             'http://localhost:8000',
             8000,
             "/",
             "GET",
             "Chrome",
             "Windows 11",
             "2023-06-25T16:03:24.722256"
             )
            )




conn.commit()

cur.close()
conn.close()

Here we setting up a database connection to store log data. First, we load the .env file using dotenv to get the database username and password variables. Then, we define a get_db_connection() function that establishes a connection to a PostgreSQL database named logs_db. Then, the code calls that function to get a database connection and cursor.

In this file, also the code drops the logs table if it exists and recreates it with the given schema - with columns to store IP address, request URL, port, path, method, browser, OS, time etc. It inserts sample log data into the table with its values. It commits the changes to the database and closes the cursor and connection.

helpers.py

import collections


def to_dict(psycopg_tuple:tuple):
    tracker = collections.OrderedDict()
    tracker['id'] = psycopg_tuple[0]

    tracker["ip_address"] = psycopg_tuple[1]
    tracker["request_url"] = psycopg_tuple[2]
    tracker["request_port"] = psycopg_tuple[3]

    tracker["request_path"] = psycopg_tuple[4]
    tracker["request_method"] = psycopg_tuple[5]
    tracker["browser_type"] = psycopg_tuple[6]
    tracker["operating_system"] = psycopg_tuple[7]
    tracker["request_time"] = psycopg_tuple[8].strftime("%d-%m-%Y, %H:%M:%S")
    return tracker


def list_dict(rows:list):

    row_list = []
    for row in rows:
        book_dict = to_dict(row)
        row_list.append(book_dict)

    return row_list

This file has two functions: to_dict() and list_dict(). The to_dict() function converts a PostgreSQL tuple to a dictionary. The list_dict() function converts a list of PostgreSQL tuples to a list of dictionaries.

The to_dict() function takes a PostgreSQL tuple as input and returns a dictionary. The dictionary contains the values of the tuple in the same order as the tuple. The list_dict() function takes a list of PostgreSQL tuples as input and returns a list of dictionaries. The dictionaries are created using the to_dict() function.

controllers.py

from init_db import get_db_connection
from helpers import to_dict,list_dict
import json

def all_logs():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM logs;')
    logs = list_dict(cur.fetchall())
    cur.close()
    conn.close()


    return logs

def new_log(ip_address: str,
         request_url: str,
         request_port: int,
         request_path: str,
         request_method: str,
         browser_type: str,
         operating_system: str,
         request_time: str,):

    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('INSERT INTO logs (ip_address, request_url, request_port, request_path, request_method, browser_type, operating_system, request_time)'
                    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING *;',(ip_address,
                                                    request_url,
                                                    request_port,
                                                    request_path,
                                                    request_method,
                                                    browser_type,
                                                    operating_system,
                                                    request_time,))

    log = cur.fetchone()[:]
    log_dict = to_dict(log)
    conn.commit()
    cur.close()
    conn.close()

    return json.dumps(log_dict)

The all_logs() function gets all logs from the database and returns a list of dictionaries. Each dictionary contains information about a single log.

The new_log() function inserts a new log into the database.

Building the telegram bot

BotFather

To create a Telegram bot, we need a token.

Go to the Telegram App and enter @BotFather in the search bar.

Screenshot-2022-12-16-092357

Select /start to activate the bot. Then we select /newbot and follow the instructions to create a bot.

Screenshot-2022-12-16-093337

We have to choose a name that the users will see and a username. Then the bot will send us a message with our token in it.

Now, we create a new project, for the telegram bot.

mkdir telegram_bot
cd telegram_bot

Create a virtual environment for the telegram bot project.

Install the pyTelegramBotAPI library using pip:

pip install pyTelegramBotAPI python-dotenv

We create a .env file, and paste the token that BotFather gave us in it.

.env

BOT_TOKEN=<Telegram Token>

We create a new file, bot.py, where we will write the handlers of the Telegram bot.

bot.py

import os
import logging
import telebot
from dotenv import load_dotenv

logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')



bot = telebot.TeleBot(BOT_TOKEN)

@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")



bot.infinity_polling()

We initialize the Telebot instance using the BOT_TOKEN.

The above sends the message "Howdy, how are you doing? This is your chat ID: {message.chat.id}" to the user when the user sends the /start or the /hello command. We will need the Chat ID to send the logs to a specific chat or user.

We start the bot's polling using infinity_polling(). This will poll for new updates indefinitely.

We run python3 bot.py in our command line and go to Telegram to send the command /start or /hello.

But, this is just script. We want a program that receives the logs from the FastAPI service we built previously.

We have to run this script on a server. We are going to use FastAPI to run the bot.

pip install fastapi uvicorn

We create a new file, main.py.

main.py

from fastapi import FastAPI, Request
from datetime import datetime
import os
import logging
import telebot
from dotenv import load_dotenv



logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')
WEBHOOK_HOST = '<ip/domain>'
WEBHOOK_PORT = 8443  # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = '0.0.0.0'  # In some VPS you may need to put here the IP addr
WEBHOOK_URL_BASE = "<Ngrok URL>"
WEBHOOK_URL_PATH = "/{}/".format(BOT_TOKEN)

app = FastAPI()

bot = telebot.TeleBot(BOT_TOKEN)


@app.post(f'/{BOT_TOKEN}/')
def process_webhook(update: dict):
    """
    Process webhook calls
    """
    if update:
        update = telebot.types.Update.de_json(update)
        bot.process_new_updates([update])
    else:
        return



@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")




bot.remove_webhook()

# Set webhook
bot.set_webhook(
    url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,

)  


if __name__ == "__main__":

    app.run(app,
    host="127.0.0.1",
    port=5000)

We initialize the Telebot instance using the BOT_TOKEN. We define the webhook-related variables:

  • WEBHOOK_HOST: The IP address or domain to use for the webhook

  • WEBHOOK_PORT: The port to listen on (443, 80, 88 or 8443)

  • WEBHOOK_LISTEN: The IP address to listen on (0.0.0.0 to listen on all addresses)

  • WEBHOOK_URL_BASE: The base URL for the webhook (from Ngrok URL)

  • WEBHOOK_URL_PATH: The URL path for the webhook.

We initialize the FastAPI app. We define a POST endpoint to process webhook calls from Telegram. It processes the update JSON and passes it to the Telebot process_new_updates() method. We remove any existing webhook for the bot. We set the webhook URL using the Ngrok URL and webhook path.

The constant WEBHOOK_URL_BASE should be a URL generated by Ngrok to receive messages from the chat.

We are going to leave the telegram bot unfinished for now and come back later.

Now, we are going to set up the RabbitMQ server, to send and receive messages.

Setting up RabbitMQ

We need to install RabbitMQ, you can download it from here. And follow the instructions from the webpage to install it.

Now, we are going to build the sender.

The sender is the program that will send the logs to the RabbitMQ queue.

In the visitor_tracker folder, we create a new file, send.py and install the RabbitMQ client.

pip install pika

send.py

import pika


def sender(body: dict):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='logs')

    channel.basic_publish(exchange='', routing_key='logs', body=body)
    print(" [x] Sent 'Logs'")
    connection.close()

The sender() function takes a dictionary as input and sends the dictionary to the RabbitMQ broker. The dictionary contains information about a single log.

The sender() function first creates a connection to the RabbitMQ broker. Then, the function creates a channel on the connection. The function then declares a queue called logs on the channel. Finally, the function publishes the dictionary to the queue.

The sender() function prints a message to the console to indicate that the log has been sent. The function then closes the connection to the RabbitMQ broker.

The logs that the sender() sends to the queue, will be consumed by the telegram bot and the bot sends them to a chat.

Now, we go to the main.py file to add the sender to the middleware.

main.py

from send import sender

@app.middleware("tracker")
async def tracker(request: Request, call_next):


    tracker = visitor_tracker(request)


    log = new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                      tracker["request_path"], tracker["request_method"],
                      tracker["browser_type"], tracker["operating_system"],tracker["request_time"])

    sender(log)



    response = await call_next(request)
    return response

Now, we start this server. And we make a request to http://localhost:8000 through a web client or web browser.

When we make a request, the server sends the value of the log variable to a queue.

We can verify if the message is in the queue by running the rabbitmqctl.bat list_queues command in our command line.

There is a message in the logs queue waiting to be consumed.

Now, we go to the telegram bot and create a receiver, a handler to receive the messages from the queue.

We need to install the RabbitMQ client too.

pip install pika

In the main.py file of the telegram bot, we create a function to receive the messages from the queue and send them to a chat.

import pika, sys, os

...
def receiver():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='logs')

    def callback(ch, method, properties, body:dict):
        print(" [x] Received %r" % body)
        log = f"This is the body : {body}"
        bot.send_message(<CHAT ID>, log)

    channel.basic_consume(queue='logs', on_message_callback=callback, auto_ack=True)


    print(' [*] Waiting for messages.')
    channel.start_consuming()
....

@bot.message_handler(func=receiver())
def echo_all(message):
    pass #This is intentional, you may leave this function like this

The receiver() function receives logs from the RabbitMQ broker and sends them to Telegram. The logs are sent to a Telegram bot.

The receiver() function first creates a connection to the RabbitMQ broker. Then, the function creates a channel on the connection. The function then declares a queue called logs on the channel. Finally, the function starts consuming messages from the queue.

The receiver() function calls the callback() function when a message is received. The callback() function prints the message to the console and then sends the message to the Telegram bot.

The user will be receiving a message every time the API is requested.

Complete main.py file of the server.

from fastapi import FastAPI, Request
from datetime import datetime
from tracker import visitor_tracker
from controllers import new_log, all_logs

from send import sender

app = FastAPI()

@app.middleware("tracker")
async def tracker(request: Request, call_next):


    tracker = visitor_tracker(request)


    log = new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                      tracker["request_path"], tracker["request_method"],
                      tracker["browser_type"], tracker["operating_system"],tracker["request_time"])

    sender(log)



    response = await call_next(request)
    return response


@app.get("/logs")
def logs():
    logs = all_logs()
    return logs

@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }


if __name__ == "__main__":
    app.run(port=5000)

Complete main.py file of the telegram bot.

from fastapi import FastAPI, Request
from datetime import datetime
import os
import logging
import telebot
from dotenv import load_dotenv
import pika, sys, os


logger = telebot.logger
telebot.logger.setLevel(logging.INFO)


load_dotenv()

BOT_TOKEN = os.getenv('BOT_TOKEN')
WEBHOOK_HOST = '<ip/domain>'
WEBHOOK_PORT = 8443  # 443, 80, 88 or 8443 (port need to be 'open')
WEBHOOK_LISTEN = '0.0.0.0'  # In some VPS you may need to put here the IP addr
WEBHOOK_URL_BASE = "<Ngrok URL>"
WEBHOOK_URL_PATH = "/{}/".format(BOT_TOKEN)

app = FastAPI()

bot = telebot.TeleBot(BOT_TOKEN)


def receiver():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='logs')

    def callback(ch, method, properties, body:dict):
        print(" [x] Received %r" % body)
        log = f"This is the body : {body}"
        bot.send_message(1047727961, log)

    channel.basic_consume(queue='logs', on_message_callback=callback, auto_ack=True)


    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()



@app.post(f'/{BOT_TOKEN}/')
def process_webhook(update: dict):
    """
    Process webhook calls
    """
    if update:
        update = telebot.types.Update.de_json(update)
        bot.process_new_updates([update])
    else:
        return



@bot.message_handler(commands=['start', 'hello'])
def send_welcome(message):
    bot.reply_to(message, f"Howdy, how are you doing? This is your chat ID: {message.chat.id}")


@bot.message_handler(func=receiver())
def echo_all(message):
    pass #This is intentional, you may leave this function like this



bot.remove_webhook()

# Set webhook
bot.set_webhook(
    url=WEBHOOK_URL_BASE + WEBHOOK_URL_PATH,

)


if __name__ == "__main__":

    app.run(app,
    host="127.0.0.1",
    port=8000)

We start both servers, running uvicorn main:app command in different terminals. And we navigate to locahost:5000/. We should receive in the logs of the telegram bot a message like this:

And if go to our Telegram chat, we should receive a message like this:

Conclusion

In this article, we have shown you how to send logs to a Telegram bot using RabbitMQ and FastAPI. We hope that this article has been helpful, I started writing this article to learn how to use RabbitMQ and how can I use it with FastAPI, and also to practice and know a little bit about Event-Driven applications, which is a concept that I'm trying to understand.

Even when these programs accomplished what I meant, sending logs to a Telegram bot. I'm not satisfied with the telegram bot, especially the handler that has the receiver() as a parameter and the callback() sending the message to the chat.

Also, it has an issue when we stop the telegram bot or reload, the server doesn't start completely.

Thank you for taking the time to read this article.

If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything; please leave a comment or contact me through Twitter, or LinkedIn.

The visitor tracker's code is here.

The source code of the Telegram Bot is here.

Resources