2011-03-10 43 views
48

任何人都可以指向一个简单的,开源的Map/Reduce framework/API for Java吗?目前似乎没有太多证据表明这种事情存在,但其他人可能会有所不同。简单的Java地图/ Reduce框架

我能找到的最好的当然是Hadoop MapReduce,但是没有通过“简单”的标准。我不需要运行分布式作业的能力,只需要使用标准的Java5风格的并发,就可以在单核JVM中在多核机器上运行map/reduce-style作业。

写自己并不难,但我宁愿不必。

+3

我碰到这个视频当中将通知有关使用Java 8新功能。它似乎有将在新版本中的MapReduce API来了。 http://www.youtube.com/watch?v=47_Em-zc7_Q – gigadot 2011-11-26 15:43:42

+0

我很想知道您当前的解决方案是针对此问题的。我只是寻找快速,简单的方法来在单台机器上并行执行Lists.transform(函数)。 – Snekse 2013-04-12 16:54:13

+6

LeoTask的作品。这是一个在多核机器上运行并结果汇总框架的并行任务。 https://github.com/mleoking/leotask – 2015-01-08 14:05:36

回答

10

我认为这是值得一提的是,这些问题是历史如Java 8.一种示例的:

int heaviestBlueBlock = 
    blocks.filter(b -> b.getColor() == BLUE) 
      .map(Block::getWeight) 
      .reduce(0, Integer::max); 

换句话说:单节点的MapReduce是Java 8可用。

有关更多细节,参见Brian Goetz's presentation about project lambda

+0

假设它是成立的,是的。历史告诉我们,有趣的东西通常会被踢出去。 – skaffman 2012-01-17 19:13:06

+4

@skaffman:如果lambda最终没有成功,我会哭的! – 2012-01-17 21:53:53

+1

我(非常迟来)接受了这个答案,因为随着Java8的采用,其他选项将很快变得不合时宜。 – skaffman 2014-07-05 09:47:02

9

我用以下结构

int procs = Runtime.getRuntime().availableProcessors(); 
ExecutorService es = Executors.newFixedThreadPool(procs); 

List<Future<TaskResult>> results = new ArrayList(); 
for(int i=0;i<tasks;i++) 
    results.add(es.submit(new Task(i))); 
for(Future<TaskResult> future:results) 
    reduce(future); 
+6

嗯...这不是map-reduce,那只是一个裸体执行者。 – skaffman 2011-03-10 13:39:47

+0

你想要简单。循环将工作映射为“任务”任务,并可用于合并或减少个别结果。可选的结果可以存储在未来。 – 2011-03-10 14:07:46

+0

我意识到我*可以*写我自己的地图/减少框架,但我不*想要*。这很复杂,想要使用现成的通用解决方案。 – skaffman 2011-03-10 14:09:19

6

我创建了一个一次性为自己几年前,当我得到了一个8核的机器,但我不与它非常高兴。我从来没有像我所希望的那样简单地使用它,而内存密集型任务并没有很好地扩展。

如果你没有得到任何真正答案我可以分享更多,但它的核心是:

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> { 
    private int m_threads; 
    private Mapper<TMapInput, TMapOutput> m_mapper; 
    private Reducer<TMapOutput, TOutput> m_reducer; 
    ... 
    public TOutput mapReduce(Iterator<TMapInput> inputIterator) { 
     ExecutorService pool = Executors.newFixedThreadPool(m_threads); 
     Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>(); 
     while (inputIterator.hasNext()) { 
      TMapInput m = inputIterator.next(); 
      Future<TMapOutput> f = pool.submit(m_mapper.makeWorker(m)); 
      futureSet.add(f); 
      Thread.sleep(10); 
     } 
     while (!futureSet.isEmpty()) { 
      Thread.sleep(5); 
      for (Iterator<Future<TMapOutput>> fit = futureSet.iterator(); fit.hasNext();) { 
       Future<TMapOutput> f = fit.next(); 
       if (f.isDone()) { 
        fit.remove(); 
        TMapOutput x = f.get(); 
        m_reducer.reduce(x); 
       } 
      } 
     } 
     return m_reducer.getResult(); 
    } 
} 

编辑:根据评论,以下是未sleep版本。诀窍是使用CompletionService,它基本上提供了完成Future s的阻塞队列。

public class LocalMapReduce<TMapInput, TMapOutput, TOutput> { 
    private int m_threads; 
    private Mapper<TMapInput, TMapOutput> m_mapper; 
    private Reducer<TMapOutput, TOutput> m_reducer; 
    ... 
    public TOutput mapReduce(Collection<TMapInput> input) { 
     ExecutorService pool = Executors.newFixedThreadPool(m_threads); 
     CompletionService<TMapOutput> futurePool = 
        new ExecutorCompletionService<TMapOutput>(pool); 
     Set<Future<TMapOutput>> futureSet = new HashSet<Future<TMapOutput>>(); 
     for (TMapInput m : input) { 
      futureSet.add(futurePool.submit(m_mapper.makeWorker(m))); 
     } 
     pool.shutdown(); 
     int n = futureSet.size(); 
     for (int i = 0; i < n; i++) { 
      m_reducer.reduce(futurePool.take().get()); 
     } 
     return m_reducer.getResult(); 
    } 

我还会注意到,这是一个非常蒸馏水的map-reduce算法,包括单减少工人这确实既减少和合并操作。

+0

缺少按键排序减少值,因此减少部分不像在Hadoop中那样并行化。 – yura 2011-03-10 14:44:04

+0

@yura:确实。这是我不想担心的那种微调的微妙之处。 – skaffman 2011-03-10 14:48:13

+0

Thread.sleep不好:) – 2011-05-07 20:01:00

8

我意识到这可能有点后,但你可能想看看JDK7的JSR166y ForkJoin类。

有一个后台移植的库,可以在JDK6下正常工作,因此您不必等到下个千年才能使用它。它位于原始执行程序和hadoop之间,为当前JVM中的map reduce工作提供框架。

3

你看过GridGain吗?

+0

GridGain非常好,也许是最好的,但非常昂贵,他们不支持社区版本。 即使社区版3.6的文件也无法下载。 我不推荐用于简单目的的网格增益。只要你有一个大项目和一个非常大的公司。 由于这个原因,我推荐Akka。 – felipe 2013-03-25 19:10:28

+0

它们于2014年3月重新开放。 – CheatEx 2014-04-20 11:42:07

18

你退房Akka?虽然akka实际上是一个基于分布式Actor模型的并发框架,但只需很少的代码就可以实现很多事情。将工作分成几部分很容易,它可以自动充分利用多核机器,并且可以使用多台机器来处理工作。不像使用线程,我感觉更自然。

我有一个Java map reduce example使用akka。这不是最简单的地图缩小示例,因为它利用了期货;但它应该给你一个粗略的想法。我的地图缩小示例演示了几件主要的事情:

  • 如何划分工作。
  • 如何分配工作:akka拥有一个非常简单的消息传递系统,可以作为工作分配器,您可以配置其工作计划。一旦我学会了如何使用它,我无法停下来。它非常简单和灵活。我立即使用了全部四个CPU内核。这对于实现服务非常有用。
  • 如何知道工作完成的时间以及结果是否已准备好处理:除非您已熟悉期货,否则这部分实际上可能是最难理解和难以理解的部分。您不需要使用期货,因为还有其他选择。我只是用它们,因为我想要让人们更短的时间让他们开心。

如果您有任何问题,StackOverflow实际上有一个很棒的akka​​ QA部分。

5

我喜欢用Skandium在Java中进行并行处理。该框架为具有共享内存的多核机器实现了某些并行性模式(即主从,映射/减少,管道,分叉和划分& Conquer)。这种技术被称为“算法骨架”。这些模式可以嵌套。

详细有骨骼和肌肉。肌肉做实际工作(分裂,合并,执行和条件)。除了“While”,“For”和“If”之外,骨架代表并行性的模式,在嵌套模式时可能很有用。

可以在框架内找到示例。我需要一点点来了解如何使用肌肉和骨骼,但在克服这个障碍之后,我非常喜欢这个框架。 :)

+0

这不会被积极开发。 – 2013-03-11 12:36:58

+0

伤心,但是确实如此。想要在几天前访问他们的网站,似乎他们在今年年初拉了它。所以如果没有人觉得自己有义务维护包(它是开源的),那么就不会有任何更新。也许我会在下一次寻找替代品,但我真的很满意。 – 2013-03-12 17:20:42

0

甲MapReduce的API导入Hazelcast的V3.2(见MapReduce API section in the docs)。虽然Hazelcast旨在用于分布式系统,但它在单个节点设置中工作得非常好,而且它相当轻巧。

0

您可以尝试LeoTask:并行任务运行和结果聚合框架

它是自由和开放源码:https://github.com/mleoking/leotask

下面是一个简单的介绍显示出它的API:https://github.com/mleoking/leotask/blob/master/leotask/introduction.pdf?raw=true

这是使用所有可用的CPU核心在单台计算机上工作的轻量级框架。

它具有以下特点:

  • 自动&平行参数太空探索
  • 灵活&基于配置的结果聚合
  • 编程模型只专注于关键逻辑
  • 可靠&自动中断回收

和实用程序:

  • 动态&可复制的网络结构。
  • 集成的Gnuplot
  • 网络生成根据公共网络模型
  • DelimitedReader:一个复杂的阅读器,探索CSV(逗号分隔值)文件,如一个数据库
  • 快速随机数发生器基于Mersenne扭曲算法
  • 一个集成CurveFitter从ImageJ的项目
+0

这是一则广告。 – 2015-01-15 21:31:37