基于Shark/Spark的分布式空间数据分析框架

  • 温馨 , 1, 2 ,
  • 罗侃 1, 2 ,
  • 陈荣国 , 1, *
展开
  • 1. 中国科学院地理科学与资源研究所 资源与环境信息系统国家重点实验室,北京 100101
  • 2. 中国科学院大学,北京 100049
*通讯作者:陈荣国(1962-),男,研究员,研究方向为空间数据库理论与技术、空间信息共享与互操作。E-mail:

作者简介:温馨(1989-),女,硕士生,研究方向为云环境下的空间数据管理。E-mail:

收稿日期: 2014-10-11

  要求修回日期: 2014-11-26

  网络出版日期: 2015-04-10

基金资助

国家高技术发展研究计划“863”项目(2013AA12A204、2013AA122302)

A Framework of Distributed Spatial Data Analysis Based on Shark/Spark

  • WEN Xin , 1, 2 ,
  • LUO Kan 1, 2 ,
  • CHEN Rongguo , 1, *
Expand
  • 1. State Key Laboratory of Resources and Environmental Information System, Institute of Geographic Sciences and Natural Resources Research, CAS, Beijing 100101, China
  • 2. University of Chinese Academy of Sciences, Beijing 100049, China
*Corresponding author: CHEN Rongguo, E-mail:

Received date: 2014-10-11

  Request revised date: 2014-11-26

  Online published: 2015-04-10

Copyright

《地球信息科学学报》编辑部 所有

摘要

随着空间数据的与日俱增,传统依托于单节点的空间数据管理方法,已难以满足海量数据高并发的需求。云计算的兴起带来机遇与挑战,分布式技术与数据库技术的优势互补,为云计算下高效的数据管理提供了可能。本文提出一种在分布式计算引擎(Shark/Spark)中集合之关键技术(包括空间数据映射、空间数据加载、数据备份及空间查询等),将空间数据库对空间数据的高效存储、索引及查询优势与分布式计算引擎对复杂计算的优势相结合,实现一种基于Shark/Spark的分布式空间数据分析框架。在具体实现中,通过空间自定义函数和空间函数下推2种方式实现空间查询,结果表明,影响返回结果数据量的空间查询更适合下推给空间数据库完成,而不影响返回结果数据量的空间查询,利用分布式计算引擎直接运算更有优势。同时,通过与现有的一种分布式GIS方案(ArcGIS on Hadoop)对比发现,空间数据库的空间索引可有效提高查询效率,空间数据管理也更加独立。

本文引用格式

温馨 , 罗侃 , 陈荣国 . 基于Shark/Spark的分布式空间数据分析框架[J]. 地球信息科学学报, 2015 , 17(4) : 401 -407 . DOI: 10.3724/SP.J.1047.2015.00401

Abstract

With the development of technology, spatial datasets continue increasing in an incredible speed. Traditional data management based on single-node DBMS hardly meets the demands of high-concurrence in massive data. The rise of cloud computing brings brand new opportunities and challenges. Some researchers adopt a hybrid solution that combines the fault tolerance, heterogeneous cluster, and distributed computing framework together for efficient performances. Derived from the computing framework of Spark, Shark is a computing engine for fast data analysis. When a query is submitted, Shark compiles the query into an operator tree represented by RDDs, which will then be translated by Spark into a graph of tasks for execution. Shark does not support spatial query at the moment; therefore, we introduce an approach to enable Shark/Spark to support spatial query. With the APIs and UDFs that provided by Shark, Shark/Spark has the capability to process spatial data fetching from spatial databases and perform spatial queries according to the demands. Integrating Shark/Spark and relevant components which include mapping, loading, backup and querying of spatial data, and taking the advantages of the efficient spatial data management of spatial databases and high performance computing that involves the large-scale data processing of Spark, a framework of distributed spatial data analysis based on Shark/Spark has been implemented. During the implementation and testing process, we found that in order to achieve a better performance, some queries which had impacts on the returned dataset, should be pushed entirely into the database layer; while the other queries should be performed in Spark. In addition, we found that this system outperformed ArcGIS on Hadoop in some queries because the spatial index of spatial databases could improve its efficiency. Moreover, data management using a spatial database would be much more independent and convenient.

1 引言

空间技术及存储技术的发展,推动着空间数据爆炸式增长,海量空间数据的管理成为了地理信息系统(Geographic Information System,GIS)研究的热点[1-3]。传统的空间数据管理依托于单节点关系型数据库,其在海量数据管理、高并发读写和扩展性等方面存在局限性。云计算的兴起为空间数据管理带来了机遇与挑战,将云计算技术与空间数据库技术结合,实现对海量空间数据的检索查询,逐渐成为空间信息技术的新兴研究领域[4-5]。本文以空间数据库实现空间数据存储,对空间数据进行高效的一体化管理,集合分布式计算引擎进行空间运算,形成一种结合二者优势的分布式空间数据分析框架。
关于分布式技术与关系型数据库的结合已有一些讨论与尝试[6-7]。HadoopDB[8](现已改名为Hadpt)采用MapReduce作为多个数据库管理系统(Database Management System,DBMS)节点之间的连接层,通过转换HadoopDB中接收的SQL语句,将其推入到数据层处理。某些情况下,其可同时实现关系数据库的高性能特性和MapReduce的扩展性、容错性[9]。MongoDB(https://github.com/mongodb/mongo-hadoop)通过MongoDB Connector实现Hadoop与MongoDB的数据交换,提高了读取运算大数据的效率。在此基础之上,结合Spark API,Niskanen(http://codeforhire.com/2014/02/18/using-spark-with-mongodb/)实现MongoDB与Spark的结合。在分布式技术与GIS结合中,Hadoop-GIS[10]通过与Hive集成,利用MapReduce处理边界对象,实现了Hadoop的大型空间运算。Witayangkurn[11]将数据存储在PostGIS中,通过Java拓扑套件(Java Topology Suite,JTS)实现用户自定义函数(User Defined Function,UDF),在Hadoop/Hive中进行空间运算,与传统运算方式相比,效率得到极大提高;但其处理大数据的复杂运算能力还有提升的空间。美国环境系统研究所(Environmental Systems Research Institute,Inc,简称ESRl)开发了Esri Geometry API for Java,以及Spatial Framework for Hadoop(http://esri.github. io/gis-tools-for-hadoop/),将ArcGIS与Hadoop集成,通过在Hive中注册空间查询函数,查询Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)上的空间数据,提高了空间大数据的并行处理能力;但采用HDFS存储空间数据,没有建立空间索引,检索效率有限,且对空间数据格式有一定限制,数据管理较为复杂。
本文提出的分布式空间数据分析框架,引入空间数据库一体化管理空间、属性数据,以及较小冗余度、高效空间索引等优势[12],结合分布式计算引擎对大型运算的优化支持,实现了一种分布式技术的空间数据分析方法。

2 实现空间数据分析框架的关键技术

本方法架构如图1所示,由2部分组成:(1)分布式计算层,以计算性能良好的Spark[13-15]作为计算引擎,其引入了与其他分布式计算不同的弹性分布式数据集(Resilient Distributed Datasets,RDD)进行内存运算,与MapReduce相比节省了大量的磁盘输入、输出(input/output,I/O)操作,提高了运算速度;并以Shark[16](Hive on Spark)作为数据仓库,将查询语句转换为Spark上的RDD操作。(2)数据管理层,采用空间数据库与HDFS混合的方式进行数据存储。通过在Shark中沿用Hive的StorageHandler方法,访问除HDFS文件以外的其他数据库或文件[17]
Fig. 1 The architecture of distributed spatial data analysis based on Shark/Spark

图1 基于Shark/Spark的分布式空间数据分析框架

通过编写空间数据库存储处理程序,自定义InputFormat、OutputFormat、SerDE实现HiveStorageHandler接口,使分布式计算引擎可使用空间操作语句操作底层空间数据库。设计并实现空间处理函数,通过JTS编写的空间UDF和修改查询计划实现空间函数下推2种方式,对空间数据进行空间分析,将适合空间数据库完成的空间运算下推给空间数据库处理,而适合分布式计算引擎完成的运算则直接由计算引擎处理。
具体工作流程:
(1)当客户端提交查询语句后,Shark接收、解析、优化查询语句,并从元数据库中读取存储节点数据库信息,根据信息初始化查询工作,并提交给Spark Master节点。
(2)Spark Master节点获取来自Shark提交的查询工作,根据查询工作的信息生成多个任务。所有任务初始化完成后,发给各个Slave节点执行。Master节点等待所有Slave节点执行的结果。
(3)当所有Slave节点执行完成后,将执行的结果返回给Master节点,Master对所有来自Slave节点的结果进行合并与计算,完成最终查询,并通知Shark工作已完成。

2.1 空间数据映射

针对空间数据的存储管理,SQL/MM[18]定义了空间数据类型的描述,包括点、线、面等地理对象,定义了操作相应数据类型的存储过程和空间分析函数及其3种转换格式,包括文本标记语言(Well-Known Text,WKT)、二进制标记语言(Well-Known Binary,WKB),以及地理标记语言(Geography Markup Language,GML)。目前,大多数空间数据库遵循此规范。Shark/Hive虽支持基本的数据类型,但无法直接支持空间数据库特有的空间数据类型。通过转换其提供的字节数组(Binary)类型,将符合空间数据库中SQL/MM规范的空间数据与分布式计算引擎进行交互,可解决这一问题。

2.2 空间数据加载

针对海量数据的存储和访问,将数据分片到不同节点上,可降低单台机器的负载,提高数据运算效率,减少故障造成的损失。不同节点中的数据若通过手动方式单独加载将影响工作效率,故考虑创建并行服务来进行数据高速加载。如图2所示,通过制定规则将数据分片平摊到各节点,执行安全外壳协议(Secure Shell,SSH)命令,启动各节点的数据加载进程,使得所有数据并行入库。数据加载完成后,在数据主节点的系统表中增加数据分区信息,最后在Shark/Hive中创建一张外部表(注册元数据)指定对应数据信息即可。
Fig. 2 Flow chart of spatial data loading

图2 空间数据加载

2.3 数据备份

采用复制器进行数据备份,并引用可用性探测机制,可规避因单点故障造成的数据错误或异常。如图3所示,通过复制器将每个物理节点的数据分片备份在其他物理节点中,节点故障时,可自动切换镜像。同时,通过对集群中各个虚拟节点进行心跳检测,更新虚拟节点状态的列表,并推送到应用端。如不正常,则不予分配负载;如正常,则从各相应分片中获取数据,汇总返回结果。
Fig. 3 Backup mechanism for spatial data

图3 空间数据备份

2.4 分布式空间查询

Shark/Spark原本并不支持空间查询,但允许用户编写UDF实现自定义功能,可利用此特性实现空间查询。本文实现一组有关空间查询的自定义函数并进行注册,Shark/Spark接收客户端提交的查询请求后,首先对语句进行解析,匹配相应的函数,并最终将解析语句转换为查询计划,通过与底层空间数据库进行查询交互,查询到对应结果并传回客户端。
图4所示,在空间查询的解析处理流程中,空间查询语句依次通过语法解析器、语义分析器、逻辑计划生成器、逻辑执行优化器、物理计划生成器[19],并最终执行。
Fig. 4 Query and compiling process of spatial data object

图4 空间数据对象查询解析流程

(1)语法解析器:根据语法规则及语法解析工具,对查询语句进行语法解析,构造抽象语法树。
(2)语义分析器:遍历语法解析器生成的抽象语法树,抽象出查询基本组成单元,并生成不同类型的SemanticAnalyzer,将相应信息进行保存和验证。
(3)逻辑计划生成器:根据语义分析器生成的相关信息,生成逻辑操作树。如有已注册的空间函数,判断是否为需要下推的空间查询;若是则修改逻辑查询执行计划,否则继续执行。
(4)逻辑执行优化器:根据一定的规则对生成的逻辑操作树进行优化操作。
(5)物理计划生成器:递归访问逻辑操作树,将逻辑操作转化为一系列的RDD。

2.5 空间数据分析

(1)空间UDF实现:基于Shark提供的UDF[20]方式,可直接实现空间查询函数(以下称为空间UDF),本文采用JTS辅助其实现。JTS是一个提供空间函数的开源Java库,其空间函数基于OGC标准规范,函数功能完整,运算效率较高,在空间数据分析领域使用广泛。空间UDF具体实现过程:从空间数据库中读取数据(WKB或WKT)并将其转换为空间对象,通过JTS提供的空间关系判断函数(以ST_Within为例)进行空间关系判断,输出空间判断结果(True/False)。具体的代码如下:
(2)空间函数下推:通过对语句的分析来判断是否进空间函数下推(以下称为函数下推),适合空间数据库完成的空间运算将被下推到数据库执行,优化空间查询效率。具体流程如图5所示,在逻辑计划生成阶段,通过遍历选择输出列(SelectOperator)中的各列,查看每一列是否为在配置文件中已注册并需要下推的空间函数,若是,则使用数据类型为相应下推函数返回类型的虚拟列加以替换。若WHERE子句中存在需要下推的空间函数,则将整条查询语句下推到空间数据库。最后重新生成新的逻辑查询执行计划。
Fig. 5 Flow chart of the pushing function in spatial query

图5 空间查询中函数下推流程

3 空间数据分析框架的实验与分析

3.1 实验环境与实验数据

测试环境如图6所示,总共7台相同配置的机器,其中,1台为master,其他6台为slave,master为管理节点,6台slave为计算和存储节点。实验数据采用百年全球地震数据,时间范围从1898-2011年,共包含77 037条数据。实验将分别对函数下推、空间UDF与ArcGIS on Hadoop 3种方法执行类似运算并进行对比。在数据底层存储,函数下推及空间UDF均采用本文方法,让数据存储在空间数据库中;ArcGIS on Hadoop则根据要求提前上传至HDFS中。
Fig. 6 Testing systems for the cluster

图6 集群测试环境

3.2 实验案例

3.2.1 SELECT子句执行运算效率对比
一般查询中,SELECT子句指定要查询的列,在SELECT子句后使用空间谓词ST_AsText(shape)将空间对象以文本的方式输出。
(1)函数下推及空间UDF执行查询的语句均为:
SELECT ST_AsText (shape)
FROM table where fid <= N;
(2)ArcGIS on Hadoop:
SELECT ST_AsText (ST_Point(x,y))
FROM table LIMIT N;
其中,xy为经纬度坐标,N为返回条数。
在实验中通过修改返回的数据量大小,记录不同实现方法的执行时间。实验结果如图7所示,函数下推及空间UDF的耗时随数据量增大而增加,而ArcGIS on Hadoop受数据量变化影响较小。在数据量较小时,函数下推及空间UDF执行时间差异较小,耗时短,ArcGIS on Hadoop耗时长;但面对较大数据量,ArcGIS on Hadoop则优于另2种方式。相较而言,利用空间UDF的查询效率略高于函数下推的方式。
Fig. 7 Example of the spatial query in SELECT clause

图7 SELECT子句中空间查询示例

3.2.2 WHERE子句执行运算效率对比
WHERE子句中,通过与SELECT语句结合可查找符合过滤条件的记录。ST_Within(shape1,shape2)通常用以判断空间对象shape1是否位于空间对象shape2内。在本次实验中,指定一个查询范围(矩形)内曾经发生过的地震,即在WHERE子句中使用空间谓词ST_Within返回运算为真的数据集。查询案例示意图如图8(b)所示,小圆点表示过去曾经发生过的所有地震事件,10个查询矩形分别编号1-10,其大小直接影响返回的数据量。
Fig. 8 Example of the spatial query in WHERE clause

图8 WHERE子句中空间查询示例

(1)函数下推查询语句为:
SELECT a
FROM earthquake
WHERE ST_Within(shape,
ST_Geomfromtext('POLYGON((Shape(N)))', 4326))=1;
(2)空间UDF查询语句为:
SELECT a
FROM earthquake
WHERE ST_Within (shape, POLYGON((Shape(N)))',4326)=1;
(3)ArcGIS on Hadoop查询语句为:
SELECT a
FROM earthquake
WHERE ST_Within(ST_Point(x, y),ST_Geomfromtext(
'POLYGON((Shape(N)))', 4326));
其中,xy为经纬度坐标,Shape(N)表示图8(b)中编号为N的矩形空间范围。
执行情况如图8(a)所示,随着数据量的变化,函数下推及空间UDF的运算较稳定,而ArcGIS on Hadoop耗时变化较大,其中函数下推明显优于另外两种方法,其运算时间比其他2种方式快10倍以上。

3.3 实验结果分析

通过实验发现,在空间数据管理方面,ArcGIS on Hadoop的数据采用HDFS进行管理,需上传特定格式的空间数据,或在其他平台ArcGIS中使用插件进行远程管理,操作复杂。本文方法采用空间数据库管理,无需担心特殊格式的问题。
在空间查询性能方面,本文方法与ArcGIS on Hadoop各有所长。直接查询数据或者直接进行空间计算时,在返回较小数据量的情况下,本文方法优于ArcGIS on Hadoop;返回较大数据量时,其运算速度不及ArcGIS on Hadoop;当使用空间函数筛选空间数据时,函数下推较空间UDF和ArcGIS on Hadoop运算时间快10倍。
本文框架采用的2种空间查询实现方法,在SELECT子句中,如果其操作不影响返回结果的数据量(如ST_AsText),可通过空间UDF进行,空间数据从空间数据库全部取出后,直接交予Spark进行调用和计算,可有效利用Spark集群计算优势,速度较快。在WHERE子句中,如果其操作影响返回结果的数据量(如ST_Within),则可通过函数下推的方式进行,将空间查询语句下推到数据库中,由空间数据库完成分析查询工作,仅返回符合条件的数据集。这种查询相较于空间UDF需从空间数据库中取出全部数据后,在Spark中进行计算,明显减少了通信时间,有效地减少了整个空间查询分析的执行时间。
对比本文中3种空间查询方法,函数下推、空间UDF,以及ArcGIS on Hadoop在空间查询中的运算差异可看出,空间UDF和ArcGIS on Hadoop方法因未建立空间索引,需遍历所有空间对象进行空间运算,效率较低。而空间数据库在导入空间数据时会建立空间索引,通过空间索引筛选空间数据,不符合特定空间操作的空间对象会被排除,可有效地减少空间运算次数,提高空间数据的检索效率。

4 结语

将空间数据管理与云计算结合是大势所趋,通过空间数据库管理空间数据便捷灵活,分布式框架可有效节约服务器资源、计算资源。本文采用底层数据由空间数据库管理,顶层采用Shark/Spark计算引擎的集成方法,可充分结合二者优势。在具体实现中,通过对不同的实现方法进行实践、对比,得出一种折中的实现方案,即将能充分利用适于空间数据库优势(如利用空间索引的空间查询)的工作下推给空间数据库,而需要大量计算的工作则交由分布式计算引擎完成。目前采用的空间UDF的方法进行空间查询,只利用了分布式计算引擎的部分优势,更全面地利用其优势并调优测试,不断提高运算效率,还有待今后更进一步的深入研究。

The authors have declared that no competing interests exist.

[1]
Goodchild M, Haining R, Wise S.Integrating GIS and spatial data analysis: Problems and possibilities[J]. International Journal of Geographical Information Systems, 1992,6(5):407-423.

[2]
Yang C, Goodchild M, Huang Q, et al.Spatial cloud computing: How can the geospatial sciences use and help shape cloud computing?[J]. International Journal of Digital Earth, 2011,4(4):305-329.

[3]
Zhong Y, Han J, Zhang T, et al.Towards parallel spatial query processing for big spatial data[C]. 2012 IEEE 26th International Parallel and Distributed Processing Symposium Workshops & PhD Forum, 2012.

[4]
Aji A, Wang F.High performance spatial query processing for large scale scientific data[C]. Proceedings of the on SIGMOD/PODS 2012 PhD Symposium. New York: ACM Press, 2012.

[5]
Cary A, Yesha Y, Adjouadi M, et al. Leveraging cloud computing in geodatabase management[C].2010 IEEE International Conference on Granular Computing (GrC), 2010.

[6]
Abadi D J.Data management in the cloud: Limitations and opportunities[J]. IEEE Data Engineering Bulletin, 2009,32(1):3-12.

[7]
Su X, Swart G.Oracle in-database Hadoop: When MapReduce meets RDBMS[C]. Proceedings of the 2012 ACM SIGMOD International Conference on Management of Data, 2012.

[8]
Abouzeid A, Bajda-Pawlikowski K, Abadi D, et al.HadoopDB: An architectural hybrid of MapReduce and DBMS technologies for analytical workloads[J]. Proceedings of the VLDB Endowment, 2009,2(1):922-933.

[9]
王珊,王会举,覃雄派,等. 架构大数据:挑战、现状与展望[J].计算机学报,2011(10):1741-1752.

[10]
Aji A, Wang F, Vo H, et al.Hadoop-GIS: A high performance spatial data warehousing system over MapReduce[J]. Proceedings of the VLDB Endowment International Conference on Very Large Data Bases, 2013,6(11):1009-1020.

[11]
Witayangkurn A, Horanont T, Shibasaki R.Performance comparisons of spatial data processing techniques for a large scale mobile phone dataset[C]. Proceedings of the 3rd International Conference on Computing for Geospatial Research and Applications. New York: ACM Press, 2012.

[12]
程昌秀. 空间数据库管理系统概论[M].北京:科学出版社,2012.

[13]
Zaharia M, Chowdhury M, Franklin M J, et al.Spark: Cluster computing with working sets[C]. Proceedings of the 2Nd USENIX Conference on Hot Topics in Cloud Computing. Berkeley: USENIX Association, 2010:10.

[14]
Tabaa Y, Medouri A, Tetouan M.Towards a next generation of scientific computing in the cloud[J]. International Journal of Computer Science, 2012,9(6):177-183.

[15]
Zaharia M, Chowdhury M, Das T, et al.Fast and interactive analytics over Hadoop data with Spark[C]. USENIX, 2012.

[16]
Xin R S, Rosen J, Zaharia M, et al.Shark: SQL and rich analytics at scale[C]. Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data, 2013:13-24.

[17]
Edward C, Dean W, Jason R.Programming Hive[M].北京:人民邮电出版社,2013

[18]
Stolze K.SQL/MM spatial-The standard to manage spatial data in a relational database system[C]. Leipzig: BTW, 2003.

[19]
高昂,陈荣国,赵彦庆,等.空间数据访问集成与分布式空间数据源对象查询[J].地球信息科学学报,2010,12(4):532-540.

[20]
Engle C, Lupher A, Xin R, et al.Shark: Fast data analysis using coarse-grained distributed memory[C]. Proceedings of the 2012 ACM SIGMOD International Conference on Management of Data, 2012.

文章导航

/