flink-streaming

    2热度

    1回答

    目前,我的工作中,我有一个需要预处理之前,它是“流处理”一个CSV文件的项目。因此,我需要执行批处理以及流处理。具体而言,我的data.csv文件需要预先处理并在特定字段上排序,该字段将用作流处理的EventTime时间戳。下面的批处理脚本产生预处理输出: final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvir

    1热度

    1回答

    我是新来弗林克 创建文件(CSV或文本)我有改造想这样 val supportTask= customSource .map(line => line.split(",")) .map(line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toStr

    0热度

    4回答

    我一直在试图找到一个连接器将数据从Redis读取到Flink。 Flink的文档包含写入Redis的连接器的说明。我需要从我的Flink作业中读取来自Redis的数据。在Using Apache Flink for data streaming中,Fabian提到可以从Redis读取数据。什么是可用于此目的的连接器?

    1热度

    1回答

    我开始在Zeppelin上使用Flink并尝试运行流中最简单的程序:wordcount。 当我使用终端在本地模式下运行此代码时,它工作。 这是我如何做到这一点:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html 这是代码: object SocketWindowWo

    0热度

    1回答

    我试图利用并行性来加速Top-10窗口操作。我的应用程序由具有时间戳和密钥的事件组成(即,Tuple2<Long,String>),我的目标是生成30分钟翻滚窗口(使用事件时间)的十大最频繁键。为此,我的查询由一个入口,一个窗口和一个聚合阶段组成。换句话说,我的代码将需要做类似如下:上述 DataStream<Tuple3<Long, String, Integer>> s = env

    2热度

    1回答

    我正在为我的Flink系统编写测试。我想通过拓扑抽取数据,查询状态,然后重置我的测试。是否有可查询状态的管理功能,如重置/删除当前状态的能力,以便我可以在测试运行之间清除状态?我没有在文档中找到任何内容,但我希望我错过了一些东西。谢谢。

    0热度

    2回答

    我正在尝试运行Flink流式作业。我想确定流式传输过程的吞吐量和延迟。我已经开始了卡夫卡经纪人服务器,并从卡夫卡传入消息。我如何计算每秒消息(吞吐量)? (像rdd.count。有没有类似的方法来获取传入消息的计数) (完整的scenerio:我已经通过生产者发送消息作为Json对象,我添加了一些信息,如名称作为字符串和在Json对象中也是System.currentTimeMills 在流式传输

    0热度

    1回答

    I am using concurrend append method from the class Core in Azure to store data to Azure Data lake.Below is the code and the exception which I got.I am getting this exception rarely not always.Could an

    1热度

    1回答

    我在Flink编程中遇到了与How to count unique words in a stream?相同的问题。但是,答案使用Scala API来解决问题。 Flink Java API支持filterWithState转换吗? Java API可以解决这个问题吗?

    0热度

    1回答

    我想知道我们可以使用flink Table和SQL api中的两个表(连接)来编写查询。 我是flik新手,我想从两个不同的数据集创建两个表并查询它们并生成其他数据集。 我的查询会像select... from table1, table 2...那么我们可以这样写这样的查询哪些查询两个表或更多? 感谢