1
我是Spark的新手。我试图实现tf-idf。我需要计算每个文档中每个单词出现的次数以及每个文档中的单词总数。火花,关于reduceByKey的小问题
我想减少和可能的另一个操作,但我不知道如何。 这里是我的输入:
对的形式是(documentName , (word, wordCount))
前。
("doc1", ("a", 3)), ("doc1", ("the", 2)), ("doc2", ("a", 5)),
("doc2",("have", 5))
键是文档和值是单词,该单词在该文档中出现多少次。我想计算每个文档中的总词数并可能计算该词的百分比。
输出我想:
("doc1", (("a", 3), 5)) , ("doc1", (("the", 2), 5)),
("doc2", (("a", 5),10)), ("doc2", (("have", 5),10))
我得到
corpus.join(corpus.reduceByKey(lambda x, y : x[1]+y[1]))
起点的效果:
collect_of_docs = [(doc1,text1), (doc2, text2),....]
def count_words(x):
l = []
words = x[1].split()
for w in words:
l.append(((w, x[0]), 1))
return l
sc = SparkContext()
corpus = sc.parallelize(collect_of_docs)
input = (corpus
.flatMap(count_words)
.reduceByKey(add)
.map(lambda ((x,y), z) : (y, (x,z))))
如果可能,我想只作一个减少一个棘手的操作操作员也许。任何帮助表示赞赏:)在此先感谢。
是的,这就是我想要的:))我想出于好奇而多了一件事。假设我想为每个单词计算tf-idf分数,并以doc1 word1 score1 word2 score2的形式展示它们; doc2 word1 score1 word2 score2 word3 score3; ..........这里最有效的方法是什么?我想我只是迭代tfs键和相应的单词,并从dfs中查找单词idf。你会怎么做? – dogacanb
这取决于。如果唯一条目的数量相对较少,那么您可以收集dfs,以dict形式广播,并通过tfs收集flatMap。否则,我会flatMap tfs到'(term,(doc_id,freq))')并且加入'dfs'。 – zero323
即使看起来不一样,它也有很大的不同。使用广播变量不需要洗牌。所以这是一个本地操作。如果dfs很大,那么广播就成了一个限制因素,而混洗/散列连接就成了一个更好的解决方案。 – zero323