| 
					
				 | 
			
			
				@@ -0,0 +1,81 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+package com.aliyun.odps.spark.examples.makedata_recsys.v20250218 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.alibaba.fastjson.JSON 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.TableSchema 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.data.Record 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.hadoop.io.compress.GzipCodec 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.spark.sql.SparkSession 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.xm.Similarity 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import scala.collection.JavaConversions._ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import scala.collection.mutable.ArrayBuffer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import scala.util.Random 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/* 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+   20250218 提取特征 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				/**
				 * Spark job that down-samples negative examples of a tab-separated sample file.
				 *
				 * Each input line is split on tabs and its second column is parsed as a JSON object of
				 * labels. Lines whose `whatLabel` entry equals "1" are always kept; every other line is
				 * kept with probability of roughly `fuSampleRate`. The surviving lines are written
				 * unchanged as gzip-compressed text.
				 */
				object makedata_recsys_41_data_fu_sample_20250218 {

				  /**
				   * Job entry point.
				   *
				   * Recognised arguments (parsed by `ParamUtils.parseArgs`), with their defaults:
				   *  - `repartition`  (32): number of partitions passed to `coalesce` before writing.
				   *  - `readPath`     : input glob read with `textFile`.
				   *  - `savePath`     : output directory; must start with "/dw/recommend/model/".
				   *  - `fuSampleRate` (0.05): keep probability for lines whose label is not "1".
				   *  - `whatLabel`    ("is_share"): key looked up in the label JSON.
				   *
				   * Lines with fewer than two tab-separated columns, or whose label column parses to
				   * null, cannot be classified; they are dropped and counted instead of failing the job.
				   * Syntactically invalid JSON still raises the parser's exception.
				   *
				   * @param args command-line arguments handed to `ParamUtils.parseArgs`
				   */
				  def main(args: Array[String]): Unit = {
				    val spark = SparkSession
				      .builder()
				      .appName(this.getClass.getName)
				      .getOrCreate()
				    val sc = spark.sparkContext

				    // 1. Read job parameters.
				    val param = ParamUtils.parseArgs(args)
				    val repartition = param.getOrElse("repartition", "32").toInt
				    val readPath = param.getOrElse("readPath", "/dw/recommend/model/41_recsys_origin_date/20250221*/*")
				    val savePath = param.getOrElse("savePath", "/dw/recommend/model/41_recsys_sample_data/20250221")
				    val fuSampleRate = param.getOrElse("fuSampleRate", "0.05").toDouble
				    val whatLabel = param.getOrElse("whatLabel", "is_share")

				    // Diagnostic counter for lines that could not be classified (may over-count on task retries).
				    val skippedLines = sc.longAccumulator("skippedLines")

				    // 2. Keep every positive line and a random fraction of the remaining ones.
				    val data = sc.textFile(readPath)
				      .mapPartitions { lines =>
				        // One generator per partition instead of one allocation per line.
				        val rng = new Random()
				        lines.filter { line =>
				          val fields = line.split("\t")
				          val labelJson =
				            if (fields.length > 1) Option(JSON.parseObject(fields(1))) else None
				          labelJson match {
				            case Some(json) =>
				              // The random draw only happens for non-positive lines (short-circuit).
				              "1".equals(json.getString(whatLabel)) || rng.nextDouble() <= fuSampleRate
				            case None =>
				              skippedLines.add(1L)
				              false
				          }
				        }
				      }

				    // 3. Write to HDFS; only paths under the model directory are accepted because the
				    //    target is handed to MyHdfsUtils.delete_hdfs_path before the write.
				    if (savePath.nonEmpty && savePath.startsWith("/dw/recommend/model/")) {
				      println("删除路径并开始数据写入:" + savePath)
				      MyHdfsUtils.delete_hdfs_path(savePath)
				      data.coalesce(repartition).saveAsTextFile(savePath, classOf[GzipCodec])
				      println(s"skipped unparseable lines: ${skippedLines.value}")
				    } else {
				      println("路径不合法,无法写入:" + savePath)
				    }
				  }

				  /**
				   * Identity transform: returns `record` unchanged.
				   *
				   * Not referenced inside this object.
				   * NOTE(review): presumably kept as a (Record, TableSchema) hook for ODPS table reads — confirm.
				   *
				   * @param record the record to pass through
				   * @param schema unused
				   * @return the same `record` instance
				   */
				  def func(record: Record, schema: TableSchema): Record = record

				  /**
				   * Compares a comma-separated tag list against a title.
				   *
				   * Blank tags (for example from an empty `tags` string or from "a,,b") are ignored,
				   * because an empty string is contained in every title and would otherwise be
				   * reported as a match.
				   *
				   * @param tags  comma-separated tags; null is treated as "no tags"
				   * @param title title to compare against; null yields the zero result
				   * @return a tuple of
				   *         (number of tags literally contained in the title,
				   *          those tags joined with ",",
				   *          highest `Similarity.conceptSimilarity(tag, title)` score, never below 0.0,
				   *          mean of those scores over all non-blank tags, 0.0 when there are none)
				   */
				  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
				    if (tags == null || title == null) {
				      (0.0, "", 0.0, 0.0)
				    } else {
				      val tagList = tags.split(",").filter(_.trim.nonEmpty)
				      val matched = tagList.filter(tag => title.contains(tag))
				      val scores = tagList.map { tag =>
				        val score: Double = Similarity.conceptSimilarity(tag, title)
				        score
				      }
				      // Same comparison as a running "if (score > best)" so NaN scores never win.
				      val maxScore = scores.foldLeft(0.0)((best, s) => if (s > best) s else best)
				      val avgScore = if (scores.nonEmpty) scores.sum / scores.length else 0.0
				      (matched.length.toDouble, matched.mkString(","), maxScore, avgScore)
				    }
				  }
				}
			 |