我可以用DirectRunner
运行此代码,它工作正常。随着DataflowRunner
崩溃有:WriteToText在DirectRunner中可用,但在DataflowRunner中TypeError失败
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']`
我的Apache束SDK进行了克隆,从主建的指示中的说明。它的构建为apache-beam-sdk==0.6.0.dev0
。我怀疑版本,但因为(我认为)我看到代码更改没有最近版本(NewDoFn
消失,但版本没有改变)。
我不确定它是否是问题的根源,但似乎在安装的sdk和数据流容器中存在不匹配。我得到了另一个不匹配类型的错误,其中DirectRunner
直接通过element
到我的DoFn.process()
,而DataflowRunner
通过context
。
我想这个隔离在尽可能的简单代码:
import uuid
import apache_beam.utils.pipeline_options
import apache_beam as beam
runner = 'DataflowRunner'
# runner = 'DirectRunner'
options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'a' + str(uuid.uuid4())
gcloud_options.project = 'your-project'
gcloud_options.staging_location = 'gs://your-bucket/beam/staging'
gcloud_options.temp_location = 'gs://your-bucket/beam/temp'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = runner
p = beam.Pipeline(options=options)
(p
| 'some_strings' >> beam.Create(tuple('asdfqwert'))
| 'write_text' >> beam.io.WriteToText('strings', file_name_suffix='.txt')
)
p.run().wait_until_finish()
全输出:
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:136: UserWarning: Using fallback coder for typehint: Any.
warnings.warn('Using fallback coder for typehint: %r.' % typehint)
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting google-cloud-dataflow==0.5.1
Using cached google-cloud-dataflow-0.5.1.tar.gz
Saved /var/folders/v3/61xx4nnn6p36n5m9fp4qdwtr0000gn/T/tmpuCWoeh/google-cloud-dataflow-0.5.1.tar.gz
Successfully downloaded google-cloud-dataflow
Traceback (most recent call last):
File "reproduce_bug.py", line 28, in <module>
p.run().wait_until_finish()
File "/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 706, in wait_until_finish
(self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
(70278eb56b40fd94): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 514, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 899, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:26452)
op.start()
File "dataflow_worker/executor.py", line 191, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7575)
def start(self):
File "dataflow_worker/executor.py", line 196, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7480)
with self.spec.source.reader() as reader:
File "dataflow_worker/executor.py", line 206, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7425)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 136, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:5749)
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "dataflow_worker/executor.py", line 83, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:3884)
cython.cast(Operation, consumer).process(windowed_value)
File "dataflow_worker/executor.py", line 505, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:15525)
self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 163, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:4862)
self.process(windowed_value)
File "apache_beam/runners/common.py", line 270, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7749)
self.reraise_augmented(exn)
File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:8108)
raise type(exn), args, sys.exc_info()[2]
File "apache_beam/runners/common.py", line 268, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7660)
self.old_dofn_process(element)
File "apache_beam/runners/common.py", line 173, in apache_beam.runners.common.DoFnRunner.old_dofn_process (apache_beam/runners/common.c:5182)
self._process_outputs(element, self.dofn_process(self.context))
File "apache_beam/runners/common.py", line 152, in apache_beam.runners.common.DoFnRunner.__init__.lambda3 (apache_beam/runners/common.c:3640)
self.dofn_process = lambda context: fn.process(context, *args)
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']
这是有道理的。我想知道如果我在设置上出错了,以获得差异。我今天会试一试。 – KobeJohn
你应该很好''virtualenv venv;源venv/bin/activate; pip安装apache_beam' – Pablo
我没有在pypi上看到名为'apache_beam'或类似的软件包。我错过了吗? 'pip安装google-cloud-dataflow'会导致'google-cloud-dataflow-0.5.5',根据您的描述,这表明它使用的是beam 0.5.5,它不会与默认管道版本匹配。关于apache beam repository本身的安装说明说要克隆会导致HEAD的repo。我是否错过了某些东西,或者文档+ repo + pypi与版本控制不同步?附: conda 4生活! – KobeJohn