Parcourir la source

长文任务优化

luojunhui il y a 1 mois
Parent
commit
5519facfaf
6 fichiers modifiés avec 121 ajouts et 184 suppressions
  1. 5 0
      applications/api/feishu_api.py
  2. 0 9
      crawler_sph_video.py
  3. 0 18
      fwh_data_manager.py
  4. 8 1
      long_articles_job.py
  5. 0 26
      schedule_app.py
  6. 108 130
      sh/run_long_articles_job.sh

+ 5 - 0
applications/api/feishu_api.py

@@ -15,6 +15,9 @@ class Feishu:
     # 测试环境报警机器人
     long_articles_bot_dev = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
 
+    # 长文任务报警群
+    long_articles_task = "https://open.feishu.cn/open-apis/bot/v2/hook/223b3d72-f2e8-40e0-9b53-6956e0ae7158"
+
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
@@ -197,6 +200,8 @@ class FeishuBotApi(Feishu):
                 url = self.outside_gzh_monitor_bot
             case "server_account_publish_monitor":
                 url = self.server_account_publish_monitor_bot
+            case "long_articles_task":
+                url = self.long_articles_task
             case _:
                 url = self.long_articles_bot_dev
 

+ 0 - 9
crawler_sph_video.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from tasks.crawler_tasks.crawler_video.crawler_sph_videos import CrawlerChannelAccountVideos
-
-if __name__ == "__main__":
-    crawler_channel_account_videos = CrawlerChannelAccountVideos()
-    crawler_channel_account_videos.deal()

+ 0 - 18
fwh_data_manager.py

@@ -1,18 +0,0 @@
-from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
-from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
-from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishMonitor
-
-
-if __name__ == '__main__':
-    # 1. 从 aigc 获取数据
-    fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
-    fwh_group_publish_record_manager.deal()
-
-    # 2. 监测报警
-    fwh_group_publish_monitor = FwhGroupPublishMonitor()
-    fwh_group_publish_monitor.deal()
-
-    # 3. 保存数据到数据库
-    save_fwh_data_to_database = SaveFwhDataToDatabase()
-    save_fwh_data_to_database.deal()
-

+ 8 - 1
long_articles_job.py

@@ -13,6 +13,7 @@ from tasks.crawler_tasks.crawler_video.crawler_sph_videos import (
 from tasks.crawler_tasks.crawler_video.crawler_gzh_videos import CrawlerGzhMetaVideos
 from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
 from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
+from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishMonitor
 from tasks.monitor_tasks.kimi_balance_monitor import check_kimi_balance
 from tasks.publish_tasks.top_article_generalize import (
     TopArticleGeneralizeFromArticlePool,
@@ -40,10 +41,15 @@ def run_sph_video_crawler():
 
 
 def run_fwh_data_manager():
+    # 1. 从 aigc 获取数据
     fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
     fwh_group_publish_record_manager.deal()
 
-    # 2. 保存数据到数据库
+    # 2. 监测报警
+    fwh_group_publish_monitor = FwhGroupPublishMonitor()
+    fwh_group_publish_monitor.deal()
+
+    # 3. 保存数据到数据库
     save_fwh_data_to_database = SaveFwhDataToDatabase()
     save_fwh_data_to_database.deal()
 
@@ -52,6 +58,7 @@ def run_top_article_generalize_from_article_pool():
     task = TopArticleGeneralizeFromArticlePool()
     task.deal()
 
+
 def crawler_gzh_meta_videos():
     task = CrawlerGzhMetaVideos()
     task.deal()

+ 0 - 26
schedule_app.py

@@ -1,26 +0,0 @@
-# from celery import Celery
-from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import CrawlerPiaoQuanVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuRecommendVideos
-
-
-# app = Celery('tasks', broker='redis://localhost:6379/0')
-
-# @app.task
-def run_piaoquan_video_crawler():
-    crawler = CrawlerPiaoQuanVideos()
-    crawler.deal()
-
-def run_sohu_video_crawler():
-    # step1, crawl sohu hot videos
-    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
-    crawler_sohu_hot_videos.deal()
-
-    # step2, crawl sohu recommend videos
-    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
-    crawler_sohu_recommend_videos.deal()
-
-if __name__ == "__main__":
-    run_piaoquan_video_crawler()
-    run_sohu_video_crawler()
-

+ 108 - 130
sh/run_long_articles_job.sh

@@ -1,155 +1,133 @@
-#!/bin/bash
-
+#!/usr/bin/env bash
 # =============================================================
-# 多任务定时调度管理器
-# 功能:在指定时间执行不同任务,每个任务有独立日志
-# 配置:在下方 "任务配置" 区域添加您的任务
-# 使用:设置cron每分钟执行此脚本: * * * * /path/to/this/script.sh
+# 作者:Junhui Luo / 2025-07-14
+# 每分钟由系统 cron 触发一次,内部解析任务表达式决定是否执行
 # =============================================================
 
-# 确保脚本以root权限运行(按需修改)
-#if [ "$(id -u)" != "0" ]; then
-#   echo "错误:此脚本需要以root权限运行" 1>&2
-#   exit 1
-#fi
-
-# *************** 基础配置 ***************
-# 工作目录(脚本所在位置)
-SCRIPT_DIR="/root/luojunhui/LongArticlesJob"
-
-# 日志根目录
-LOG_DIR="${SCRIPT_DIR}/logs"
+###################### 全局配置 ######################
+SCRIPT_DIR="/root/luojunhui/LongArticlesJob"     # 工作目录
+LOG_DIR="${SCRIPT_DIR}/logs"                     # 日志根目录
+PYTHON_SCRIPT="long_articles_job.py"             # 统一入口脚本
 
-# Conda 配置
-CONDA_PATH="/root/miniconda3/etc/profile.d/conda.sh"
+CONDA_SH="/root/miniconda3/etc/profile.d/conda.sh"
 CONDA_ENV="tasks"
 
-# Python 脚本名称
-PYTHON_SCRIPT="long_articles_job.py"
-
-# *************** 任务配置 ***************
-# 格式: "任务名称|执行时间|日志文件路径"
-# 注意:
-#   1. 执行时间格式为 HH:MM (24小时制)
-#   2. 日志路径可使用变量 $(date +'格式')
-#   3. 添加新任务只需复制一行并修改参数
-TASKS=(
-    "run_sph_video_crawler|03:00|${LOG_DIR}/run_sph_video_crawler_$(date +'%Y-%m-%d').log"
-    "run_piaoquan_video_crawler|06:00|${LOG_DIR}/run_piaoquan_video_crawler_$(date +'%Y-%m-%d').log"
-    "run_sohu_video_crawler|06:10|${LOG_DIR}/run_sohu_video_crawler_$(date +'%Y-%m-%d').log"
-    "top_article_generalize|11:20|${LOG_DIR}/top_article_generalize_$(date +'%Y-%m-%d').log"
-    "run_sph_video_crawler|15:00|${LOG_DIR}/run_sph_video_crawler_$(date +'%Y-%m-%d').log"
-)
+LOG_RETENTION_DAYS=7     # 日志保存天数
+LOG_MAX_MB=100           # 单文件最大 MB,超过清空
 
-# *************** 函数定义 ***************
-# 初始化环境
-initialize() {
-    # 创建日志目录
-    mkdir -p "${LOG_DIR}"
 
-    # 设置当前时间变量
-    current_time=$(date '+%Y-%m-%d %H:%M:%S')
-    current_hour_minute=$(date '+%H:%M')
-    current_weekday=$(date +%u)  # 1=周一, 7=周日
+# 失败告警(自行实现:邮件、钉钉、Prometheus Pushgateway…)
+on_failure(){
+  local task=$1
+  local now=$(date '+%F %T')
+  local timestamp=$(($(date +%s%N)/1000000))
 
-    # 主日志文件(记录调度过程)
-    MAIN_LOG="${LOG_DIR}/scheduler_$(date +'%Y-%m-%d').log"
-    touch "${MAIN_LOG}"
+  local url="https://open.feishu.cn/open-apis/bot/v2/hook/223b3d72-f2e8-40e0-9b53-6956e0ae7158"
+  local content="定时任务失败:${task}\n时间:${now}"
 
-    # 重定向所有输出到主日志
-    exec >> "${MAIN_LOG}" 2>&1
+  curl --request POST "${url}" \
+    --header 'Content-Type: application/json' \
+    --data-raw "{\"msg_type\":\"text\",\"content\":{\"text\":\"${content}\"}}" \
+    >/dev/null 2>&1
 }
 
-# 启动任务函数
-start_task() {
-    local task_name=$1
-    local log_file=$2
-
-    # 创建任务日志文件
-    touch "${log_file}"
-
-    # 检查进程是否已在运行
-    if pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}" > /dev/null; then
-        echo "${current_time} - [INFO] 任务 ${task_name} 已在运行中" | tee -a "${log_file}"
-        return 0
-    fi
+###################### 任务定义 ######################
+# 语法: "分 时 日 月 周|任务名|日志模板"
+# 支持 *、*/n、a-b、a,b,c 以及它们组合
+TASKS=(
+  "0 3 * * *|run_sph_video_crawler|${LOG_DIR}/run_sph_video_crawler/%Y-%m-%d.log"
+  "0 6 * * *|run_piaoquan_video_crawler|${LOG_DIR}/run_piaoquan_video_crawler/%Y-%m-%d.log"
+  "10 6 * * *|run_sohu_video_crawler|${LOG_DIR}/run_sohu_video_crawler/%Y-%m-%d.log"
+  "20 11 * * *|top_article_generalize|${LOG_DIR}/top_article_generalize/%Y-%m-%d.log"
+  "0 15 * * *|run_sph_video_crawler|${LOG_DIR}/run_sph_video_crawler/%Y-%m-%d.log"
+  # 示例:每分钟执行
+  # "* * * * *|heartbeat|${LOG_DIR}/heartbeat/%Y-%m-%d.log"
+)
 
-    # 切换到工作目录
-    cd "${SCRIPT_DIR}" || {
-        echo "${current_time} - [ERROR] 无法切换到目录 ${SCRIPT_DIR}" | tee -a "${log_file}"
-        return 1
-    }
-
-    # 激活 Conda 环境
-    if [[ -f "${CONDA_PATH}" ]]; then
-        source "${CONDA_PATH}"
-        conda activate "${CONDA_ENV}" || {
-            echo "${current_time} - [ERROR] 无法激活 Conda 环境 ${CONDA_ENV}" | tee -a "${log_file}"
-            return 1
-        }
-    else
-        echo "${current_time} - [WARNING] Conda 初始化文件未找到: ${CONDA_PATH}" | tee -a "${log_file}"
+###################### 工具函数 ######################
+log(){ printf '%s [%s] %s\n' "$(date '+%F %T')" "$1" "$2"; }
+
+cron_field_match(){            # 参数:字段 当前值
+  local field=$1 now=$2
+  [[ $field == "*" ]] && return 0
+  IFS=',' read -ra parts <<< "$field"
+  for p in "${parts[@]}"; do
+    if [[ $p == "*/"* ]]; then             # 步进 */n
+      local step=${p#*/}
+      (( now % step == 0 )) && return 0
+    elif [[ $p == *-* ]]; then             # 范围 a-b
+      local start=${p%-*} end=${p#*-}
+      (( now >= start && now <= end )) && return 0
+    elif (( now == p )); then              # 单值
+      return 0
     fi
+  done
+  return 1
+}
 
-    # 启动任务脚本
-    echo "${current_time} - [INFO] 启动任务: ${task_name}" | tee -a "${log_file}"
-    nohup python3 "${PYTHON_SCRIPT}" --task_name "${task_name}" >> "${log_file}" 2>&1 &
-
-    # 检查是否启动成功
-    sleep 1
-    if pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}" > /dev/null; then
-        local pid=$(pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}")
-        echo "${current_time} - [SUCCESS] 任务启动成功: ${task_name} (PID: ${pid})" | tee -a "${log_file}"
-    else
-        echo "${current_time} - [ERROR] 任务启动失败: ${task_name}" | tee -a "${log_file}"
-    fi
+cron_match(){                # 参数:完整表达式
+  read -r m h dom mon dow <<< "$1"
+  local n_m=$(date +%-M) n_h=$(date +%-H) n_dom=$(date +%-d) \
+        n_mon=$(date +%-m) n_dow=$(date +%-u)       # 1(周一)…7(周日)
+  cron_field_match "$m"   "$n_m"   && \
+  cron_field_match "$h"   "$n_h"   && \
+  cron_field_match "$dom" "$n_dom" && \
+  cron_field_match "$mon" "$n_mon" && \
+  cron_field_match "$dow" "$n_dow"
 }
 
-# 特殊日期检查函数
-is_special_day() {
-    local task_name=$1
-    local scheduled_time=$2
+start_task(){                 # 参数:任务名 日志文件
+  local name=$1 logfile=$2
+  mkdir -p "$(dirname "$logfile")"; touch "$logfile"
 
-    # 示例:每周一执行的任务
-    if [[ "${task_name}" == "weekly_report" ]]; then
-        [[ "${current_weekday}" == "1" ]]  # 周一
-        return $?
-    fi
+  # 若已在运行则跳过
+  pgrep -f "python3 $PYTHON_SCRIPT --task_name $name" >/dev/null && {
+    log INFO "任务 $name 已在运行" | tee -a "$logfile"; return; }
 
-    # 示例:每月1号执行的任务
-    if [[ "${task_name}" == "monthly_report" ]]; then
-        [[ "$(date +'%d')" == "01" ]]
-        return $?
-    fi
+  (
+    # 子 shell 中运行,便于 flock 持锁
+    exec >>"$logfile" 2>&1
+    log INFO "启动任务 $name"
+    cd "$SCRIPT_DIR" || { log ERROR "进入 $SCRIPT_DIR 失败"; exit 1; }
+
+    [[ -f $CONDA_SH ]] && { source "$CONDA_SH"; conda activate "$CONDA_ENV"; }
 
-    # 默认每天都执行
-    return 0
+    nohup python3 "$PYTHON_SCRIPT" --task_name "$name" &
+    sleep 1
+    pgrep -f "python3 $PYTHON_SCRIPT --task_name $name" \
+      && log SUCCESS "任务 $name 启动成功" \
+      || { log ERROR "任务 $name 启动失败"; on_failure "$name"; }
+  ) &
 }
 
-# 主调度函数
-schedule_tasks() {
-    echo "====== ${current_time} 开始任务调度 ======"
-    echo "当前时间: ${current_hour_minute}, 星期: ${current_weekday}"
-
-    for task_config in "${TASKS[@]}"; do
-        # 解析任务配置
-        IFS='|' read -r task_name scheduled_time log_file <<< "${task_config}"
-
-        # 检查是否到达执行时间
-        if [[ "${current_hour_minute}" == "${scheduled_time}" ]]; then
-            start_task "${task_name}" "${log_file}"
-        else
-            echo "${current_time} - [SCHEDULE] 未到执行时间: ${task_name} (计划: ${scheduled_time})" | tee -a "${log_file}"
-        fi
-    done
-
-    echo "====== ${current_time} 任务调度完成 ======"
-    echo ""
+cleanup_logs(){
+  # 过期删除
+  find "$LOG_DIR" -type f -name '*.log' -mtime +"$LOG_RETENTION_DAYS" -delete
+  # 超大清空
+  find "$LOG_DIR" -type f -name '*.log' -size +"${LOG_MAX_MB}M" -exec \
+       sh -c 'cat /dev/null > "$1"' sh {} \;
 }
 
-# *************** 主程序 ***************
-initialize
-schedule_tasks
+###################### 主流程 ######################
+(
+  # 全局锁,阻止同脚本并行
+  exec 200>"$0.lock"
+  flock -n 200 || exit 0
+
+  mkdir -p "$LOG_DIR"
+  MAIN_LOG=$(printf '%s/scheduler_%(%Y-%m-%d)T.log' "$LOG_DIR" -1)
+  exec >>"$MAIN_LOG" 2>&1
+  log INFO "====== 调度开始 ======"
+
+  for task in "${TASKS[@]}"; do
+    IFS='|' read -r cron_expr name log_tpl <<< "$task"
+    cron_match "$cron_expr" || continue
+    logfile=$(date +"$log_tpl")          # 渲染 %Y-%m-%d
+    start_task "$name" "$logfile"
+  done
+
+  cleanup_logs
+  log INFO "====== 调度结束 ======"
+)
 
-# 日志清理(保留最近7天日志)
-find "${LOG_DIR}" -name "*.log" -mtime +7 -delete
+exit 0