|
@@ -91,16 +91,21 @@ def get_table_data(project, table, partition):
|
|
|
data.append(tmp)
|
|
|
return data
|
|
|
|
|
|
+def region_match(region_cn: str, region_name2code: dict):
|
|
|
+ for r in region_name2code:
|
|
|
+ if region_cn == r or region_cn in r or (
|
|
|
+ region_cn.endswith("省") and region_cn.split("省")[0] in r
|
|
|
+ ):
|
|
|
+ return region_name2code[r]
|
|
|
+ return None
|
|
|
+
|
|
|
def process_and_store(row):
|
|
|
- region = row["region"]
|
|
|
- if region not in region_name2code:
|
|
|
- return
|
|
|
+ region_code = row["region_code"]
|
|
|
videoid_array_sum = row["videoid_array_sum"]
|
|
|
videoid_array_avg = row["videoid_array_avg"]
|
|
|
- region_code = region_name2code[region]
|
|
|
key1 = "alg_recsys_recall_4h_region_trend_sum_" + region_code
|
|
|
key2 = "alg_recsys_recall_4h_region_trend_avg_" + region_code
|
|
|
- expire_time = 24 * 3600
|
|
|
+ expire_time = 24 * 3600 * 4
|
|
|
redis_helper.set_data_to_redis(key1, videoid_array_sum, expire_time)
|
|
|
redis_helper.set_data_to_redis(key2, videoid_array_avg, expire_time)
|
|
|
log_.info("trend-sum写入数据key={},value={}".format(key1, videoid_array_sum))
|
|
@@ -124,13 +129,16 @@ def h_timer_check():
|
|
|
log_.info("上游数据就绪,count={},开始读取数据表".format(table_data_cnt))
|
|
|
#2 读取数据表
|
|
|
data = get_table_data(project, table, partition)
|
|
|
+ data_new = []
|
|
|
for one in data:
|
|
|
- if one["region"] in region_name2code:
|
|
|
- pass
|
|
|
+ region_code = region_match(one["region"], region_name2code)
|
|
|
+ if region_code:
|
|
|
+ one["region_code"] = region_code
|
|
|
+ data_new.append(one)
|
|
|
else:
|
|
|
- log_.info("{}被过滤掉了".format(one))
|
|
|
- data = [one for one in data if one["region"] in region_name2code]
|
|
|
- log_.info("数据处理完成,数据数量={},所有的地域code:".format(len(data), ",".join([
|
|
|
+ log_.info("{}被过滤掉了".format(one["region"]))
|
|
|
+
|
|
|
+ log_.info("数据处理完成,数据数量={},所有的地域:".format(len(data_new), ",".join([
|
|
|
one["region"] for one in data
|
|
|
])))
|
|
|
log_.info("开始处理和写入redis")
|