2017-07-25 124 views
2

我想实现一个安装程序,我有多个Web浏览器打开一个websocket连接到我的akka​​-http服务器,以便读取发布到卡夫卡主题的所有消息。卡夫卡主题websocket

所以消息流应该走这条路

kafka topic -> akka-http -> websocket connection 1 
         -> websocket connection 2 
         -> websocket connection 3 

对于WebSocket的,现在我已经创建了一个路径:

val route: Route = 
path("ws") { 
    handleWebSocketMessages(notificationWs) 
} 

然后我创建了一个消费者对我的卡夫卡话题:

val consumerSettings = ConsumerSettings(system, 
    new ByteArrayDeserializer, new StringDeserializer) 
    .withBootstrapServers("localhost:9092") 
    .withGroupId("group1") 
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
val source = Consumer 
    .plainSource(consumerSettings, Subscriptions.topics("topic1")) 

然后最后我想把这个源连接到handleWebSocketMessages中的websocket

def handleWebSocketMessages: Flow[Message, Message, Any] = 
    Flow[Message].mapConcat { 
    case tm: TextMessage => 
     TextMessage(source)::Nil 
    case bm: BinaryMessage => 
     // ignore binary messages but drain content to avoid the stream being clogged 
     bm.dataStream.runWith(Sink.ignore) 
     Nil 
    } 

这里是我的错误,当我尝试在TextMessage中使用source

Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil

我想我前进的道路上做出频频失误,但我要说的是,大部分阻塞部分是handleWebSocketMessages

回答

3

第一件事,就是要了解该源的类型:Source[ConsumerRecord[K, V], Control]。 因此,它不是您可以作为TextMessage参数传递的内容。

现在,让我们来看网页套接字的点:

  • 传出消息被内置在卡夫卡源中的每个消息。该消息将是来自Kafka消息的字符串转换的TextMessage。
  • 对于每个收到的消息,只是println()一样它

所以,Flow可以被看作是两个组件:Source &的Sink

val incomingMessages: Sink[Message, NotUsed] = 
    Sink.foreach(println(_)) 

val outgoingMessages: Source[Message, NotUsed] = 
    source 
    .map { consumerRecord => TextMessage(consumerRecord.record.value) } 

val handleWebSocketMessages: Flow[Message, Message, Any] 
    = Flow.fromSinkAndSource(incomingMessages, outgoingMessages) 

希望它有帮助。

+0

非常感谢!您的答案在我使用'Consumer.committableSource'而不是'Consumer.plainSource',和'consumerRecord.record.value()'而不是'consumerRecord.getkey.toString'之后起作用。 –

+0

太棒了!我更新了我的答案。 – n1r3