|
@@ -23,6 +23,7 @@ object makedata_09_user2redis_freq {
|
|
|
.appName(this.getClass.getName)
|
|
|
.getOrCreate()
|
|
|
val sc = spark.sparkContext
|
|
|
+ sc.setCheckpointDir("/dw/recommend/model/99_zhangbo_checkpoint/")
|
|
|
|
|
|
// 1 读取参数
|
|
|
val param = ParamUtils.parseArgs(args)
|
|
@@ -92,7 +93,6 @@ object makedata_09_user2redis_freq {
|
|
|
.reduceByKey((a, b) => Math.max(a.toLong, b.toLong).toString)
|
|
|
.filter(r => DateUtils.parseDate(date, Array[String]("yyyyMMdd")).getTime / 1000 - r._2.toLong / 1000 < 3600 * 24 * midDays)
|
|
|
println("------------mid处理完毕,近期保留的用户有:" + midRdd.count() + "------------------")
|
|
|
-
|
|
|
//5 用户区分
|
|
|
val savePathPart = savePathUser + "/all/" + partition
|
|
|
val userDataRead = sc.textFile(savePathPart).filter(_.split("\t").length >= 2)
|
|
@@ -100,6 +100,7 @@ object makedata_09_user2redis_freq {
|
|
|
val rList = r.split("\t")
|
|
|
(rList(0), rList(1))
|
|
|
}).join(midRdd).map(r => (r._1, r._2._1))
|
|
|
+ userDataRead.checkpoint()
|
|
|
// .leftOuterJoin(midRdd).map {
|
|
|
// case (mid, (fea, Some(_))) =>
|
|
|
// (mid, fea, true)
|