Springboot 2.x集成kafka 2.2.0的示例代码

 更新时间:2022年04月28日 10:22:08   作者:g-Jack  
kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高。本文主要为大家介绍了Springboot 2.x集成kafka 2.2.0的示例代码,需要的可以参考一下

引言

kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。

版本对应地址:https://spring.io/projects/spring-kafka

基本环境

springboot版本2.1.4

kafka版本2.2.0

jdk 1.8

代码编写

1、基本引用pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafkademo</name>
	<description>Demo project for Spring Boot</description>
 
	<properties>
		<java.version>1.8</java.version>
	</properties>
 
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
 
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
 
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.2.0.RELEASE</version>
		</dependency>
 
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.7</version>
		</dependency>
 
	</dependencies>
 
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
 
</project>

2、基本配置

spring.kafka.bootstrap-servers=2.1.1.1:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#logging.level.root=debug

3、实体类

package com.example.demo.model;
 
import java.util.Date;
 
public class Messages {
 
    private Long id;
 
    private String msg;
 
    private Date sendTime;
 
    public Long getId() {
        return id;
    }
 
    public void setId(Long id) {
        this.id = id;
    }
 
    public String getMsg() {
        return msg;
    }
 
    public void setMsg(String msg) {
        this.msg = msg;
    }
 
    public Date getSendTime() {
        return sendTime;
    }
 
    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

4、生产者端

package com.example.demo.service;
 
import com.example.demo.model.Messages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
 
import java.util.Date;
import java.util.UUID;
 
@Service
public class KafkaSender {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    private Gson gson = new GsonBuilder().create();
 
    public void send() {
        Messages message = new Messages();
        message.setId(System.currentTimeMillis());
        message.setMsg("123");
        message.setSendTime(new Date());
        ListenableFuture<SendResult<String, String>> test0 = kafkaTemplate.send("newtopic", gson.toJson(message));
    }
}

5、消费者

package com.example.demo.service;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
 
import java.util.Optional;
 
@Service
public class KafkaReceiver {
 
    @KafkaListener(topics = {"newtopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("record =" + record);
            System.out.println("message =" + message);
 
        }
    }
    
}

6、测试

在启动方法中模拟消息生产者,向kafka中发送消息

package com.example.demo;
 
import com.example.demo.service.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
 
@SpringBootApplication
public class KafkademoApplication {
 
	public static void main(String[] args) {
		ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
 
		KafkaSender sender = context.getBean(KafkaSender.class);
		for (int i = 0; i <1000; i++) {
			sender.send();
 
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
 
 
	}
 
}

效果展示

命令行直接消费消息

遇到的问题

生产端连接kafka超时

at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

解决方案:

修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka

到此这篇关于Springboot 2.x集成kafka 2.2.0的示例代码的文章就介绍到这了,更多相关Springboot集成kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java静态方法和实例方法区别详解

    Java静态方法和实例方法区别详解

    这篇文章主要为大家详细介绍了Java静态方法和实例方法的区别,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-12-12
  • 基于Spring中各个jar包的作用及依赖(详解)

    基于Spring中各个jar包的作用及依赖(详解)

    下面小编就为大家带来一篇基于Spring中各个jar包的作用及依赖(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-11-11
  • java实现菜单滑动效果

    java实现菜单滑动效果

    这篇文章主要介绍了java实现菜单滑动效果,效果非常棒,这里推荐给大家,有需要的小伙伴可以参考下。
    2015-03-03
  • Java超详细教你写一个网络购书系统案例

    Java超详细教你写一个网络购书系统案例

    这篇文章主要介绍了怎么用Java来写一个购书系统,购买书籍主要需要每本书的编号、书名、单价、库存属性,能够让客户通过编号来选书,感兴趣的朋友跟随文章往下看看吧
    2022-03-03
  • java中元素排序Comparable和Comparator的区别

    java中元素排序Comparable和Comparator的区别

    本文主要介绍了java中元素排序Comparable和Comparator的区别,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • java实现读取jar包中配置文件的几种方式

    java实现读取jar包中配置文件的几种方式

    本文主要介绍了java实现读取jar包中配置文件的几种方式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-06-06
  • Java 1.0和Java 1.1 的IO类的比较

    Java 1.0和Java 1.1 的IO类的比较

    Java库的IO分为输入/输出两部分。早期的Java 1.0版本的输入系统是InputStream及其子类,输出系统是OutputStream及其子类。后来的Java 1.1版本对IO系统进行了重新设计。本分对此做了分析和比较,有利于学习,最后给出了例子。
    2013-11-11
  • Mybatis结果集映射与生命周期详细介绍

    Mybatis结果集映射与生命周期详细介绍

    结果集映射指的是将数据表中的字段与实体类中的属性关联起来,这样 MyBatis 就可以根据查询到的数据来填充实体对象的属性,帮助我们完成赋值操作
    2022-10-10
  • MyBatis-Plus拦截器对敏感数据实现加密

    MyBatis-Plus拦截器对敏感数据实现加密

    做课程项目petstore时遇到需要加密属性的问题,而MyBatis-Plus为开发者提供了拦截器的相关接口,本文主要介绍通过MyBatis-Plus的拦截器接口自定义一个拦截器类实现敏感数据如用户密码的加密功能,感兴趣的可以了解一下
    2021-11-11
  • Java中的@Repeatable注解的作用详解

    Java中的@Repeatable注解的作用详解

    这篇文章主要介绍了Java中的@Repeatable注解的作用详解,@Repeatable注解是用来标注一个注解在同一个地方可重复使用的一个注解,使被他注释的注解可以在同一个地方重复使用,需要的朋友可以参考下
    2024-01-01

最新评论