# update_article_info_from_aigc.py (4.0 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. from typing import List, Dict
  6. from pymysql.cursors import DictCursor
  7. from tqdm import tqdm
  8. from applications.const import ArticleCollectorConst
  9. from applications.db import DatabaseConnector
  10. from applications.functions import Functions
  11. from config import denet_config, long_articles_config
  12. empty_dict = {}
  13. const = ArticleCollectorConst()
  14. functions = Functions()
  15. class UpdateArticleInfoFromAIGC(object):
  16. """
  17. 从aigc获取文章信息
  18. """
  19. def __init__(self):
  20. self.aigc_db_client = DatabaseConnector(db_config=denet_config)
  21. self.long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
  22. self.aigc_db_client.connect()
  23. self.long_articles_db_client.connect()
  24. def get_published_articles(self) -> List[Dict]:
  25. """
  26. 获取当天发布文章的List
  27. """
  28. sql = f"""
  29. SELECT trace_id, push_type
  30. FROM long_articles_published_trace_id
  31. WHERE create_timestamp> UNIX_TIMESTAMP(DATE_SUB(CURDATE(), INTERVAL 1 DAY)) AND status = {const.INIT_STATUS};
  32. """
  33. article_list = self.long_articles_db_client.fetch(sql, cursor_type=DictCursor)
  34. return article_list
  35. def get_article_info_from_aigc(self, trace_id: str) -> Dict:
  36. """
  37. 从aigc获取发布结果
  38. """
  39. sql = f"""
  40. SELECT t2.crawler_channel_content_id, t2.publish_stage_url, t2.publish_timestamp, t1.result_data
  41. from publish_content_miniprogram t1
  42. join publish_content t2 on t1.publish_content_id = t2.id
  43. where t1.trace_id = '{trace_id}' and t2.status = {const.PUBLISHED_STATUS};
  44. """
  45. article_info = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
  46. if article_info:
  47. return article_info[0]
  48. else:
  49. return empty_dict
  50. def update_each_article(self, article: Dict):
  51. """
  52. 更新每个文章的信息
  53. """
  54. trace_id = article["trace_id"]
  55. push_type = article["push_type"]
  56. article_info = self.get_article_info_from_aigc(trace_id)
  57. if article_info:
  58. channel_content_id = article_info["crawler_channel_content_id"]
  59. published_url = article_info["publish_stage_url"]
  60. publish_timestamp = int(article_info["publish_timestamp"] / 1000)
  61. result_data = json.loads(article_info["result_data"])
  62. root_source_id_list = [
  63. functions.extract_path(item["productionPath"])["root_source_id"] for item in result_data
  64. ]
  65. if published_url:
  66. status = const.SUCCESS_STATUS
  67. else:
  68. if push_type == const.BULK_AUTO_PUSH:
  69. status = const.INIT_STATUS
  70. else:
  71. status = const.SUCCESS_STATUS
  72. update_sql = f"""
  73. UPDATE long_articles_published_trace_id
  74. SET published_url = %s, status = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
  75. WHERE trace_id = %s;
  76. """
  77. self.long_articles_db_client.save(
  78. query=update_sql,
  79. params=(published_url, status, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
  80. )
  81. else:
  82. update_sql = f"""
  83. UPDATE long_articles_published_trace_id
  84. SET status = %s
  85. WHERE trace_id = %s;
  86. """
  87. self.long_articles_db_client.save(
  88. query=update_sql, params=(const.FAIL_STATUS, trace_id)
  89. )
  90. def deal(self):
  91. """
  92. main function
  93. """
  94. article_list = self.get_published_articles()
  95. for article in tqdm(article_list, desc="更新文章信息"):
  96. try:
  97. self.update_each_article(article)
  98. except Exception as e:
  99. print(e)
  100. if __name__ == "__main__":
  101. u = UpdateArticleInfoFromAIGC()
  102. u.deal()