項目整體介紹
電商平臺系統(tǒng)上線運行一段時間后,可以收集到大量的用戶行為數(shù)據(jù);利用大數(shù)據(jù)技術進行深入挖掘和分析,可以得到感興趣的商業(yè)指標。而隨著大數(shù)據(jù)技術的深入研究與應用,簡單的統(tǒng)計指標已經(jīng)不能滿足業(yè)務發(fā)展的需求了。
企業(yè)的關注點,日益聚焦在、如何利用大數(shù)據(jù),來為精細化運營和精準營銷服務。而要做精細化運營,首先要建立本企業(yè)的用戶畫像。
數(shù)據(jù)源分析
項目所用到的數(shù)據(jù)源,就是業(yè)務系統(tǒng)中收集的數(shù)據(jù),保存在MySQL的表中。
主要有以下這些表:
- 用戶信息表(t_member)
- 用戶地址表(t_member_addr)
- 商品信息表(t_commodity)
- 商品類別信息表(t_commodity_cate)
- 訂單表(t_order)
- 訂單商品表(t_order_commodity)
- 優(yōu)惠券表(t_coupon)
- 用戶優(yōu)惠券表(t_coupon_member)
- 訂單優(yōu)惠券表(t_coupon_order)
- 快遞表(t_delivery)
- 反饋表(t_feedback)
- 商店表(t_shop)
- 商家訂單表(t_shop_order,訂單對于賣家也有記錄)
- 后臺用戶表(t_user,員工表)
這些數(shù)據(jù)內(nèi)容,主要可以分為:用戶信息,商品信息,訂單信息,運營相關信息,服務相關信息五大類,我們后續(xù)可以根據(jù)這個標準做標簽建模。
項目架構
我們將數(shù)據(jù)用sqoop從MySQL中導出到Hive,然后使用spark作為分析引擎,用spark SQL對定義的指標進行計算,得到的結果寫入外部存儲系統(tǒng)。
寫入的結果按照用途,主要分成兩類:一類是平臺的統(tǒng)計指標,寫入redis或者MySQL,做圖表展示,用于市場分析決策;另一類則是每個用戶的具體標簽,寫入ES,方便進行復雜查詢和用戶篩選。

數(shù)據(jù)展示和應用
得到的兩類計算結果,分別用于展示和精準營銷。
- 平臺統(tǒng)計指標展示
圖表形式展現(xiàn)(echarts.js);統(tǒng)計結果數(shù)據(jù)寫入redis(或mysql);后臺應用服務獲取數(shù)據(jù),傳給前端進行頁面繪制。 - 用戶標簽信息應用
全部標簽在頁面上分類顯示;構建后臺應用服務,點選標簽進行圈人;生成對應的用戶列表,用于精準營銷推送信息。
環(huán)境配置
安裝Hadoop(略)
安裝Hive
基本配置
1)下載解壓
sudo tar xvfz apache-hive-2.3.7-bin.tar.gz -C /usr/local/lib/
sudo chown -R ubuntu:ubuntu apache-hive-2.3.7-bin/
2)添加環(huán)境變量
vi ~/.bashrc
export HIVE_HOME=/usr/local/apache-hive-2.3.6-bin
export PATH=$PATH:$HIVE_HOME/bin
source ~/.bashrc
3)定義倉庫目錄
hdfs dfs -mkdir -p /user/ubuntu/warehouse
修改讀寫權限:
hdfs dfs -chmod 777 /user/ubuntu/warehouse
4)配置文件
新建一個hive-site.xml:
vi conf/hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>
jdbc:mysql://localhost:3306/metastore?createDatabaseIfNotExist=true
</value>
<description>
JDBC connect string for a JDBC metastore.
To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/ubuntu/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<property>
<name>javax.jdo.PersistenceManagerFactoryClass</name>
<value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
<description>class implementing the jdo persistence</description>
</property>
</configuration>
初始化metastore
hive/bin/schematool -dbType mysql -initSchema root 123456
啟動hive元存儲thrift服務器
hive --service metastore
創(chuàng)建項目數(shù)據(jù)的數(shù)據(jù)庫
進入hive客戶端控制臺,創(chuàng)建一個叫做ecommerce的數(shù)據(jù)庫:
hive
create database ecommerce;
數(shù)據(jù)準備
在mysql中創(chuàng)建生產(chǎn)表
實際生產(chǎn)環(huán)境中,我們的表一般都是放在mysql中的,所以需要先模擬創(chuàng)建生產(chǎn)表。
連接mysql
mysql -u root -p
創(chuàng)建數(shù)據(jù)庫 ecommerce
create database ecommerce charset=utf8;
執(zhí)行寫好的創(chuàng)建表的ddl
source desktop/files/ddl/i_member.sql;
source desktop/files/ddl/i_commodity.sql;
source desktop/files/ddl/i_order.sql;
source desktop/files/ddl/i_marketing.sql;
source desktop/files/ddl/i_operation.sql;
配置sqoop做數(shù)據(jù)遷移
解壓sqoop安裝包
tar zxvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
添加mysql的連接器jar包
將mysql-connector-java-5.1.28-bin.jar拷貝到sqoop/lib下面:
cp mysql-connector-java-5.1.28-bin.jar ./lib/
另外注意,還應該把hive-site.xml拷貝到sqoop/conf下面。
配置文件sqoop-env.sh
修改配置文件,加入hive的環(huán)境變量:
cd /opt/sqoop/conf
mv sqoop-env-template.sh sqoop-env.sh
vim sqoop-env.sh
export HIVE_HOME= /usr/local/lib/apache-hive-2.3.7-bin
用sqoop將數(shù)據(jù)從mysql導入hive
1)進入hive
hive
2)建庫 ecommerce(如果之前沒有建)
create database ecommerce;
3)創(chuàng)建導入數(shù)據(jù)的腳本datamigrate.sh
#!/bin/bash
# 定義一個函數(shù),執(zhí)行sqoop命令,所以執(zhí)行腳本應該在sqoop/bin下面
sq()
{
./sqoop import \
--connect jdbc:mysql://localhost:3306/ecommerce \
--username root \
--password 123456 \
--table $1 \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-database ecommerce \
--hive-table $1
}
sq t_commodity
sq t_commodity_cate
sq t_coupon
sq t_coupon_member
sq t_coupon_order
sq t_delivery
sq t_feedback
sq t_member
sq t_member_addr
sq t_order
sq t_order_commodity
sq t_shop
sq t_shop_order
sq t_user
注意里面到mysql的連接,localhost改成自己需要的主機名,用戶名密碼也要改成自己的配置。
運行腳本之后,進入hive,可以看到ecommerce下已經(jīng)有所有的表了。
數(shù)據(jù)處理
創(chuàng)建項目
打開IDE,創(chuàng)建一個 java的maven 項目:EcommerceUserProfile。在src/main/java下新建package:com.atguigu.userprofile,下面包含這樣幾個package:
- utils:工具包
- etl:對hive中數(shù)據(jù)進行ETL的spark程序
另外,還會有前后端業(yè)務相關的代碼,我們會在后面定義。
引入依賴和插件
pom文件配置如下:
<!--定義版本信息-->
<properties>
<java.version>1.8</java.version>
<spark.version>2.1.0</spark.version>
<scala.version>2.11</scala.version>
<springboot.version>2.2.0.RELEASE</springboot.version>
<elasticsearch.version>6.8.5</elasticsearch.version>
<lombok.version>1.18.10</lombok.version>
<fastjson.version>1.2.59</fastjson.version>
</properties>
<dependencies>
<!--spring boot 相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
<version>${springboot.version}</version>
</dependency>
<!--spark 相關依賴,需要core、sql和hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Java High Level REST Client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>
<profiles>
<!--開發(fā)環(huán)境-->
<profile>
<id>dev</id>
<properties>
<build.profile.id>dev</build.profile.id>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<!--生產(chǎn)環(huán)境-->
<profile>
<id>prod</id>
<properties>
<build.profile.id>prod</build.profile.id>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<!--用resource定義打包時的資源文件-->
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
<excludes>
<exclude>application-*.properties</exclude>
</excludes>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
<includes>
<include>
application-${build.profile.id}.properties
</include>
</includes>
</resource>
</resources>
</build>
創(chuàng)建通用的工具類
在utils下新建一個SparkUtils類,專門用來定義一些通用的spark相關的操作。
public class SparkUtils {
// 定義會話池
private static ThreadLocal<SparkSession> sessionPool = new ThreadLocal<>();
public static SparkSession initSession() {
if (sessionPool.get() != null) {
return sessionPool.get();
}
SparkSession session = SparkSession.builder().appName("etl")
.master("local[*]")
.config("es.nodes", "localhost")
.config("es.port", "9200")
.config("es.index.auto.create", "false")
.enableHiveSupport()
.getOrCreate();
sessionPool.set(session);
return session;
}
}
另外,開始寫業(yè)務邏輯之前,首先需要把hive-site.xml復制到resources下面,本地運行,需要有hive的相關配置信息。這里需要注意修改兩個地方:hive.metastore.warehouse.dir和hive.metastore.uris。
平臺用戶統(tǒng)計指標
用戶信息的提?。瀳D)
我們關心的用戶信息的分布情況,主要有性別、渠道、是否訂閱(關注公眾號)、熱度。
這些數(shù)據(jù),應該就是:當前有多少男性用戶、多少女性用戶、多少安卓用戶、多少ios用戶…全部都是統(tǒng)計一個count值。

最終,我們應該把這些數(shù)據(jù)包在一起,供前端頁面來讀取,就可以畫出上面的餅圖了。
在com.atguigu.userprofile.etl下新建一個類MemberEtl,開始寫代碼。
public class MemberEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
// 寫sql查詢數(shù)據(jù)
List<MemberSex> memberSexes = memberSexEtl(session);
List<MemberChannel> memberChannels = memberChannelEtl(session);
List<MemberMpSub> memberMpSubs = memberMpSubEtl(session);
MemberHeat memberHeat = memberHeatEtl(session);
// 拼成需要的結果
MemberVo memberVo = new MemberVo();
memberVo.setMemberSexes(memberSexes);
memberVo.setMemberChannels(memberChannels);
memberVo.setMemberMpSubs(memberMpSubs);
memberVo.setMemberHeat(memberHeat);
//打印到控制臺輸出
System.out.println("===========" + JSON.toJSONString(memberVo));
}
public static List<MemberSex> memberSexEtl(SparkSession session) {
// 先用sql得到每個性別的count統(tǒng)計數(shù)據(jù)
Dataset<Row> dataset = session.sql(
"select sex as memberSex, count(id) as sexCount " +
" from ecommerce.t_member group by sex");
List<String> list = dataset.toJSON().collectAsList();
// 對每一個元素依次map成MemberSex,收集起來
List<MemberSex> result = list.stream()
.map( str -> JSON.parseObject(str, MemberSex.class))
.collect(Collectors.toList());
return result;
}
public static List<MemberChannel> memberChannelEtl(SparkSession session) {
Dataset<Row> dataset = session.sql(
"select member_channel as memberChannel, count(id) as channelCount " +
" from ecommerce.t_member group by member_channel");
List<String> list = dataset.toJSON().collectAsList();
List<MemberChannel> result = list.stream()
.map(str -> JSON.parseObject(str, MemberChannel.class))
.collect(Collectors.toList());
return result;
}
public static List<MemberMpSub> memberMpSubEtl(SparkSession session) {
Dataset<Row> sub = session.sql(
"select count(if(mp_open_id !='null',true,null)) as subCount, " +
" count(if(mp_open_id ='null',true,null)) as unSubCount " +
" from ecommerce.t_member");
List<String> list = sub.toJSON().collectAsList();
List<MemberMpSub> result = list.stream()
.map(str -> JSON.parseObject(str, MemberMpSub.class))
.collect(Collectors.toList());
return result;
}
public static MemberHeat memberHeatEtl(SparkSession session) {
// reg , complete , order , orderAgain, coupon
Dataset<Row> reg_complete = session.sql(
"select count(if(phone='null',true,null)) as reg," +
" count(if(phone !='null',true,null)) as complete " +
" from ecommerce.t_member");
Dataset<Row> order_again = session.sql(
"select count(if(t.orderCount =1,true,null)) as order," +
"count(if(t.orderCount >=2,true,null)) as orderAgain from " +
"(select count(order_id) as orderCount,member_id from ecommerce.t_order group by member_id) as t");
Dataset<Row> coupon = session.sql("select count(distinct member_id) as coupon from ecommerce.t_coupon_member ");
// 最終,將三張表(注冊、復購、優(yōu)惠券)連在一起
Dataset<Row> heat = coupon.crossJoin(reg_complete).crossJoin(order_again);
List<String> list = heat.toJSON().collectAsList();
List<MemberHeat> result = list.stream()
.map(str -> JSON.parseObject(str, MemberHeat.class))
.collect(Collectors.toList());
// 只有一行數(shù)據(jù),獲取后返回
return result.get(0);
}
// 想要展示餅圖的數(shù)據(jù)信息
@Data
static class MemberVo{
private List<MemberSex> memberSexes; // 性別統(tǒng)計信息
private List<MemberChannel> memberChannels; // 渠道來源統(tǒng)計信息
private List<MemberMpSub> memberMpSubs; // 用戶是否關注媒體平臺
private MemberHeat memberHeat; // 用戶熱度統(tǒng)計
}
// 分別定義每個元素類
@Data
static class MemberSex {
private Integer memberSex;
private Integer sexCount;
}
@Data
static class MemberChannel {
private Integer memberChannel;
private Integer channelCount;
}
@Data
static class MemberMpSub {
private Integer subCount;
private Integer unSubCount;
}
@Data
static class MemberHeat {
private Integer reg; // 只注冊,未填寫手機號
private Integer complete; // 完善了信息,填了手機號
private Integer order; // 下過訂單
private Integer orderAgain; // 多次下單,復購
private Integer coupon; // 購買過優(yōu)惠券,儲值
}
}
熱詞提?。ㄔ~云)
所謂詞云,其實就是統(tǒng)計關鍵詞的頻率。
只要是用戶評價、商品類別之類的文本信息,我們都可以按空格分詞,然后統(tǒng)計每個詞出現(xiàn)的次數(shù)——就相當于是一個word count,然后按照count數(shù)量降序排列就可以了。

由于沒有現(xiàn)成的數(shù)據(jù),這里用了搜狗提供的一個樣例語料庫,上傳到hdfs(也可以添加到resource)。
hdfs dfs -mkdir /data
hdfs dfs -put /mnt/c/Users/wushengran/Desktop/files/data/SogouQ.sample.txt /data
整體代碼如下:
public static void main(String[] args) {
// 為了方便操作數(shù)據(jù),首先創(chuàng)建一個jsc:
SparkConf sc = new SparkConf()
.setAppName("hot word etl")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sc);
// 數(shù)據(jù)文件在hdfs上
System.setProperty("HADOOP_USER_NAME", "ubuntu");
// 用jsc讀取hdfs文件,轉成java rdd
JavaRDD<String> linesRdd = jsc.textFile("hdfs://192.168.99.170:9000/data/SogouQ.sample.txt");
JavaPairRDD<String, Integer> pairRDD = linesRdd.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
// 以制表符分隔,取第三個字段
String word = s.split("\t")[2];
return new Tuple2<>(word, 1);
}
});
// 以word作為key,分組聚合
JavaPairRDD<String, Integer> countRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 元素互換位置
JavaPairRDD<Integer, String> swapedRdd = countRdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.swap();
}
});
// 按照count排序
JavaPairRDD<Integer, String> sortedRdd = swapedRdd.sortByKey(false);
// 再互換位置
JavaPairRDD<String, Integer> resultRdd = sortedRdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return integerStringTuple2.swap();
}
});
// 取前10個熱詞輸出
List<Tuple2<String, Integer>> hotWordCounts = resultRdd.take(10);
// 打印輸出
for (Tuple2<String, Integer> hotWordCount: hotWordCounts) {
System.out.println(hotWordCount._1 + " === count " + hotWordCount._2);
}
}
平臺近期數(shù)據(jù)增量(折線圖)
我們還可以統(tǒng)計平臺用戶數(shù)據(jù)的增長,主要就是注冊量和訂單量。我們定義幾個要展示的統(tǒng)計指標:(只統(tǒng)計展示近七天的數(shù)據(jù))
- 近七天的、每天新增注冊人數(shù)(每天增量);
- 近七天的、截至每天的總用戶人數(shù)(每天總量);
- 近七天的、截至每天的總訂單數(shù)(每天總量);
- 近七天的、截至每天的總訂單流水金額數(shù)量(每天總量)

在etl下新建一個GrowthEtl類,代碼如下:
public class GrowthEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
List<GrowthLineVo> growthLineVo = growthEtl(session);
System.out.println(growthLineVo);
}
private static List<GrowthLineVo> growthEtl(SparkSession session) {
// 指定“當前日期”是2019.11.30,這是數(shù)據(jù)決定的
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date sevenDayBefore = DateUtil.addDay(nowDay, -7);
// 近七天注冊人數(shù)統(tǒng)計
String memberSql = "select date_format(create_time,'yyyy-MM-dd') as day," +
" count(id) as regCount, max(id) as memberCount " +
" from ecommerce.t_member where create_time >='%s' " +
" group by date_format(create_time,'yyyy-MM-dd') order by day";
memberSql = String.format(memberSql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> memberDs = session.sql(memberSql);
// 近七天訂單和流水統(tǒng)計
String orderSql = "select date_format(create_time,'yyyy-MM-dd') as day," +
" max(order_id) orderCount, sum(origin_price) as gmv" +
" from ecommerce.t_order where create_time >='%s' " +
"group by date_format(create_time,'yyyy-MM-dd') order by day";
orderSql = String.format(orderSql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> orderDs = session.sql(orderSql);
// 聯(lián)接查詢,按照day內(nèi)連接
Dataset<Tuple2<Row, Row>> tuple2Dataset = memberDs.joinWith(orderDs, memberDs.col("day").equalTo(orderDs.col("day")), "inner");
List<Tuple2<Row, Row>> tuple2s = tuple2Dataset.collectAsList();
List<GrowthLineVo> vos = new ArrayList<>();
// 遍歷二元組List,包裝 GrowthLineVo
for (Tuple2<Row, Row> tuple2 : tuple2s) {
Row row1 = tuple2._1(); // memberSql結果
Row row2 = tuple2._2(); // orderSql結果
JSONObject obj = new JSONObject();
StructType schema = row1.schema();
String[] strings = schema.fieldNames();
for (String string : strings) {
Object as = row1.getAs(string);
obj.put(string, as);
}
schema = row2.schema();
strings = schema.fieldNames();
for (String string : strings) {
Object as = row2.getAs(string);
obj.put(string, as);
}
GrowthLineVo growthLineVo = obj.toJavaObject(GrowthLineVo.class);
vos.add(growthLineVo);
}
// 七天前,再之前的訂單流水總和(GMV)
String preGmvSql = "select sum(origin_price) as totalGmv from ecommerce.t_order where create_time <'%s'";
preGmvSql = String.format(preGmvSql, DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> gmvDs = session.sql(preGmvSql);
double previousGmv = gmvDs.collectAsList().get(0).getDouble(0);
BigDecimal preGmv = BigDecimal.valueOf(previousGmv);
// 之前每天的增量gmv取出,依次疊加,得到總和
List<BigDecimal> totalGmvList = new ArrayList<>();
for (int i = 0; i < vos.size(); i++) {
GrowthLineVo growthLineVo = vos.get(i);
BigDecimal gmv = growthLineVo.getGmv();
BigDecimal temp = gmv.add(preGmv);
for (int j = 0; j < i; j++) {
GrowthLineVo prev = vos.get(j);
temp = temp.add(prev.getGmv());
}
totalGmvList.add(temp);
}
// 遍歷總量gmv的List,更新vos里面gmv的值
for (int i = 0; i < totalGmvList.size(); i++) {
GrowthLineVo lineVo = vos.get(i);
lineVo.setGmv(totalGmvList.get(i));
}
return vos;
}
@Data
static class GrowthLineVo {
// 每天新增注冊數(shù)、總用戶數(shù)、總訂單數(shù)、總流水GMV
private String day;
private Integer regCount;
private Integer memberCount;
private Integer orderCount;
private BigDecimal gmv;
}
}
平臺近期周環(huán)比統(tǒng)計(柱狀圖)
所謂周環(huán)比,week on week,就是比較兩周的數(shù)據(jù),最近一周跟上周相比;而且為了圖像更清楚,我們可以對兩周的數(shù)據(jù),再做一個同比顯示:每一天都跟上周的這一天比(周一跟周一比,周二跟周二比),看增長多少。

我們準備統(tǒng)計注冊數(shù)量(regs)和訂單數(shù)量(orders)的周環(huán)比增長。
在etl下新建一個WowEtl類,代碼如下:
public class WowEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
// 查詢近一周的reg和order的數(shù)量
List<RegVo> regVos = regWeekCount(session);
List<OrderVo> orderVos = orderWeekCount(session);
System.out.println("======" + regVos);
System.out.println("======" + orderVos);
}
public static List<RegVo> regWeekCount(SparkSession session) {
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date lastTwoWeekFirstDay = DateUtil.addDay(nowDay, -14);
String sql = "select date_format(create_time,'yyyy-MM-dd') as day," +
" count(id) as regCount from ecommerce.t_member " +
" where create_time >='%s' and create_time < '%s' " +
" group by date_format(create_time,'yyyy-MM-dd')";
sql = String.format(sql,
DateUtil.DateToString(lastTwoWeekFirstDay, DateStyle.YYYY_MM_DD_HH_MM_SS),
DateUtil.DateToString(nowDay, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> dataset = session.sql(sql);
List<String> list = dataset.toJSON().collectAsList();
List<RegVo> result = list.stream()
.map(str -> JSON.parseObject(str, RegVo.class))
.collect(Collectors.toList());
return result;
}
public static List<OrderVo> orderWeekCount(SparkSession session) {
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date lastTwoWeekFirstDay = DateUtil.addDay(nowDay, -14);
String sql = "select date_format(create_time,'yyyy-MM-dd') as day," +
" count(order_id) as orderCount from ecommerce.t_order " +
" where create_time >='%s' and create_time < '%s' " +
" group by date_format(create_time,'yyyy-MM-dd')";
sql = String.format(sql,
DateUtil.DateToString(lastTwoWeekFirstDay, DateStyle.YYYY_MM_DD_HH_MM_SS),
DateUtil.DateToString(nowDay, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> dataset = session.sql(sql);
List<String> list = dataset.toJSON().collectAsList();
List<OrderVo> result = list.stream()
.map(str -> JSON.parseObject(str, OrderVo.class))
.collect(Collectors.toList());
return result;
}
@Data
static class RegVo {
private String day;
private Integer regCount;
}
@Data
static class OrderVo {
private String day;
private Integer orderCount;
}
}
營銷提醒統(tǒng)計(餅圖)
營銷提醒,主要是優(yōu)惠券過期提醒,主要分兩類:首單免費優(yōu)惠券(coupon_id為1),以及一般的抵扣優(yōu)惠券(代金券,coupon_id不為1)。
我們可以將所有優(yōu)惠券,按照失效時間統(tǒng)計出來,以便考慮優(yōu)惠券的使用情況,快速調(diào)整營銷策略。

我們這里的業(yè)務定義,優(yōu)惠券都是7天有效期。
在etl下新建一個RemindEtl類,代碼如下:
public class RemindEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
// 查詢近一周優(yōu)惠券的數(shù)量
List<FreeRemindVo> freeVos = freeRemindCount(session);
List<CouponRemindVo> couponVos = couponRemindCount(session);
System.out.println("======" + freeVos);
System.out.println("======" + couponVos);
}
public static List<FreeRemindVo> freeRemindCount(SparkSession session){
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date sevenDayBefore = DateUtil.addDay(nowDay, -7);
String sql ="select date_format(create_time,'yyyy-MM-dd') as day, " +
" count(member_id) as freeCount " +
" from ecommerce.t_coupon_member where coupon_id = 1 " +
" and coupon_channel = 2 and create_time >= '%s' " +
" group by date_format(create_time,'yyyy-MM-dd')";
sql = String.format(sql,
DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> dataset = session.sql(sql);
List<String> list = dataset.toJSON().collectAsList();
List<FreeRemindVo> result = list.stream()
.map(str -> JSON.parseObject(str, FreeRemindVo.class))
.collect(Collectors.toList());
return result;
}
public static List<CouponRemindVo> couponRemindCount(SparkSession session){
LocalDate now = LocalDate.of(2019, Month.NOVEMBER, 30);
Date nowDay = Date.from(now.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date sevenDayBefore = DateUtil.addDay(nowDay, -7);
String sql ="select date_format(create_time,'yyyy-MM-dd') as day, " +
" count(member_id) as couponCount " +
" from ecommerce.t_coupon_member where coupon_id != 1 " +
" and create_time >= '%s' " +
" group by date_format(create_time,'yyyy-MM-dd')";
sql = String.format(sql,
DateUtil.DateToString(sevenDayBefore, DateStyle.YYYY_MM_DD_HH_MM_SS));
Dataset<Row> dataset = session.sql(sql);
List<String> list = dataset.toJSON().collectAsList();
List<CouponRemindVo> result = list.stream()
.map(str -> JSON.parseObject(str, CouponRemindVo.class))
.collect(Collectors.toList());
return result;
}
@Data
static class FreeRemindVo {
private String day;
private Integer freeCount;
}
@Data
static class CouponRemindVo {
private String day;
private Integer couponCount;
}
}
用戶行為轉化率分析(漏斗圖)
在一個電商平臺上,用戶可以有不同的業(yè)務行為:
頁面展現(xiàn) -> 點擊 -> 加購物車 -> 下單 -> 復購 -> 充值(購買優(yōu)惠券)
對于用戶的一些行為,我們是希望做沉降分析的,考察用戶每一步行為的轉化率。這就可以畫出一個“漏斗圖”:

在etl下新建一個ConversionEtl類,代碼如下:
public class ConversionEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
ConversionVo conversionVo = conversionBehaviorCount(session);
System.out.println(conversionVo);
}
public static ConversionVo conversionBehaviorCount(SparkSession session){
// 查詢下過訂單的用戶
Dataset<Row> orderMember = session.sql("select distinct(member_id) from ecommerce.t_order " +
"where order_status=2");
// 將購買次數(shù)超過 1 次的用戶查出來
Dataset<Row> orderAgainMember = session.sql("select t.member_id as member_id " +
" from (select count(order_id) as orderCount," +
" member_id from ecommerce.t_order " +
" where order_status=2 group by member_id) as t " +
" where t.orderCount>1");
// 查詢充值過的用戶
Dataset<Row> charge = session.sql("select distinct(member_id) as member_id " +
"from ecommerce.t_coupon_member where coupon_channel = 1");
Dataset<Row> join = charge.join(
orderAgainMember,
orderAgainMember.col("member_id")
.equalTo(charge.col("member_id")),
"inner");
// 統(tǒng)計各層級的數(shù)量
long order = orderMember.count();
long orderAgain = orderAgainMember.count();
long chargeCoupon = join.count();
// 包裝成VO
ConversionVo vo = new ConversionVo();
vo.setPresent(1000L); // 目前數(shù)據(jù)中沒有,直接給定值
vo.setClick(800L);
vo.setAddCart(600L);
vo.setOrder(order);
vo.setOrderAgain(orderAgain);
vo.setChargeCoupon(chargeCoupon);
return vo;
}
@Data
static class ConversionVo{
private Long present;
private Long click;
private Long addCart;
private Long order;
private Long orderAgain;
private Long chargeCoupon;
}
}
用戶標簽系統(tǒng)(用戶畫像建模)
現(xiàn)在電商行業(yè)競爭越來越激烈,只有泛泛的數(shù)據(jù)統(tǒng)計是不夠的。很多大廠都開始向著所謂“精準營銷”“個性化推薦”這些領域來發(fā)力。而精準營銷的基礎,就是要給用戶打上各種各樣、豐富而且不同的標簽。
所以我們接下來做的就是,用sql把用戶標簽提取出來,而得到的標簽結果要寫入es,供業(yè)務系統(tǒng)讀取查詢。
用戶標簽定義
我們希望提取的用戶標簽,有這么幾部分:
- 用戶個人信息(從member表中提?。?/li>
- 用戶業(yè)務行為信息(從order和t_order_commodity中提取)
- 用戶市場營銷信息(優(yōu)惠券相關,從t_coupon_member中提取)
- 用戶服務信息(從快遞表和反饋表,t_delivery和t_feedback中提取)

具體定義字段如下:
- 個人信息
memberId, phone, sex, channel, subOpenId, address, regTime - 購買行為特征(及興趣愛好)
orderTime, orderCount, favGoods, orderMoney - 消費能力(及過期提醒)
freeCouponTime, couponTimes, chargeMoney - 反饋行為(為了提升效率)
overTime, feedback
在ES中創(chuàng)建mapping
在ES中首先新建一個tag索引,用來保存我們所有的數(shù)據(jù):PUT tag
然后在索引中新建映射(mapping):
PUT /tag/_doc/_mapping?pretty
{
"_doc": {
"properties": {
"memberId": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"phone": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"sex": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"channel": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"subOpenId": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
“address”: { “type”: “text” },
"regTime": { "type": "date" },
"orderCount": { "type": "long" },
"orderTime": { "type": "date" },
"orderMoney": { "type": "float" },
"favGoods": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
"freeCouponTime": { "type": "date" },
"couponTimes": { "type": "date" },
"chargeMoney": { "type": "double" },
"overTime": { "type": "long" },
"feedBack": { "type": "long" }
}
}
}
提取用戶標簽數(shù)據(jù)寫入ES
在etl下新建一個類UserTagEtl,代碼如下:
public class UserTagEtl {
public static void main(String[] args) {
SparkSession session = SparkUtils.initSession();
etl(session);
}
// 提取用戶標簽
private static void etl(SparkSession session) {
// 提取用戶基本信息標簽
Dataset<Row> memberBase = session.sql(
"select id as memberId,phone, sex,member_channel as channel, mp_open_id as subOpenId," +
" address_default_id as address, date_format(create_time,'yyyy-MM-dd') as regTime" +
" from ecommerce.t_member");
// 提取用戶購買行為特征
Dataset<Row> orderBehavior = session.sql(
"select o.member_id as memberId," +
" count(o.order_id) as orderCount," +
" date_format(max(o.create_time),'yyyy-MM-dd') as orderTime," +
" sum(o.pay_price) as orderMoney, " +
" collect_list(DISTINCT oc.commodity_id) as favGoods " +
" from ecommerce.t_order as o left join ecommerce.t_order_commodity as oc" +
" on o.order_id = oc.order_id group by o.member_id");
// 提取用戶購買能力標簽
Dataset<Row> freeCoupon = session.sql(
"select member_id as memberId, " +
" date_format(create_time,'yyyy-MM-dd') as freeCouponTime " +
" from ecommerce.t_coupon_member where coupon_id = 1");
// 多次購買購物券的時間
Dataset<Row> couponTimes = session.sql(
"select member_id as memberId," +
" collect_list(date_format(create_time,'yyyy-MM-dd')) as couponTimes" +
" from ecommerce.t_coupon_member where coupon_id !=1 group by member_id");
// 買購物券總的花費金額
Dataset<Row> chargeMoney = session.sql(
"select cm.member_id as memberId , sum(c.coupon_price/2) as chargeMoney " +
" from ecommerce.t_coupon_member as cm left join ecommerce.t_coupon as c " +
" on cm.coupon_id = c.id where cm.coupon_channel != 1 group by cm.member_id");
// 用戶對服務的反饋行為特征
Dataset<Row> overTime = session.sql(
"select (to_unix_timestamp(max(arrive_time)) - to_unix_timestamp(max(pick_time))) " +
" as overTime, member_id as memberId " +
" from ecommerce.t_delivery group by member_id");
// 最近一次用戶反饋
Dataset<Row> feedback = session.sql("select fb.feedback_type as feedback,fb.member_id as memberId" +
" from ecommerce.t_feedback as fb " +
" right join (select max(id) as fid,member_id as memberId " +
" from ecommerce.t_feedback group by member_id) as t " +
" on fb.id = t.fid");
// 將這些結果注冊成表,全部按照id連接起來,合并出想要的信息,寫入es中
memberBase.registerTempTable("memberBase");
orderBehavior.registerTempTable("orderBehavior");
freeCoupon.registerTempTable("freeCoupon");
couponTimes.registerTempTable("couponTimes");
chargeMoney.registerTempTable("chargeMoney");
overTime.registerTempTable("overTime");
feedback.registerTempTable("feedback");
Dataset<Row> result = session.sql("select m.*,o.orderCount,o.orderTime,o.orderMoney,o.favGoods," +
" fb.freeCouponTime,ct.couponTimes, cm.chargeMoney,ot.overTime,f.feedBack" +
" from memberBase as m " +
" left join orderBehavior as o on m.memberId = o.memberId " +
" left join freeCoupon as fb on m.memberId = fb.memberId " +
" left join couponTimes as ct on m.memberId = ct.memberId " +
" left join chargeMoney as cm on m.memberId = cm.memberId " +
" left join overTime as ot on m.memberId = ot.memberId " +
" left join feedback as f on m.memberId = f.memberId ");
JavaEsSparkSQL.saveToEs(result,"/usertag/_doc");
}
// 定義用戶標簽的VO
@Data
public static class MemberTag implements Serializable {
// 用戶基本信息
private String memberId;
private String phone;
private String sex;
private String channel;
private String subOpenId;
private String address;
private String regTime;
// 用戶業(yè)務行為特征
private Long orderCount;
private String orderTime;
private Double orderMoney;
private List<String> favGoods;
// 用戶購買能力
private String freeCouponTime; // 首單免費時間
private List<String> couponTimes; // 多次購買時間
private Double chargeMoney; // 購買花費金額
// 用戶反饋行為特征
private Integer overTime;
private Integer feedBack;
}
}
數(shù)據(jù)展示
我們最終,需要把集中統(tǒng)計類的標簽,和用戶個人標簽整合起來,在頁面做一個展示。
統(tǒng)計類的指標很簡單,只要寫入redis,由前端頁面讀出,應用echarts.js這樣的顯示工具把它畫出來就可以了。
而用戶個人標簽,我們主要是供運營客服人員圈人使用,所以相當于還有一個后臺管理系統(tǒng)。運營人員在頁面上點選標簽,發(fā)送請求到后臺,后端服務就從es中取數(shù),返回前端顯示。
配置文件
應用相關
application.properties里面設置spring.profiles.active,主要是用來區(qū)分不同環(huán)境下的配置:dev和prod。
所以具體不同環(huán)境的配置文件,就是application-dev.properties和application-prod.properties。
日志相關
在properties文件中有l(wèi)ogging.config,這指定了當前具體的日志配置文件,是logback.xml。
更改日志級別,可以在applicaition.properties中添加:
logging.level.root=warn
前端代碼
Js和Css
前端用到的組件,主要是Vue.js,其中還用到了echarts.js做更漂亮的圖表展示。
js和css代碼放在resouces/static下面,可以直接拷貝進來。
Html模板
我們用到了Thymeleaf,這是SpringBoot官方推薦的模板引擎。
定義的模板文件,放在resouces/templates下面,可以直接拷貝進來。我們定義了兩個模板,分別對應兩個頁面:一個index是訪問主頁,也就是統(tǒng)計信息大屏展示;另一個是tags,標簽頁面,類似于控制后臺,可以點選標簽進行圈人。
后端業(yè)務代碼
我們的業(yè)務代碼的主要目的是,實現(xiàn)按照某些條件圈人的功能。
首先,在src/main/java下創(chuàng)建一個新的package:com.atguigu.userprofile.app,專門用來放后臺相關邏輯代碼。
下面分幾個package:
- controller
這是java web項目的核心部分,用來響應前端發(fā)來的請求,并經(jīng)過處理后包裝好response返回給前端; - service
這是服務層,后端的控制器處理的過程,核心就是調(diào)用后臺的服務,得到想要的結果; - support
用來放支持類,比如一些特定的數(shù)據(jù)結構,或者對es的連接操作,等等。
入口代碼
直接在app下新建類:UserFilterByTagApplication.java,這是樣板代碼,應用主入口。
package com.atguigu.userprofile.app;
// spring boot 主程序入口
@SpringBootApplication
public class UserFilterByTagApplication {
public static void main(String[] args) {
SpringApplication.run(UserFilterByTagApplication.class, args);
}
}
支持代碼
在app/support下新建類:
1)es查詢標簽操作數(shù)據(jù)的類(EsQueryTag)
這是一個定義好的類,專門用于定義es查詢操作的字段,也就是這對標簽的查詢條件,包括name、value和type。
@Data
public class EsQueryTag {
private String name; // 標簽名稱
private String value; // 查詢限定的值
private String type; // 查詢類型
}
2)es配置類(ESConfig)
要處理es相關操作,自然有es相關的連接配置。我們可以在support下新建一個ESConfig.java,專門定義es的連接。
@Configuration
public class ESConfig {
private Logger logger = LoggerFactory.getLogger(ESConfig.class);
private static final int ADDRESS_LENGTH = 2;
private static final String HTTP_SCHEME = "http";
@Value("${elasticsearch.ip}")
String[] ipAddress;
@Bean(name = "highLevelClient")
public RestHighLevelClient highLevelClient(@Autowired RestClientBuilder restClientBuilder) {
restClientBuilder.setMaxRetryTimeoutMillis(60000); // 超時1min
return new RestHighLevelClient(restClientBuilder);
}
@Bean
public RestClientBuilder restClientBuilder() {
// 從配置參數(shù)里,提取ip和port,包裝成HttpHost的列表
HttpHost[] hosts = Arrays.stream(ipAddress)
.map(this::makeHttpHost)
.filter(Objects::nonNull)
.toArray(HttpHost[]::new);
logger.info("ES hosts:{}", Arrays.toString(hosts));
return RestClient.builder(hosts);
}
private HttpHost makeHttpHost(String s) {
String[] address = s.split(":");
if (address.length == ADDRESS_LENGTH) {
String ip = address[0];
int port = Integer.parseInt(address[1]);
return new HttpHost(ip, port, HTTP_SCHEME);
} else {
return null;
}
}
}
Controller(處理網(wǎng)絡請求)
前端發(fā)來了請求,需要由控制器Controller來響應,定義對應的業(yè)務操作和返回。
1) 頁面控制器(PageController)
這是后臺的主控制器,對頁面請求分配路由,由于用了thymeleaf模板,所以可以直接返回一個字符串:tags就表示用tags.html;index就表示用index.html。
我們在resources下創(chuàng)建templates目錄,將index.html和tags.html放入;把static目錄也創(chuàng)建出來,把js和css拷進去。
代碼如下:
@Controller
@Slf4j
public class PageController {
// 首先是對根路徑的處理,加載index.html頁面
@RequestMapping("/")
public String index() {
return "index";
}
// 對于點選標簽圈人的后臺管理功能,另設一個地址/tags,加載tags.html
@RequestMapping("/tags")
public String tags(){
return "tags";
}
}
2)響應es查詢請求的控制器(EsQueryController)
用來處理es相關操作的請求。定義的請求路徑為/gen,這應該是一個POST請求,用來下載圈好的用戶和手機號,頁面點擊“生成報告”,就可以直接從es生成報告。
@Controller
public class EsQueryController {
@Autowired
EsQueryService service;
// 需要提交表單,POST請求
@RequestMapping("/gen")
public void genAndDown(HttpServletResponse response, @RequestBody String data) {
JSONObject object = JSON.parseObject(data);
JSONArray selectedTags = object.getJSONArray("selectedTags");
List<EsQueryTag> list = selectedTags.toJavaList(EsQueryTag.class);
// 調(diào)用服務,按照查詢限制提取出對應的用戶信息
List<UserTagEtl.MemberTag> tags = service.buildQuery(list);
// 自定義方法,將所有用戶信息拼在一起寫入文件
String content = toContent(tags);
String fileName = "member.txt";
response.setContentType("application/octet-stream");
try {
response.setHeader("Content-Disposition", "attachment; filename=" + URLEncoder.encode(fileName, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
try {
ServletOutputStream sos = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(sos);
bos.write(content.getBytes("UTF-8"));
bos.flush(); // 直接將緩沖池中數(shù)據(jù)刷出,默認是要填滿才發(fā)
bos.close();
sos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private String toContent(List<UserTagEtl.MemberTag> tags) {
StringBuilder sb = new StringBuilder();
for (UserTagEtl.MemberTag tag : tags) {
sb.append("[" + tag.getMemberId() + "," + tag.getPhone() + "]\r\n");
}
return sb.toString();
}
}
`
Service(核心服務)
EsQueryService里面主要就是構建es的查詢請求。代碼如下
@Service
public class EsQueryService {
@Resource(name = "highLevelClient")
RestHighLevelClient highLevelClient;
public List<UserTagEtl.MemberTag> buildQuery(List<EsQueryTag> tags) {
SearchRequest request = new SearchRequest();
request.indices("usertag");
request.types("_doc");
SearchSourceBuilder builder = new SearchSourceBuilder();
request.source(builder);
String[] includes = {"memberId", "phone"};
builder.fetchSource(includes, null);
builder.from(0); // 查詢結果,從0開始
builder.size(1000); // 大小為1000,相當于 limit 1000
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
builder.query(boolQueryBuilder);
List<QueryBuilder> should = boolQueryBuilder.should();
List<QueryBuilder> mustNot = boolQueryBuilder.mustNot();
List<QueryBuilder> must = boolQueryBuilder.must();
// 遍歷查詢條件參數(shù),判斷是哪一類
for (EsQueryTag tag : tags) {
String name = tag.getName();
String value = tag.getValue();
String type = tag.getType();
if (type.equals("match")) {
should.add(QueryBuilders.matchQuery(name, value));
}
if (type.equals("notMatch")) {
mustNot.add(QueryBuilders.matchQuery(name, value));
}
if (type.equals("rangeBoth")) {
String[] split = value.split("-");
String v1 = split[0];
String v2 = split[1];
should.add(QueryBuilders.rangeQuery(name).lte(v2).gte(v1));
}
if (type.equals("rangeGte")) {
should.add(QueryBuilders.rangeQuery(name).gte(value));
}
if (type.equals("rangeLte")) {
should.add(QueryBuilders.rangeQuery(name).lte(value));
}
if (type.equals("exists")) {
should.add(QueryBuilders.existsQuery(name));
}
}
RequestOptions options = RequestOptions.DEFAULT;
List<UserTagEtl.MemberTag> memberTags = new ArrayList<>();
try {
SearchResponse search = highLevelClient.search(request, options);
SearchHits hits = search.getHits();
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
SearchHit hit = iterator.next();
String sourceAsString = hit.getSourceAsString();
UserTagEtl.MemberTag memberTag = JSON.parseObject(sourceAsString, UserTagEtl.MemberTag.class);
memberTags.add(memberTag);
}
return memberTags;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
運行測試
由于我們引入spring-boot的maven-plugin插件,所以可以直接在開發(fā)環(huán)境啟動web server:雙擊 Maven Plugins -> spring boot -> spring boot – run啟動。
訪問localhost:8080可以查看統(tǒng)計信息;
訪問localhost:8080/tags,進入圈人后臺,點選一些標簽,然后點擊“生成”按鈕,看是否有文件下載。