fwh_data_recycle.py

import time
from datetime import datetime

from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    """Shared constants and DB connections for the FWH data recycle pipeline."""

    # recycle_status state machine for long_articles_group_send_result
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    # publish status value marking a successful publish
    PUBLISH_SUCCESS_STATUS = 2

    # only articles published within the last 3 days are included in stats
    STAT_PERIOD = 3 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()


class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Backfill article URLs for successfully published group-send records."""

    def get_published_articles(self):
        """Fetch published records that have not been recycled yet."""
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        return self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        """Look up the published article URL and timestamp in the AIGC system."""
        sql = """
            select t1.publish_stage_url, t2.publish_timestamp
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        return article_info[0] if article_info else None

    def update_recycle_status(self, record_id, ori_status, new_status):
        """Compare-and-swap recycle_status; the status guard acts as an optimistic lock."""
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url, publish_timestamp):
        """Save the recycled URL and timestamp, and mark the record as done."""
        update_query = """
            update long_articles_group_send_result
            set url = %s, publish_timestamp = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                publish_timestamp,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in publish_records:
            record_id = publish_record["id"]
            publish_content_id = publish_record["publish_content_id"]
            user_group_id = publish_record["user_group_id"]

            # lock the record before recycling
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, user_group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                publish_timestamp = publish_call_back_info["publish_timestamp"]
                if article_url and publish_timestamp:
                    # convert the timestamp from milliseconds to seconds,
                    # then set the record and unlock
                    self.set_article_url(
                        record_id, article_url, int(publish_timestamp / 1000)
                    )
                    continue

            # callback info is missing or incomplete: unlock for a later retry
            self.update_recycle_status(
                record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
            )


class SaveFwhDataToDatabase(FwhDataRecycle):
    """Persist recycled FWH article data into the piaoquan crawler database."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """Update the read count of an article identified by wx_sn."""
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article_list):
        """Insert one article row per tuple in article_list."""
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime,
             ItemIndex, ContentUrl, show_view_count, wx_sn, title_md5,
             article_group, channel_content_id, root_source_id_list)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article_list)

    def get_group_server_accounts(self):
        """Fetch the gh_ids of all group-server accounts."""
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        return [i["gzh_id"] for i in fetch_response]

    def get_stat_published_articles(self, gh_id):
        """Fetch recycled articles for one account published within STAT_PERIOD."""
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
        """
        return self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_timestamp = article["publish_timestamp"]

            # get article detail info with spider
            article_detail_info = get_article_detail(url)
            # throttle the spider between requests
            time.sleep(5)
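            # The remainder of this loop is truncated in the source file.
            # A minimal sketch of the presumed next step, kept commented out
            # because the response shape of get_article_detail is not shown
            # here (the field names below are assumptions, not confirmed):
            #
            #     if article_detail_info:
            #         detail = article_detail_info["data"]["data"]
            #         self.update_article_read_cnt(
            #             detail["wx_sn"], detail["read_cnt"]
            #         )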

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = self.get_stat_published_articles(account_id)
            self.process_each_account_data(publish_articles)


if __name__ == "__main__":
    FwhGroupPublishRecordManager().deal()
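    # The save stage is presumably run as well once URLs have been recycled
    # (the source is truncated here); hypothetically:
    # SaveFwhDataToDatabase().deal()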