- """
- @author: luojunhui
- @desc: 更新文章的阅读详情
- """
- import json
- import time
- import traceback
- import urllib.parse
- from datetime import datetime
- from typing import Dict, List
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.api import FeishuBotApi
- from applications.db import DatabaseConnector
- from applications.utils import str_to_md5, show_desc_to_sta
- from cold_start.crawler.wechat import get_article_list_from_account
- from config import denet_config, long_articles_config, piaoquan_crawler_config


class UpdatePublishedArticlesTaskConst:
    """
    Constants for the update-published-articles task.
    """

    # Success code returned by the crawler detail API
    SUCCESS_CODE = 0
    # Other crawler detail API response codes
    ARTICLE_ILLEGAL_CODE = 25012
    ARTICLE_DELETE_CODE = 25005
    ARTICLE_UNKNOWN_CODE = 10000
    # Account-level violation code
    ACCOUNT_ILLEGAL_CODE = 25013
    UNKNOWN_SPIDER_ERROR_CODE = 20000

    # Default record status
    DEFAULT_STATUS = 0
    # Request to the API failed
    REQUEST_FAIL_STATUS = -1
    # Article has been deleted
    DELETE_STATUS = -2
    # No information returned for an unknown reason
    UNKNOWN_STATUS = -3
    # Article violates platform rules
    ILLEGAL_STATUS = -4

    # Official-account type (subscription account or service account)
    # Subscription accounts
    SUBSCRIBE_TYPE_SET = {0, 1}
    # Service accounts
    SERVICE_TYPE = 2

    # Monitoring window (seconds)
    MONITOR_PERIOD = 60 * 60 * 24 * 3
    # Crawl window for newly added accounts (seconds)
    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
    # Alert threshold for the crawl failure rate of subscription accounts
    TASK_FAIL_RATE_THRESHOLD = 0.3

    # Article table
    ARTICLE_TABLE_NAME = "official_articles_v2"

    # gh_ids of accounts flagged as illegal; excluded from crawling
    ILLEGAL_GH_IDS = [
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_5bb79339a1f4",
    ]
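
    # Hedged convenience mapping (assumption: these pairings reflect how the
    # checker stage would translate crawler response codes into the record
    # statuses above; SPIDER_CODE_TO_STATUS is a name introduced here for
    # illustration and does not exist elsewhere in the codebase).
    SPIDER_CODE_TO_STATUS = {
        ARTICLE_ILLEGAL_CODE: ILLEGAL_STATUS,
        ARTICLE_DELETE_CODE: DELETE_STATUS,
        ARTICLE_UNKNOWN_CODE: UNKNOWN_STATUS,
    }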


class UpdatePublishedArticlesTaskBase(UpdatePublishedArticlesTaskConst):

    def __init__(self):
        # Connect to the three databases used by this task
        self.crawler_client = DatabaseConnector(piaoquan_crawler_config)
        self.long_articles_client = DatabaseConnector(long_articles_config)
        self.denet_client = DatabaseConnector(denet_config)
        self.crawler_client.connect()
        self.long_articles_client.connect()
        self.denet_client.connect()

        self.feishu_bot_api = FeishuBotApi()

    def fetch_published_accounts(self) -> List[Dict]:
        """
        Fetch the accounts that publish articles from the AIGC (denet) database.
        """
        fetch_query = """
            SELECT DISTINCT
                t3.`name` AS account_name,
                t3.gh_id,
                t3.follower_count AS fans,
                t3.create_timestamp AS account_init_timestamp,
                t4.service_type_info AS account_type,
                t4.verify_type_info AS account_auth,
                t3.id AS account_id,
                group_concat(DISTINCT t5.remark) AS account_remark
            FROM
                publish_plan t1
                JOIN publish_plan_account t2 ON t1.id = t2.plan_id
                JOIN publish_account t3 ON t2.account_id = t3.id
                LEFT JOIN publish_account_wx_type t4 ON t3.id = t4.account_id
                LEFT JOIN publish_account_remark t5 ON t3.id = t5.publish_account_id
            WHERE
                t1.plan_status = 1
                AND t3.channel = 5
            GROUP BY t3.id;
        """
        fetch_response = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        # Drop auto-reply accounts and accounts flagged as illegal
        account_list = [
            i
            for i in fetch_response
            if "自动回复" not in str(i["account_remark"])
            and i["gh_id"] not in self.ILLEGAL_GH_IDS
        ]
        return account_list

    def fetch_account_experiment_status(self) -> Dict[str, str]:
        """
        Fetch each account's experiment-group status, keyed by account_id.
        """
        fetch_query = """
            SELECT t1.account_id, t2.status
            FROM wx_statistics_group_source_account t1
            JOIN wx_statistics_group_source t2
                ON t1.group_source_name = t2.account_source_name;
        """
        account_status_list = self.denet_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        account_status_dict = {
            account["account_id"]: account["status"] for account in account_status_list
        }
        return account_status_dict

    def get_account_list(self) -> List[Dict]:
        """
        Merge published accounts with their experiment status: accounts in the
        "实验" (experiment) group get using_status = 0, all others get 1.
        """
        account_without_status = self.fetch_published_accounts()
        account_status_dict = self.fetch_account_experiment_status()
        account_list = [
            {
                **item,
                "using_status": (
                    0 if account_status_dict.get(item["account_id"]) == "实验" else 1
                ),
            }
            for item in account_without_status
        ]
        return account_list

    def fetch_account_max_publish_timestamp(self, gh_id: str) -> int:
        """
        Get the latest stored publish timestamp for the account; fall back to
        `now - NEW_ACCOUNT_CRAWL_PERIOD` for accounts with no stored articles.
        """
        fetch_query = f"""
            SELECT MAX(publish_timestamp) AS max_publish_timestamp
            FROM {self.ARTICLE_TABLE_NAME}
            WHERE ghId = %s;
        """
        fetch_response = self.crawler_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(gh_id,)
        )
        # MAX() returns a single row containing NULL when the account has no
        # rows, so check the value itself rather than the result set
        max_publish_timestamp = (
            fetch_response[0]["max_publish_timestamp"] if fetch_response else None
        )
        if max_publish_timestamp is None:
            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
        return max_publish_timestamp

    def crawl_account_published_articles(self, account: Dict[str, str]):
        """
        Page through the account's published messages (newest first) and stop
        once messages older than the last stored publish timestamp are reached.
        """
        max_publish_timestamp = self.fetch_account_max_publish_timestamp(
            account["gh_id"]
        )
        cursor = None
        while True:
            crawl_response = get_article_list_from_account(
                account["gh_id"], index=cursor
            )
            crawl_response_code = crawl_response["code"]
            match crawl_response_code:
                # Request succeeded: record this page, then decide whether to paginate
                case self.SUCCESS_CODE:
                    msg_list = crawl_response.get("data", {}).get("data", [])
                    if not msg_list:
                        break

                    self.record_each_msg(account, msg_list)
                    earliest_msg = msg_list[-1]
                    earliest_update_timestamp = earliest_msg["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if earliest_update_timestamp > max_publish_timestamp:
                        cursor = crawl_response["data"]["next_cursor"]
                    else:
                        break

                # Account flagged as illegal
                case self.ACCOUNT_ILLEGAL_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="账号违规",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="公众号账号违规报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="gzh_monitor_bot",
                        mention=False,
                    )
                    break

                # Unknown spider error
                case self.UNKNOWN_SPIDER_ERROR_CODE:
                    log(
                        task="update_published_articles",
                        function="crawl_account_published_articles",
                        message="未知错误",
                        data=account,
                    )
                    self.feishu_bot_api.bot(
                        title="接口请求失败报警",
                        detail={
                            "账号名称": account["account_name"],
                            "账号ID": account["gh_id"],
                            "违规原因": crawl_response["msg"],
                            "粉丝数": account["fans"],
                            "利用状态": account["using_status"],
                        },
                        env="dev",
                        mention=False,
                    )
                    break

                # Any other response code
                case _:
                    print("unknown code:", crawl_response_code)
                    break

    def record_each_msg(self, account, msg_list):
        """
        Persist every article in each message; if an article already exists
        (duplicate wx_sn), refresh its view and like counts instead.
        """
        for msg in msg_list:
            base_info = msg.get("BaseInfo", {})
            app_msg_base_info = msg.get("AppMsg", {}).get("BaseInfo", {})
            app_msg_id = app_msg_base_info.get("AppMsgId", None)
            create_timestamp = app_msg_base_info.get("CreateTime", None)
            update_timestamp = app_msg_base_info.get("UpdateTime", None)
            publish_type = app_msg_base_info.get("Type", None)
            detail_article_list = msg.get("AppMsg", {}).get("DetailInfo", [])
            for article in detail_article_list:
                title = article.get("Title", None)
                digest = article.get("Digest", None)
                item_index = article.get("ItemIndex", None)
                content_url = article.get("ContentUrl", None)
                source_url = article.get("SourceUrl", None)
                cover_img_url = article.get("CoverImgUrl", None)
                cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
                cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
                item_show_type = article.get("ItemShowType", None)
                is_original = article.get("IsOriginal", None)
                show_desc = article.get("ShowDesc", None)
                show_stat = show_desc_to_sta(show_desc)
                ori_content = article.get("ori_content", None)
                show_view_count = show_stat.get("show_view_count", 0)
                show_like_count = show_stat.get("show_like_count", 0)
                show_zs_count = show_stat.get("show_zs_count", 0)
                show_pay_count = show_stat.get("show_pay_count", 0)
                # Parse the sn parameter from the article URL; splitting on
                # "&sn=" crashes with IndexError when the key is absent or
                # appears first in the query string
                wx_sn = (
                    urllib.parse.parse_qs(
                        urllib.parse.urlparse(content_url).query
                    ).get("sn", [None])[0]
                    if content_url
                    else None
                )
                status = account["using_status"]
                info_tuple = (
                    account["gh_id"],
                    account["account_name"],
                    app_msg_id,
                    title,
                    publish_type,
                    create_timestamp,
                    update_timestamp,
                    digest,
                    item_index,
                    content_url,
                    source_url,
                    cover_img_url,
                    cover_img_url_1_1,
                    cover_img_url_235_1,
                    item_show_type,
                    is_original,
                    show_desc,
                    ori_content,
                    show_view_count,
                    show_like_count,
                    show_zs_count,
                    show_pay_count,
                    wx_sn,
                    json.dumps(base_info, ensure_ascii=False),
                    str_to_md5(title),
                    status,
                )
                try:
                    # NOTE: the column name CoverImgUrl_255_1 is kept as-is to
                    # match the existing table schema, even though the source
                    # field is CoverImgUrl_235_1
                    insert_sql = f"""
                        INSERT INTO {self.ARTICLE_TABLE_NAME}
                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.crawler_client.save(query=insert_sql, params=info_tuple)
                    log(
                        task="updatePublishedMsgDaily",
                        function="insert_each_msg",
                        message="插入文章数据成功",
                        data={"info": info_tuple},
                    )
                except Exception:
                    # Insert failed (most likely a duplicate wx_sn);
                    # fall back to refreshing the counters
                    try:
                        update_sql = f"""
                            UPDATE {self.ARTICLE_TABLE_NAME}
                            SET show_view_count = %s, show_like_count = %s
                            WHERE wx_sn = %s;
                        """
                        self.crawler_client.save(
                            query=update_sql,
                            params=(show_view_count, show_like_count, wx_sn),
                        )
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="更新文章数据成功",
                            data={
                                "wxSn": wx_sn,
                                "likeCount": show_like_count,
                                "viewCount": show_view_count,
                            },
                        )
                    except Exception as e:
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="更新文章失败, 报错原因是: {}".format(e),
                            status="fail",
                        )


class UpdatePublishedArticlesTaskCollector(UpdatePublishedArticlesTaskBase):

    def deal(self):
        """
        Crawl published articles for every account, logging per-account
        failures without aborting the whole run.
        """
        account_list = self.get_account_list()
        for account in tqdm(account_list, desc="抓取每个账号的文章信息"):
            try:
                self.crawl_account_published_articles(account)
            except Exception as e:
                log(
                    task="update_published_articles_collector",
                    function="crawl_account_published_articles",
                    message=f"抓取账号文章信息失败, 报错原因是: {e}",
                    status="fail",
                    data={
                        "account": account,
                        "error": traceback.format_exc(),
                    },
                )

        # self.feishu_bot_api.bot(
        #     title="更新每日发布文章任务完成通知",
        #     detail={
        #         "msg": "账号更新完成",
        #         "finish_time": datetime.today().__str__(),
        #     },
        #     mention=False,
        # )


class UpdatePublishedArticlesTaskChecker(UpdatePublishedArticlesTaskBase):
    """
    Placeholder for the detail-check stage; not yet implemented.
    """

    def deal(self):
        pass


class UpdatePublishedArticlesTaskArticlesMonitor(UpdatePublishedArticlesTaskBase):
    """
    Placeholder for the article-monitoring stage; not yet implemented.
    """
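

if __name__ == "__main__":
    # Minimal usage sketch (assumption: the database and Feishu credentials in
    # `config` are valid in the runtime environment; this entry point is added
    # for illustration and may not match how the task is actually scheduled).
    collector = UpdatePublishedArticlesTaskCollector()
    collector.deal()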