SpringBoot 集成 Kettle的实现示例

 更新时间:2025年01月26日 10:24:20   作者:catoop  
本文主要介绍了SpringBoot 集成 Kettle的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

Kettle 简介

Kettle 最初由 Matt Casters 开发,是 Pentaho 数据集成平台的一部分。它提供了一个用户友好的界面和丰富的功能集,使用户能够轻松地设计、执行和监控 ETL 任务。Kettle 通过其强大的功能和灵活性,帮助企业高效地处理大规模数据集成任务。

主要组成部分

  • Spoon
    • 用途:Spoon 是 Kettle 的图形化设计工具。用户可以使用 Spoon 设计和调试 ETL 转换和作业。
    • 功能:拖放式界面、预览数据、测试 ETL 流程、管理连接、编写脚本等。
  • Pan
    • 用途:Pan 是一个命令行工具,用于执行由 Spoon 设计的 ETL 转换。
    • 功能:通过命令行执行转换、调度作业、集成到其他自动化流程中。
  • Kitchen
    • 用途:Kitchen 是一个命令行工具,用于执行由 Spoon 设计的 ETL 作业。
    • 功能:通过命令行执行作业、调度作业、集成到其他自动化流程中。
  • Carte
    • 用途:Carte 是一个轻量级的 Web 服务器,提供远程执行和监控功能。
    • 功能:远程执行和监控 ETL 转换和作业、查看日志、管理集群等。
  • Repositories
    • 用途:存储和管理 ETL 转换和作业的地方。
    • 功能:可以使用数据库或文件系统作为存储库,支持版本控制和共享。

主要功能和特点

  • 数据提取

    • 支持多种数据源,如关系数据库、文件(CSV、Excel、XML 等)、大数据平台(Hadoop、Hive 等)、云存储(Amazon S3、Google Drive 等)、Web 服务和 API 等。
  • 数据转换

    • 丰富的转换步骤,包括数据清洗、数据聚合、数据过滤、数据排序、数据连接、数据拆分、数据类型转换等。
  • 数据加载

    • 支持将数据加载到多种目标系统中,如关系数据库、大数据平台、文件系统、云存储等。
  • 调度和自动化

    • 支持通过命令行工具(Pan 和 Kitchen)和调度器(如 cron 或 Windows 任务计划)进行调度和自动化执行。
  • 扩展性

    • 提供了插件机制,用户可以编写自定义插件,扩展 Kettle 的功能。
    • 支持 JavaScript 和 Java 进行脚本编写,增强转换和作业的灵活性。
  • 集群和并行处理

    • 支持集群模式,能够在分布式环境中并行处理大规模数据。
    • 提供了分布式 ETL 执行和负载均衡功能。
  • 数据质量和数据治理

    • 提供了数据验证、数据一致性检查和数据校验功能,帮助确保数据的质量和一致性。
  • 实时数据处理

    • 支持实时数据流处理,通过集成 Kafka、MQTT 等流处理平台,实现实时数据的提取、转换和加载。

集成 Kettle

将 Kettle(Pentaho Data Integration, PDI)集成到 Spring Boot 项目中,可以实现 ETL 流程的自动化和集成化处理。以下是详细的集成过程:

准备工作

  • 下载 Kettle:从 Pentaho 官网下载 Kettle(PDI)的最新版本,并解压到本地目录。
  • Spring Boot 项目:确保已有一个 Spring Boot 项目,或新建一个 Spring Boot 项目。

引入 Kettle 依赖

在 Spring Boot 项目的 pom.xml 文件中添加 Kettle 所需的依赖。你可以将 Kettle 的 JAR 文件添加到本地 Maven 仓库,或直接在项目中引入这些 JAR 文件。

<dependencies>
    <!-- Spring Boot 依赖 -->

    <!-- Kettle 依赖 -->
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-dbdialog</artifactId>
        <version>9.4.0.0-343</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-vfs2</artifactId>
        <version>2.7.0</version>
    </dependency>
    <!-- 根据需要添加其他 Kettle 依赖 -->
    
    <!-- 操作数据库数据时添加相应的数据库依赖 -->
    
</dependencies>

处理密码加密

在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件,用于配置密码加密插件:

<password-encoder-plugins>

    <password-encoder-plugin id="Kettle">
        <description>Kettle Password Encoder</description>
        <classname>org.pentaho.support.encryption.KettleTwoWayPasswordEncoder</classname>
    </password-encoder-plugin>

</password-encoder-plugins>

kettle-core依赖中org.pentaho.support.encryption.KettleTwoWayPasswordEncoder类实现了TwoWayPasswordEncoderInterface接口,用于处理密码的加密和解密操作。

添加 Spoon 的任务文件

在 Kettle(Pentaho Data Integration,PDI)中,作业(Job)和转换(Transformation)是两种核心的 ETL 组件,它们在设计和功能上有着本质的区别。

转换(Transformation)

  • 数据处理流程:转换是一个数据处理流程,专注于数据的提取(Extract)、转换(Transform)和加载(Load)。
  • 行级处理:转换以行级处理数据,每次处理一行数据,并将其传递给下一步骤。
  • 任务文件为.ktr文件。

作业(Job)

  • 任务管理和控制流程:作业是一个任务管理和控制流程,负责调度和控制一系列任务的执行顺序。
  • 步骤级处理:作业以步骤为单位处理任务,每次执行一个步骤,然后根据条件决定执行下一个步骤。
  • 任务文件为.kjb文件。

区别

  • 转换处理数据行,作业处理任务步骤。
  • 转换中的步骤是并行执行的,而作业中的步骤是顺序执行的。
  • 转换侧重于数据的处理和转换,作业侧重于任务的调度和管理。
  • 转换主要通过数据流控制,作业提供了丰富的逻辑控制(条件判断、循环、错误处理等)。
  • 转换适用于复杂的数据处理流程,作业适用于任务调度和控制。

在 Spring Boot 项目的 resources 目录下,创建一个 kettle 目录,并将 Kettle 的任务文件(如 转换1.ktr)复制到该目录中。

编写 Kettle 服务类

创建一个服务类,用于执行 Kettle 转换或作业。

package com.example.kettletest.service.impl;

import com.example.kettletest.service.KettleJobService;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;

/**
 * @author 罗森
 * @date 2024/6/6 13:21
 */
@Service
public class KettleJobServiceImpl implements KettleJobService {
    @Override
    public void runTaskFile(String taskFileName) {
        // 初始化 Kettle 环境
        try {
            KettleEnvironment.init();
            EnvUtil.environmentInit();
        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
        // 执行任务文件
        if (taskFileName.endsWith(".ktr")) {
            taskFileKTR(taskFileName);
        } else if (taskFileName.endsWith(".kjb")) {
            taskFileKJB(taskFileName);
        } else {
            throw new IllegalArgumentException("Unsupported file type: " + taskFileName);
        }
    }

    /**
     * 针对kjb文件的操作
     * @param taskFileName
     */
    public void taskFileKJB(String taskFileName) {
        try {
            // 获取资源文件路径
            ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
            File jobFile = resource.getFile();
            // 加载 KJB 文件
            JobMeta jobMeta = new JobMeta(jobFile.getAbsolutePath(), null);
            // 创建作业对象
            Job job = new Job(null, jobMeta);
            // 启动作业
            job.start();
            // 等待作业完成
            job.waitUntilFinished();

            if (job.getErrors() > 0) {
                System.out.println("There were errors during job execution.");
            } else {
                System.out.println("Job executed successfully.");
            }
        } catch (IOException | KettleXMLException e) {
            e.printStackTrace();
        }
    }

    /**
     * 针对ktr文件的操作
     * @param taskFileName
     */
    public void taskFileKTR(String taskFileName) {
        try {
            // 获取资源文件路径
            ClassPathResource resource = new ClassPathResource("kettle/" + taskFileName);
            File transFile = resource.getFile();
            // 加载 KTR 文件
            TransMeta transMeta = new TransMeta(transFile.getAbsolutePath());
            // 创建转换对象
            Trans trans = new Trans(transMeta);
            // 启动作业
            trans.execute(null);
            // 等待作业完成
            trans.waitUntilFinished();

            if (trans.getErrors() > 0) {
                System.err.println("There were errors during Transformation execution.");
            } else {
                System.out.println("Transformation executed successfully!");
            }
        } catch (IOException | KettleException e) {
            e.printStackTrace();
        }
    }
}

常见问题解决办法

  • 运行后报错信息为:Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath.

    **解决办法:**在 resources 目录下创建 kettle-password-encoder-plugins.xml 文件。

  • 运行后报错信息为:ERROR (version 9.4.0.0-343, build 0.0 from 2022-11-08 07.50.27 by buildguy) : A serious error occurred during job execution: 无法找到作业的开始点.

    **解决办法:**为Spoon制作的作业任务增加开始节点。

  • 运行后报错信息为:Can't run transformation due to plugin missing.

    **解决办法:**此问题通常出现在涉及类似于导出excel文件、json文件时。在初始化 Kettle 环境之前指明相关插件的绝对路径(相关插件通常在Kettle本地解压文件夹中的plugins目录下),新增以下代码:

    StepPluginType.getInstance().getPluginFolders().add(new PluginFolder("E:\\Kettle\\pdi-ce-9.4.0.0-343\\data-integration\\plugins", false, true));
    

    将代码中的地址换成您本地的绝对地址。

到此这篇关于SpringBoot 集成 Kettle的实现示例的文章就介绍到这了,更多相关SpringBoot 集成 Kettle内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

相关文章

  • java中单双斜杠的使用图文详解

    java中单双斜杠的使用图文详解

    JAVA中的斜杠有正斜杠与反斜杠之分,正斜杠,一般就叫做斜杠,下面这篇文章主要给大家介绍了关于java中单双斜杠使用的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2022-09-09
  • Java多线程产生死锁的必要条件

    Java多线程产生死锁的必要条件

    今天小编就为大家分享一篇关于Java多线程产生死锁的必要条件,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Spring框架JavaMailSender发送邮件工具类详解

    Spring框架JavaMailSender发送邮件工具类详解

    这篇文章主要为大家详细介绍了Spring框架JavaMailSender发送邮件工具类,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • Java 集合框架 Queue 和 Stack 体系

    Java 集合框架 Queue 和 Stack 体系

    这篇文章主要介绍了Java 集合框架Queue和Stack体系,Stack 继承自Vector,并拓展了五个允许将容器视为栈结构的操作,Queue接口定义了队列的能力,它继承自Collection,更多相关内容需要得小伙伴可以参考一下
    2022-06-06
  • Java泛型实现类型安全的通用类型转换器

    Java泛型实现类型安全的通用类型转换器

    在开发中,我们常常需要在不同类型之间进行转换,为了提高代码的可读性与安全性,Java的泛型机制提供了强大的类型检查能力,下面我们就来看看如何通过泛型实现类型安全的通用转换器
    2024-11-11
  • 原理分析SonarQube中IdentityProvider账户互斥现象

    原理分析SonarQube中IdentityProvider账户互斥现象

    这篇文章主要为大家介绍分析SonarQube中IdentityProvider账户互斥现象原理,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-02-02
  • Java中的继承与接口解读

    Java中的继承与接口解读

    这篇文章主要介绍了Java中的继承与接口使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • SpringBoot 图书管理系统(删除、强制登录、更新图书)详细代码

    SpringBoot 图书管理系统(删除、强制登录、更新图书)详细代码

    在企业开发中,通常不采用delete语句进行物理删除,而是使用逻辑删除,逻辑删除通过修改标识字段来表示数据已被删除,方便数据恢复,本文给大家介绍SpringBoot 图书管理系统实例代码,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • dubbo服务注册到nacos的过程剖析

    dubbo服务注册到nacos的过程剖析

    这篇文章主要为大家介绍了dubbo服务注册到nacos的过程剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职极限
    2022-02-02
  • spring cloud gateway限流常见算法实现

    spring cloud gateway限流常见算法实现

    本文主要介绍了spring cloud gateway限流常见算法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-02-02

最新评论