python 实时获取kafka消费队列信息示例详解

 更新时间:2023年07月24日 08:40:03   作者:xiaoming0018  
这篇文章主要介绍了python实时获取kafka消费队列信息,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

安装 pykafka

pip install pykafka

一、消费kafka消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
from vpn_data_handler import handler_data
bootstrap_servers = '10.*.**.**:9092'
group_id = 'test1'
class KConsumer(object):
    """kafka 消费者; 动态传参,非配置文件传入;
      kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
     """
    _encode = "UTF-8"
    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
        """ 初始化kafka的消费者;
           1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
           2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
         Args:
           topics: str; kafka 的消费主题;
           bootstrap_server: list; kafka 的消费者地址;
           group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
         """
        if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
        self.client = KafkaClient(hosts=bootstrap_server)
        # 选择要消费的topic
        vpn_topic = self.client.topics[topics]
        self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
                                                      consumer_timeout_ms=200,
                                                      auto_commit_enable=True,# 自动提交偏移量
                                                      auto_offset_reset=OffsetType.LATEST)  #LATEST 获取当前偏移量最新消息  EARLIEST从头开始获取信息
    def recv(self):
        """
         接收消费中的数据
         Returns:
         """
        return self.consumer
def main():
    """
    kafka消费队列入口
    :param topic:
    :return:
    """
    obj = KConsumer(topics="topics_name")
    while True:
        for message in obj.recv():
            data = eval(message.value.decode('utf-8'))
            handler_data(data)
if __name__ == '__main__':
    main()

二、生产者推送消息

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts="10.XX0.XX0.XX4:9092")  # 可接受多个client
# 查看所有的topic
# print(client.topics)
topic = client.topics['test_78'] # 选择一个topic
message = "test message2 test message2"
with topic.get_sync_producer() as producer:
    producer.produce(bytes(message, encoding='utf8')) #python3需要编码
    print(message)

到此这篇关于python 实时获取kafka消费队列信息的文章就介绍到这了,更多相关python kafka消费队列信息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python调用虹软2.0第三版的具体使用

    python调用虹软2.0第三版的具体使用

    这篇文章主要介绍了python调用虹软2.0第三版的具体使用,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-02-02
  • 基于python使用tibco ems代码实例

    基于python使用tibco ems代码实例

    这篇文章主要介绍了基于python使用tibco ems代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-12-12
  • Python微服务开发之使用FastAPI构建高效API

    Python微服务开发之使用FastAPI构建高效API

    微服务架构在现代软件开发中日益普及,它将复杂的应用程序拆分成多个可独立部署的小型服务。本文将介绍如何使用 Python 的 FastAPI 库快速构建和部署微服务,感兴趣的可以了解一下
    2023-05-05
  • Python格式化处理JSON数据的完整指南

    Python格式化处理JSON数据的完整指南

    在Python中,我们经常需要处理JSON数据,而格式化JSON数据是开发过程中的常见需求,本文将详细介绍如何在Python中对JSON数据进行格式化处理,感兴趣的小伙伴可以跟随小编一起学习一下
    2026-04-04
  • python实现磁盘日志清理的示例

    python实现磁盘日志清理的示例

    这篇文章主要介绍了python实现磁盘日志清理的示例,帮助大家更好的理解和使用python,感兴趣的朋友可以了解下
    2020-11-11
  • python docx 中文字体设置的操作方法

    python docx 中文字体设置的操作方法

    今天小编就为大家分享一篇python docx 中文字体设置的操作方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-05-05
  • pandas计数 value_counts()的使用

    pandas计数 value_counts()的使用

    这篇文章主要介绍了pandas计数 value_counts()的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-06-06
  • Python二进制文件读取并转换为浮点数详解

    Python二进制文件读取并转换为浮点数详解

    这篇文章主要介绍了Python二进制文件读取并转换为浮点数详解,用python读取二进制文件,这里主要用到struct包,而这个包里面的方法主要是unpack、pack、calcsize。,需要的朋友可以参考下
    2019-06-06
  • Python实现线程池之线程安全队列

    Python实现线程池之线程安全队列

    这篇文章主要为大家详细介绍了Python实现线程池之线程安全队列,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05
  • Python实例方法与类方法和静态方法介绍与区别分析

    Python实例方法与类方法和静态方法介绍与区别分析

    在 Python 中,实例方法(instance method),类方法(class method)与静态方法(static method)经常容易混淆。本文通过代码例子来说明它们的区别
    2022-10-10

最新评论