2011-02-11 25 views
0

我正在做一个POC,用于从(Java)服务器推送数据,使用的是LCDS 3.1中基于RTMP的DataService。(标题:用Java通过Adobe LCDS DataService的RTMP通道推送DataMessage)

配置正常。Adobe Air客户端向服务器发送DataMessage(并由Assembler保存到数据库)这一方向工作正常。

我找到了很多AsyncMessage的例子,但由于这里使用的是由DataService提供服务的RTMP目标(destination),我必须发送DataMessage。

显然,我的代码里有一些错误(或者是我漏掉了什么,又或者是缺少好的API文档!)。

所以,请你能帮助我吗?

这是推送的代码。关键方法是doPush()

package mypackage.lcds.service.ds.impl; 

import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Map; 
import java.util.Set; 

import org.apache.commons.collections.CollectionUtils; 
import org.apache.log4j.Logger; 
import org.springframework.stereotype.Service; 

import mypackage.lcds.service.ds.DataPushService; 
import mypackage.model.dto.AbstractDto; 
import mypackage.model.exception.DsPushException; 
import flex.data.messages.DataMessage; 
import flex.messaging.MessageBroker; 
import flex.messaging.messages.Message; 
import flex.messaging.services.MessageService; 
import flex.messaging.util.UUIDUtils; 

/**
 * Implementation of {@link DataPushService} that builds a {@link DataMessage} for a DTO and hands it,
 * one subscriber at a time, to the {@link MessageService} obtained from the {@link MessageBroker}.
 * <p>
 * NOTE(review): the service registered as "data-service" is presumably the LCDS Data Management
 * service; routing a {@link DataMessage} through {@code serviceMessage} outside of a
 * DataServiceTransaction may fail inside the LCDS adapter - confirm against the LCDS documentation.
 */
// see http://forums.adobe.com/thread/580667
// MessageClient :
// http://livedocs.adobe.com/livecycle/8.2/programLC/programmer/lcds/help.html?content=lcconnections_2.html
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /**
     * Destination name for Data-service.<br>
     * See data-management-config.xml.
     */
    private static final String DESTINATION_NAME__POC_DS_XCHANGE = "poc-ds-xchange";

    /**
     * Id of the service looked up on the {@link MessageBroker}.<br>
     * See data-management-config.xml.
     */
    private static final String PUSH_DTO_SERVICE__NAME = "data-service";

    /**
     * Set "manually" by Spring (contexts workaround; not autowired).
     */
    private MessageBroker messageBroker = null;

    /**
     * Does the push of a single DTO.<br>
     * Only subscriber ids that are {@link Long} values will be used. Other ids do not get a Message sent.
     *
     * @param dto
     *            {@link AbstractDto} object.
     * @param subscriberIds
     *            {@link Set} of LCDS Message subscriber IDs {@link Long}. If null or empty, sends to all
     *            subscribers currently known to the service.
     *
     * @throws DsPushException
     *             declared for the callers; failures for a single subscriber are logged, not rethrown
     * @throws IllegalStateException
     *             if the {@link MessageBroker} has not been set
     */
    @SuppressWarnings("unchecked")
    private void doPush(final AbstractDto dto, final Set<Long> subscriberIds)
            throws DsPushException {

        // obtain message service by means of message broker
        final MessageService messageService = this.getMessageService();
        final DataMessage message = this.createMessage(dto, messageService);

        // resolve the target ids: explicit ones, or everybody currently subscribed
        final Set<Object> ids = new HashSet<Object>();
        if ((subscriberIds == null) || subscriberIds.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message all currently connected subscriberIds ");
            }
            final Set idsFromDS = messageService.getSubscriberIds(message, true);
            if (idsFromDS != null) {
                ids.addAll(idsFromDS);
            }
        } else {
            ids.addAll(subscriberIds);
        }

        if (LOG.isDebugEnabled()) {
            if (ids.isEmpty()) {
                LOG.debug("No subscriberId to send the Message to.");
            } else {
                // log the resolved ids: the subscriberIds argument is null on the "push to all" path
                LOG.debug("Sending message to subscriberIds : " + ids);
            }
            // string concatenation is null-safe, getSubscriberIds may return null
            LOG.debug("Known subscribers : " + messageService.getSubscriberIds(message, true));
        }

        // send messages to all subscriberIds 1 by 1
        for (final Object id : ids) {
            if (id instanceof Long) {
                this.pushToSubscriber(messageService, message, (Long) id);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Avoiding subscriber ID (not a Long value) : " + String.valueOf(id));
            }
        }
    }

    /**
     * Targets the message at one subscriber and hands it to the service. Any exception is logged and
     * reported through the return value so that the remaining subscribers are still served.
     *
     * @param messageService
     *            service used to route the message
     * @param message
     *            message to send; its destination-client header is overwritten with the given id
     * @param subscriberId
     *            id of the subscriber to address
     * @return true if {@code serviceMessage} returned without throwing
     */
    private boolean pushToSubscriber(final MessageService messageService, final DataMessage message,
            final Long subscriberId) {
        boolean isSent = false;
        try {
            message.setHeader(Message.DESTINATION_CLIENT_ID_HEADER, subscriberId);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending LCDS DataMessage to subscriber [" + subscriberId + "] \n" + message.toString(2));
            }
            // TODO TEST returned payload; for now "no exception" is treated as success
            messageService.serviceMessage(message, true);
            isSent = true;
        } catch (Exception e) {
            LOG.error("Error while sending message to subscriberId " + subscriberId, e);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Message sent to '" + String.valueOf(subscriberId) + "' : " + String.valueOf(isSent));
            }
        }
        return isSent;
    }

    /**
     * {@inheritDoc}
     *
     * @see DataPushService#pushToAllClients(AbstractDto)
     */
    // TODO test : if client is not connected, does LCDS record it for later (offline mode on the server?)
    public void pushToAllClients(final AbstractDto dto) throws DsPushException {
        this.doPush(dto, null);
    }

    /**
     * Pushes the DTO to a single subscriber.
     *
     * @param dto
     *            {@link AbstractDto} object to push
     * @param subscriberId
     *            id of the only subscriber to address
     * @throws DsPushException
     *             see {@link #pushToClients(AbstractDto, Set)}
     */
    public void pushTo1Client(AbstractDto dto, Long subscriberId) throws DsPushException {
        final Set<Long> subscriberIds = new HashSet<Long>();
        subscriberIds.add(subscriberId);

        this.doPush(dto, subscriberIds);
    }

    /**
     * {@inheritDoc}<br>
     * subscriberIds refer to the 'clientId' set by the client app when it subscribes to the DS destination.
     *
     * @see DataPushService#pushToClients(AbstractDto, Set)
     */
    public void pushToClients(final AbstractDto dto, final Set<Long> subscriberIds) throws DsPushException {
        this.doPush(dto, subscriberIds);
    }

    /**
     * Builds the {@link DataMessage} carrying the DTO as body, addressed to the Data-service destination.
     *
     * @param dto
     *            DTO used as message body; its id becomes the message identity
     * @param messageService
     *            currently unused
     * @return the populated message
     */
    private DataMessage createMessage(final AbstractDto dto, final MessageService messageService) {
        final DataMessage msg = new DataMessage();
        msg.setClientId(getServerId());
        msg.setTimestamp(System.currentTimeMillis());
        msg.setMessageId(UUIDUtils.createUUID(true));
        msg.setCorrelationId(msg.getMessageId()); // TODO OK messageId == CorrelationId ?
        msg.setDestination(DESTINATION_NAME__POC_DS_XCHANGE);
        msg.setBody(dto);
        msg.setOperation(DataMessage.CREATE_AND_SEQUENCE_OPERATION); // TODO OK operation?

        // identity property name: see data-management-config.xml
        final Map<String, Object> identity = new HashMap<String, Object>(2);
        identity.put("id", dto.getId());
        msg.setIdentity(identity);

        // FIXME set priority. How?
        if (LOG.isDebugEnabled()) {
            LOG.debug("LCDS DataMessage created : \n" + msg.toString(2));
        }
        return msg;
    }

    /**
     * @return the client id used for messages created by this back-end
     */
    private Object getServerId() {
        // FIXME OK?
        return "X-BACKEND";
    }

    /**
     * Get the current {@link MessageBroker}'s service layer.
     *
     * @return {@link MessageService} to use for push data
     * @throws IllegalStateException
     *             if the {@link MessageBroker} has not been set
     */
    private MessageService getMessageService() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting MessageBroker's DataService service ");
        }
        if (this.messageBroker == null) {
            // fail with an explicit message instead of a bare NullPointerException
            throw new IllegalStateException("MessageBroker has not been set on DataPushServiceImpl");
        }

        // Was : MessageBroker.getMessageBroker(null).getService(PUSH_DTO_SERVICE__NAME)
        return (MessageService) this.messageBroker.getService(PUSH_DTO_SERVICE__NAME);
    }

    /**
     * Set the messageBroker. For Spring.
     *
     * @param messageBroker
     *            the messageBroker to set
     */
    public void setMessageBroker(final MessageBroker messageBroker) {
        this.messageBroker = messageBroker;
    }
}

注:messagebroker通过Spring设置一次。它适用于此POC。

我有一个Servlet将DTO保存到数据库,然后尝试通过该服务推送它。一切看起来都正常,但我得到了一个NullPointerException(NPE)。

下面是Tomcat 6的日志(消息发送给subscriberId '99'):

LCDS DataMessage created : 
Flex Message (flex.data.messages.DataMessage) 
     operation = create_and_sequence 
     id = {id=3203} 
     clientId = X-BACKEND 
     correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA 
     destination = poc-ds-xchange 
     messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA 
     timestamp = 1297412881050 
     timeToLive = 0 
     body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test] 
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending message to subscriberIds : [99] 
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Known subscribers : [99] 
09:28:01,065 DEBUG [impl.DataPushServiceImpl] Sending LCDS DataMessage to subscriber [99] 
Flex Message (flex.data.messages.DataMessage) 
     operation = create_and_sequence 
     id = {id=3203} 
     clientId = X-BACKEND 
     correlationId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA 
     destination = poc-ds-xchange 
     messageId = 7E6C3051-FA0F-9183-4745-B90ACACD71EA 
     timestamp = 1297412881050 
     timeToLive = 0 
     body = mypackage.model.dto.XchangeDto[id=3203[clientId=2[userId=123456[text= InterActionServlet Test] 
     hdr(DSDstClientId) = 99 
09:28:02,456 ERROR [impl.DataPushServiceImpl] Error while sending message to subscriberId 99 
java.lang.NullPointerException 
    at flex.data.adapters.JavaAdapter.invokeAssemblerSync(JavaAdapter.java:1741)
    at flex.data.adapters.JavaAdapter.invokeBatchOperation(JavaAdapter.java:1630)
    at flex.data.adapters.JavaAdapter.invoke(JavaAdapter.java:658)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:318)
    at flex.messaging.services.MessageService.serviceMessage(MessageService.java:233)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.doPush(DataPushServiceImpl.java:142)
    at mypackage.lcds.service.ds.impl.DataPushServiceImpl.pushTo1Client(DataPushServiceImpl.java:178)
    at mypackage.servlet.InteractionServlet.push(InteractionServlet.java:75) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.doInvokeMethod(HandlerMethodInvoker.java:421)
    at org.springframework.web.bind.annotation.support.HandlerMethodInvoker.invokeHandlerMethod(HandlerMethodInvoker.java:136)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.invokeHandlerMethod(AnnotationMethodHandlerAdapter.java:326)
    at org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter.handle(AnnotationMethodHandlerAdapter.java:313)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:875)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:807)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:571)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:501)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844)
    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584)
    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447)
    at java.lang.Thread.run(Unknown Source) 
09:28:02,472 DEBUG [impl.DataPushServiceImpl] Message sent to '99' : false 

==>我究竟做错了什么?

我无法跟进LCDS内部的代码(我没有源码),而抛出的异常本身也提供不了什么有用的信息。

我是否漏设了某个消息头(header)?

非常感谢你的帮助,

+0

如果你能构建一个重现该bug的测试项目,我可以看看。否则,靠猜测来定位问题会过于耗时 – 2011-03-02 11:47:01

回答

1

记录一下:我找到原因了 ;O)

需要知道的要点是:如果你使用LC Data Services,就必须使用DataServiceTransaction API。

详情请参阅Adobe forums thread。

作为记录,下面是我可以正常工作的(基础)代码:

/**
 * Implementation of {@link DataPushService} that pushes newly created DTOs to LCDS through the
 * {@link DataServiceTransaction} API.
 * <p>
 * ASSUMPTION : the client Flex/Air apps set the desired userId (= filter) as a fillParameter of the
 * DataService.fill() method. This will filter output based on {@link AbstractDto#getUserId()}.
 */
@Service
public final class DataPushServiceImpl implements DataPushService {
    private static final Logger LOG = Logger.getLogger(DataPushServiceImpl.class);

    /* *********** V2 : DataServiceTransaction.createItem() ********* */

    /**
     * Does the push of a single DTO.
     * <p>
     * Joins the current {@link DataServiceTransaction} when there is one (e.g. when called from an
     * Assembler) and then leaves commit/rollback to its owner. Otherwise a new transaction is begun
     * and completed here: committed when the item was sent, rolled back when it was not.
     *
     * @param dto
     *            {@link AbstractDto} object. Contains the {@link AbstractDto#getUserId()} that is used by
     *            clients to filter data in the DataService.fill() method (used by the Assembler).
     * @return true if the item was handed to the transaction without error
     *
     * @throws DsPushException
     *             declared for the callers; errors are logged and swallowed so that a failed push has
     *             no impact on the back-end business
     */
    private boolean doPushViaTransaction(final AbstractDto dto) throws DsPushException {

        if (LOG.isDebugEnabled()) {
            // string concatenation is null-safe, the DTO is only validated further down
            LOG.debug("Sending message through DataServiceTransaction (see userId field) : " + dto);
        }

        // One MUST instantiate a DataServiceTransaction to be able to send anything (NullPointerException)
        DataServiceTransaction dtx = null;
        boolean isOwnerOfTx = false;
        boolean isSent = false;
        try {
            // if already in an Assembler, we do have a tx ==> no commit nor rollback!
            dtx = DataServiceTransaction.getCurrentDataServiceTransaction();
            if (dtx == null) {
                // new one, no JTA ==> ourselves : commit or rollback
                // MessageBroker instantiated with SpringFlex is auto-named
                dtx = DataServiceTransaction.begin("_messageBroker", false);
                // only claim ownership once begin() has succeeded, so that a failing begin()
                // never leads to completing a null transaction
                isOwnerOfTx = (dtx != null);
            }

            isSent = this.doTransactionSend(dto, dtx);

        } catch (Exception e) {
            // Log exception, but no impact on the back-end business ==> swallow Exception
            LOG.error("Could not send the creation to LCDS", e);
        } finally {
            if (isOwnerOfTx) {
                // exactly one of commit / rollback, never both
                this.completeOwnedTransaction(dtx, isSent);
            }
        }

        return isSent;
    }

    /**
     * Commits or rolls back a transaction begun by this class; failures are logged and swallowed.
     *
     * @param dtx
     *            transaction owned by this class, not null
     * @param commit
     *            true to commit, false to roll back
     */
    private void completeOwnedTransaction(final DataServiceTransaction dtx, final boolean commit) {
        try {
            if (commit) {
                dtx.commit();
            } else {
                dtx.rollback();
            }
        } catch (Exception e) {
            // swallow
            if (commit) {
                LOG.error("Could not send the creation to LCDS (@commit of the DataServiceTransaction)", e);
            } else {
                LOG.error("Could not roll back the DataServiceTransaction", e);
            }
        }
    }

    /**
     * Registers the creation of the DTO on the given transaction.
     *
     * @param dto
     *            DTO to create on the destination; a null DTO is logged and ignored
     * @param dtx
     *            transaction to use
     * @return true if {@code createItem} returned without throwing
     */
    private boolean doTransactionSend(final AbstractDto dto, final DataServiceTransaction dtx) {

        if (dto == null) {
            LOG.error("The given DTO is null! Nothing happens");
            return false;
        }

        boolean isSent = false;
        try {
            dtx.createItem(FlexUtils.DESTINATION_NAME__POC_DS, dto);
            isSent = true; // no problem
        } catch (Exception e) {
            // Log exception, but no impact on the business
            LOG.error("Could not send the creation to LCDS for DTO " + dto.toString(), e);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug("DTO : " + dto.toString() + "\n sent : " + String.valueOf(isSent));
            }
        }

        return isSent;
    }

    // implementation of DataPushService interface
    /**
     * {@inheritDoc}
     * <p>
     * The subscriberId is not used by this implementation.
     *
     * @see DataPushService#pushNewDTo(AbstractDto, java.lang.Long)
     */
    @Transactional(rollbackFor = DsPushException.class)
    public boolean pushNewDTo(final AbstractDto dto, final Long subscriberId) throws DsPushException {
        return this.doPushViaTransaction(dto);
    }
}

享受,感谢您的时间!

G.