Prechádzať zdrojové kódy

Merge branch '2025-01-08-monitor-task-improve' of luojunhui/LongArticlesJob into master

luojunhui 9 mesiacov pred
rodič
commit
350e7cb930

+ 108 - 0
published_articles_monitor.py

@@ -0,0 +1,108 @@
+"""
+监测已发布文章
+"""
+
+from datetime import datetime
+from argparse import ArgumentParser
+from concurrent.futures import ThreadPoolExecutor
+
+from tqdm import tqdm
+from applications import bot
+from applications.db import DatabaseConnector
+from applications.const import updatePublishedMsgTaskConst
+from applications import WeixinSpider
+from config import piaoquan_crawler_config, long_articles_config
+
+const = updatePublishedMsgTaskConst()
+spider = WeixinSpider()
+
+
+def monitor_article(article):
+    """
+    校验单篇文章是否
+    """
+    gh_id, account_name, title, url, wx_sn, publish_date = article
+    try:
+        response = spider.get_article_text(url, is_cache=False)
+        response_code = response["code"]
+        if response_code == const.ARTICLE_ILLEGAL_CODE:
+            error_detail = response.get("msg")
+            insert_sql = f"""
+                INSERT IGNORE INTO illegal_articles 
+                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                VALUES 
+                (%s, %s, %s, %s, %s, %s);
+                """
+            affected_rows = long_articles_db_client.save(
+                query=insert_sql,
+                params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
+            )
+            if affected_rows:
+                bot(
+                    title="文章违规告警",
+                    detail={
+                        "account_name": account_name,
+                        "gh_id": gh_id,
+                        "title": title,
+                        "wx_sn": wx_sn.decode("utf-8"),
+                        "publish_date": str(publish_date),
+                        "error_detail": error_detail,
+                    },
+                    mention=False
+                )
+    except Exception as e:
+        print(e)
+
+
+def get_article_list(run_date):
+    """
+    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
+    :return:
+    """
+    if not run_date:
+        run_date = datetime.today().strftime("%Y-%m-%d")
+
+    monitor_start_timestamp = (
+            int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
+    )
+    select_sql = f"""
+        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
+        FROM official_articles_v2
+        WHERE publish_timestamp >= {monitor_start_timestamp}
+        ORDER BY publish_timestamp DESC;
+    """
+    article_list = piaoquan_crawler_db_client.fetch(select_sql)
+    return article_list
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser()
+
+    parser.add_argument(
+        "--run_date",
+        help="--run_date %Y-%m-%d",
+    )
+    args = parser.parse_args()
+    if args.run_date:
+        run_date = args.run_date
+    else:
+        run_date = None
+
+    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
+    piaoquan_crawler_db_client.connect()
+    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+    long_articles_db_client.connect()
+
+    # Number of concurrent threads
+    MAX_WORKERS = 4
+
+    article_list = get_article_list(run_date=None)
+
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        list(
+            tqdm(
+                executor.map(monitor_article, article_list),
+                total=len(article_list),
+                desc="Monitor Article List",
+            )
+        )

+ 1 - 1
sh/published_articles_monitor.sh

@@ -13,4 +13,4 @@ cd /root/luojunhui/LongArticlesJob
 source /root/miniconda3/etc/profile.d/conda.sh
 conda activate tasks
 
-nohup python3 updatePublishedMsgDaily.py --run_task monitor >> "${LOG_FILE}" 2>&1 &
+nohup python3 published_articles_monitor.py >> "${LOG_FILE}" 2>&1 &