我在Swift(Bluemix Objectstore)中存储了一个文件,我试图从Bluemix中Spark上运行的Python应用程序访问它。SwiftConfigurationException:从Bluemix访问Swift时无效的主机名Spark
我可以成功访问运行在相同环境中的Python笔记本中的文件,但在提交的Python应用程序中执行访问失败,出现org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Invalid host name
错误。
这里的Python代码:
# Added to the Python job but not part of the notebook
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
# Verified in the notebook
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import sys
import numpy as np
import pandas as pd
import time
import datetime
def set_hadoop_config(credentials):
"""This function sets the Hadoop configuration with given credentials,
so it is possible to access data using SparkContext"""
prefix = "fs.swift.service." + credentials['name']
hconf = sc._jsc.hadoopConfiguration()
hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v3/auth/tokens')
hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
hconf.set(prefix + ".tenant", credentials['project_id'])
hconf.set(prefix + ".username", credentials['user_id'])
hconf.set(prefix + ".password", credentials['password'])
hconf.setInt(prefix + ".http.port", 8080)
hconf.set(prefix + ".region", credentials['region'])
hconf.setBoolean(prefix + ".public", True)
credentials = {
'auth_url':'https://identity.open.softlayer.com',
'project':'object_storage_bcc6ba38_7399_4aed_a47c_e6bcdc959163',
'project_id':'f26ba12177c44e59adbe243b430b3bf5',
'region':'dallas',
'user_id':'bb973e5a84da4fce8c62d95f2e1e5d19',
'domain_id':'bd9453b2e5e2424388e25677cd26a7cf',
'domain_name':'1062145',
'username':'admin_a16bbb9d8d1d051ba505b6e7e76867f61c9d1ac1',
'password':"""...""",
'filename':'2001-2008-merged.csv',
'container':'notebooks',
'tenantId':'s090-be5845bf9646f1-3ef81b4dcb61'
}
credentials['name'] = 'FlightDelay_demo2'
set_hadoop_config(credentials)
textFile = sc.textFile("swift://" + credentials['container'] + "." + credentials['name'] + credentials['filename'])
textFileRDD=textFile.map(lambda x: x.split(','))
# Line that throws the error:
header = textFileRDD.first()
我提交的Python应用程序,以星火如下:
./spark-submit.sh \
--vcap ./vcap.json \
--deploy-mode cluster \
--master https://spark.bluemix.net \
/resources/FlightDelay_demo2.py
这里是我的vcap.json:
{
"credentials": {
"tenant_id": "s090-be5845bf9646f1-3ef81b4dcb61",
"tenant_id_full": "2f61d7ef-955a-41f2-9090-be5845bf9646_dd570054-525b-4659-9af1-3ef81b4dcb61",
"cluster_master_url": "https://spark.bluemix.net",
"instance_id": "2f61d7ef-955a-41f2-9090-be5845bf9646",
"tenant_secret": "...",
"plan": "ibm.SparkService.PayGoPersonal"
}
}
这里的全错误:
Traceback (most recent call last):
File "/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/workdir/spark-driver-b2f25278-f8b4-40e3-8d53-9e8a64228197/FlightDelay_demo2.py", line 94, in <module>
header = textFileRDD.first()
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1315, in first
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1267, in take
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in getNumPartitions
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o33.partitions.
: org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Invalid host name
at org.apache.hadoop.fs.swift.util.SwiftUtils.validSchema(SwiftUtils.java:222)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.<init>(SwiftRestClient.java:510)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.getInstance(SwiftRestClient.java:1914)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.initialize(SwiftNativeFileSystemStore.java:81)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem.initialize(SwiftNativeFileSystem.java:129)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:507)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:785)
我想这可能是相关的,所以这里就是星火配置(sorted(sc._conf.getAll())
)看起来像Python应用程序中:
[(u'spark.app.name', u'/resources/FlightDelay_demo2.py'),
(u'spark.driver.extraClassPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'),
(u'spark.driver.extraLibraryPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'),
(u'spark.eventLog.dir', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/events'),
(u'spark.eventLog.enabled', u'true'),
(u'spark.executor.extraClassPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'),
(u'spark.executor.extraLibraryPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'),
(u'spark.executor.instances', u'2'),
(u'spark.executor.memory', u'1024m'),
(u'spark.files', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/128249272b758216b946308d5f6ea43ca033a85a/FlightDelay_demo2.py'),
(u'spark.files.useFetchCache', u'false'),
(u'spark.master', u'spark://yp-spark-dal09-env5-0020:7083'),
(u'spark.rdd.compress', u'True'),
(u'spark.serializer.objectStreamReset', u'100'),
(u'spark.service.hashed_tenant_id', u'25vT74pNDbLRr98mnGe81k9FmiS9NRiyELS04g=='),
(u'spark.service.plan_name', u'ibm.SparkService.PayGoPersonal'),
(u'spark.service.spark_version', u'1.6.0'),
(u'spark.shuffle.service.port', u'7340'),
(u'spark.submit.pyFiles', u'null'),
(u'spark.ui.port', u'0')]