123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
import json
import time
import urllib.parse
from typing import Any, Dict, List, Optional

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications.db import DatabaseConnector
from applications.utils import str_to_md5
from cold_start.crawler.wechat import get_article_detail
from config import denet_config, long_articles_config, piaoquan_crawler_config
class FwhDataRecycle:
    """Shared status codes and database clients for the recycle jobs.

    Instantiating the class builds three ``DatabaseConnector`` clients
    (denet, long_articles, piaoquan_crawler) and calls ``connect()`` on
    each of them immediately.
    """

    # Values written to / matched against the `recycle_status` column.
    RECYCLE_INIT_STATUS = 0
    RECYCLE_PROCESSING_STATUS = 1
    RECYCLE_SUCCESS_STATUS = 2
    RECYCLE_FAILED_STATUS = 99

    # Value of the `status` column that marks a successful publish.
    PUBLISH_SUCCESS_STATUS = 2

    # Look-back window in seconds (3 days).
    STAT_PERIOD = 3 * 24 * 3600

    def __init__(self) -> None:
        """Create and connect the three database clients."""
        self.denet_client = DatabaseConnector(denet_config)
        self.denet_client.connect()

        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.long_articles_client.connect()

        self.piaoquan_client = DatabaseConnector(piaoquan_crawler_config)
        self.piaoquan_client.connect()
class FwhGroupPublishRecordManager(FwhDataRecycle):
    """Back-fill URL and publish time onto ``long_articles_group_send_result``.

    ``deal`` walks every row whose publish succeeded but whose
    ``recycle_status`` is still INIT, looks up the published URL and
    timestamp through the denet client, and either stores them (status
    becomes SUCCESS) or puts the row back to INIT so a later run retries.
    """

    def get_published_articles(self):
        """Fetch rows with a successful publish that are not recycled yet."""
        fetch_query = """
            select id, publish_content_id, gh_id, user_group_id
            from long_articles_group_send_result
            where status = %s and recycle_status = %s;
        """
        return self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(self.PUBLISH_SUCCESS_STATUS, self.RECYCLE_INIT_STATUS),
        )

    def get_article_url_from_aigc_system(
        self, publish_content_id, user_group_id
    ) -> Optional[Dict[str, Any]]:
        """Look up the stage URL and publish timestamp for one publish.

        Returns the first matching row (keys ``publish_stage_url`` and
        ``publish_timestamp``) or ``None`` when nothing matches. Because
        ``publish_content`` is LEFT JOINed, ``publish_timestamp`` may be
        NULL in the returned row.
        """
        sql = """
            select t1.publish_stage_url, t2.publish_timestamp
            from publish_content_stage_url t1
            left join publish_content t2 on t1.publish_content_id = t2.id
            where t1.publish_content_id = %s and t1.user_group_id = %s;
        """
        article_info = self.denet_client.fetch(
            sql,
            cursor_type=DictCursor,
            params=(publish_content_id, user_group_id),
        )
        return article_info[0] if article_info else None

    def update_recycle_status(self, record_id, ori_status, new_status):
        """Move a row from ``ori_status`` to ``new_status``.

        The update only matches when the row is currently in
        ``ori_status``. Returns whatever the client's ``save`` returns.
        """
        update_query = """
            update long_articles_group_send_result
            set recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            update_query, (new_status, record_id, ori_status)
        )

    def set_article_url(self, record_id, article_url, publish_timestamp):
        """Store URL and publish time and mark the row SUCCESS.

        Only matches rows that are currently in PROCESSING status.
        """
        update_query = """
            update long_articles_group_send_result
            set url = %s, publish_timestamp = %s, recycle_status = %s
            where id = %s and recycle_status = %s;
        """
        return self.long_articles_client.save(
            query=update_query,
            params=(
                article_url,
                publish_timestamp,
                self.RECYCLE_SUCCESS_STATUS,
                record_id,
                self.RECYCLE_PROCESSING_STATUS,
            ),
        )

    def _resolve_publish_info(self, publish_content_id, user_group_id):
        """Return ``(article_url, publish_timestamp_in_seconds)``.

        Either element is ``None`` when it is not available yet.
        """
        call_back_info = self.get_article_url_from_aigc_system(
            publish_content_id, user_group_id
        )
        if not call_back_info:
            return None, None

        article_url = call_back_info["publish_stage_url"]
        raw_timestamp = call_back_info["publish_timestamp"]
        # The LEFT JOIN can yield a NULL timestamp; dividing None would raise
        # TypeError and leave the row locked in PROCESSING.
        if not raw_timestamp:
            return article_url, None
        # The stored value is divided by 1000 before being written back.
        return article_url, int(raw_timestamp / 1000)

    def deal(self) -> None:
        """Process every pending row: lock, resolve, then store or unlock."""
        publish_records = self.get_published_articles()
        for publish_record in tqdm(publish_records):
            record_id = publish_record["id"]

            # lock
            # NOTE(review): the result of this update is not checked, so the
            # loop cannot tell whether another worker already took the row.
            self.update_recycle_status(
                record_id,
                self.RECYCLE_INIT_STATUS,
                self.RECYCLE_PROCESSING_STATUS,
            )

            stored = False
            try:
                article_url, publish_timestamp = self._resolve_publish_info(
                    publish_record["publish_content_id"],
                    publish_record["user_group_id"],
                )
                if article_url and publish_timestamp:
                    # set record and unlock
                    self.set_article_url(
                        record_id, article_url, publish_timestamp
                    )
                    stored = True
            finally:
                if not stored:
                    # unlock; the update only matches rows still in
                    # PROCESSING, so it is harmless if the status moved on.
                    self.update_recycle_status(
                        record_id,
                        self.RECYCLE_PROCESSING_STATUS,
                        self.RECYCLE_INIT_STATUS,
                    )
class SaveFwhDataToDatabase(FwhDataRecycle):
    """Crawl recycled articles and persist them into ``official_articles_v2``.

    For every account listed in ``article_gzh_developer`` the job loads the
    recycled group-send rows published within ``STAT_PERIOD``, fetches each
    article's detail through ``get_article_detail`` and inserts a row; when
    the insert fails the stored read count is updated instead.
    """

    def update_article_read_cnt(self, wx_sn, new_read_cnt):
        """Set ``show_view_count`` for the row identified by ``wx_sn``."""
        update_query = """
            update official_articles_v2
            set show_view_count = %s
            where wx_sn = %s;
        """
        return self.piaoquan_client.save(update_query, (new_read_cnt, wx_sn))

    def save_data_to_database(self, article):
        """Insert one article row.

        ``article`` is a 16-item sequence in the column order of the
        insert statement below.
        """
        insert_query = """
            insert into official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
            wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return self.piaoquan_client.save(insert_query, article)

    def get_group_server_accounts(self) -> List[Any]:
        """Return every ``gzh_id`` stored in ``article_gzh_developer``."""
        fetch_query = """
            select gzh_id from article_gzh_developer;
        """
        fetch_response = self.piaoquan_client.fetch(
            fetch_query, cursor_type=DictCursor
        )
        return [row['gzh_id'] for row in fetch_response]

    def get_stat_published_articles(self, gh_id):
        """Fetch recycled rows of ``gh_id`` published within ``STAT_PERIOD``."""
        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
        fetch_query = """
            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
            from long_articles_group_send_result
            where gh_id = %s and recycle_status = %s and publish_timestamp > %s;
        """
        # Keyword arguments, matching every other fetch call in this file,
        # so the call does not depend on the positional order of `fetch`.
        return self.long_articles_client.fetch(
            query=fetch_query,
            cursor_type=DictCursor,
            params=(gh_id, self.RECYCLE_SUCCESS_STATUS, earliest_timestamp),
        )

    @staticmethod
    def _parse_article_detail(detail) -> Dict[str, Any]:
        """Extract the stored fields from the crawler's inner ``data`` dict.

        Raises ``KeyError`` when an expected key (including ``rootSourceId``
        inside a mini-program path) is missing.
        """
        content_url = detail["content_link"]
        title = detail["title"]
        root_source_ids = [
            urllib.parse.parse_qs(
                urllib.parse.unquote(mini_program['path'])
            )['rootSourceId'][0]
            for mini_program in detail["mini_program"]
        ]
        return {
            "content_url": content_url,
            "app_msg_id": content_url.split("mid=")[-1].split("&")[0],
            # NOTE(review): takes everything after the last "sn=", so any
            # parameter following sn would be included -- confirm that
            # content_link always ends with the sn value.
            "wx_sn": content_url.split("sn=")[-1],
            "item_index": detail["item_index"],
            "show_view_count": detail["view_count"],
            "title": title,
            "title_md5": str_to_md5(title),
            "channel_content_id": detail["channel_content_id"],
            "root_source_id_list": json.dumps(root_source_ids),
        }

    def process_each_account_data(self, account_published_article_list):
        """Crawl and store every article of one account.

        A failure while crawling or storing a single article is printed and
        the loop moves on to the next article.
        """
        if not account_published_article_list:
            return

        for article in account_published_article_list:
            account_name = article['account_name']
            gh_id = article['gh_id']
            user_group_id = article['user_group_id']
            url = article['url']
            publish_timestamp = article['publish_timestamp']

            # get article detail info with spider
            try:
                article_detail_info = get_article_detail(
                    url, is_count=True, is_cache=False
                )
                time.sleep(3)
                parsed = self._parse_article_detail(
                    article_detail_info["data"]["data"]
                )
                row = (
                    gh_id,
                    account_name,
                    parsed["app_msg_id"],
                    parsed["title"],
                    '9',  # Type column is always written as '9'
                    publish_timestamp,  # createTime
                    publish_timestamp,  # updateTime
                    parsed["item_index"],
                    parsed["content_url"],
                    parsed["show_view_count"],
                    parsed["wx_sn"],
                    parsed["title_md5"],
                    user_group_id,  # article_group
                    parsed["channel_content_id"],
                    parsed["root_source_id_list"],
                    publish_timestamp,
                )
                try:
                    self.save_data_to_database(article=row)
                except Exception:
                    # Any insert failure (presumably a duplicate row -- TODO
                    # confirm) falls back to refreshing the read count.
                    self.update_article_read_cnt(
                        parsed["wx_sn"], parsed["show_view_count"]
                    )
            except Exception as e:
                print(f"article {url} is not available, skip it")
                print(e)

    def deal(self) -> None:
        """Run the crawl for every account, with a progress bar per account."""
        account_id_list = self.get_group_server_accounts()
        for account_id in account_id_list:
            publish_articles = tqdm(
                self.get_stat_published_articles(account_id),
                desc=f"<crawling> {account_id}",
            )
            self.process_each_account_data(publish_articles)
class FwhDataExportTemp(FwhDataRecycle):
    """One-off export of published articles plus fission stats to CSV."""

    def get_publish_articles(self):
        """Fetch the articles of developer accounts in the export window.

        NOTE: the date window ('2025-06-08' .. '2025-06-09') is hard-coded
        in the query.
        """
        # No params are passed with this query; the literal '%Y-%m-%d' is
        # sent to the server as-is.
        sql = """
            select accountName, title, article_group, ItemIndex, show_view_count, from_unixtime(createTime, '%Y-%m-%d'), root_source_id_list
            from official_articles_v2
            where ghId in (
                select gzh_id from article_gzh_developer
            )
            and from_unixtime(publish_timestamp) between '2025-06-08' and '2025-06-09';
        """
        return self.piaoquan_client.fetch(query=sql, cursor_type=DictCursor)

    def get_fission_info(self, root_source_id_list):
        """Fetch summed fission stats for a JSON-encoded list of root source ids.

        Args:
            root_source_id_list: JSON array string of root source ids; may be
                ``None`` or empty.

        Returns:
            The query result, whose first row carries ``uv``, ``first_uv``
            and ``split_uv``. When there are no ids, a single all-``None``
            row is returned without querying.
        """
        parsed_ids = json.loads(root_source_id_list) if root_source_id_list else None
        root_source_id_tuple = tuple(parsed_ids or ())
        if not root_source_id_tuple:
            # An empty tuple would render as `in ()`, which is invalid SQL.
            # SUM over zero rows yields NULL, so mirror that result here.
            return [{'uv': None, 'first_uv': None, 'split_uv': None}]

        query = """
            select sum(uv) as 'uv', sum(first_uv) as 'first_uv', sum(split_uv) as 'split_uv'
            from changwen_data_rootsourceid
            where root_source_id
            in %s;
        """
        return self.long_articles_client.fetch(
            query=query,
            cursor_type=DictCursor,
            params=(root_source_id_tuple,),
        )

    def deal(self, output_path='temp.csv') -> None:
        """Join articles with their fission stats and write them to CSV.

        Args:
            output_path: destination file; defaults to ``temp.csv``.
        """
        # Imported here so pandas is only needed when the export runs.
        import pandas as pd

        rows = []
        for article in self.get_publish_articles():
            fission_row = self.get_fission_info(article['root_source_id_list'])[0]
            article['uv'] = fission_row['uv']
            article['first_uv'] = fission_row['first_uv']
            article['split_uv'] = fission_row['split_uv']
            rows.append(article)

        pd.DataFrame(rows).to_csv(output_path, index=False)
if __name__ == '__main__':
    # Script entry point: run the temporary CSV export.
    FwhDataExportTemp().deal()
|