Java API如何实现向Hive批量导入数据
Java API实现向Hive批量导入数据
Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。
package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* <p>Description: </p>
* @author kangkaia
* @date 2017年12月26日 下午1:42:24
*/
public class HiveJdbc {
public static void main(String[] args) throws IOException {
List<List> argList = new ArrayList<List>();
List<String> arg = new ArrayList<String>();
arg.add("12345");
arg.add("m");
argList.add(arg);
arg = new ArrayList<String>();
arg.add("54321");
arg.add("f");
argList.add(arg);
// System.out.println(argList.toString());
String dst = "/test/kk.txt";
createFile(dst,argList);
loadData2Hive(dst);
}
/**
* 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"
* @param dst
* @param contents
* @throws IOException
*/
public static void createFile(String dst , List<List> argList) throws IOException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path dstPath = new Path(dst); //目标路径
//打开一个输出流
FSDataOutputStream outputStream = fs.create(dstPath);
StringBuffer sb = new StringBuffer();
for(List<String> arg:argList){
for(String value:arg){
sb.append(value).append("\001");
}
sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符
sb.append("\n");
}
sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符
byte[] contents = sb.toString().getBytes();
outputStream.write(contents);
outputStream.close();
fs.close();
System.out.println("文件创建成功!");
}
/**
* 将HDFS文件load到hive表中
* @param dst
*/
public static void loadData2Hive(String dst) {
String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
String username = "admin";
String password = "admin";
Connection con = null;
try {
Class.forName(JDBC_DRIVER);
con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
Statement stmt = con.createStatement();
String sql = " load data inpath '"+dst+"' into table population.population_information ";
stmt.execute(sql);
System.out.println("loadData到Hive表成功!");
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}finally {
// 关闭rs、ps和con
if(con != null){
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
注意:
本例使用mvn搭建,conf配置文件放在src/main/resources目录下。
Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。
在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。
通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。
insert overwrite table seqfile_table select * from textfile_table; …… insert overwrite table rcfile_table select * from textfile_table;
java 批量插入hive中转在HDFS
稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.
模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。
加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>1.1.0</version> </dependency>
代码
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Demo {
public static void main(String[] args) throws Exception {
List<List> argList = new ArrayList<List>();
List<String> arg = new ArrayList<String>();
arg.add("12345");
arg.add("m");
argList.add(arg);
arg = new ArrayList<String>();
arg.add("54321");
arg.add("f");
argList.add(arg);
// System.out.println(argList.toString());
String dst = "/test/kk.txt";
createFile(dst,argList);
// loadData2Hive(dst);
}
/**
* 将数据插入hdfs中,用于load到hive表中,默认分隔符是"|"
* @param dst
* @param contents
* @throws IOException
* @throws Exception
* @throws InterruptedException
*/
public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");
Path dstPath = new Path(dst); //目标路径
//打开一个输出流
FSDataOutputStream outputStream = fs.create(dstPath);
StringBuffer sb = new StringBuffer();
for(List<String> arg:argList){
for(String value:arg){
sb.append(value).append("|");
}
sb.deleteCharAt(sb.length() - 1);//去掉最后一个分隔符
sb.append("\n");
}
byte[] contents = sb.toString().getBytes();
outputStream.write(contents);
outputStream.flush();;
outputStream.close();
fs.close();
System.out.println("文件创建成功!");
}
/**
* 将HDFS文件load到hive表中
* @param dst
*/
public static void loadData2Hive(String dst) {
String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";
String username = "root";
String password = "root";
Connection con = null;
try {
Class.forName(JDBC_DRIVER);
con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
Statement stmt = con.createStatement();
String sql = " load data inpath '"+dst+"' into table test ";//test 为插入的表
stmt.execute(sql);
System.out.println("loadData到Hive表成功!");
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}finally {
// 关闭rs、ps和con
if(con != null){
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
mybatis参数类型不匹配错误argument type mismatch的处理方案
这篇文章主要介绍了mybatis参数类型不匹配错误argument type mismatch的处理方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教2022-01-01
springcloud中Ribbon和RestTemplate实现服务调用与负载均衡
这篇文章主要介绍了Ribbon和RestTemplate实现服务调用与负载均衡,想了解负载均衡的同学可以参考下2021-04-04
SpringBoot 整合Tess4J库实现图片文字识别案例详解
Tess4J是一个基于Tesseract OCR引擎的Java接口,可以用来识别图像中的文本,说白了,就是封装了它的API,让Java可以直接调用,今天给大家分享一个SpringBoot整合Tess4j库实现图片文字识别的小案例2023-10-10
解决idea中maven项目打包成jar报错:没有主清单属性的问题
这篇文章主要给大家分享了idea中maven项目打包成jar,报错没有主清单属性解决方法,文中有详细的解决方法,如果又遇到同样问题的朋友可以参考一下本文2023-09-09
Java使用Apache Commons高效处理CSV文件的操作指南
在 Java 开发中,CSV(Comma-Separated Values,逗号分隔值)是一种常见的数据存储格式,广泛用于数据交换和简单的存储任务,本文将介绍Java使用Apache Commons高效处理CSV文件的操作指南,需要的朋友可以参考下2025-03-03


最新评论