关于kafka发送消息的三种方式总结

 更新时间:2023年04月06日 09:44:18   作者:大佬喝可乐丶  
这篇文章主要介绍了关于kafka发送消息的三种方式总结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

kafka发送消息的方式

package com.zl.kafkademo;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
 
import java.util.Properties;
 
/**
 * @Auther: le
 * @Date: 2019/4/23 22:05
 * @Description:
 */
public class MyProducer implements Job {
    private static KafkaProducer<String,String> producer;
 
    static {
        Properties properties = new Properties();
        properties.put("bootstrap.servers","127.0.0.1:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(properties);
    }
 
    /**
     * 第一种直接发送,不管结果
     */
    private static void sendMessageForgetResult(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","Forget_result"
        );
        producer.send(record);
        producer.close();
    }
 
    /**
     * 第二种同步发送,等待执行结果
     * @return
     * @throws Exception
     */
    private static RecordMetadata sendMessageSync() throws Exception{
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","sync"
        );
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());
        System.out.println(result.partition());
        System.out.println(result.offset());
        return result;
    }
 
    /**
     * 第三种执行回调函数
     */
    private static void sendMessageCallback(){
        ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                "kafka-study","name","callback"
        );
        producer.send(record,new MyProducerCallback());
    }
 
    //定时任务
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            sendMessageSync();
        }catch (Exception e){
            System.out.println("error:"+e);
        }
 
    }
 
    private static class MyProducerCallback implements Callback{
 
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e !=null){
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }
 
 
    public static void main(String[] args){
        //sendMessageForgetResult();
        //sendMessageCallback();
        JobDetail job = JobBuilder.newJob(MyProducer.class).build();
 
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
 
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.scheduleJob(job,trigger);
            scheduler.start();
        }catch (SchedulerException e){
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
 
 
}

需要引入文件

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
 
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.0</version>
        </dependency>

测试方法

MAC下操作指令

1、创建主题:

./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

2、运行上述程序,执行定时任务

3、查看消费情况

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning

windows操作指令

 1、进入  D:\zookeeper-3.4.14\bin   打开新的cmd,输入“zkServer“,运行Zookeeper

 2、进入 D:\kafka_2.11-0.11.0.0 运行cmd

.\bin\windows\kafka-server-start.bat .\config\server.properties

3、 创建主题

进入D:\kafka_2.11-0.11.0.0运行cmd,输入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看已创建主题:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

查看指定主题的详细信息:

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

查看主题消费详情:

.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java获取堆栈信息的三种方法小结

    Java获取堆栈信息的三种方法小结

    在Java编程中,获取堆栈信息对于调试和故障排除非常重要,Java提供了多种方式来获取当前线程的堆栈信息,下面就跟随小编一起学习一下常用的三种吧
    2024-03-03
  • java编程中实现调用js方法分析

    java编程中实现调用js方法分析

    这篇文章主要介绍了java编程中实现调用js方法,结合具体实例形式较为详细的分析了java编程中调用js方法的常用操作技巧与注意事项,需要的朋友可以参考下
    2017-09-09
  • Java实现一个简易版的多级菜单功能

    Java实现一个简易版的多级菜单功能

    这篇文章主要给大家介绍了关于Java如何实现一个简易版的多级菜单功能的相关资料,文中通过实例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-01-01
  • java基础之注解示例详解

    java基础之注解示例详解

    大家好,本篇文章主要讲的是java基础之注解示例详解,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览
    2021-12-12
  • 解决在Gradle/IDEA中无法正常使用readLine的问题原因

    解决在Gradle/IDEA中无法正常使用readLine的问题原因

    这篇文章主要介绍了在Gradle/IDEA中无法正常使用readLine的解决方法,原因是由于Gradle的标准输入默认并不与系统标准输入绑定,需手动设置,需要的朋友可以参考下
    2021-12-12
  • JAVA利用泛型返回类型不同的对象方法

    JAVA利用泛型返回类型不同的对象方法

    下面小编就为大家带来一篇JAVA利用泛型返回类型不同的对象方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • Spring内置定时任务调度@Scheduled使用详解

    Spring内置定时任务调度@Scheduled使用详解

    这篇文章主要介绍了Spring内置定时任务调度@Scheduled使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12
  • Java实现字符编码转换(utf-8/gbk)

    Java实现字符编码转换(utf-8/gbk)

    这篇文章主要为大家详细介绍了如何使用Java实现字符编码转换工具,主要针对UTF-8和GBK两种编码格式,文中的示例代码讲解详,需要的可以了解下
    2025-03-03
  • Java CPU性能分析工具代码实例

    Java CPU性能分析工具代码实例

    这篇文章主要介绍了Java CPU性能分析工具代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-01-01
  • SpringBoot基于AbstractRoutingDataSource实现多数据源动态切换

    SpringBoot基于AbstractRoutingDataSource实现多数据源动态切换

    本文主要介绍了SpringBoot基于AbstractRoutingDataSource实现多数据源动态切换,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05

最新评论