2017-04-17 110 views
2

我正在构建基于Apache Spark的推荐引擎。我可以从PostgreSQL加载数据,但是当我试图映射这些数据时,我得到一个值错误:PySpark值错误

这个工作成功。

df = sql_context.read.format('jdbc').options(
    url=db_url, 
    dbtable=db_table, driver="org.postgresql.Driver" 
).load() 

此行将模式打印到控制台。

df.printSchema() 

它输出“ınteger”而不是“整数”。我认为这是问题。

下面是模式的控制台输出:

root 
|-- id: ınteger (nullable = false) 
|-- user_id: ınteger (nullable = false) 
|-- star: ınteger (nullable = false) 
|-- product_id: ınteger (nullable = false) 

我试图得到具体列,但它提出了一个数值错误。

validation_for_predict_rdd = validation_rdd.map(
    lambda x: (x.user_id, x.product_id) 
) 

错误输出:

raise ValueError("Could not parse datatype: %s" % json_value) 
ValueError: Could not parse datatype: ınteger 

我试图定义一个自定义模式来解决。但是JDBC不允许使用自定义模式。

custom_schema = StructType([ 
    StructField("id", LongType(), False), 
    StructField("user_id", LongType(), False), 
    StructField("star", LongType(), False), 
    StructField("product_id", LongType(), False)]) 

df = sql_context.read.format('jdbc').options(
    url=db_url, 
    dbtable=db_table, driver="org.postgresql.Driver" 
).load(schema=custom_schema) 

错误输出:

raise AnalysisException(s.split(': ', 1)[1], stackTrace) 
pyspark.sql.utils.AnalysisException: 'jdbc does not allow user-specified schemas.;' 

什么是为 “整数” 值错误的解决方案?我可以更改数据库字段类型,但这不是一个合适的解决方案。

+1

您可以使用'pgsql'连接到数据库并执行'DESCRIBE TABLE'吗?我怀疑它可能与数据库或表的编码有关。 –

+0

CREATE DATABASE buyexpress_service_layer WITH OWNER = postgres的 ENCODING = 'UTF8' TABLESPACE = pg_default LC_COLLATE = 'Turkish_Turkey.1254' LC_CTYPE = 'Turkish_Turkey.1254' 连接限制= -1; –

+2

啊! 'LC_'正在搞乱你的Spark应用程序(!)在开始你的'pyspark'或'spark-submit'之前,你可以使用'LC_ALL = en_US.UTF-8'和'LANG = en_US.UTF-8'来改变它吗? –

回答

0

我改变了系统语言,但它没有工作。此外,我备份了我的数据库并创建了一个新数据库。根据Jacek的回答,我使用了不同的字符类型和排序规则。在这种情况下,新的创建声明应该是这样的:

CREATE DATABASE buyexpress_service_layer 
    WITH OWNER = postgres 
     TEMPLATE = template0 
     ENCODING = 'UTF8' 
     TABLESPACE = pg_default 
     LC_COLLATE = 'English_United States.1252' 
     LC_CTYPE = 'English_United States.1252' 
     CONNECTION LIMIT = -1; 

它现在正在工作。谢谢你所有的答案。

3

问题的根源在于JVM语言环境。在local模式下,Python解释器区域设置为('en_US', 'UTF-8'),可以重现一般的问题如下:

  • 创建一个会话:

    from pyspark import SparkContext 
    from pyspark.sql import SparkSession 
    from pyspark.sql.types import IntegerType 
    
    sc = SparkContext(master="local[*]") 
    spark = SparkSession(sc) 
    
  • 设置的地点,以tr_TR

    locale = sc._jvm.java.util.Locale 
    locale.setDefault(locale.forLanguageTag("tr-TR")) 
    
  • 尝试创建一个DataFrame

    spark.createDataFrame([1, 2, 3], IntegerType()) 
    
    Py4JJavaError: An error occurred while calling o25.applySchemaToPythonRDD. 
    : java.util.NoSuchElementException: key not found: integer 
        at scala.collection.MapLike$class.default(MapLike.scala:228) 
        at scala.collection.AbstractMap.default(Map.scala:59) 
    

    此外:

    from pyspark.sql.functions import col 
    
    spark.read.json(
        sc.parallelize(["""{"x": 1}""" 
    ])).selectExpr("CAST(x AS integer)") 
    
    ValueError        Traceback (most recent call last) 
    ... 
    ValueError: Could not parse datatype: ınteger 
    

最简单的解决方法是设置spark.driver.extraJavaOptionsset JVM localeen_US

spark.driver.extraJavaOptions -Duser.country=US -Duser.language=en 

SPARK_HOME/conf/spark-defaults.conf。对于spark.executor.extraJavaOptions也可以使用相同的设置。

您也可以尝试使用上述java.util.Locale.setDefault招,你执行任何其他代码之前:

locale.setDefault(locale.forLanguageTag("en-US")) 

,但我不认为这是一个可靠的解决方案。

您还可以更新到最近的母版,其中这个问题已经部分解决:

  • SPARK-20156 - Java的String toLowerCase“土耳其语言环境的错误”导致星火问题

更多关于问题来源:The infamous Turkish locale bug