python操作kafka实践的示例代码

 更新时间:2019年06月19日 09:17:42   作者:Small_office  
这篇文章主要介绍了python操作kafka实践的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
  "sleep_time": 10,
  "db_config": {
    "database": "test_1",
    "host": "xxxx",
    "user": "root",
    "password": "root"
  },
  "table": "msg",
  "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()

下面是消费者的简单代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

下面是结果:

2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:


3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:

4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
  msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
  print msg
  time.sleep(2)
  index += 1
  print '--------poll index is %s----------' % index

结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 使用Python+Appuim 清理微信的方法

    使用Python+Appuim 清理微信的方法

    这篇文章主要介绍了使用Python+Appuim 清理微信,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-01-01
  • 在Django中创建自己的自定义用户模型

    在Django中创建自己的自定义用户模型

    这篇文章主要介绍了在Django中创建自己的自定义用户模型,创建自己的自定义用户模型至关重要。将来,如果要对模型进行一些更改,则可以轻松进行这些更改。不然我们可能必须对模型进行一些更改,而且代码的某些部分也将被更改,下面一起进入文章里哦阿姐个表格的详细内容吧
    2022-01-01
  • Python接口开发实现步骤详解

    Python接口开发实现步骤详解

    这篇文章主要介绍了Python接口开发实现步骤详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • python openpyxl筛选某些列的操作

    python openpyxl筛选某些列的操作

    这篇文章主要介绍了python openpyxl筛选某些列的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • Django 报错:Broken pipe from ('127.0.0.1', 58924)的解决

    Django 报错:Broken pipe from ('127.0.0.1', 5892

    这篇文章主要介绍了Django 报错:Broken pipe from ('127.0.0.1', 58924)的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-09-09
  • Python pyecharts模块安装与入门教程

    Python pyecharts模块安装与入门教程

    Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的认可,这篇文章主要介绍了Python pyecharts数据可视化模块安装与入门教程,需要的朋友可以参考下
    2022-09-09
  • pycharm命令终端运行python文件以及传递参数方式

    pycharm命令终端运行python文件以及传递参数方式

    这篇文章主要介绍了pycharm命令终端运行python文件以及传递参数方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • 如何利用Python连接MySQL数据库实现数据储存

    如何利用Python连接MySQL数据库实现数据储存

    当我们学习了mysql数据库后,我们会想着该如何将python和mysql结合起来运用,下面这篇文章主要给大家介绍了关于如何利用Python连接MySQL数据库实现数据储存的相关资料,需要的朋友可以参考下
    2021-11-11
  • python开发之函数定义实例分析

    python开发之函数定义实例分析

    这篇文章主要介绍了python开发之函数定义方法,以实例形式较为详细的分析了Python中函数的定义与使用技巧,需要的朋友可以参考下
    2015-11-11
  • 关于pygame自定义窗口创建及相关操作指南

    关于pygame自定义窗口创建及相关操作指南

    对于开发一个游戏来说,窗口的显示肯定是前提中的前提,对于pygame来说,只需要一小段代码就可以初始化窗口,下面这篇文章主要给大家介绍了关于pygame自定义窗口创建及相关操作的相关资料,需要的朋友可以参考下
    2022-07-07

最新评论