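"""
Recycle and maintain WeChat official-account publish data.

Tasks defined below:
- RecycleDailyPublishArticlesTask: pull recently published articles for every
  active publish account and write them into the recycle pool.
- CheckDailyPublishArticlesTask: verify each account has articles for the given
  date, retry recycling, and report accounts that are still missing.
- UpdateRootSourceIdAndUpdateTimeTask: backfill publish_timestamp and
  root_source_id_list for recycled articles.
"""
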
import json
import time
import datetime
import urllib.parse
import traceback

from tqdm import tqdm

from applications.api import feishu_robot
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_article_into_recycle_pool


class Const:
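    """Constants shared by the recycle tasks: account filters and status codes."""
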
    # subscription account types
    SUBSCRIBE_TYPE_SET = {0, 1}

    # look-back window (30 days) for accounts with no recycled article yet
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30

    # banned accounts excluded from recycling
    FORBIDDEN_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
    ]

    # article status
    # default status for a record
    DEFAULT_STATUS = 0
    # detail api request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # nothing returned, reason unknown
    UNKNOWN_STATUS = -3
    # article violates platform rules
    ILLEGAL_STATUS = -4

    # response codes returned by the article detail api
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_SUCCESS_CODE = 0
    ARTICLE_UNKNOWN_CODE = 10000


class RecycleDailyPublishArticlesTask(Const):
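    """Pull recently published articles for each active account into the recycle pool."""
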
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_publish_accounts(self):
        """get all publish accounts"""
        query = """
            select distinct t3.name, t3.gh_id, t3.follower_count, t3.create_timestamp as account_init_timestamp,
                t4.service_type_info as account_type, t4.verify_type_info as account_auth, t3.id as account_id,
                group_concat(distinct t5.remark) as account_remark
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_wx_type t4 on t3.id = t4.account_id
                left join publish_account_remark t5 on t3.id = t5.publish_account_id
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list, error = await self.pool.async_fetch(query, db_name="aigc")
        # drop auto-reply (自动回复) accounts
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_account_status(self):
        """get account experiment status"""
        sql = """
            select t1.account_id, t2.status
            from wx_statistics_group_source_account t1
                join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
        """
        account_status_list, error = await self.pool.async_fetch(sql, db_name="aigc")
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict

    async def recycle_single_account(self, account):
        """recycle articles published by a single account"""
        query = """
            select max(publish_timestamp) as publish_timestamp
            from official_articles_v2
            where ghId = %s;
        """
        response, error = await self.pool.async_fetch(
            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
        )
        # max() yields NULL for accounts with no recycled article yet;
        # fall back to a fixed look-back window in that case
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
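
        # page through the account's article list and stop once the oldest article
        # on a page is no newer than the latest one already recycled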
        cursor = None
        while True:
            response = get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case 25013:
                    # account is banned: notify via feishu and stop
                    await feishu_robot.bot(
                        title="发布账号封禁",
                        detail={
                            "账号名称": account["name"],
                            "账号id": account["gh_id"],
                        },
                    )
                    return
                case 0:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return

                    await insert_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )

                    # check last article on this page
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case _:
                    return

    async def get_task_list(self):
        """recycle all publish accounts articles"""
        binding_accounts = await self.get_publish_accounts()
        # filter out banned accounts
        binding_accounts = [
            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
        ]
        account_status = await self.get_account_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in binding_accounts
        ]
        # keep subscription accounts only
        subscription_accounts = [
            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
        ]
        return subscription_accounts

    async def deal(self):
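        """recycle articles for every subscription account"""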
        subscription_accounts = await self.get_task_list()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
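    """Check that each account has articles recycled for date_string; retry and report misses."""
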
    async def check_account(self, account: dict, date_string: str) -> bool:
        """check whether the account has published articles since date_string"""
        query = """
            select accountName, count(1) as publish_count
            from official_articles_v2
            where ghId = %s and from_unixtime(createTime) > %s;
        """
        response, error = await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(account["gh_id"], date_string),
        )
        if error:
            await feishu_robot.bot(
                title="sql错误",
                detail={
                    "task": "CheckDailyPublishArticlesTask",
                    "function": "check_account",
                    "account": account,
                    "date_string": date_string,
                },
                mention=False,
            )
            return False
        else:
            today_publish_count = response[0]["publish_count"]
            return today_publish_count > 0

    async def deal(self):
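        """re-recycle accounts missing articles for date_string, then report the ones still missing"""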
        task_list = await self.get_task_list()
        for task in tqdm(task_list, desc="check each account step1: "):
            if await self.check_account(task, self.date_string):
                continue
            else:
                await self.recycle_single_account(task)

        # check again
        fail_list = []
        for second_task in tqdm(task_list, desc="check each account step2: "):
            if await self.check_account(second_task, self.date_string):
                continue
            else:
                # drop fields that are not shown in the feishu table
                second_task.pop("account_type", None)
                second_task.pop("account_auth", None)
                second_task.pop("account_id", None)
                second_task.pop("account_remark", None)
                fail_list.append(second_task)

        if fail_list:
            columns = [
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="name",
                    display_name="公众号名称",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="number",
                    sheet_name="follower_count",
                    display_name="粉丝数",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="date",
                    sheet_name="account_init_timestamp",
                    display_name="账号接入系统时间",
                ),
                feishu_robot.create_feishu_columns_sheet(
                    sheet_type="plain_text",
                    sheet_name="using_status",
                    display_name="利用状态",
                ),
            ]
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,存在未更新的账号",
                detail={"columns": columns, "rows": fail_list},
                table=True,
                mention=False,
            )
        else:
            await feishu_robot.bot(
                title=f"{self.date_string} 发布文章,所有文章更新成功",
                detail={
                    "date_string": self.date_string,
                    "finish_time": str(datetime.datetime.now()),
                },
                mention=False,
            )


class UpdateRootSourceIdAndUpdateTimeTask(Const):
    """update publish_timestamp && root_source_id"""

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    async def get_article_list(self):
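        """fetch articles whose publish_timestamp is still 0 (default) or -1 (request failed)"""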
        query = """select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
        article_list, error = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
        )
        return article_list

    async def check_each_article(self, article: dict):
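        """
        fetch the article detail, derive publish_timestamp and the rootSourceId list
        (or a negative status on failure), persist both, and return the article only
        when the request failed so the caller can retry it
        """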
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        try:
            response = get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
                    # pull rootSourceId out of each mini-program path's query string
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
                            "rootSourceId"
                        ][0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )

        query = """
            update official_articles_v2
            set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            return article
        else:
            return None

    async def fallback_mechanism(self):
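        """sql-side fallback for rows whose publish_timestamp could not be fetched"""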
        # first, backfill publish_timestamp from sibling articles sharing the same appMsgId
        update_sql = """
            update official_articles_v2 oav
            join (
                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp
                from official_articles_v2
                where publish_timestamp > %s
                group by ghId, appMsgId
            ) vv
                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
            set oav.publish_timestamp = vv.publish_timestamp
            where oav.publish_timestamp <= %s;
        """
        await self.pool.async_save(
            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
        )

        # if publish_timestamp is still missing, fall back to updateTime
        update_sql_2 = """
            update official_articles_v2
            set publish_timestamp = updateTime
            where publish_timestamp < %s;
        """
        await self.pool.async_save(
            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
        )

    async def deal(self):
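        """
        process pending articles in two passes, alert on persistent failures via
        feishu, and run the sql fallback after 21:00
        """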
        task_list = await self.get_article_list()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["ContentUrl"],
                                    "wx_sn": task["wx_sn"].decode("utf-8"),
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as log_error:
                    print(log_error)
                    print(traceback.format_exc())

        # second pass: reprocess articles whose detail fetch failed in the first pass
        fail_tasks = await self.get_article_list()
        fail_list = []
        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
            try:
                res = await self.check_each_article(fail_task)
                if res:
                    fail_list.append(res)
            except Exception as e:
                await self.log_client.log(
                    contents={
                        "task": "get_official_article_detail_step2",
                        "data": {
                            "detail": {
                                "url": fail_task["ContentUrl"],
                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
                            },
                            "error_msg": traceback.format_exc(),
                            "error": str(e),
                        },
                        "function": "check_each_article",
                        "status": "fail",
                    }
                )

        if fail_list:
            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)

        current_hour = datetime.datetime.now().hour
        if current_hour >= 21:
            await self.fallback_mechanism()
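

# Minimal usage sketch: the tasks are expected to be driven by an external
# scheduler that supplies an async MySQL pool exposing `async_fetch` / `async_save`
# and a log client exposing an async `log(contents=...)` method, as used above.
# The placeholder objects and the date format below are assumptions.
#
#     import asyncio
#
#     async def _run():
#         pool = ...        # async MySQL pool (async_fetch / async_save)
#         log_client = ...  # async log client (log(contents=...))
#         date_string = datetime.date.today().strftime("%Y-%m-%d")
#         await RecycleDailyPublishArticlesTask(pool, log_client, date_string).deal()
#         await CheckDailyPublishArticlesTask(pool, log_client, date_string).deal()
#         await UpdateRootSourceIdAndUpdateTimeTask(pool, log_client).deal()
#
#     asyncio.run(_run())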