hadoop 详解如何实现数据排序

 更新时间:2024年10月11日 12:02:35   作者:YinJuan791739156  
在很多业务场景下,需要对原始的数据读取分析后,将输出的结果按照指定的业务字段进行排序输出,方便上层应用对结果数据进行展示或使用,减少二次排序的成本

排序是Hadoop的默认行为,不管你是否需要,MapReduce的MapTask和Task都会对输出的结果的Key进行排序,默认的排序顺序是按照字典顺序排列,实现的方法是快速排序。

自定义排序需要继承WritableComparable,实现compareTo方法就完成了自定义排序。

下面介绍几种排序的场景

一、全排序

全排序是指最终只产生一个输出文件,数据在文件内部有序。

1、输入数据

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548

2、Bean对象

继承WritabelComparable,并实现方法compareTo。WritabelComparable起始就是两个接口的综合,Writable是Hadoop自定义序列化数据需要实现的接口,而Coparable是比较排序需要实现的接口。

package cn.nuwa.hap.cp;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


@Data
@NoArgsConstructor
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //4 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    //5 重写ToString
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean  o) {
        //按照总流量比较,倒序排列
        if(this.sumFlow > o.getSumFlow()){
            return -1;
        }else if(this.sumFlow < o.getSumFlow()){
            return 1;
        }else {
            return 0;
        }
    }
}

  3、Mapper类

package cn.nuwa.hap.cp;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 获取一行数据
        String line = value.toString();

        //2 按照"\t",切割数据
        String[] split = line.split("\t");

        //3 封装outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        //4 写出outK outV
        context.write(outK,outV);
    }
}

4、Reduce类

package cn.nuwa.hap.cp;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //遍历values集合,循环写出,避免总流量相同的情况
        for (Text value : values) {
            //调换KV位置,反向写出
            context.write(value, key);
        }
    }
}

5、Dirver类

package cn.nuwa.hap.cp;

import cn.nuwa.hap.wb.FlowBean;
import cn.nuwa.hap.wb.ProvincePartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 关联本Driver类
        job.setJarByClass(FlowDriver.class);

        //3 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(cn.nuwa.hap.wb.FlowBean.class);

        //5 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6 设置程序的输入输出路径
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\Dell\\Desktop\\hadoop\\inputFlow"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\Dell\\Desktop\\hadoop\\outputFlow"));

        //8 指定自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);

        //9 同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);

        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

6、最终结果

13509468723	7335	110349	117684
13975057813	11058	48243	59301
13568436656	3597	25635	29232
13736230513	2481	24681	27162
18390173782	9531	2412	11943
13630577991	6960	690	7650
15043685818	3659	3538	7197
13992314666	3008	3720	6728
15910133277	3156	2936	6092
13560439638	918	4938	5856
84188413	4116	1432	5548
13682846555	1938	2910	4848
18271575951	1527	2106	3633
15959002129	1938	180	2118
13590439668	1116	954	2070
13956435636	132	1512	1644
13470253144	180	180	360
13846544121	264	0	264
13729199489	240	0	240
13768778790	120	120	240
13966251146	240	0	240

二、分区排序

分区排序的本质是:

  • 数据分区
  • 区内数据有序

只需要在全排序的基础上加上分区的代码即可

1、分区类

package cn.nuwa.hap.cp;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


public class ProvincePartitioner extends Partitioner< FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        //获取手机号前三位
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}

 2、Driver类新增分区配置

  // 设置自定义分区器
  job.setPartitionerClass(ProvincePartitioner.class);

  // 设置对应的ReduceTask的个数
  job.setNumReduceTasks(5);

3、结果

part-r-00000

13630577991	6960	690	7650
13682846555	1938	2910	4848

 part-r-00001

13736230513	2481	24681	27162
13729199489	240	0	240
13768778790	120	120	240

part-r-00002

13846544121	264	0	264

part-r-00003

13975057813	11058	48243	59301
13992314666	3008	3720	6728
13956435636	132	1512	1644
13966251146	240	0	240

part-r-00004

13509468723	7335	110349	117684
13568436656	3597	25635	29232
18390173782	9531	2412	11943
15043685818	3659	3538	7197
15910133277	3156	2936	6092
13560439638	918	4938	5856
84188413	4116	1432	5548
18271575951	1527	2106	3633
15959002129	1938	180	2118
13590439668	1116	954	2070
13470253144	180	180	360

到此这篇关于hadoop 详解如何实现数据排序的文章就介绍到这了,更多相关hadoop 数据排序内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java如何取掉json数据中值为null的属性字段

    Java如何取掉json数据中值为null的属性字段

    这篇文章主要介绍了Java如何取掉json数据中值为null的属性字段,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • 手把手教你如何在idea中搭建SpringBoot项目

    手把手教你如何在idea中搭建SpringBoot项目

    这篇文章主要介绍了如何搭建一个SpringBoot项目,包括环境准备、创建新项目、探索项目结构以及展望未来,通过详细的步骤和实用的技巧,帮助开发者快速上手SpringBoot开发,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2025-02-02
  • Java super关键字的使用详解

    Java super关键字的使用详解

    java中的super关键字是一个引用变量,用于引用直接父类对象,下面这篇文章主要给大家介绍一下super关键字的使用,需要的朋友可以参考下
    2022-07-07
  • Mybatis中的游标查询Cursor(滚动查询)

    Mybatis中的游标查询Cursor(滚动查询)

    这篇文章主要介绍了Mybatis中的游标查询Cursor(滚动查询),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-01-01
  • Spring Boot中使用JDBC Templet的方法教程

    Spring Boot中使用JDBC Templet的方法教程

    这篇文章主要给大家介绍了关于在Spring Boot中使用JDBC Templet的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2018-03-03
  • Javadoc 具体使用详解

    Javadoc 具体使用详解

    这篇文章主要介绍了Javadoc 具体使用详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-08-08
  • SpringBoot如何配置MySQL和Oracl双数据源(Mybatis)

    SpringBoot如何配置MySQL和Oracl双数据源(Mybatis)

    这篇文章主要介绍了SpringBoot如何配置MySQL和Oracl双数据源(Mybatis)问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-03-03
  • springboot3解决跨域的几种方式小结

    springboot3解决跨域的几种方式小结

    这篇文章主要介绍了springboot3解决跨域的几种方式,文中通过代码示例给大家介绍的非常详细,对大家的解决跨域有一定的帮助,需要的朋友可以参考下
    2024-03-03
  • java使用jdbc连接数据库简单实例

    java使用jdbc连接数据库简单实例

    这篇文章主要为大家详细介绍了java使用jdbc连接数据库的简单实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-07-07
  • java中的equals()和toString()方法实例详解

    java中的equals()和toString()方法实例详解

    这篇文章主要介绍了java中的equals()和toString()方法实例详解的相关资料,这里举例说明,并附实例代码,和实现效果图,需要的朋友可以参考下
    2016-11-11

最新评论