# task3.py (2.6 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. import datetime
  5. from tqdm import tqdm
  6. from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, ODPSApi
  7. from config import poolTagMap
  8. from stratrgy import ArticlePoolStrategy
  9. class SendToMultiLevels(object):
  10. """
  11. 流量池任务
  12. """
  13. AidApi = AIDTApi()
  14. DeMysql = DeNetMysql()
  15. PqMysql = PQMySQL()
  16. Fun = Functions()
  17. OA = ODPSApi()
  18. @classmethod
  19. def getYesterdayData(cls):
  20. """
  21. 获取前一天数据表现
  22. :return:
  23. """
  24. odps_sql = "select * from loghubods.changwen_article_datastat where dt = '20240729';"
  25. result = cls.OA.select(sql=odps_sql)
  26. response_list = [
  27. {
  28. "article_id": record["article_id"],
  29. "increase_read_count": record["increase_read_count"],
  30. "read_count": record["read_count"],
  31. "increase_income": record["increase_income"],
  32. "income": record["income"],
  33. "increase_share_count": record["increase_share_count"],
  34. "share_count": record["share_count"],
  35. "update_timestamp": record["update_timestamp"]
  36. } for record in result if record['increase_read_count'] >= 1000
  37. ]
  38. return response_list
  39. @classmethod
  40. def splitToDifferentPools(cls, yesterday_data):
  41. """
  42. 分类至Pools
  43. :return:
  44. """
  45. S = ArticlePoolStrategy()
  46. detail_list = S.getData(article_list=yesterday_data)
  47. result = S.splitByStrategy(detail_list=detail_list)
  48. return result
  49. @classmethod
  50. def sendToEachCrawlerPlan(cls, key, url_list):
  51. """
  52. :param key:
  53. :param url_list:
  54. :return:
  55. """
  56. print(key)
  57. print(len(url_list))
  58. print(url_list)
  59. # daily自动创建新抓取计划
  60. # cls.AidApi.updateArticleIntoCrawlerPlan(
  61. # plan_id=None,
  62. # plan_name="{}--{}".format(datetime.datetime.today().__str__().split(" ")[0], key),
  63. # plan_tag=poolTagMap[key],
  64. # url_list=url_list
  65. # )
  66. @classmethod
  67. def sendToDifferentPools(cls, pool_info):
  68. """
  69. 获取文章url
  70. :return:
  71. """
  72. for key in pool_info:
  73. cls.sendToEachCrawlerPlan(key, pool_info[key])
  74. @classmethod
  75. def deal(cls):
  76. """
  77. Dealing function
  78. :return:
  79. """
  80. yesterday_data = cls.getYesterdayData()
  81. level_url_list_map = cls.splitToDifferentPools(yesterday_data)
  82. cls.sendToDifferentPools(pool_info=level_url_list_map)
  83. if __name__ == '__main__':
  84. ST = SendToMultiLevels()
  85. ST.deal()