2013-03-21 47 views
1

我有一个HashSet。 Occasionlay,将新值添加到此哈希集。我正在尝试做的是在计时器添加完成后的一分钟内从计时器中删除每个元素。无功扩展计时器

我还是rx的新手,但这似乎是使用它的理想场合。

我想是这样的:

AddItem(string item) 
{ 
    _mySet.Add(item); 
    var timer = Observable.Timer(TimeSpan.FromSeconds(60), _scheduler); 
    timer 
     .Take(1) 
     .Do(item => RemoveItem(item)) 
     .Subscribe(_ => Console.WriteLine("Removed {0}", item)); 
} 

似乎好的工作(通过单元测试)。

有没有人看到这种方法有什么问题?

+0

AddItem(”foo“); 30秒后的AddItem(”foo“);? – Phil 2013-03-24 14:23:28

回答

2
  1. 你在Do调用拉姆达不看的权利 - Observable.Timer产生int值,但你的集合是一个HashSet<string> - 这不应该编译。我猜这只是一个错字。

  2. Do:一般而言,您的订阅应该在Subscribe之后完成。 Do是用于副作用(我不喜欢流中的副作用的想法,所以我避免它,但它对调试很有用)。

  3. TakeObservable.Timer只产生一个价值在终止前,因此没有必要为Take操作

,我会写你的函数:

AddItem(string item) 
{ 
    _mySet.Add(item); 
    Observable.Timer(TimeSpan.FromSeconds(60), _scheduler) 
     .Subscribe(_ => RemoveItem(item)); 
} 
-2

对不起,不是故意挑你,但:

ALWAYS处置IDISPOSABLES !!!!!

(编辑:好吧,不知道我今天早上把我的咖啡,但我回答废话的整个混乱的挫折感,我将离开上述只是因为一般,你想确保处置任何IDisposable,但在努力弥补下面的潺潺......)

调用到 Subscribe创建一个订阅,你是不是配置,所以多次调用该方法只是要排队越来越多的废话 - 现在在这个特定的情况下,它不是世界末日,因为 Timer只发射一次,但仍然...处置!

如果你真的想使用这种方法(我认为更好的方法是让一些正在运行的线程/任务“倾向于”你的值,当它认为它的必要时删除),至少尝试类似于:

好的,忽略所有这些剔除的废话。的Observable.Timer实现是这样的:

public static IObservable<long> Timer(TimeSpan dueTime) 
{ 
    return s_impl.Timer(dueTime); 
} 

进而调用这个:

这就要求...

private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler) 
{ 
    return new Timer(dueTime, null, scheduler); 
} 

这里的事情变得有趣 - TimerProducer<long>,其中肉味的位是:

private IDisposable InvokeStart(IScheduler self, object state) 
{ 
    this._pendingTickCount = 1; 
    SingleAssignmentDisposable disposable = new SingleAssignmentDisposable(); 
    this._periodic = disposable; 
    disposable.Disposable = self.SchedulePeriodic<long>(1L, this._period, new Func<long, long>(this.Tock)); 
    try 
    { 
     base._observer.OnNext(0L); 
    } 
    catch (Exception exception) 
    { 
     disposable.Dispose(); 
     exception.Throw(); 
    } 
    if (Interlocked.Decrement(ref this._pendingTickCount) > 0) 
    { 
     SingleAssignmentDisposable disposable2 = new SingleAssignmentDisposable { 
      Disposable = self.Schedule<long>(1L, new Action<long, Action<long>>(this.CatchUp)) 
     }; 
     return new CompositeDisposable(2) { disposable, disposable2 }; 
    } 
    return disposable; 
} 

现在,base._observer.OnNext,这是内部的水槽设置为触发的计时器滴答,其中在该Invoke是:

private void Invoke() 
{ 
    base._observer.OnNext(0L); 
    base._observer.OnCompleted(); 
    base.Dispose(); 
} 

所以,是的。它自动处理自己 - 并且不会有任何“闲置的订阅”浮动。

嗯....乌鸦很好吃。:|

+0

JerKimball,我认为在Take(1)满足后,源自动处理,如果我添加了一个OnCompleted hander,我就会看到它被调用,当OnCompleted被调用时,源是否被处置? – Flack 2013-03-21 17:12:49

+1

@JerKimbal,这是不正确的建议,请删除它。在Rx的上下文中的IDisposable不会**需要处置。在Rx中Disposing有一个特殊的含义,断开订阅** early **。没有内存泄漏,没有任何问题示例代码 – 2013-03-21 20:03:19

+0

对不起@JerKimball,你通常对Rx很好,但Paul和Flack是正确的 - 一旦序列完成,订阅就会自动“处置”(取消订阅)。 – AlexFoxGill 2013-03-21 20:06:31

0

如果您使用ReactiveUI,一类叫做ReactiveCollection肯定会帮助这里,你可以使用这样的:

theCollection.ItemsAdded 
    .SelectMany(x => Observable.Timer(TimeSpan.FromSeconds(60), _scheduler).Select(_ => x)) 
    .Subscribe(x => theCollection.Remove(x)); 
2

你不需要创建一个序列来做到这一点。您已经成为一名优秀的公民,并使用Scheduler明确性,所以请使用它!

你可以只是这对你的代码

AddItem(string item) 
{ 
    _mySet.Add(item); 
    //Note this does return an IDisposable if you want to cancel the subscription. 
    _scheduler.Schedule(
    TimeSpan.FromSeconds(60), 
    ()=> 
    { 
     RemoveItem(item); 
     Console.WriteLine("Removed {0}", item); 
    }); 
} 

这基本上意味着有少得多的工作在幕后继续。考虑Observable.Timer方法正在进行的所有工作,实际上您只希望执行的所有工作是使用一个值(您忽略)来计划OnNext。

我也会假设即使是一个对Rx一无所知的用户也能够阅读这个时间表代码。即。 “添加此项目后,我安排此删除操作在60秒内运行)

+0

这不是功能上非常考虑... – 2013-03-22 22:39:46

+0

大声笑。我只是指出这与问题中的代码达到了相同的结果。问题不在于如何改用新图书馆,而是“更实用”。正如Flack在他的问题中指出的那样,原始代码可以工作并通过单元测试。我只是建议他不需要使用序列。 – 2013-03-23 20:19:04