
I am reading a large dataset from an HDFS location and saving my DataFrame to Redshift, but I am unable to save the DataFrame to Redshift:

df.write 
    .format("com.databricks.spark.redshift") 
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") 
    .option("dbtable", "my_table_copy") 
    .option("tempdir", "s3n://path/for/temp/data") 
    .mode("error") 
    .save() 
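
For reference, the DataFrame being written is read from HDFS roughly like this (a minimal sketch; the input format and HDFS path are placeholders, not from the original post):

val hdfsPath = "hdfs://namenode:8020/path/to/large/dataset"  // placeholder path
val df = sqlContext.read 
    .format("parquet")  // assumed input format 
    .load(hdfsPath)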

After some time, I get the following error:

s3.amazonaws.com:443 failed to respond 
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143) 
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261) 
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223) 
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272) 
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124) 
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685) 
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487) 
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:882) 
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) 
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334) 
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281) 
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestPut(RestStorageService.java:1043) 
at org.jets3t.service.impl.rest.httpclient.RestStorageService.copyObjectImpl(RestStorageService.java:2029) 
at org.jets3t.service.StorageService.copyObject(StorageService.java:871) 
at org.jets3t.service.StorageService.copyObject(StorageService.java:916) 
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:323) 
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) 
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.rename(NativeS3FileSystem.java:707) 
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:370) 
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384) 
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326) 
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala: 

I found the same issue reported on GitHub:

s3.amazonaws.com:443 failed to respond

Am I doing something wrong? Please help.


Is your cluster set up on AWS EMR? –

Answer


I faced the same issue while using AWS EMR.

The Databricks spark-redshift library uses Amazon S3 to efficiently transfer data in and out of Redshift from Spark. The library first writes the data to Amazon S3 and then uses EMRFS to load this data into Redshift.

You have to configure your EMRFS settings, and then it will work.

The EMR File System (EMRFS) and the Hadoop Distributed File System (HDFS) are both installed on your EMR cluster. EMRFS is an implementation of HDFS that allows EMR clusters to store data on Amazon S3.

EMRFS tries to verify list consistency for objects tracked in its metadata for a specific number of retries (emrfs-retry-logic). The default is 5. When the number of retries is exceeded, the originating job returns a failure. To overcome this problem, you can override the default EMRFS configuration with the following steps:

Step 1: Log in to your EMR master instance.

Step 2: Add the following properties to /usr/share/aws/emr/emrfs/conf/emrfs-site.xml:

sudo vi /usr/share/aws/emr/emrfs/conf/emrfs-site.xml 

<property> 
    <name>fs.s3.consistent.throwExceptionOnInconsistency</name> 
    <value>false</value> 
</property>

<property> 
    <name>fs.s3.consistent.retryPolicyType</name> 
    <value>fixed</value> 
</property> 
<property> 
    <name>fs.s3.consistent.retryPeriodSeconds</name> 
    <value>10</value> 
</property> 
<property> 
    <name>fs.s3.consistent</name> 
    <value>false</value> 
</property> 

Then restart your EMR cluster.
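
As an alternative to editing the file by hand on the master node, the same overrides can be supplied at cluster creation time through an EMR configuration object with the emrfs-site classification (a sketch; the file name emrfs-config.json is just an example):

[ 
  { 
    "Classification": "emrfs-site", 
    "Properties": { 
      "fs.s3.consistent": "false", 
      "fs.s3.consistent.throwExceptionOnInconsistency": "false", 
      "fs.s3.consistent.retryPolicyType": "fixed", 
      "fs.s3.consistent.retryPeriodSeconds": "10" 
    } 
  } 
] 

It can then be passed with aws emr create-cluster --configurations file://emrfs-config.json along with your other cluster options.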

You can also configure your hadoopConfiguration, for example hadoopConf.set("fs.s3a.attempts.maximum", "30"):

// Grab the Hadoop configuration from the running SparkContext 
val hadoopConf = SparkDriver.getContext.hadoopConfiguration 
// Use the native S3 filesystem implementation for s3:// URIs 
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
// Raise the maximum number of retry attempts for the s3a filesystem 
hadoopConf.set("fs.s3a.attempts.maximum", "30") 
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId) 
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)

Thanks, it worked for me. –