alg_recsys_recall_undertake.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. # -*- coding: utf-8 -*-
  2. import traceback
  3. from config import set_config
  4. from log import Log
  5. from utils import execute_sql_from_odps
  6. from db_helper import RedisHelper
  7. from datetime import datetime, timedelta
  8. from alg_recsys_recall_4h_region_trend import records_process_for_list
  9. config_, _ = set_config()
  10. log_ = Log()
  11. redis_helper = RedisHelper()
  12. REDIS_PREFIX = "alg_recsys_video_tags_"
  13. def get_video_tags():
  14. """获取视频的tag"""
  15. PROJECT = "videoods"
  16. TABLE = "videoods.dim_video"
  17. try:
  18. sql = "SELECT videoid \
  19. ,tags \
  20. FROM {} \
  21. LATERAL VIEW EXPLODE(SPLIT(tags,',')) exploded AS exploded_value \
  22. WHERE tags IS NOT NULL \
  23. AND ( \
  24. exploded_value = '元旦' \
  25. OR exploded_value = '腊八节' \
  26. OR exploded_value = '小年' \
  27. OR exploded_value = '除夕' \
  28. OR exploded_value = '春节' \
  29. OR exploded_value = '情人节' \
  30. OR exploded_value = '元宵节' \
  31. OR exploded_value = '龙抬头' \
  32. OR exploded_value = '妇女节' \
  33. OR exploded_value = '劳动节' \
  34. OR exploded_value = '母亲节' \
  35. OR exploded_value = '儿童节' \
  36. OR exploded_value = '端午节' \
  37. OR exploded_value = '父亲节' \
  38. OR exploded_value = '建党节' \
  39. OR exploded_value = '七七事变' \
  40. OR exploded_value = '建军节' \
  41. OR exploded_value = '七夕节' \
  42. OR exploded_value = '中元节' \
  43. OR exploded_value = '中秋节' \
  44. OR exploded_value = '毛主席逝世' \
  45. OR exploded_value = '国庆节' \
  46. OR exploded_value = '重阳节' \
  47. OR exploded_value = '感恩节' \
  48. OR exploded_value = '公祭日' \
  49. OR exploded_value = '平安夜' \
  50. OR exploded_value = '圣诞节' \
  51. OR exploded_value = '毛主席诞辰' \
  52. OR exploded_value = '小寒' \
  53. OR exploded_value = '大寒' \
  54. OR exploded_value = '立春' \
  55. OR exploded_value = '雨水' \
  56. OR exploded_value = '惊蛰' \
  57. OR exploded_value = '春分' \
  58. OR exploded_value = '清明' \
  59. OR exploded_value = '谷雨' \
  60. OR exploded_value = '立夏' \
  61. OR exploded_value = '小满' \
  62. OR exploded_value = '芒种' \
  63. OR exploded_value = '夏至' \
  64. OR exploded_value = '小暑' \
  65. OR exploded_value = '大暑' \
  66. OR exploded_value = '立秋' \
  67. OR exploded_value = '处暑' \
  68. OR exploded_value = '白露' \
  69. OR exploded_value = '秋分' \
  70. OR exploded_value = '寒露' \
  71. OR exploded_value = '霜降' \
  72. OR exploded_value = '立冬' \
  73. OR exploded_value = '小雪' \
  74. OR exploded_value = '大雪' \
  75. OR exploded_value = '冬至' \
  76. OR exploded_value = '早上好' \
  77. OR exploded_value = '中午好' \
  78. OR exploded_value = '下午好' \
  79. OR exploded_value = '晚上好' \
  80. OR exploded_value = '晚安' \
  81. OR exploded_value = '祝福' \
  82. OR exploded_value = 'P1高风险' \
  83. OR exploded_value = 'P0高风险' \
  84. )".format(TABLE)
  85. print("sql:"+sql)
  86. records = execute_sql_from_odps(project=PROJECT, sql=sql)
  87. video_tags_list = []
  88. with records.open_reader() as reader:
  89. for record in reader:
  90. video_id = int(record['videoid'])
  91. tags = ",".join([i for i in str(record['tags']).split(",") if i in TAG_SET])
  92. d = {}
  93. d["video_id"] = video_id
  94. d["tags"] = tags
  95. video_tags_list.append(d)
  96. # log_.info("{}:{}".format(video_id, tags))
  97. log_.info("天级表:{}".format(str(len(video_tags_list))))
  98. return video_tags_list
  99. except Exception as e:
  100. log_.error(str(e) + str(traceback.format_exc()))
  101. return []
  102. def get_video_tags_v2():
  103. PROJECT = "loghubods"
  104. TABLE = "loghubods.automated_updates_category_labels_1"
  105. # now_date = datetime.today()
  106. # date = datetime.strftime(now_date, '%Y%m%d')
  107. # previous_date = now_date - timedelta(days=1)
  108. # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
  109. # 获取当前日期
  110. now_date = datetime.today()
  111. # 获取当前月份
  112. current_month = now_date.strftime('%Y%m')
  113. # 获取上个月份
  114. previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
  115. try:
  116. sql = '''SELECT videoid
  117. ,secondary_labels
  118. FROM loghubods.automated_updates_category_labels_1
  119. WHERE (
  120. dt LIKE '{}%' OR dt LIKE '{}%'
  121. )
  122. '''.format(current_month, previous_month)
  123. print("sql:" + sql)
  124. records = execute_sql_from_odps(project=PROJECT, sql=sql)
  125. video_tags_list = []
  126. with records.open_reader() as reader:
  127. for record in reader:
  128. video_id = int(record['videoid'])
  129. tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
  130. d = {}
  131. d["video_id"] = video_id
  132. d["tags"] = tags
  133. video_tags_list.append(d)
  134. # log_.info("{}:{}".format(video_id, tags))
  135. log_.info("增量表:{}".format(str(len(video_tags_list))))
  136. return video_tags_list
  137. except Exception as e:
  138. log_.error(str(e) + str(traceback.format_exc()))
  139. return []
  140. def process_and_store(row):
  141. video_id = row["video_id"]
  142. tags = row["tags"]
  143. key = REDIS_PREFIX + str(video_id)
  144. expire_time = 24 * 3600 * 2
  145. redis_helper.set_data_to_redis(key, tags, expire_time)
  146. # log_.info("video-tags写入数据key={},value={}".format(key, tags))
  147. def main():
  148. log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  149. video_tags_list = get_video_tags()
  150. video_tags_list2 = get_video_tags_v2()
  151. m1 = {}
  152. m2 = {}
  153. for row in video_tags_list:
  154. m1[row["video_id"]] = row["tags"]
  155. for row in video_tags_list2:
  156. m2[row["video_id"]] = row["tags"]
  157. merged_map = {}
  158. merged_map.update(m1)
  159. for key, value in m2.items():
  160. if key in merged_map:
  161. l1 = merged_map[key].split(",")
  162. l2 = value.split(",")
  163. l1.extend(l2)
  164. l3 = list(set(l1))
  165. merged_map[key] = ",".join(l3)
  166. else:
  167. merged_map[key] = value
  168. result = []
  169. for key, value in merged_map.items():
  170. tmp = {}
  171. tmp["video_id"] = key
  172. tmp["tags"] = value
  173. result.append(tmp)
  174. log_.info("{}:{}".format(key, value))
  175. log_.info("video的数据量:{}".format(len(result)))
  176. records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
  177. log_.info("video的数据量:{}".format(len(result)))
  178. log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  179. if __name__ == '__main__':
  180. main()
# cd /root/zhangbo/rov-offline
# python alg_recsys_recall_undertake.py