1. PySpark: reading and saving data
Reference: http://www.manongjc.com/detail/15-vfxldlrjpphxldk.html
a. Reading Parquet files
dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet')
dt1.show()
Note: when reading partitioned data, if you want a partition column to remain visible in the result, point the read path at the directory one level above that partition. For example, to keep the is_night column in the result, it is enough to read from the path ending at the day directory (see the read-back sketch after the write example below).
spark.read.parquet(r'/o****0/second_bucket=3/year=2021/month=5/day=1/')
df = df.withColumn('n_datetime', F.to_timestamp(F.col('c_datetime')))        # parse the raw string into a timestamp
# df = df.withColumn('n_datetime', F.date_format(df.s_time, 'yyyy-MM-dd'))   # alternative: keep only the date part
df = df.withColumn("bucket", F.substring(F.col("n_dnum"), -1, 1))            # last character of n_dnum as the bucket key
df = df.withColumn("hour", F.hour(F.col("n_datetime")))
df = df.withColumn("is_night", ((F.col("hour") >= 18) | (F.col("hour") <= 6)))  # night = 18:00-06:00
df = df.withColumn("year", F.year(F.col("n_datetime")))
df = df.withColumn("month", F.month(F.col("n_datetime")))
df = df.withColumn("day", F.dayofmonth(F.col("n_datetime")))
df.show()
df.write.mode("append").partitionBy('bucket', 'year', 'month', 'day', 'is_night').saveAsTable("g表名ew")
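As a check on the note above about partition columns, here is a minimal read-back sketch; the base path and partition values below are made up and only mirror the layout the write above would produce:
# Reading at the day level: is_night is still a partition subdirectory below this
# path, so Spark discovers it and keeps it as a column in the result.
df_day = spark.read.parquet('/data/example_table/bucket=3/year=2021/month=5/day=1/')
df_day.select('is_night').distinct().show()
# Reading one level deeper, inside a single is_night partition, drops the column:
# its value is now encoded only in the path itself.
df_night = spark.read.parquet('/data/example_table/bucket=3/year=2021/month=5/day=1/is_night=true/')
print(df_night.columns)   # 'is_night' no longer appears here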
b. Saving data
When saving, pay attention to the permissions of the target directory, otherwise you get an error like: ERROR FileOutputCommitter: Mkdirs failed to create file:
Change the directory permissions first:
sudo chmod -R 777 /var/home/**/cc
mode can be 'overwrite' (replace existing data) or 'append' (add to existing data).
file_path_2 = r'/home/Felix/pycharm_projects/test/testfile.parquet'
df.write.parquet(path=file_path_2, mode='overwrite')
## Two ways to write partitioned output
df1.write.mode("append").partitionBy(["year","month","day"]).parquet("/data/loong/aid_score")
df.write.mode("append").partitionBy(["year","month","day"]).saveAsTable("%s.%s" %(database,table))
# Save as a CSV file
df_spark.repartition(1).write.csv("./res", encoding="utf-8", header=True)
### repartition(1) merges all partitions into one so that a single output file is written; without it, many part files are produced.
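If a single output file is all that is needed, coalesce(1) is a common alternative to repartition(1) because it avoids a full shuffle; this is only a sketch and assumes the result is small enough for one task (the output path is arbitrary):
# coalesce(1) also yields one part file, but without a full shuffle.
df_spark.coalesce(1).write.csv("./res_single", encoding="utf-8", header=True, mode="overwrite")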
c. Reading a Hive table via SQL; mind the case used in the SQL statement.
*** The full SQL statement passed in the parentheses is wrapped in triple double quotes:
spark.sql(""" select * from XXX """)
spark.sql(raw_******sql)
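Both calls assume a SparkSession that can see the Hive metastore; a minimal setup sketch (the app name is a placeholder, and the cluster must already be configured for Hive):
from pyspark.sql import SparkSession

# enableHiveSupport() makes metastore tables visible to spark.sql() / spark.table().
spark = (SparkSession.builder
         .appName("hive_read_example")
         .enableHiveSupport()
         .getOrCreate())
result = spark.sql(""" select * from XXX """)
result.show(5)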
2. DataFrame conversion and display
Converting between a PySpark DataFrame and a pandas DataFrame
*When reading files, give both pandas and PySpark full absolute paths where possible.
# pandas to Spark
pandas_df = pd.read_parquet(***)
values = pandas_df.values.tolist()
columns = pandas_df.columns.tolist()
spark_df = spark.createDataFrame(values, columns)
# Spark to pandas
pandas_df = spark_df.toPandas()
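spark.createDataFrame also accepts a pandas DataFrame directly, and enabling Arrow usually speeds up both directions; note that toPandas() collects the whole dataset onto the driver, so it only suits results that fit in driver memory. A minimal sketch with made-up sample data:
import pandas as pd

# Optional: Arrow-based conversion is usually much faster in both directions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
sdf = spark.createDataFrame(pdf)   # pandas -> Spark, no manual values/columns split needed
pdf_back = sdf.toPandas()          # Spark -> pandas; pulls everything to the driver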
*Viewing and inspecting a PySpark DataFrame
show
take
Select columns
select
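A minimal sketch of these calls, reusing the sdf DataFrame from the conversion example above:
sdf.show(5)                      # prints the first rows as a formatted table, returns None
first_rows = sdf.take(2)         # returns the first n rows to the driver as a list of Row objects
sdf.select("id", "name").show()  # keeps only the named columns and returns a new DataFrame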