import json
import time
import urllib.parse

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config


class FwhDataRecycle:
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    PUBLISH_SUCCESS_STATUS = 2

    STAT_PERIOD = 2 * 24 * 3600

    def __init__(self):
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()


class FwhGroupPublishRecordManager(FwhDataRecycle):

    def get_published_articles(self):
        fetch_query = f"""
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        fetch_response = self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )
        return fetch_response

    def get_article_url_from_aigc_system(self, publish_content_id, user_group_id):
        sql = f"""
            select t1.publish_stage_url, t2.publish_timestamp
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        if article_info:
            return article_info[0]
        else:
            return None

    def update_recycle_status(self, record_id, ori_status, new_status):
        update_query = f"""
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url, publish_timestamp):
        update_query = f"""
            update long_articles_group_send_result
            set url = %s, publish_timestamp = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                publish_timestamp,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def deal(self):
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            publish_content_id = publish_record["publish_content_id"]
            record_id = publish_record["id"]
            group_id = publish_record["user_group_id"]
            # lock
            self.update_recycle_status(
                record_id, self.RECYCLE_INIT_STATUS, self.RECYCLE_PROCESSING_STATUS
            )

            publish_call_back_info = self.get_article_url_from_aigc_system(
                publish_content_id, group_id
            )
            if publish_call_back_info:
                article_url = publish_call_back_info["publish_stage_url"]
                publish_timestamp = int(
                    publish_call_back_info["publish_timestamp"] / 1000
                )
                if article_url and publish_timestamp:
                    # set record and unlock
                    self.set_article_url(record_id, article_url, publish_timestamp)
                else:
                    # unlock
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
            else:
                # unlock
                self.update_recycle_status(
                    record_id,
                    self.RECYCLE_PROCESSING_STATUS,
                    self.RECYCLE_INIT_STATUS,
                )


class SaveFwhDataToDatabase(FwhDataRecycle):

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """
        update article read cnt
        """
        update_query = f"""
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """
        save data to db
        """
        insert_query = f"""
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime,
             ItemIndex, ContentUrl, show_view_count, wx_sn, title_md5, article_group,
             channel_content_id, root_source_id_list, publish_timestamp)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_group_server_accounts(self):
        fetch_query = f"""
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(fetch_query, cursor_type=DictCursor)
        gh_id_list = [i['gzh_id'] for i in fetch_response]
        # NOTE: the fetched list is currently overridden with a single hard-coded account
        gh_id_list = ['gh_5e543853d8f0']
        return gh_id_list

    def get_stat_published_articles(self, gh_id):
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = f"""
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
        """
        return self.long_articles_client.fetch(
            fetch_query,
            DictCursor,
            (gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    def process_each_account_data(self, account_published_article_list):
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article['account_name']
            gh_id = article['gh_id']
            user_group_id = article['user_group_id']
            url = article['url']
            publish_timestamp = article['publish_timestamp']
            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(3)
                content_url = article_detail_info["data"]["data"]["content_link"]
                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
                wx_sn = content_url.split("sn=")[-1]
                create_time = publish_timestamp
                update_time = publish_timestamp
                item_index = article_detail_info["data"]["data"]["item_index"]
                show_view_count = article_detail_info["data"]["data"]["view_count"]
                title = article_detail_info["data"]["data"]["title"]
                title_md5 = str_to_md5(title)
                channel_content_id = article_detail_info["data"]["data"]["channel_content_id"]
                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
                root_source_id_list = [
                    urllib.parse.parse_qs(
                        urllib.parse.unquote(i['path'])
                    )['rootSourceId'][0]
                    for i in mini_program_info
                ]
                root_source_id_list = json.dumps(root_source_id_list)
                try:
                    self.save_data_to_database(
                        article=(
                            gh_id,
                            account_name,
                            app_msg_id,
                            title,
                            '9',
                            create_time,
                            update_time,
                            item_index,
                            content_url,
                            show_view_count,
                            wx_sn,
                            title_md5,
                            user_group_id,
                            channel_content_id,
                            root_source_id_list,
                            publish_timestamp,
                        )
                    )
                except Exception as e:
                    # if the insert fails (e.g. the record already exists),
                    # fall back to refreshing its read count
                    self.update_article_read_cnt(wx_sn, show_view_count)
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self):
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f" {account_id}",
            )
            self.process_each_account_data(publish_articles)


class FwhDataExportTemp(FwhDataRecycle):

    def get_publish_articles(self):
        sql = f"""
            -- select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            select accountName, title, ItemIndex, from_unixtime(createTime, '%Y-%m-%d'), sum(show_view_count), root_source_id_list
            from official_articles_v2
            where accountName = '票圈精彩' and
                  from_unixtime(publish_timestamp) between '2025-06-07' and '2025-06-10'
            group by accountName, title, ItemIndex;
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """
        get fission (share/split) info for a list of root source ids
        """
        root_source_id_tuple = tuple(json.loads(root_source_id_list))
        query = f"""
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv', sum(split0) as 'T+0_fission'
            from changwen_data_rootsourceid
            where root_source_id in %s;
        """
        return self.long_articles_client.fetch(
            query=query, cursor_type=DictCursor, params=(root_source_id_tuple,)
        )

    def deal(self):
        import pandas as pd

        publish_articles = self.get_publish_articles()
        L = []
        for article in publish_articles:
            root_source_id_list = article['root_source_id_list']
            fission_info = self.get_fission_info(root_source_id_list)
            article['uv'] = fission_info[0]['uv']
            article['first_uv'] = fission_info[0]['first_uv']
            article['split_uv'] = fission_info[0]['split_uv']
            article['T+0_fission'] = fission_info[0]['T+0_fission']
            L.append(article)

        df = pd.DataFrame(L)
        df.to_csv('temp.csv', index=False)


if __name__ == '__main__':
    SaveFwhDataToDatabase().deal()
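
# A possible end-to-end run (sketch only, not wired into the entry point above):
# FwhGroupPublishRecordManager.deal() resolves published URLs and marks records
# RECYCLE_SUCCESS_STATUS, which SaveFwhDataToDatabase.deal() then relies on when
# it crawls article details and persists them, e.g.:
#
#     FwhGroupPublishRecordManager().deal()
#     SaveFwhDataToDatabase().deal()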