一、通过列表创建

1. 元组列表

列表中指定多个行,每行数据用一个元组形式表示,同时用一个列表表示每一列的列名。

value = [('Alice', 18), ('Bob', 19)]
df = spark.createDataFrame(value, ['name', 'age'])
df.show()

结果如下:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

2. 字典列表

或者采用字典的形式分别表示每行数据,每一行是一个字典数据,用key表示列名,value表示具体的值,如下所示:

value = [{'name': 'Alice', 'age': 18}, {'name': 'Bob', 'age': 19}]
df = spark.createDataFrame(value)
df.show()

结果如下:

+---+-----+
|age| name|
+---+-----+
| 18|Alice|
| 19|  Bob|
+---+-----+

二、通过pandas创建

1. 不指定schema

先通过pandas构建一个dataframe(具体可参考pandas的dataframe),然后再通过这个pandas的dataframe构建spark的dataframe,如下所示:

import pandas as pd

df_pd = pd.DataFrame([('Alice', 18), ('Bob', 19)])
df = spark.createDataFrame(df_pd)
df.show()

结果如下,未指定列信息schema的情况下,会自动为每个列指定名称为数字,从0开始。

+-----+---+
|    0|  1|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

2. 指定schema

而如果我们指定好列的schema信息,则可以构建指定列名的dataframe,如下所示:

import pandas as pd
from pyspark.sql.types import *

df_pd = pd.DataFrame([('Alice', 18), ('Bob', 19)])
schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
df = spark.createDataFrame(df_pd, schema)
df.show()

结果如下所示:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

或者也可以用如下的简写形式:

import pandas as pd
from pyspark.sql.types import *

df_pd = pd.DataFrame([('Alice', 18), ('Bob', 19)])
schema = "name: string, age: int"
df = spark.createDataFrame(df_pd, schema)
df.show()

得到的结果与上面的相同。

三、通过rdd创建

1. 不指定schema

先构建一个rdd,然后再通过rdd创建dataframe。

value = [('Alice', 18), ('Bob', 19)]
rdd = spark.sparkContext.parallelize(value)
df = spark.createDataFrame(rdd)
df.show()

结果如下所示,在不指定列名的情况下,默认列名为下划线加数字,从1开始:

+-----+---+
|   _1| _2|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

2. 指定schema

如果想要指定列结构schema信息,可以采用如下形式:

value = [('Alice', 18), ('Bob', 19)]
rdd = spark.sparkContext.parallelize(value)
schema = "name: string, age: int"
df = spark.createDataFrame(rdd, schema)
df.show()

schema信息也可以采用以下方式定义:

schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True)])

结果如下:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

3. 将rdd映射为Row格式

也可以采用如下形式指定schema信息:

from pyspark.sql import Row

value = [('Alice', 18), ('Bob', 19)]
rdd = spark.sparkContext.parallelize(value)
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
df = spark.createDataFrame(person)
df.show()

结果如下:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

要理解上述采用Row对象的方式指定schema,我们可以先看看如下代码:

print(Row)
print(Row('name', 'age'))
print(Row('name', 'age')('Alice', 18))

结果如下,我们会发现Row('name', 'age')其实是创建了一个Row('name', 'age')类,这个类指定了每一列的列名。然后我们用这个类可以创建具体的对象。

<class 'pyspark.sql.types.Row'>
<Row('name', 'age')>
Row(name='Alice', age=18)

那么回到前面的代码,我们的Person是一个Row类型对象,指定了每一列的列名信息。而person则对rdd中的每一个元素进行映射,将原先的tuple类型转为了Row类型,我们不妨打印这几个看看:

print(Person)
print(rdd.collect())
print(person.collect())

结果如下:

<Row('name', 'age')>
[('Alice', 18), ('Bob', 19)]
[Row(name='Alice', age=18), Row(name='Bob', age=19)]

4. 提前指定rdd为Row格式

以下这种方式与上面一种方式很接近,只不过提前将rdd中元素的结构信息定义好:

from pyspark.sql import Row

row = Row("name", "age")
sc = spark.sparkContext
rdd = sc.parallelize([row('Alice', 18), row('Bob', 19)])
df = rdd.toDF()
df.show()

结果如下:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

5. rdd转dataframe

除了上面的几种方法,还有一种比较简单的方法,先创建一个rdd,然后将rdd转为dataframe,不过需要给rdd添加一下schema信息,如下:

value = [('Alice', 18), ('Bob', 19)]
rdd = spark.sparkContext.parallelize(value)
schema = "name: string, age: int"
df = rdd.toDF(schema)
df.show()

结果前面的相同:

+-----+---+
| name|age|
+-----+---+
|Alice| 18|
|  Bob| 19|
+-----+---+

四、创建空dataframe

除了创建有数据的dataframe,在实际项目过程中,难免会碰到一些情况:初始化的时候需要创建一个空的dataframe,有以下两种方式创建。

1. 依据指定schema创建

这个方式其实是通过一个空的rdd来创建dataframe,必须要指定schema内容,否则会报错:

schema = "name: string, age: int"
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
df.show()
df.printSchema()

结果如下所示,我们可以看到,用字符串的形式指定的schema信息,默认都是nullable的:

+----+---+
|name|age|
+----+---+
+----+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

而如果我们想要手动指定列不为nullable的,则可以用以下方式指定schema:

from pyspark.sql.types import *
schema = StructType([
        StructField("name", StringType(), False),
        StructField("age", IntegerType(), False)])
df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
df.show()
df.printSchema()

结果如下所示:

+----+---+
|name|age|
+----+---+
+----+---+

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)

2. 依据已有dataframe创建

这个方式其实本质上与上一种方式没有什么不同,只是可以通过已有的dataframe获取schema信息:

value = [('Alice', 18), ('Bob', 19)]
df = spark.createDataFrame(value, ['name', 'age'])
df2 = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
print(df.schema)
df2.show()

结果如下:

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
+----+---+
|name|age|
+----+---+
+----+---+

df.schema的结果我们可以看到,这个schema信息与我们手动创建的shcema结构其实是一致的,而这样可以省去自己手动创建的麻烦。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐