  1. """
  2. @author: luojunhui
  3. """
  4. import datetime
  5. from tqdm import tqdm
  6. from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, ODPSApi
  7. from config import poolTagMap
  8. class SendToMultiLevels(object):
  9. """
  10. 冷启分配任务
  11. """
  12. AidApi = AIDTApi()
  13. DeMysql = DeNetMysql()
  14. PqMysql = PQMySQL()
  15. Fun = Functions()
  16. OA = ODPSApi()
  17. @classmethod
  18. def getYesterdayData(cls):
  19. """
  20. 获取前一天数据表现
  21. :return:
  22. """
  23. odps_sql = "select * from loghubods.changwen_article_datastat where dt = '20240724';"
  24. result = cls.OA.select(sql=odps_sql)
  25. response_list = [
  26. {
  27. "article_id": record["article_id"],
  28. "increase_read_count": record["increase_read_count"],
  29. "read_count": record["read_count"],
  30. "increase_income": record["increase_income"],
  31. "income": record["income"],
  32. "increase_share_count": record["increase_share_count"],
  33. "share_count": record["share_count"],
  34. "update_timestamp": record["update_timestamp"]
  35. } for record in result
  36. ]
  37. return response_list
  38. @classmethod
  39. def splitToDifferentPools(cls, yesterday_data):
  40. """
  41. 分类至Pools
  42. :return:
  43. """
  44. pool_level_1 = [cls.Fun.matchLinkById(i['article_id']) for i in tqdm(yesterday_data) if
  45. i['increase_read_count'] >= 9000]
  46. pool_level_2 = [cls.Fun.matchLinkById(i['article_id']) for i in tqdm(yesterday_data) if
  47. 3500 <= i['increase_read_count'] < 9000]
  48. pool_level_3 = [cls.Fun.matchLinkById(i['article_id']) for i in tqdm(yesterday_data) if
  49. 1000 <= i['increase_read_count'] < 3500]
  50. L = {
  51. "Level1": pool_level_1,
  52. "Level2": pool_level_2,
  53. "Level3": pool_level_3
  54. }
  55. return L
  56. @classmethod
  57. def sendToEachCrawlerPlan(cls, key, url_list):
  58. """
  59. :param key:
  60. :param url_list:
  61. :return:
  62. """
  63. # daily自动创建新抓取计划
  64. cls.AidApi.updateArticleIntoCrawlerPlan(
  65. plan_id=None,
  66. plan_name="{}--{}".format(datetime.datetime.today().__str__().split(" ")[0], key),
  67. plan_tag=poolTagMap[key],
  68. url_list=url_list
  69. )
  70. @classmethod
  71. def sendToDifferentPools(cls, pool_info):
  72. """
  73. 获取文章url
  74. :return:
  75. """
  76. for key in pool_info:
  77. cls.sendToEachCrawlerPlan(key, pool_info[key])
  78. @classmethod
  79. def deal(cls):
  80. """
  81. Dealing function
  82. :return:
  83. """
  84. yesterday_data = cls.getYesterdayData()
  85. level_url_list_map = cls.splitToDifferentPools(yesterday_data)
  86. cls.sendToDifferentPools(pool_info=level_url_list_map)
  87. if __name__ == '__main__':
  88. STML = SendToMultiLevels()
  89. STML.deal()