2015-11-15 67 views
1

我是Spark的新手。我试图实现tf-idf。我需要计算每个文档中每个单词出现的次数以及每个文档中的单词总数。火花,关于reduceByKey的小问题

我想减少和可能的另一个操作,但我不知道如何。 这里是我的输入:

对的形式是(documentName , (word, wordCount))前。

("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)), 
    ("doc2",("have", 5)) 

键是文档和值是单词,该单词在该文档中出现多少次。我想计算每个文档中的总词数并可能计算该词的百分比。

输出我想:

("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)), 
    ("doc2", (("a", 5),10)), ("doc2", (("have", 5),10)) 

我得到

corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1])) 

起点的效果:

collect_of_docs = [(doc1,text1), (doc2, text2),....] 

def count_words(x): 
    l = [] 
    words = x[1].split() 
    for w in words: 
     l.append(((w, x[0]), 1)) 
    return l 

sc = SparkContext() 
corpus = sc.parallelize(collect_of_docs) 
input = (corpus 
    .flatMap(count_words) 
    .reduceByKey(add) 
    .map(lambda ((x,y), z) : (y, (x,z)))) 

如果可能,我想只作一个减少一个棘手的操作操作员也许。任何帮助表示赞赏:)在此先感谢。

回答

1

一般来说,flatMap只是为了稍后收集您的数据没有意义。我假设你的数据看起来或多或少是这样的:

collect_of_docs = sc.parallelize([ 
    (1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."), 
    (2, "Mauris magna sem, vehicula sed dictum finibus, posuere id ipsum."), 
    (3, "Duis eleifend molestie dolor, quis fringilla eros facilisis ac.")]) 

首先,我们将使用一个基本的正则表达式需要一些助手和Counter

from __future__ import division # If for some reason you use Python 2.x 
import re 
from collections import Counter 

def count_words(doc, pattern=re.compile("\w+")): 
    """Given a tuple (doc_id, text) 
    return a tuple (doc_id, tokens_count 

    >>> count_words((1, "Foo bar bar.")) 
    (1, Counter({'Foo': 1, 'bar': 2})) 
    """ 
    (doc_id, text) = doc 
    return (doc_id, Counter(pattern.findall(text))) 

def compute_tf(cnt): 
    """Convert term counter to term frequency 

    >>> compute_tf(Counter({'Foo': 1, 'bar': 2})) 
    {'Foo': 0.3333333333333333, 'bar': 0.6666666666666666} 
    """ 
    n = sum(cnt.values()) 
    return {k: v/n for (k, v) in cnt.items()} 

和最终结果:

tfs = (collect_of_docs 
    .map(count_words) 
    .mapValues(compute_tf)) 

tfs.sortByKey().first() 

## (1, 
## {'Lorem': 0.125, 
## 'adipiscing': 0.125, 
## 'amet': 0.125, 
## 'consectetur': 0.125, 
## 'dolor': 0.125, 
## 'elit': 0.125, 
## 'ipsum': 0.125, 
## 'sit': 0.125}) 

使用上述文件频率可计算如下:

from operator import add 

dfs = (tfs 
    .values() 
    .flatMap(lambda kv: ((k, 1) for k in kv.keys())) 
    .reduceByKey(add)) 

dfs.sortBy(lambda x: -x[1]).take(5) 

## [('ipsum', 2), 
## ('dolor', 2), 
## ('consectetur', 1), 
## ('finibus', 1), 
## ('fringilla', 1)] 
+0

是的,这就是我想要的:))我想出于好奇而多了一件事。假设我想为每个单词计算tf-idf分数,并以doc1 word1 score1 word2 score2的形式展示它们; doc2 word1 score1 word2 score2 word3 score3; ..........这里最有效的方法是什么?我想我只是迭代tfs键和相应的单词,并从dfs中查找单词idf。你会怎么做? – dogacanb

+0

这取决于。如果唯一条目的数量相对较少,那么您可以收集dfs,以dict形式广播,并通过tfs收集flatMap。否则,我会flatMap tfs到'(term,(doc_id,freq))')并且加入'dfs'。 – zero323

+1

即使看起来不一样,它也有很大的不同。使用广播变量不需要洗牌。所以这是一个本地操作。如果dfs很大,那么广播就成了一个限制因素,而混洗/散列连接就成了一个更好的解决方案。 – zero323