Spark 与 MongoDB 集成指南:打造高效大数据生态

学习如何将Spark与MongoDB集成,提升大数据处理效率,构建更强大的数据分析系统。

原文标题:大数据系列之Spark和MongoDB集成

原文作者:牧羊人的方向

冷月清谈:

本文介绍了如何将 Spark 和 MongoDB 集成,以构建更高效的大数据生态系统。MongoDB 作为文档数据库,支持索引、快速响应,并具备强大的聚合函数,可弥补 HDFS 在实时分析方面的不足。

文章首先阐述了 Spark 和 MongoDB 集成的典型架构:Spark Driver 发起任务,Spark Master 调度资源,Spark Worker 节点上的 Executor 进程并行处理数据。所有与 MongoDB 的交互都通过 Mongo-Spark 连接器完成。

在连接器选择方面,官方连接器在读取数据时性能更优,支持条件下推,能够将过滤条件下推到 MongoDB 执行,减少数据传输量。第三方连接器则在写入数据时功能更丰富,支持更新操作。

文章详细介绍了 MongoDB-Spark 连接器的配置方法,包括输入配置、输出配置和缓存配置。配置可以通过 SparkConf 使用 --conf 参数或修改 $SPARK_HOME/conf/spark-default.conf 文件来完成。

连接器的使用方法也得到了清晰的讲解,包括使用 spark-submit 提交可执行程序和使用 spark shell 进行交互式操作。文章提供了示例代码,演示了如何使用连接器进行数据的读写操作。

最后,文章以一个具体的 PySpark 操作 MongoDB 的案例,演示了如何在实际环境中连接、读取和过滤 MongoDB 数据。案例涵盖了环境准备、连接建立、数据过滤、SQL 时间函数处理等方面的内容,并提供了完整的代码示例。

怜星夜思:

1、文章提到了官方和第三方 MongoDB Spark 连接器,实际应用中该如何选择?除了文章中提到的读写性能和更新功能,还有哪些其他方面的考量因素?
2、如何在 Spark 中高效地处理 MongoDB 中的嵌套文档和数组类型数据?
3、除了文中提到的架构,Spark 和 MongoDB 还有哪些其他的集成方式?各自的优缺点是什么?

原文内容

在Spark生态系统中,HDFS作为存储可以使用MongoDB来替代,构建成Spark+MongoDB生态系统。MongoDB作为文档存储型数据库,支持HDFS没有的索引概念,响应时间为毫秒级别,同时可以利用强大的aggregate函数做数据的筛选和预处理。

1、Spark和MongoDB集成

MongoDB是一种文档型数据库,作为一个适用于敏捷开发的数据库,MongoDB的数据模式可以随着应用程序的发展而灵活地更新。但是MongoDB适合一次查询的需求,对于统计、分析(尤其是在需要跨表、跨库的情况下)并不是太方便,我们可以用spark来处理MongoDB数据。Spark和MongoDB部署的一个典型架构如下:

Spark任务一般由Spark的driver节点发起,经过Spark Master进行资源调度分发。比如这里我们有4个Spark worker节点,这些节点上的几个executor 计算进程就会同时开始工作。一般一个core就对应一个executor。每个executor会独立的去MongoDB取来原始数据,直接套用Spark提供的分析算法或者使用自定义流程来处理数据,计算完后把相应结果写回到MongoDB。在这里,所有和MongoDB的交互都是通过一个叫做Mongo-Spark的连接器来完成的,Mongo-Spark连接器有官方版本和第三方开发的版本,本次主要使用官方的版本。

1.1 MongoDB-Spark-Connector选择

在获取数据时,官方连接器的性能似乎比第三方连接器的好一点,官方连接器有一个条件下推的原则。我们知道spark的算子分为两种:Transformation和Action,只有遇到Action算子才会触发作业的提交。比如在后续的一些Transformation算子中对数据有一定的数据过滤条件,官方连接器会把过滤条件下推到MongoDB去执行,这样可以保证从MongoDB取出来、经过网络传输到Spark计算节点的数据确实都是用得着的。在写数据到mongodb时,第三方连接器的功能比官方连接器要优,支持在原有表的基础上做更新。

  • 官方connector的github地址:https://github.com/mongodb/mongo-spark

  • 第三方connector的github地址:https://github.com/Stratio/Spark-MongoDB

1.2 MongoDB-Spark-Connector配置

MongoDB-Spark-Connector的配置可以通过使用SparkConf使用--conf或者$SPARK_HOME/conf/spark-default.conf文件进行指定。

1.2.1 Input Configuration

如果这些input configuration通过SparkConf设置,需加上spark.mongodb.input前缀

参数名 描述
uri 连接MongoDB的uri地址IP或hostname, mongodb://host:port/
database 从MongoDB中读取数据的database名称
collection 从MongoDB中读取数据的collection名称

示例如下:

a)uri为:

spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred

b)对应的configuration file如下:

spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred

如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。

1.2.2 Output Configuration

如果这些output configuration通过SparkConf设置,需加上spark.mongodb.output前缀

参数名 描述
uri 连接MongoDB的uri地址IP或hostname, mongodb://host:port/
database 数据写入MongoDB的database名称
collection 数据写入MongoDB的collection名称

a) uri为:

spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection

b)对应的configuration file如下

spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection

如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。

1.2.3 Cache Configuration
参数名 描述 默认值
spark.mongodb.keep_alive_ms The length of time to keep a MongoClient available for sharing. 5000
1.3 MongoDB连接器使用
1.3.1 spark-submit提交可执行程序

首先要安装spark(如果不需要把数据保存到hdfs、不需要使用yarn,可以不安装hadoop),在spark目录下的bin目录下会有一个spark-submit可执行文件。例如把代码保存在test.py中,如果使用官方连接器,运行以下命令,在联网环境下如果没有找到连接会从官网自动下载并保存在路径/root/.ivy2/jars中。注:在内网环境下将cache、jar目录拷贝到~/.ivy2目录下即可

[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 test-rdd/test-conn.py

test-conn.py程序如下:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext()
ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").options(uri="mongodb://192.168.112.102:27017", database="syslogdb", collection="syslog_tango_01").load()

test_collection.printSchema()
print(test_collection.first())
print("helloworld!!")
1.3.2 spark shell方式操作

如果通过pyspark连接MongoDB,运行:

./bin/pyspark 
--conf "spark.mongodb.input.uri=mongodb://192.168.112.102/testdb.myCollection?readPreference=primaryPreferred"
--conf "spark.mongodb.output.uri=mongodb://192.168.112.102/testdb.myCollection"
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2

1)写入MongoDB

people = spark.createDataFrame([("Bilbo Baggins",  50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])
people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
people.show()

People DataFrame写入到MongoDB输出参数的database和collections中,如下:

2)读MongoDB

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()

输出如下:

输出如下:
>>> df.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
1.4 PySpark-MongoDB数据加载
1.4.1 环境准备

1)Spark集群环境准备

参考“”

2)MongoDB集群环境准备

参考“”

1.4.2 PySpark操作MongoDB

1)启动MongoDB集群环境

[root@tango-centos01 mongodb-linux-x86_64-rhel70-3.6.3]./bin/mongod -f ./config/master.conf
[root@tango-centos02 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf
[root@tango-centos03 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf

2)指定connector package执行代码

[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 /usr/local/spark/ipynotebook/01-mongodb-01.py

3)建立MongoDB的连接

test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
  • uri:MongoDB的URL地址,"mongodb://192.168.112.102:27017"

  • database:DB名称

  • collection:collection名称

4)访问数据Filter过滤

从mongodb中读取数据时指定数据分区字段,分区大小提高读取效率, 当需要过滤部分数据集的情况下使用Dataset/SQL的方式filter,Mongo Connector会创建aggregation pipeline在mongodb端进行过滤,然后再传回给spark进行优化处理。

a)使用Spark SQL中指定谓词filter数据

sqlContext.sql("select * from df_tb1 where timeStart < date_sub(current_date(),1)").show()

b)使用Data Frame对数据进行filter

originDf = df.filter(df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp -1440*60*1000) .select("_id", "host", "message").toDF("id", "hostname", "message")

5)使用SQL时间函数处理时间戳字段

time1 = time()
swimmersJSON.registerTempTable("df_tb2")
df2=sqlContext.sql("select age,name,timeStart,unix_timestamp(timeStart) time_stamp from df_tb2")
df2.show()
df2.registerTempTable("df_tb3")
sqlContext.sql("select * from df_tb3 where time_stamp < "+str(time1)+"-24*3600").show()

6)完整代码如下:

# -*- coding: UTF-8 -*-
import sys
from time import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import numpy as np
import math

#settings mongodb configuration infomation
urlAddr = "mongodb://192.168.112.102:27017"
dbName = "syslogdb"
collID ="syslog_tango_01"

#Configure Spark AppName and Master
def CreateSparkContext():
sparkConf = SparkConf().setAppName("PyMongo") \
.set("spark.ui.showConsoleProgress", "false") \
.setMaster("local[*]")
sc = SparkContext(conf = sparkConf)
print ("master="+sc.master)
return (sc)

# Connect to mongodb
# URL address and database and collection name
def ConnectMongo(ctx):
#ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection

# Connect to MongoDB with filter schema
def ConnectMongoFilter(ctx):
# ctx = SQLContext(sc)
fields_list = "@timestamp,@version,_id,host,message"
fields = [StructField(field_name, StringType(), True) for field_name in fields_list.split(',')]
schema = StructType(fields)
test_collection = ctx.read.schema(schema).format("com.mongodb.spark.sql"). \
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection

if __name__ == "__main__":
print("PySpark-MongoDB")
sc=CreateSparkContext()
ctx = SQLContext(sc)
retCollid = ConnectMongo(ctx)
# print schema and content
retCollid.printSchema()
retCollid.first()
#use DataFrames
currentTimestamp=time()
print(currentTimestamp)
df = retCollid
print("########test information:########")
print(type(df))
print(df["@timestamp"])
df.select("@timestamp").show(5)
originDf = df.filter( \
df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp - 1440*60*1000) \
.select("_id", "host", "message").toDF("id", "hostname", "message")
#originDf = df.filter(df["@timestamp"]>1533028658.78).select("_id", "host", "message").toDF("id", "hostname", "message")
originDf.show()

#use Filter Schema
filterColl = ConnectMongoFilter(ctx)
filterColl.show(5)
print("######################################")
#use Spark SQL
df.registerTempTable("tempSYSLOG")
sql = "select * from tempSYSLOG where host like 'tango-01%'"
result = ctx.sql(sql)
result.show(5)

参考资料

  1. MongoDB Connector for Spark官方文档,https://docs.mongodb.com/spark-connector/v2.0/

  2. http://www.mongoing.com/tj/mongodb_shanghai_spark


我觉得选择哪个连接器主要还是看实际需求。如果你的应用主要是读取MongoDB的数据进行分析,那么官方连接器就足够了。但如果你的应用需要频繁地写入或更新MongoDB的数据,那么第三方连接器可能更合适。另外,还可以考虑社区活跃度和文档完善程度,毕竟用的人多,遇到问题也更容易解决。

除了使用 Mongo-Spark 连接器直接读取 MongoDB 数据,还可以将 MongoDB 数据导出为其他格式,例如 CSV、JSON 等,然后再导入 Spark 中进行处理。这种方式的优点是比较灵活,可以根据需要选择不同的数据格式,但缺点是需要额外的导出和导入步骤,可能会影响效率。

处理嵌套文档,可以使用 Spark SQL 的 explode 函数将嵌套数组或 map 展开成多行,或者使用 flatten 函数将嵌套结构扁平化。对于数组类型数据,可以使用 Spark SQL 的 array 函数进行操作,例如 array_contains, size 等。

补充一点,对于一些复杂的聚合操作,可以考虑直接在 MongoDB 中使用 Aggregation Framework 进行预处理,然后再将结果导入 Spark 中进行分析。这样可以充分利用 MongoDB 的优势,提高整体效率。当然,具体选择哪种方式还是要根据实际情况进行权衡。

除了 explode 和 flatten,还可以使用 Spark 的 Dataset API,它提供了更灵活的数据处理方式,可以方便地访问嵌套文档和数组中的元素。例如,可以使用 getAs 方法获取嵌套字段的值,或者使用 map 函数对数组元素进行转换。

还可以使用 Kafka 作为中间件,将 MongoDB 数据实时同步到 Kafka 中,然后 Spark 再从 Kafka 中消费数据进行处理。这种方式的优点是可以实现实时数据处理,但缺点是需要搭建和维护 Kafka 集群,增加了系统的复杂度。

如果嵌套结构比较复杂,可以考虑使用自定义 UDF (User Defined Function) 来处理。UDF 可以用 Java, Scala, Python 等语言编写,可以实现更复杂的逻辑。不过使用 UDF 可能会影响性能,需要谨慎使用。

关于MongoDB Spark连接器的选择,除了读写性能和更新功能外,还需要考虑数据量、数据类型、项目预算和团队技术栈。例如,如果数据量特别大,而且对实时性要求很高,那么官方连接器的性能优势就更明显。如果项目预算有限,并且团队对开源工具比较熟悉,那么选择官方连接器可能更合适。另外,如果需要一些高级功能,比如数据同步、增量更新等,可以考虑一些商业的连接器,它们通常提供更完善的功能和技术支持。

可以使用 MongoDB 的 BI Connector,它可以将 MongoDB 数据以 SQL 的方式暴露给 BI 工具,例如 Tableau, Power BI 等。这样就可以直接使用 BI 工具对 MongoDB 数据进行分析,而不需要编写 Spark 代码。这种方式的优点是方便快捷,但缺点是功能相对有限,无法进行复杂的分析。