0

我想更改要追加的列映射。有没有更好的方法来自定义Java中的Spark Cassandra Connector的列映射?如何在Java中使用Spark Cassandra Connector自定义列映射?

ColumnName song_id = new ColumnName("song_id", Option.empty()); 
CollectionColumnName key_codes = new ColumnName("key_codes", Option.empty()).append(); 
List<ColumnRef> collectionColumnNames = Arrays.asList(song_id, key_codes); 
scala.collection.Seq<ColumnRef> columnRefSeq = JavaApiHelper.toScalaSeq(collectionColumnNames); 

javaFunctions(songStream) 
       .writerBuilder("demo", "song", mapToRow(PianoSong.class)) 
       .withColumnSelector(new SomeColumns(columnRefSeq)) 
       .saveToCassandra(); 

这是取自this Spark Streaming code样本。

回答

1

只是要使用 CollectionColumnName

其中有一个构造函数

case class CollectionColumnName(
    columnName: String, 
    alias: Option[String] = None, 
    collectionBehavior: CollectionBehavior = CollectionOverwrite) extends ColumnRef 

您可以重命名列裁判的设置alias,你可以改变collectionBehavior插入行为,采取以下类。

Api Link

/** Insert behaviors for Collections. */ 
sealed trait CollectionBehavior 
case object CollectionOverwrite extends CollectionBehavior 
case object CollectionAppend extends CollectionBehavior 
case object CollectionPrepend extends CollectionBehavior 
case object CollectionRemove extends CollectionBehavior 

这意味着你可以做

CollectionColumnName appendColumn = 
    new CollectionColumnName("ColumnName", Option.empty(), CollectionPrepend$.MODULE$); 

这看起来有点更多的Java-y和有点更加明确。你有这个代码的其他目标吗?

+0

That works - thanks @RussS –