2016-12-16 86 views
3

我期待在Spark 1.6上使用DataFrames API构建Spark Streaming应用程序。在我陷入兔洞之前,我希望有人能够帮助我理解DataFrames如何处理具有不同模式的数据。在运行Spark Streaming应用程序时处理模式更改

这个想法是消息将通过Avro模式流入卡夫卡。我们应该能够以向后兼容的方式发展架构,而无需重新启动流应用程序(应用程序逻辑仍然可以工作)。

使用模式注册表反编译消息的新版本以及使用KafkaUtils创建直接流和AvroKafkaDecoder(来自Confluent)消息中嵌入的模式标识似乎并不重要。这让我尽可能拥有DStream。

问题1: 在该DStream中,将会有具有不同版本模式的对象。所以,当我将每一个转换成一个Row对象时,我应该传入一个最新的reader模式来正确地迁移数据,并且我需要将最新模式传递到sqlContext.createDataFrame(rowRdd,schema)调用中。 DStream中的对象是GenericData.Record类型,并且据我所知,没有简单的方法可以确定哪个是最新版本。我看到两种可能的解决方案,一种是调用模式注册表以获取每个微博上最新版本的模式。另一种是修改解码器以附加模式标识。然后我可以迭代rdd来查找最高的id并从本地缓存中获取模式。

我希望有人已经以可重用的方式很好地解决了这个问题。

问题#2: Spark将为每个分区提供一个与Kafka不同的执行程序。当一个执行者接收到与其他“最新”模式不同的应用程序时,我的应用程序会发生什么。由一个执行器创建的DataFrame在同一个时间窗口中将具有与另一个不同的模式。我实际上并不知道这是否是一个真正的问题。我在查看数据流时遇到问题,以及哪种操作会导致问题。如果这是一个问题,那就意味着执行者之间需要共享一些数据,这听起来既复杂又低效。

我需要担心这个吗?如果我这样做,如何解决模式差异?

感谢, --Ben

回答

2

我相信我已经解决了这一点。我正在使用Confluent的模式注册表和KafkaAvroDecoder。简化代码如下所示:

// Get the latest schema here. This schema will be used inside the 
// closure below to ensure that all executors are using the same 
// version for this time slice. 
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
val m = sr.getLatestSchemaMetadata(subject) 
val schemaId = m.getId 
val schemaString = m.getSchema 

val outRdd = rdd.mapPartitions(partitions => { 
    // Note: we cannot use the schema registry from above because this code 
    // will execute on remote machines, requiring the schema registry to be 
    // serialized. We could use a pool of these. 
    val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000) 
    val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry) 
    val parser = new Schema.Parser() 
    val avroSchema = parser.parse(schemaString) 
    val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema) 

    partitions.map(input => { 
    // Decode the message using the latest version of the schema. 
    // This will apply Avro's standard schema evolution rules 
    // (for compatible schemas) to migrate the message to the 
    // latest version of the schema. 
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record] 
    // Convert record into a DataFrame with columns according to the schema 
    avroRecordConverter(record).asInstanceOf[Row] 
    }) 
}) 

// Get a Spark StructType representation of the schema to apply 
// to the DataFrame. 
val sparkSchema = AvroSchemaConverter.toSqlType(
     new Schema.Parser().parse(schemaString) 
    ).dataType.asInstanceOf[StructType] 
sqlContext.createDataFrame(outRdd, sparkSchema) 
相关问题