2017-06-30 22 views

Spring XD Redis sink with collection type MAP

I am trying to implement a Redis sink with the collection type set to MAP.

stream create tTEST_GF_SINK --definition "trigger --initialDelay=0 --fixedDelay=1 --timeUnit=MINUTES --payload= 'new Date().toString()' --outputType=application/json | 
    transform --expression='new java.util.Date().toString()'| 
    redis --collectionType=MAP --key=1" --deploy 

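For all the other collection types, such as LIST, SET and ZSET, the stream is able to write to the Redis sink. When I use MAP as the collection type, however, it throws the following error about redis_mapKey.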

2017-06-30T16:47:59-0400 1.3.1.RELEASE ERROR task-scheduler-3 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: Failed to store Message data in Redis collection; nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8): Property or field 'redis_mapKey' cannot be found on object of type 'org.springframework.messaging.MessageHeaders' - maybe not public?
    at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:278)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8): Property or field 'redis_mapKey' cannot be found on object of type 'org.springframework.messaging.MessageHeaders' - maybe not public?
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:224)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.access$000(PropertyOrFieldReference.java:46)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference$AccessorLValue.getValue(PropertyOrFieldReference.java:374)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:267)
    at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.determineMapKey(RedisStoreWritingMessageHandler.java:415)
    at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.writeToMap(RedisStoreWritingMessageHandler.java:379)
    at org.springframework.integration.redis.outbound.RedisStoreWritingMessageHandler.handleMessageInternal(RedisStoreWritingMessageHandler.java:271)
    ... 99 more

Can anyone help me?

Answers


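With a MAP collection there are two keys involved: the key under which the map itself is stored, and the key under which each element is stored within the map.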

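The XD module does not expose mapKey as a property. You can either add an upstream header-enricher to set the redis_mapKey header, or edit the module's configuration to add a map-key-expression.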
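To make the two options concrete, here is a minimal plain-Java sketch. It assumes, as the error message suggests, that the sink reads the map key from a `redis_mapKey` header unless told otherwise. `Message`, `FROM_HEADER`, `FROM_PAYLOAD` and `enrich` are hypothetical names for illustration only; none of this is the Spring Integration API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MapKeyResolution {

    // Header name taken from the error message in the question.
    static final String MAP_KEY_HEADER = "redis_mapKey";

    // Hypothetical stand-in for a message: headers plus a String payload.
    static final class Message {
        final Map<String, Object> headers;
        final String payload;

        Message(Map<String, Object> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Assumed default behaviour: read the map key from the header, fail if it is absent.
    static final Function<Message, String> FROM_HEADER = m -> {
        Object key = m.headers.get(MAP_KEY_HEADER);
        if (key == null) {
            throw new IllegalStateException("header '" + MAP_KEY_HEADER + "' not found");
        }
        return key.toString();
    };

    // Counterpart of map-key-expression="payload.substring(17)".
    static final Function<Message, String> FROM_PAYLOAD = m -> m.payload.substring(17);

    // Counterpart of an upstream header-enricher: copy the headers and add the map key.
    static Message enrich(Message m, Function<Message, String> keyExpression) {
        Map<String, Object> headers = new HashMap<>(m.headers);
        headers.put(MAP_KEY_HEADER, keyExpression.apply(m));
        return new Message(headers, m.payload);
    }

    public static void main(String[] args) {
        Message plain = new Message(new HashMap<>(), "2017-07-01 08:43:52");
        // Option 1: enrich upstream, then the header lookup succeeds.
        System.out.println(FROM_HEADER.apply(enrich(plain, FROM_PAYLOAD)));
        // Option 2: compute the key from the payload directly.
        System.out.println(FROM_PAYLOAD.apply(plain));
    }
}
```

Calling `FROM_HEADER` on the un-enriched message throws, which mirrors the failure in the question: nothing upstream has set the header.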

For example, edit xd/modules/sink/redis/config/redis.xml like this...

<beans:beans profile="use-store-expression"> 
    <redis:store-outbound-channel-adapter 
     map-key-expression="payload.substring(17)" 
     key-expression="${keyExpression}" collection-type="${collectionType}" 
     channel="input" connection-factory="redisConnectionFactory" /> 
</beans:beans> 

With...

xd:>stream create foo --definition "time | redis --collectionType=MAP --key='foo'" --deploy 

Results in...

127.0.0.1:6379> hgetall "foo" 
1) "52" 
2) "2017-07-01 08:43:52" 
3) "53" 
4) "2017-07-01 08:43:53" 
5) "54" 
6) "2017-07-01 08:43:54" 
7) "55" 
8) "2017-07-01 08:43:55" 
... 
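The listing above can be reproduced with a small in-memory model of a Redis hash, which may help show how the two keys relate. This is only a sketch: `RedisHashModel`, `hset` and `write` are hypothetical names, and nested Java maps stand in for Redis.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RedisHashModel {

    // Outer map: Redis key -> hash. Inner map: field (the "map key") -> value.
    static final Map<String, Map<String, String>> store = new LinkedHashMap<>();

    // Rough equivalent of HSET key field value.
    static void hset(String key, String field, String value) {
        store.computeIfAbsent(key, k -> new LinkedHashMap<>()).put(field, value);
    }

    // Stores one payload the way the edited configuration would:
    // the field is payload.substring(17), the value is the payload itself.
    static void write(String key, String payload) {
        hset(key, payload.substring(17), payload);
    }

    public static void main(String[] args) {
        String[] payloads = {
            "2017-07-01 08:43:52",
            "2017-07-01 08:43:53",
            "2017-07-01 08:43:54",
            "2017-07-01 08:43:55"
        };
        for (String payload : payloads) {
            write("foo", payload);
        }
        // Prints field/value pairs in insertion order, like HGETALL "foo".
        store.get("foo").forEach((field, value) -> System.out.println(field + " -> " + value));
    }
}
```

Because the field here is only the seconds part of the timestamp, a payload written at the same second of a later minute replaces the earlier entry, so this particular expression keeps at most 60 fields in the hash.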

@Gary, I tried your suggestion of setting redis_mapKey with a header-enricher. It works fine.

stream create tTEST_GF_SINK --definition "trigger --initialDelay=0 --fixedDelay=1 --timeUnit=MINUTES --payload= 'new Date().toString()' --outputType=application/json | header-enricher --headers={\"redis_mapKey\":\"payload.substring(17)\"} | transform --expression='new java.util.Date().toString()' | redis --collectionType=MAP --key=1" --deploy

127.0.0.1:6379> hgetall 1
1) "ng()"
2) "Sat Jul 01 23:29:16 EDT 2017"
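One plausible reading of the field name in that output, not confirmed anywhere in the thread, is that the header-enricher evaluated `payload.substring(17)` against the trigger's payload text itself, before the transform replaced it with a date. A quick check in plain Java (the class and method names are hypothetical):

```java
public class LiteralPayloadKey {

    // Applies the same expression as the header-enricher: payload.substring(17).
    static String mapKey(String payload) {
        return payload.substring(17);
    }

    public static void main(String[] args) {
        // The trigger payload text exactly as written in the stream definition (21 characters).
        String literal = "new Date().toString()";
        System.out.println(mapKey(literal)); // prints ng()
    }
}
```

If that reading is right, the field is the same on every firing, so the hash would hold a single entry whose value is overwritten each minute.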