2016-02-24 127 views

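Filter by array value in a Spark DataFrame

I am using Apache Spark 1.5 DataFrames with Elasticsearch, and I am trying to filter on an id from a column that contains a list (array) of ids.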

For example, the mapping of the Elasticsearch column looks like this:

{
    "people": {
        "properties": {
            "artist": {
                "properties": {
                    "id": {
                        "index": "not_analyzed",
                        "type": "string"
                    },
                    "name": {
                        "type": "string",
                        "index": "not_analyzed"
                    }
                }
            }
        }
    }
}

Sample data looks like the following:

{
    "people": {
        "artist": [
            {
                "id": "153",
                "name": "Tom"
            },
            {
                "id": "15389",
                "name": "Cok"
            }
        ]
    }
},
{
    "people": {
        "artist": [
            {
                "id": "369",
                "name": "Carl"
            },
            {
                "id": "15389",
                "name": "Cok"
            },
            {
                "id": "698",
                "name": "Sol"
            }
        ]
    }
}

In Spark I tried:

val peopleId = 152 
val dataFrame = sqlContext.read 
    .format("org.elasticsearch.spark.sql") 
    .load("index/type") 

dataFrame.filter(dataFrame("people.artist.id").contains(peopleId)) 
    .select("people.artist.id")

This returns every id that contains 152, for example 1523 and 152978, not just the rows where id == 152.

Then I tried:

dataFrame.filter(dataFrame("people.artist.id").equalTo(peopleId)) 
    .select("people.artist.id") 

This returns an empty result, and I understand why: people.artist.id is an array, so it never equals a single value.
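As a rough analogy in plain Scala (this is not Spark's own evaluation, and the object name and sample ids are made up for illustration), the first attempt behaves like a substring test and the second like comparing a whole list with one value:

```scala
object MatchSemanticsSketch {
  // Hypothetical ids, taken from the values mentioned above.
  val ids: Seq[String] = Seq("1523", "152978", "152")
  val wanted = "152"

  // Substring test: every id that merely contains "152" passes.
  val substringHits: Seq[String] = ids.filter(_.contains(wanted))

  // Element-wise equality: only the exact id passes.
  val exactHits: Seq[String] = ids.filter(_ == wanted)

  // Comparing the whole list with a single value is never true.
  val wholeListEqualsId: Boolean = ids.equals(wanted)

  def main(args: Array[String]): Unit = {
    println(substringHits)     // List(1523, 152978, 152)
    println(exactHits)         // List(152)
    println(wholeListEqualsId) // false
  }
}
```

What I am after is the element-wise behaviour of exactHits, applied to the array column.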

Can anyone tell me how to filter when the column holds a list of ids?

Answer

In Spark 1.5+ you can use the array_contains function:

df.where(array_contains($"people.artist.id", "153")) 
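For reference, here is a minimal self-contained sketch of the same filter run against an in-memory copy of the sample data from the question. The case classes, the object name and the helper names sample and withArtistId are assumptions made for illustration; the question reads from Elasticsearch instead. It uses the Spark 1.x SQLContext entry point to match the question, which newer Spark versions mark as deprecated.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.array_contains

object ArrayContainsSketch {
  // Hypothetical case classes that mirror the sample documents in the question.
  case class Artist(id: String, name: String)
  case class People(artist: Seq[Artist])
  case class Doc(people: People)

  // Builds a small in-memory DataFrame instead of reading from Elasticsearch.
  def sample(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(
      Doc(People(Seq(Artist("153", "Tom"), Artist("15389", "Cok")))),
      Doc(People(Seq(Artist("369", "Carl"), Artist("15389", "Cok"), Artist("698", "Sol"))))
    ))

  // Keeps rows whose people.artist.id array has an element exactly equal to `id`.
  def withArtistId(df: DataFrame, id: String): DataFrame =
    df.where(array_contains(df("people.artist.id"), id))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("array-contains-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      val df = sample(sqlContext)
      // Only the first document has an artist with id "153"; "15389" is a
      // different element and is not matched as a substring.
      withArtistId(df, "153").select("people.artist.id").show()
    } finally sc.stop()
  }
}
```

Note that the ids are strings in the mapping, so the value passed to array_contains should be a string as well, not the integer used in the question.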

If you are using an earlier version, you can try a UDF like this:

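import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{lit, udf}
// True if any struct in the array has an "id" field equal to v.
val containsId = udf(
    (rs: Seq[Row], v: String) => rs.exists(_.getAs[String]("id") == v))
df.where(containsId($"people.artist", lit("153")))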
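Here is a sketch of how the UDF approach might look as a complete program, again on an in-memory copy of the sample data. The case classes, object name and helper names are hypothetical, and the null check is an addition that is not part of the UDF above: it treats a missing artist array as "no match" instead of failing with a NullPointerException.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions.{lit, udf}

object ContainsIdUdfSketch {
  // Hypothetical case classes that mirror the sample documents in the question.
  case class Artist(id: String, name: String)
  case class People(artist: Seq[Artist])
  case class Doc(people: People)

  // Builds a small in-memory DataFrame instead of reading from Elasticsearch.
  def sample(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(
      Doc(People(Seq(Artist("153", "Tom"), Artist("15389", "Cok")))),
      Doc(People(Seq(Artist("369", "Carl"), Artist("15389", "Cok"), Artist("698", "Sol"))))
    ))

  // True when some struct in the array has an "id" field equal to v.
  // A null array counts as no match (an assumption added in this sketch).
  val containsId = udf((rs: Seq[Row], v: String) =>
    rs != null && rs.exists(_.getAs[String]("id") == v))

  // Keeps rows where at least one artist has exactly the given id.
  def withArtistId(df: DataFrame, id: String): DataFrame =
    df.where(containsId(df("people.artist"), lit(id)))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("contains-id-udf-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      // Only the first sample document contains an artist with id "153".
      withArtistId(sample(sqlContext), "153").select("people.artist.id").show()
    } finally sc.stop()
  }
}
```

The UDF receives the whole array of structs as Seq[Row], so it can match on any field of the struct, at the cost of being opaque to Spark's optimizer.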