Kafka简单客户端编程实例

 更新时间:2017年11月08日 11:33:31   作者:liuyazhuang  
这篇文章主要为大家详细介绍了Kafka简单客户端编程实例,利用Kafka的API进行客户端编程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka; 
 
/** 
 * 配置项 
 * @author liuyazhuang 
 * 
 */ 
public class Config { 
  
 /** 
  * 话题 
  */ 
 public static final String TOPIC = "wordcount"; 
 /** 
  * 线程数 
  */ 
 public static final Integer THREADS = 1; 
} 

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka; 
 
import java.util.Properties; 
 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
 
/** 
 * 生产者实例 
 * @author liuyazhuang 
 * 
 */ 
public class ProducerDemo { 
 public static void main(String[] args) throws Exception { 
  Properties props = new Properties(); 
  props.put("zk.connect", "192.168.209.121:2181"); 
  props.put("metadata.broker.list","192.168.209.121:9092"); 
  props.put("serializer.class", "kafka.serializer.StringEncoder"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
  ProducerConfig config = new ProducerConfig(props); 
  Producer<String, String> producer = new Producer<String, String>(config); 
 
  // 发送业务消息 
  // 读取文件 读取内存数据库 读socket端口 
  for (int i = 1; i <= 100; i++) { 
   Thread.sleep(500); 
   producer.send(new KeyedMessage<String, String>(Config.TOPIC, 
     "this number ===>>> " + i)); 
  } 
 
 } 
} 

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka; 
 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.message.MessageAndMetadata; 
 
/** 
 * 消费者实例 
 * @author liuyazhuang 
 * 
 */ 
public class ConsumerDemo { 
  
 
 public static void main(String[] args) { 
   
  Properties props = new Properties(); 
  props.put("zookeeper.connect", "192.168.209.121:2181"); 
  props.put("group.id", "1111"); 
  props.put("auto.offset.reset", "smallest"); 
  props.put("zk.connectiontimeout.ms", "15000"); 
 
  ConsumerConfig config = new ConsumerConfig(props); 
  ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config); 
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
  topicCountMap.put(Config.TOPIC, Config.THREADS); 
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); 
   
  for(final KafkaStream<byte[], byte[]> kafkaStream : streams){ 
   new Thread(new Runnable() { 
    @Override 
    public void run() { 
     for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){ 
      String msg = new String(mm.message()); 
      System.out.println(msg); 
     } 
    } 
    
   }).start(); 
   
  } 
 } 
} 

四、运行实例

首先,运行消费者类ConsumerDemo
运行结果如下:

没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:

打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 解决SpringBoot中使用@Async注解失效的问题

    解决SpringBoot中使用@Async注解失效的问题

    这篇文章主要介绍了解决SpringBoot中使用@Async注解失效的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • 基于Java创建XML(无中文乱码)过程解析

    基于Java创建XML(无中文乱码)过程解析

    这篇文章主要介绍了基于Java创建XML(无中文乱码)过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • jpa实体@ManyToOne @OneToMany无限递归方式

    jpa实体@ManyToOne @OneToMany无限递归方式

    这篇文章主要介绍了jpa实体@ManyToOne @OneToMany无限递归方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • mybatis中的count()按条件查询方式

    mybatis中的count()按条件查询方式

    这篇文章主要介绍了mybatis中的count()按条件查询方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Java 数据结构与算法系列精讲之单向链表

    Java 数据结构与算法系列精讲之单向链表

    单向链表特点是链表的链接方向是单向的,访问要通过顺序读取从头部开始。链表是使用指针构造的列表,是由一个个结点组装起来的,又称为结点列表。其中每个结点都有指针成员变量指向列表中的下一个结点,head指针指向第一个结点称为表头,而终止于最后一个指向nuLL的指针
    2022-02-02
  • java后台利用Apache poi 生成excel文档提供前台下载示例

    java后台利用Apache poi 生成excel文档提供前台下载示例

    本篇文章主要介绍了java后台利用Apache poi 生成excel文档提供前台下载示例,非常具有实用价值,需要的朋友可以参考下
    2017-05-05
  • 深入了解java中的string对象

    深入了解java中的string对象

    这篇文章主要介绍了java中的string对象,String对象是Java中使用最频繁的对象之一,所以Java开发者们也在不断地对String对象的实现进行优化,以便提升String对象的性能。对此感兴趣的朋友跟随小编一起看看吧
    2019-11-11
  • Spring Boot 日志概念及使用详解

    Spring Boot 日志概念及使用详解

    文章介绍了日志的重要性、日志格式、日志使用方法、日志级别、日志配置、日志持久化以及使用Lombok简化日志输出,感兴趣的朋友一起看看吧
    2025-03-03
  • Java实现画图的详细步骤(完整代码)

    Java实现画图的详细步骤(完整代码)

    今天给大家带来的是关于Java的相关知识,文章围绕着Java实现画图的详细步骤展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • 详解SpringBoot的jar为什么可以直接运行

    详解SpringBoot的jar为什么可以直接运行

    SpringBoot提供了一个插件spring-boot-maven-plugin用于把程序打包成一个可执行的jar包,本文给大家介绍了为什么SpringBoot的jar可以直接运行,文中有相关的代码示例供大家参考,感兴趣的朋友可以参考下
    2024-02-02

最新评论