1 Project overview
2 Technologies involved
3 Recommendation flow diagram
4 Takeaways
5 Open issues
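1 Project overview
- Build a movie recommendation system on the Spark framework;
- Use data-mining algorithms to train a model that recommends movies matching each user's preferences;
- Implement the recommender in two modes: offline and real-time.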
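2 Technologies involved
Spark: an in-memory distributed computing framework
Hadoop: a distributed offline (batch) computing framework
Hive: a data warehouse tool built on Hadoop. It maps structured data files to database tables, offers simple SQL querying, and translates SQL statements into MapReduce jobs
Kafka: a distributed, high-concurrency message queue. It buffers the data collected by Flume and feeds it to the downstream computations
HBase: a database that scales to billions of rows and millions of columns with millisecond-level lookups, used here for fast queries over the computed results
Phoenix: a SQL layer built on top of HBase. The Phoenix query engine turns a SQL query into one or more HBase scans and runs them in parallel to produce standard JDBC result sets.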
3 Recommendation flow diagram

[Figure: recommendation flow diagram]
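The diagram reads as follows:
- Load the data from HDFS, process it, and store it in Hive;
- Offline recommendation:
  - Load the training data and test data from Hive
  - Train a model with Spark MLlib's ALS (alternating least squares)
  - Generate recommendations with the model
  - Write the recommendations to MySQL, Hive, and Phoenix + HBase
- Real-time recommendation:
  - Read the data from Hive
  - Take records from the test data set and send them to Kafka
  - Spark Streaming actively pulls the data from the Kafka message queue and chooses a recommendation strategy depending on whether the user is new
    - New user: recommend the 5 most-viewed movies in the training data set
    - Existing user: use the recommendation model to recommend 5 movies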
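
The choice between the two strategies can be sketched in plain Scala, without Spark. This is only an illustration under stated assumptions: the names `RecStrategy`, `View`, `topViewed`, and `recommend` are hypothetical, and the `modelRecs` function merely stands in for the trained ALS model.

```scala
object RecStrategy {
  // One viewing event: which user watched which movie (hypothetical shape).
  final case class View(userId: Int, movieId: Int)

  // The n movies with the most views. Ties are broken by the smaller
  // movieId so that the result is deterministic.
  def topViewed(views: Seq[View], n: Int): Seq[Int] =
    views.groupBy(_.movieId).toSeq
      .map { case (movieId, vs) => (movieId, vs.size) }
      .sortBy { case (movieId, count) => (-count, movieId) }
      .take(n)
      .map(_._1)

  // Known users get model-based recommendations; unknown users fall back
  // to the most-viewed list. `modelRecs` stands in for the trained model.
  def recommend(userId: Int,
                knownUsers: Set[Int],
                popular: Seq[Int],
                modelRecs: Int => Seq[Int]): Seq[Int] =
    if (knownUsers.contains(userId)) modelRecs(userId) else popular
}
```

Computing the popular list once and reusing it for every new user keeps the per-request work for new users down to a set lookup.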

4 Takeaways
1 Setting up the big-data environment
(1) Single-node setup of Hadoop, Spark, Hive, and MySQL
Spark processes the HDFS data and stores the results in Hive
One Hive instance configured with a MySQL metastore database
2 Initial data preprocessing
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext

// Row types inferred from the Hive table definitions below; the original listing omits them.
case class Links(movieId: Int, imdbId: Int, tmdbId: Int)
case class Movies(movieId: Int, title: String, genres: String)
case class Ratings(userId: Int, movieId: Int, rating: Double, timestamp: Int)
case class Tags(userId: Int, movieId: Int, tag: String, timestamp: Int)

object ETL {
  def main(args: Array[String]): Unit = {
    val localClusterURL = "local[2]" // master URL for a local run (unused here)
    val clusterMasterURL = "spark://s1:7077"
    val conf = new SparkConf().setAppName("ETL").setMaster(clusterMasterURL)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val hc = new HiveContext(sc)
    import sqlContext.implicits._
    hc.sql("use moive_recommend")
    // A partition count that is an integer multiple of the CPU cores
    // allocated to the application usually works well.
    val minPartitions = 8
    // The case classes define the schema, which suits data whose schema is known in advance.
    // When the schema is not known in advance, build it with StructType instead; see:
    // http://spark.apache.org/docs/1.6.2/sql-programming-guide.html#programmatically-specifying-the-schema
    // Every load drops lines that end with a comma (empty last field) before parsing.
    val links = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/links.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Links(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toInt)).toDF()
    val movies = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/movies.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Movies(x(0).trim.toInt, x(1).trim, x(2).trim)).toDF()
    val ratings = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/ratings.txt", minPartitions)
      .filter(!_.endsWith(",")).map(_.split(","))
      .map(x => Ratings(x(0).trim.toInt, x(1).trim.toInt, x(2).trim.toDouble, x(3).trim.toInt)).toDF()
    val tags = sc.textFile("hdfs://localhost/home/spark/temp/moiveRec/tags.txt", minPartitions)
      .filter(!_.endsWith(",")).map(x => rebuild(x)).map(_.split(","))
      .map(x => Tags(x(0).trim.toInt, x(1).trim.toInt, x(2).trim, x(3).trim.toInt)).toDF()
    // Write each DataFrame as Parquet, then recreate the Hive table and load the files into it.
    links.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/links")
    hc.sql("drop table if exists links")
    hc.sql("create table if not exists links(movieId int,imdbId int,tmdbId int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/links' overwrite into table links")
    movies.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/movies")
    hc.sql("drop table if exists movies")
    hc.sql("create table if not exists movies(movieId int,title string,genres string) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/movies' overwrite into table movies")
    ratings.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/ratings")
    hc.sql("drop table if exists ratings")
    hc.sql("create table if not exists ratings(userId int,movieId int,rating double,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/ratings' overwrite into table ratings")
    tags.write.mode(SaveMode.Overwrite).parquet("/home/spark/temp/moiveRec/tags")
    hc.sql("drop table if exists tags")
    hc.sql("create table if not exists tags(userId int,movieId int,tag string,timestamp int) stored as parquet")
    hc.sql("load data inpath '/home/spark/temp/moiveRec/tags' overwrite into table tags")
  }

  // Most lines in tags look like this:
  //   4208,260,Action-packed,1438012536
  // but some contain a quoted tag with a comma inside it:
  //   4208,260,"Family,Action-packed",1438012562
  // Splitting such a line on commas yields too many fields and the Hive insert fails, so clean it first:
  //   4208,260,"Family,Action-packed",1438012562 => 4208,260,FamilyAction-packed,1438012562
  private def rebuild(input: String): String = {
    val a = input.split(",")
    val head = a.take(2).mkString(",") // first 2 fields: userId,movieId
    val tail = a.takeRight(1).mkString // last field: timestamp
    // Everything in between is the tag: join the pieces and strip the double quotes.
    val b = a.drop(2).dropRight(1).mkString.replace("\"", "")
    head + "," + b + "," + tail
  }
}
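
To see the tag-cleaning rule in isolation, the sketch below repeats the logic of `rebuild` in a standalone object that runs without Spark. The object name `TagLineCleaner` is an assumption made for illustration; the sample lines are the ones quoted in the comments of the listing.

```scala
object TagLineCleaner {
  // Collapse a tags line to exactly four comma-separated fields:
  // userId, movieId, tag, timestamp. Commas and double quotes inside the
  // tag are dropped, so "Family,Action-packed" becomes FamilyAction-packed.
  def rebuild(input: String): String = {
    val parts = input.split(",")
    val head = parts.take(2).mkString(",") // userId,movieId
    val tail = parts.last                  // timestamp
    val tag = parts.drop(2).dropRight(1).mkString.replace("\"", "")
    s"$head,$tag,$tail"
  }

  def main(args: Array[String]): Unit = {
    println(rebuild("4208,260,Action-packed,1438012536"))
    println(rebuild("4208,260,\"Family,Action-packed\",1438012562"))
  }
}
```

The rule glues the pieces of a quoted tag together with no separator. That keeps the line splittable into four fields, at the cost of losing the boundary between the original words.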
3 Using Hive
4 Using the Spark MLlib machine-learning library
import org.apache.spark.mllib.recommendation.{ALS, Rating}

/**
 * Trains an ALS model on the training set and reports its mean squared error.
 * `hc` and `sc` are presumably supplied by the AppConf trait, which is not shown here.
 */
object Spark_MovieTraining extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    // Training set: 60% of the full data set
    val trainingData = hc.sql("select * from trainingData")
    val ratingRDD = trainingData.rdd
      .map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    // Cache before training: ALS iterates over the ratings and the evaluation reuses them.
    ratingRDD.persist()
    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val lambda = 0.01 // regularization parameter
    val model = ALS.train(ratingRDD, rank, numIterations, lambda)
    // Evaluate the model on the same ratings it was trained on (a training error)
    val training = ratingRDD.map {
      case Rating(userid, movieid, rating) => (userid, movieid)
    }
    training.persist()
    val predictions =
      model.predict(training).map {
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }
    // Join actual and predicted ratings on the (user, movie) key
    val ratesAndPreds = ratingRDD.map { case Rating(userid, movieid, rating) =>
      ((userid, movieid), rating)
    }.join(predictions)
    val MSE = ratesAndPreds.map { case ((userid, movieid), (r1, r2)) =>
      val err = r1 - r2
      err * err
    }.mean()
    println(s"Mean Squared Error = $MSE")
    // Save the model under a directory named after its MSE
    model.save(sc, s"/home/spark/temp/moiveRec/BestModel1/$MSE")
    // val sameModel = MatrixFactorizationModel.load(sc, "/home/spark/temp/moiveRec/BestModel/")
  }
}
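
The evaluation step joins actual and predicted ratings on the (user, movie) key and averages the squared differences. A minimal plain-Scala sketch of that computation follows; `MseSketch`, its method names, and the numbers in the usage note are assumptions made for illustration, not part of the project.

```scala
object MseSketch {
  type Key = (Int, Int) // (userId, movieId)

  // Mean squared error over the keys present in both maps. This mirrors
  // the inner join in the Spark listing: ratings without a prediction
  // (and predictions without a rating) are ignored.
  // Returns None when the two maps share no key.
  def mse(actual: Map[Key, Double], predicted: Map[Key, Double]): Option[Double] = {
    val squaredErrors = actual.toSeq.flatMap { case (key, r) =>
      predicted.get(key).map(p => (r - p) * (r - p)).toList
    }
    if (squaredErrors.isEmpty) None
    else Some(squaredErrors.sum / squaredErrors.size)
  }

  // Root mean squared error, expressed in the same unit as the ratings.
  def rmse(actual: Map[Key, Double], predicted: Map[Key, Double]): Option[Double] =
    mse(actual, predicted).map(math.sqrt)
}
```

For example, with actual ratings `(1,10) -> 4.0` and `(1,20) -> 2.0` and predictions `(1,10) -> 3.0` and `(1,20) -> 2.0`, the squared errors are 1.0 and 0.0, so the MSE is 0.5. An error measured on the training ratings tends to be optimistic; scoring a held-out set such as the test data in the same way would give a fairer estimate.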
5 Real-time recommendation: stream processing with Kafka + Streaming + Phoenix + HBase
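import java.util.Properties
import org.apache.kafka.clients.producer.ProducerRecord

/**
 * Takes records from the test data set and sends them to Kafka.
 */
object KafkaProducer extends AppConf {
  def main(args: Array[String]): Unit = {
    hc.sql("use moive_recommend")
    val testDF = hc.sql("select * from testData limit 10000")
    val prop = new Properties()
    // Kafka broker address as host:port
    prop.put("bootstrap.servers", "s1:9092")
    // Keys of the ProducerRecord are sent as String
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Values of the ProducerRecord are sent as String
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val topic = "movie"
    // Each message value has the form "userId,movieId,rating"
    val testData = testDF.rdd.map(
      x => (topic, x.getInt(0).toString + "," + x.getInt(1).toString + "," + x.getDouble(2).toString)
    )
    // Fully qualified because this object shares the class's simple name
    val producer = new org.apache.kafka.clients.producer.KafkaProducer[String, String](prop)
    // toLocalIterator brings one partition at a time to the driver;
    // an OOM error occurs if the driver does not have enough memory.
    val messages = testData.toLocalIterator
    while (messages.hasNext) {
      val message = messages.next()
      // The topic name is also used as the record key
      val record = new ProducerRecord[String, String](topic, message._1, message._2)
      println(record)
      producer.send(record)
      // Wait 10 milliseconds between messages
      Thread.sleep(10)
    }
    producer.close()
  }
}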
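
The producer sends each rating as a comma-separated string of the form `userId,movieId,rating`, and the consumer has to split it apart again. The sketch below makes that contract explicit with an encode/decode pair. `RatingMessage`, `RatingEvent`, `encode`, and `decode` are hypothetical names introduced for illustration; they are not part of the project or of the Kafka API.

```scala
import scala.util.Try

object RatingMessage {
  // One rating as carried in a Kafka message value (assumed shape).
  final case class RatingEvent(userId: Int, movieId: Int, rating: Double)

  // Serialize in the field order the producer uses: userId,movieId,rating.
  def encode(e: RatingEvent): String = s"${e.userId},${e.movieId},${e.rating}"

  // Parse a payload back into an event. Malformed payloads (wrong field
  // count or non-numeric fields) yield None instead of throwing.
  def decode(payload: String): Option[RatingEvent] =
    payload.split(",").map(_.trim) match {
      case Array(u, m, r) => Try(RatingEvent(u.toInt, m.toInt, r.toDouble)).toOption
      case _              => None
    }
}
```

Keeping both directions in one place makes the field order hard to get wrong: the user ID is the first field of the payload and the movie ID the second.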
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Receives the data produced to Kafka and processes it.
 */
object SparkDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDirectStream").setMaster("spark://s1:7077")
    // Duration wraps a time span in milliseconds; here it is the batch interval.
    val batchDuration = new Duration(5000)
    val ssc = new StreamingContext(conf, batchDuration)
    val hc = new HiveContext(ssc.sparkContext)
    val modelpath = "/home/spark/temp/moiveRec/BestModel1/0.5366434001808432"
    // val topics = "movie".split(",").toSet
    val topics = Set("movie")
    // val kafkaParams = Map("bootstrap.servers" -> "spark1:9092")
    val kafkaParams = Map("metadata.broker.list" -> "s1:9092")
    // Is the user present in the training data? The user IDs are loaded once
    // on the driver so the check does not run a Hive query for every record.
    val knownUsers = hc.sql("select distinct(userid) from trainingdata")
      .rdd.map(x => x.getInt(0)).collect().toSet
    def exist(u: Int): Boolean = knownUsers.contains(u)
    // Strategies for recommending movies to users who are not logged in (handled here as new users):
    // 1. recommend the most-watched movies (the strategy used here)
    // 2. recommend the newest movies
    // Collected into an array so the same list can be printed for every new user.
    val defaultrecresult = hc.sql("select * from pop5result").collect()
    // Spark Streaming can consume Kafka in two ways. This code uses the direct approach:
    // Spark Streaming itself queries Kafka for the data it has not yet received
    // and pulls it in. The other way is the receiver-based approach.
    val kafkaDirectStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val model = MatrixFactorizationModel.load(ssc.sparkContext, modelpath)
    kafkaDirectStream.foreachRDD { rdd =>
      // Message values have the form "userId,movieId,rating"; the user ID is the first field.
      val userrdd = rdd.map(x => x._2.split(",")).map(x => x(0)).map(_.toInt)
      val validusers = userrdd.filter(user => exist(user))
      val newusers = userrdd.filter(user => !exist(user))
      // Iterate on the driver to sidestep the object-not-serializable problem
      // (the model holds RDDs and cannot be used inside a closure).
      // Producing a result for each element as it arrives and writing it to Redis
      // or another fast cache would give a degree of real-time behavior.
      // The two streams could also be split into 2 Spark Streaming applications.
      val validusersIter = validusers.toLocalIterator
      val newusersIter = newusers.toLocalIterator
      while (validusersIter.hasNext) {
        val recresult = model.recommendProducts(validusersIter.next, 5)
        println("below movies are recommended for you :")
        println(recresult.mkString(", "))
      }
      while (newusersIter.hasNext) {
        newusersIter.next() // advance the iterator; without this the loop never ends
        println("below movies are recommended for you :")
        for (i <- defaultrecresult) {
          println(i.getString(0))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
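
One detail is worth illustrating on its own: a Scala `Iterator` can be traversed only once. That matters for the default recommendation list, because a list held as an iterator (for example the result of `toLocalIterator`) is exhausted after the first new user, while a materialized collection can be reused. The sketch below shows the difference in plain Scala; the object name `DefaultRecs`, its method names, and the movie titles in the usage note are assumptions made for illustration.

```scala
object DefaultRecs {
  // Defaults kept as an Iterator: the first user drains it,
  // so every later user receives an empty list.
  def servedFromIterator(defaults: Iterator[String],
                         newUsers: List[Int]): List[(Int, List[String])] =
    newUsers.map(user => (user, defaults.toList))

  // Defaults materialized as a Vector: every user receives the full list.
  def servedFromVector(defaults: Vector[String],
                       newUsers: List[Int]): List[(Int, List[String])] =
    newUsers.map(user => (user, defaults.toList))
}
```

With defaults `A` and `B` and new users 1 and 2, the iterator version gives user 1 both titles and user 2 nothing, whereas the vector version gives both users the full list.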
5 Open issues
I am still not comfortable with the Scala language, and there is still a lot I don't understand when using Spark.
References
http://www.dajiangtai.com/course/56.do