关于kafka发送消息的三种方式总结

 更新时间:2023年04月06日 09:44:18   作者:大佬喝可乐丶  
这篇文章主要介绍了关于kafka发送消息的三种方式总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

kafka发送消息的方式

package com.zl.kafkademo;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
 
import java.util.Properties;
 
/**
 * @Auther: le
 * @Date: 2019/4/23 22:05
 * @Description:
 */
public class MyProducer implements Job {
    private static KafkaProducer<String,String> producer;
 
    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }
 
    /**
     * 第一种直接发送,不管结果
     */
    private static void sendMessageForgetResult(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","Forget_result"
        );
        producer.send(record);
        producer.close();
    }
 
    /**
     * 第二种同步发送,等待执行结果
     * @return
     * @throws Exception
     */
    private static RecordMetadata sendMessageSync() throws Exception{
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","sync"
        );
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());
        System.out.println(result.partition());
        System.out.println(result.offset());
        return result;
    }
 
    /**
     * 第三种执行回调函数
     */
    private static void sendMessageCallback(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","callback"
        );
        producer.send(record,new MyProducerCallback());
    }
 
    //定时任务
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            sendMessageSync();
        }catch (Exception e){
            System.out.println("error:"+e);
        }
 
    }
 
    private static class MyProducerCallback implements Callback{
 
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e !=null){
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }
 
 
    public static void main(String[] args){
        //sendMessageForgetResult();
        //sendMessageCallback();
        JobDetail job = JobBuilder.newJob(MyProducer.class).build();
 
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
 
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.scheduleJob(job,trigger);
            scheduler.start();
        }catch (SchedulerException e){
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
 
 
}

需要引入文件

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.0</version>
        </dependency>

测试方法

MAC下操作指令

1、创建主题:

./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

2、运行上述程序,执行定时任务

3、查看消费情况

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning

windows操作指令

 1、进入  D:\zookeeper-3.4.14\bin   打开新的cmd,输入“zkServer“,运行Zookeeper

 2、进入 D:\kafka_2.11-0.11.0.0 运行cmd

.\bin\windows\kafka-server-start.bat .\config\server.properties

3、 创建主题

进入D:\kafka_2.11-0.11.0.0运行cmd,输入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已创建主题:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

查看指定主题的详细信息:

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

查看主题消费详情:

.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java基础之内存泄漏与溢出详解

    Java基础之内存泄漏与溢出详解

    今天带大家来了解一下Java内存泄漏与溢出的知识,文中有非常详细的介绍,对正在学习Java基础的各位小伙伴呢很有帮助哟,需要的朋友可以参考下
    2021-05-05
  • SpringMVC视图转发重定向区别及控制器详解

    SpringMVC视图转发重定向区别及控制器详解

    这篇文章主要为大家介绍了SpringMVC视图转发重定向区别及控制器示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • SpringBoot可视化接口开发工具magic-api的简单使用教程

    SpringBoot可视化接口开发工具magic-api的简单使用教程

    作为Java后端开发,平时开发API接口的时候经常需要定义Controller、Service、Dao、Mapper、XML、VO等Java对象。有没有什么办法可以让我们不写这些代码,直接操作数据库生成API接口呢?今天给大家推荐一款工具magic-api,来帮我们实现这个小目标!
    2021-06-06
  • Java基于Socket实现多人聊天室

    Java基于Socket实现多人聊天室

    这篇文章主要为大家详细介绍了Java基于Socket实现多人聊天室,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • Java编程实现多线程TCP服务器完整实例

    Java编程实现多线程TCP服务器完整实例

    这篇文章主要介绍了Java编程实现多线程TCP服务器完整实例,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • Java 详细分析四个经典链表面试题

    Java 详细分析四个经典链表面试题

    兄弟们,编程,当我们学习完数据结构的时候,你就会有一种豁然开朗的感觉。算是真正的入了编程的门,所以打好数据结构的基础是特别特别重要的
    2022-03-03
  • Java实现中文字符串与unicode互转工具类

    Java实现中文字符串与unicode互转工具类

    这篇文章主要为大家详细介绍了Java实现中文字符串与unicode互转的工具类,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • Java使用HttpUtils实现发送HTTP请求

    Java使用HttpUtils实现发送HTTP请求

    这篇文章主要介绍了Java使用HttpUtils实现发送HTTP请求,HTTP请求,在日常开发中,还是比较常见的,今天给大家分享HttpUtils如何使用,需要的朋友可以参考下
    2023-05-05
  • Java synchronized轻量级锁的核心原理详解

    Java synchronized轻量级锁的核心原理详解

    这篇文章主要为大家详细介绍了Java synchronized轻量级锁的核心原理,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • 深入浅出讲解Java集合之Map接口

    深入浅出讲解Java集合之Map接口

    这篇文章主要介绍了深入浅出讲解Java集合之Map接口,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09

最新评论