- """
- @author: luojunhui
- """
- import json
- import time
- import traceback
- from typing import List, Dict
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import aiditApi
- from applications import bot
- from applications import log
- from applications import WeixinSpider
- from applications.const import ArticleCollectorConst
- from applications.db import DatabaseConnector
- from applications.functions import Functions
- from config import denet_config, long_articles_config
- empty_dict = {}
- const = ArticleCollectorConst()
- functions = Functions()
- spider = WeixinSpider()
- class UpdateArticleInfoFromAIGC(object):
- """
- 从aigc获取文章信息
- """
- def __init__(self):
- self.aigc_db_client = DatabaseConnector(db_config=denet_config)
- self.long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
- self.aigc_db_client.connect()
- self.long_articles_db_client.connect()
- def get_published_articles(self) -> List[Dict]:
- """
- 获取当天发布文章的List
- """
- sql = f"""
- SELECT trace_id, push_type
- FROM long_articles_published_trace_id
- WHERE create_timestamp > UNIX_TIMESTAMP(DATE_SUB(CURDATE(), INTERVAL 1 DAY)) AND status = {const.INIT_STATUS};
- """
- article_list = self.long_articles_db_client.fetch(sql, cursor_type=DictCursor)
- return article_list
- def get_article_info_from_aigc(self, trace_id: str) -> Dict:
- """
- 从aigc获取发布结果
- """
- sql = f"""
- SELECT t2.crawler_channel_content_id, t2.publish_stage_url, t2.publish_timestamp, t1.result_data
- from publish_content_miniprogram t1
- join publish_content t2 on t1.publish_content_id = t2.id
- where t1.trace_id = '{trace_id}' and t2.status = {const.PUBLISHED_STATUS};
- """
- article_info = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
- if article_info:
- return article_info[0]
- else:
- return empty_dict
- def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
- """
- 通过trace_id来查询文章信息
- """
- select_sql = f"""
- SELECT t1.gh_id, t1.account_name, t2.article_title
- FROM long_articles_match_videos t1
- JOIN long_articles_text t2
- ON t1.content_id = t2.content_id
- WHERE t1.trace_id = '{trace_id}';
- """
- article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
- if article_info:
- return article_info[0]
- else:
- return empty_dict
- def update_each_article(self, article: Dict):
- """
- 更新每个文章的信息
- """
- trace_id = article["trace_id"]
- push_type = article["push_type"]
- article_info = self.get_article_info_from_aigc(trace_id)
- if article_info:
- channel_content_id = article_info["crawler_channel_content_id"]
- published_url = article_info["publish_stage_url"]
- publish_timestamp = int(article_info["publish_timestamp"] / 1000)
- result_data = json.loads(article_info["result_data"])
- root_source_id_list = [
- functions.extract_path(item["productionPath"])["root_source_id"] for item in result_data
- ]
- wx_sn = None
- if published_url:
- response = spider.get_article_text(content_link=published_url)
- code = response['code']
- match code:
- case const.ARTICLE_SUCCESS_CODE:
- long_url = response['data']['data']['content_link']
- wx_sn = functions.extract_params_from_url(url=long_url, key="sn")
- status = const.SUCCESS_STATUS
- case const.ARTICLE_DELETE_CODE:
- log(
- task="update_article_info_from_aigc",
- function="update_each_article",
- status="fail",
- message=trace_id,
- data={
- "msg": "文章被删文",
- "publish_timestamp": publish_timestamp,
- "article_delete_timestamp": int(time.time()),
- "duration": int(time.time()) - publish_timestamp
- }
- )
- status = const.FAIL_STATUS
- case const.ARTICLE_ILLEGAL_CODE:
- log(
- task="update_article_info_from_aigc",
- function="update_each_article",
- status="fail",
- message=trace_id,
- data={
- "msg": "文章被判断违规",
- "publish_timestamp": publish_timestamp,
- "illegal_timestamp": int(time.time()),
- "duration": int(time.time()) - publish_timestamp
- }
- )
- article_info = self.get_article_info_by_trace_id(trace_id)
- if article_info:
- error_detail = response.get("msg")
- insert_sql = f"""
- INSERT IGNORE INTO illegal_articles
- (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
- VALUES
- (%s, %s, %s, %s, %s, %s);
- """
- affected_rows = self.long_articles_db_client.save(
- query=insert_sql,
- params=(
- article_info['gh_id'],
- article_info['account_name'],
- article_info['article_title'],
- wx_sn,
- functions.timestamp_to_str(publish_timestamp),
- error_detail
- )
- )
- if affected_rows:
- bot(
- title="文章违规告警(new task)",
- detail={
- "account_name": article_info['account_name'],
- "gh_id": article_info['gh_id'],
- "title": article_info['article_title'],
- "wx_sn": wx_sn,
- "publish_date": functions.timestamp_to_str(publish_timestamp),
- "error_detail": error_detail,
- },
- mention=False
- )
- aiditApi.delete_articles(
- gh_id=article_info['gh_id'],
- title=article_info['article_title']
- )
- status = const.FAIL_STATUS
- case _:
- status = const.FAIL_STATUS
- else:
- if push_type == const.BULK_AUTO_PUSH:
- status = const.INIT_STATUS
- else:
- status = const.SUCCESS_STATUS
- update_sql = f"""
- UPDATE long_articles_published_trace_id
- SET published_url = %s, status = %s, wx_sn = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
- WHERE trace_id = %s;
- """
- self.long_articles_db_client.save(
- query=update_sql,
- params=(published_url, status, wx_sn, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
- )
- else:
- update_sql = f"""
- UPDATE long_articles_published_trace_id
- SET status = %s
- WHERE trace_id = %s;
- """
- self.long_articles_db_client.save(
- query=update_sql, params=(const.FAIL_STATUS, trace_id)
- )
- def deal(self):
- """
- main function
- """
- article_list = self.get_published_articles()
- log(
- task="update_article_info_from_aigc",
- function="deal",
- data=article_list,
- message="total got {} articles this time".format(len(article_list))
- )
- for article in tqdm(article_list, desc="更新文章信息"):
- try:
- self.update_each_article(article)
- except Exception as e:
- log(
- task="update_article_info_from_aigc",
- function="update_each_article",
- status="fail",
- message="update_article_fail",
- data={
- "trace_id": article["trace_id"],
- "error": str(e),
- "traceback": traceback.format_exc()
- }
- )