apache-flink

    3热度

    1回答

    在Apache Flink中使用自定义分区程序时,我想将数据集的某些元素分配给多个分区。目前我尝试复制这些元素并将每个元素分配给一个集群。我想知道有什么方法可以这样做吗?如果不是什么是复制数据集子集的有效方式?

    0热度

    1回答

    目前,我正在使用144个TaskSlots在4台机器的远程集群上运行Flink程序。 30分左右运行后,我收到以下错误: INFO org.apache.flink.runtime.jobmanager.web.JobManagerInfoServlet - Info server for jobmanager: Failed to write json updates for job b2eaf

    2热度

    1回答

    我已经构建了一个自定义源来处理Flink中的日志流。 该程序运行良好,并在处理记录后给我所需的结果。 但是,当我检查Web UI时,我没有看到计数。以下是屏幕截图: Records/Bytes Count

    1热度

    1回答

    我想使用sbt交叉编译功能编译Scala 2.10和2.11的项目。问题是这个项目包含一些Flink依赖项。但Flink jar不遵循标准的二进制格式(后缀_2.10或_2.11):scala 2.10 jar没有后缀,并且2.11拥有它。 我找到了一个工作解决方案。但我对此并不满意,我的问题是:有没有更简单和/或更优雅的方法来解决这个问题? 我目前的解决方案: def flinkDependen

    1热度

    1回答

    我在eclipse中运行flink,必要的jar已被Maven提取。我的机器有一个带有八个内核的处理器,我必须写的流应用程序从它的输入中读取行并计算一些统计数据。 当我在我的机器上运行该程序时,我期望flink使用CPU的所有内核以及线程代码。但是,当我观察内核时,我发现只有一个内核正在使用。我尝试了很多东西,并在下面的代码中留下了我的最后一次尝试,即设置环境的并行性。我也尝试将它设置为单独的流等

    2热度

    1回答

    我有一个程序在本地集群中正常工作,但在远程集群上执行时没有正常运行。我想知道,调试在远程Flink集群上运行的程序的最佳和常见方式是什么? 任何帮助表示赞赏!

    1热度

    1回答

    我使用java.util.Map作为位置或表​​达式键不支持的数据类型,所以如果组字段数大于25,我该如何将基于java.util.Map的数据集分组? 示例代码示出如下: Map<String,Object> input1 = new HashMap<>(); for (int i=0; i<30; i++){ input.put("groupField" + i,"value"+i

    1热度

    1回答

    我正在使用Flink 0.10.1的DataSet API编写应用程序。 我可以使用Flink中的单个操作员获得多个收集器吗? 我想要做的是什么样的东西如下: val lines = env.readTextFile(...) val (out_small, out_large) = lines **someOp** { (iterator, collector1, collector

    1热度

    2回答

    我使用dop> 1来执行我的程序,但我不想要多个输出文件。 Java myDataSet.writeAsText(outputFilePath, WriteMode.OVERWRITE).setParallelism(1);按预期工作。 但是,当我在Python中尝试相同它不起作用。这是我的代码:myDataSet.write_text(output_file, write_mode=WriteM

    0热度

    1回答

    了解Flink体系结构(物理和运行时间)的组织结构,了解其内部工作原理(分布,并行性等)的最佳方法是什么?代码? 在当前的技术水平下,平流层(Nephele,PACT等)的论文需要考虑多少可靠性? 谢谢!