2016-02-28 43 views
1

我写了下面MyPythonGateway.java,这样我可以从Python中叫我的定制Java类:pyspark:从pyspark调用一个自定义的java函数。我需要Java_Gateway吗?

public class MyPythonGateway { 

    public String findMyNum(String input) { 
     return MyUtiltity.parse(input).getMyNum(); 
    } 

    public static void main(String[] args) { 
     GatewayServer server = new GatewayServer(new MyPythonGateway()); 
     server.start(); 
    } 
} 

,这里是我如何在我的Python代码使用它:

def main(): 

    gateway = JavaGateway()     # connect to the JVM 
    myObj = gateway.entry_point.findMyNum("1234 GOOD DAY") 
    print(myObj) 


if __name__ == '__main__': 
    main() 

现在我想使用PySpark中的MyPythonGateway.findMyNum()函数,而不仅仅是一个独立的python脚本。我做了以下内容:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY") 
print(myNum) 

但是,我得到了以下错误:

... line 43, in main: 
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY") 
    File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__ 
py4j.protocol.Py4JError: Trying to call a package. 

所以我怎么错过这里?我不知道是否应该在使用pyspark时运行单独的JavaApplication of MyPythonGateway来启动网关服务器。请指教。谢谢!


下面是正是我需要的:

input.map(f) 

def f(row): 
    // call MyUtility.java 
    // x = MyUtility.parse(row).getMyNum() 
    // return x 

什么是接近这一目标的最佳方式是什么?谢谢!

回答

2

首先,您看到的错误通常意味着您尝试使用的类无法访问。所以很可能这是一个CLASSPATH问题。

关于总体思路有两个重要的问题:

  • 你不能访问SparkContext所以使用PySpark网关将无法正常工作的行为或改造内(见How to use Java/Scala function from an action or a transformation?的一些细节))。如果你想从工人使用Py4J,你必须在每个工人机器上启动一个单独的网关。
  • 你真的不想在Python和JVM之间传递数据。 Py4J不适用于数据密集型任务。
+0

谢谢!基本上,MyUtitlity.java有点复杂,我们真的不想在python中重新编写它。有没有办法从pyspark工作中调用MyUtility.java?如果还有其他选择,我们不一定需要使用Py4J ... – Edamame

+0

嗯,很多取决于您的体系结构和代码。可能最简单和相对高效的解决方案是将数据“管道化”到Java代码并读取输出。或者,你可以通过磁盘传递数据(这基本上是PySpark驱动程序用来处理事物的方式,尽管我认为它不再是这种情况,或者也许是这样)。最复杂的解决方案是具有处理请求的持久性(或临时性,例如执行程序的生命周期期间)Java进程。 – zero323

+0

如何在驾驶员和工人身上正确地注册罐子?然后让Python包装器能够在驱动程序上正确调用jar包? –