2017-05-28 14 views
5

我创建pyspark结构化的流节目%spark.pyspark解释,并试图在齐柏林笔记本执行:权限被拒绝:用户=飞艇在使用AWS中EMR集群

%spark.pyspark 

query_window = windowedCounts \ 
       .writeStream \ 
       .outputMode("complete") \ 
       .format("memory") \ 
       .queryName("myTable_window") \ 
       .start() 

我收到以下错误:

Py4JJavaError: An error occurred while calling o191.start. 
: org.apache.hadoop.security.AccessControlException: Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x 

我在AWS EMR集群中使用Zeppelin Notebook Version 0.7.1。

帮助将不胜感激。

完整堆栈跟踪:

Traceback (most recent call last): 
    File "/tmp/zeppelin_pyspark-8165971491474576109.py", line 349, in <module> 
    raise Exception(traceback.format_exc()) 
Exception: Traceback (most recent call last): 
    File "/tmp/zeppelin_pyspark-8165971491474576109.py", line 342, in <module> 
    exec(code) 
    File "<stdin>", line 5, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/streaming.py", line 816, in start 
    return self._sq(self._jwrite.start()) 
    File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    answer, self.gateway_client, self.target_id, self.name) 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco 
    return f(*a, **kw) 
    File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
    format(target_id, ".", name), value) 
Py4JJavaError: An error occurred while calling o191.start. 
: org.apache.hadoop.security.AccessControlException: Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:320) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1712) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1695) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2515) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2450) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2334) 
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397) 
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106) 
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73) 
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624) 
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448) 
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444) 
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:782) 
    at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:102) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:100) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:227) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=zeppelin, access=WRITE, inode="/mnt/tmp/temporary-e0cf0f09-a6f4-44d6-9a72-324660085608/metadata":hdfs:hadoop:drwxr-xr-x 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:320) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:292) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:213) 
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1712) 
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1695) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2515) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2450) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2334) 
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:397) 
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045) 
    at org.apache.hadoop.ipc.Client.call(Client.java:1475) 
    at org.apache.hadoop.ipc.Client.call(Client.java:1412) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) 
    at com.sun.proxy.$Proxy12.create(Unknown Source) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296) 
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
    at com.sun.proxy.$Proxy13.create(Unknown Source) 
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648) 
    ... 30 more 
+0

代码片段:%spark.pyspark query_window = windowedCounts \ .writeStream \ .outputMode( “完全”)\ .format( “存储器”)\ .queryName( “myTable_window”)\ .start() –

+1

您是否为'zeppelin'用户的该目录添加了权限? – 1ambda

回答

0

看来该writeStream查询试图在该checkpointLocation写入云HDFS和飞艇与写作云HDFS的问题。

通过将检查点位置添加到S3上的某处解决了该问题。

query_window = windowedCounts \ 
      .writeStream \ 
      .outputMode("complete") \ 
      .format("memory") \ 
      .queryName("myTable_window") \ 
      .option("checkpointLocation","path_on_s3") \ 
      .start()