Confluent Kafka Python Client

-> Download kafka and untar:

wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xvf ./kafka_2.11-2.1.0.tgz

-> Start Zookeeper service

bin/zookeeper-server-start.sh config/zookeeper.properties

->…


This content originally appeared on DEV Community and was authored by Santhosh Balasa

-> Download kafka and untar:

wget http://mirrors.estointernet.in/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz 
tar -xvf ./kafka_2.11-2.1.0.tgz

-> Start Zookeeper service

bin/zookeeper-server-start.sh config/zookeeper.properties

-> Start Kafka service

bin/kafka-server-start.sh config/server.properties

-> Create a topic named obs_parser

 bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic obs3-meta --partitions 2 --replication-factor 1

-> Start a Producer to send messages

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic obs3-meta

-> Start a Consumer to receive messages

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic obs3_meta

-> Ingest: To write into the queue:

"""
Routines to write router info into Kafka Message Queue through resource uuid.

"""
from confluent_kafka import Producer
import logging

from settings import kafka_host, kafka_port, kafka_topic

logger = logging.getLogger(__name__)


def ingest(parsed_json):
    def delivery_msg(err, msg):
        if err:
            logger.error(f'obs3-meta kafka message failed delivery: {err}\n')

    p = Producer({'bootstrap.servers': f'{kafka_host}:{kafka_port}'})
    p.poll(0)
    p.produce(kafka_topic, str(msg), callback=delivery_msg)
    p.flush()

-> Egest: To read from the queue:

from confluent_kafka import Consumer, KafkaError
import sys
import logging

from settings import kafka_host, kafka_port, kafka_topic

logger = logging.getLogger(__name__)


def egest(uuid):
    c = Consumer({
        'bootstrap.servers': f'{kafka_host}:{kafka_port}',
        'group.id': 'console-consumer-8436',
        'auto.offset.reset': 'earliest'
    })

    c.subscribe([kafka_topic])

    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        if uuid in msg.value().decode('utf-8'):
            parsed_json = msg_value().decode('utf-8')
            break
    c.close()


if __name__ == '__main__':
    try:
        uuid = 'f70ab2e6-ca07-43b4-9586-de1c9fb45584'
        egest(uuid)
    except Exception as e:
        logger.error(f'Processor exit with: {e}', exc_info=True)
        sys.exit(1)  # exit with error


This content originally appeared on DEV Community and was authored by Santhosh Balasa


Print Share Comment Cite Upload Translate Updates
APA

Santhosh Balasa | Sciencx (2021-05-31T03:45:26+00:00) Confluent Kafka Python Client. Retrieved from https://www.scien.cx/2021/05/31/confluent-kafka-python-client/

MLA
" » Confluent Kafka Python Client." Santhosh Balasa | Sciencx - Monday May 31, 2021, https://www.scien.cx/2021/05/31/confluent-kafka-python-client/
HARVARD
Santhosh Balasa | Sciencx Monday May 31, 2021 » Confluent Kafka Python Client., viewed ,<https://www.scien.cx/2021/05/31/confluent-kafka-python-client/>
VANCOUVER
Santhosh Balasa | Sciencx - » Confluent Kafka Python Client. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2021/05/31/confluent-kafka-python-client/
CHICAGO
" » Confluent Kafka Python Client." Santhosh Balasa | Sciencx - Accessed . https://www.scien.cx/2021/05/31/confluent-kafka-python-client/
IEEE
" » Confluent Kafka Python Client." Santhosh Balasa | Sciencx [Online]. Available: https://www.scien.cx/2021/05/31/confluent-kafka-python-client/. [Accessed: ]
rf:citation
» Confluent Kafka Python Client | Santhosh Balasa | Sciencx | https://www.scien.cx/2021/05/31/confluent-kafka-python-client/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.