Kafka的安装及接入SpringBoot的详细过程

 更新时间:2024年05月15日 11:30:26   作者:LB_bei  
Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目,这篇文章主要介绍了Kafka的安装及接入SpringBoot,需要的朋友可以参考下

环境:windows、jdk1.8、springboot2

Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/

1.概述

Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)
  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads

选择最新版就可以

2.1 配置kafka

解压下载的文件,修改 config 文件夹下的 zookeeper.properties

修改 config 文件夹下的server.properties

当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

2.2 启动 zookeeper

Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

可以本地访问看一下:http://localhost:2181/

2.3 启动Kafka

kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

访问路径:http://localhost:9092/

2.4 便捷启动脚本

两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.8.RELEASE</version>
</parent>
<properties>
    <fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

(2)配置类application.yml

生产者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    producer:
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApp {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }
}

3.2 消息生产者

junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​
    @Test
    public void sendMsg(){
        String topic = "spring_test";
        kafkaTemplate.send(topic,"hello spring boot kafka!");
        System.out.println("发送成功.");
        while (true){ //保存加载ioc容器
​
        }
    }
}

3.3 消息消费者

新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​
 //    以下两种方法都行
 // 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​
    @KafkaListener(topics = "spring_test")
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());
    }
}

到此这篇关于Kafka的安装及接入SpringBoot的文章就介绍到这了,更多相关Kafka的安装及接入SpringBoot内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • JAVA使用hutool工具实现查询树结构数据(省市区)

    JAVA使用hutool工具实现查询树结构数据(省市区)

    今天通过本文给大家分享JAVA使用hutool工具实现查询树结构数据(省市区),代码分为表结构和数据结构,代码简单易懂,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2021-08-08
  • mybatis中几种typeHandler的定义使用详解

    mybatis中几种typeHandler的定义使用详解

    本文主要介绍了mybatis中几种typeHandler的定义使用,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-12-12
  • Java集合中的CopyOnWriteArrayList使用详解

    Java集合中的CopyOnWriteArrayList使用详解

    这篇文章主要介绍了Java集合中的CopyOnWriteArrayList使用详解,CopyOnWriteArrayList是ArrayList的线程安全版本,从他的名字可以推测,CopyOnWriteArrayList是在有写操作的时候会copy一份数据,然后写完再设置成新的数据,需要的朋友可以参考下
    2023-12-12
  • java中File类的三种创建文件夹方法总结

    java中File类的三种创建文件夹方法总结

    这篇文章主要给大家介绍了关于java中File类的三种创建文件夹方法,File类代表文件或目录路径名的抽象表达形式,通过File类提供的方法,我们可以很方便地创建文件夹,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-04-04
  • 使用springboot对linux进行操控的方法示例

    使用springboot对linux进行操控的方法示例

    这篇文章主要介绍了使用springboot对linux进行操控的方法示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • spring缓存自定义resolver的方法

    spring缓存自定义resolver的方法

    这篇文章主要为大家详细介绍了spring缓存自定义resolver的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2022-03-03
  • JDK动态代理详细解析

    JDK动态代理详细解析

    这篇文章主要介绍了JDK动态代理详细解析,在Java的动态代理机制中,有两个重要的类和接口,一个是InvoInvocationHandler(接口)、Proxy(类),这一个类和接口是我们动态代理所必须用到的,需要的朋友可以参考下
    2023-11-11
  • maven仓库访问顺序小结

    maven仓库访问顺序小结

    在日常操作中,相信很多人在maven仓库的优先级顺序是什么问题上存在疑惑,本文就来介绍一下maven仓库访问顺序,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • 详解关于spring bean名称命名的那些事

    详解关于spring bean名称命名的那些事

    每个bean都有一个或者多个标识符,这些标识符在容器中必须是唯一的,这篇文章主要给大家介绍了关于spring bean名称命名的那些事,需要的朋友可以参考下
    2021-07-07
  • windows 部署JAVA环境安装iDea的详细步骤

    windows 部署JAVA环境安装iDea的详细步骤

    这篇文章主要介绍了windows 部署JAVA环境安装iDea的详细步骤,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08

最新评论