利用Java连接Hadoop进行编程

 更新时间:2022年06月28日 15:40:16   作者:wr456wr  
这篇文章主要介绍了利用Java连接Hadoop进行编程,文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

实验环境

  • hadoop版本:3.3.2
  • jdk版本:1.8
  • hadoop安装系统:ubuntu18.04
  • 编程环境:IDEA
  • 编程主机:windows

实验内容

测试Java远程连接hadoop

创建maven工程,引入以下依赖:

<dependency>
                <groupId>org.testng</groupId>
                <artifactId>testng</artifactId>
                <version>RELEASE</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>3.3.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.3.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
                <version>1.2.1</version>
            </dependency>

虚拟机的/etc/hosts配置

hdfs-site.xml配置

<configuration>
        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>
        <property>
                <name>dfs.namenode.name.dir</name>
                <value>file:/root/rDesk/hadoop-3.3.2/tmp/dfs/name</value>
        </property>
        <property>
                <name>dfs.datanode.http.address</name>
                <value>VM-12-11-ubuntu:50010</value>
        </property>
        <property>
                <name>dfs.client.use.datanode.hostname</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.datanode.data.dir</name>
                <value>file:/root/rDesk/hadoop-3.3.2/tmp/dfs/data</value>
        </property>
</configuration>

core-site.xml配置

<configuration>
  <property>
          <name>hadoop.tmp.dir</name>
          <value>file:/root/rDesk/hadoop-3.3.2/tmp</value>
          <description>Abase for other temporary directories.</description>
  </property>
  <property>
          <name>fs.defaultFS</name>
          <value>hdfs://VM-12-11-ubuntu:9000</value>
  </property>
</configuration>

启动hadoop

sbin/start-dfs.sh

主机的hosts(C:\Windows\System32\drivers\etc)文件配置

尝试连接到虚拟机的hadoop并读取文件内容,这里我读取hdfs下的/root/iinput文件内容

Java代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
public class TestConnectHadoop {
    public static void main(String[] args) throws Exception {

        String hostname = "VM-12-11-ubuntu";
        String HDFS_PATH = "hdfs://" + hostname + ":9000";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_PATH);
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("dfs.client.use.datanode.hostname", "true");

        FileSystem fs = FileSystem.get(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            System.out.println(fileStatus.toString());
        }
        FileStatus fileStatus = fs.getFileStatus(new Path("/root/iinput"));
        System.out.println(fileStatus.getOwner());
        System.out.println(fileStatus.getGroup());

        System.out.println(fileStatus.getPath());
        FSDataInputStream open = fs.open(fileStatus.getPath());
        byte[] buf = new byte[1024];
        int n = -1;
        StringBuilder sb = new StringBuilder();
        while ((n = open.read(buf)) > 0) {
            sb.append(new String(buf, 0, n));
        }
        System.out.println(sb);
    }
}

运行结果:

编程实现一个类“MyFSDataInputStream”,该类继承“org.apache.hadoop.fs.FSDataInputStream",要求如下: ①实现按行读取HDFS中指定文件的方法”readLine()“,如果读到文件末尾,则返回为空,否则返回文件一行的文本

思路:emmm我的思路比较简单,只适用于该要求,仅作参考。
将所有的数据读取出来存储起来,然后根据换行符进行拆分,将拆分的字符串数组存储起来,用于readline返回

Java代码

import org.apache.hadoop.fs.FSDataInputStream;
import java.io.IOException;
import java.io.InputStream;
public class MyFSDataInputStream extends FSDataInputStream {
    private String data = null;
    private String[] lines = null;
    private int count = 0;
    private FSDataInputStream in;
    public MyFSDataInputStream(InputStream in) throws IOException {
        super(in);
        this.in = (FSDataInputStream) in;
        init();
    }
    private void init() throws IOException {
        byte[] buf = new byte[1024];
        int n = -1;
        StringBuilder sb = new StringBuilder();
        while ((n = this.in.read(buf)) > 0) {
            sb.append(new String(buf, 0, n));
        }
        data = sb.toString();
        lines = data.split("\n");
    }
    /**
     * 实现按行读取HDFS中指定文件的方法”readLine()“,如果读到文件末尾,则返回为空,否则返回文件一行的文本
     */
    public String read_line() {
        return count < lines.length ? lines[count++] : null;
    }

}

测试类:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
public class TestConnectHadoop {
    public static void main(String[] args) throws Exception {

        String hostname = "VM-12-11-ubuntu";
        String HDFS_PATH = "hdfs://" + hostname + ":9000";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_PATH);
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("dfs.client.use.datanode.hostname", "true");
        FileSystem fs = FileSystem.get(conf);
        FileStatus fileStatus = fs.getFileStatus(new Path("/root/iinput"));
        System.out.println(fileStatus.getOwner());
        System.out.println(fileStatus.getGroup());
        System.out.println(fileStatus.getPath());
        FSDataInputStream open = fs.open(fileStatus.getPath());
        MyFSDataInputStream myFSDataInputStream = new MyFSDataInputStream(open);
        String line = null;
        int count = 0;
        while ((line = myFSDataInputStream.read_line()) != null ) {
            System.out.printf("line %d is: %s\n", count++, line);
        }
        System.out.println("end");

    }
}

运行结果:

②实现缓存功能,即利用”MyFSDataInputStream“读取若干字节数据时,首先查找缓存,如果缓存中有所需要数据,则直接由缓存提供,否则从HDFS中读取数据

import org.apache.hadoop.fs.FSDataInputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
public class MyFSDataInputStream extends FSDataInputStream {
    private BufferedInputStream buffer;
    private String[] lines = null;
    private int count = 0;
    private FSDataInputStream in;
    public MyFSDataInputStream(InputStream in) throws IOException {
        super(in);
        this.in = (FSDataInputStream) in;
        init();
    }
    private void init() throws IOException {
        byte[] buf = new byte[1024];
        int n = -1;
        StringBuilder sb = new StringBuilder();
        while ((n = this.in.read(buf)) > 0) {
            sb.append(new String(buf, 0, n));
        }
        //缓存数据读取
        buffer = new BufferedInputStream(this.in);
        lines = sb.toString().split("\n");
    }
    /**
     * 实现按行读取HDFS中指定文件的方法”readLine()“,如果读到文件末尾,则返回为空,否则返回文件一行的文本
     */
    public String read_line() {
        return count < lines.length ? lines[count++] : null;
    }
    @Override
    public int read() throws IOException {
        return this.buffer.read();
    }
    public int readWithBuf(byte[] buf, int offset, int len) throws IOException {
        return this.buffer.read(buf, offset, len);
    }
    public int readWithBuf(byte[] buf) throws IOException {
        return this.buffer.read(buf);
    }
}

到此这篇关于利用Java连接Hadoop进行编程的文章就介绍到这了,更多相关Java连接Hadoop内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能

    深入了解Spring Boot2.3.0及以上版本的Liveness和Readiness功能

    这篇文章主要介绍了Spring Boot2.3.0及以上版本的Liveness和Readiness功能示例深入解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • springmvc处理异步请求的示例

    springmvc处理异步请求的示例

    这篇文章主要介绍了springmvc处理异步请求的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-01-01
  • Java elasticSearch-api的具体操作步骤讲解

    Java elasticSearch-api的具体操作步骤讲解

    这篇文章主要介绍了elasticSearch-api的具体操作步骤讲解,本文通过详细的步骤介绍和图文代码展示讲解了该项技术,需要的朋友可以参考下
    2021-06-06
  • java shiro实现退出登陆清空缓存

    java shiro实现退出登陆清空缓存

    本篇文章主要介绍了java shiro实现退出登陆清空缓存,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-02-02
  • Java使用Soap方式调用WebService接口代码示例

    Java使用Soap方式调用WebService接口代码示例

    Java调用WebService接口是指通过Java语言来访问并与WebService进行交互,WebService是一种基于Web的服务架构,它通过标准的XML和HTTP协议来提供服务,这篇文章主要给大家介绍了关于Java使用Soap方式调用WebService接口的相关资料,需要的朋友可以参考下
    2024-03-03
  • Java实现短信验证码和国际短信群发功能的示例

    Java实现短信验证码和国际短信群发功能的示例

    本篇文章主要介绍了Java实现短信验证码和国际短信群发功能的示例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
    2017-02-02
  • IDEA连接mysql数据库报错的解决方法

    IDEA连接mysql数据库报错的解决方法

    这篇文章主要介绍了IDEA连接mysql数据库报错的解决方法,文中有非常详细的图文示例,对出现Server returns invalid timezone. Go to ‘Advanced‘ tab and set ‘serverTimezone‘ prope报错的小伙伴们很有帮助哟,需要的朋友可以参考下
    2021-05-05
  • SpringBoot 集成短信和邮件的配置示例详解

    SpringBoot 集成短信和邮件的配置示例详解

    这篇文章主要介绍了SpringBoot 集成短信和邮件的相关知识,项目中使用lombok插件和swagger依赖,无相关依赖的请自行修改,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2022-04-04
  • idea下如何设置项目启动的JVM运行内存大小

    idea下如何设置项目启动的JVM运行内存大小

    这篇文章主要介绍了idea下如何设置项目启动的JVM运行内存大小问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • SpringBoot整合WebSocket实现聊天室流程全解

    SpringBoot整合WebSocket实现聊天室流程全解

    WebSocket协议是基于TCP的一种新的网络协议。本文将通过SpringBoot集成WebSocket实现简易聊天室,对大家的学习或者工作具有一定的参考学习价值,感兴趣的可以了解一下
    2023-01-01

最新评论