2015-12-30 38 views
3

我已经创建了一个固定的线程池来处理每300毫秒发送一个事件,并假定该进程需要1000毫秒。假设多线程将工作,但只有一个线程被重用。RxJava - 为什么执行者只使用一个线程

如果我设置的sleepTime小于300毫秒,处理线程改变,但是没用。

问题:我能做些什么来使它并发?为什么程序重用线程?

预先感谢您

public static void main(String[] args) throws InterruptedException { 
    long sleepTime = 1000; 
    ExecutorService e = Executors.newFixedThreadPool(3); 

    Observable.interval(300, TimeUnit.MILLISECONDS) 
    .subscribeOn(Schedulers.computation()) 
    .flatMap(new Func1<Long, Observable<Long>>() { 
     @Override 
     public Observable<Long> call(Long pT) { 
      return Observable.just(pT).subscribeOn(Schedulers.from(e)); 
     } 
    }) 
    .doOnNext(new Action1<Long>() { 

     @Override 
     public void call(Long pT) { 
      try { 
       Thread.sleep(sleepTime); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

     } 
    }) 
    .subscribe(new Action1<Long>() { 

     @Override 
     public void call(Long pT) { 
      System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName()); 

     } 
    }); 


    Thread.sleep(50000); 
    e.shutdownNow(); 

} 

日志

i am 0in thread:pool-1-thread-1 
i am 1in thread:pool-1-thread-1 
i am 2in thread:pool-1-thread-1 
i am 3in thread:pool-1-thread-1 
i am 4in thread:pool-1-thread-1 
i am 5in thread:pool-1-thread-1 
i am 6in thread:pool-1-thread-1 
i am 7in thread:pool-1-thread-1 
i am 8in thread:pool-1-thread-1 
i am 9in thread:pool-1-thread-1 
i am 10in thread:pool-1-thread-1 
i am 11in thread:pool-1-thread-1 
+0

请注意:您可以使用jvisualvm来更加可靠地计算出进度计划和使用哪些线程:http://docs.oracle.com/javase/6/docs/technotes/tools/ share/jvisualvm.html –

+0

@ReutSharabani在eclipse Debug视图中,我可以看到生成的线程,但程序只重用一个线程。 – Rockman12352

回答

0

据我了解在你的代码,生产者在比用户更快的速度生产。但是Observable<Long> interval(long interval, TimeUnit unit)实际上不支持Backpressure。文档指出

该操作员不支持背压,因为它使用时间。如果下游的 需要较慢,则应该减慢计时器或使用类似于{@link #onBackpressureDrop}的东西 。

如果你的处理真的比生产速度较慢,你可以在你的用户代码做的就是这样的事情

.subscribe(new Action1<Long>() { 

    @Override 
    public void call(Long pT) { 
     e.submit(new Runnable() { 
      System.out.println("i am " + pT + "in thread:" + Thread.currentThread().getName()); 

     } 
    } 
}); 
+0

当然,我可以像你说的那样把任务提交给不同的线程。但我想用Scheduler自然地做到这一点。 – Rockman12352

+0

@ Rockman12352我同意,但是据我所知,Observable将执行整个执行过程(从生产者到订阅者)以单线程发射。每个生产者的“长”数据的含义,它将在单线程中调用所有订阅者。我可能在这里是错的,但这就是我迄今得到的结果 – Wins

0

相反

.subscribeOn(Schedulers.computation()) 

尝试

.observeOn(Schedulers.computation()) 

这个我以前玩过机智的例子h与Rx的并发工作相当不错,例如

public class ObservableZip { 

private Scheduler scheduler; 
private Scheduler scheduler1; 
private Scheduler scheduler2; 

@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("Async:", start, result)); 
} 




public void showResult(String transactionType, long start, String result) { 
    System.out.println(result + " " + 
           transactionType + String.valueOf(System.currentTimeMillis() - start)); 
} 


public Observable<String> obAsyncString() { 
    return Observable.just("") 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obAsyncString1() { 
    return Observable.just("") 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obAsyncString2() { 
    return Observable.just("") 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
    } 

} 
+0

它不起作用。在你的例子中,3个observable来自不同的线程,所以它自然是多线程的。但在我的情况下,我想把它分成一个池。 – Rockman12352

+0

但是每次调用Schedulers.computation()都不会给你一个新的线程? – paul

+0

是的,我会尝试另一台电脑 – Rockman12352

0

我在GitHub上找到了答案!

内部observable确实发射在多线程上,但接下来做的不是。如果我想让它平行,我应该在内部可观察的情况下做。

+0

你有链接或例子吗? – Christoph

相关问题