2011-07-07 38 views
3

这是关于非常常见的传感器数据处理问题。同步和合并消息/数据流

要同步和合并来自不同来源的传感器数据,我想用Java实现它,而不需要太复杂的第三方库或框架。假设我定义了一个对象(O),它由例如4个属性(A1,... A4)组成。这4个属性来自不同的数据通道,例如套接字通道。

这4个属性一般以1.0〜2.0Hz的速率到达,它们的到达相互独立。 一旦有4个属性(A1,..A4)在同一时间(在一个小的时间窗口内,例如100ms),我从这4个属性构造一个新的对象(O)。

描述性场景如下。 A1〜A4的到达时间点用*标记。

对象O1〜U3分别在t1,t2和t3的时间点构建。 有些属性在t2和t3之间到达,但是对于构造一个对象并不完整,因此它们将被丢弃并被忽略。

A1  *   *   *   * 
    A2  *   *   *  * 
    A3  *   *     * 
    A4  *   *  *   * 
    --------|------------|-----------------|----------> time 
      t1   t2    t3 
      O1   O2    O3 

一些要求:

  1. 确定的时间点a.s.a.p.从最后传入的4个属性构造一个对象。
  2. FIFO,O1必须在O2之前构建,等等。
  3. 减少在Java中的锁定
  4. 最终丢弃数据如果它们没有完成构造一个对象。

一些快速想法上实施有:

  • 存储在时间离散的铲斗的FIFO队列的任何传入属性(每个桶包含4个不同的属性)。
  • 同时运行一个无限的线程来检查FIFO队列(从队列的头部),如果任何存储桶已经被4个不同的属性填充。如果是,则构建一个对象并从队列中移除桶。如果某个存储桶未在特定时间段内填满,则该存储桶将被丢弃。

任何建议和更正是值得欢迎的!

+0

只是为了确保我清楚数据的丢失,例如,如果A2-A4在彼此的〜100 ms内没有爆炸,就应该丢弃A1。因此,如果A1到达并且在150毫秒内没有任何反应,那么A2,A3和A4在大约50ms内到达,由于没有有效的A1,所以不会创建对象。如果新的A1在A2,A3和A4中最早的100ms内到达,则可以创建一个对象,如果不是,A2,A3和A4将被丢弃。 – Rich

+0

数据是否应以某种固定的速率到达,以便您知道他们应该到达什么时间? – Kaj

+0

关于它的更多思考,另一种描述的方式是100ms长的滑动窗口,如果数据到达该窗口中的所有四个点,则可以创建一个对象。 – Rich

回答

0

这不太可能解决您的问题,但它可能会指向您正确的方向。

我会使用谷歌番石榴的MapMaker的第一次尝试:

ConcurrentMap<Key, Bucket> graphs = new MapMaker() 
            .expireAfterAccess(100, TimeUnit.MILLISECOND) 
            .makeComputingMap(new Function<Key, Bucket>() { 
                public Bucket apply(Key key) { 
                 return new Bucket(key); 
                } 
            }); 

这将创建一个映射,其元素就会消失,如果他们没有被100毫秒访问,并创建一个新的水桶时,它被要求对于。

我无法解决的问题正是Key的含义:S你真正想要的是一种队列形式的相同功能。

0

这里的另一个疯狂的想法:

使用一个单一的LinkedBlockingQueue写值从所有传感器A1-A4

分配此队列AtomicReference可变

创建一个定时器任务将切换到此队列(100ms)

从旧队列提取所有数据并查看是否有全部数据A1-A4

如果是,则创建对象,否则放下一切

+0

我认为问题在于您可能会丢失数据--A1可能会在第一个队列的生命周期结束时到达,而另一些可能会在下一个队列的生命周期的开始时到达。因此,这四个数据点已经在时限内到达,但是由于不共享相同的队列,因此不会创建该对象。 – Rich

+0

是的,这是真的..会认为别的东西:) –

0

这是做它的另一种方式 - 它只是伪不过,你需要自己动手写:)

class SlidingWindow { 
    AtomicReference<Object> a1; 
    AtomicReference<Object> a2; 
    AtomicReference<Object> a3; 
    AtomicReference<Object> a4; 

    Queue<Long> arrivalTimes = new Queue(4); 

    public Bucket setA1(Object data) { 
     a1.set(data); 
     now = System.currentTimeInMillis() 
     long oldestArrivalTime = arrivalTimes.pop(); 
     arrivalTimes.push(now); 
     if (now - oldestArrivalTime < 100) { 
      return buildBucket(); 
     } 
     return null; 
    } 

    public Bucket setA2(Object data) { ... 

    ... 

    private Bucket buildBucket() { 
     Bucket b = new Bucket(a1, a2, a3, a4); 
     a1.clear(); 
     a2.clear(); 
     a3.clear(); 
     a4.clear(); 
     return b; 
    } 

} 
0

你可以做这样的事情,get操作被阻塞直到数据到达,add操作没有被阻塞。 get操作可能会进行一些优化,以便将候选人保留在并列结构中,以便在筛选出旧项目时不需要遍历所有候选项。然而,迭代4个项目应该足够快。

import java.util.HashMap; 
import java.util.Iterator; 
import java.util.concurrent.LinkedBlockingQueue; 

public class Filter<V> { 

    private static final long MAX_AGE_IN_MS = 100; 

    private final int numberOfSources; 

    private final LinkedBlockingQueue<Item> values = new LinkedBlockingQueue<Item>(); 

    public Filter(int numberOfSources) { 
     this.numberOfSources = numberOfSources; 
    } 

    public void add(String source, V data) { 
     values.add(new Item(source, data)); 
    } 

    public void get() throws InterruptedException { 
     HashMap<String, Item> result = new HashMap<String, Item>(); 
     while (true) { 
      while (result.size() < numberOfSources) { 
       Item i = values.take(); 
       result.put(i.source, i); 
       if (result.size() == numberOfSources) { 
        break; 
       } 
      } 
      //We got candidates from each source now, check if some are too old. 
      long now = System.currentTimeMillis(); 
      Iterator<Item> it = result.values().iterator(); 
      while (it.hasNext()) { 
       Item item = it.next(); 
       if (now - item.creationTime > MAX_AGE_IN_MS) { 
        it.remove(); 
       } 
      } 
      if (result.size() == numberOfSources) { 
       System.out.println("Got result, create a result object and return the items " + result.values()); 
       break; 
      } 
     } 
    } 

    private class Item { 
     final String source; 
     final V value; 
     final long creationTime; 

     public Item(String source, V value) { 
      this.source = source; 
      this.value = value; 
      this.creationTime = System.currentTimeMillis(); 
     } 

     public String toString() { 
      return String.valueOf(value); 
     } 
    } 


    public static void main(String[] args) throws Exception { 
     final Filter<String> filter = new Filter<String>(4); 
     new Thread(new Runnable() { 
      public void run() { 
       try { 
        filter.get(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

     filter.add("a0", "va0.1"); 
     filter.add("a0", "va0.2"); 
     Thread.sleep(2000); 
     filter.add("a0", "va0.3"); 
     Thread.sleep(100); 
     filter.add("a1", "va1.1"); 
     filter.add("a2", "va2.1"); 
     filter.add("a0", "va0.4"); 
     Thread.sleep(100); 
     filter.add("a3", "va3.1"); 
     Thread.sleep(10); 
     filter.add("a1", "va1.2"); 
     filter.add("a2", "va2.2"); 
     filter.add("a0", "va0.5"); 

    } 


}