alg_recsys_recall_undertake.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # -*- coding: utf-8 -*-
  2. import traceback
  3. from config import set_config
  4. from log import Log
  5. from utils import execute_sql_from_odps
  6. from db_helper import RedisHelper
  7. from datetime import datetime, timedelta
  8. from alg_recsys_recall_4h_region_trend import records_process_for_list
  9. config_, _ = set_config()
  10. log_ = Log()
  11. redis_helper = RedisHelper()
  12. REDIS_PREFIX = "alg_recsys_video_tags_"
  13. def get_video_tags_v2():
  14. PROJECT = "loghubods"
  15. TABLE = "loghubods.automated_updates_category_labels_1"
  16. # now_date = datetime.today()
  17. # date = datetime.strftime(now_date, '%Y%m%d')
  18. # previous_date = now_date - timedelta(days=1)
  19. # previous_date_str = datetime.strftime(previous_date, '%Y%m%d')
  20. # 获取当前日期
  21. now_date = datetime.today()
  22. # 获取当前月份
  23. current_month = now_date.strftime('%Y%m')
  24. # 获取上个月份
  25. previous_month = (now_date - timedelta(days=now_date.day)).strftime('%Y%m')
  26. try:
  27. sql = '''SELECT videoid
  28. ,secondary_labels
  29. FROM loghubods.automated_updates_category_labels_1
  30. WHERE (
  31. dt LIKE '{}%' OR dt LIKE '{}%'
  32. )
  33. '''.format(current_month, previous_month)
  34. print("sql:" + sql)
  35. records = execute_sql_from_odps(project=PROJECT, sql=sql)
  36. video_tags_list = []
  37. with records.open_reader() as reader:
  38. for record in reader:
  39. video_id = int(record['videoid'])
  40. tags = ",".join([i for i in str(record['secondary_labels']).split(",") if i in TAG_SET])
  41. d = {}
  42. d["video_id"] = video_id
  43. d["tags"] = tags
  44. video_tags_list.append(d)
  45. # log_.info("{}:{}".format(video_id, tags))
  46. log_.info("增量表:{}".format(str(len(video_tags_list))))
  47. return video_tags_list
  48. except Exception as e:
  49. log_.error(str(e) + str(traceback.format_exc()))
  50. return []
  51. def process_and_store(row):
  52. video_id = row["video_id"]
  53. tags = row["tags"]
  54. key = REDIS_PREFIX + str(video_id)
  55. expire_time = 24 * 3600 * 2
  56. redis_helper.set_data_to_redis(key, tags, expire_time)
  57. # log_.info("video-tags写入数据key={},value={}".format(key, tags))
  58. def main():
  59. log_.info("开始执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  60. #1 读取当前时间及上一时间
  61. now_date = datetime.today()
  62. hour = datetime.strftime(now_date, '%H').lstrip('0')
  63. date = datetime.strftime(now_date, '%Y%m%d')
  64. if hour == "":
  65. hour = "0"
  66. previous_date = now_date + timedelta(hours=1)
  67. hour_next = datetime.strftime(previous_date, '%H').lstrip('0')
  68. date_next = datetime.strftime(previous_date, '%Y%m%d')
  69. if hour_next == "":
  70. hour_next = "0"
  71. #2 循环拼接key,读一个写一个。
  72. REGION_CODE = {
  73. '北京': '110000', '天津': '120000', '河北省': '130000', '山西省': '140000', '内蒙古': '150000',
  74. '辽宁省': '210000', '吉林省': '220000', '黑龙江省': '230000',
  75. '上海': '310000', '江苏省': '320000', '浙江省': '330000', '安徽省': '340000', '福建省': '350000',
  76. '江西省': '360000',
  77. '山东省': '370000',
  78. '河南省': '410000', '湖北省': '420000', '湖南省': '430000', '广东省': '440000', '广西': '450000',
  79. '海南省': '460000',
  80. '重庆': '500000', '四川省': '510000', '贵州省': '520000', '云南省': '530000', '西藏': '540000',
  81. '陕西省': '610000', '甘肃省': '620000', '青海省': '630000', '宁夏': '640000', '新疆': '650000',
  82. '台湾省': '710000', '香港': '810000', '澳门': '820000',
  83. '广州': '440100', '深圳': '440300', '成都': '510100', '长沙': '430100',
  84. }
  85. for _, code in REGION_CODE.items():
  86. read_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date, hour)
  87. value = redis_helper.get_data_from_set(read_key)
  88. print(read_key)
  89. print(value)
  90. write_key = "recall:item:score:region:h:{}:data66:rule66:{}:{}".format(code, date_next, hour_next)
  91. print(write_key)
  92. # records_process_for_list(result, process_and_store, max_size=50, num_workers=8)
  93. log_.info("完成执行:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  94. if __name__ == '__main__':
  95. main()
# How to run:
#   cd /root/zhangbo/rov-offline
#   python alg_recsys_recall_undertake.py