import json
import datetime

from typing import Dict, List

from pymysql.cursors import DictCursor

from applications.api.odps_api import ODPSApi
from applications.db import DatabaseConnector
from config import growth_config, long_articles_config

# Full-width (CJK) comma; keyword strings may be separated by it instead of ",".
_FULLWIDTH_COMMA = "\uff0c"


def _parse_video_keywords(raw_data) -> List[str]:
    """
    Return the cleaned, non-empty keywords stored in one row's ``data`` JSON.

    ``video_keywords`` may be a JSON-encoded list, a JSON-encoded string, an
    already-decoded list, or a plain comma-separated string (ASCII "," or
    full-width "，"). A plain string containing no comma is skipped, matching
    the previous behavior. Rows whose payload cannot be decoded, or whose
    keywords have an unexpected type, yield no keywords instead of aborting
    the whole job.
    """
    if not raw_data:
        return []
    try:
        video_data = json.loads(raw_data)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError
        return []
    if not isinstance(video_data, dict):
        return []

    video_keywords = video_data.get("video_keywords")
    if not video_keywords:
        return []

    if isinstance(video_keywords, list):
        candidates = video_keywords
    elif isinstance(video_keywords, str):
        try:
            decoded = json.loads(video_keywords)
        except ValueError:
            # Not JSON: treat it as a delimited plain string.
            decoded = video_keywords
        if isinstance(decoded, list):
            candidates = decoded
        elif isinstance(decoded, str):
            # Normalize full-width commas so both separators split the same way.
            normalized = decoded.replace(_FULLWIDTH_COMMA, ",")
            if "," not in normalized:
                return []
            candidates = normalized.split(",")
        else:
            # JSON number / bool / object / null: not a keyword list.
            return []
    else:
        return []

    # Drop non-string items and blanks so "" never becomes a map key.
    return [kw.strip() for kw in candidates if isinstance(kw, str) and kw.strip()]


def format_video_values(value_list: List[List]) -> Dict:
    """
    Build an inverted index mapping each keyword to the ids of its videos.

    :param value_list: rows of ``[video_id, data]`` (as returned by
        ``get_video_detail``), where ``data`` is a JSON object string whose
        ``video_keywords`` field holds that video's keywords.
    :return: ``{keyword: [video_id, ...]}``. A video id is appended once per
        occurrence of the keyword in that video's keyword list.
    """
    keyword_map = {}
    for line in value_list:
        video_id = line[0]
        for keyword in _parse_video_keywords(line[1]):
            keyword_map.setdefault(keyword, []).append(video_id)
    return keyword_map


def get_candidate_videos(db_client):
    """
    Fetch the videos to process: distinct ``video_id`` values from
    ``content_platform_video`` rows whose ``dt`` falls within the last 5 days.

    :param db_client: connector for the growth database.
    :return: the rows returned by ``db_client.fetch`` with a ``DictCursor``,
        each expected to carry a ``video_id`` key.
    """
    sql = """
        SELECT video_id
        FROM content_platform_video
        WHERE dt > DATE_SUB(CURRENT_DATE, INTERVAL 5 DAY)
        group by video_id;
    """
    response = db_client.fetch(sql, cursor_type=DictCursor)
    return response


def get_video_detail(video_tuple: tuple, odps_server: ODPSApi, yesterday: str):
    """
    Fetch the AI-tag feature rows of the given videos from ODPS.

    :param video_tuple: video ids to look up.
    :param odps_server: ODPS client used to run the query.
    :param yesterday: partition value for ``dt`` (``YYYYMMDD`` as built by ``deal``).
    :return: list of ``[id, data]`` rows; empty when ``video_tuple`` is empty.
    """
    if not video_tuple:
        # "in ()" is invalid SQL, and there is nothing to look up anyway.
        return []
    # Build the IN list explicitly. Interpolating the tuple itself renders a
    # single id as "(x,)", whose trailing comma is a SQL syntax error.
    # NOTE(review): values are interpolated, not parameterized; they come from
    # our own growth DB, but keep it that way.
    id_list = ", ".join(repr(video_id) for video_id in video_tuple)
    fetch_query = f"""
        select id, `data`
        from loghubods.videoid_feature_aitags
        where dt = '{yesterday}' and id in ({id_list});
    """
    res = odps_server.execute_sql(fetch_query)
    return res.values.tolist()


def save_to_database(date_str, keyword_map, db_client):
    """
    Store the keyword map for ``date_str`` with status 1, then set status 0 on
    the rows of every other date.

    :param date_str: date key of the row (``YYYYMMDD`` as built by ``deal``).
    :param keyword_map: the keyword map, already serialized to a JSON string.
    :param db_client: connector for the long-articles database.
    :return: rows affected by the status reset, or 0 when the insert
        affected no rows (in which case no reset is done).
    """
    insert_query = """
        insert into video_keywords_map
            (date, keywords_map, status)
        values
            (%s, %s, %s);
    """
    affected_rows = db_client.save(insert_query, (date_str, keyword_map, 1))
    if not affected_rows:
        return 0

    update_query = """
        update video_keywords_map
        set status = %s
        where date != %s;
    """
    return db_client.save(update_query, (0, date_str))


def deal():
    """
    Daily job: map keywords to recent candidate videos using yesterday's ODPS
    AI-tag partition, and save the map as the active (status 1) row.
    """
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    yesterday_str = yesterday.strftime("%Y%m%d")

    growth_client = DatabaseConnector(growth_config)
    growth_client.connect()

    long_articles_client = DatabaseConnector(long_articles_config)
    long_articles_client.connect()

    odps_server = ODPSApi()

    video_ids = get_candidate_videos(growth_client)
    video_id_tuple = tuple(row["video_id"] for row in (video_ids or []))
    video_details = get_video_detail(video_id_tuple, odps_server, yesterday_str)
    keyword_map = format_video_values(video_details)

    if not keyword_map:
        # Saving "{}" would still deactivate every other date's row (see
        # save_to_database), leaving an empty active map. Keep the previous one.
        return

    save_to_database(
        yesterday_str,
        json.dumps(keyword_map, ensure_ascii=False),
        long_articles_client,
    )