123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import json
- import datetime
- from typing import Dict, List
- from pymysql.cursors import DictCursor
- from applications.api.odps_api import ODPSApi
- from applications.db import DatabaseConnector
- from config import growth_config, long_articles_config
def _parse_keywords(video_keywords) -> List[str]:
    """Turn a raw ``video_keywords`` value into a list of keyword strings.

    Accepts an actual list, a JSON-encoded array string, or a plain string
    separated by ASCII (``,``) and/or full-width (``，``) commas. Anything
    else yields an empty list.
    """
    if not video_keywords:
        return []
    if isinstance(video_keywords, list):
        return [str(k) for k in video_keywords if k is not None]

    try:
        parsed = json.loads(video_keywords)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError subclass
        parsed = None
    if isinstance(parsed, list):
        return [str(k) for k in parsed if k is not None]

    # Not a JSON array: treat it as a comma-separated string. Full-width commas
    # are normalised first so mixed separators are split correctly.
    normalized = str(video_keywords).replace("，", ",")
    if "," not in normalized:
        # NOTE(review): a lone keyword with no separator is dropped here;
        # confirm that single-keyword strings should really be ignored.
        return []
    return normalized.split(",")


def format_video_values(value_list: List[List]) -> Dict:
    """Build an inverted index from keyword to the video ids tagged with it.

    Args:
        value_list: rows of ``[video_id, data_json]`` (as produced by the
            ODPS query in this module); ``data_json`` is a JSON object string
            that may contain a ``video_keywords`` field.

    Returns:
        A dict mapping each stripped, non-empty keyword to the list of video
        ids (in input order) whose ``video_keywords`` contained it. Rows with
        missing/empty data or unparseable keywords are skipped.
    """
    keyword_map: Dict[str, List] = {}
    for line in value_list:
        video_id = line[0]
        raw_data = line[1]
        if not raw_data:
            continue
        video_data = json.loads(raw_data)
        if not isinstance(video_data, dict):
            continue
        for keyword in _parse_keywords(video_data.get("video_keywords")):
            keyword = keyword.strip()
            if keyword:
                keyword_map.setdefault(keyword, []).append(video_id)
    return keyword_map
def get_candidate_videos(db_client):
    """
    Fetch the candidate videos to process.

    Selects each distinct ``video_id`` from ``content_platform_video`` whose
    ``dt`` is later than five days before the current date, using a
    ``DictCursor`` so every row is a mapping (callers read ``row["video_id"]``).

    Args:
        db_client: connector exposing ``fetch(sql, cursor_type=...)``.

    Returns:
        Whatever ``db_client.fetch`` returns for the query.
    """
    candidate_query = """
    SELECT video_id
    FROM content_platform_video
    WHERE dt > DATE_SUB(CURRENT_DATE, INTERVAL 5 DAY)
    group by video_id;
    """
    return db_client.fetch(candidate_query, cursor_type=DictCursor)
def _format_in_list(values) -> str:
    """Render ``values`` as a parenthesised SQL ``IN`` list.

    Integers are emitted bare; every other value is emitted as a single-quoted
    string literal with backslashes and single quotes backslash-escaped.
    """
    items = []
    for value in values:
        if isinstance(value, int) and not isinstance(value, bool):
            items.append(str(value))
        else:
            text = str(value).replace("\\", "\\\\").replace("'", "\\'")
            items.append(f"'{text}'")
    return "(" + ", ".join(items) + ")"


def get_video_detail(video_tuple: tuple, odps_server: "ODPSApi", yesterday: str):
    """
    Fetch video details (AI-tag feature rows) from ODPS.

    Args:
        video_tuple: video ids to look up (ints and/or strings).
        odps_server: client exposing ``execute_sql(query)``; its result is
            expected to have ``.values.tolist()``.
        yesterday: partition value for ``dt``, e.g. ``"20240101"``.

    Returns:
        A list of ``[id, data]`` rows; an empty list when ``video_tuple`` is
        empty (no query is issued in that case).
    """
    if not video_tuple:
        # ``id in ()`` is invalid SQL, and there is nothing to look up anyway.
        return []
    # Build the IN list explicitly: the tuple's Python repr renders a single
    # id as "(x,)", which is a SQL syntax error, and does not escape quotes.
    id_list = _format_in_list(video_tuple)
    fetch_query = f"""
    select id, `data` from loghubods.videoid_feature_aitags
    where dt = '{yesterday}'
    and id in {id_list};
    """
    res = odps_server.execute_sql(fetch_query)
    return res.values.tolist()
def save_to_database(date_str, keyword_map, db_client):
    """
    Save a keyword map to the database and make it the active one.

    Inserts a row for ``date_str`` with ``status = 1``. If that insert reports
    affected rows, every row whose ``date`` differs from ``date_str`` is then
    set to ``status = 0``.

    Args:
        date_str: the row's ``date`` value.
        keyword_map: the serialized keyword map stored in ``keywords_map``.
        db_client: connector exposing ``save(sql, params)``.

    Returns:
        The result of the deactivating update, or 0 if the insert affected
        no rows (in which case no update is run).
    """
    insert_sql = """
    insert into video_keywords_map (date, keywords_map, status)
    values (%s, %s, %s);
    """
    inserted_rows = db_client.save(insert_sql, (date_str, keyword_map, 1))
    if not inserted_rows:
        return 0

    deactivate_sql = """
    update video_keywords_map
    set status = %s where date != %s;
    """
    return db_client.save(deactivate_sql, (0, date_str))
def deal():
    """
    Rebuild yesterday's keyword -> video-id map and store it as the active row.

    Steps:
        1. Read candidate video ids from the growth database.
        2. Fetch their AI-tag data from ODPS for yesterday's ``dt`` partition.
        3. Invert it into a keyword map.
        4. Save the JSON-encoded map to the long-articles database.

    Returns early, without writing anything, when there are no candidate
    videos or the resulting keyword map is empty.
    """
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    yesterday_str = yesterday.strftime("%Y%m%d")

    # NOTE(review): neither connection is closed here; check whether
    # DatabaseConnector exposes a close() that should be called.
    growth_client = DatabaseConnector(growth_config)
    growth_client.connect()
    long_articles_client = DatabaseConnector(long_articles_config)
    long_articles_client.connect()
    odps_server = ODPSApi()

    video_ids = get_candidate_videos(growth_client)
    video_id_tuple = tuple(row["video_id"] for row in video_ids)
    if not video_id_tuple:
        # Nothing to look up; an empty ``IN ()`` list would also be invalid SQL.
        return

    video_details = get_video_detail(video_id_tuple, odps_server, yesterday_str)
    keyword_map = format_video_values(video_details)
    if not keyword_map:
        # save_to_database sets status = 0 on every other date's row, so saving
        # an empty map (e.g. when the ODPS query returned no rows) would replace
        # the last usable map with an empty one. Keep the previous map instead.
        return

    save_to_database(
        yesterday_str, json.dumps(keyword_map, ensure_ascii=False), long_articles_client
    )
|