Our company's production case: migrating Hive data to MongoDB with Spark (repost)

MongoDB · admin · 2018-12-28 · 307 views · 0 comments

This article is reposted from 若泽大数据 (Ruoze Data); if you repost it as well, please credit the source. Original article:
我司Spark迁移Hive数据到MongoDB生产案例代码
(若泽大数据: www.ruozedata.com — systematic big-data training focused on production cases, taught by working practitioners, with students periodically sharing production cases.)

The Hive emp table contains the following data:

hive (soul)> select * from emp;
OK
emp.empno    emp.ename   emp.job emp.age emp.deptno
7369    SMITH   CLERK   24  10
7499    ALLEN   SALESMAN    30  20
7521    WARD    SALESMAN    25  30
7654    MARTIN  SALESMAN    23  10
7698    BLAKE   MANAGER 29  40

pom.xml

<properties>
  <scala.version>2.11.8</scala.version>
  <spark.version>2.2.0</spark.version>
  <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.6.3</version>
  </dependency>
  <dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.0</version>
  </dependency>
  <!-- Spark Hive support -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <!-- MySQL driver -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>

A MongoDB utility class providing updateSave (updates existing documents) and upsertSave (updates a document if it exists, inserts it otherwise):

package com.soul.utils
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.{ReplaceOneModel, UpdateOneModel}
import com.mongodb.spark.MongoConnector
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.rdd.RDD
import org.bson.Document
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
/**
  * @author 若泽数据 soulChun
  * @create 2018-12-18-20:37
  */
object MongoUtils {
  val DefaultMaxBatchSize = 100000
  def updateSave[D: ClassTag](rdd: RDD[UpdateOneModel[Document]]): Unit = updateSave(rdd, WriteConfig(rdd.sparkContext))
  def updateSave[D: ClassTag](rdd: RDD[UpdateOneModel[D]], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    rdd.foreachPartition(iter => if (iter.nonEmpty) {
      mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
        iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.bulkWrite(batch.toList.asJava))
      })
    })
  }
  def upsertSave[D: ClassTag](rdd: RDD[ReplaceOneModel[Document]]): Unit = upsertSave(rdd, WriteConfig(rdd.sparkContext))
  def upsertSave[D: ClassTag](rdd: RDD[ReplaceOneModel[D]], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    rdd.foreachPartition(iter => if (iter.nonEmpty) {
      mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
        iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.bulkWrite(batch.toList.asJava))
      })
    })
  }
}
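Both helpers split the incoming write models with iter.grouped(DefaultMaxBatchSize), so each bulkWrite call sends at most 100,000 operations. A minimal sketch of that grouping step in plain Java (no MongoDB involved; the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchWrites {
    // Mirrors iter.grouped(DefaultMaxBatchSize) in MongoUtils: split a list of
    // pending writes into fixed-size batches, one bulkWrite call per batch.
    static <T> List<List<T>> grouped(List<T> items, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> writes = new ArrayList<>();
        for (int i = 0; i < 250; i++) writes.add(i);
        // With a batch size of 100, 250 writes become batches of 100, 100 and 50.
        List<List<Integer>> batches = grouped(writes, 100);
        System.out.println(batches.size());        // 3
        System.out.println(batches.get(2).size()); // 50
    }
}
```

Batching keeps each bulkWrite request bounded, which avoids sending one oversized request per partition to the server.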

1. Saving the DataFrame to MongoDB

package com.soul.sparkmg;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.spark.MongoSpark;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;
/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        //If your password contains an '@' character, percent-encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:root@127.0.0.1/soul_db.emp");
        JavaSparkContext jsc =  new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        //Save the DataFrame directly to MongoDB
        MongoSpark.save(df);
        jsc.stop();
    }
}
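Regarding the %40 hint in the comment: the @ in mongodb://user:password@host separates the credentials from the host, so an @ inside the password itself must be percent-encoded. The JDK can do this (the password below is made up for illustration):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class EncodePassword {
    public static void main(String[] args) throws UnsupportedEncodingException {
        String password = "r@ot"; // hypothetical password containing '@'
        // URLEncoder percent-encodes '@' as %40, which is what the URI needs.
        String encoded = URLEncoder.encode(password, "UTF-8");
        System.out.println(encoded); // r%40ot
        System.out.println("mongodb://root:" + encoded + "@127.0.0.1/soul_db.emp");
    }
}
```

Note that URLEncoder implements form encoding (a space becomes + rather than %20), so for passwords containing characters beyond letters, digits and @ the encoding should be double-checked against the connection-string rules.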

Running the program automatically creates the emp collection in MongoDB (the name comes from the output uri and can be changed), then inserts the data; afterwards all five rows can be found in MongoDB.

2. Saving an RDD to MongoDB

package com.soul.sparkmg;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.spark.MongoSpark;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;
/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        //If your password contains an '@' character, percent-encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:root@127.0.0.1/soul_db.emp");
        JavaSparkContext jsc =  new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        JavaRDD<Row> rdd = df.toJavaRDD();
        //insert
        JavaRDD<Document> rddDoc= rdd.map(new Function<Row, Document>() {
            public Document call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno",row.get(0));
                doc.put("ename",row.get(1));
                doc.put("job",row.get(2));
                doc.put("age",row.get(3));
                doc.put("deptno",row.get(4));
                return doc;
            }
        });
        MongoSpark.save(rddDoc);
        jsc.stop();
    }
}

3. Updating existing documents

Change the age of the first document in MongoDB to 100, then run the following program:

package com.soul.sparkmg;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.spark.MongoSpark;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;
/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        //If your password contains an '@' character, percent-encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:root@127.0.0.1/soul_db.emp");
        JavaSparkContext jsc =  new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        //Saving the DataFrame directly (section 1) is disabled in this version
//        MongoSpark.save(df);
        JavaRDD<Row> rdd = df.toJavaRDD();
        //update
        JavaRDD<UpdateOneModel<Document>> rddUpdate= rdd.map(new Function<Row, UpdateOneModel<Document>>() {
            public UpdateOneModel<Document> call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno",row.get(0));
                doc.put("ename",row.get(1));
                doc.put("job",row.get(2));
                doc.put("age",row.get(3));
                doc.put("deptno",row.get(4));
                Document modifiers = new Document();
                modifiers.put("$set",doc);
                return new UpdateOneModel<Document>(Filters.eq("empno",doc.get("empno")),modifiers,new UpdateOptions().upsert(true));
            }
        });
        MongoUtils.updateSave(rddUpdate.rdd(),rddUpdate.classTag());
        jsc.stop();
    }
}
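Each UpdateOneModel built above pairs an equality filter on empno with a {$set: doc} update and upsert(true): a matching document has the listed fields overwritten, any other fields stay, and a missing document would be inserted. A small in-memory sketch of that merge behavior (plain Java, not MongoDB code; the class is invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class SetSemantics {
    // A stand-in "collection" keyed by empno.
    static Map<Integer, Map<String, Object>> emp = new HashMap<>();

    // Mimics UpdateOneModel(Filters.eq("empno", ...), {$set: fields}, upsert(true)):
    // merge the given fields into the matching document, inserting it if absent.
    static void upsertSet(int empno, Map<String, Object> fields) {
        emp.computeIfAbsent(empno, k -> new HashMap<>()).putAll(fields);
    }

    public static void main(String[] args) {
        Map<String, Object> smith = new HashMap<>();
        smith.put("ename", "SMITH");
        smith.put("age", 24);
        upsertSet(7369, smith);   // no document yet -> inserted (the upsert path)

        Map<String, Object> change = new HashMap<>();
        change.put("age", 100);
        upsertSet(7369, change);  // document exists -> age overwritten, ename kept

        System.out.println(emp.get(7369).get("age"));   // 100
        System.out.println(emp.get(7369).get("ename")); // SMITH
    }
}
```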

After the run, MongoDB still contains five documents, and the age has been updated back to its original value of 30.

4. Upserting: update existing documents, insert missing ones

Delete documents 4 and 5 from the emp collection in MongoDB and change the age of the first document to 200, then run the following program:

package com.soul.sparkmg;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.spark.MongoSpark;
import com.soul.utils.MongoUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.bson.Document;
/**
 * @author 若泽数据 soulChun
 * @create 2018-12-15-16:17
 */
public class SparkHiveToMg {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SparkHiveToMg").setMaster("local[2]");
        //If your password contains an '@' character, percent-encode it as %40
        conf.set("spark.mongodb.output.uri", "mongodb://root:root@127.0.0.1/soul_db.emp");
        JavaSparkContext jsc =  new JavaSparkContext(conf);
        HiveContext hc = new HiveContext(jsc);
        Dataset<Row> df = hc.table("soul.emp");
        JavaRDD<Row> rdd = df.toJavaRDD();
        //upsert
        JavaRDD<ReplaceOneModel<Document>> rddUpsert= rdd.map(new Function<Row, ReplaceOneModel<Document>>() {
            public ReplaceOneModel<Document> call(Row row) throws Exception {
                Document doc = new Document();
                doc.put("empno",row.get(0));
                doc.put("ename",row.get(1));
                doc.put("job",row.get(2));
                doc.put("age",row.get(3));
                doc.put("deptno",row.get(4));
//                Document modifiers = new Document();
//                modifiers.put("$set",doc);
                return new ReplaceOneModel<Document>(Filters.eq("empno",doc.get("empno")),doc,new UpdateOptions().upsert(true));
            }
        });
        MongoUtils.upsertSave(rddUpsert.rdd(),rddUpsert.classTag());
        jsc.stop();
    }
}
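Note the difference from section 3: ReplaceOneModel substitutes the whole stored document with doc, while UpdateOneModel with $set only merges the listed fields, which is why the $set wrapper is commented out here. The contrast in a plain-Java sketch (not MongoDB code; names invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaceVsSet {
    // $set-style update: merge new fields into the stored document.
    static Map<String, Object> setUpdate(Map<String, Object> stored, Map<String, Object> doc) {
        Map<String, Object> out = new HashMap<>(stored);
        out.putAll(doc);
        return out;
    }

    // Replace-style update: the new document substitutes the old one entirely.
    static Map<String, Object> replaceUpdate(Map<String, Object> stored, Map<String, Object> doc) {
        return new HashMap<>(doc);
    }

    public static void main(String[] args) {
        Map<String, Object> stored = new HashMap<>();
        stored.put("ename", "SMITH");
        stored.put("note", "stale"); // a field not present in the Hive row

        Map<String, Object> doc = new HashMap<>();
        doc.put("ename", "SMITH");
        doc.put("age", 24);

        System.out.println(setUpdate(stored, doc).containsKey("note"));     // true
        System.out.println(replaceUpdate(stored, doc).containsKey("note")); // false
    }
}
```

So with ReplaceOneModel the migrated documents end up exactly mirroring the Hive rows, with no leftover fields from earlier writes.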

You will find that the data has been restored: the two deleted documents are re-inserted and the modified age is back to its Hive value.

When doing an Update or Upsert, remember to create an index in MongoDB on the filter field used in

Filters.eq("empno", doc.get("empno"))

that is, on empno; this can noticeably improve performance (in the mongo shell: db.emp.createIndex({empno: 1})). If your company has a scheduling platform that supports dynamic parameters, the code above can be packaged as a plugin that migrates any Hive table.

For details on using MongoSpark, see:

https://docs.mongodb.com/?_ga=2.206688532.683104556.1545909674-1980198650.1544859775

codeobj, all rights reserved. Unless otherwise noted, posts are original and licensed under CC BY-NC-SA.
When reposting, please link to the original: 我司Spark迁移Hive数据到MongoDB生产案例代码–转载