# task1.py
"""
@author: luojunhui
"""
import logging

from tqdm import tqdm

from applications import AIDTApi, DeNetMysql, PQMySQL
  6. class ColdStartPool(object):
  7. """
  8. 冷启动任务
  9. """
  10. AidApi = AIDTApi()
  11. DeMysql = DeNetMysql()
  12. PqMysql = PQMySQL()
  13. @classmethod
  14. def getPlanAllArticles(cls, plan_id):
  15. """
  16. 获取一个计划的所有内容
  17. :param plan_id:
  18. :return:
  19. """
  20. page = 1
  21. response = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  22. data_list = response['data']
  23. all_articles_count = response['totalCount']
  24. while len(data_list) + 50 * (page - 1) < all_articles_count:
  25. page += 1
  26. response_next_page = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  27. data_list += response_next_page['data']
  28. return data_list
  29. @classmethod
  30. def updateToPool(cls, plan_id):
  31. """
  32. 获取计划内容并且写入冷启池
  33. :param plan_id:
  34. :return:
  35. """
  36. each_plan_articles = cls.getPlanAllArticles(plan_id)
  37. for article in tqdm(each_plan_articles):
  38. try:
  39. cls.updateEachArticle(article)
  40. except Exception as e:
  41. print(e)
  42. # with ThreadPoolExecutor(max_workers=10) as Pool:
  43. # Pool.map(cls.updateEachArticle, each_plan_articles)
  44. @classmethod
  45. def updateEachArticle(cls, article_obj):
  46. """
  47. update each article to db
  48. :param article_obj:
  49. :return:
  50. """
  51. sql = f"""
  52. INSERT INTO cold_start_article_pool
  53. (content_id, content_link, title, cover, view_count, like_count, looking_count, publish_time_stamp, plan_id, category, content_channel_id, status)
  54. VALUES
  55. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  56. """
  57. params = (
  58. article_obj.get("id"),
  59. article_obj.get("contentLink"),
  60. article_obj.get("title"),
  61. article_obj.get("coverImageUrl"),
  62. article_obj.get("viewCount"),
  63. article_obj.get("likeCount"),
  64. article_obj.get("lookingCount"),
  65. article_obj.get("publishTimestamp"),
  66. article_obj.get("sourceCrawlerPlans")[0].get("id"),
  67. article_obj.get("sourceCrawlerPlans")[0].get("name").split("-")[1],
  68. article_obj.get("channelContentId"),
  69. 1
  70. )
  71. cls.PqMysql.update(sql=sql, params=params)
  72. @classmethod
  73. def deal(cls):
  74. """
  75. 获取非空抓取计划id
  76. :return:
  77. """
  78. plan_id_list = cls.DeMysql.getUnEmptyPlan()
  79. for plan_id in tqdm(plan_id_list):
  80. cls.updateToPool(plan_id)
  81. if __name__ == '__main__':
  82. CP = ColdStartPool()
  83. CP.deal()