1

我正在尝试使用Spark结构化流从卡夫卡主题读取XML数据。如何从Kafka读取XML格式的流数据?

我试过使用Databricks spark-xml包,但是我收到一个错误消息,说这个包不支持流式阅读。有什么方法可以使用结构化流从Kafka主题中提取XML数据?

我当前的代码:

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .format('com.databricks.spark.xml') \ 
     .options(rowTag="MainElement")\ 
     .option("kafka.bootstrap.servers", "localhost:9092") \ 
     .option(subscribeType, "test") \ 
     .load() 

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load. 
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading 
     at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234) 

回答

3
.format("kafka") \ 
.format('com.databricks.spark.xml') \ 

最后一个与com.databricks.spark.xml胜变为(隐藏卡夫卡作为源)的数据流源。

换句话说,以上相当于.format('com.databricks.spark.xml')单独。

正如您可能已经体验的那样,Databricks spark-xml包不支持流式读取(即不能用作流式源)。该软件包不适用于流媒体。

Is there any way I can extract XML data from Kafka topic using structured streaming?

您只需使用标准函数或UDF自行访问和处理XML。在结构化流式传输到Spark 2.2.0中没有内置的支持流式处理XML。

无论如何这应该不是什么大问题。一个Scala代码可能如下所示。

val input = spark. 
    readStream. 
    format("kafka"). 
    ... 
    load 

val values = input.select('value cast "string") 

val extractValuesFromXML = udf { (xml: String) => ??? } 
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value)) 

// print XMLs and numbers to the stdout 
val q = numbersFromXML. 
    writeStream. 
    format("console"). 
    start 

另一种可能的解决办法是写自己的自定义流Source将应对def getBatch(start: Option[Offset], end: Offset): DataFrame的XML格式。那应该工作。

+1

谢谢,亚采。我写了UDF来解析XML数据。它正在工作。我将很快发布该UDF。 –

1

不能混合格式这种方式。卡夫卡源加载为包括值的数目,如keyvaluetopic,与value列存储payload as a binary type

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

...

value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

解析该内容是用户的责任,并且不能被委派给其他数据源。例如,请参阅我对How to read records in JSON format from Kafka using Structured Streaming?的回答。

对于XML,您可能需要一个UDF(UserDefinedFunction),但您可以先尝试Hive XPath functions。您还应该解码二进制数据。

2
import xml.etree.ElementTree as ET 
df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", "localhost:9092") \ 
     .option(subscribeType, "test") \ 
     .load() 

然后我写了一个Python UDF

def parse(s): 
    xml = ET.fromstring(s) 
    ns = {'real_person': 'http://people.example.com', 
     'role': 'http://characters.example.com'} 
    actor_el = xml.find("DNmS:actor",ns) 

    if(actor_el): 
     actor = actor_el.text 
    role_el.find('real_person:role', ns) 
    if(role_el): 
     role = role_el.text 
    return actor+"|"+role 

注册此UDF

extractValuesFromXML = udf(parse) 

    XML_DF= df .withColumn("mergedCol",extractroot("value")) 

    AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\\|").getItem(0))\ 
     .withColumn("Role", split(col("mergedCol"), "\\|").getItem(1)) 
相关问题