java 中Spark中将对象序列化存储到hdfs

 更新时间:2017年06月09日 14:12:18   作者:小水熊  
这篇文章主要介绍了java 中Spark中将对象序列化存储到hdfs的相关资料,需要的朋友可以参考下

java 中Spark中将对象序列化存储到hdfs

摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.

废话不多说, 直接贴代码了. spark1.4 + hbase0.98

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}


感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

相关文章

  • JavaFX之TableView的使用详解

    JavaFX之TableView的使用详解

    这篇文章主要介绍了JavaFX之TableView的使用,有需要的朋友可以参考一下
    2013-12-12
  • springboot swagger2注解使用的教程

    springboot swagger2注解使用的教程

    这篇文章主要介绍了springboot swagger2注解使用,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-05-05
  • java常用工具类 数字工具类

    java常用工具类 数字工具类

    这篇文章主要为大家详细介绍了java常用工具类中的数字工具类,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-05-05
  • Java获取两个字符串中最大相同子串的方法

    Java获取两个字符串中最大相同子串的方法

    今天小编就为大家分享一篇Java获取两个字符串中最大相同子串的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-07-07
  • 配置DispatcherServlet的方法介绍

    配置DispatcherServlet的方法介绍

    今天小编就为大家分享一篇关于配置DispatcherServlet的方法介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-01-01
  • Springboot pom项目间接依赖包版本与预期不符原因解决分析

    Springboot pom项目间接依赖包版本与预期不符原因解决分析

    这篇文章主要介绍了Springboot pom项目间接依赖包版本与预期不符原因解决分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-08-08
  • Java搭建一个springboot3.4.1项目 JDK21的详细过程

    Java搭建一个springboot3.4.1项目 JDK21的详细过程

    这篇文章详细介绍了如何使用IntelliJ IDEA搭建一个基于Spring Boot 3.4.1的项目,并使用JDK 21和Maven 3.6.3,涵盖了环境准备、项目创建、依赖管理、Maven配置、以及解决常见问题的步骤,感兴趣的朋友跟随小编一起看看吧
    2025-01-01
  • 使用IDEA打jar包的详细图文教程

    使用IDEA打jar包的详细图文教程

    JAR文件是一种压缩文件,与常见的ZIP压缩文件兼容,被称为JAR包,下面这篇文章主要给大家介绍了关于使用IDEA打jar包的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2022-08-08
  • Linux环境卸载Centos7自带的OpenJDK和安装JDK1.8图文教程

    Linux环境卸载Centos7自带的OpenJDK和安装JDK1.8图文教程

    CentOS系统是开发者常用的Linux操作系统,安装它时会默认安装自带的旧版本的OpenJDK,但在开发者平时开发Java项目时还是需要完整的JDK,这篇文章主要给大家介绍了关于Linux环境卸载Centos7自带的OpenJDK和安装JDK1.8的相关资料,需要的朋友可以参考下
    2024-07-07
  • 非常全面的Java SpringBoot点赞功能实现

    非常全面的Java SpringBoot点赞功能实现

    但是这些功能再项目中是高频出现的,如果直接操作数据库的话,对数据库压力太大。那遇到这个问题怎么解决?这篇文章主要给大家介绍了关于Java SpringBoot点赞功能实现 的相关资料,需要的朋友可以参考下
    2022-01-01

最新评论