一、问题描述

from pyspark.sql.types import StringType

@udf(returnType = StringType())
def bad_funify(s):
    return s + " is fun!"

countries2 = spark.createDataFrame([("Thailand", 3), (None, 4)], ["country", "id"])
countries2.withColumn("fun_country", bad_funify("country")).show()

用一个udf想让df(有country和id两个字段)生成新的一列fun_country(内容是字符串,内容为country xx is fun),但是df中有的country字段内容没有数据(注意类型是None而不是null),结果报错如下:

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/serializers.py", line 200, in _batched
    for item in iterator:
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 452, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 452, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 87, in <lambda>
    return lambda *a: f(*a)
  File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-1051-5a6c51e7c332>", line 5, in bad_funify
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

二、解决方案

这是个很蠢的问题。其实如果country为空值时,fun_country应该也是空的,所以就简单加多个判断的逻辑即可。修改udf为good_funity后:

@udf(returnType=StringType())
def good_funify(s):
     return None if s == None else s + " is fun!"
countries2.withColumn("fun_country", good_funify("country")).show()

+--------+---+----------------+
| country| id|     fun_country|
+--------+---+----------------+
|Thailand|  3|Thailand is fun!|
|    null|  4|            null|
+--------+---+----------------+

Reference

[1] Navigating None and null in PySpark

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐