|
@@ -12,55 +12,6 @@ log_ = Log()
|
|
|
redis_helper = RedisHelper()
|
|
|
|
|
|
|
|
|
-REDIS_PREFIX = "alg_recsys_video_tags_"
|
|
|
-
|
|
|
-
|
|
|
-def get_video_tags_v2():
|
|
|
- PROJECT = "loghubods"
|
|
|
- TABLE = "loghubods.automated_updates_category_labels_1"
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- now_date = datetime.today()
|
|
|
-
|
|
|
- current_month = now_date.strftime('%Y%m')
|
|
|
-
|
|
|
- previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
|
|
|
- try:
|
|
|
- sql = '''SELECT videoid
|
|
|
- ,secondary_labels
|
|
|
-FROM loghubods.automated_updates_category_labels_1
|
|
|
-WHERE (
|
|
|
- dt LIKE '{}%' OR dt LIKE '{}%'
|
|
|
-)
|
|
|
-'''.format(current_month, previous_month)
|
|
|
- print("sql:" + sql)
|
|
|
- records = execute_sql_from_odps(project=PROJECT, sql=sql)
|
|
|
- video_tags_list = []
|
|
|
- with records.open_reader() as reader:
|
|
|
- for record in reader:
|
|
|
- video_id = int(record['videoid'])
|
|
|
- tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
|
|
|
- d = {}
|
|
|
- d["video_id"] = video_id
|
|
|
- d["tags"] = tags
|
|
|
- video_tags_list.append(d)
|
|
|
-
|
|
|
- log_.info("增量表:{}".format(str(len(video_tags_list))))
|
|
|
- return video_tags_list
|
|
|
- except Exception as e:
|
|
|
- log_.error(str(e) + str(traceback.format_exc()))
|
|
|
- return []
|
|
|
-def process_and_store(row):
|
|
|
- video_id = row["video_id"]
|
|
|
- tags = row["tags"]
|
|
|
- key = REDIS_PREFIX + str(video_id)
|
|
|
- expire_time = 24 * 3600 * 2
|
|
|
- redis_helper.set_data_to_redis(key, tags, expire_time)
|
|
|
-
|
|
|
|
|
|
def main():
|
|
|
log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
|
|
@@ -108,4 +59,4 @@ if __name__ == '__main__':
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
+
|