| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- import asyncio
- import json
- import time
- import urllib.parse
- from datetime import datetime
- from typing import Optional, List, Dict
- from tqdm import tqdm
- from applications.api import feishu_robot
- from applications.utils import str_to_md5, days_remaining_in_month
- from applications.crawler.wechat import get_article_detail
- class RecycleFwhGroupPublishArticlesConst:
- # 状态Code
- INIT_STATUS = 0
- PROCESSING_STATUS = 1
- SUCCESS_STATUS = 2
- FAILED_STATUS = 99
- # 群发成功状态
- PUBLISH_SUCCESS_STATUS = 2
- # 阅读量更新区间
- STAT_PERIOD = 3 * 24 * 3600
- # 文章违规状态
- ARTICLE_ILLEGAL_CODE = 25012
- # 未使用的账号
- NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}
- class RecycleFwhGroupPublishArticlesBase(RecycleFwhGroupPublishArticlesConst):
- def __init__(self, pool, log_client):
- self.pool = pool
- self.log_client = log_client
- async def get_server_group_publish_accounts(self) -> List[str]:
- query = "select gzh_id from article_gzh_developer;"
- fetch_response = await self.pool.async_fetch(
- query=query, db_name="piaoquan_crawler"
- )
- return [
- i["gzh_id"]
- for i in fetch_response
- if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
- ]
- async def get_account_name(self, gh_id: str) -> Optional[str]:
- query = """select account_name from long_articles_group_send_result where gh_id = %s limit 1;"""
- fetch_response = await self.pool.async_fetch(query=query, params=(gh_id,))
- if fetch_response:
- return fetch_response[0]["account_name"]
- return None
- # 违规文章报警
- async def alert_illegal_article(
- self,
- gh_id: str,
- account_name: str,
- group_id: str,
- illegal_msg: str,
- publish_date: str,
- ):
- await feishu_robot.bot(
- title="服务号文章违规告警,请前往微信公众平台处理",
- detail={
- "account_name": account_name,
- "gh_id": gh_id,
- "group_id": group_id,
- "illegal_msg": illegal_msg,
- "publish_date": str(publish_date),
- },
- env="server_account_publish_monitor",
- )
- class RecordFwhGroupPublishArticles(RecycleFwhGroupPublishArticlesBase):
- # 获取服务号发文细节
- async def get_group_published_articles(self):
- query = """
- SELECT
- id, publish_content_id, gh_id, user_group_id
- FROM long_articles_group_send_result
- WHERE status = %s AND recycle_status = %s;
- """
- return await self.pool.async_fetch(
- query=query, params=(self.PUBLISH_SUCCESS_STATUS, self.INIT_STATUS)
- )
- # 通过回调结果获取分组群发文章的回调信息
- async def get_article_call_back_from_aigc(
- self, publish_content_id: str, user_group_id: str
- ) -> Optional[Dict]:
- query = """
- SELECT t1.publish_stage_url
- FROM publish_content_stage_url t1
- LEFT JOIN publish_content t2 ON t1.publish_content_id = t2.id
- WHERE t1.publish_content_id = %s AND t1.user_group_id = %s;
- """
- fetch_response = await self.pool.async_fetch(
- query=query, db_name="aigc", params=(publish_content_id, user_group_id)
- )
- if fetch_response:
- return fetch_response[0]
- return None
- # 更新文章回收状态
- async def update_recycle_status(
- self, record_id: int, ori_status: int, new_status: int
- ) -> int:
- query = """
- UPDATE long_articles_group_send_result
- SET recycle_status = %s
- WHERE id = %s AND recycle_status = %s;
- """
- return await self.pool.async_save(
- query=query, params=(new_status, record_id, ori_status)
- )
- # 为文章写入链接
- async def set_article_url(self, record_id: int, url: str) -> int:
- query = """
- UPDATE long_articles_group_send_result
- SET url = %s, recycle_status = %s
- WHERE id = %s and recycle_status = %s;
- """
- return await self.pool.async_save(
- query=query, params=(url, self.SUCCESS_STATUS, record_id, self.INIT_STATUS)
- )
- async def deal(self):
- group_published_articles = await self.get_group_published_articles()
- for record in tqdm(group_published_articles):
- record_id = record["id"]
- publish_content_id = record["publish_content_id"]
- user_group_id = record["user_group_id"]
- # lock task
- acquire_lock = await self.update_recycle_status(
- record_id, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- continue
- # get article link from aigc
- call_back_response = await self.get_article_call_back_from_aigc(
- publish_content_id, user_group_id
- )
- if call_back_response:
- article_link = call_back_response["publish_stage_url"]
- if article_link:
- await self.set_article_url(record_id, article_link)
- else:
- # unlock
- await self.update_recycle_status(
- record_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- else:
- # unlock
- await self.update_recycle_status(
- record_id, self.PROCESSING_STATUS, self.FAILED_STATUS
- )
- class SaveFwhDataToDatabase(RecycleFwhGroupPublishArticlesBase):
- # 更新阅读量
- async def update_read_count(self, wx_sn, read_count):
- query = """
- UPDATE official_articles_v2
- SET show_view_count = %s
- WHERE wx_sn = %s;
- """
- return await self.pool.async_save(
- query=query, db_name="piaoquan_crawler", params=(read_count, wx_sn)
- )
- # 保存文章数据到数据库
- async def save_data_to_database(self, article):
- query = f"""
- insert into official_articles_v2
- (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count,
- wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp)
- values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- return await self.pool.async_save(
- query=query, params=article, db_name="piaoquan_crawler"
- )
- # 获取需要更新的分组群发文章
- async def get_stat_published_articles(self, gh_id):
- earliest_timestamp = int(time.time()) - self.STAT_PERIOD
- fetch_query = f"""
- select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
- from long_articles_group_send_result
- where gh_id = %s and recycle_status = %s and create_time > %s;
- """
- earliest_time = datetime.fromtimestamp(earliest_timestamp).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- return await self.pool.async_fetch(
- query=fetch_query, params=(gh_id, self.SUCCESS_STATUS, earliest_time)
- )
- # 处理某个账号
- async def process_each_fwh_account(self, gh_id):
- stat_articles = await self.get_stat_published_articles(gh_id)
- if not stat_articles:
- return
- for article in stat_articles:
- account_name = article["account_name"]
- user_group_id = article["user_group_id"]
- url = article["url"]
- publish_date = article["publish_date"]
- # get article detail info with spider
- try:
- article_detail_info = await get_article_detail(
- article_link=url, is_count=True, is_cache=False
- )
- response_code = article_detail_info["code"]
- if response_code == self.ARTICLE_ILLEGAL_CODE:
- await self.alert_illegal_article(
- account_name=account_name,
- gh_id=gh_id,
- group_id=user_group_id,
- illegal_msg=article_detail_info["msg"],
- publish_date=publish_date,
- )
- await asyncio.sleep(1)
- content_url = article_detail_info["data"]["data"]["content_link"]
- app_msg_id = content_url.split("mid=")[-1].split("&")[0]
- wx_sn = content_url.split("sn=")[-1]
- publish_timestamp = int(
- article_detail_info["data"]["data"]["publish_timestamp"] / 1000
- )
- create_time = publish_timestamp
- update_time = publish_timestamp
- item_index = article_detail_info["data"]["data"]["item_index"]
- show_view_count = article_detail_info["data"]["data"]["view_count"]
- title = article_detail_info["data"]["data"]["title"]
- title_md5 = str_to_md5(title)
- channel_content_id = article_detail_info["data"]["data"][
- "channel_content_id"
- ]
- mini_program_info = article_detail_info["data"]["data"]["mini_program"]
- root_source_id_list = [
- urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
- "rootSourceId"
- ][0]
- for i in mini_program_info["item_list"]
- ]
- root_source_id_list = json.dumps(root_source_id_list)
- try:
- await self.save_data_to_database(
- article=(
- gh_id,
- account_name,
- app_msg_id,
- title,
- "9",
- create_time,
- update_time,
- item_index,
- url,
- show_view_count,
- wx_sn,
- title_md5,
- user_group_id,
- channel_content_id,
- root_source_id_list,
- publish_timestamp,
- )
- )
- except Exception as e:
- await self.update_read_count(
- wx_sn=wx_sn, read_count=show_view_count
- )
- except Exception as e:
- print(f"article {url} is not available, skip it")
- print(e)
- # deal
- async def deal(self):
- account_id_list = await self.get_server_group_publish_accounts()
- if not account_id_list:
- return
- for account_id in account_id_list:
- await self.process_each_fwh_account(gh_id=account_id)
- class FwhGroupPublishMonitor(RecycleFwhGroupPublishArticlesBase):
- # 获取指定日期,该账号下所有群发任务的平均粉丝数
- async def get_sent_fans(self, date_string: str, gh_id: str) -> int:
- query = """
- select push_id, avg(sent_count) as 'total_sent_fans'
- from long_articles_group_send_result
- where publish_date = %s and gh_id = %s and status = %s
- group by push_id;
- """
- fetch_response = await self.pool.async_fetch(
- query=query,
- params=(date_string, gh_id, self.PUBLISH_SUCCESS_STATUS),
- )
- fans_list = [i["total_sent_fans"] for i in fetch_response]
- return sum(fans_list) if fans_list else 0
- # 获取指定日期,该账号下所有群发任务的剩余粉丝数
- async def get_remain_fans(self, gh_id: str):
- query = """
- select count(1) as 'remain_fans'
- from article_user_group
- where gzh_id = %s and is_delete = %s and remaining_count > %s;
- """
- fetch_response = await self.pool.async_fetch(query=query, params=(gh_id, 0, 0))
- response = fetch_response[0]["remain_fans"]
- return response if response else 0
- async def get_remain_publish_times(self, gh_id: str):
- """
- 获取剩余可发布次数
- """
- fetch_query = """
- select sum(remaining_count) as 'remain_publish_times'
- from article_user_group
- where gzh_id = %s and is_delete = %s;
- """
- fetch_response = await self.pool.async_fetch(
- query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, 0)
- )
- response = fetch_response[0]["remain_publish_times"]
- return response if response else 0
- async def get_remain_fans_and_publish_times(self, gh_id: str, date_string: str):
- """
- 获取发布前,该账号剩余的发布次数和粉丝数
- """
- fetch_query = """
- select fans_before_publish, publish_times_before_publish
- from fwh_daily_publish_detail
- where gh_id = %s and publish_date = %s;
- """
- fetch_response = await self.pool.async_fetch(
- query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, date_string)
- )
- return fetch_response[0] if fetch_response else None
- async def deal(self, date_string: str = datetime.today().strftime("%Y-%m-%d")):
- """
- monitor the publish record
- """
- now = datetime.now()
- if now.hour > 12:
- return
- gh_id_list = await self.get_server_group_publish_accounts()
- if not gh_id_list:
- return
- # get rest publish days
- remain_days = days_remaining_in_month()
- # get table columns
- columns = [
- feishu_robot.create_feishu_columns_sheet(
- sheet_type="plain_text",
- sheet_name="account_name",
- display_name="公众号名称",
- ),
- feishu_robot.create_feishu_columns_sheet(
- sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
- ),
- feishu_robot.create_feishu_columns_sheet(
- sheet_type="number",
- sheet_name="rest_publish_times",
- display_name="发文前剩余发文次数",
- ),
- feishu_robot.create_feishu_columns_sheet(
- sheet_type="number",
- sheet_name="rest_publish_fans",
- display_name="发文前剩余发文粉丝数",
- ),
- feishu_robot.create_feishu_columns_sheet(
- sheet_type="number",
- sheet_name="remain_days",
- display_name="本月剩余天数",
- ),
- ]
- monitor_table = []
- for gh_id in gh_id_list:
- account_name = await self.get_account_name(gh_id)
- sent_fans = await self.get_sent_fans(date_string, gh_id)
- detail = await self.get_remain_fans_and_publish_times(gh_id, date_string)
- if not detail:
- await feishu_robot.bot(
- title=f"{date_string}服务号发文详情",
- detail=f"{gh_id}--{account_name} 没有发布详情",
- env="server_account_publish_monitor",
- )
- continue
- remain_fans, remain_publish_times = (
- detail["fans_before_publish"],
- detail["publish_times_before_publish"],
- )
- temp = {
- "account_name": account_name,
- "rest_publish_times": int(remain_publish_times),
- "rest_publish_fans": int(remain_fans),
- "remain_days": int(remain_days),
- "sent_fans": int(sent_fans),
- }
- monitor_table.append(temp)
- await feishu_robot.bot(
- title=f"{date_string}服务号发文详情",
- detail={"columns": columns, "rows": monitor_table},
- table=True,
- mention=False,
- env="server_account_publish_monitor",
- )
|