python消费kafka数据批量插入到es的方法

 更新时间:2018年12月27日 09:52:04   作者:亮亮爱吃虾  
今天小编就为大家分享一篇python消费kafka数据批量插入到es的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

1、es的批量插入

这是为了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2

from elasticsearch import Elasticsearch 
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量处理 
    # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
    self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消费kafka

1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现

2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition

3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。

4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")
  logger_data = logging.getLogger("data")

  def __init__(self):
    self.server = ConfigUtil().get("kafka","kafka_server")
    self.topic = ConfigUtil().get("kafka","topic")
    self.group = ConfigUtil().get("kafka","group")
    self.partition_id = int(ConfigUtil().get("kafka","partition"))
    self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
    self.consumer = None
    self.hosts = ConfigUtil().get("es","hosts")
    self.index_name = ConfigUtil().get("es","index_name")
    self.type_name = ConfigUtil().get("es","type_name")


  def getConnect(self):
    client = KafkaClient(self.server)
    topic = client.topics[self.topic]
    p = topic.partitions
    ps={p.get(self.partition_id)}

    self.consumer = topic.get_simple_consumer(
      consumer_group=self.group,
      auto_commit_enable=True,
      consumer_timeout_ms=self.consumer_timeout_ms,
      # num_consumer_fetchers=1,
      # consumer_id='test1',
      partitions=ps
      )
    self.starttime = datetime.datetime.now()


  def beginConsumer(self):
    print("beginConsumer kafka-python")
    imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
    #创建ACTIONS 
    count = 0
    ACTIONS = [] 

    while True:
      endtime = datetime.datetime.now()
      print (endtime - self.starttime).seconds
      for message in self.consumer:
        if message is not None:
          try:
            count = count + 1
            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
            action = { 
              "_index": self.index_name, 
              "_type": self.type_name, 
              "_source": message.value
            }
            ACTIONS.append(action)
            if len(ACTIONS) >= 10000:
              imprtEsData.set_date(ACTIONS)
              ACTIONS = []
              self.consumer.commit_offsets()
              endtime = datetime.datetime.now()
              print (endtime - self.starttime).seconds
              #break
          except (Exception) as e:
            # self.consumer.commit_offsets()
            print(e)
            self.logger.error(e)
            self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
            # self.logger_data.error(message.value+"\n")
          # self.consumer.commit_offsets()


      if len(ACTIONS) > 0:
        self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
        imprtEsData.set_date(ACTIONS)
        ACTIONS = []
        self.consumer.commit_offsets()




  def disConnect(self):
    self.consumer.close()


from elasticsearch import Elasticsearch 
from elasticsearch.helpers import bulk
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type


  def set_date(self,data): 
    # 批量处理 
    success = bulk(self.es, data, index=self.index, raise_on_error=True) 
    self.logger.info(success) 

3、运行

if __name__ == '__main__':
  kp = KafkaPython()
  kp.getConnect()
  kp.beginConsumer()
  # kp.disConnect()

注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件

现在还在批量的压测中。。。

以上这篇python消费kafka数据批量插入到es的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 1秒钟使用python建立文件服务器的方法步骤

    1秒钟使用python建立文件服务器的方法步骤

    本文主要介绍了1秒钟使用python建立文件服务器的方法步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • Python各种类型装饰器详细介绍

    Python各种类型装饰器详细介绍

    大家好,本篇文章主要讲的是Python各种类型装饰器详细介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2021-12-12
  • Python解析xml中dom元素的方法

    Python解析xml中dom元素的方法

    这篇文章主要介绍了Python解析xml中dom元素的方法,实例分析了Python操作XML中元素的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-03-03
  • Python使用Turtle图形函数画图颜色填充实例

    Python使用Turtle图形函数画图颜色填充实例

    这篇文章主要介绍了Python使用Turtle图形函数画图颜色填充实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • 关于WARNING:Ignoring invalid distribution -pencv-python....警告信息的处理方法(已解决!)

    关于WARNING:Ignoring invalid distribution -pencv-python....

    这篇文章主要给大家介绍了关于WARNING:Ignoring invalid distribution -pencv-python....警告信息的处理方法,文中通过图文将解决的办法介绍的非常详细,对大家学习或者使用python具有一定的参考学习价值,需要的朋友可以参考下
    2023-03-03
  • Pytorch反向求导更新网络参数的方法

    Pytorch反向求导更新网络参数的方法

    今天小编就为大家分享一篇Pytorch反向求导更新网络参数的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • python爬虫使用scrapy注意事项

    python爬虫使用scrapy注意事项

    在本篇文章里小编给大家整理的是一篇关于python爬虫使用scrapy注意事项的相关文章,对此有兴趣的朋友们可以学习下。
    2020-11-11
  • python中os.path.exits()的坑

    python中os.path.exits()的坑

    本文主要介绍了python中os.path.exits()的坑,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-04-04
  • 使用python读写txt和json(jsonl)大文件的方法步骤

    使用python读写txt和json(jsonl)大文件的方法步骤

    在Python中读取txt和json(jsonl)大文件并保存到字典是一项非常常见的操作,这篇文章主要给大家介绍了关于使用python读写txt和json(jsonl)大文件的方法步骤,需要的朋友可以参考下
    2023-12-12
  • python3实现字符串的全排列的方法(无重复字符)

    python3实现字符串的全排列的方法(无重复字符)

    这篇文章主要介绍了python3实现字符串的全排列的方法(无重复字符),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07

最新评论