
How to submit a Python word count on an HDInsight Spark cluster from Jupyter

I want to run a Python word count on a Spark HDInsight cluster and launch it from Jupyter. I'm not sure whether this is the right approach, but I couldn't find any guidance on how to submit a standalone Python application to an HDInsight Spark cluster.

Code:

import pyspark 
import operator 
from pyspark import SparkConf 
from pyspark import SparkContext 
import atexit 
from operator import add 
conf = SparkConf().setMaster("yarn-client").setAppName("WC") 
sc = SparkContext(conf = conf) 
atexit.register(lambda: sc.stop()) 

input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 
words = input.flatMap(lambda x: x.split()) 
wordCount = words.map(lambda x: (str(x),1)).reduceByKey(add) 

wordCount.saveAsTextFile("wasb:///example/outputspark") 

The error message I get, which I don't understand:

ValueError        Traceback (most recent call last) 
<ipython-input-2-8a9d4f2cb5e8> in <module>() 
     6 from operator import add 
     7 import atexit 
----> 8 sc = SparkContext('yarn-client') 
     9 
    10 input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 

/usr/hdp/current/spark-client/python/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls) 
    108   """ 
    109   self._callsite = first_spark_call() or CallSite(None, None, None) 
--> 110   SparkContext._ensure_initialized(self, gateway=gateway) 
    111   try: 
    112    self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, 

/usr/hdp/current/spark-client/python/pyspark/context.pyc in _ensure_initialized(cls, instance, gateway) 
    248       " created by %s at %s:%s " 
    249       % (currentAppName, currentMaster, 
--> 250        callsite.function, callsite.file, callsite.linenum)) 
    251     else: 
    252      SparkContext._active_spark_context = instance 

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=yarn-client) created by __init__ at <ipython-input-1-86beedbc8a46>:7 

Is it actually possible to run a Python job this way? If so, the problem seems to be in how the SparkContext is defined... I tried different approaches:

sc = SparkContext('spark://headnodehost:7077', 'pyspark') 

conf = SparkConf().setMaster("yarn-client").setAppName("WordCount1") 
sc = SparkContext(conf = conf) 

but without success. What is the correct way to run the job or to configure the SparkContext?

Answers

0

It looks like I can answer my own question. A few changes to the code turned out to do the trick:

from operator import add
import atexit
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("pyspark-word-count6")
sc = SparkContext(conf=conf)
atexit.register(lambda: sc.stop())

data = sc.textFile("wasb:///example/data/gutenberg/davinci.txt")
words = data.flatMap(lambda x: x.split())
wordCount = words.map(lambda x: (x.encode('ascii','ignore'),1)).reduceByKey(add)

wordCount.saveAsTextFile("wasb:///output/path")
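
To sanity-check the counts before writing them out, a quick peek at the most frequent words can be taken in the same cell; takeOrdered is a standard RDD action, and the top-10 cut-off here is just illustrative:

wordCount.takeOrdered(10, key=lambda kv: -kv[1])   # ten most frequent words with their counts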
0

I just fixed a similar bug in my own code and traced it down to the fact that pyspark only accepts a single SparkContext() object. Once one has been created, any change to the code and re-run triggers the problem and returns the initialization error message. My solution was simply to restart the notebook kernel and then rerun the notebook script; after that it ran without errors.
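
As an alternative to restarting the kernel, a minimal sketch of tearing down and recreating the context in code might look like this; the stop-before-create guard and the app name are my own assumptions, not part of the original answer:

from pyspark import SparkConf, SparkContext

try:
    sc.stop()    # stop whatever context is left over from a previous run
except NameError:
    pass         # no context has been created in this session yet

conf = SparkConf().setMaster("yarn-client").setAppName("WC-restarted")   # placeholder app name
sc = SparkContext(conf=conf)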

1

If you run from a Jupyter notebook, the Spark context is pre-created for you, and creating a separate context is incorrect. To fix the problem, just remove the lines that create the context and start directly from:

input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt") 
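
Putting the whole job together on top of the pre-created context, a minimal sketch could look as follows; only the notebook's existing sc is assumed, and add comes from the standard operator module:

from operator import add

input = sc.textFile("wasb:///example/data/gutenberg/davinci.txt")   # sc is the notebook's pre-created context
words = input.flatMap(lambda x: x.split())                          # split each line into words
wordCount = words.map(lambda x: (x, 1)).reduceByKey(add)            # sum the 1s per distinct word
wordCount.saveAsTextFile("wasb:///example/outputspark")             # write the result back to blob storage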

If you need to run the program standalone, you can run it with pyspark from the command line, or submit it to the cluster through the REST API of the Livy server running on the cluster.
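
For the Livy route, a rough sketch of a batch submission from Python is shown below; the cluster name, HTTP user credentials, and the script path in storage are placeholders, and the exact Livy URL should be checked against your cluster:

import requests

livy_url = "https://<your-cluster>.azurehdinsight.net/livy/batches"   # placeholder cluster name
payload = {"file": "wasb:///example/app/wordcount.py"}                # script previously uploaded to the cluster's storage

resp = requests.post(livy_url, json=payload,
                     auth=("admin", "<password>"))                    # HDInsight HTTP user credentials (placeholders)
print(resp.json())                                                    # e.g. {"id": 0, "state": "starting", ...}

The response contains a batch id that can be polled at livy/batches/<id> to follow the job's state.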
