springboot使用kafka推送数据到服务端的操作方法带认证

 更新时间:2024年11月06日 10:22:29   作者:Alina_shu  
在使用Kafka进行数据推送时,遇到认证问题导致连接失败,本文详细介绍了Kafka的认证配置过程,包括配置文件的引入和参数设置,实际测试表明,需要正确配置sasl.jaas.config以及其他认证参数,探讨了配置文件是否可以同时存在多个配置块的可能性,并提出了实际操作中的注意事项

遇到的问题

在实际开发过程中,因为推送数据需要用到kafka,为了比较方便与后续其他需求需要使用kafka,所以开发的过程中是设想能写一个工具类,方便后续的使用,但是,测试不带认证的kafka服务端的时候,发送是正常的,但是实际情况是,对方的服务器需要认证,导致遇到连不上对方服务,推送失败的问题,需要找对方确认对方的认证配置信息。并且在查询怎么处理的时候,验证也出现了比较奇葩的情况,那本次文章就简单写遇到的问题和验证结果。

碰到的天坑

1.度的时候,总是说引入了配置文件就完事,其他的配置不需要再配sasl.jaas.config,但是我实际测试下来不行啊。还是得引入配置文件+配置参数设置。
2.第二个基于未来思考的问题是,这个引入的配置文件内容,是否可以多个。据度来度去的结果说是可以的,按顺序会去逐一匹配直到匹配成功,那就是下面配置的KafkaClient配置块是可以多个,就名称不一样就行。但是碍于条件限制,没试过。😀

处理步骤

加载配置
度了很多文章,都提到了需要在服务启动的时候引入认证配置文件,设置属性 java.security.auth.login.config,度了一下是有两种方式。
a). 在服务启动的时候用参数引入,命令如下:
java -Djava.security.auth.login.config=(具体地址自己填,因为我配置文件是跟test.jar同级目录所以无前缀)kafka_jaas_config.config -jar test.jar
b). 在代码中去引入,如:(这方式我是没试的)
System.setProperty(“java.security.auth.login.config”, “kafka_jaas_config.config”);

c). 配置文件的内容(注意password后面那个该死的分号是要的,配置参数设置的时候也是要的)

KafkaClient {  
    org.apache.kafka.common.security.scram.ScramLoginModule required  
    username="user"  
    password="123";  
};

设置认证参数(如果有的话)
类似要填的参数是:
security.protocol,sasl.mechanism,sasl.jaas.config(,sasl.username和sasl.password,这两个我看是高版本直接配置据说能认证,不需要sasl.jaas.config,但是低版本是需要sasl.jaas.config,所以建议是可以都配上)
such as :

props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"123\";");

工具类 

package platform.cars.utils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @Auther: Ms.y
 */
@Configuration
public class KafkaUtil {
    private static final ConcurrentHashMap<String, KafkaTemplate<String, String>> templateCache = new ConcurrentHashMap<>();
    private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (otherConfigs != null){
            otherConfigs.forEach(props::put);
        }
        return props;
    }
    public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        return templateCache.computeIfAbsent(servers, bs -> createKafkaTemplate(bs, otherConfigs));
    }
    private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) {
        Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate<>(producerFactory);
    }
    @PreDestroy
    public void destroy() {
        for (KafkaTemplate<String, String> template : templateCache.values()) {
            template.destroy();
        }
    }
}

怎么用

1.正常的autowired就行
2.自己查配置还有是否需要认证的信息
3.获取template去发送数据
4.处理结果

Map<String,Object> configs = new HashMap<>();
//自己给configs 填值
KafkaTemplate kafkaTemplate = kafkaUtil.getKafkaTemplate("ip:port",configs);
kafkaTemplate.send("topic名称", "消息内容");

废话

我这是因为为了方便加了个缓存队列,存储了kafka已经连过的服务,不需要的话完全可以自己改造去掉这部分。如果有可以精进的问题可以提啊,欢迎挑刺。

到此这篇关于springboot使用kafka推送数据到服务端,带认证的文章就介绍到这了,更多相关springboot kafka推送数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java实现简单的ATM项目

    java实现简单的ATM项目

    这篇文章主要为大家详细介绍了java实现简单的ATM项目,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2020-10-10
  • SpringBoot集成RbbitMQ队列踩坑记录

    SpringBoot集成RbbitMQ队列踩坑记录

    这篇文章主要介绍了SpringBoot集成RbbitMQ队列踩坑记录,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • java实现线性表及其算法

    java实现线性表及其算法

    线性表是最简单和最常用的一种数据结构,它是有n个体数据元素(节点)组成的有限序列,这篇文章主要介绍了java实现线性表及其算法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-06-06
  • Java实现将容器 Map中的内容保存到数组

    Java实现将容器 Map中的内容保存到数组

    这篇文章主要介绍了Java实现将容器 Map中的内容保存到数组,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • Java实现帧动画的实例代码

    Java实现帧动画的实例代码

    这篇文章主要介绍了Java实现帧动画的实例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05
  • Spring Boot缓存实战之Redis 设置有效时间和自动刷新缓存功能(时间支持在配置文件中配置)

    Spring Boot缓存实战之Redis 设置有效时间和自动刷新缓存功能(时间支持在配置文件中配置)

    这篇文章主要介绍了Spring Boot缓存实战 Redis 设置有效时间和自动刷新缓存,时间支持在配置文件中配置,需要的朋友可以参考下
    2023-05-05
  • java类实现日期的时间差的实例讲解

    java类实现日期的时间差的实例讲解

    在本篇文章里小编给大家整理的是一篇关于java类实现日期的时间差的实例讲解内容,有兴趣的朋友们可以学习下。
    2021-01-01
  • 详解Java七大阻塞队列之SynchronousQueue

    详解Java七大阻塞队列之SynchronousQueue

    SynchronousQueue不需要存储线程间交换的数据,它的作用像是一个匹配器,使生产者和消费者一一匹配。本文详细讲解了Java七大阻塞队列之一SynchronousQueue,需要了解的小伙伴可以参考一下这篇文章
    2021-09-09
  • java字符串的重要使用方法以及实例

    java字符串的重要使用方法以及实例

    在本篇文章里小编给大家整理了关于java字符串的重要使用方法以及实例代码,需要的朋友们可以跟着学习参考下。
    2019-03-03
  • SpringBoot整合阿里云视频点播的过程详解

    SpringBoot整合阿里云视频点播的过程详解

    视频点播(ApsaraVideo for VoD)是集音视频采集、编辑、上传、自动化转码处理、媒体资源管理、分发加速于一体的一站式音视频点播解决方案。这篇文章主要介绍了SpringBoot整合阿里云视频点播的详细过程,需要的朋友可以参考下
    2021-12-12

最新评论