task1.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. from tqdm import tqdm
  6. from applications import AIDTApi, DeNetMysql, PQMySQL
  7. class ColdStartPool(object):
  8. """
  9. 冷启动任务
  10. """
  11. AidApi = AIDTApi()
  12. DeMysql = DeNetMysql()
  13. PqMysql = PQMySQL()
  14. @classmethod
  15. def getPlanAllArticles(cls, plan_id):
  16. """
  17. 获取一个计划的所有内容
  18. :param plan_id:
  19. :return:
  20. """
  21. page = 1
  22. response = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  23. data_list = response['data']
  24. all_articles_count = response['totalCount']
  25. while len(data_list) + 50 * (page - 1) < all_articles_count:
  26. page += 1
  27. response_next_page = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  28. data_list += response_next_page['data']
  29. return data_list
  30. @classmethod
  31. def updateToPool(cls, plan_id):
  32. """
  33. 获取计划内容并且写入冷启池
  34. :param plan_id:
  35. :return:
  36. """
  37. each_plan_articles = cls.getPlanAllArticles(plan_id)
  38. for article in tqdm(each_plan_articles):
  39. try:
  40. cls.updateEachArticle(article)
  41. except Exception as e:
  42. print(e)
  43. # with ThreadPoolExecutor(max_workers=10) as Pool:
  44. # Pool.map(cls.updateEachArticle, each_plan_articles)
  45. @classmethod
  46. def updateEachArticle(cls, article_obj):
  47. """
  48. update each article to db
  49. :param article_obj:
  50. :return:
  51. """
  52. sql = f"""
  53. INSERT INTO cold_start_article_pool
  54. (content_id, content_link, title, cover, view_count, like_count, looking_count, publish_time_stamp, plan_id, category, content_channel_id)
  55. VALUES
  56. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  57. """
  58. params = (
  59. article_obj.get("id"),
  60. article_obj.get("contentLink"),
  61. article_obj.get("title"),
  62. article_obj.get("coverImageUrl"),
  63. article_obj.get("viewCount"),
  64. article_obj.get("likeCount"),
  65. article_obj.get("lookingCount"),
  66. article_obj.get("publishTimestamp"),
  67. article_obj.get("sourceCrawlerPlans")[0].get("id"),
  68. article_obj.get("sourceCrawlerPlans")[0].get("name").split("-")[1],
  69. article_obj.get("channelContentId")
  70. )
  71. cls.PqMysql.update(sql=sql, params=params)
  72. @classmethod
  73. def deal(cls):
  74. """
  75. 获取非空抓取计划id
  76. :return:
  77. """
  78. plan_id_list = cls.DeMysql.getUnEmptyPlan()
  79. for plan_id in tqdm(plan_id_list):
  80. cls.updateToPool(plan_id)
  81. CST = ColdStartPool()
  82. CST.deal()