2017-02-28 42 views
0

我有一个cassandraSQLContext,我这样做:星火查询追加密钥空间火花临时表

cassandraSqlContext.setKeyspace("test"); 

因为如果我不这样做,它抱怨我设置默认密钥空间。

现在,当我运行这段代码:

 def insertIntoCassandra(siteMetaData: MetaData, dataFrame: DataFrame): Unit ={ 
     System.out.println(dataFrame.show()) 
     val tableName = siteMetaData.getTableName.toLowerCase() 
    dataFrame.registerTempTable("spark_"+ tableName) 
    System.out.println("Registered the spark table to spark_" + tableName) 

    val columns = columnMap.get(siteMetaData.getTableName) 
     val query = cassandraQueryBuilder.buildInsertQuery("test", tableName, columns) 
     System.out.println("Query: " + query); 
    cassandraSqlContext.sql(query) 
     System.out.println("Query executed") 
    } 

它给了我这个错误日志:

Registered the spark table to spark_test 
Query: INSERT INTO TABLE test.tablename SELECT **the columns here** FROM spark_tablename 
17/02/28 04:15:53 ERROR JobScheduler: Error running job streaming job 1488255351000 ms.0 
java.util.concurrent.ExecutionException: java.io.IOException: Couldn't find test.tablename or any similarly named keyspace and table pairs 

我不明白的是为什么心不是cassandraSQLContext执行打印出来的查询,为什么它会将密钥空间追加到可触发的火花中。

public String buildInsertQuery(String activeReplicaKeySpace, String tableName, String columns){ 
    String sql = "INSERT INTO TABLE " + activeReplicaKeySpace + "." + tableName + 
     " SELECT " + columns + " FROM spark_" + tableName; 
    return sql; 
    } 

回答

0

所以,问题是我使用的方法之一,我被实例化这是与cassandraSQLContext传递给insertIntoCassandra法冲突的新cassandraSQLContext cassandraSQLContext.In的两个不同的实例。