详解HDFS多文件Join操作的实例

 更新时间:2017年10月18日 10:30:53   作者:回首凡尘不做仙  
这篇文章主要介绍了详解HDFS多文件Join操作的实例的相关资料,希望通过本文能帮助到大家,让大家理解掌握这部分内容,需要的朋友可以参考下

详解HDFS多文件Join操作的实例

最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,

下面是个简单的例子;采用两个表来做left join其中数据结构如下:

A 文件:

a|1b|2|c

B文件:

a|b|1|2|c

即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键

代码如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;




/**
 * @ClassName: DataJoin
 * @Description: HDFS JOIN操作
 * @author hadoop
 * @date 2012-12-18 下午5:51:32
 */
public class InstallJoin extends Configured implements Tool {
private String static enSplitCode = "\\|";
private String static splitCode = "|";


// 自定义Reducer
public static class ReduceClass extends DataJoinReducerBase {


@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
String joinedStr = "";
//该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】
//去掉则为All Join
if (tags.length == 1 && tags[0].toString().contains("install")) {
return null;
}

Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();


String[] tokens = line.split(enSplitCode, 8);
String groupValue = tokens[6];

String type = tokens[7];

map.put(type, groupValue);
}

joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}


// 自定义Mapper
public static class MapClass extends DataJoinMapperBase {


//自定义Key【类似数据库中的主键/外键】
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(CommUtil.enSplitCode);


String key = "";
String type = tokens[7];
//由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据
if ("7".equals(type)) {
key = tokens[0]+"|"+tokens[1];
}else if ("30".equals(type)) {
key = tokens[0]+"|"+tokens[2];
}
return new Text(key);
}


@Override
protected Text generateInputTag(String inputFile) {
return new Text(inputFile);
}


@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}


}


public static class TaggedWritable extends TaggedMapOutput {


private Writable data;


// 自定义
public TaggedWritable() {
this.tag = new Text("");
}


public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}


@Override
public Writable getData() {
return data;
}


@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}


@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);
}


}


/**
* job运行
*/
@Override
public int run(String[] paths) throws Exception {
int no = 0;
try {
Configuration conf = getConf();
JobConf job = new JobConf(conf, InstallJoin.class);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.setJobName("join_data_test");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
JobClient.runJob(job);
no = 1;
} catch (Exception e) {
throw new Exception();
}
return no;
}


//测试
public static void main(String[] args) {
String[] paths = {
"hdfs://master...:9000/home/hadoop/traffic/join/newtype",
"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }

int res = 0;
try {
res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

相关文章

  • 最全LocalDateTime、LocalDate、Date、String相互转化的方法

    最全LocalDateTime、LocalDate、Date、String相互转化的方法

    大家在开发过程中必不可少的和日期打交道,对接别的系统时,时间日期格式不一致,每次都要转化,本文为大家准备了最全的LocalDateTime、LocalDate、Date、String相互转化方法,需要的可以参考一下
    2023-06-06
  • SpringBoot Redis批量存取数据的操作

    SpringBoot Redis批量存取数据的操作

    这篇文章主要介绍了SpringBoot Redis批量存取数据的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • 详解Spring事务回滚和事务提交

    详解Spring事务回滚和事务提交

    这篇文章主要介绍了详解Spring事务回滚和事务提交的相关资料,帮助大家更好的理解和学习使用spring框架,感兴趣的朋友可以了解下
    2021-03-03
  • Java Web程序中利用Spring框架返回JSON格式的日期

    Java Web程序中利用Spring框架返回JSON格式的日期

    这里我们来介绍一下Java Web程序中利用Spring框架返回JSON格式的日期的方法,前提注意使用@DatetimeFormat时要引入一个类库joda-time-版本.jar,否则会无法访问相应路径
    2016-05-05
  • MyBatisPlus批量添加的优化与报错解决

    MyBatisPlus批量添加的优化与报错解决

    MybatisPlus是一个高效的java持久层框架,它在Mybatis的基础上增加了一些便捷的功能,提供了更加易用的API,可以大幅度提高开发效率,这篇文章主要给大家介绍了关于MyBatisPlus批量添加的优化与报错解决的相关资料,需要的朋友可以参考下
    2023-05-05
  • maven中snapshot相关jar无法拉取问题及解决方案(常用方案)

    maven中snapshot相关jar无法拉取问题及解决方案(常用方案)

    Maven中的SNAPSHOT版本是指正在开发中的版本,这些版本可能会频繁地更新,在使用Maven构建项目时,有时会遇到无法拉取SNAPSHOT相关jar的问题,下面给大家分享maven中snapshot相关jar无法拉取问题及解决方案,感兴趣的朋友一起看看吧
    2024-06-06
  • SpringBoot3读取配置文件application.properties属性值的几种方式

    SpringBoot3读取配置文件application.properties属性值的几种方式

    这篇文章主要介绍了SpringBoot3读取配置文件application.properties属性值的几种方式,文中通过代码示例给大家讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-11-11
  • Mybatis之解决collection一对多问题(显示的结果没有整合到一起)

    Mybatis之解决collection一对多问题(显示的结果没有整合到一起)

    这篇文章主要介绍了Mybatis之解决collection一对多问题(显示的结果没有整合到一起),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • 浅谈Java之终止继承:Final类和Fianl方法

    浅谈Java之终止继承:Final类和Fianl方法

    这篇文章主要介绍了Java之终止继承:Final类和Fianl方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Java 配置log 将日志信息输出到指定日志文件中

    Java 配置log 将日志信息输出到指定日志文件中

    这篇文章主要介绍了Java 配置log 将日志信息输出到指定日志文件中,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-09-09

最新评论