update_article_info_from_aigc.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from typing import List, Dict
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import aiditApi
  11. from applications import bot
  12. from applications import log
  13. from applications import WeixinSpider
  14. from applications.const import ArticleCollectorConst
  15. from applications.db import DatabaseConnector
  16. from applications.functions import Functions
  17. from config import denet_config, long_articles_config
# Module-level singletons shared by the updater below.
empty_dict = {}  # NOTE(review): shared mutable dict used as a "not found" sentinel; callers must not mutate it — a fresh {} per call would be safer
const = ArticleCollectorConst()  # status values and spider response codes (INIT/SUCCESS/FAIL, ARTICLE_*_CODE)
functions = Functions()  # helper utilities: URL param extraction, path parsing, timestamp formatting
spider = WeixinSpider()  # fetches WeChat article pages to resolve the final URL / detect delete & illegal states
  22. class UpdateArticleInfoFromAIGC(object):
  23. """
  24. 从aigc获取文章信息
  25. """
  26. def __init__(self):
  27. self.aigc_db_client = DatabaseConnector(db_config=denet_config)
  28. self.long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
  29. self.aigc_db_client.connect()
  30. self.long_articles_db_client.connect()
  31. def get_published_articles(self) -> List[Dict]:
  32. """
  33. 获取当天发布文章的List
  34. """
  35. sql = f"""
  36. SELECT trace_id, push_type
  37. FROM long_articles_published_trace_id
  38. WHERE create_timestamp > UNIX_TIMESTAMP(DATE_SUB(CURDATE(), INTERVAL 1 DAY)) AND status = {const.INIT_STATUS};
  39. """
  40. article_list = self.long_articles_db_client.fetch(sql, cursor_type=DictCursor)
  41. return article_list
  42. def get_article_info_from_aigc(self, trace_id: str) -> Dict:
  43. """
  44. 从aigc获取发布结果
  45. """
  46. sql = f"""
  47. SELECT t2.crawler_channel_content_id, t2.publish_stage_url, t2.publish_timestamp, t1.result_data
  48. from publish_content_miniprogram t1
  49. join publish_content t2 on t1.publish_content_id = t2.id
  50. where t1.trace_id = '{trace_id}' and t2.status = {const.PUBLISHED_STATUS};
  51. """
  52. article_info = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
  53. if article_info:
  54. return article_info[0]
  55. else:
  56. return empty_dict
  57. def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
  58. """
  59. 通过trace_id来查询文章信息
  60. """
  61. select_sql = f"""
  62. SELECT t1.gh_id, t1.account_name, t2.article_title
  63. FROM long_articles_match_videos t1
  64. JOIN long_articles_text t2
  65. ON t1.content_id = t2.content_id
  66. WHERE t1.trace_id = '{trace_id}';
  67. """
  68. article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
  69. if article_info:
  70. return article_info[0]
  71. else:
  72. return empty_dict
  73. def update_each_article(self, article: Dict):
  74. """
  75. 更新每个文章的信息
  76. """
  77. trace_id = article["trace_id"]
  78. push_type = article["push_type"]
  79. article_info = self.get_article_info_from_aigc(trace_id)
  80. if article_info:
  81. channel_content_id = article_info["crawler_channel_content_id"]
  82. published_url = article_info["publish_stage_url"]
  83. publish_timestamp = int(article_info["publish_timestamp"] / 1000)
  84. result_data = json.loads(article_info["result_data"])
  85. root_source_id_list = [
  86. functions.extract_path(item["productionPath"])["root_source_id"] for item in result_data
  87. ]
  88. wx_sn = None
  89. if published_url:
  90. response = spider.get_article_text(content_link=published_url)
  91. code = response['code']
  92. match code:
  93. case const.ARTICLE_SUCCESS_CODE:
  94. long_url = response['data']['data']['content_link']
  95. wx_sn = functions.extract_params_from_url(url=long_url, key="sn")
  96. status = const.SUCCESS_STATUS
  97. case const.ARTICLE_DELETE_CODE:
  98. log(
  99. task="update_article_info_from_aigc",
  100. function="update_each_article",
  101. status="fail",
  102. message=trace_id,
  103. data={
  104. "msg": "文章被删文",
  105. "publish_timestamp": publish_timestamp,
  106. "article_delete_timestamp": int(time.time()),
  107. "duration": int(time.time()) - publish_timestamp
  108. }
  109. )
  110. status = const.FAIL_STATUS
  111. case const.ARTICLE_ILLEGAL_CODE:
  112. log(
  113. task="update_article_info_from_aigc",
  114. function="update_each_article",
  115. status="fail",
  116. message=trace_id,
  117. data={
  118. "msg": "文章被判断违规",
  119. "publish_timestamp": publish_timestamp,
  120. "illegal_timestamp": int(time.time()),
  121. "duration": int(time.time()) - publish_timestamp
  122. }
  123. )
  124. article_info = self.get_article_info_by_trace_id(trace_id)
  125. if article_info:
  126. error_detail = response.get("msg")
  127. insert_sql = f"""
  128. INSERT IGNORE INTO illegal_articles
  129. (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
  130. VALUES
  131. (%s, %s, %s, %s, %s, %s);
  132. """
  133. affected_rows = self.long_articles_db_client.save(
  134. query=insert_sql,
  135. params=(
  136. article_info['gh_id'],
  137. article_info['account_name'],
  138. article_info['article_title'],
  139. wx_sn,
  140. functions.timestamp_to_str(publish_timestamp),
  141. error_detail
  142. )
  143. )
  144. if affected_rows:
  145. bot(
  146. title="文章违规告警(new task)",
  147. detail={
  148. "account_name": article_info['account_name'],
  149. "gh_id": article_info['gh_id'],
  150. "title": article_info['article_title'],
  151. "wx_sn": wx_sn,
  152. "publish_date": functions.timestamp_to_str(publish_timestamp),
  153. "error_detail": error_detail,
  154. },
  155. mention=False
  156. )
  157. aiditApi.delete_articles(
  158. gh_id=article_info['gh_id'],
  159. title=article_info['article_title']
  160. )
  161. status = const.FAIL_STATUS
  162. case _:
  163. status = const.FAIL_STATUS
  164. else:
  165. if push_type == const.BULK_AUTO_PUSH:
  166. status = const.INIT_STATUS
  167. else:
  168. status = const.SUCCESS_STATUS
  169. update_sql = f"""
  170. UPDATE long_articles_published_trace_id
  171. SET published_url = %s, status = %s, wx_sn = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
  172. WHERE trace_id = %s;
  173. """
  174. self.long_articles_db_client.save(
  175. query=update_sql,
  176. params=(published_url, status, wx_sn, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
  177. )
  178. else:
  179. update_sql = f"""
  180. UPDATE long_articles_published_trace_id
  181. SET status = %s
  182. WHERE trace_id = %s;
  183. """
  184. self.long_articles_db_client.save(
  185. query=update_sql, params=(const.FAIL_STATUS, trace_id)
  186. )
  187. def deal(self):
  188. """
  189. main function
  190. """
  191. article_list = self.get_published_articles()
  192. log(
  193. task="update_article_info_from_aigc",
  194. function="deal",
  195. data=article_list,
  196. message="total got {} articles this time".format(len(article_list))
  197. )
  198. for article in tqdm(article_list, desc="更新文章信息"):
  199. try:
  200. self.update_each_article(article)
  201. except Exception as e:
  202. log(
  203. task="update_article_info_from_aigc",
  204. function="update_each_article",
  205. status="fail",
  206. message="update_article_fail",
  207. data={
  208. "trace_id": article["trace_id"],
  209. "error": str(e),
  210. "traceback": traceback.format_exc()
  211. }
  212. )