2017-08-02 51 views
0

我正在尝试创建和部署Dataflow管道,以将从Cloud Pubsub主题收集的数据流式传输到Cloud Datastore。不过,我一直运行到错误如下: 找不到符号符号:变量DatastoreIO位置:类DataUpload从Cloud Pubsub到数据存储区的数据流管道

我不明白我在做什么错了,因为我也跟着下面的文件导入相关包 - DatastoreV1以及后面的示例代码的语法:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/datastore/DatastoreV1

任何帮助将大大缓解我已经做了这些几天的头撞。在此先感谢!

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.io.PubsubIO; 
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1; 

public class DataUpload{ 

    public static void main(String[] args) { 
     DataflowPipelineOptions options = PipelineOptionsFactory.create() 
       .as(DataflowPipelineOptions.class); 

     options.setRunner(DataflowPipelineRunner.class); 
     options.setProject(projectName); 
     //options.setStagingLocation("gs://my-staging-bucket/staging"); 
     options.setStreaming(true); 

     Pipeline p = Pipeline.create(options); 
     p.apply(PubsubIO.Read.topic("projects/{project_name}/topics/data")) 
      .apply(DatastoreIO.v1().write().withProjectId(projectId)); 

     p.run(); 
    } 
} 

更新:我已经包括下面的线,得到了错误的一个新的一群。

import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO; 

DataUpload.java:[30,14]找到的应用(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write) 方法com.google.cloud.dataflow没有合适的方法.sdk.values.PCollection.apply(com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)不适用 (无法推断类型变量OutputT (参数不匹配; com.google.cloud .dataflow.sdk.io.datastore.DatastoreV1.Write无法转换为com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)) com.google.cloud.dataflow.sdk.values.PCollection方法。 apply(java.lang.String,com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)不适用 (无法推断类型变量OutputT (实际和正式参数列表长度不同))

回答

2

请添加以下import语句,我认为您错过了。

import com.google.cloud.dataflow.sdk.io.DatastoreIO 

你可以找到一些例子和文档使用此here

如果使用的是数据流2.0+,那么请看看这些java docs,有一些不同的包和方法名。

+0

感谢您的建议。最初我使用了这条线,但还有一些其他错误提示DatastoreV1丢失。所以我同时添加了两个 'import com.google.cloud.dataflow.sdk.io.DatastoreIO' 'import com.google.cloud.dataflow.sdk.io.DatastoreV1' 现在我得到了另一堆引用该行的错误该方法被调用,“没有找到合适的方法(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write)”等 – jlyh

+1

我认为问题是您的PCollection数据类型不匹配。 PubSubIO返回PCollection https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/PubsubIO.Read DatstoreIO接受PCollection https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO 您需要在中间引入一个ParDo,以便从PubSubIO的字符串值存储到数据存储实体对象类型。 https://cloud.google。com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO –

+1

此外,请看这里的示例,您还需要指定数据集以写入数据存储库 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO –

相关问题