Просмотр исходного кода

Merge branch '2025-07-02-search-suply' of luojunhui/LongArticlesJob into master

luojunhui 3 месяцев назад
Родитель
Сommit
d440ad4abb

+ 1 - 0
applications/api/__init__.py

@@ -5,6 +5,7 @@ from .aigc_system_api import AigcSystemApi
 from .apollo_api import ApolloApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .deep_seek_api_official import fetch_deepseek_completion
+from .es_api import ElasticSearchClient
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI

+ 49 - 0
applications/api/es_api.py

@@ -0,0 +1,49 @@
+import ssl
+
+from elasticsearch import Elasticsearch, ApiError
+from elasticsearch import helpers
+from config.es_mappings import index_name
+
+
+class ElasticSearchClient:
+    def __init__(self, index_=index_name):
+        self.password = "nkvvASQuQ0XUGRq5OLvm"
+        self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
+        self.ctx = ssl.create_default_context(cafile="config/es_certs.crt")
+        self.es = Elasticsearch(
+            self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
+        )
+        self.index_name = index_
+
+    def create_index(self, settings, mappings):
+        if self.es.indices.exists(index=self.index_name):
+            self.es.indices.delete(index=self.index_name)
+
+        try:
+            self.es.indices.create(
+                index=self.index_name, settings=settings, mappings=mappings
+            )
+            print("Index created successfully")
+        except ApiError as e:
+            print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}")
+
+    def get_max_article_id(self):
+        response = self.es.search(
+            index=self.index_name,
+            size=1,
+            sort="article_id:desc",
+            _source=["article_id"],
+        )
+        return response["hits"]["hits"][0]["_source"]["article_id"]
+
+    def search(self, search_keys, size=10):
+        query = {
+            "query": {"match": {"title": search_keys}},
+            "_source": ["article_id", "title"],
+            "size": size
+        }
+        resp = self.es.search(index=index_name, body=query)
+        return [i["_source"] for i in resp["hits"]["hits"]]
+
+    def bulk_insert(self, docs):
+        helpers.bulk(self.es, docs)

+ 31 - 0
applications/api/es_certs.crt

@@ -0,0 +1,31 @@
+-----BEGIN CERTIFICATE-----
+MIIFaTCCA1GgAwIBAgIUWHH9T8PVfiSyvT6S6NrAQ9iSLeEwDQYJKoZIhvcNAQEL
+BQAwPDE6MDgGA1UEAxMxRWxhc3RpY3NlYXJjaCBzZWN1cml0eSBhdXRvLWNvbmZp
+Z3VyYXRpb24gSFRUUCBDQTAeFw0yNTA3MDcwNzIwNTRaFw0yODA3MDYwNzIwNTRa
+MDwxOjA4BgNVBAMTMUVsYXN0aWNzZWFyY2ggc2VjdXJpdHkgYXV0by1jb25maWd1
+cmF0aW9uIEhUVFAgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCb
+Y8E68+7S+hGKQX6vhyOxuCe3QyBHYlsxiSqGhi+WFx953u4SEMqrbqiyg2QquB9/
+ynjKo3Tvhn0OPjuJRytteKn9OZkVhUT1D5P6PFo0j8x1LIJZm551XRCnQUZ8jC0C
+REHy/JoKdT4YSCRIuXVTM5iM66vQ1t5Du4sb70mTygtc2DyXwgE4LkVnrHcwr2BZ
+3/O69WvF7Zd7WP93yEfUsLsAAQStaCYMeYyaY5K8UwIVcFyWKJ9lnDGbR9KmuXb9
+ipWqGw6aAYhmSs5gL+6xJ5dBpgMOqoBTvZpNniLA/phkelq9W2nAhBLFpRGRof8K
+5iKwjAN8gnBXeSVklBoL23QD5zfoVjz+5eaXWO4qP+90jbwf+vEg/duncDRONGtk
+TQd0Vr9NeO3Aye8PZsmmhKAaciaPWYyQO30omUq9kPsSUzZPu4k+CYb8qwVQCHpn
+Za19NkvERQ8hCQks08/ly5qDM+5lBxJQFQjhjtzSDQ/ybbarMmgaBxpCexiksRmP
+CQqVLW6IaLxUGEkIJqXRx8nmKUfK43vTBitOBFt5UcKob6+ikZLrqZ6xLY/jklE8
+Z1wt9I8ZdQ3L3X9EORgmQ+4KIu/JQfBdfAYtLaS6MYWhiZSaKaIhgfXiZQTO9YuW
+KrI5g+d2Yu2BYgIioLKo9LFWK1eTG2gNAGUI/+rqswIDAQABo2MwYTAdBgNVHQ4E
+FgQUab2kAtPlJHLirQvbThvIwJ7hbLwwHwYDVR0jBBgwFoAUab2kAtPlJHLirQvb
+ThvIwJ7hbLwwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYwDQYJKoZI
+hvcNAQELBQADggIBAF+wJ598Krfai5Br6Vq0Z1jj0JsU8Kij4t9D+89QPgI85/Mv
+zwj8xRgxx9RinKYdnzFJWrD9BITG2l3D0zcJhXfYUpq5HLP+c3zMwEMGzTLbgi70
+cpYqkTJ+g/Ah5WRYZRHJIMF6BVK6izCOO0J49eYC6AONNxG2HeeUvEL4cNnxpw8T
+NUe7v0FXe2iPLeE713h99ray0lBgI6J9QZqc/oEM47gHy+ByfWCv6Yw9qLlprppP
+taHz2VWnCAACDLzbDnYhemQDji86yrUTEdCT8at1jAwHSixgkm88nEBgxPHDuq8t
+thmiS6dELvXVUbyeWO7A/7zVde0Kndxe003OuYcX9I2IX7aIpC8sW/yY+alRhklq
+t9vF6g1qvsN69xXfW5yI5G31TYMUw/3ng0aVJfRFaXkEV2SWEZD+4sWoYC/GU7kK
+zlfaF22jTeul5qCKkN1k+i8K2lheEE3ZBC358W0RyvsrDwtXOra3VCpZ7qrez8OA
+/HeY6iISZQ7g0s209KjqOPqVGcI8B0p6KMh00AeWisU6E/wy1LNTxkf2IS9b88n6
+a3rj0TCycwhKOPTPB5pwlfbZNI00tGTFjqqi07SLqO9ZypsVkyR32G16JPJzk8Zw
+kngBZt6y9LtCMRVbyDuIDNq+fjtDjgxMI9bQXtve4bOuq8cZzcMjC6khz/Ja
+-----END CERTIFICATE-----

+ 31 - 0
config/es_certs.crt

@@ -0,0 +1,31 @@
+-----BEGIN CERTIFICATE-----
+MIIFaTCCA1GgAwIBAgIUWHH9T8PVfiSyvT6S6NrAQ9iSLeEwDQYJKoZIhvcNAQEL
+BQAwPDE6MDgGA1UEAxMxRWxhc3RpY3NlYXJjaCBzZWN1cml0eSBhdXRvLWNvbmZp
+Z3VyYXRpb24gSFRUUCBDQTAeFw0yNTA3MDcwNzIwNTRaFw0yODA3MDYwNzIwNTRa
+MDwxOjA4BgNVBAMTMUVsYXN0aWNzZWFyY2ggc2VjdXJpdHkgYXV0by1jb25maWd1
+cmF0aW9uIEhUVFAgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCb
+Y8E68+7S+hGKQX6vhyOxuCe3QyBHYlsxiSqGhi+WFx953u4SEMqrbqiyg2QquB9/
+ynjKo3Tvhn0OPjuJRytteKn9OZkVhUT1D5P6PFo0j8x1LIJZm551XRCnQUZ8jC0C
+REHy/JoKdT4YSCRIuXVTM5iM66vQ1t5Du4sb70mTygtc2DyXwgE4LkVnrHcwr2BZ
+3/O69WvF7Zd7WP93yEfUsLsAAQStaCYMeYyaY5K8UwIVcFyWKJ9lnDGbR9KmuXb9
+ipWqGw6aAYhmSs5gL+6xJ5dBpgMOqoBTvZpNniLA/phkelq9W2nAhBLFpRGRof8K
+5iKwjAN8gnBXeSVklBoL23QD5zfoVjz+5eaXWO4qP+90jbwf+vEg/duncDRONGtk
+TQd0Vr9NeO3Aye8PZsmmhKAaciaPWYyQO30omUq9kPsSUzZPu4k+CYb8qwVQCHpn
+Za19NkvERQ8hCQks08/ly5qDM+5lBxJQFQjhjtzSDQ/ybbarMmgaBxpCexiksRmP
+CQqVLW6IaLxUGEkIJqXRx8nmKUfK43vTBitOBFt5UcKob6+ikZLrqZ6xLY/jklE8
+Z1wt9I8ZdQ3L3X9EORgmQ+4KIu/JQfBdfAYtLaS6MYWhiZSaKaIhgfXiZQTO9YuW
+KrI5g+d2Yu2BYgIioLKo9LFWK1eTG2gNAGUI/+rqswIDAQABo2MwYTAdBgNVHQ4E
+FgQUab2kAtPlJHLirQvbThvIwJ7hbLwwHwYDVR0jBBgwFoAUab2kAtPlJHLirQvb
+ThvIwJ7hbLwwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYwDQYJKoZI
+hvcNAQELBQADggIBAF+wJ598Krfai5Br6Vq0Z1jj0JsU8Kij4t9D+89QPgI85/Mv
+zwj8xRgxx9RinKYdnzFJWrD9BITG2l3D0zcJhXfYUpq5HLP+c3zMwEMGzTLbgi70
+cpYqkTJ+g/Ah5WRYZRHJIMF6BVK6izCOO0J49eYC6AONNxG2HeeUvEL4cNnxpw8T
+NUe7v0FXe2iPLeE713h99ray0lBgI6J9QZqc/oEM47gHy+ByfWCv6Yw9qLlprppP
+taHz2VWnCAACDLzbDnYhemQDji86yrUTEdCT8at1jAwHSixgkm88nEBgxPHDuq8t
+thmiS6dELvXVUbyeWO7A/7zVde0Kndxe003OuYcX9I2IX7aIpC8sW/yY+alRhklq
+t9vF6g1qvsN69xXfW5yI5G31TYMUw/3ng0aVJfRFaXkEV2SWEZD+4sWoYC/GU7kK
+zlfaF22jTeul5qCKkN1k+i8K2lheEE3ZBC358W0RyvsrDwtXOra3VCpZ7qrez8OA
+/HeY6iISZQ7g0s209KjqOPqVGcI8B0p6KMh00AeWisU6E/wy1LNTxkf2IS9b88n6
+a3rj0TCycwhKOPTPB5pwlfbZNI00tGTFjqqi07SLqO9ZypsVkyR32G16JPJzk8Zw
+kngBZt6y9LtCMRVbyDuIDNq+fjtDjgxMI9bQXtve4bOuq8cZzcMjC6khz/Ja
+-----END CERTIFICATE-----

+ 36 - 0
config/es_mappings.py

@@ -0,0 +1,36 @@
+index_name = "meta_articles_v1"
+
+settings = {
+    "number_of_shards": 3,
+    "number_of_replicas": 1,
+    "analysis": {
+        "analyzer": {
+            "ik_smart": {"type": "ik_smart"},
+            "ik_max_word": {"type": "ik_max_word"},
+        }
+    }
+}
+
+mappings = {
+    "properties": {
+        "auto_id": {
+            "type": "long",
+            "doc_values": True,
+        },
+        "article_id": {"type": "long"},
+        "platform": {"type": "keyword"},
+        "out_account_id": {"type": "keyword"},
+        "title": {
+            "type": "text",
+            "analyzer": "ik_max_word",
+            "search_analyzer": "ik_smart",
+            "fields": {
+                "keyword": {"type": "keyword", "ignore_above": 256}
+            }
+        },
+        "created_at": {
+            "type": "date",
+            "format": "yyyy-MM-dd HH:mm:ss||epoch_millis"
+        }
+    }
+}

+ 89 - 0
long_articles_job.py

@@ -0,0 +1,89 @@
+from argparse import ArgumentParser
+
+from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import (
+    CrawlerPiaoQuanVideos,
+)
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import (
+    CrawlerSohuRecommendVideos,
+)
+from tasks.crawler_tasks.crawler_video.crawler_sph_videos import (
+    CrawlerChannelAccountVideos,
+)
+from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
+from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
+from tasks.monitor_tasks.kimi_balance_monitor import check_kimi_balance
+from tasks.publish_tasks.top_article_generalize import (
+    TopArticleGeneralizeFromArticlePool,
+)
+
+
+def run_piaoquan_video_crawler():
+    crawler = CrawlerPiaoQuanVideos()
+    crawler.deal()
+
+
+def run_sohu_video_crawler():
+    # step1, crawl sohu hot videos
+    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
+    crawler_sohu_hot_videos.deal()
+
+    # step2, crawl sohu recommend videos
+    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
+    crawler_sohu_recommend_videos.deal()
+
+
+def run_sph_video_crawler():
+    crawler_channel_account_videos = CrawlerChannelAccountVideos()
+    crawler_channel_account_videos.deal()
+
+
+def run_fwh_data_manager():
+    fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
+    fwh_group_publish_record_manager.deal()
+    fwh_group_publish_record_manager.monitor()
+
+    # 2. 保存数据到数据库
+    save_fwh_data_to_database = SaveFwhDataToDatabase()
+    save_fwh_data_to_database.deal()
+
+
+def run_top_article_generalize_from_article_pool():
+    task = TopArticleGeneralizeFromArticlePool()
+    task.deal()
+
+
+def main():
+    """
+    run long_articles_job
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--task_name", help="which task you want to run")
+    parser.add_argument("--run_date", help="task specify run date")
+
+    args = parser.parse_args()
+
+    task_name = args.task_name
+    if task_name is None:
+        print("task_name cannot be None")
+        return
+    else:
+        match task_name:
+            case "run_piaoquan_video_crawler":
+                run_piaoquan_video_crawler()
+            case "run_sohu_video_crawler":
+                run_sohu_video_crawler()
+            case "run_check_kimi_balance":
+                check_kimi_balance()
+            case "run_fwh_data_manager":
+                run_fwh_data_manager()
+            case "run_sph_video_crawler":
+                run_sph_video_crawler()
+            case "top_article_generalize":
+                run_top_article_generalize_from_article_pool()
+            case _:
+                print("task_name cannot be None")
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 1
run_update_articles_from_aigc_system.py

@@ -1,7 +1,7 @@
 """
 @author: luojunhui
 """
-from tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
+from tasks.data_tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
 
 
 if __name__ == "__main__":

+ 164 - 0
sh/run_long_articles_job.sh

@@ -0,0 +1,164 @@
+#!/bin/bash
+
+# =============================================================
+# 多任务定时调度管理器
+# 功能:在指定时间执行不同任务,每个任务有独立日志
+# 配置:在下方 "任务配置" 区域添加您的任务
+# 使用:设置cron每分钟执行此脚本: * * * * /path/to/this/script.sh
+# =============================================================
+
+# 确保脚本以root权限运行(按需修改)
+#if [ "$(id -u)" != "0" ]; then
+#   echo "错误:此脚本需要以root权限运行" 1>&2
+#   exit 1
+#fi
+
+# *************** 基础配置 ***************
+# 工作目录(脚本所在位置)
+SCRIPT_DIR="/root/luojunhui/LongArticlesJob"
+
+# 日志根目录
+LOG_DIR="${SCRIPT_DIR}/logs"
+
+# Conda 配置
+CONDA_PATH="/root/miniconda3/etc/profile.d/conda.sh"
+CONDA_ENV="tasks"
+
+# Python 脚本名称
+PYTHON_SCRIPT="long_articles_job.py"
+
+# *************** 任务配置 ***************
+# 格式: "任务名称|执行时间|日志文件路径"
+# 注意:
+#   1. 执行时间格式为 HH:MM (24小时制)
+#   2. 日志路径可使用变量 $(date +'格式')
+#   3. 添加新任务只需复制一行并修改参数
+TASKS=(
+    # 示例任务:每天09:00执行
+    "top_article_generalize|11:20|${LOG_DIR}/top_article_generalize_$(date +'%Y-%m-%d').log"
+
+#    # 示例任务:每天12:30执行
+#    "task2|12:30|${LOG_DIR}/task2_$(date +'%Y-%m-%d').log"
+#
+#    # 示例任务:每天18:00执行
+#    "task3|18:00|${LOG_DIR}/task3_$(date +'%Y-%m-%d').log"
+#
+#    # 示例任务:每周一08:00执行
+#    "weekly_report|08:00|${LOG_DIR}/weekly_$(date +'%Y-%m-%d').log"
+
+    # 在此添加您的自定义任务...
+    # "your_task_name|HH:MM|${LOG_DIR}/custom_$(date +'%Y-%m-%d').log"
+)
+
+# *************** 函数定义 ***************
+# 初始化环境
+initialize() {
+    # 创建日志目录
+    mkdir -p "${LOG_DIR}"
+
+    # 设置当前时间变量
+    current_time=$(date '+%Y-%m-%d %H:%M:%S')
+    current_hour_minute=$(date '+%H:%M')
+    current_weekday=$(date +%u)  # 1=周一, 7=周日
+
+    # 主日志文件(记录调度过程)
+    MAIN_LOG="${LOG_DIR}/scheduler_$(date +'%Y-%m-%d').log"
+    touch "${MAIN_LOG}"
+
+    # 重定向所有输出到主日志
+    exec >> "${MAIN_LOG}" 2>&1
+}
+
+# 启动任务函数
+start_task() {
+    local task_name=$1
+    local log_file=$2
+
+    # 创建任务日志文件
+    touch "${log_file}"
+
+    # 检查进程是否已在运行
+    if pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}" > /dev/null; then
+        echo "${current_time} - [INFO] 任务 ${task_name} 已在运行中" | tee -a "${log_file}"
+        return 0
+    fi
+
+    # 切换到工作目录
+    cd "${SCRIPT_DIR}" || {
+        echo "${current_time} - [ERROR] 无法切换到目录 ${SCRIPT_DIR}" | tee -a "${log_file}"
+        return 1
+    }
+
+    # 激活 Conda 环境
+    if [[ -f "${CONDA_PATH}" ]]; then
+        source "${CONDA_PATH}"
+        conda activate "${CONDA_ENV}" || {
+            echo "${current_time} - [ERROR] 无法激活 Conda 环境 ${CONDA_ENV}" | tee -a "${log_file}"
+            return 1
+        }
+    else
+        echo "${current_time} - [WARNING] Conda 初始化文件未找到: ${CONDA_PATH}" | tee -a "${log_file}"
+    fi
+
+    # 启动任务脚本
+    echo "${current_time} - [INFO] 启动任务: ${task_name}" | tee -a "${log_file}"
+    nohup python3 "${PYTHON_SCRIPT}" --task_name "${task_name}" >> "${log_file}" 2>&1 &
+
+    # 检查是否启动成功
+    sleep 1
+    if pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}" > /dev/null; then
+        local pid=$(pgrep -f "python3 ${PYTHON_SCRIPT} --task_name ${task_name}")
+        echo "${current_time} - [SUCCESS] 任务启动成功: ${task_name} (PID: ${pid})" | tee -a "${log_file}"
+    else
+        echo "${current_time} - [ERROR] 任务启动失败: ${task_name}" | tee -a "${log_file}"
+    fi
+}
+
+# 特殊日期检查函数
+is_special_day() {
+    local task_name=$1
+    local scheduled_time=$2
+
+    # 示例:每周一执行的任务
+    if [[ "${task_name}" == "weekly_report" ]]; then
+        [[ "${current_weekday}" == "1" ]]  # 周一
+        return $?
+    fi
+
+    # 示例:每月1号执行的任务
+    if [[ "${task_name}" == "monthly_report" ]]; then
+        [[ "$(date +'%d')" == "01" ]]
+        return $?
+    fi
+
+    # 默认每天都执行
+    return 0
+}
+
+# 主调度函数
+schedule_tasks() {
+    echo "====== ${current_time} 开始任务调度 ======"
+    echo "当前时间: ${current_hour_minute}, 星期: ${current_weekday}"
+
+    for task_config in "${TASKS[@]}"; do
+        # 解析任务配置
+        IFS='|' read -r task_name scheduled_time log_file <<< "${task_config}"
+
+        # 检查是否到达执行时间
+        if [[ "${current_hour_minute}" == "${scheduled_time}" ]]; then
+            start_task "${task_name}" "${log_file}"
+        else
+            echo "${current_time} - [SCHEDULE] 未到执行时间: ${task_name} (计划: ${scheduled_time})" | tee -a "${log_file}"
+        fi
+    done
+
+    echo "====== ${current_time} 任务调度完成 ======"
+    echo ""
+}
+
+# *************** 主程序 ***************
+initialize
+schedule_tasks
+
+# 日志清理(保留最近7天日志)
+find "${LOG_DIR}" -name "*.log" -mtime +7 -delete

+ 0 - 0
tasks/update_article_info_from_aigc.py → tasks/data_tasks/update_article_info_from_aigc.py


+ 52 - 0
tasks/monitor_tasks/kimi_balance_monitor.py

@@ -0,0 +1,52 @@
+"""
+@author: luojunhui
+"""
+import requests
+import traceback
+
+from applications import bot
+from applications.decoratorApi import retryOnTimeout
+
+BALANCE_LIMIT_THRESHOLD = 200.0
+
+
+@retryOnTimeout(retries=5, delay=5)
+def check_kimi_balance():
+    """
+    校验kimi余额
+    :return:
+    """
+    url = "https://api.moonshot.cn/v1/users/me/balance"
+
+    payload = {}
+    headers = {
+        'Authorization': 'Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q'
+    }
+    response = requests.request("GET", url, headers=headers, data=payload, timeout=10)
+    if response.status_code == 200:
+        response_json = response.json()
+        try:
+            balance = response_json['data']['available_balance']
+            if balance < BALANCE_LIMIT_THRESHOLD:
+                bot(
+                    title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
+                    detail={
+                        "balance": balance
+                    }
+                )
+        except Exception as e:
+            error_stack = traceback.format_exc()
+            bot(
+                title="kimi余额接口处理失败,数据结构异常",
+                detail={
+                    "error": str(e),
+                    "error_msg": error_stack
+                }
+            )
+    else:
+        bot(
+            title="kimi余额接口调用失败",
+            detail={
+                "response": response.text
+            }
+        )

+ 272 - 0
tasks/publish_tasks/top_article_generalize.py

@@ -0,0 +1,272 @@
+import json
+import datetime
+import traceback
+
+from tqdm import tqdm
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+
+from applications import aiditApi
+from applications.api import ElasticSearchClient
+from applications.api import fetch_deepseek_completion
+from applications.api import similarity_between_title_list
+from applications.db import DatabaseConnector
+from config import long_articles_config, denet_config
+from config.es_mappings import index_name
+
+extract_keywords_prompt = """
+你是一名优秀的中文专家
+## 任务说明
+需要你从输入的标题和总结中提取3个搜索词
+### 输出
+输出结构为JSON,格式如下
+{output_format}
+## 输入
+标题:{title}
+总结:{summary}
+"""
+
+
+class TopArticleGeneralize:
+
+    def __init__(self):
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+
+        self.elastic_search = ElasticSearchClient(index_=index_name)
+
+    def fetch_distinct_top_titles(self) -> List[Dict]:
+        """
+        获取top100生成计划中的文章标题
+        """
+        fetch_query = f"""
+            select distinct title, source_id
+            from datastat_sort_strategy
+            where produce_plan_name = 'TOP100' and source_id is not null;
+        """
+        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)
+
+    def get_title_read_info_detail(self, title: str) -> bool:
+        """
+        获取标题最近3篇文章的阅读均值倍数
+        """
+        fetch_query = f"""
+            select read_rate
+            from datastat_sort_strategy
+            where produce_plan_name = 'TOP100' and title = '{title}'
+            order by date_str desc limit 3;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor
+        )
+        read_rate_list = [i["read_rate"] for i in fetch_response]
+        for read_rate in read_rate_list:
+            if read_rate < 1.2:
+                return False
+        return True
+
+    def get_article_summary(self, source_id: str) -> str:
+        """
+        use source_id to get article summary
+        """
+        fetch_query = f"""
+            select output 
+            from produce_plan_module_output 
+            where plan_exe_id = '{source_id}' and produce_module_type = 18;
+        """
+        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
+        return fetch_response[0]["output"]
+
+    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
+        """
+        获取关键词
+        """
+        title = title_obj["title"]
+        source_id = title_obj["source_id"]
+        article_summary = self.get_article_summary(source_id)
+        output_format = {"keys": ["key1", "key2", "key3"]}
+        prompt = extract_keywords_prompt.format(
+            output_format=output_format, title=title, summary=article_summary
+        )
+        response = fetch_deepseek_completion(
+            model="deepseek-V3", prompt=prompt, output_type="json"
+        )
+        return response["keys"]
+
+    def migrate_article_to_es(self, max_article_id: int = 0):
+        fetch_query = f"""
+            select article_id, platform, out_account_id, title
+            from crawler_meta_article
+            where status = 1 and article_id > %s
+            order by article_id limit 10000;
+        """
+        # 执行查询
+        results = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(max_article_id,)
+        )
+        docs = [
+            {
+                "_index": index_name,
+                "_id": item["article_id"],
+                "_source": {
+                    "article_id": item["article_id"],
+                    "platform": item["platform"],
+                    "out_account_id": item["out_account_id"],
+                    "title": item["title"],
+                },
+            }
+            for item in results
+        ]
+        self.elastic_search.bulk_insert(docs)
+
+
+class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
+
+    def get_candidate_articles(self, article_id_tuple):
+        fetch_query = f"""
+            select article_id, title, link, llm_sensitivity, score, category_by_ai
+            from crawler_meta_article
+            where status = %s
+              and title_sensitivity = %s
+              and article_id in %s;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(1, 0, article_id_tuple)
+        )
+        return fetch_response
+
+    def change_article_status_while_publishing(self, article_id_list):
+        """
+
+        :param: article_id_list: 文章的唯一 id
+        :return:
+        """
+        update_sql = f"""
+            update crawler_meta_article
+            set status = %s
+            where article_id in %s and status = %s;
+        """
+        affect_rows = self.long_articles_client.save(
+            query=update_sql, params=(2, tuple(article_id_list), 1)
+        )
+
+    def deal(self):
+        # migrate articles
+        max_id = self.elastic_search.get_max_article_id()
+        self.migrate_article_to_es(max_id)
+
+        # fetch titles
+        title_obj_list = self.fetch_distinct_top_titles()
+        publishing_article_list = []
+        for title_obj in tqdm(title_obj_list):
+            if self.get_title_read_info_detail(title_obj["title"]):
+                try:
+                    keys = self.get_keys_by_ai(title_obj)
+                    related_articles = self.elastic_search.search(
+                        search_keys=",".join(keys), size=50
+                    )
+                    if related_articles:
+                        article_id_list = [i["article_id"] for i in related_articles]
+                        article_list = self.get_candidate_articles(
+                            tuple(article_id_list)
+                        )
+
+                        title_list = [i["title"] for i in article_list]
+                        # 相关性排序
+                        similarity_array = similarity_between_title_list(
+                            title_list, [title_obj["title"]]
+                        )
+
+                        response_with_similarity_list = []
+                        for index, item in enumerate(article_list):
+                            item["similarity"] = similarity_array[index][0]
+                            response_with_similarity_list.append(item)
+
+                        sorted_response_with_similarity_list = sorted(
+                            response_with_similarity_list,
+                            key=lambda k: k["similarity"],
+                            reverse=True,
+                        )
+                        # 过滤相关性分大于0.8的文章
+                        sorted_response_with_similarity_list = [
+                            i
+                            for i in sorted_response_with_similarity_list
+                            if i["similarity"] <= 0.8
+                        ]
+
+                        publishing_article_list += sorted_response_with_similarity_list[
+                            :10
+                        ]
+                except Exception as e:
+                    print(e)
+                    print(traceback.format_exc())
+
+        url_list = [i["link"] for i in publishing_article_list]
+        if url_list:
+            # create_crawler_plan
+            crawler_plan_response = aiditApi.auto_create_crawler_task(
+                plan_id=None,
+                plan_name="自动绑定-Top内容泛化-{}--{}".format(
+                    datetime.date.today().__str__(), len(url_list)
+                ),
+                plan_tag="Top内容泛化",
+                article_source="weixin",
+                url_list=url_list,
+            )
+
+            # save to db
+            crawler_plan_id = crawler_plan_response["data"]["id"]
+            crawler_plan_name = crawler_plan_response["data"]["name"]
+
+            # auto bind to generate plan
+            new_crawler_task_list = [
+                {
+                    "contentType": 1,
+                    "inputSourceType": 2,
+                    "inputSourceSubType": None,
+                    "fieldName": None,
+                    "inputSourceValue": crawler_plan_id,
+                    "inputSourceLabel": crawler_plan_name,
+                    "inputSourceModal": 3,
+                    "inputSourceChannel": 5,
+                }
+            ]
+            # 绑定至生成计划
+            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+                crawler_task_list=new_crawler_task_list,
+                generate_task_id="20250703081329508785665",
+            )
+            # change article status
+            article_id_list = [i["article_id"] for i in publishing_article_list]
+            self.change_article_status_while_publishing(article_id_list=article_id_list)
+
+
+class TopArticleGeneralizeFromVideoPool(TopArticleGeneralize):
+
+    def get_candidate_videos(self, key):
+        fetch_query = f"""
+            select article_title, content_trace_id, audit_video_id
+            from publish_single_video_source
+            where status = 0 and bad_status = 0 and article_title like '%{key}%'
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor
+        )
+        return fetch_response
+
+    def deal(self):
+        title_obj_list = self.fetch_distinct_top_titles()
+        publishing_article_list = []
+        for title_obj in tqdm(title_obj_list):
+            if self.get_title_read_info_detail(title_obj["title"]):
+
+                temp = []
+                keys = self.get_keys_by_ai(title_obj)
+                for key in keys:
+                    candidate_articles = self.get_candidate_videos(key)
+                    temp += candidate_articles
+                print(json.dumps(temp, ensure_ascii=False, indent=4))

+ 0 - 742
tasks/update_published_articles_read_detail.py

@@ -1,742 +0,0 @@
-"""
-@author: luojunhui
-@desc: 更新文章的阅读详情
-"""
-import json
-import time
-import traceback
-import urllib.parse
-from datetime import datetime
-from typing import Dict, List
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import aiditApi
-from applications import bot
-from applications import create_feishu_columns_sheet
-from applications import Functions
-from applications import log
-from applications import WeixinSpider
-from applications.const import updatePublishedMsgTaskConst
-from applications.db import DatabaseConnector
-from config import denet_config, long_articles_config, piaoquan_crawler_config
-
-ARTICLE_TABLE = "official_articles"
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-functions = Functions()
-empty_dict = {}
-
-
-def generate_bot_columns():
-    """
-    生成列
-    :return:
-    """
-    columns = [
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
-        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
-        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
-                                    display_name="账号接入系统时间"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
-    ]
-    return columns
-
-
-class UpdatePublishedArticlesReadDetail(object):
-    """
-    更新每日发布文章的阅读详情
-    """
-
-    def __init__(self):
-        self.aigc_db_client = None
-        self.piaoquan_crawler_db_client = None
-        self.long_articles_db_client = None
-
-    def get_account_list(self) -> List[Dict]:
-        """
-        从 aigc 数据库中获取目前处于发布状态的账号
-        :return:
-        "name": line[0],
-        "ghId": line[1],
-        "follower_count": line[2],
-        "account_init_time": int(line[3] / 1000),
-        "account_type": line[4], # 订阅号 or 服务号
-        "account_auth": line[5]
-        """
-
-        def get_account_status() -> Dict:
-            """
-            获取账号的实验状态
-            :return:
-            """
-            sql = f"""  
-                SELECT t1.account_id, t2.status
-                FROM wx_statistics_group_source_account t1
-                JOIN wx_statistics_group_source t2
-                ON t1.group_source_name = t2.account_source_name;
-            """
-            account_status_list = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
-            account_status = {account['account_id']: account['status'] for account in account_status_list}
-            return account_status
-
-        account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-        account_status_dict = get_account_status()
-        account_list = [
-            {
-                **item,
-                'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
-            }
-            for item in account_list_with_out_using_status
-        ]
-        return account_list
-
-    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
-        """
-        通过trace_id来查询文章信息
-        """
-        select_sql = f"""
-            SELECT t1.gh_id, t1.account_name, t2.article_title
-            FROM long_articles_match_videos t1
-            JOIN long_articles_text t2
-            ON t1.content_id = t2.content_id
-            WHERE t1.trace_id = '{trace_id}';
-        """
-        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        if article_info:
-            return article_info[0]
-        else:
-            return empty_dict
-
-    def init_database(self):
-        """
-        初始化数据库连接
-        """
-        # 初始化数据库连接
-        try:
-            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-            self.piaoquan_crawler_db_client.connect()
-            self.aigc_db_client = DatabaseConnector(denet_config)
-            self.aigc_db_client.connect()
-            self.long_articles_db_client = DatabaseConnector(long_articles_config)
-            self.long_articles_db_client.connect()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="更新文章任务连接数据库失败",
-                detail={
-                    "error": e,
-                    "msg": error_msg
-                }
-            )
-            return
-
-    def insert_each_msg(self, account_info: Dict, msg_list: List[Dict]) -> None:
-        """
-        把消息数据更新到数据库中
-        :param account_info:
-        :param msg_list:
-        :return:
-        """
-        gh_id = account_info['ghId']
-        account_name = account_info['name']
-        for info in msg_list:
-            baseInfo = info.get("BaseInfo", {})
-            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-            createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-            updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-            if detail_article_list:
-                for article in detail_article_list:
-                    title = article.get("Title", None)
-                    Digest = article.get("Digest", None)
-                    ItemIndex = article.get("ItemIndex", None)
-                    ContentUrl = article.get("ContentUrl", None)
-                    SourceUrl = article.get("SourceUrl", None)
-                    CoverImgUrl = article.get("CoverImgUrl", None)
-                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
-                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
-                    ItemShowType = article.get("ItemShowType", None)
-                    IsOriginal = article.get("IsOriginal", None)
-                    ShowDesc = article.get("ShowDesc", None)
-                    show_stat = functions.show_desc_to_sta(ShowDesc)
-                    ori_content = article.get("ori_content", None)
-                    show_view_count = show_stat.get("show_view_count", 0)
-                    show_like_count = show_stat.get("show_like_count", 0)
-                    show_zs_count = show_stat.get("show_zs_count", 0)
-                    show_pay_count = show_stat.get("show_pay_count", 0)
-                    wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
-                    status = account_info['using_status']
-                    info_tuple = (
-                        gh_id,
-                        account_name,
-                        appMsgId,
-                        title,
-                        Type,
-                        createTime,
-                        updateTime,
-                        Digest,
-                        ItemIndex,
-                        ContentUrl,
-                        SourceUrl,
-                        CoverImgUrl,
-                        CoverImgUrl_1_1,
-                        CoverImgUrl_235_1,
-                        ItemShowType,
-                        IsOriginal,
-                        ShowDesc,
-                        ori_content,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        json.dumps(baseInfo, ensure_ascii=False),
-                        functions.str_to_md5(title),
-                        status
-                    )
-                    self.insert_each_article(
-                        info_tuple=info_tuple,
-                        show_view_count=show_view_count,
-                        show_like_count=show_like_count,
-                        wx_sn=wx_sn
-                    )
-
-    def insert_each_article(self, info_tuple, show_view_count, show_like_count, wx_sn):
-        """
-        插入每一篇文章
-        """
-        try:
-            insert_sql = f"""
-                    INSERT INTO {ARTICLE_TABLE}
-                    (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
-                    values
-                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-            """
-            self.piaoquan_crawler_db_client.save(query=insert_sql, params=info_tuple)
-            log(
-                task="updatePublishedMsgDaily",
-                function="insert_each_msg",
-                message="插入文章数据成功",
-                data={
-                    "info": info_tuple
-                }
-            )
-        except Exception as e:
-            try:
-                update_sql = f"""
-                    UPDATE {ARTICLE_TABLE}
-                    SET show_view_count = %s, show_like_count=%s
-                    WHERE wx_sn = %s;
-                """
-                self.piaoquan_crawler_db_client.save(query=update_sql,
-                                                     params=(show_view_count, show_like_count, wx_sn))
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="insert_each_msg",
-                    message="更新文章数据成功",
-                    data={
-                        "wxSn": wx_sn,
-                        "likeCount": show_like_count,
-                        "viewCount": show_view_count
-                    }
-
-                )
-            except Exception as e:
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="insert_each_msg",
-                    message="更新文章失败, 报错原因是: {}".format(e),
-                    status="fail"
-                )
-
-    def update_account_by_spider(self, account_info: Dict, cursor=None):
-        """
-        更新每一个账号信息
-        :param account_info:
-        :param cursor:
-        :return: None
-        """
-        gh_id = account_info['ghId']
-        latest_update_time = self.get_account_info(gh_id)
-        response = spider.update_msg_list(ghId=gh_id, index=cursor)
-        if not response:
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_account_by_spider",
-                status="fail",
-                message="账号更新请求爬虫接口失败",
-                data=account_info
-            )
-            return
-        msg_list = response.get("data", {}).get("data", [])
-        if msg_list:
-            # do
-            last_article_in_this_msg = msg_list[-1]
-            last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
-            last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-            resdata = spider.get_account_by_url(last_url)
-            check_id = resdata['data'].get('data', {}).get('wx_gh')
-            if check_id == gh_id:
-                self.insert_each_msg(
-                    account_info=account_info,
-                    msg_list=msg_list
-                )
-                # if last_time_stamp_in_this_msg > latest_update_time:
-                #     next_cursor = response['data']['next_cursor']
-                #     return self.update_account_by_spider(
-                #         account_info=account_info,
-                #         cursor=next_cursor
-                #     )
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="update_each_account",
-                    message="账号文章更新成功",
-                    data=response
-                )
-        else:
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_each_account",
-                message="账号文章更新失败",
-                status="fail",
-                data=response
-            )
-            return
-
-    def update_account_by_aigc(self, account_info: Dict, run_date: str):
-        """
-        更新单个账号的文章
-        """
-        gh_id = account_info['ghId']
-        select_sql = f"""
-            SELECT trace_id, wx_sn, published_url, publish_timestamp, root_source_id_list, create_timestamp
-            FROM long_articles_published_trace_id
-            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY)) AND delete_status = 0;
-        """
-        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        for article in result:
-            trace_id = article['trace_id']
-            wx_sn = article['wx_sn']
-            published_url = article['published_url']
-            publish_timestamp = article['publish_timestamp']
-            article_info = spider.get_article_text(content_link=published_url, is_cache=False, is_count=True)
-            response_code = article_info['code']
-            match response_code:
-                case const.ARTICLE_SUCCESS_CODE:
-                    response_data = article_info['data']['data']
-                    title = response_data['title']
-                    article_url = response_data['content_link']
-                    show_view_count = response_data['view_count']
-                    show_like_count = response_data['like_count']
-                    show_zs_count = 0
-                    show_pay_count = 0
-                    wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url else None
-                    app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url else None
-                    status = account_info['using_status']
-                    info_tuple = (
-                        gh_id,
-                        account_info['name'],
-                        app_msg_id,
-                        title,
-                        "9",
-                        article['create_timestamp'],
-                        response_data['update_timestamp'],
-                        None,
-                        response_data['item_index'],
-                        response_data['content_link'],
-                        None,
-                        None,
-                        None,
-                        None,
-                        None,
-                        response_data.get("is_original", None),
-                        None,
-                        None,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        None,
-                        functions.str_to_md5(title),
-                        status
-                    )
-                    self.insert_each_article(
-                        info_tuple=info_tuple,
-                        show_view_count=show_view_count,
-                        show_like_count=show_like_count,
-                        wx_sn=wx_sn
-                    )
-
-                case const.ARTICLE_DELETE_CODE:
-                    log(
-                        task="updatePublishedMsgDaily",
-                        function="update_account_by_aigc",
-                        message="文章被删除",
-                        data={
-                            "ghId": gh_id,
-                            "publishedUrl": published_url
-                        }
-                    )
-
-                case const.ARTICLE_ILLEGAL_CODE:
-                    article_detail = self.get_article_info_by_trace_id(trace_id)
-                    if article_detail:
-                        error_detail = article_info.get("msg")
-                        insert_sql = f"""
-                                INSERT IGNORE INTO illegal_articles 
-                                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
-                                VALUES 
-                                (%s, %s, %s, %s, %s, %s);
-                            """
-
-                        affected_rows = self.long_articles_db_client.save(
-                            query=insert_sql,
-                            params=(
-                                article_info['gh_id'],
-                                article_info['account_name'],
-                                article_info['article_title'],
-                                wx_sn,
-                                functions.timestamp_to_str(publish_timestamp),
-                                error_detail
-                            )
-                        )
-                        if affected_rows:
-                            bot(
-                                title="文章违规告警(new task)",
-                                detail={
-                                    "account_name": article_info['account_name'],
-                                    "gh_id": article_info['gh_id'],
-                                    "title": article_info['article_title'],
-                                    "wx_sn": wx_sn,
-                                    "publish_date": functions.timestamp_to_str(publish_timestamp),
-                                    "error_detail": error_detail,
-                                },
-                                mention=False
-                            )
-                            aiditApi.delete_articles(
-                                gh_id=article_info['gh_id'],
-                                title=article_info['article_title']
-                            )
-
-    def get_account_info(self, gh_id: str) -> int:
-        """
-        通过 gh_id查询账号信息的最新发布时间
-        :param gh_id:
-        :return:
-        """
-        sql = f"""
-            SELECT MAX(publish_timestamp)
-            FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}';
-            """
-        result = self.piaoquan_crawler_db_client.fetch(sql)
-        if result:
-            return result[0][0]
-        else:
-            # 新号,抓取周期定位抓取时刻往前推30天
-            return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
-
-    def check_single_account(self, account_item: Dict) -> bool:
-        """
-        校验每个账号是否更新
-        :param account_item:
-        :return: True / False
-        """
-        gh_id = account_item['ghId']
-        account_type = account_item['account_type']
-        today_str = datetime.today().strftime("%Y-%m-%d")
-        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
-        today_timestamp = today_date_time.timestamp()
-        sql = f"""
-                SELECT max(updateTime)
-                FROM {ARTICLE_TABLE}
-                WHERE ghId = '{gh_id}';
-                """
-        try:
-            latest_update_time = self.piaoquan_crawler_db_client.fetch(sql)[0][0]
-            # 判断该账号当天发布的文章是否被收集
-            if account_type in const.SUBSCRIBE_TYPE_SET:
-                if int(latest_update_time) > int(today_timestamp):
-                    return True
-                else:
-                    return False
-            else:
-                if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
-                    return True
-                else:
-                    return False
-        except Exception as e:
-            print(e)
-            return False
-
-    def process_single_account(self, account_info: Dict, run_date: str):
-        """
-        处理单个账号
-        """
-        gh_id = account_info['ghId']
-        # 判断该账号当天是否有自动群发且没有无限流发表
-        select_sql = f"""
-            SELECT push_type
-            FROM long_articles_published_trace_id
-            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP('{run_date}');
-        """
-        response = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        UNLIMITED_PUSH = 3
-        if response:
-            unlimited_push_list = [item for item in response if item['push_type'] == UNLIMITED_PUSH]
-            if unlimited_push_list:
-                self.update_account_by_spider(account_info=account_info)
-            else:
-                print("By AIGC", account_info)
-                self.update_account_by_aigc(account_info=account_info, run_date=run_date)
-        else:
-            self.update_account_by_spider(account_info=account_info)
-
-    def update_publish_timestamp(self, article_info: Dict):
-        """
-        更新发布时间戳 && minigram 信息
-        :param article_info:
-        :return:
-        """
-        url = article_info['ContentUrl']
-        wx_sn = article_info['wx_sn']
-        try:
-            response = spider.get_article_text(url)
-            response_code = response['code']
-
-            if response_code == const.ARTICLE_DELETE_CODE:
-                publish_timestamp_s = const.DELETE_STATUS
-                root_source_id_list = []
-            elif response_code == const.ARTICLE_ILLEGAL_CODE:
-                publish_timestamp_s = const.ILLEGAL_STATUS
-                root_source_id_list = []
-            elif response_code == const.ARTICLE_SUCCESS_CODE:
-                data = response['data']['data']
-                publish_timestamp_ms = data['publish_timestamp']
-                publish_timestamp_s = int(publish_timestamp_ms / 1000)
-                mini_program = data.get('mini_program', [])
-                if mini_program:
-                    root_source_id_list = [
-                        urllib.parse.parse_qs(
-                            urllib.parse.unquote(i['path'])
-                        )['rootSourceId'][0]
-                        for i in mini_program
-                    ]
-                else:
-                    root_source_id_list = []
-            else:
-                publish_timestamp_s = const.UNKNOWN_STATUS
-                root_source_id_list = []
-        except Exception as e:
-            publish_timestamp_s = const.REQUEST_FAIL_STATUS
-            root_source_id_list = None
-            error_msg = traceback.format_exc()
-            print(e, error_msg)
-
-        update_sql = f"""
-                UPDATE {ARTICLE_TABLE}
-                SET publish_timestamp = %s, root_source_id_list = %s
-                WHERE wx_sn = %s;
-            """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql,
-            params=(
-                publish_timestamp_s,
-                json.dumps(root_source_id_list, ensure_ascii=False),
-                wx_sn
-            ))
-        if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
-            return article_info
-        else:
-            return None
-
-    def update_job(self, biz_date: str = None):
-        """
-        执行更新任务
-        """
-        account_list = self.get_account_list()
-        if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
-
-        # 处理订阅号
-        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-        success_count = 0
-        fail_count = 0
-        for account in tqdm(subscription_accounts):
-            try:
-                self.process_single_account(account_info=account, run_date=biz_date)
-                success_count += 1
-                time.sleep(3)
-            except Exception as e:
-                fail_count += 1
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="update_job",
-                    message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                    status="fail",
-                )
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_job",
-            message="订阅号更新完成",
-            data={
-                "success": success_count,
-                "fail": fail_count
-            }
-        )
-        if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
-            bot(
-                title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
-                detail={
-                    "success": success_count,
-                    "fail": fail_count,
-                    "failRate": fail_count / (success_count + fail_count)
-                }
-            )
-        bot(
-            title="更新每日发布文章任务完成通知(new)",
-            detail={
-                "msg": "订阅号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
-
-        # 服务号
-        server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-        for account in tqdm(server_accounts):
-            try:
-                self.process_single_account(account_info=account, run_date=biz_date)
-                time.sleep(1)
-            except Exception as e:
-                print(e)
-        bot(
-            title="更新每日发布文章任务完成通知(new)",
-            detail={
-                "msg": "服务号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
-
-    def check_job(self, biz_date: str = None):
-        """
-        执行检查任务,check each account
-        """
-        if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
-
-        account_list = self.get_account_list()
-        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-        fail_list = []
-        # check and rework if fail
-        for sub_item in tqdm(subscription_accounts):
-            res = self.check_single_account(sub_item)
-            if not res:
-                self.process_single_account(sub_item, biz_date)
-
-        # check whether success and bot if fails
-        for sub_item in tqdm(subscription_accounts):
-            res = self.check_single_account(sub_item)
-            if not res:
-                # 去掉三个不需要查看的字段
-                sub_item.pop('account_type', None)
-                sub_item.pop('account_auth', None)
-                sub_item.pop('account_id', None)
-                fail_list.append(sub_item)
-        if fail_list:
-            try:
-                bot(
-                    title="更新当天发布文章,存在未更新的账号(new)",
-                    detail={
-                        "columns": generate_bot_columns(),
-                        "rows": fail_list
-                    },
-                    table=True
-                )
-            except Exception as e:
-                print("Timeout Error: {}".format(e))
-        else:
-            bot(
-                title="更新当天发布文章,所有账号均更新成功(new)",
-                mention=False,
-                detail={
-                    "msg": "校验任务完成",
-                    "finish_time": datetime.today().__str__()
-                }
-            )
-
-    def get_article_detail_job(self):
-        """
-        获取发布文章详情
-        :return:
-        """
-        select_sql = f"""
-            SELECT ContentUrl, wx_sn 
-            FROM {ARTICLE_TABLE}
-            WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
-        """
-        article_list = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
-        for article in tqdm(article_list):
-            try:
-                self.update_publish_timestamp(article)
-            except Exception as e:
-                print(e)
-                error_msg = traceback.format_exc()
-                print(error_msg)
-        # check 一遍存在请求失败-1 && 0 的文章
-        select_sql = f"""
-                    SELECT ContentUrl, wx_sn 
-                    FROM {ARTICLE_TABLE}
-                    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
-                """
-        process_failed_articles = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
-        fail_list = []
-        if process_failed_articles:
-            for article in tqdm(process_failed_articles):
-                try:
-                    res = self.update_publish_timestamp(article)
-                    fail_list.append(res)
-                except Exception as e:
-                    print(e)
-                    error_msg = traceback.format_exc()
-                    print(error_msg)
-
-        # 通过msgId 来修改publish_timestamp
-        update_sql = f"""
-            UPDATE {ARTICLE_TABLE} oav 
-            JOIN (
-                SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
-                FROM {ARTICLE_TABLE} 
-                WHERE publish_timestamp > %s 
-                GROUP BY ghId, appMsgId
-                ) vv
-                ON oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
-            SET oav.publish_timestamp = vv.publish_timestamp
-            WHERE oav.publish_timestamp <= %s;
-        """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql,
-            params=(0, 0)
-        )
-
-        # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
-        update_sql_2 = f"""
-            UPDATE {ARTICLE_TABLE}
-            SET publish_timestamp = updateTime
-            WHERE publish_timestamp < %s;
-        """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql_2,
-            params=0
-        )
-        if fail_list:
-            bot(
-                title="更新文章任务,请求detail失败",
-                detail=fail_list
-            )