spark-hdfs-hive-hbase
打开虚拟机首先确保虚拟机上安装了hadoop spark hive hbase然后依次启动启动hbase 首先先确保 先启动zekooperhadoop启动命令 start-all.shhive 启动 hive --service hiveserver2 & 后台启动 不想后台的话就不用加&zkserve.sh start 启动zekooperhbase start-hbase.s
打开虚拟机
首先确保虚拟机上安装了hadoop spark hive hbase
然后依次启动
启动hbase 首先先确保 先启动zekooper
hadoop启动命令 start-all.sh
hive 启动 hive --service hiveserver2 & 后台启动 不想后台的话就不用加&
zkserve.sh start 启动zekooper
hbase start-hbase.sh
hive --service metastroe 启动spark
我这里少启动了一个spark 因为我开始是用rdd写的 后续启动就是了
首先这里有一份文件
首先对文件进行上传
首先在hdfs上创建一个文件夹
然后上传文件
打开网页
http://192.168.80.181:50070/explorer.html#/app/data/exam
文件已经上传成功
然后打开idea 对数据进行查询
查询的话 我们可以用sparkcontext sparksession 和 spark sql 3中方式
这里我先使用sparksession进行数据查询
sparkcontext rdd书写
object Myeaxm {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local[*]")
.setAppName("test")
val sc=new SparkContext(conf)
.textFile("hdfs://192.168.80.181:9000/app/data/exam/returned_goods_log_7-9.csv")
val header=sc.first()
val rdd1=sc.filter(_!=header)
val rdd=rdd1.map(_.split(",",-1))
// ①统计产生售后服务的订单号的个数。
// println(rdd.map(_ (3)).distinct.count())
// ②统计每个月分别产生售后服务的不同订单个数。
// println(rdd.map(x => (x(0), x(3))).distinct.map(x => (x._1, 1))
// .reduceByKey(_ + _).foreach(println))
// ③统计产生多次售后服务的订单及对应的售后服务次数。
println(rdd1.map(x => (x(3), 1)).reduceByKey(_ + _)
.filter(x => x._2 > 1).foreach(println))
// ④统计每个月每种售后服务问题类型占当月总售后服务量的占比(保留小数点后 3 位),
// 输出结果模式(月份,问题类型,占比)。
// ⑤统计每个月每个员工处理的任务状态为已完成状态的售后服务数量。
// rdd.map(x=>(x(0),x(12),x(13)))
// .filter(!_._2.trim.equals("")).filter(_._3.equals("已完成"))
// .map(x=>((x._1,x._2),1)).reduceByKey(_+_).foreach(println)
}
}
sparkSession
val spark =SparkSession.builder()
.appName("exam")
.master("local[*]").getOrCreate()
val df=spark.read.format("csv")
.option("header","true")
.load("D:\\Download\\returned_goods_log_7-9.csv")
import spark.implicits._
// ①统计产生售后服务的订单号的个数。
// df.select("订单号").distinct().agg(count($"订单号")).show()
// ②统计每个月分别产生售后服务的不同订单个数。
// df.select("月份","订单号").distinct()
// .groupBy("月份").agg(count($"订单号")).show()
// ③统计产生多次售后服务的订单及对应的售后服务次数。
// df.select("订单号")
// .groupBy($"订单号").agg(count("订单号") as "sum")
// .where("sum >1").show()
// ④统计每个月每种售后服务问题类型占当月总售后服务量的占比(保留小数点后 3 位),
// 输出结果模式(月份,问题类型,占比)。
val df1=df.select("月份","关联(问题类型)")
.groupBy("月份","关联(问题类型)")
.agg(count("关联(问题类型)") as "sum")
val df2=df.select("月份","关联(问题类型)").groupBy("月份")
.agg(count("关联(问题类型)") as "sum1")
df1.join(df2,Seq("月份"),"left")
.withColumn("占比",round($"sum"/$"sum1",3)).show()
// ⑤统计每个月每个员工处理的任务状态为已完成状态的售后服务数量。
df.withColumn("state",$"任务状态")
.withColumn("person",$"任务执行人")
.where("state ='已完成' and trim(person)!=''" )
.groupBy("月份","person")
.agg(count("state").as("sum"))
.select("月份","person","sum").show()
spark.stop()
然后打开虚拟机 启动hbase
start-hbase.sh 启动hbase要 先启动zekooper zkServer.sh start
4 .Hive 中 创 建 数 据 库 exam 类型是csv类型
create database exam
create external table exam.ex_exam_after_sales_service(
createMonth string,
createTime string,
createPerson string,
orderCode string,
productCode string,
serviceReasonType string,
serviceReasonDetail string,
resendProductCodeproductNum string,
returnOrderCode string,
resendPorductDetail string,
resendOrderCode string,
excutePerson string,
state string,
finishTime string,
provice string,
city string
)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties
(
'separatorChar' = ',',
'quoteChar' = '\"',
'escapeChar'= '\\'
)
location '/app/data/exam/'
tblproperties('skip.header.line.count'='1')
然后创建外部表映射hbase
create external table exam.ex_exam_after_sales_service_statistics(
key string,
serviceReasonDetailCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties(
"hbase.columns.mapping"=":key,statistics:serviceReasonDetailCount"
)
tblproperties(
"hbase.table.name"="exam:after_sales_service")
with
t1 as (select concat(createMonth,serviceReasonDetail)
as key from exam.ex_exam_after_sales_service)
insert into table ex_exam_after_sales_service_statistics
select key,count(key) from t1 group by key --往数据表中插入数据
查询hbase
scan 'exam:after_sales_service',{LIMIT=>10}
更多推荐
所有评论(0)