2017-06-06 81 views
1

我有这样的路径下保存的这2个文件:PySpark覆盖加入sc.addPyFile

C:\代码\ SAMPLE1 \ main.py

def method(): 
    return "this is sample method 1" 

C:\代码\ SAMPLE2 \ main.py

def method(): 
    return "this is sample method 2" 

然后我运行此:

from pyspark import SparkContext 
from pyspark.sql import SparkSession 

sc = SparkContext() 
spark = SparkSession(sc) 

sc.addPyFile("~/code/sample1/main.py") 
main1 = __import__("main") 
print(main1.method()) # this is sample method 1 

sc.addPyFile("~/code/sample2/main.py") # Error 

错误是

Py4JJavaError:调用o21.addFile时发生错误。文件C:\ Users \ hans.yulian \ AppData \ Local \ Temp \ spark-5da165cf-410f-4576-8124-0ab23aba6aa3 \ userFiles-25a7ca23-84fb-42b7-95d9-206867fb9dfd \ main.py存在并且与/C:/Users/hans.yulian/Documents/spark-test/main2/main.py的内容不匹配

这意味着它的临时文件中已经有“main.py”文件文件夹和内容不同。我不知道是否有这种情况下,任何解决办法,但对我来说,我有以下的限制:

  1. 的文件名还是要“main.py”,只有文件夹可以 不同
  2. 这没关系以某种方式清除临时文件夹添加AGA
  3. 在另一个文件中,我有唯一的解决方法是通过在main.py的前面附加随机 字符串,例如abcdemain.pyfghijmain.py,那么我会import main = __import __(“abcdemain”), ,但是这个人是不是真的最好

回答

1

虽然在技术上是可行的,通过设置spark.files.overwrite"true"

from pyspark import SparkConf, SparkContext 

sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true")) 

,并在简单的情况下,会给出正确的结果

def f(*_):                 
    from main import method 
    return [method()] 

sc.addFile("/path/to/sample1/main.py") 
sc.parallelize([], 3).mapPartitions(f).collect() 
['this is sample method 1', 
'this is sample method 1', 
'this is sample method 1'] 
sc.addFile("/path/to/sample2/main.py") 

sc.parallelize([], 3).mapPartitions(f).collect() 
['this is sample method 2', 
'this is sample method 2', 
'this is sample method 2'] 

它在实践中将不可靠,即使您在每个访问模块上使用reload,也会使您的应用程序难以推理。由于Spark可能会隐式地缓存某些对象,或者透明地重新启动Python工作器,所以在不同节点可以看到源的不同状态的情况下,您可能会很容易结束。