- >上有流的数目没有限制,弗林克将根据作业管理器/任务管理器,所使用的并行化的存储器/ CPU和时隙数的规模。我使用YARN来管理资源。如果连接的数据流量很高,那么我们需要谨慎一点,因为某些任务管理器并不是全部/大部分处理都会发生,因为这会降低处理速度。由于某些任务管理器负荷过重,肯定会出现这种情况,并且需要对此进行预防性检查,因此可能会出现卡夫卡流本身滞后或内部滞后现象。
- >连续查询的支持已建成的最新版本弗林克部分,你可以检查弗林克文档了。
- >如果通过读取一个数据流到另一个你的意思是连接在弗林克术语两个流那么我们就可以将它们连接起来的一个公共密钥和保持价值状态。请注意,值状态在任务管理器中维护,不会在任务管理器之间共享。否则,如果您隐含两个或更多流的联合,那么我们就可以构建flatmap函数,以使来自这些流的数据具有标准格式。工会
实施例: VAL流1:的数据流中[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(ENV) .MAP(新ClosureMapFunction)
VAL流2:的数据流中[UserBookingEvent] = BookingCancel.getSource (runmode).getSource(ENV) .MAP(新CancelMapFunction)
VAL unionStream:的数据流中[UserBookingEvent] = stream1.union(流2)
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
override def map(in: String): Option[UserBookingEvent] = {
val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
try {
implicit lazy val formats = org.json4s.DefaultFormats
val json = parse(in)
..............
} catch {
case e: Exception => {
LOG.error("Could not parse Cancel Event= " + in + e.getMessage)
None
}
}