2017-08-30 67 views
Getting an error in pyspark communication

I am testing pyspark communication from a remote PC through RPC over RabbitMQ. To implement RPC over RabbitMQ I followed the tutorial in the RabbitMQ documentation and developed a test over pyspark, but it is giving me this error: enter image description here

Here is my spark RPC server code:

import pika 
from tkinter import * 
from pyspark.sql import SparkSession 
from pyspark import SparkConf,SparkContext 
import json 
import re 



connectionparam=pika.ConnectionParameters(host="localhost") 
connection=pika.BlockingConnection(connectionparam) 

channel=connection.channel() 

channel.queue_declare(queue='rpc_queue') 







spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", r"C:\spark\spark-warehouse") \
    .appName("TestApp") \
    .enableHiveSupport().getOrCreate() 

print("success") 
# establishing character 
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 



def queryBuilder(sqlval): 
    print("printing",sqlval) 
    df=spark.sql(sqlval) 
    print("printing data frame table") 
    df.show() 

    resultlist = df.toJSON().collect() 
    dumpdata = re.sub(r"\'", "", str(resultlist)) 
    jsondata = json.dumps(dumpdata) 
    #print(jsondata) 
    return jsondata 


def on_request(ch,method,props, body): 
    n=body 
    print("printing request body ",n) 
    response=queryBuilder(n) 
    ch.basic_publish(exchange='', 
        routing_key=props.reply_to, 
        properties=pika.BasicProperties(correlation_id=props.correlation_id), 
        body=response 
        ) 
    ch.basic_ack(delivery_tag=method.delivery_tag) 


channel.basic_qos(prefetch_count=1) 
channel.basic_consume(on_request,queue='rpc_queue') 
print("[x] Awaiting RPC Request") 

channel.start_consuming() 

master=Tk() 
entryval=Entry(master) 
entryval.grid(row=0,column=1) 
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50) 
mainloop() 

And here is my RPC client code on the remote pyspark application:

import pika 
import uuid 

class SparkRpcClient(object): 
    def __init__(self): 
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost')) 

        self.channel = self.connection.channel() 

        result = self.channel.queue_declare(exclusive=True) 
        self.callback_queue = result.method.queue 

        self.channel.basic_consume(self.on_response, no_ack=True, 
                                   queue=self.callback_queue) 

    def on_response(self, ch, method, props, body): 
        if self.corr_id == props.correlation_id: 
            self.response = body 

    def call(self, querymsg): 
        self.response = None 
        self.corr_id = str(uuid.uuid4()) 
        self.channel.basic_publish(exchange='', 
                                   routing_key='rpc_queue', 
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue, 
                                       correlation_id=self.corr_id, 
                                   ), 
                                   body=querymsg) 
        while self.response is None: 
            self.connection.process_data_events() 
        return int(self.response) 

sparkrpc = SparkRpcClient() 
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 


print(" [x] Requesting query") 
response = sparkrpc.call(sqlstring) 
print(" [.] Got %s" % response) 

My server receives the request string from the client and prints it, but it fails to process the sqlstring in my queryBuilder() function and return the JSON data. Moreover, I have sent the request several times, and the individual requests seem to pile up in the rpc queue without being cleared, because if I then run only the server script I get the same error again. Maybe I am missing something here; can anyone help me figure it out? Thanks in advance – Kalyan
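The requests piling up in the queue are consistent with the handler raising an exception before ch.basic_ack is reached: an unacknowledged message is redelivered the next time a consumer connects. A minimal sketch of that control flow, with illustrative names, no RabbitMQ needed:

```python
# If the handler raises, every later statement (including the ack) is skipped.
acked = []

def handler(body):
    sql = "printing " + body   # TypeError: cannot concatenate str and bytes
    acked.append(True)         # never reached -> message stays unacknowledged

try:
    handler(b"SELECT 1")       # pika delivers the message body as bytes
except TypeError:
    pass

print(acked)  # [] -- nothing was acknowledged, so the broker redelivers
```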

Answer


You are passing an incompatible type (looks like either bytes or bytearray) where str is expected when you return the JSON data to the client.
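The mismatch can be reproduced without Spark or RabbitMQ: pika hands the message body to the callback as bytes, and decoding turns it back into the str that spark.sql expects. A minimal sketch, assuming UTF-8 encoded messages:

```python
body = b"SELECT * FROM lflow1"   # what on_request receives from pika

print(type(body).__name__)       # bytes -- not usable as a SQL string

sql = body.decode("utf-8")       # the fix: decode to str first
print(type(sql).__name__)        # str
print(sql)                       # SELECT * FROM lflow1
```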

You should first decode the content to a string.

def queryBuilder(sqlval, enc): 
    ... 
    df = spark.sql(sqlval.decode(enc)) 
    df.show() 
    ... 
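A runnable sketch of the corrected flow, with the Spark call replaced by a stand-in so it can run anywhere (the stand-in body is an assumption for illustration, not the original function):

```python
import json

def queryBuilder(sqlval, enc):
    # stand-in for the real function, which would call spark.sql(sqlval.decode(enc))
    sql = sqlval.decode(enc)
    return json.dumps([{"query": sql}])

# simulate on_request receiving a bytes body from pika
body = b"SELECT 1"
response = queryBuilder(body, "utf-8")
print(response)  # [{"query": "SELECT 1"}]
```

The handler would then call `queryBuilder(body, 'utf-8')` instead of `queryBuilder(n)`.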

Thank you, I just decoded the byte stream – Kalyan