expirment_tasks.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import json
  2. import datetime
  3. from typing import Dict, List
  4. from pymysql.cursors import DictCursor
  5. from applications.api.odps_api import ODPSApi
  6. from applications.db import DatabaseConnector
  7. from config import growth_config, long_articles_config
  8. def format_video_values(value_list: List[List]) -> Dict:
  9. keyword_map = {}
  10. for line in value_list:
  11. video_id = line[0]
  12. video_data = json.loads(line[1])
  13. video_keywords = video_data["video_keywords"]
  14. if video_keywords:
  15. try:
  16. keywords = json.loads(video_keywords)
  17. except:
  18. if "," in video_keywords:
  19. keywords = video_keywords.split(",")
  20. elif "," in video_keywords:
  21. keywords = video_keywords.split(",")
  22. else:
  23. continue
  24. for keyword in keywords:
  25. keyword = keyword.strip()
  26. if keyword_map.get(keyword):
  27. keyword_map[keyword].append(video_id)
  28. else:
  29. keyword_map[keyword] = [video_id]
  30. return keyword_map
  31. def get_candidate_videos(db_client):
  32. """
  33. 获取待处理的视频
  34. """
  35. sql = f"""
  36. SELECT video_id
  37. FROM content_platform_video
  38. WHERE dt > DATE_SUB(CURRENT_DATE, INTERVAL 5 DAY)
  39. group by video_id;
  40. """
  41. response = db_client.fetch(sql, cursor_type=DictCursor)
  42. return response
  43. def get_video_detail(video_tuple: tuple, odps_server: ODPSApi, yesterday: str):
  44. """
  45. 获取视频详情
  46. """
  47. fetch_query = f"""
  48. select id, `data` from loghubods.videoid_feature_aitags
  49. where dt = '{yesterday}'
  50. and id in {video_tuple};
  51. """
  52. res = odps_server.execute_sql(fetch_query)
  53. values_list = res.values.tolist()
  54. return values_list
  55. def save_to_database(date_str, keyword_map, db_client):
  56. """
  57. 保存到数据库
  58. """
  59. sql = f"""
  60. insert into video_keywords_map (date, keywords_map, status)
  61. values (%s, %s, %s);
  62. """
  63. affected_rows = db_client.save(sql, (date_str, keyword_map, 1))
  64. if affected_rows:
  65. update_query = f"""
  66. update video_keywords_map
  67. set status = %s where date != %s;
  68. """
  69. return db_client.save(update_query, (0, date_str))
  70. else:
  71. return 0
  72. def deal():
  73. yesterday = datetime.date.today() - datetime.timedelta(days=1)
  74. yesterday_str = yesterday.strftime("%Y%m%d")
  75. growth_client = DatabaseConnector(growth_config)
  76. growth_client.connect()
  77. long_articles_client = DatabaseConnector(long_articles_config)
  78. long_articles_client.connect()
  79. odps_server = ODPSApi()
  80. video_ids = get_candidate_videos(growth_client)
  81. video_id_tuple = tuple([i["video_id"] for i in video_ids])
  82. video_details = get_video_detail(video_id_tuple, odps_server, yesterday_str)
  83. keyword_map = format_video_values(video_details)
  84. save_to_database(
  85. yesterday_str, json.dumps(keyword_map, ensure_ascii=False), long_articles_client
  86. )