Establishing a Connection to Apache Kafka Using Python Flask
Apache Kafka is a distributed streaming platform that enables communication between producers and consumers through publish-subscribe messaging, and it is designed to handle high volumes of real-time streaming data.
Additionally, I provide a docker-compose.yml file to build, run, and test Apache Kafka locally.
Prerequisites
Running the docker-compose.yml file requires the Docker Engine and Docker Compose to be installed.
Installation
- Create a docker-compose.yml file with the following content:
version: "3"
services:
zookeeper:
restart: always
image: docker.io/bitnami/zookeeper:3.8
ports:
- "2181:2181"
volumes:
- "zookeeper-volume:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
restart: always
image: docker.io/bitnami/kafka:3.3
ports:
- "9093:9093"
volumes:
- "kafka-volume:/bitnami"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
volumes:
kafka-volume:
zookeeper-volume:
- Open a terminal, cd to the directory where docker-compose.yml is saved, and run:
docker-compose up -d
Interacting with Kafka containers from your computer
- To check which Docker containers are currently running, execute the following command:
docker ps | grep -e kafka -e zookeeper
- To create a Kafka topic, run the following command:
docker-compose exec kafka kafka-topics.sh --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic hello-kafka
- To list Kafka topics, run the following command:
docker-compose exec kafka kafka-topics.sh --list --bootstrap-server \
localhost:9092
- To send messages to a Kafka topic, use the following command:
docker-compose exec kafka kafka-console-producer.sh --broker-list \
localhost:9092 --topic hello-kafka
- To consume messages from a Kafka topic, run the following command:
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server \
localhost:9092 --topic hello-kafka --from-beginning
- To stop the Kafka containers, run the following command:
docker-compose down
Building a Python Flask Application to Pull Live Stream Data from Apache Kafka
The flask-kafka project was developed to publish to and subscribe from Kafka topics by exposing its services as a REST API. There are several Python libraries for interacting with Kafka, e.g., confluent-kafka, kafka-python, and aiokafka. In this tutorial, I use confluent-kafka, a high-performance Python client for Apache Kafka that provides producer, consumer, and admin operations.
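As a quick sanity check before wiring Kafka into Flask, a minimal confluent-kafka round trip against the broker started above might look like the snippet below. This snippet is mine, not part of the flask-kafka project; it assumes the localhost:9093 listener from the docker-compose file and the hello-kafka topic created earlier.
from confluent_kafka import Producer, Consumer

# Produce one message to the hello-kafka topic.
producer = Producer({'bootstrap.servers': 'localhost:9093'})
producer.produce('hello-kafka', b'hello from confluent-kafka')
producer.flush()

# Consume it back, waiting up to 10 seconds.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9093',
    'group.id': 'sanity-check',  # hypothetical group id for this test
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['hello-kafka'])
msg = consumer.poll(10.0)
if msg is not None and not msg.error():
    print(msg.value().decode('utf-8'))
consumer.close()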
Create a Flask application
- If you don’t have a Flask application, create one by installing Flask using the following command:
pip install flask
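The code in this tutorial also imports confluent-kafka and marshmallow, and loading the YAML configuration presumably relies on PyYAML (an assumption, since file_util.py is not listed in the article); install them as well:
pip install confluent-kafka marshmallow pyyaml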
- Next, create a Python file named “app.py” that builds the Flask application instance and exposes the RESTful API endpoints. The content of my app.py is as follows:
import sys
from flask import Flask
from pathlib import Path
from api.bootstrap import Bootstrap
from api.controller.consumer_controller import consumer
from api.controller.producer_controller import producer

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : app.py
"""

def create_app():
    path = Path().absolute()
    Bootstrap(path)  # load parameters from the YAML file
    app = Flask(__name__)
    app.register_blueprint(consumer)
    app.register_blueprint(producer)
    return app

if __name__ == '__main__':
    try:
        port = int(sys.argv[1])
    except Exception:
        port = 8081
    create_app().run(host='0.0.0.0', port=port, debug=True, use_reloader=True)
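Note that app.py accepts an optional port as its first command-line argument and falls back to 8081, so the server can also be started on another port, for example:
python app.py 8082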
- To initialize the necessary parameters from the YAML configuration file, I created the Bootstrap class and instantiated it within the create_app() function in app.py. The code for the Bootstrap class is provided below.
from api.util.file_util import read_yaml_file
from api.configuration.app_config import AppConfig

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : bootstrap.py
"""

class Bootstrap:
    def __init__(self, path) -> None:
        self.path = path
        self.load_app_config()

    def load_app_config(self):
        d = read_yaml_file(f"{self.path}/resources/config.yaml")
        cfg = AppConfig(d)
        print(f"O:--Load App Config--:bootstrap_servers/{cfg.params['kafka']['bootstrap_servers']}")
- The following shows the YAML configuration file (resources/config.yaml, the path read by Bootstrap) used in this Flask application. Note that bootstrap_servers points to localhost:9093, the EXTERNAL listener advertised in docker-compose.yml, because the Flask app runs on the host rather than inside the Docker network.
kafka:
  bootstrap_servers: localhost:9093
  auto_offset_reset: earliest
- The code snippet below shows how to create the consumer_controller.py file. This Python file is responsible for handling HTTP requests from clients: it validates each request and transforms it into a consumer instance before passing it to the consumer service.
from flask import request, Blueprint
from marshmallow import ValidationError
from api.domain.consumer import Consumer
from api.util import exception_util, service_util
from api.constant.message_code import MessageCode
from api.schema.consumer_schema import ConsumerSchema
from api.exception.service_exception import ServiceException
from api.business_logic.consumer_service import ConsumerService

consumer = Blueprint('consumer', __name__)

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : consumer_controller.py
"""

@consumer.route("/consume", methods=['POST'])
def consume():
    payload = request.get_json()
    try:
        validated_data = ConsumerSchema().load(payload)
        consumer = Consumer(**validated_data)
        msg = ConsumerService().consume(consumer)
        return service_util.build_server_response(MessageCode.SUCCESS, msg)
    except (ValidationError, ServiceException) as err:
        return exception_util.handler(err)
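The Consumer domain object and ConsumerSchema are not listed in the article. A minimal sketch consistent with their usage above (group_id and topic are required, config is an optional dict passed through to the connector) might look like this; the exact field definitions are assumptions.
from marshmallow import Schema, fields

# Hypothetical sketch of consumer.py and consumer_schema.py; field names
# follow their usage in consumer_controller.py and consumer_service.py.
class Consumer:
    def __init__(self, group_id, topic, config=None):
        self.group_id = group_id
        self.topic = topic
        self.config = config

class ConsumerSchema(Schema):
    group_id = fields.Str(required=True)
    topic = fields.Str(required=True)
    config = fields.Dict(required=False, load_default=None, allow_none=True)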
- The following code demonstrates the creation of the ConsumerService class, which encapsulates the business logic needed to fetch messages from a specified Kafka topic through the KafkaConnector class.
from api.domain.consumer import Consumer
from api.constant.message_code import MessageCode
from api.configuration.app_config import AppConfig
from api.configuration.kafka_config import KafkaConfig
from api.connector.kafka_connector import KafkaConnector
from api.exception.service_exception import ServiceException

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : consumer_service.py
"""

class ConsumerService:
    def __init__(self):
        self.__load_params()
        self.connector = KafkaConnector(self.cfg)

    def __load_params(self):
        app_config = AppConfig()
        self.cfg = KafkaConfig(
            app_config.params['kafka']['bootstrap_servers'],
            app_config.params['kafka']['auto_offset_reset']
        )

    def consume(self, consumer: Consumer):
        action = 'consume'
        print(f'I:--START--:--{action}--')
        try:
            msg = self.connector.consume(consumer.group_id, consumer.topic, conf=consumer.config)
            print(f'O:--SUCCESS--:--{action}--:msg/{msg}')
            return msg
        except Exception as err:
            self.__handle_error(action, err)

    def __handle_error(self, action, err):
        desc = MessageCode.UNKNOWN_ERROR.value if len(err.args) == 0 else err.args[0]
        print(f'O:--FAIL--:--{action}--:errorDesc/{desc}')
        raise ServiceException(desc)
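The supporting classes KafkaConfig, ServiceException, and MessageCode are not listed in the article. A minimal sketch covering only the attributes actually used elsewhere might look like this; the enum values are assumptions.
from enum import Enum
from dataclasses import dataclass

# Hypothetical sketches of kafka_config.py, service_exception.py, and
# message_code.py; only what the other files reference is included.
@dataclass
class KafkaConfig:
    bootstrap_servers: str
    auto_offset_reset: str

class ServiceException(Exception):
    pass

class MessageCode(Enum):
    SUCCESS = 'success'
    UNKNOWN_ERROR = 'unknown error'  # assumed value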
- The following code demonstrates how to create the KafkaConnector class, which manages Kafka connections and offers the essential operations produce and consume.
from typing import Dict, Optional
from api.configuration.kafka_config import KafkaConfig
from api.exception.service_exception import ServiceException
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : kafka_connector.py
"""

class KafkaConnector:
    def __init__(self, config: Optional[KafkaConfig]):
        self.config = config

    def error_callback(self, err: KafkaError):
        raise ServiceException(err.str())

    def __get_producer(self, conf: Optional[Dict] = None) -> Producer:
        default_conf = {
            'bootstrap.servers': self.config.bootstrap_servers,
            'retries': 0,
            'error_cb': self.error_callback
        }
        if conf:
            default_conf.update(conf)
        return Producer(default_conf)

    def produce(self, topic, message, conf: Optional[Dict] = None):
        producer = self.__get_producer(conf)
        producer.produce(topic, message)
        producer.flush()

    def __get_consumer(self, group_id, topics, conf: Optional[Dict] = None) -> Consumer:
        default_conf = {
            'bootstrap.servers': self.config.bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': self.config.auto_offset_reset,
            'enable.auto.commit': True
        }
        if conf:
            default_conf.update(conf)
        try:
            consumer = Consumer(default_conf)
            consumer.subscribe(topics)
            return consumer
        except KafkaException as err:
            raise ServiceException(f'Error while consuming message: "{err.args[0].str()}."')

    def consume(self, group_id: str, topic: str, timeout=10.0, conf: Optional[Dict] = None):
        consumer = self.__get_consumer(group_id, [topic], conf)
        msg = consumer.poll(timeout)  # wait up to `timeout` seconds for a message
        consumer.close()  # close the consumer
        if msg is None:
            raise ServiceException('No message was received before the timeout expired.')
        elif not msg.error():
            return msg.value().decode('utf-8')
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            raise ServiceException(f'End of partition reached {msg.topic()}/{msg.partition()}.')
        else:
            raise ServiceException(f'Error while consuming message: {msg.error().str()}.')
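The producer_controller.py file is imported in app.py but not listed in the article. A sketch that mirrors the consumer controller might look like the following; the /produce route, the payload fields, and the inline use of KafkaConnector (rather than a separate service class) are all assumptions.
from flask import request, Blueprint
from api.util import exception_util, service_util
from api.constant.message_code import MessageCode
from api.configuration.app_config import AppConfig
from api.configuration.kafka_config import KafkaConfig
from api.connector.kafka_connector import KafkaConnector
from api.exception.service_exception import ServiceException

producer = Blueprint('producer', __name__)

# Hypothetical /produce endpoint: reads a topic and a message from the
# JSON payload and hands them to KafkaConnector.produce().
@producer.route("/produce", methods=['POST'])
def produce():
    payload = request.get_json()
    try:
        app_config = AppConfig()
        cfg = KafkaConfig(
            app_config.params['kafka']['bootstrap_servers'],
            app_config.params['kafka']['auto_offset_reset']
        )
        KafkaConnector(cfg).produce(payload['topic'], payload['message'])
        return service_util.build_server_response(MessageCode.SUCCESS, 'message sent')
    except (ServiceException, KeyError) as err:
        return exception_util.handler(err)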
Start the flask-kafka project using the following command:
python app.py
To test the consumer and producer services, I use REST Client, an extension for Visual Studio Code (VS Code).
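For example, a request file for REST Client might look like the following. The /consume route and its fields come from the consumer controller above; the /produce route and its payload come from the hypothetical producer sketch, and the default port 8081 from app.py.
### Produce a message to the hello-kafka topic
POST http://localhost:8081/produce
Content-Type: application/json

{
    "topic": "hello-kafka",
    "message": "Hello, Kafka!"
}

### Consume a message from the hello-kafka topic
POST http://localhost:8081/consume
Content-Type: application/json

{
    "group_id": "demo-group",
    "topic": "hello-kafka"
}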
Have fun and happy coding :)