
PySpark HBase/Phoenix integration: I need to read Phoenix data into PySpark.

Edit: I am using the Spark HBase converters.

Code snippet below:

port="2181" 
host="zookeperserver" 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
cmdata_conf = {"hbase.zookeeper.property.clientPort":port, "hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "camel", "hbase.mapreduce.scan.columns": "data:a"} 
sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
    "org.apache.hadoop.hbase.client.Result", 
    keyConverter=keyConv, 
    valueConverter=valueConv, 
    conf=cmdata_conf) 

Traceback:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 547, in newAPIHadoopRDD 
    jconf, batchSize) 
    File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: java.io.IOException: No table was provided. 
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:130) 

Any help would be very much appreciated.

Thanks! /Tina

Answers


I tried the second way, but I am getting an error: Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. : java.io.IOException: No table was provided. Have you done this in PySpark? – Ranic


Have you provided the proper configuration to Spark's newAPIHadoopRDD, along these lines: sparkconf = {"hbase.zookeeper.quorum": zookeeperhost, "hbase.mapreduce.inputtable": sampletable, "hbase.mapreduce.scan.columns": columns}; hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apache.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConv, conf=sparkconf) –


Please try the approach above; I think you are not providing the table name in the configuration. Also, the values for keyConv and valueConv should be examples.pythonconverters.ImmutableBytesWritableToStringConverter and examples.pythonconverters.HBaseResultToStringConverter, respectively. –
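Pulling the two comments above together, a minimal sketch of what the corrected call could look like, reusing the table name "camel", the column "data:a", and the placeholder ZooKeeper host from the question:

port = "2181" 
host = "zookeperserver"  # replace with your ZooKeeper quorum 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 

# Per the comment above, make sure the table name actually reaches the job 
# configuration; otherwise TableInputFormat fails with 
# "java.io.IOException: No table was provided". 
sparkconf = { 
    "hbase.zookeeper.quorum": host, 
    "hbase.zookeeper.property.clientPort": port, 
    "hbase.mapreduce.inputtable": "camel", 
    "hbase.mapreduce.scan.columns": "data:a", 
} 

hbase_rdd = sc.newAPIHadoopRDD( 
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
    "org.apache.hadoop.hbase.client.Result", 
    keyConverter=keyConv, 
    valueConverter=valueConv, 
    conf=sparkconf) 

hbase_rdd.take(1)  # quick sanity check that rows come back 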


Using the Spark Phoenix plugin is the recommended approach. Please find the details about the Phoenix Spark plugin here.

Environment: tested with AWS EMR 5.10 and PySpark.

The following steps were tested. For Phoenix SQL syntax, see https://phoenix.apache.org/language/

Steps

  1. Create a table. Open the Phoenix shell ("/usr/lib/phoenix/bin/sqlline.py") and run:

    DROP TABLE IF EXISTS TableName;

    CREATE TABLE TableName(DOMAIN VARCHAR primary key);

    UPSERT INTO TableName(DOMAIN) VALUES('foo');

  2. Download the Spark Phoenix plugin jar. Download it from https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core/4.11.0-HBase-1.3 — you need the phoenix-<version>-HBase-<version>-client.jar; I used phoenix-4.11.0-HBase-1.3-client.jar to match my Phoenix and HBase versions.

  3. From your Hadoop home directory, set the following variable:

    phoenix_jars=/home/user/apache-phoenix-4.11.0-HBase-1.3-bin/phoenix-4.11.0-HBase-1.3-client.jar

  4. Start PySpark and add the driver and executor classpath dependency:

    pyspark --jars ${phoenix_jars} --conf spark.executor.extraClassPath=${phoenix_jars}

Create the ZooKeeper URL, replacing it with your cluster's ZooKeeper quorum; you can check it in hbase-site.xml.

emrMaster = "ZooKeeper URL" 

df = sqlContext.read \ 
.format("org.apache.phoenix.spark") \ 
.option("table", "TableName") \ 
.option("zkUrl", emrMaster) \ 
.load() 

df.show() 
df.columns 
df.printSchema() 
df1=df.replace(['foo'], ['foo1'], 'DOMAIN') 
df1.show() 

df1.write \ 
    .format("org.apache.phoenix.spark") \ 
    .mode("overwrite") \ 
    .option("table", "TableName") \ 
    .option("zkUrl", emrMaster) \ 
    .save()
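
As a follow-up, once the DataFrame is loaded you can also query it with Spark SQL by registering it as a temporary view. A minimal sketch, assuming Spark 2.x as shipped with EMR 5.10; the view name phoenix_table is just an illustrative placeholder:

df.createOrReplaceTempView("phoenix_table")  # placeholder view name 
sqlContext.sql("SELECT DOMAIN FROM phoenix_table WHERE DOMAIN LIKE 'foo%'").show() 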