SwiftConfigurationException: Invalid host name when accessing Swift from Spark on Bluemix

I have a file stored in Swift (the Bluemix Objectstore service), and I am trying to access it from a Python application running on Spark in Bluemix.

I can successfully access the file from a Python notebook running in the same environment, but the same access fails in a submitted Python application with an org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Invalid host name error.

Here is the Python code:

# Added to the Python job but not part of the notebook 

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
sc = SparkContext() 
sqlContext = SQLContext(sc) 

# Verified in the notebook 

from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql import Row 
from pyspark.mllib.regression import LabeledPoint 
from pyspark.sql.functions import udf 
from pyspark.mllib.linalg import Vectors 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.param import Param, Params 
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel 
from pyspark.mllib.stat import Statistics 
from pyspark.ml.feature import OneHotEncoder, StringIndexer 
from pyspark.ml.feature import VectorAssembler 
import sys 
import numpy as np 
import pandas as pd 
import time 
import datetime 

def set_hadoop_config(credentials): 
    """This function sets the Hadoop configuration with given credentials, 
    so it is possible to access data using SparkContext""" 

    prefix = "fs.swift.service." + credentials['name'] 
    hconf = sc._jsc.hadoopConfiguration() 
    hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v3/auth/tokens') 
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints") 
    hconf.set(prefix + ".tenant", credentials['project_id']) 
    hconf.set(prefix + ".username", credentials['user_id']) 
    hconf.set(prefix + ".password", credentials['password']) 
    hconf.setInt(prefix + ".http.port", 8080) 
    hconf.set(prefix + ".region", credentials['region']) 
    hconf.setBoolean(prefix + ".public", True) 

credentials = { 
    'auth_url':'https://identity.open.softlayer.com', 
    'project':'object_storage_bcc6ba38_7399_4aed_a47c_e6bcdc959163', 
    'project_id':'f26ba12177c44e59adbe243b430b3bf5', 
    'region':'dallas', 
    'user_id':'bb973e5a84da4fce8c62d95f2e1e5d19', 
    'domain_id':'bd9453b2e5e2424388e25677cd26a7cf', 
    'domain_name':'1062145', 
    'username':'admin_a16bbb9d8d1d051ba505b6e7e76867f61c9d1ac1', 
    'password':"""...""", 
    'filename':'2001-2008-merged.csv', 
    'container':'notebooks', 
    'tenantId':'s090-be5845bf9646f1-3ef81b4dcb61' 
} 
credentials['name'] = 'FlightDelay_demo2' 
set_hadoop_config(credentials) 
textFile = sc.textFile("swift://" + credentials['container'] + "." + credentials['name'] + credentials['filename']) 

textFileRDD = textFile.map(lambda x: x.split(',')) 

# Line that throws the error: 
header = textFileRDD.first() 

I submit the Python application to Spark as follows:

./spark-submit.sh \ 
    --vcap ./vcap.json \ 
    --deploy-mode cluster \ 
    --master https://spark.bluemix.net \ 
    /resources/FlightDelay_demo2.py 

Here is my vcap.json:

{ 
    "credentials": { 
        "tenant_id": "s090-be5845bf9646f1-3ef81b4dcb61", 
        "tenant_id_full": "2f61d7ef-955a-41f2-9090-be5845bf9646_dd570054-525b-4659-9af1-3ef81b4dcb61", 
        "cluster_master_url": "https://spark.bluemix.net", 
        "instance_id": "2f61d7ef-955a-41f2-9090-be5845bf9646", 
        "tenant_secret": "...", 
        "plan": "ibm.SparkService.PayGoPersonal" 
    } 
} 

Here is the full error:

Traceback (most recent call last): 
    File "/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/workdir/spark-driver-b2f25278-f8b4-40e3-8d53-9e8a64228197/FlightDelay_demo2.py", line 94, in <module> 
    header = textFileRDD.first() 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1315, in first 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1267, in take 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in getNumPartitions 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__ 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco 
    File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o33.partitions. 
: org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Invalid host name 
    at org.apache.hadoop.fs.swift.util.SwiftUtils.validSchema(SwiftUtils.java:222) 
    at org.apache.hadoop.fs.swift.http.SwiftRestClient.<init>(SwiftRestClient.java:510) 
    at org.apache.hadoop.fs.swift.http.SwiftRestClient.getInstance(SwiftRestClient.java:1914) 
    at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.initialize(SwiftNativeFileSystemStore.java:81) 
    at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem.initialize(SwiftNativeFileSystem.java:129) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) 
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) 
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) 
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) 
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
    at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) 
    at java.lang.reflect.Method.invoke(Method.java:507) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:785) 

I suspected it might be relevant, so here is what the Spark configuration (sorted(sc._conf.getAll())) looks like inside the Python application:

[(u'spark.app.name', u'/resources/FlightDelay_demo2.py'), 
(u'spark.driver.extraClassPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'), 
(u'spark.driver.extraLibraryPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'), 
(u'spark.eventLog.dir', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/events'), 
(u'spark.eventLog.enabled', u'true'), 
(u'spark.executor.extraClassPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'), 
(u'spark.executor.extraLibraryPath', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/libs/*:'), 
(u'spark.executor.instances', u'2'), 
(u'spark.executor.memory', u'1024m'), 
(u'spark.files', u'/gpfs/fs01/user/s090-be5845bf9646f1-3ef81b4dcb61/data/128249272b758216b946308d5f6ea43ca033a85a/FlightDelay_demo2.py'), 
(u'spark.files.useFetchCache', u'false'), 
(u'spark.master', u'spark://yp-spark-dal09-env5-0020:7083'), 
(u'spark.rdd.compress', u'True'), 
(u'spark.serializer.objectStreamReset', u'100'), 
(u'spark.service.hashed_tenant_id', u'25vT74pNDbLRr98mnGe81k9FmiS9NRiyELS04g=='), 
(u'spark.service.plan_name', u'ibm.SparkService.PayGoPersonal'), 
(u'spark.service.spark_version', u'1.6.0'), 
(u'spark.shuffle.service.port', u'7340'), 
(u'spark.submit.pyFiles', u'null'), 
(u'spark.ui.port', u'0')] 

Answer


There were two problems, both with the Swift URL:

  1. credentials['name'] must not contain characters that are illegal in host names, such as underscores. Remove the underscore as follows:

    credentials['name'] = 'FlightDelayDemo2' 
    
  2. There was a missing slash between the host name and the file name. It needs to be added (a combined sketch of both fixes follows this list):

    textFile = sc.textFile("swift://" + credentials['container'] + "." + credentials['name'] + "/" + credentials['filename']) 
    
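For reference, here is a minimal sketch with both fixes applied, reusing the credentials dict and the set_hadoop_config function from the question; only the two changed lines differ:

    # No underscores: the Swift URL's host is built as <container>.<service name>, 
    # and underscores are illegal in host names 
    credentials['name'] = 'FlightDelayDemo2' 
    set_hadoop_config(credentials) 

    # Note the "/" separating the host name from the object (file) name 
    textFile = sc.textFile("swift://" + credentials['container'] + "." + 
                           credentials['name'] + "/" + credentials['filename']) 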

Since I am answering my own question, I will mention that another puzzling error you may run into while debugging something similar is:

pyspark.sql.utils.IllegalArgumentException: u'Host name may not be null'
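
My understanding of the root cause, which would explain both errors (this is my own diagnosis, not something from the IBM documentation): Java's java.net.URI treats an authority containing characters that are illegal in host names, such as underscores, as registry-based, so uri.getHost() returns null, and the Hadoop Swift client then rejects the URL. You can check this from the same PySpark session through the py4j gateway:

    # Underscore in the service name: getHost() returns null (None in Python) 
    bad = sc._jvm.java.net.URI("swift://notebooks.FlightDelay_demo2/2001-2008-merged.csv") 
    print(bad.getHost())   # None 

    # Without the underscore the host parses as expected 
    good = sc._jvm.java.net.URI("swift://notebooks.FlightDelayDemo2/2001-2008-merged.csv") 
    print(good.getHost())  # notebooks.FlightDelayDemo2 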