import time
import json
import datetime
import traceback
import urllib.parse

from tqdm.asyncio import tqdm

from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
from .recycle_daily_publish_articles import Const
from applications.crawler.wechat import get_article_list_from_account
from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_outside_article_into_recycle_pool
from applications.api import feishu_robot


class RecycleOutsideAccountArticlesTask(Const):
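    """Recycle articles published by outside, agency-operated accounts into the recycle pool."""
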
    def __init__(self, pool, log_client, date_string):
        self.pool = pool
        self.log_client = log_client
        self.date_string = date_string

    async def get_outside_accounts(self):
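        """Fetch active agency-operated service accounts from the aigc database."""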
        query = """
            select
                t2.group_source_name as account_source,
                t3.name as name,
                t3.gh_id as gh_id
            from wx_statistics_group_source t1
            join wx_statistics_group_source_account t2
                on t2.group_source_name = t1.account_source_name
            join publish_account t3 on t3.id = t2.account_id
            where
                t1.mode_type = '代运营服务号'
                and (
                    t2.group_source_name like '%云誉%'
                    or t2.group_source_name like '%微小盟%'
                    or t2.group_source_name like '%阿雅达%'
                    or t2.group_source_name like '%创易%'
                )
                and t3.status = 1
                and t3.name != '';
        """
        return await self.pool.async_fetch(query=query, db_name="aigc")

    async def recycle_single_account(self, account):
        """Recycle the publish history of a single account."""
        query = """
            select max(update_time) as publish_timestamp
            from outside_account_articles
            where gh_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account["gh_id"],))
        if response and response[0]["publish_timestamp"]:
            max_publish_timestamp = response[0]["publish_timestamp"]
        else:
            # new account with no recycled articles yet (max() returns a NULL
            # row): only look back over the initial crawl window
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
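        # page through the account's article list (newest first) until we reach
        # articles older than the last one already recycled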
        cursor = None
        while True:
            response = await get_article_list_from_account(
                account_id=account["gh_id"], index=cursor
            )
            response_code = response["code"]
            match response_code:
                case self.ACCOUNT_FORBIDDEN_CODE:
                    # notification is currently disabled:
                    # await feishu_robot.bot(
                    #     title="publishing account banned",
                    #     detail={
                    #         "account_name": account["name"],
                    #         "gh_id": account["gh_id"],
                    #     },
                    # )
                    return
                case self.ARTICLE_SUCCESS_CODE:
                    msg_list = response.get("data", {}).get("data", [])
                    if not msg_list:
                        return
                    await insert_outside_article_into_recycle_pool(
                        self.pool, self.log_client, msg_list, account
                    )
                    # stop paging once the oldest article on this page is no
                    # newer than the last recycled one
                    last_article = msg_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return
                    cursor = response["data"].get("next_cursor")
                    if not cursor:
                        return
                case self.CRAWL_CRASH_CODE:
                    # log and retry the same cursor on the next loop iteration
                    await self.log_client.log(
                        contents={
                            "task": "recycle_daily_publish_articles",
                            "data": {
                                "gh_id": account["gh_id"],
                            },
                            "message": "crawler crashed",
                            "status": "fail",
                        }
                    )
                case _:
                    return

    async def deal(self):
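        """Entry point: recycle every outside account in turn."""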
        subscription_accounts = await self.get_outside_accounts()
        for account in tqdm(subscription_accounts, desc="recycle each account"):
            try:
                await self.recycle_single_account(account)
            except Exception as e:
                print(
                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
                )


class UpdateOutsideRootSourceIdAndUpdateTimeTask(UpdateRootSourceIdAndUpdateTimeTask):
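    """Backfill publish_timestamp and root_source_id_list for outside articles."""
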
    def __init__(self, pool, log_client):
        super().__init__(pool, log_client)

    async def get_outside_article_list_v2(self) -> list[dict]:
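        """Fetch articles whose publish_timestamp is still a sentinel value
        (0 or -1; presumably "not yet resolved" and "request failed")."""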
        query = """
            select content_url, wx_sn
            from outside_account_articles
            where publish_timestamp in %s
            order by update_time desc;
        """
        article_list = await self.pool.async_fetch(
            query=query, params=((0, -1),)
        )
        return article_list

    async def check_each_article(self, article: dict):
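        """Fetch the article detail and backfill publish_timestamp and
        root_source_id_list. Returns the article when the request failed, so
        the caller can collect it for a retry; otherwise returns None."""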
        url = article["content_url"]
        wx_sn = article["wx_sn"]
        try:
            response = await get_article_detail(url)
            response_code = response["code"]
            if response_code == self.ARTICLE_DELETE_CODE:
                publish_timestamp_s = self.DELETE_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_ILLEGAL_CODE:
                publish_timestamp_s = self.ILLEGAL_STATUS
                root_source_id_list = []
            elif response_code == self.ARTICLE_SUCCESS_CODE:
                data = response["data"]["data"]
                publish_timestamp_ms = data["publish_timestamp"]
                publish_timestamp_s = int(publish_timestamp_ms / 1000)
                mini_program = data.get("mini_program", [])
                if mini_program:
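                    # each mini-program card carries rootSourceId in its path's
                    # query string; URL-decode the path before parsing it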
                    root_source_id_list = [
                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
                            "rootSourceId", [""]
                        )[0]
                        for i in mini_program
                    ]
                else:
                    root_source_id_list = []
            else:
                publish_timestamp_s = self.UNKNOWN_STATUS
                root_source_id_list = []
        except Exception as e:
            publish_timestamp_s = self.REQUEST_FAIL_STATUS
            root_source_id_list = None
            error_msg = traceback.format_exc()
            await self.log_client.log(
                contents={
                    "task": "get_official_article_detail",
                    "data": {
                        "url": url,
                        "wx_sn": wx_sn,
                        "error_msg": error_msg,
                        "error": str(e),
                    },
                    "function": "check_each_article",
                    "status": "fail",
                }
            )
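        # persist whatever status was resolved; a failed request stores
        # REQUEST_FAIL_STATUS with a JSON null root_source_id_list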
        query = """
            update outside_account_articles
            set publish_timestamp = %s, root_source_id_list = %s
            where wx_sn = %s;
        """
        await self.pool.async_save(
            query=query,
            params=(
                publish_timestamp_s,
                json.dumps(root_source_id_list, ensure_ascii=False),
                wx_sn,
            ),
        )
        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
            return article
        return None

    async def fallback_mechanism(self):
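        """Last-resort backfill: give articles that still have no
        publish_timestamp their update_time instead."""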
        # # backfill publish_timestamp via app_msg_id:
        # update_sql = """
        #     update outside_account_articles oaa
        #     join (
        #         select gh_id, app_msg_id, max(publish_timestamp) as publish_timestamp
        #         from outside_account_articles
        #         where publish_timestamp > %s
        #         group by gh_id, app_msg_id
        #     ) vv on oaa.app_msg_id = vv.app_msg_id and oaa.gh_id = vv.gh_id
        #     set oaa.publish_timestamp = vv.publish_timestamp
        #     where oaa.publish_timestamp <= %s;
        # """
        # affected_rows_1 = await self.pool.async_save(query=update_sql, params=(0, 0))
        # if there is still no publish_timestamp, use update_time instead
        update_sql_2 = """
            update outside_account_articles
            set publish_timestamp = update_time
            where publish_timestamp < %s;
        """
        affected_rows_2 = await self.pool.async_save(query=update_sql_2, params=(0,))
        if affected_rows_2:
            await feishu_robot.bot(
                title="fallback publish_timestamp fix applied",
                detail={
                    # "fixed via msg_id": affected_rows_1,
                    "fixed via update_time": affected_rows_2,
                },
                mention=False,
            )

    async def deal(self):
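        """Entry point: resolve detail for every article still missing a
        publish_timestamp."""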
        task_list = await self.get_outside_article_list_v2()
        for task in tqdm(task_list, desc="get article detail step1: "):
            try:
                await self.check_each_article(task)
            except Exception as e:
                try:
                    await self.log_client.log(
                        contents={
                            "task": "get_official_article_detail_step1",
                            "data": {
                                "detail": {
                                    "url": task["content_url"],
                                    "wx_sn": task["wx_sn"],
                                },
                                "error_msg": traceback.format_exc(),
                                "error": str(e),
                            },
                            "function": "check_each_article",
                            "status": "fail",
                        }
                    )
                except Exception as log_error:
                    print(log_error)
                    print(traceback.format_exc())
        # # reprocess failed tasks:
        # fail_tasks = await self.get_article_list()
        # fail_list = []
        # for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
        #     try:
        #         res = await self.check_each_article(fail_task)
        #         if res:
        #             fail_list.append(res)
        #     except Exception as e:
        #         await self.log_client.log(
        #             contents={
        #                 "task": "get_official_article_detail_step2",
        #                 "data": {
        #                     "detail": {
        #                         "url": fail_task["ContentUrl"],
        #                         "wx_sn": fail_task["wx_sn"].decode("utf-8"),
        #                     },
        #                     "error_msg": traceback.format_exc(),
        #                     "error": str(e),
        #                 },
        #                 "function": "check_each_article",
        #                 "status": "fail",
        #             }
        #         )
        # if fail_list:
        #     await feishu_robot.bot(title="article update: failed to fetch detail", detail=fail_list)
        #
        # current_hour = datetime.datetime.now().hour
        # if current_hour >= 21:
        #     await self.fallback_mechanism()
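

# Usage sketch (hypothetical wiring; `pool`, `log_client`, and the date string
# come from the surrounding application, not from this module):
#
#     task = RecycleOutsideAccountArticlesTask(pool, log_client, date_string="2025-01-01")
#     await task.deal()
#     await UpdateOutsideRootSourceIdAndUpdateTimeTask(pool, log_client).deal()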