python kafka 多线程消费者&手动提交实例

 更新时间:2019年12月21日 15:08:06   作者:一天两晒网  
今天小编就为大家分享一篇python kafka 多线程消费者&手动提交实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

import threading
 
import os
import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
 
from consumers.db_util import *
from consumers.json_dispose import *
from collections import OrderedDict
 
 
threads = []
# col_dic, sql_dic = get()
 
 
class MyThread(threading.Thread):
  def __init__(self, thread_name, topic, partition):
    threading.Thread.__init__(self)
    self.thread_name = thread_name
    # self.keyName = keyName
    self.partition = partition
    self.topic = topic
 
  def run(self):
    print("Starting " + self.name)
    Consumer(self.thread_name, self.topic, self.partition)
 
  def stop(self):
    sys.exit()
 
 
def Consumer(thread_name, topic, partition):
  broker_list = '172.16.90.63:6667, 172.16.90.58:6667, 172.16.90.59:6667'
  '''
  fetch_min_bytes(int) - 服务器为获取请求而返回的最小数据量,否则请等待
  fetch_max_wait_ms(int) - 如果没有足够的数据立即满足fetch_min_bytes给出的要求,服务器在回应提取请求之前将阻塞的最大时间量(以毫秒为单位)
  fetch_max_bytes(int) - 服务器应为获取请求返回的最大数据量。这不是绝对最大值,如果获取的第一个非空分区中的第一条消息大于此值,
              则仍将返回消息以确保消费者可以取得进展。注意:使用者并行执行对多个代理的提取,因此内存使用将取决于包含该主题分区的代理的数量。
              支持的Kafka版本> = 0.10.1.0。默认值:52428800(50 MB)。
  enable_auto_commit(bool) - 如果为True,则消费者的偏移量将在后台定期提交。默认值:True。
  max_poll_records(int) - 单次调用中返回的最大记录数poll()。默认值:500
  max_poll_interval_ms(int) - poll()使用使用者组管理时的调用之间的最大延迟 。这为消费者在获取更多记录之前可以闲置的时间量设置了上限。
                如果 poll()在此超时到期之前未调用,则认为使用者失败,并且该组将重新平衡以便将分区重新分配给另一个成员。默认300000
  '''
  consumer = KafkaConsumer(bootstrap_servers=broker_list,
               group_id="xiaofesi",
               client_id=thread_name,
               enable_auto_commit=False,
               fetch_min_bytes=1024*1024,#1M
               # fetch_max_bytes=1024 * 1024 * 1024 * 10,
               fetch_max_wait_ms=60000,#30s
               request_timeout_ms=305000,
               # consumer_timeout_ms=1,
               # max_poll_records=5000,
               # max_poll_interval_ms=60000 无该参数
               )
  #查出数据库上次保存的offset,此offset已经是上次消费最后一条的offset的offset+1,也就是这次消费的起始位
  dic = get_kafka(topic, partition)
  tp = TopicPartition(topic, partition)
  print(thread_name, tp, dic['offset'])
  #分配该消费者的TopicPartition,也就是topic和partition,根据参数,我是三个消费者,三个线程,每个线程消费者消费一个分区
  consumer.assign([tp])
  #重置此消费者消费的起始位
  consumer.seek(tp, dic['offset'])
  print("程序首次运行\t线程:", thread_name, "分区:", partition, "偏移量:", dic['offset'], "\t开始消费...")
  num=0 #记录该消费者消费次数
  # end_offset = consumer.end_offsets([tp])[tp]
  # print(end_offset)
  while True:
    args = OrderedDict()
    msg = consumer.poll(timeout_ms=60000)
    end_offset = consumer.end_offsets([tp])[tp]
    print('已保存的偏移量', consumer.committed(tp),'最新偏移量,',end_offset)
    if len(msg) > 0:
      print("线程:", thread_name, "分区:", partition, "最大偏移量:", end_offset, "有无数据,", len(msg))
      lines=0
      for data in msg.values():
        for line in data:
          lines+=1
          line = eval(line.value.decode('utf-8'))
          '''
          do something
          '''
      # 线程此批次消息条数
      print(thread_name,"lines",lines)
      #数据保存至数据库
      is_succeed = save_to_db(args, thread_name)
      if is_succeed:
        #更新自己保存在数据库中的各topic, partition的偏移量
        is_succeed1 = update_offset(topic, partition, end_offset)
        #手动提交偏移量 offsets格式:{TopicPartition:OffsetAndMetadata(offset_num,None)}
        consumer.commit(offsets={tp:(OffsetAndMetadata(end_offset,None))})
        print(thread_name,"to db suss",num+1)
        if is_succeed1 == 0:
          #系统退出?这个还没试
          os.exit()
          '''
          sys.exit()  只能退出该线程,也就是说其它两个线程正常运行,主程序不退出
          '''
      else:
        os.exit()
    else:
      print(thread_name,'没有数据')
    num+=1
    print(thread_name,"第",num,"次")
 
 
if __name__ == '__main__':
  try:
    t1 = MyThread("Thread-0", "test", 0)
    threads.append(t1)
    t2 = MyThread("Thread-1", "test", 1)
    threads.append(t2)
    t3 = MyThread("Thread-2", "test", 2)
    threads.append(t3)
 
    for t in threads:
      t.start()
 
    for t in threads:
      t.join()
 
    print("exit program with 0")
  except:
    print("Error: failed to run consumer program")

以上这篇python kafka 多线程消费者&手动提交实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 对python中Librosa的mfcc步骤详解

    对python中Librosa的mfcc步骤详解

    今天小编就为大家分享一篇对python中Librosa的mfcc步骤详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-01-01
  • Python Pygame实战之红心大战游戏的实现

    Python Pygame实战之红心大战游戏的实现

    说起Windows自带的游戏,相信许多80、90后的朋友都不陌生。本文就将利用Python中的Pygame模块实现一下windows经典游戏之一的红心大战,需要的可以参考一下
    2022-02-02
  • python编程开发之日期操作实例分析

    python编程开发之日期操作实例分析

    这篇文章主要介绍了python编程开发之日期操作,以实例形式较为详细的分析了Python中datetime与time库的相关使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-11-11
  • python3.9安装RobotFramework的简单教程

    python3.9安装RobotFramework的简单教程

    python3.9安装RobotFramework,不同于python2.7和python3.6,使用这两个版本安装会出现问题,因为我安装遇到问题发现没有最新的教程,所以打算自己写一个,同时下面会记录安装步骤及使用的方法会出现的一些问题,对python3.9安装RobotFramework感兴趣的朋友一起看看吧
    2023-01-01
  • 使用Python实现将Word文档转换为PNG图片

    使用Python实现将Word文档转换为PNG图片

    在这篇博客中,我将介绍一个使用Python编写的小工具,它能够将指定文件夹中的所有Word文档转换为PNG图片,这个工具基于wxPython库构建图形用户界面,接下来,我将详细说明这个工具的功能及其实现,需要的朋友可以参考下
    2024-08-08
  • Python递归时间复杂度

    Python递归时间复杂度

    这篇文章主要介绍了Python递归时间复杂度,时间复杂度一般认为O(logn),但递归算法的时间复杂度本质上是要看递归的次数,每次递归中的操作次数,下面文章详细介绍,需要的朋友可以参考一下
    2022-03-03
  • python如何构建mock接口服务

    python如何构建mock接口服务

    这篇文章主要介绍了python如何构建mock接口服务,帮助大家更好的理解和使用python,感兴趣的朋友可以了解下
    2021-01-01
  • python文字转语音实现过程解析

    python文字转语音实现过程解析

    这篇文章主要介绍了python文字转语音实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Python爬虫之Selenium鼠标事件的实现

    Python爬虫之Selenium鼠标事件的实现

    这篇文章主要介绍了Python爬虫之Selenium鼠标事件的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • Python实现批量下载ts文件并合并为mp4

    Python实现批量下载ts文件并合并为mp4

    这篇文章主要为大家详细介绍了如何通过Python语言实现批量下载ts文件并合并为mp4视频的功能,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-06-06

最新评论