2016-12-07 78 views
0

我目前正在解决一个涉及来自总线的GPS数据的问题。我面临的问题是在我的过程中减少计算。在Apache Spark中过滤空间数据

在一张桌子上有大约20亿个GPS坐标点(Lat-Long度数),在另一个桌子上有大约12,000个Lat-Long坐标的公交站点。预计20亿点的5-10%在公交车站。

问题:我只需要标记和提取公交车站点(12,000点)中的那些点数(20亿)。由于这是GPS数据,我不能做精确匹配的坐标,而是做一个基于容差的geofencing。

问题:使用当前的朴素方法,标记公交车站的过程需要很长时间。目前,我们正在挑选12,000个公交站点中的每一个站点,并以100米的容差查询20亿个点(通过将程度差异转换为距离)。

问题:是否有一个算法高效的过程来实现这个标记点?

+0

使用k-d树可以开始。 –

+0

我工作过类似的用例。我们使用'GeoHashes'的属性来定义单元格,并定义每个单元格的过程。这仍然是一个广泛的问题。也许你可以展示你目前的方法的代码来推动讨论? – maasg

+0

@LostInOverflow - 当然,通过它。 – OrangeRind

回答

0

是的,你可以使用类似SpatialSpark。它仅适用于Spark 1.6.1,但您可以使用BroadcastSpatialJoin创建效率极高的RTree

下面是使用SpatialSpark与PySpark我的一个例子,以检查是否不同多边形在彼此或相交的:

from ast import literal_eval as make_tuple 
print "Java Spark context version:", sc._jsc.version() 
spatialspark = sc._jvm.spatialspark 

rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)]) 
rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)]) 
rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)]) 
pointD = Point((-1, -1)) 

def geomABWithId(): 
    return sc.parallelize([ 
    (0L, rectangleA.wkt), 
    (1L, rectangleB.wkt) 
    ]) 

def geomCWithId(): 
    return sc.parallelize([ 
    (0L, rectangleC.wkt) 
    ]) 

def geomABCWithId(): 
    return sc.parallelize([ 
    (0L, rectangleA.wkt), 
    (1L, rectangleB.wkt), 
    (2L, rectangleC.wkt)]) 

def geomDWithId(): 
    return sc.parallelize([ 
    (0L, pointD.wkt) 
    ]) 

dfAB     = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt']) 
dfABC    = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt']) 
dfC     = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt']) 
dfD     = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt']) 

# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD 
SpatialOperator  = spatialspark.operator.SpatialOperator 
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin 

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0) 

joinRDD.count() 

results = joinRDD.collect() 
map(lambda result: make_tuple(result.toString()), results) 

# [(0, 0), (1, 1), (2, 0)] read as: 
# ID 0 is within 0 
# ID 1 is within 1 
# ID 2 is within 0 

注线

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0) 

的最后一个参数是一个缓冲器值,在你的情况下,这将是你想要使用的宽容。如果您使用纬度/经度,它可能会是一个非常小的数字,因为它是一个径向系统,并且取决于您想要的公差,您需要输入calculate based on lat/lon for your area of interest