Ver código fonte

Merge branch '2024-12-06-update_published_articles_improve' of luojunhui/LongArticlesJob into master

luojunhui 10 meses atrás
pai
commit
7604c6028c

+ 20 - 1
applications/aiditApi.py

@@ -325,7 +325,7 @@ def bind_crawler_task_to_publish_task(target_publish_task_id, crawler_task_name,
                 "publishTimeInterval": publish_task_detail_data.get("publishTimeInterval"),
                 "publishWindowEnd": publish_task_detail_data.get("publishWindowEnd"),
                 "publishWindowStart": publish_task_detail_data.get("publishWindowStart"),
-                "wxContentInsert":  publish_task_detail_data.get("wxContentInsert"),
+                "wxContentInsert": publish_task_detail_data.get("wxContentInsert"),
                 "wxVideoPublishAccountSetting": publish_task_detail_data.get("wxVideoPublishAccountSetting"),
                 "scoreJudgeFlag": publish_task_detail_data.get("scoreJudgeFlag"),
                 "scoreJudgeTasks": publish_task_detail_data.get("scoreJudgeTasks"),
@@ -363,3 +363,22 @@ def bind_crawler_task_to_publish_task(target_publish_task_id, crawler_task_name,
     else:
         return
 
+
+def delete_articles(gh_id, title):
+    """
+    删除公众号文章
+    :param gh_id: 
+    :param title: 
+    :return: 
+    """
+    url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete"
+
+    payload = {
+        "ghId": gh_id,
+        'title': title
+    }
+    headers = {
+        'Content-Type': 'application/json;charset=UTF-8'
+    }
+    response = requests.request("POST", url, headers=headers, json=payload, timeout=600)
+    return response

+ 6 - 0
applications/const.py

@@ -18,8 +18,10 @@ class updatePublishedMsgTaskConst:
     更新已发布文章消息常量配置
     """
     # 爬虫详情接口返回code
+    ARTICLE_ILLEGAL_CODE = 25012
     ARTICLE_DELETE_CODE = 25005
     ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
 
     # 请求爬虫详情接口状态码
     # 记录默认状态
@@ -30,12 +32,16 @@ class updatePublishedMsgTaskConst:
     DELETE_STATUS = -2
     # 未知原因无信息返回状态
     UNKNOWN_STATUS = -3
+    # 文章违规状态
+    ILLEGAL_STATUS = -4
 
     # 公众号类型(订阅号 or 服务号)
     # 订阅号
     SUBSCRIBE_TYPE_SET = {0, 1}
     # 服务号
     SERVICE_TYPE = 2
+    # 监测周期(秒)
+    MONITOR_PERIOD = 60 * 60 * 24 * 7
 
 
 class updateAccountReadRateTaskConst:

+ 4 - 2
applications/wxSpiderApi.py

@@ -35,9 +35,10 @@ class WeixinSpider(object):
         return response.json()
 
     @classmethod
-    def get_article_text(cls, content_link, is_count=False) -> dict:
+    def get_article_text(cls, content_link, is_count=False, is_cache=True) -> dict:
         """
         获取文章
+        :param is_cache:
         :param is_count:
         :param content_link:
         :return:
@@ -46,7 +47,8 @@ class WeixinSpider(object):
         payload = json.dumps({
             "content_link": content_link,
             "is_count": is_count,
-            "is_ad": False
+            "is_ad": False,
+            "is_cache": is_cache
         })
         response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
         return response.json()

+ 16 - 0
sh/published_articles_monitor.sh

@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/article_monitor_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+
+cd /root/luojunhui/LongArticlesJob
+source /root/miniconda3/etc/profile.d/conda.sh
+conda activate tasks
+
+nohup python3 updatePublishedMsgDaily.py --run_task monitor >> "${LOG_FILE}" 2>&1 &

+ 9 - 5
sh/run_update_published_articles_daily.sh

@@ -13,14 +13,18 @@ then
     echo "$(date '+%Y-%m-%d %H:%M:%S') - updatePublishedMsgDaily.py is running"
 else
     echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart updatePublishedMsgDaily.py"
-    # 切换到指定目录
     cd /root/luojunhui/LongArticlesJob
-
-    # 激活 Conda 环境
     source /root/miniconda3/etc/profile.d/conda.sh
     conda activate tasks
 
-    # 在后台运行 Python 脚本并重定向日志输出
-    nohup python3 updatePublishedMsgDaily.py >> "${LOG_FILE}" 2>&1 &
+    current_time=$(date +%H:%M)
+    target_time="19:00"
+
+    if [[ "$current_time" < "$target_time" ]]; then
+        nohup python3 updatePublishedMsgDaily.py --run_task update >> "${LOG_FILE}" 2>&1 &
+        nohup python3 updatePublishedMsgDaily.py --run_task detail >> "${LOG_FILE}" 2>&1 &
+    else
+        nohup python3 updatePublishedMsgDaily.py >> "${LOG_FILE}" 2>&1 &
     echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted updatePublishedMsgDaily.py"
+    fi
 fi

+ 138 - 5
updatePublishedMsgDaily.py

@@ -10,13 +10,15 @@ import traceback
 import urllib.parse
 from tqdm import tqdm
 from datetime import datetime
+from argparse import ArgumentParser
 
-
-from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi
+from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi, longArticlesMySQL
 from applications.const import updatePublishedMsgTaskConst
 
 ARTICLE_TABLE = "official_articles_v2"
 const = updatePublishedMsgTaskConst()
+spider = WeixinSpider()
+functions = Functions()
 
 
 def get_account_using_status():
@@ -418,6 +420,10 @@ def check_job():
     for sub_item in tqdm(account_list):
         res = check_single_account(db_client, sub_item)
         if not res:
+            sub_item.drop('account_type')
+            sub_item.drop('account_auth')
+            init_timestamp = sub_item.pop('account_init_timestamp')
+            sub_item['account_init_date'] = datetime.fromtimestamp(init_timestamp).strftime('%Y-%m-%d %H:%M:%S')
             fail_list.append(sub_item)
     if fail_list:
         try:
@@ -467,6 +473,9 @@ def update_publish_timestamp(db_client, row):
         if response_code == const.ARTICLE_DELETE_CODE:
             publish_timestamp_s = const.DELETE_STATUS
             root_source_id_list = []
+        elif response_code == const.ARTICLE_ILLEGAL_CODE:
+            publish_timestamp_s = const.ILLEGAL_STATUS
+            root_source_id_list = []
         elif response_code == const.ARTICLE_SUCCESS_CODE:
             data = response['data']['data']
             publish_timestamp_ms = data['publish_timestamp']
@@ -582,14 +591,138 @@ def get_article_detail_job():
         )
 
 
+def whether_title_unsafe(db_client, title):
+    """
+    检查文章标题是否已经存在违规记录
+    :param db_client:
+    :param title:
+    :return:
+    """
+    title_md5 = functions.str_to_md5(title)
+    sql = f"""
+        SELECT title_md5
+        FROM article_unsafe_title
+        WHERE title_md5 = '{title_md5}';
+    """
+    res = db_client.select(sql)
+    if res:
+        return True
+    else:
+        return False
+
+
+def monitor(run_date):
+    """
+    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
+    :return:
+    """
+    try:
+        pq_client = PQMySQL()
+        lam_client = longArticlesMySQL()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title="监控任务连接数据库失败",
+            detail={
+                "job": "monitor",
+                "error": str(e),
+                "msg": error_msg
+            }
+        )
+        return
+
+    if not run_date:
+        run_date = datetime.today().strftime("%Y-%m-%d")
+
+    monitor_start_timestamp = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
+    select_sql = f"""
+        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
+        FROM {ARTICLE_TABLE}
+        WHERE publish_timestamp >= {monitor_start_timestamp};
+    """
+    article_list = pq_client.select(select_sql)
+    for article in tqdm(article_list, desc="monitor article list"):
+        gh_id = article[0]
+        account_name = article[1]
+        title = article[2]
+        # 判断标题是否存在违规记录
+        if whether_title_unsafe(lam_client, title):
+            continue
+        url = article[3]
+        wx_sn = article[4]
+        publish_date = article[5]
+        try:
+            response = spider.get_article_text(url, is_cache=False)
+            response_code = response['code']
+            if response_code == const.ARTICLE_ILLEGAL_CODE:
+                bot(
+                    title="文章违规告警",
+                    detail={
+                        "ghId": gh_id,
+                        "accountName": account_name,
+                        "title": title,
+                        "wx_sn": str(wx_sn),
+                        "publish_date": str(publish_date)
+                    },
+                    mention=False
+                )
+                aiditApi.delete_articles(
+                    gh_id=gh_id,
+                    title=title
+                )
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            log(
+                task="monitor",
+                function="monitor",
+                message="请求文章详情失败",
+                data={
+                    "ghId": gh_id,
+                    "accountName": account_name,
+                    "title": title,
+                    "wx_sn": str(wx_sn),
+                    "error": str(e),
+                    "msg": error_msg
+                }
+            )
+
+
 def main():
     """
     main
     :return:
     """
-    update_job()
-    check_job()
-    get_article_detail_job()
+    parser = ArgumentParser()
+    parser.add_argument(
+        "--run_task",
+        help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
+    parser.add_argument(
+        "--run_date",
+        help="--run_date %Y-%m-%d",
+    )
+    args = parser.parse_args()
+
+    if args.run_task:
+        run_task = args.run_task
+        match run_task:
+            case "update":
+                update_job()
+            case "check":
+                check_job()
+            case "detail":
+                get_article_detail_job()
+            case "monitor":
+                if args.run_date:
+                    run_date = args.run_date
+                else:
+                    run_date = None
+                monitor(run_date)
+            case _:
+                print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
+    else:
+        update_job()
+        check_job()
+        get_article_detail_job()
 
 
 if __name__ == '__main__':