java实现MapReduce对文件进行切分的示例代码

 更新时间:2022年01月21日 10:11:47   作者:liangzai2048  
本文主要介绍了java实现MapReduce对文件进行切分的示例代码,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?

1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总

例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中

创建MapTask

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MapTask extends Thread {
    //用来接收具体的哪一个文件
    private File file;
    private int flag;

    public MapTask(File file, int flag) {
        this.file = file;
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            String line;
            HashMap<String, Integer> map = new HashMap<String, Integer>();
            while ((line = br.readLine()) != null) {
                /**
                 * 统计班级人数HashMap存储
                 */
                String clazz = line.split(",")[4];
                if (!map.containsKey(clazz)) {
                    map.put(clazz, 1);
                } else {
                    map.put(clazz, map.get(clazz) + 1);
                }
            }
            br.close();
            BufferedWriter bw = new BufferedWriter(
                    new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                String key = entry.getKey();
                Integer value = entry.getValue();
                bw.write(key + ":" + value);
                bw.newLine();
            }
            bw.flush();
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

创建Map

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Map {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // 多线程连接池(线程池)
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        // 获取文件列表
        File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
        File[] files = file.listFiles();
        //创建多线程对象
        int flag = 0;
        for (File f : files) {
            //为每一个文件启动一个线程
            MapTask mapTask = new MapTask(f, flag);
            executorService.submit(mapTask);
            flag++;
        }
        executorService.shutdown();
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建ClazzSum

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;

public class ClazzSum {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        BufferedReader br = new BufferedReader(
                new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
        String line;
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        while ((line = br.readLine()) != null) {
            String clazz = line.split(",")[4];
            if (!map.containsKey(clazz)) {
                map.put(clazz, 1);
            } else {
                map.put(clazz, map.get(clazz) + 1);
            }
        }
        System.out.println(map);
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建split128

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;

public class Split128 {
    public static void main(String[] args) throws Exception {
        BufferedReader br = new BufferedReader(
                new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));

        //用作标记文件,也作为文件名称
        int index = 0;
        BufferedWriter bw = new BufferedWriter(
                new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));

        ArrayList<String> list = new ArrayList<String>();
        String line;
        //用作累计读取了多少行数据
        int flag = 0;
        int row = 0;
        while ((line = br.readLine()) != null) {
            list.add(line);
            flag++;
            // flag = 140
            if (flag == 140) {// 一个文件读写完成,生成新的文件
                row = 0 + 128 * index;
                for (int i = row; i <= row + 127; i++) {
                    bw.write(list.get(i));
                    bw.newLine();
                }
                bw.flush();
                bw.close();
                /**
                 * 生成新的文件
                 * 计数清零
                 */
                index++;
                flag = 12;
                bw = new BufferedWriter(
                        new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
            }
        }
        //文件读取剩余128*1.1范围之内
        for (int i = list.size() - flag; i < list.size(); i++) {
            bw.write(list.get(i));
            bw.newLine();
        }
        bw.flush();
        bw.close();
    }
}

创建Reduce

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;

public class Reduce {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
        File[] files = file.listFiles();
        for (File f : files) {
            BufferedReader br = new BufferedReader(new FileReader(f));
            String line;
            while ((line = br.readLine()) != null) {
                String clazz = line.split(":")[0];
                int sum = Integer.valueOf(line.split(":")[1]);
                if (!map.containsKey(clazz)) {
                    map.put(clazz, sum);
                } else {
                    map.put(clazz, map.get(clazz) + sum);
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start);
        System.out.println(map);
    }
}

最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。

到此这篇关于java实现MapReduce对文件进行切分的示例代码的文章就介绍到这了,更多相关java MapReduce 文件切分内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java语法关于泛型与类型擦除的分析

    Java语法关于泛型与类型擦除的分析

    泛型没有其看起来那么深不可测,它并不神秘与神奇,泛型是Java 中一个很小巧的概念,但同时也是一个很容易让人迷惑的知识点,它让人迷惑的地方在于它的许多表现有点违反直觉
    2021-09-09
  • SpringMVC执行过程详细讲解

    SpringMVC执行过程详细讲解

    MVC是一种软件设计典范,用一种业务逻辑、数据、界面显示分离的方法组织代码,将业务逻辑聚集到一个组件里面,在改进和个性化定制界面及用户交互的同时,不需要重新编写业务逻辑,MVC分层有助于管理和架构复杂的应用程序
    2022-08-08
  • Springboot利用Aop捕捉注解实现业务异步执行

    Springboot利用Aop捕捉注解实现业务异步执行

    在开发过程中,尽量会将比较耗时且并不会影响请求的响应结果的业务放在异步线程池中进行处理,那么到时什么任务在执行的时候会创建单独的线程进行处理呢?这篇文章主要介绍了Springboot利用Aop捕捉注解实现业务异步执行
    2023-04-04
  • springboot用controller跳转html页面的实现

    springboot用controller跳转html页面的实现

    这篇文章主要介绍了springboot用controller跳转html页面的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • java并发学习之Executor源码解析

    java并发学习之Executor源码解析

    这篇文章主要为大家介绍了java并发学习之Executor源码示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07
  • java连接mysql数据库详细步骤解析

    java连接mysql数据库详细步骤解析

    以下是对java连接mysql数据库的具体详细步骤进行了分析介绍,需要的朋友可以过来参考下
    2013-08-08
  • SpringBoot的10个参数验证技巧分享

    SpringBoot的10个参数验证技巧分享

    参数验证很重要,是平时开发环节中不可少的一部分,但是我想很多后端同事会偷懒,干脆不错,这样很可能给系统的稳定性和安全性带来严重的危害,那么在Spring Boot应用中如何做好参数校验工作呢,本文提供了10个小技巧,需要的朋友可以参考下
    2023-09-09
  • SpringBoot对接Spark过程详解

    SpringBoot对接Spark过程详解

    这篇文章主要介绍SpringBoot接入Spark的方法的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望能帮助大家解决问题
    2023-02-02
  • java将数据写入内存,磁盘的方法

    java将数据写入内存,磁盘的方法

    下面小编就为大家分享一篇java将数据写入内存,磁盘的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-01-01
  • fastjson转换对象实体@JsonProperty不生效问题及解决

    fastjson转换对象实体@JsonProperty不生效问题及解决

    这篇文章主要介绍了fastjson转换对象实体@JsonProperty不生效问题及解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-08-08

最新评论