如何使用Java调用Spark集群

 更新时间:2024年02月29日 09:45:26   作者:bluesnail95  
这篇文章主要介绍了如何使用Java调用Spark集群,我搭建的Spark集群的版本是2.4.4,本文结合示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

我搭建的Spark集群的版本是2.4.4。

在网上找的maven依赖,链接忘记保存了。。。。

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
    <hive.version>1.1.0-cdh5.14.2</hive.version>
    <hbase.version>1.2.0-cdh5.14.2</hbase.version>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.4.4</spark.version>
</properties>
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <!--scala-->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <!-- spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- spark-hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.4.4</version>
    </dependency>
    <!-- spark-graphx -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-graphx_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <!-- junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <!-- kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.2</version>
    </dependency>
    <!-- mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.31</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.0.1.RELEASE</version>
            <configuration>
                <mainClass>gdut.spark.SparkInit</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Java客户端连接示例:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import java.util.Arrays;
import java.util.List;
public class SparkInit {
    public static void main(String[] args) {
        try {
            SparkConf conf = new SparkConf().setAppName("liufeifei").setMaster("spark://x.x.x.x:30010");
            conf.set("spark.executor.cores","1");
            conf.set("spark.executor.memory", "1024m");
            JavaSparkContext sc = new JavaSparkContext(conf);
            List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> distData = sc.parallelize(data);
            System.out.println("result is " + distData.count());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

遇到问题:
(1)spark集群中,worker节点提示:Failed to send RPC
master pod的spark-shell执行collect方法,日志输出如下:

worker pod输出如下:

worker节点无法创建Executor,在worker节点的安装目录下有个work目录,有每次创建Executor的日志。查看是worker节点与master节点无法通信。但是worker节点有向master注册,在master的UI界面有显示注册的worker节点。在网上不经意看到有人说可能是istio影响了,然后想起自己之前部署过istio。查看spark部署的命名空间确实是开启istio注入。

换个没有istio注入的命名空间创建spark集群。在master节点的spark-shell可以执行collect方法,可以调度到worker节点的Executor。

(2)Caused by: java.net.UnknownHostException: XXX
无论在本地还是在虚拟机执行上面的客户端连接,都会提示UnknownHostException。这是因为在worker容器的/etc/hosts找不到客户端主机名称和IP的映射关系。

解决办法:使用 HostAliases 向 Pod /etc/hosts 文件添加条目

hostAliases:
  - ip: "127.0.0.1"
    hostnames:
    - "foo.local"
    - "bar.local"
  - ip: "10.1.2.3"
    hostnames:
    - "foo.remote"
    - "bar.remote"

我在yaml文件添加了hostAliases之后,提示主机名不符合规定,然后修改了自己虚拟机上的主机名。

LINUX修改主机名称(立即永久生效)

修改主机名后遇到:java.net.UnknownHostException:Name or Service not known

修改了/etc/hosts文件可以解决。

因为spark集群是部署在一台虚拟机上,本地不能和虚拟机通信,所以要把spring boot项目打包成jar在虚拟机上执行。
Main方法输出:

worker日志输出(k8s容器和宿主机时间相差了8个小时):

到此这篇关于使用Java调用Spark集群的文章就介绍到这了,更多相关Java Spark集群内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java获取Object中Value的实现方法

    Java获取Object中Value的实现方法

    本文介绍了在Java中获取对象属性值的几种常见方法,包括使用反射机制、getter方法、接口或抽象类、Map数据结构、序列化与反序列化以及JavaBeans规范,每种方法都有其适用场景和优缺点,选择合适的方法取决于具体需求
    2025-03-03
  • 解决springboot启动Logback报错ERROR in ch.qos.logback.classic.joran.action.ContextNameAction - Failed to rena

    解决springboot启动Logback报错ERROR in ch.qos.logback.cla

    这篇文章主要介绍了解决springboot启动Logback报错ERROR in ch.qos.logback.classic.joran.action.ContextNameAction - Failed to rena问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Java SpringBoot模板引擎之 Thymeleaf入门详解

    Java SpringBoot模板引擎之 Thymeleaf入门详解

    jsp有着强大的功能,能查出一些数据转发到JSP页面以后,我们可以用jsp轻松实现数据的显示及交互等,包括能写Java代码。但是,SpringBoot首先是以jar的方式,不是war;其次我们的tomcat是嵌入式的,所以现在默认不支持jsp
    2021-10-10
  • Java集合教程之Collection实例详解

    Java集合教程之Collection实例详解

    集合,或者叫容器,是一个包含多个元素的对象,下面这篇文章主要给大家介绍了关于Java集合教程之Collection的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2018-08-08
  • RocketMQ保证消息的有序性的案例分享

    RocketMQ保证消息的有序性的案例分享

    Apache RocketMQ 是一个常用的开源消息中间件,它提供了强大的有序消息处理能力,这里我们会探讨 RocketMQ 是如何保证消息的有序性的,包括其设计原理和相关的源码实现,需要的朋友可以参考下
    2024-04-04
  • SpringBoot使用spring.config.import多种方式导入配置文件

    SpringBoot使用spring.config.import多种方式导入配置文件

    本文主要介绍了SpringBoot使用spring.config.import多种方式导入配置文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • 如何基于SpringBoot实现人脸识别功能

    如何基于SpringBoot实现人脸识别功能

    人工智能时代的到来,相信大家已耳濡目染,虹软免费,离线开放的人脸识别SDK,正推动着全行业进入刷脸时代,下面这篇文章主要给大家介绍了关于如何基于SpringBoot实现人脸识别功能的相关资料,需要的朋友可以参考下
    2022-05-05
  • Java中线程安全有哪些实现思路

    Java中线程安全有哪些实现思路

    在 Java 多线程编程中,线程安全是一个非常重要的概念,本文主要介绍了Java中线程安全有哪些实现思路,非常具有实用价值,需要的朋友可以参考下
    2023-05-05
  • java正则表达式简单应用

    java正则表达式简单应用

    这篇文章主要介绍了java正则表达式简单应用,在之前几篇文章中已经深入学习了java正则表达式基础知识,本文对java正则表达式应用进行研究,感兴趣的小伙伴们可以参考一下
    2015-12-12
  • 基于Java HttpClient和Htmlparser实现网络爬虫代码

    基于Java HttpClient和Htmlparser实现网络爬虫代码

    这篇文章主要介绍了基于Java HttpClient和Htmlparser实现网络爬虫代码的相关资料,需要的朋友可以参考下
    2015-12-12

最新评论