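Design and Implementation of a Spark-Based E-Commerce User Behavior Analysis System

Project Architecture

Flume -> Kafka -> Spark Streaming -> MySQL -> FineReport 10

Data visualization is handled by the third-party tool FineReport.

1. Data Collection: a Java thread simulates behavior data and writes it to the monitored file

Simulated user behavior data for an e-commerce site (the open datasets on Alibaba Cloud Tianchi, which contain real Taobao or Tmall user behavior data, can be used instead)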

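user_id        user ID, integer (serialized user ID)
item_id        item ID, integer (serialized item ID)
category_id    item category ID, integer (serialized ID of the category the item belongs to)
behavior_type  behavior type, string enum, one of ('pv', 'buy', 'cart', 'fav')
ddate          time at which the behavior occurred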
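
The simulator itself is not listed in this post. The following is a minimal sketch of such a Java thread, assuming records take the three-field form `user_id,item_id,behavior_type` that the streaming job in section 2 parses, with IDs drawn from the ranges of the sample tables in section 4 (users 1 to 20, goods 1 to 100). The class name `BehaviorSimulator`, the 200 ms pause, and the default file name are illustrative assumptions.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.Random;

// Hypothetical simulator: appends random behavior records to a log file.
class BehaviorSimulator implements Runnable {
    static final String[] BEHAVIORS = {"pv", "buy", "cart", "fav"};

    private final String path;
    private final int count;
    private final Random random;

    BehaviorSimulator(String path, int count, long seed) {
        this.path = path;
        this.count = count;
        this.random = new Random(seed);
    }

    // Builds one record in the assumed form "user_id,item_id,behavior_type".
    static String nextRecord(Random r) {
        int userId = 1 + r.nextInt(20);   // users 1..20 (assumed range)
        int itemId = 1 + r.nextInt(100);  // goods 1..100 (assumed range)
        String behavior = BEHAVIORS[r.nextInt(BEHAVIORS.length)];
        return userId + "," + itemId + "," + behavior;
    }

    @Override
    public void run() {
        // Open in append mode so that a tail-based reader only sees new lines.
        try (PrintWriter out = new PrintWriter(new FileWriter(path, true))) {
            for (int i = 0; i < count; i++) {
                out.println(nextRecord(random));
                out.flush();
                Thread.sleep(200); // pause between records (arbitrary)
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String path = args.length > 0 ? args[0] : "bigdata.log";
        Thread writer = new Thread(new BehaviorSimulator(path, 10, 42L));
        writer.start();
        writer.join();
    }
}
```

Passing /home/admin/bigdata.log as the first argument would write to the file that the Flume configuration in this section tails.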
  • Flume monitors the data file in real time and forwards newly appended user behavior records to Kafka

Start Flume:

flume-ng agent -c ./softwares/flume/conf/ -f ./flume2kafka.conf -n a1

flume2kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# exec source: runs a Linux command; tail -F follows the file and emits each newly appended line
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/bigdata.log

# Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=172.17.33.37:9092
# Kafka topic
a1.sinks.k1.topic=henry
# Serializer
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
  • Kafka receives the data: records forwarded by Flume are queued in a topic until Spark Streaming consumes them

Start ZooKeeper:

zkServer.sh start

Start Kafka (-daemon runs it in the background):

kafka-server-start.sh -daemon ./softwares/kafka/config/server.properties

Create the topic (3 partitions, 1 replica, named henry):

kafka-topics.sh --create --zookeeper 172.17.33.37:2181 --replication-factor 1 --partitions 3 --topic henry

# List all topics
kafka-topics.sh --list --zookeeper 172.17.33.37:2181 

# Start a console producer (for sending test messages by hand)
kafka-console-producer.sh --broker-list 172.17.33.37:9092 --topic henry

# Start a console consumer (to check that messages arrive)
kafka-console-consumer.sh --bootstrap-server 172.17.33.37:9092 --topic henry

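The data collection stage is now ready. No data cleaning is done at this stage; the full upstream data is passed to the data processing module and handled there.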
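
Because cleaning is deferred to the processing module, each incoming line has to be validated there before it is stored. The sketch below shows one way to do that in plain Java, assuming the three-field record form `user_id,good_id,behavior_type`. The class `BehaviorRecordParser` and its nested `Record` type are hypothetical names, not part of the project.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical cleaning step: turns a raw line into a validated record.
class BehaviorRecordParser {
    static final Set<String> VALID_BEHAVIORS =
            new HashSet<>(Arrays.asList("pv", "buy", "cart", "fav"));

    static final class Record {
        final int userId;
        final int goodId;
        final String behaviorType;

        Record(int userId, int goodId, String behaviorType) {
            this.userId = userId;
            this.goodId = goodId;
            this.behaviorType = behaviorType;
        }
    }

    // Returns an empty Optional for lines that are malformed or out of range.
    static Optional<Record> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.trim().split(",");
        if (fields.length < 3) {
            return Optional.empty();
        }
        try {
            int userId = Integer.parseInt(fields[0].trim());
            int goodId = Integer.parseInt(fields[1].trim());
            String behavior = fields[2].trim();
            if (userId <= 0 || goodId <= 0 || !VALID_BEHAVIORS.contains(behavior)) {
                return Optional.empty();
            }
            return Optional.of(new Record(userId, goodId, behavior));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric ID
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1,2,pv").isPresent());   // valid line
        System.out.println(parse("x,2,pv").isPresent());   // non-numeric user ID
        System.out.println(parse("1,2,view").isPresent()); // unknown behavior type
    }
}
```

Returning `Optional` rather than throwing lets the caller drop bad lines without aborting the whole batch.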

2. Data Processing

  • The Spark Streaming program runs as a long-lived stream that consumes data from the specified Kafka topic

UserBehavior.scala

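import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.util.Try

/**
  * Consumes Kafka records with Spark Streaming and persists them to MySQL.
  *
  * @author huangxuan
  * @date 2021.03.19
  */
object UserBehavior {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    // Spark configuration (local mode; the Kafka receiver occupies one thread)
    val conf = new SparkConf()
      .setAppName("To MySql")
      .setMaster("local[5]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Streaming context with one batch every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))

    // Read from Kafka (receiver-based API, spark-streaming-kafka-0-8)
    val topics = "henry" // topic name(s), separated by spaces
    val numThreads = 3   // consumer threads per topic (the topic has 3 partitions)
    val topicMap = topics.split(" ").map((_, numThreads)).toMap
    val kafkaStream = KafkaUtils.createStream(
      ssc,                 // Spark Streaming context
      "172.17.33.37:2181", // ZooKeeper quorum
      "test1",             // consumer group id
      topicMap
    )

    // A message value may hold several lines, e.g. "1,1,pv\n1,2,pv"
    val lines = kafkaStream.map(_._2).flatMap(_.split("\n"))
    lines.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        if (records.nonEmpty) {
          // Borrow a connection per partition and return it when the partition is done.
          // ConnectionPool is the project's own helper class (not listed here).
          val conn = ConnectionPool.getConnection
          // Explicit column list: the records carry no timestamp, so ddate stays NULL.
          val ps = conn.prepareStatement(
            "insert into user_behavior(user_id, good_id, behavior_type) values(?, ?, ?)")
          try {
            records.foreach { record =>
              val f = record.split(",").map(_.trim)
              // Skip malformed lines instead of failing the whole batch
              Try((f(0).toInt, f(1).toInt, f(2))).foreach { case (userId, goodId, behavior) =>
                ps.setInt(1, userId)
                ps.setInt(2, goodId)
                ps.setString(3, behavior)
                ps.executeUpdate()
              }
            }
          } finally {
            ps.close()
            ConnectionPool.returnConnection(conn)
          }
        }
      }
    }

    ssc.start()
    // Block until the stream is stopped
    ssc.awaitTermination()
  }
}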

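Upload the jar to the server and run the Spark Streaming program. --class takes the fully qualified name of the object that defines main; note that setMaster("local[5]") in the code takes precedence over the --master option given here.

spark-submit --class "saprkStreaming.UserBehavior" --master local[2] Spark_Study-1.0-SNAPSHOT.jar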

3. Data Analysis

  • Models and framework used in this analysis

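E-commerce analysis usually covers four areas: process-efficiency analysis, traffic/user analysis, goods analysis, and product analysis. Breaking down process efficiency locates the step where a problem arises; analyzing user stickiness, value, and satisfaction supports user segmentation and churn warning; analyzing goods life cycles and associations grades the goods; and product analysis improves the experience from browsing to purchase.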
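
One way to make the process-efficiency breakdown concrete is to count distinct users at each step and compute the conversion rate between adjacent steps. The sketch below assumes a three-stage funnel (view, then cart or favorite, then buy) built from the four behavior types. The stage definition and the class name `BehaviorFunnel` are assumptions for illustration, not the project's actual analysis code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: counts distinct users per funnel stage.
class BehaviorFunnel {
    // Assumed stage order: view -> cart or favorite -> buy
    static final String[] STAGES = {"pv", "cart/fav", "buy"};

    // Each event is {userId, behaviorType}.
    static Map<String, Integer> usersPerStage(List<String[]> events) {
        Map<String, Set<String>> seen = new LinkedHashMap<>();
        for (String stage : STAGES) {
            seen.put(stage, new HashSet<>());
        }
        for (String[] event : events) {
            String type = event[1];
            String stage = ("cart".equals(type) || "fav".equals(type)) ? "cart/fav" : type;
            Set<String> users = seen.get(stage);
            if (users != null) { // ignore unknown behavior types
                users.add(event[0]);
            }
        }
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : seen.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().size());
        }
        return counts;
    }

    // Conversion rate between two stages; 0 when the earlier stage is empty.
    static double conversion(int from, int to) {
        return from == 0 ? 0.0 : (double) to / from;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[]{"1", "pv"}, new String[]{"1", "cart"}, new String[]{"1", "buy"},
                new String[]{"2", "pv"}, new String[]{"2", "fav"},
                new String[]{"3", "pv"});
        Map<String, Integer> counts = usersPerStage(events);
        System.out.println(counts);
        System.out.printf("pv -> cart/fav: %.2f%n",
                conversion(counts.get("pv"), counts.get("cart/fav")));
        System.out.printf("cart/fav -> buy: %.2f%n",
                conversion(counts.get("cart/fav"), counts.get("buy")));
    }
}
```

A sharp drop between two stages points to the step worth investigating first.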

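This project uses common e-commerce metrics and the AARRR funnel model to break down each step a user takes after entering the app. It also uses the RFM model to evaluate user value, identify the most valuable user group, and target that group with differentiated marketing.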
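
RFM describes each user by Recency (days since the last purchase), Frequency (number of purchases), and Monetary value (total amount spent). The sketch below computes the three values from a user's purchase list and compares them against thresholds. The class `RfmSketch`, the nested `Purchase` type, and the threshold values in `main` are hypothetical; in this project the purchases would presumably come from 'buy' rows joined with product prices.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;

// Hypothetical RFM computation for a single user.
class RfmSketch {
    static final class Purchase {
        final LocalDate date;
        final double amount;

        Purchase(LocalDate date, double amount) {
            this.date = date;
            this.amount = amount;
        }
    }

    final long recencyDays; // days since the most recent purchase
    final int frequency;    // number of purchases
    final double monetary;  // total amount spent

    RfmSketch(long recencyDays, int frequency, double monetary) {
        this.recencyDays = recencyDays;
        this.frequency = frequency;
        this.monetary = monetary;
    }

    static RfmSketch of(List<Purchase> purchases, LocalDate today) {
        if (purchases.isEmpty()) {
            throw new IllegalArgumentException("user has no purchases");
        }
        LocalDate last = purchases.get(0).date;
        double total = 0.0;
        for (Purchase p : purchases) {
            if (p.date.isAfter(last)) {
                last = p.date;
            }
            total += p.amount;
        }
        return new RfmSketch(ChronoUnit.DAYS.between(last, today), purchases.size(), total);
    }

    // One digit per dimension (R, F, M): 1 if the user meets the threshold, else 0.
    String segment(long maxRecencyDays, int minFrequency, double minMonetary) {
        return (recencyDays <= maxRecencyDays ? "1" : "0")
                + (frequency >= minFrequency ? "1" : "0")
                + (monetary >= minMonetary ? "1" : "0");
    }

    public static void main(String[] args) {
        List<Purchase> purchases = Arrays.asList(
                new Purchase(LocalDate.of(2021, 3, 1), 198.0),
                new Purchase(LocalDate.of(2021, 3, 10), 1499.0));
        RfmSketch rfm = of(purchases, LocalDate.of(2021, 3, 19));
        // Thresholds are illustrative only.
        System.out.println(rfm.segment(30, 3, 1000.0));
    }
}
```

Users whose code is "111" under the chosen thresholds form the high-value group that the differentiated marketing would target.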

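Users:
    Age: purchasing needs of different age groups
    Region: purchasing needs of different regions
    Gender: purchasing needs of different genders
    Other combined analyses
Goods:
    Type:
    Price: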
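
The age dimension has to be derived from the birthday stored for each user. A minimal sketch of that step follows, with bucket boundaries chosen arbitrarily for illustration. The class name `AgeGroups` and the four buckets are assumptions.

```java
import java.time.LocalDate;
import java.time.Period;

// Hypothetical helper: derives an age group from a birthday.
class AgeGroups {
    // Age in completed years on the given day.
    static int ageOn(LocalDate birthday, LocalDate today) {
        return Period.between(birthday, today).getYears();
    }

    // Illustrative buckets; the boundaries are an assumption.
    static String bucket(int age) {
        if (age < 18) {
            return "<18";
        }
        if (age < 30) {
            return "18-29";
        }
        if (age < 45) {
            return "30-44";
        }
        return "45+";
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2021, 3, 19);
        int age = ageOn(LocalDate.of(1999, 12, 26), today);
        System.out.println(age + " -> " + bucket(age));
    }
}
```

Grouping behavior counts by this bucket, by province, or by gender gives the per-segment views listed above.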

4. Database Design

  • MySQL 8.0.23 serves as the downstream persistence layer for Spark Streaming
  1. User information table:
create table user_info(
   u_id int primary key COMMENT 'User ID',
   u_name varchar(50) COMMENT 'User name',
   u_sex varchar(10) COMMENT 'User gender',
   u_province varchar(50) COMMENT 'User province',
   u_city varchar(50) COMMENT 'User city',
   u_birthday date COMMENT 'User date of birth'
)character set = utf8;

insert into user_info values(1,'张三','男','北京','北京','1999-12-26');
insert into user_info values(2,'李四','男','湖北','武汉','1979-04-21');
insert into user_info values(3,'王五','男','河南','郑州','1984-10-16');
insert into user_info values(4,'李大壮','男','山东','青岛','2002-09-09');
insert into user_info values(5,'肖站','男','江西','南昌','1968-11-06');
insert into user_info values(6,'王伊勃','男','新疆','乌鲁木齐','1996-07-09');
insert into user_info values(7,'王冰冰','女','辽宁','沈阳','1990-09-28');
insert into user_info values(8,'李冰冰','女','黑龙江','哈尔滨','1983-04-11');
insert into user_info values(9,'范冰冰','女','陕西','西安','1977-12-20');
insert into user_info values(10,'安吉拉','女','上海','上海','2002-03-23');
insert into user_info values(11,'罗玉凤','女','海南','海口','1965-03-08');
insert into user_info values(12,'肖龙女','女','四川','成都','1992-05-06');
insert into user_info values(13,'洛雪梅','女','云南','昆明','1983-01-06');
insert into user_info values(14,'马冬梅','女','湖南','长沙','1974-11-06');
insert into user_info values(15,'苏妲己','女','贵州','贵阳','1974-11-06');
insert into user_info values(16,'王建国','男','重庆','重庆','1999-11-06');
insert into user_info values(17,'李旦','男','内蒙古','呼和浩特','1968-11-06');
insert into user_info values(18,'张晓飞','男','吉林','长春','1968-11-06');
insert into user_info values(19,'古力娜扎','女','福建','福州','1994-01-06');
insert into user_info values(20,'允允朴','男','浙江','杭州','2006-11-06');

  2. Product information table:
create table goods_info(
   g_id int primary key COMMENT 'Product ID',
   g_name varchar(50) COMMENT 'Product name',
   g_type int COMMENT 'Product type ID',
   g_price decimal(10,2) COMMENT 'Product price'
)character set = utf8;

insert into goods_info values(1,'安踏运动鞋',1,198);
insert into goods_info values(2,'李宁板鞋',1,214.99);
insert into goods_info values(3,'李宁韦德之道9篮球鞋',1,1499);
insert into goods_info values(4,'Jordan印花套头连帽衫 黑色',1,189);
insert into goods_info values(5,'遮阳帽',1,49);
insert into goods_info values(6,'牛仔裤',1,324);
insert into goods_info values(7,'休闲衬衫',1,88.88);
insert into goods_info values(8,'皮鞋',1,499);
insert into goods_info values(9,'T恤',1,79);
insert into goods_info values(10,'潮流工装裤',1,209);
insert into goods_info values(11,'AD钙奶',2,45);
insert into goods_info values(12,'手撕面包',2,36);
insert into goods_info values(13,'卫龙辣条',2,8);
insert into goods_info values(14,'螺蛳粉',2,66.66);
insert into goods_info values(15,'可乐',2,6);
insert into goods_info values(16,'蛋黄酥',2,28);
insert into goods_info values(17,'自热火锅',2,60);
insert into goods_info values(18,'大益普洱茶',2,999);
insert into goods_info values(19,'飞天茅台酱香型',2,1499);
insert into goods_info values(20,'零食大礼包',2,120);
insert into goods_info values(21,'华为Mate40 Pro',3,6499);
insert into goods_info values(22,'小米11',3,3999);
insert into goods_info values(23,'自拍杆',3,30);
insert into goods_info values(24,'数据线',3,25);
insert into goods_info values(25,'充电宝',3,120);
insert into goods_info values(26,'平板电脑',3,4299);
insert into goods_info values(27,'无人机',3,4999);
insert into goods_info values(28,'游戏机',3,2549);
insert into goods_info values(29,'笔记本电脑',3,9999);
insert into goods_info values(30,'数码相机',3,43800);
insert into goods_info values(31,'小米电视机',4,1499);
insert into goods_info values(32,'美的洗衣机',4,729);
insert into goods_info values(33,'电吹风',4,59);
insert into goods_info values(34,'电饭煲',4,159);
insert into goods_info values(35,'热水器',4,569);
insert into goods_info values(36,'空调',4,2399);
insert into goods_info values(37,'微波炉',4,489);
insert into goods_info values(38,'净水器',4,259);
insert into goods_info values(39,'咖啡机',4,4690);
insert into goods_info values(40,'电扇',4,42);
insert into goods_info values(41,'纸尿裤',5,99);
insert into goods_info values(42,'奶瓶',5,39);
insert into goods_info values(43,'益智玩具',5,26);
insert into goods_info values(44,'毛绒抱枕',5,35);
insert into goods_info values(45,'涂色绘画',5,31.5);
insert into goods_info values(46,'手办',5,199);
insert into goods_info values(47,'儿童肚兜',5,19.9);
insert into goods_info values(48,'儿童牙刷',5,14.9);
insert into goods_info values(49,'儿童餐椅',5,119);
insert into goods_info values(50,'妈咪包',5,60);
insert into goods_info values(51,'行李包',6,44.5);
insert into goods_info values(52,'女生斜挎包',6,139);
insert into goods_info values(53,'双肩包',6,69.6);
insert into goods_info values(54,'链条包',6,469);
insert into goods_info values(55,'旅行箱',6,334);
insert into goods_info values(56,'帆布包',6,62);
insert into goods_info values(57,'单肩包',6,199);
insert into goods_info values(58,'男士胸包',6,62);
insert into goods_info values(59,'男士真皮包',6,439);
insert into goods_info values(60,'卡包',6,54);
insert into goods_info values(61,'口红',7,319);
insert into goods_info values(62,'面膜',7,99);
insert into goods_info values(63,'雅思兰黛粉底',7,410);
insert into goods_info values(64,'OLAY抗老精华',7,229);
insert into goods_info values(65,'大宝眼霜',7,99.9);
insert into goods_info values(66,'安耐晒',7,79);
insert into goods_info values(67,'清扬男士洗发水',7,69.9);
insert into goods_info values(68,'舒肤佳沐浴露',7,34.9);
insert into goods_info values(69,'发胶',7,69);
insert into goods_info values(70,'香皂',7,10);
insert into goods_info values(71,'六味地黄丸',8,30);
insert into goods_info values(72,'皮炎宁',8,44);
insert into goods_info values(73,'999感冒灵',8,99);
insert into goods_info values(74,'九芝堂阿胶补血颗粒',8,436);
insert into goods_info values(75,'维C含片',8,34);
insert into goods_info values(76,'珍视明滴眼液',8,42);
insert into goods_info values(77,'汤臣倍健复合维A',8,144);
insert into goods_info values(78,'云南白药创可贴',8,21);
insert into goods_info values(79,'万通筋骨贴',8,29);
insert into goods_info values(80,'安眠药',8,169);
insert into goods_info values(81,'鞋架',9,9.8);
insert into goods_info values(82,'衣架',9,46);
insert into goods_info values(83,'折叠床',9,129);
insert into goods_info values(84,'真皮沙发',9,16999);
insert into goods_info values(85,'浴缸',9,420);
insert into goods_info values(86,'台灯',9,66);
insert into goods_info values(87,'椅子',9,44);
insert into goods_info values(88,'桌子',9,130);
insert into goods_info values(89,'傲风电竞椅',9,444);
insert into goods_info values(90,'床垫',9,239);
insert into goods_info values(91,'平衡车',10,1629);
insert into goods_info values(92,'路亚套杆',10,429);
insert into goods_info values(93,'跑步机',10,599);
insert into goods_info values(94,'哑铃套件',10,236);
insert into goods_info values(95,'健腹轮',10,29);
insert into goods_info values(96,'呼啦圈',10,76);
insert into goods_info values(97,'轮滑鞋',10,232);
insert into goods_info values(98,'篮球',10,199);
insert into goods_info values(99,'羽毛球拍',10,142);
insert into goods_info values(100,'网球拍',10,64);

  3. Product category table:
create table goods_type(
   t_id int primary key COMMENT 'Product type ID',
   t_name varchar(50) COMMENT 'Product type name'
)character set = utf8;

insert into goods_type values(1,'服饰鞋帽');
insert into goods_type values(2,'食品饮料');
insert into goods_type values(3,'数码3C');
insert into goods_type values(4,'家用电器');
insert into goods_type values(5,'母婴');
insert into goods_type values(6,'箱包');
insert into goods_type values(7,'个护美妆');
insert into goods_type values(8,'医药');
insert into goods_type values(9,'家具');
insert into goods_type values(10,'体育器械');


  4. Behavior type table:
create table behavior_type(
   b_id varchar(5) primary key COMMENT 'Behavior type ID',
   b_name varchar(50) COMMENT 'Behavior type name'
)character set = utf8;

insert into behavior_type values('pv','点击');
insert into behavior_type values('buy','购买');
insert into behavior_type values('cart','加购物车');
insert into behavior_type values('fav','收藏');

  5. User behavior table (populated by Spark Streaming):
create table user_behavior(
   user_id int COMMENT 'User ID',
   good_id int COMMENT 'Product ID',
   behavior_type varchar(5) COMMENT 'Behavior type ID',
   ddate date COMMENT 'Time of the behavior'
)character set = utf8;

  6. System user table:
create table sys_user(
   userName varchar(50) primary key COMMENT 'User name',
   userPassword varchar(50) COMMENT 'Password',
   userEmail varchar(50) COMMENT 'Email',
   userIdentity int COMMENT 'User role: 0 = super administrator, 1 = regular user'
)character set = utf8;

insert into sys_user values('henry','henry','1332822653@qq.com',0);
insert into sys_user values('tom','tom','9999999999@163.com',1);
insert into sys_user values('jack','jack','helloworld@163.com',1);