"""
task1.py

@author: luojunhui
"""
  4. import json
  5. from tqdm import tqdm
  6. from applications import AIDTApi, DeNetMysql, PQMySQL
  7. class ColdStartPool(object):
  8. """
  9. 冷启动任务
  10. """
  11. AidApi = AIDTApi()
  12. DeMysql = DeNetMysql()
  13. PqMysql = PQMySQL()
  14. @classmethod
  15. def getPlanAllArticles(cls, plan_id):
  16. """
  17. 获取一个计划的所有内容
  18. :param plan_id:
  19. :return:
  20. """
  21. page = 1
  22. response = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  23. data_list = response['data']
  24. all_articles_count = response['totalCount']
  25. while len(data_list) + 50 * (page - 1) < all_articles_count:
  26. page += 1
  27. response_next_page = cls.AidApi.getPlanArticleList(plan_id=plan_id, page_index=page).get("data", {})
  28. data_list += response_next_page['data']
  29. return data_list
  30. @classmethod
  31. def updateToPool(cls, plan_id):
  32. """
  33. 获取计划内容并且写入冷启池
  34. :param plan_id:
  35. :return:
  36. """
  37. each_plan_articles = cls.getPlanAllArticles(plan_id)
  38. for article in tqdm(each_plan_articles):
  39. try:
  40. cls.updateEachArticle(article)
  41. except Exception as e:
  42. print(e)
  43. # with ThreadPoolExecutor(max_workers=10) as Pool:
  44. # Pool.map(cls.updateEachArticle, each_plan_articles)
  45. @classmethod
  46. def updateEachArticle(cls, article_obj):
  47. """
  48. update each article to db
  49. :param article_obj:
  50. :return:
  51. """
  52. sql = f"""
  53. INSERT INTO cold_start_article_pool
  54. (content_id, content_link, title, cover, view_count, like_count, looking_count, publish_time_stamp, plan_id, category, content_channel_id, status)
  55. VALUES
  56. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  57. """
  58. params = (
  59. article_obj.get("id"),
  60. article_obj.get("contentLink"),
  61. article_obj.get("title"),
  62. article_obj.get("coverImageUrl"),
  63. article_obj.get("viewCount"),
  64. article_obj.get("likeCount"),
  65. article_obj.get("lookingCount"),
  66. article_obj.get("publishTimestamp"),
  67. article_obj.get("sourceCrawlerPlans")[0].get("id"),
  68. article_obj.get("sourceCrawlerPlans")[0].get("name").split("-")[1],
  69. article_obj.get("channelContentId"),
  70. 1
  71. )
  72. cls.PqMysql.update(sql=sql, params=params)
  73. @classmethod
  74. def deal(cls):
  75. """
  76. 获取非空抓取计划id
  77. :return:
  78. """
  79. plan_id_list = cls.DeMysql.getUnEmptyPlan()
  80. for plan_id in tqdm(plan_id_list):
  81. cls.updateToPool(plan_id)
  82. CST = ColdStartPool()
  83. CST.deal()