java实现MapReduce对文件进行切分的示例代码

 更新时间:2022年01月21日 10:11:47   作者:liangzai2048  
本文主要介绍了java实现MapReduce对文件进行切分的示例代码,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。
那么我们该怎样解决海量数据的计算?

1、获取总行数
2、计算每个文件中存多少数据
3、split切分文件
4、reduce将文件进行汇总

例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分
在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中

创建MapTask

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MapTask extends Thread {
    //用来接收具体的哪一个文件
    private File file;
    private int flag;

    public MapTask(File file, int flag) {
        this.file = file;
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            String line;
            HashMap<String, Integer> map = new HashMap<String, Integer>();
            while ((line = br.readLine()) != null) {
                /**
                 * 统计班级人数HashMap存储
                 */
                String clazz = line.split(",")[4];
                if (!map.containsKey(clazz)) {
                    map.put(clazz, 1);
                } else {
                    map.put(clazz, map.get(clazz) + 1);
                }
            }
            br.close();
            BufferedWriter bw = new BufferedWriter(
                    new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                String key = entry.getKey();
                Integer value = entry.getValue();
                bw.write(key + ":" + value);
                bw.newLine();
            }
            bw.flush();
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

创建Map

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Map {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // 多线程连接池(线程池)
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        // 获取文件列表
        File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");
        File[] files = file.listFiles();
        //创建多线程对象
        int flag = 0;
        for (File f : files) {
            //为每一个文件启动一个线程
            MapTask mapTask = new MapTask(f, flag);
            executorService.submit(mapTask);
            flag++;
        }
        executorService.shutdown();
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建ClazzSum

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;

public class ClazzSum {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        BufferedReader br = new BufferedReader(
                new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\bigstudents.txt"));
        String line;
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        while ((line = br.readLine()) != null) {
            String clazz = line.split(",")[4];
            if (!map.containsKey(clazz)) {
                map.put(clazz, 1);
            } else {
                map.put(clazz, map.get(clazz) + 1);
            }
        }
        System.out.println(map);
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

创建split128

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.ArrayList;

public class Split128 {
    public static void main(String[] args) throws Exception {
        BufferedReader br = new BufferedReader(
                new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));

        //用作标记文件,也作为文件名称
        int index = 0;
        BufferedWriter bw = new BufferedWriter(
                new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));

        ArrayList<String> list = new ArrayList<String>();
        String line;
        //用作累计读取了多少行数据
        int flag = 0;
        int row = 0;
        while ((line = br.readLine()) != null) {
            list.add(line);
            flag++;
            // flag = 140
            if (flag == 140) {// 一个文件读写完成,生成新的文件
                row = 0 + 128 * index;
                for (int i = row; i <= row + 127; i++) {
                    bw.write(list.get(i));
                    bw.newLine();
                }
                bw.flush();
                bw.close();
                /**
                 * 生成新的文件
                 * 计数清零
                 */
                index++;
                flag = 12;
                bw = new BufferedWriter(
                        new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));
            }
        }
        //文件读取剩余128*1.1范围之内
        for (int i = list.size() - flag; i < list.size(); i++) {
            bw.write(list.get(i));
            bw.newLine();
        }
        bw.flush();
        bw.close();
    }
}

创建Reduce

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.HashMap;

public class Reduce {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");
        File[] files = file.listFiles();
        for (File f : files) {
            BufferedReader br = new BufferedReader(new FileReader(f));
            String line;
            while ((line = br.readLine()) != null) {
                String clazz = line.split(":")[0];
                int sum = Integer.valueOf(line.split(":")[1]);
                if (!map.containsKey(clazz)) {
                    map.put(clazz, sum);
                } else {
                    map.put(clazz, map.get(clazz) + sum);
                }
            }
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start);
        System.out.println(map);
    }
}

最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。

到此这篇关于java实现MapReduce对文件进行切分的示例代码的文章就介绍到这了,更多相关java MapReduce 文件切分内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • springboot注解Aspect实现方案

    springboot注解Aspect实现方案

    本文提供一种自定义注解,来实现业务审批操作的DEMO,不包含审批流程的配置功能。对springboot注解Aspect实现方案感兴趣的朋友一起看看吧
    2022-01-01
  • Spring实战之类级别缓存实现与使用方法

    Spring实战之类级别缓存实现与使用方法

    这篇文章主要介绍了Spring实战之类级别缓存实现与使用方法,结合实例形式分析了Spring类级别缓存配置、属性、领域模型等相关操作技巧,需要的朋友可以参考下
    2020-01-01
  • Java中输入输出方式详细讲解

    Java中输入输出方式详细讲解

    这篇文章主要给大家介绍了关于Java中输入输出方式的相关资料,Java输入输出是指使用java提供的一些类和方法来实现数据的输入和输出,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2023-09-09
  • activiti实现员工请假流程解析

    activiti实现员工请假流程解析

    这篇文章主要介绍了activiti实现员工请假流程解析,本文通过实例代码图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • RocketMQ producer发送者浅析

    RocketMQ producer发送者浅析

    RocketMQ生产者是一种高性能、可靠的消息发送者,能够将消息快速、可靠地发送到RocketMQ消息队列中。它具有多种消息发送模式和消息发送方式,可以根据不同的业务需求进行灵活配置
    2023-04-04
  • 浅谈对象与Map相互转化

    浅谈对象与Map相互转化

    这篇文章主要介绍了利用BeanMap进行对象与Map的相互转换,在文中列举了完整代码,需要的朋友可以参考下。
    2017-09-09
  • SpringCloud超详细讲解微服务网关Zuul

    SpringCloud超详细讲解微服务网关Zuul

    这篇文章主要介绍了SpringCloud Zuul微服务网关,负载均衡,熔断和限流,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-07-07
  • Java中断线程的方法

    Java中断线程的方法

    这篇文章主要介绍了Java中断线程的方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2017-05-05
  • 解决redisTemplate中leftPushAll隐性bug的问题

    解决redisTemplate中leftPushAll隐性bug的问题

    这篇文章主要介绍了解决redisTemplate中leftPushAll隐性bug的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • springboot 参数格式校验操作

    springboot 参数格式校验操作

    这篇文章主要介绍了springboot 参数格式校验操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07

最新评论