fwh_data_recycle.py

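"""
Recycle pipeline for fwh group-send publish data.

- FwhGroupPublishRecordManager: backfills article urls and publish timestamps
  from the AIGC system into long_articles_group_send_result.
- SaveFwhDataToDatabase: crawls article details for recycled urls and stores
  read counts into official_articles_v2.
- FwhDataExportTemp: temporary export of publish stats and fission info to csv.
"""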
import json
import time
import urllib.parse

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    """Base class holding recycle status constants and database connections."""

    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()


class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Backfill article urls and publish timestamps for group-send records."""

    def get_published_articles(self):
        fetch_query = f"""
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = f"""
            select t1.publish_stage_url, t2.publish_timestamp
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = f"""
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url, publish_timestamp):
        update_query = f"""
            update long_articles_group_send_result
            set url = %s, publish_timestamp = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                publish_timestamp,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]

            # lock the record before processing
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                publish_timestamp = publish_call_back_info["publish_timestamp"]
                if article_url and publish_timestamp:
                    # set record (timestamp converted from ms to s) and unlock
                    self.set_article_url(
                        record_id, article_url, int(publish_timestamp / 1000)
                    )
                else:
                    # callback info incomplete, unlock for the next run
                    self.update_recycle_status(
                        record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                    )
            else:
                # no callback info yet, unlock for the next run
                self.update_recycle_status(
                    record_id, self.RECYCLE_PROCESSING_STATUS, self.RECYCLE_INIT_STATUS
                )


class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl article details for recycled urls and save them to official_articles_v2."""

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """update article read count"""
        update_query = f"""
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """save article data to database"""
        insert_query = f"""
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_group_server_accounts(self):
        fetch_query = f"""
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i["gzh_id"] for i in fetch_response]
        # NOTE: temporarily hard-coded to a single account, overriding the query result
        gh_id_list = ["gh_5e543853d8f0"]
        return gh_id_list

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = f"""
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article["account_name"]
            gh_id = article["gh_id"]
            user_group_id = article["user_group_id"]
            url = article["url"]
            publish_timestamp = article["publish_timestamp"]

            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(url, is_count=True, is_cache=False)
                time.sleep(3)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"]["channel_content_id"]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))["rootSourceId"][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            "9",
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception:
                    # insert failed (likely already saved), update read count instead
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):
    """Temporary export of publish stats and fission info to csv."""

    def get_publish_articles(self):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), root_source_id_list
            from official_articles_v2
            where accountName = '票圈精彩'
                and from_unixtime(publish_timestamp) between '2025-06-07' and '2025-06-10'
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """get fission info for the given root_source_id list"""
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = f"""
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def deal(self):
        import pandas as pd

        publish_articles = self.get_publish_articles()
        L = []
        for article in publish_articles:
            root_source_id_list = article["root_source_id_list"]
            fission_info = self.get_fission_info(root_source_id_list)
            article["uv"] = fission_info[0]["uv"]
            article["first_uv"] = fission_info[0]["first_uv"]
            article["split_uv"] = fission_info[0]["split_uv"]
            article["T+0_fission"] = fission_info[0]["T+0_fission"]
            L.append(article)

        df = pd.DataFrame(L)
        df.to_csv("temp.csv", index=False)


if __name__ == "__main__":
    FwhGroupPublishRecordManager().deal()
    SaveFwhDataToDatabase().deal()
    # FwhDataExportTemp().deal()