2016-04-27 52 views
2

我面对陌生的行为,而使用火花与一个主站和HDFS上的三个工作节点救了我的解析XML文件,问题是星火将结果保存到HDFS

当我解析XMLFILE,并试图在HDFS保存接档无法保存所有解析结果。

,当我通过指定

sc = SparkContext("local", "parser") 

and the spark-submit will be ./bin/spark-submit xml_parser.py 

执行本地模式相同的代码,这个运行提供了与完整的记录HDFS 117MB解析的文件。

,并在火花客户端模式下执行代码,然后我做了以下的情况下,

sc = SparkContext("spark://master:7077", "parser") 

和火花提交的,

./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g --executor-cores 2 xml_parser.py 1000 

给我的不完全记录HDFS 19MB文件。

保存结果在这两种情况下,我使用rdd.saveAsTextFile( “HDFS://”)

我使用spark1.6.1-hadoop2.6 和Apache的Hadoop 2.7.2

任何人都可以帮助我。我不明白为什么它会发生。 我有以下sparkCluster,

1主8GbRAM

2- workerNode1 8GbRAM

3- WorkerNode2 8GbRAM

4- workerNode3 8GbRAM

和我上面簇上配置Hadoop的2.7。2 1个主3的DataNode,

如果我太平绅士在severNode给我,

24097硕士

21652 JPS

23398的NameNode

23799 ResourceManager的

23630 SecondaryNameNode

JPS上的所有的DataNodes,

8006工人

7819节点管理器

27164 JPS

7678的DataNode

通过检查HadoopNameNode UI主:9000给我的三米现场的DataNodes

通过在主人上检查SparkMaster Ui:7077给我三名现场工作人员

请有看这里,

sc = SpakContext("spark://master:7077", "parser") 
-------------------------------------------- 
contains the logic of XMLParsing 
-------------------------------------------- 
and I am appending the result in one list like, 
cc_list.append([final_cll, Date,Time,int(cont[i]), float(values[i]),0]) 
Now I am Parallelizing the above cc_list like 
parallel_list = sc.parallelize(cc_list) 
parallel_list.saveAsTextFile("hdfs://master:9000/ some path") 
Now I am Doing some operations here. 
new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000).map(lambda line:line.split(',')) 

result = new_list.map(lambda x: (x[0]+', '+x[3],float(x[4]))).sortByKey('true').coalesce(1) 
result = result.map(lambda x:x[0]+','+str(x[1])) 
result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ','')).saveAsTextFile("hdfs://master:9000/some path1)) 
+0

你能分享代码吗?否则很难理解到底发生了什么...... – mgaido

+0

整个解析逻辑是在Python中没有火花转换和行动我只用了我并行化的列表。 –

回答

1

对不起,这里这样的傻瓜问题。其实我发现了两个问题

1)多个工作运行时,

parallel_list = sc.parallelize(cc_list) 

创建4-5部分文件和parallel_list与部分00000到部分00004,并且在装载parallel_list保存在HDFS u能在代码上面看到

new_list = sc.textFile(pathto parallel_list/part-00000) ==> so it was taking only the first part. 

2),而在运行localMode,

parallel_list = sc.parallelize(cc_list) was creating only one part file so i was able to pick whole file at one stroke. 

所以,当工人我拿出两种解决方案

1)上运行的火花我刚添加的兼职*而从parallel_list

2创建new_list)通过增加spark.akka.frameSize 10000通过--configure spark.akka.frameSize = 1000与火花提交。