学习如何将Spark与MongoDB集成,提升大数据处理效率,构建更强大的数据分析系统。
原文标题:大数据系列之Spark和MongoDB集成
原文作者:牧羊人的方向
冷月清谈:
文章首先阐述了 Spark 和 MongoDB 集成的典型架构:Spark Driver 发起任务,Spark Master 调度资源,Spark Worker 节点上的 Executor 进程并行处理数据。所有与 MongoDB 的交互都通过 Mongo-Spark 连接器完成。
在连接器选择方面,官方连接器在读取数据时性能更优,支持条件下推,能够将过滤条件下推到 MongoDB 执行,减少数据传输量。第三方连接器则在写入数据时功能更丰富,支持更新操作。
文章详细介绍了 MongoDB-Spark 连接器的配置方法,包括输入配置、输出配置和缓存配置。配置可以通过 SparkConf 使用 --conf 参数或修改 $SPARK_HOME/conf/spark-default.conf 文件来完成。
连接器的使用方法也得到了清晰的讲解,包括使用 spark-submit 提交可执行程序和使用 spark shell 进行交互式操作。文章提供了示例代码,演示了如何使用连接器进行数据的读写操作。
最后,文章以一个具体的 PySpark 操作 MongoDB 的案例,演示了如何在实际环境中连接、读取和过滤 MongoDB 数据。案例涵盖了环境准备、连接建立、数据过滤、SQL 时间函数处理等方面的内容,并提供了完整的代码示例。
怜星夜思:
2、如何在 Spark 中高效地处理 MongoDB 中的嵌套文档和数组类型数据?
3、除了文中提到的架构,Spark 和 MongoDB 还有哪些其他的集成方式?各自的优缺点是什么?
原文内容
在Spark生态系统中,HDFS作为存储可以使用MongoDB来替代,构建成Spark+MongoDB生态系统。MongoDB作为文档存储型数据库,支持HDFS没有的索引概念,响应时间为毫秒级别,同时可以利用强大的aggregate函数做数据的筛选和预处理。
1、Spark和MongoDB集成
MongoDB是一种文档型数据库,作为一个适用于敏捷开发的数据库,MongoDB的数据模式可以随着应用程序的发展而灵活地更新。但是MongoDB适合一次查询的需求,对于统计、分析(尤其是在需要跨表、跨库的情况下)并不是太方便,我们可以用spark来处理MongoDB数据。Spark和MongoDB部署的一个典型架构如下:
Spark任务一般由Spark的driver节点发起,经过Spark Master进行资源调度分发。比如这里我们有4个Spark worker节点,这些节点上的几个executor 计算进程就会同时开始工作。一般一个core就对应一个executor。每个executor会独立的去MongoDB取来原始数据,直接套用Spark提供的分析算法或者使用自定义流程来处理数据,计算完后把相应结果写回到MongoDB。在这里,所有和MongoDB的交互都是通过一个叫做Mongo-Spark的连接器来完成的,Mongo-Spark连接器有官方版本和第三方开发的版本,本次主要使用官方的版本。
在获取数据时,官方连接器的性能似乎比第三方连接器的好一点,官方连接器有一个条件下推的原则。我们知道spark的算子分为两种:Transformation和Action,只有遇到Action算子才会触发作业的提交。比如在后续的一些Transformation算子中对数据有一定的数据过滤条件,官方连接器会把过滤条件下推到MongoDB去执行,这样可以保证从MongoDB取出来、经过网络传输到Spark计算节点的数据确实都是用得着的。在写数据到mongodb时,第三方连接器的功能比官方连接器要优,支持在原有表的基础上做更新。
-
官方connector的github地址:https://github.com/mongodb/mongo-spark
-
第三方connector的github地址:https://github.com/Stratio/Spark-MongoDB
MongoDB-Spark-Connector的配置可以通过使用SparkConf使用--conf或者$SPARK_HOME/conf/spark-default.conf文件进行指定。
如果这些input configuration通过SparkConf设置,需加上spark.mongodb.input前缀
参数名 | 描述 |
---|---|
uri | 连接MongoDB的uri地址IP或hostname, mongodb://host:port/ |
database | 从MongoDB中读取数据的database名称 |
collection | 从MongoDB中读取数据的collection名称 |
示例如下:
a)uri为:
spark.mongodb.input.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred
b)对应的configuration file如下:
spark.mongodb.input.uri=mongodb://127.0.0.1/
spark.mongodb.input.database=databaseName
spark.mongodb.input.collection=collectionName
spark.mongodb.input.readPreference.name=primaryPreferred
如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。
如果这些output configuration通过SparkConf设置,需加上spark.mongodb.output前缀
参数名 | 描述 |
---|---|
uri | 连接MongoDB的uri地址IP或hostname, mongodb://host:port/ |
database | 数据写入MongoDB的database名称 |
collection | 数据写入MongoDB的collection名称 |
a) uri为:
spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection
b)对应的configuration file如下
spark.mongodb.output.uri=mongodb://127.0.0.1/
spark.mongodb.output.database=test
spark.mongodb.output.collection=myCollection
如果同时设置了uri和单独配置文件,uri设置会将单独设置覆盖掉。
参数名 | 描述 | 默认值 |
---|---|---|
spark.mongodb.keep_alive_ms | The length of time to keep a MongoClient available for sharing. | 5000 |
首先要安装spark(如果不需要把数据保存到hdfs、不需要使用yarn,可以不安装hadoop),在spark目录下的bin目录下会有一个spark-submit可执行文件。例如把代码保存在test.py中,如果使用官方连接器,运行以下命令,在联网环境下如果没有找到连接会从官网自动下载并保存在路径/root/.ivy2/jars中。注:在内网环境下将cache、jar目录拷贝到~/.ivy2目录下即可
[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 test-rdd/test-conn.py
test-conn.py程序如下:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext()
ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").options(uri="mongodb://192.168.112.102:27017", database="syslogdb", collection="syslog_tango_01").load()
test_collection.printSchema()
print(test_collection.first())
print("helloworld!!")
如果通过pyspark连接MongoDB,运行:
./bin/pyspark
--conf "spark.mongodb.input.uri=mongodb://192.168.112.102/testdb.myCollection?readPreference=primaryPreferred"
--conf "spark.mongodb.output.uri=mongodb://192.168.112.102/testdb.myCollection"
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2
1)写入MongoDB
people = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])
people.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
people.show()
People DataFrame写入到MongoDB输出参数的database和collections中,如下:
2)读MongoDB
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
输出如下:
输出如下:
>>> df.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
1)Spark集群环境准备
参考“”
2)MongoDB集群环境准备
参考“”
1)启动MongoDB集群环境
[root@tango-centos01 mongodb-linux-x86_64-rhel70-3.6.3]./bin/mongod -f ./config/master.conf
[root@tango-centos02 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf
[root@tango-centos03 mongodb-linux-x86_64-rhel70-3.6.3]# ./bin/mongod -f ./config/slave.conf
2)指定connector package执行代码
[root@tango-spark01 spark-2.3.0]# spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.2 /usr/local/spark/ipynotebook/01-mongodb-01.py
3)建立MongoDB的连接
test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
-
uri:MongoDB的URL地址,"mongodb://192.168.112.102:27017"
-
database:DB名称
-
collection:collection名称
4)访问数据Filter过滤
从mongodb中读取数据时指定数据分区字段,分区大小提高读取效率, 当需要过滤部分数据集的情况下使用Dataset/SQL的方式filter,Mongo Connector会创建aggregation pipeline在mongodb端进行过滤,然后再传回给spark进行优化处理。
a)使用Spark SQL中指定谓词filter数据
sqlContext.sql("select * from df_tb1 where timeStart < date_sub(current_date(),1)").show()
b)使用Data Frame对数据进行filter
originDf = df.filter(df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp -1440*60*1000) .select("_id", "host", "message").toDF("id", "hostname", "message")
5)使用SQL时间函数处理时间戳字段
time1 = time()
swimmersJSON.registerTempTable("df_tb2")
df2=sqlContext.sql("select age,name,timeStart,unix_timestamp(timeStart) time_stamp from df_tb2")
df2.show()
df2.registerTempTable("df_tb3")
sqlContext.sql("select * from df_tb3 where time_stamp < "+str(time1)+"-24*3600").show()
6)完整代码如下:
# -*- coding: UTF-8 -*-
import sys
from time import time
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import numpy as np
import math
#settings mongodb configuration infomation
urlAddr = "mongodb://192.168.112.102:27017"
dbName = "syslogdb"
collID ="syslog_tango_01"
#Configure Spark AppName and Master
def CreateSparkContext():
sparkConf = SparkConf().setAppName("PyMongo") \
.set("spark.ui.showConsoleProgress", "false") \
.setMaster("local[*]")
sc = SparkContext(conf = sparkConf)
print ("master="+sc.master)
return (sc)
# Connect to mongodb
# URL address and database and collection name
def ConnectMongo(ctx):
#ctx = SQLContext(sc)
test_collection = ctx.read.format("com.mongodb.spark.sql").\
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection
# Connect to MongoDB with filter schema
def ConnectMongoFilter(ctx):
# ctx = SQLContext(sc)
fields_list = "@timestamp,@version,_id,host,message"
fields = [StructField(field_name, StringType(), True) for field_name in fields_list.split(',')]
schema = StructType(fields)
test_collection = ctx.read.schema(schema).format("com.mongodb.spark.sql"). \
options(uri=urlAddr, database=dbName, collection=collID).load()
return test_collection
if __name__ == "__main__":
print("PySpark-MongoDB")
sc=CreateSparkContext()
ctx = SQLContext(sc)
retCollid = ConnectMongo(ctx)
# print schema and content
retCollid.printSchema()
retCollid.first()
#use DataFrames
currentTimestamp=time()
print(currentTimestamp)
df = retCollid
print("########test information:########")
print(type(df))
print(df["@timestamp"])
df.select("@timestamp").show(5)
originDf = df.filter( \
df["@timestamp"] < currentTimestamp & df["@timestamp"] >= currentTimestamp - 1440*60*1000) \
.select("_id", "host", "message").toDF("id", "hostname", "message")
#originDf = df.filter(df["@timestamp"]>1533028658.78).select("_id", "host", "message").toDF("id", "hostname", "message")
originDf.show()
#use Filter Schema
filterColl = ConnectMongoFilter(ctx)
filterColl.show(5)
print("######################################")
#use Spark SQL
df.registerTempTable("tempSYSLOG")
sql = "select * from tempSYSLOG where host like 'tango-01%'"
result = ctx.sql(sql)
result.show(5)
参考资料
-
MongoDB Connector for Spark官方文档,https://docs.mongodb.com/spark-connector/v2.0/
-
http://www.mongoing.com/tj/mongodb_shanghai_spark