|
@@ -5,7 +5,7 @@ import com.aliyun.odps.TableSchema
|
|
import com.aliyun.odps.data.Record
|
|
import com.aliyun.odps.data.Record
|
|
import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
|
|
import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
|
|
import examples.extractor.RankExtractorFeature_20240530
|
|
import examples.extractor.RankExtractorFeature_20240530
|
|
-import examples.utils.DateTimeUtil
|
|
|
|
|
|
+import examples.utils.{AdUtil, DateTimeUtil}
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
import org.apache.spark.sql.SparkSession
|
|
import org.apache.spark.sql.SparkSession
|
|
import org.xm.Similarity
|
|
import org.xm.Similarity
|
|
@@ -55,16 +55,7 @@ object makedata_ad_31_originData_20240718 {
|
|
transfer = func,
|
|
transfer = func,
|
|
numPartition = tablePart)
|
|
numPartition = tablePart)
|
|
.filter(record => {
|
|
.filter(record => {
|
|
- val extendAlg: JSONObject = if (record.isNull("extend_alg")) new JSONObject() else
|
|
|
|
- JSON.parseObject(record.getString("extend_alg"))
|
|
|
|
- var isApi = extendAlg.getString("is_api")
|
|
|
|
- if (extendAlg.containsKey("extinfo")) {
|
|
|
|
- val extInfoJson = extendAlg.getJSONObject("extinfo")
|
|
|
|
- if (extInfoJson.containsKey("isApi")) {
|
|
|
|
- isApi = extInfoJson.getString("isApi")
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- "1".equals(isApi)
|
|
|
|
|
|
+ AdUtil.isApi(record)
|
|
})
|
|
})
|
|
.map(record => {
|
|
.map(record => {
|
|
|
|
|