Establishing a Connection to Apache Kafka Using Python Flask

Neda Peyrone, PhD
5 min read · Sep 19, 2023


Apache Kafka is a distributed streaming platform that enables communication between producers and consumers through publish-subscribe messaging, designed to handle huge volumes of real-time streaming data.

Additionally, I provide a docker-compose.yml file to build, run, and test Apache Kafka locally.

Pre-requisites
Running the docker-compose.yml file requires the Docker Engine and Docker Compose to be installed.

Installation

  • Create the docker-compose.yml file and add the following content:
version: "3"
services:
  zookeeper:
    restart: always
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper-volume:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    restart: always
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9093:9093"
    volumes:
      - "kafka-volume:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  kafka-volume:
  zookeeper-volume:
  • Open a terminal, cd to the directory where docker-compose.yml is saved, and run:
docker-compose up -d

Interacting with Kafka containers from your computer

  • To check which containers are currently running, execute:
docker ps | grep -e kafka -e zookeeper
  • To create a Kafka topic, run:
docker-compose exec kafka kafka-topics.sh --create --bootstrap-server \
localhost:9092 --replication-factor 1 --partitions 1 --topic hello-kafka
  • To list Kafka topics, run:
docker-compose exec kafka kafka-topics.sh --list --bootstrap-server \
localhost:9092
  • To send messages to a Kafka topic, use:
docker-compose exec kafka kafka-console-producer.sh --bootstrap-server \
localhost:9092 --topic hello-kafka
  • To consume messages from a Kafka topic, run:
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server \
localhost:9092 --topic hello-kafka --from-beginning
  • To stop the Kafka containers, run:
docker-compose down

Building a Python Flask app to pull live stream data from Apache Kafka

The flask-kafka project was developed to publish to and consume from Kafka topics by exposing these operations as a REST API. There are several Python libraries for interacting with Kafka, e.g., confluent-kafka, kafka-python, and aiokafka. In this tutorial, I use confluent-kafka, a high-performance Python client for Apache Kafka that provides producer, consumer, and admin operations.
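Before wiring confluent-kafka into Flask, it helps to see the client on its own. The sketch below is a minimal round trip against the docker-compose setup above; the broker address (localhost:9093) and the hello-kafka topic come from earlier in this post, and `demo()` assumes a running broker.

```python
# Minimal confluent-kafka sketch, assuming the broker from the
# docker-compose setup above is reachable on localhost:9093 and the
# hello-kafka topic already exists.

def producer_conf(bootstrap_servers: str) -> dict:
    # Base producer configuration; 'retries': 0 fails fast during local testing.
    return {'bootstrap.servers': bootstrap_servers, 'retries': 0}

def consumer_conf(bootstrap_servers: str, group_id: str) -> dict:
    # 'earliest' replays the topic from the beginning for a new consumer group.
    return {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
    }

def demo():
    # Requires a running broker; call demo() once docker-compose is up.
    from confluent_kafka import Producer, Consumer

    producer = Producer(producer_conf('localhost:9093'))
    producer.produce('hello-kafka', b'hello from confluent-kafka')
    producer.flush()  # block until the message is delivered

    consumer = Consumer(consumer_conf('localhost:9093', 'demo-group'))
    consumer.subscribe(['hello-kafka'])
    msg = consumer.poll(10.0)  # wait up to 10 seconds for a message
    if msg is not None and not msg.error():
        print(msg.value().decode('utf-8'))
    consumer.close()
```

The same configuration keys ('bootstrap.servers', 'group.id', 'auto.offset.reset') reappear later in the KafkaConnector class.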

Create a Flask application

  • If you don’t have Flask installed, install it with the following command:
pip install flask
  • Next, create a Flask application instance in a Python file named “app.py” that exposes the RESTful API methods. The content of my app.py is as follows:
import sys
from flask import Flask
from pathlib import Path
from api.bootstrap import Bootstrap

from api.controller.consumer_controller import consumer
from api.controller.producer_controller import producer

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : app.py
"""

def create_app():
    path = Path().absolute()
    Bootstrap(path)  # load parameters from the YAML file

    app = Flask(__name__)

    app.register_blueprint(consumer)
    app.register_blueprint(producer)

    return app


if __name__ == '__main__':
    try:
        port = int(sys.argv[1])
    except Exception:
        port = 8081

    create_app().run(host='0.0.0.0', port=port, debug=True, use_reloader=True)
  • To load the necessary parameters from the YAML configuration file, I created the Bootstrap class and instantiated it within the create_app() function in app.py. The code for the Bootstrap class is provided below.
from api.util.file_util import read_yaml_file
from api.configuration.app_config import AppConfig

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : bootstrap.py
"""

class Bootstrap:

    def __init__(self, path) -> None:
        self.path = path
        self.load_app_config()

    def load_app_config(self):
        d = read_yaml_file(f"{self.path}/resources/config.yaml")
        cfg = AppConfig(d)
        print(f"O:--Load App Config--:bootstrap_servers/{cfg.params['kafka']['bootstrap_servers']}")
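The read_yaml_file helper and the AppConfig class are not listed in this post; the sketch below is one plausible shape for them. AppConfig is written here as a singleton so that a later call like AppConfig() with no arguments (as in ConsumerService) returns the parameters loaded at startup — this is an assumption about the project's design.

```python
# Hypothetical sketch of api/util/file_util.py and
# api/configuration/app_config.py, which are referenced but not shown.

def read_yaml_file(path: str) -> dict:
    # PyYAML does the parsing; safe_load avoids arbitrary object construction.
    import yaml
    with open(path, 'r') as f:
        return yaml.safe_load(f)

class AppConfig:
    # Singleton: the first construction stores the parameters; later
    # no-argument calls (e.g., AppConfig() in ConsumerService) reuse them.
    _instance = None

    def __new__(cls, params=None):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.params = params or {}
        return cls._instance
```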
  • The following shows the YAML configuration file (resources/config.yaml) used in this Flask application.
kafka:
  bootstrap_servers: localhost:9093
  auto_offset_reset: earliest
  • The code snippet below demonstrates how to create the consumer_controller.py file. This Python file is responsible for handling HTTP requests from clients. It validates them and transforms them into consumer instances before passing them to the consumer service.
from flask import request, Blueprint
from marshmallow import ValidationError
from api.domain.consumer import Consumer
from api.util import exception_util, service_util
from api.constant.message_code import MessageCode
from api.schema.consumer_schema import ConsumerSchema
from api.exception.service_exception import ServiceException
from api.business_logic.consumer_service import ConsumerService

consumer = Blueprint('consumer', __name__)

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : consumer_controller.py
"""

@consumer.route("/consume", methods=['POST'])
def consume():
    payload = request.get_json()

    try:
        validated_data = ConsumerSchema().load(payload)
        consumer = Consumer(**validated_data)

        msg = ConsumerService().consume(consumer)
        return service_util.build_server_response(MessageCode.SUCCESS, msg)
    except (ValidationError, ServiceException) as err:
        return exception_util.handler(err)
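The Consumer domain object (and the marshmallow ConsumerSchema that validates the payload) are not shown in this post. Judging from `Consumer(**validated_data)` in the controller and the fields ConsumerService passes to the connector, the domain class might look like this minimal sketch — the field names group_id, topic, and config are assumptions inferred from that usage.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical sketch of api/domain/consumer.py. The real project validates
# the incoming JSON with a marshmallow ConsumerSchema before constructing
# this object; field names here are inferred from how ConsumerService uses
# them (consumer.group_id, consumer.topic, consumer.config).
@dataclass
class Consumer:
    group_id: str
    topic: str
    config: Optional[dict] = None  # optional per-request Kafka overrides
```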
  • The following code demonstrates the creation of the ConsumerService class, which encapsulates the business logic needed to fetch messages from a specified Kafka topic through the KafkaConnector class.
from api.domain.consumer import Consumer
from api.constant.message_code import MessageCode
from api.configuration.app_config import AppConfig
from api.configuration.kafka_config import KafkaConfig
from api.connector.kafka_connector import KafkaConnector
from api.exception.service_exception import ServiceException

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : consumer_service.py
"""

class ConsumerService:

    def __init__(self):
        self.__load_params()
        self.connector = KafkaConnector(self.cfg)

    def __load_params(self):
        app_config = AppConfig()
        self.cfg = KafkaConfig(
            app_config.params['kafka']['bootstrap_servers'],
            app_config.params['kafka']['auto_offset_reset']
        )

    def consume(self, consumer: Consumer):
        action = 'consume'
        print(f'I:--START--:--{action}--')

        try:
            msg = self.connector.consume(consumer.group_id, consumer.topic, conf=consumer.config)
            print(f'O:--SUCCESS--:--{action}--:msg/{msg}')
            return msg
        except Exception as err:
            self.__handle_error(action, err)

    def __handle_error(self, action, err):
        desc = MessageCode.UNKNOWN_ERROR.value if len(err.args) == 0 else err.args[0]
        print(f'O:--FAIL--:--{action}--:errorDesc/{desc}')
        raise ServiceException(desc)
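The producer side (producer_controller.py and producer_service.py, registered in app.py) is not listed in this post. By symmetry with ConsumerService, the service might look like the sketch below; the connector is injected here to keep the sketch self-contained, whereas the real project would build a KafkaConnector from AppConfig as ConsumerService does.

```python
# Hypothetical sketch of api/business_logic/producer_service.py, mirrored
# from ConsumerService above. Names and structure are assumptions.

class ProducerService:

    def __init__(self, connector):
        # In the real project this would be a KafkaConnector built from
        # AppConfig, as in ConsumerService.__load_params().
        self.connector = connector

    def produce(self, topic: str, message: str, conf=None):
        action = 'produce'
        print(f'I:--START--:--{action}--')
        # KafkaConnector.produce() creates a Producer, sends, and flushes.
        self.connector.produce(topic, message, conf=conf)
        print(f'O:--SUCCESS--:--{action}--')
```

Injecting the connector also makes the service easy to unit-test with a stub in place of a live broker.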
  • The following code demonstrates how to create the KafkaConnector class, which facilitates Kafka connections and offers essential functions, e.g., produce and consume.
from typing import Dict, Optional
from api.configuration.kafka_config import KafkaConfig
from api.exception.service_exception import ServiceException
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

"""
Author : Neda Peyrone
Create Date : 30-08-2023
File : kafka_connector.py
"""

class KafkaConnector:

    def __init__(self, config: Optional[KafkaConfig]):
        self.config = config

    def error_callback(self, err: KafkaError):
        raise ServiceException(err.str())

    def __get_producer(self, conf: Optional[Dict] = None) -> Producer:
        default_conf = {
            'bootstrap.servers': self.config.bootstrap_servers,
            'retries': 0,
            'error_cb': self.error_callback
        }

        if conf:
            default_conf.update(conf)

        return Producer(default_conf)

    def produce(self, topic, message, conf: Optional[Dict] = None):
        producer = self.__get_producer(conf)
        producer.produce(topic, message)
        producer.flush()

    def __get_consumer(self, group_id, topics, conf: Optional[Dict] = None) -> Consumer:
        default_conf = {
            'bootstrap.servers': self.config.bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': self.config.auto_offset_reset,
            'enable.auto.commit': True
        }

        if conf:
            default_conf.update(conf)

        try:
            consumer = Consumer(default_conf)
            consumer.subscribe(topics)
            return consumer
        except KafkaException as err:
            raise ServiceException(f'Error while consuming message: "{err.args[0].str()}."')

    def consume(self, group_id: str, topic: str, timeout=10.0, conf: Optional[Dict] = None):
        consumer = self.__get_consumer(group_id, [topic], conf)
        msg = consumer.poll(timeout)  # wait up to `timeout` seconds for a message
        consumer.close()  # close the consumer to leave the group cleanly
        if msg is None:
            raise ServiceException('No message was received within the timeout.')
        elif not msg.error():
            return msg.value().decode('utf-8')
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            raise ServiceException(f'End of partition reached {msg.topic()}/{msg.partition()}.')
        else:
            raise ServiceException(f'Error while consuming message: {msg.error().str()}.')

Start the flask-kafka project using the following command:

python app.py

To test the consumer and producer services, I use the REST Client extension for Visual Studio Code (VS Code).
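For example, a REST Client request file for the consumer service might look like this. The /consume route comes from consumer_controller.py above; the payload field names (group_id, topic) are assumptions inferred from how the controller builds the Consumer object.

```http
### Consume a message from the hello-kafka topic
POST http://localhost:8081/consume
Content-Type: application/json

{
  "group_id": "demo-group",
  "topic": "hello-kafka"
}
```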

[Screenshot] Calling the producer service through the REST Client.
[Screenshot] Calling the consumer service through the REST Client.

Have fun and happy coding :)
