2017-06-15 33 views
0

我正在使用PySpark进行一些大规模处理并将一些结果保存到MongoDB实例。我正在使用mongo-spark-connector_2.11-2.0.0.jar将数据框写入MongoDB。如何处理PySpark中的MongoDB异常?

df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 

有时候我例外,例如ConnectionExceptionMongoCommandException等。所以我想处理这些例外。所以我添加了这些异常处理代码片段,但是我得到了ImportError: No module named com.mongodb

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection").mode("overwrite").save() 
except MongoCommandException: 
    err_code = MongoCommandException.getErrorCode() 
    if err_code == int(16): 
     print "Request size is too large to write to Mongo" 

所以能有人对你帮助我如何使用mongo-spark-connector_2.11-2.0.0.jar

回答

1

来处理PySpark例外由于其中PySpark是利用了Java jar执行堆栈,你使用的是什么/眼见实际上是Java图书馆。 这就是为什么你无法访问PySpark中的com.mongodb库。

你可以从py4j

from py4j.protocol import Py4JJavaError 

try: 
    df.write.format("com.mongodb.spark.sql.DefaultSource") 
      .option("spark.mongodb.output.uri", "mongodb://username:[email protected]:10203/mydb.mycollection") 
      .mode("overwrite").save() 
except Py4JJavaError, ex: 
    print(ex.java_exception.toString()) 
    # analyse error stack and handle as needed. 

但是做什么,捕捉异常见MongoDB Java MongoException class所有直接子类,以查看可用的例外处理。

+0

有趣。谢谢! –