소스 검색

merge master

luojunhui 7 달 전
부모
커밋
1d89d91030

+ 2 - 3
applications/api/moon_shot_api.py

@@ -12,15 +12,14 @@ generate_program_title_prompt = """
     1.要点前置:将最重要的信息放在标题的最前面,以快速吸引读者的注意力。例如,“5月一辈子同学,三辈子亲,送给我的老同学,听哭无数人!”中的“5月”和“一辈子同学,三辈子亲”都是重要的信息点。
     2.激发情绪:使用能够触动人心的语言,激发读者的情感共鸣。如“只剩两人同学聚会,看后感动落泪。”使用“感动落泪”激发读者的同情和怀旧情绪。
     3.使用数字和特殊符号:数字可以提供具体性,而特殊符号如“🔴”、“😄”、“🔥”等可以吸引视觉注意力,增加点击率。
-    4.悬念和好奇心:创建悬念或提出问题,激发读者的好奇心。例如,“太神奇了!长江水位下降,重庆出现惊奇一幕!”中的“惊奇一幕”就是一个悬念。
+    4.悬念和好奇心:创建悬念或提出问题,激发读者的好奇心。
     5.名人效应:如果内容与知名人士相关,提及他们的名字可以增加标题的吸引力。
     6.社会价值观:触及读者的文化和社会价值观,如家庭、友情、国家荣誉等。
     7.标点符号的运用:使用感叹号、问号等标点来增强语气和情感表达。
     8.直接的语言:使用直白、口语化的语言,易于理解,如“狗屁股,笑死我了!”。
     9.热点人物或事件:提及当前的热点人物或事件,利用热点效应吸引读者。
     10.字数适中:保持标题在10-20个字之间,既不过长也不过短,确保信息的完整性和吸引力。
-    11.适当的紧迫感:使用“最新”、“首次”、“紧急”等词汇,创造一种紧迫感,促使读者立即行动。
-    12.情感或价值诉求:使用如“感动”、“泪目”、“经典”等词汇,直接与读者的情感或价值观产生共鸣。
+    11.情感或价值诉求:使用如“感动”、“泪目”、“经典”等词汇,直接与读者的情感或价值观产生共鸣。
     避免误导:确保标题准确反映内容,避免夸大或误导读者。
     """
 

+ 46 - 3
applications/api/nlp_api.py

@@ -2,6 +2,9 @@
 @author: luojunhui
 """
 import requests
+from requests.exceptions import RequestException, JSONDecodeError
+
+from applications.feishuBotApi import bot
 
 
 def similarity_between_title_list(target_title_list: list[str], base_title_list: list[str]) -> list[list[float]]:
@@ -11,7 +14,9 @@ def similarity_between_title_list(target_title_list: list[str], base_title_list:
     :param base_title_list: base title_list
     :return: list of similarity
     """
+
     url = 'http://61.48.133.26:6060/nlp'
+    url_backup = 'http://61.48.133.26:6061/nlp'
     body = {
         "data": {
             "text_list_a": target_title_list,
@@ -20,7 +25,45 @@ def similarity_between_title_list(target_title_list: list[str], base_title_list:
         "function": "similarities_cross",
         "use_cache": False
     }
-    response_json = requests.post(url, json=body, timeout=120).json()
-    score_array = response_json['score_list_list']
-    return score_array
 
+    try:
+        response = requests.post(url, json=body, timeout=120)
+        if response.status_code != 200:
+            response = requests.post(url_backup, json=body, timeout=120)
+    except RequestException as e:
+        bot(
+            title='NLP API 网络异常',
+            detail={
+                "error_type": type(e).__name__,
+                "error_msg": str(e)
+            },
+            mention=False
+        )
+        return []
+
+    if response.status_code != 200:
+        bot(
+            title='NLP API 业务异常',
+            detail={
+                "status_code": response.status_code,
+                "response_text": response.text[:200]  # 截取部分内容避免过大
+            },
+            mention=False
+        )
+        return []
+
+    try:
+        response_json = response.json()
+        score_array = response_json['score_list_list']
+    except (JSONDecodeError, KeyError) as e:
+        bot(
+            title='NLP响应数据异常',
+            detail={
+                "error_type": type(e).__name__,
+                "raw_response": response.text[:200]
+            },
+            mention=False
+        )
+        return []
+
+    return score_array

+ 64 - 1
applications/const/__init__.py

@@ -4,7 +4,7 @@
 """
 
 
-class coldStartTaskConst:
+class ColdStartTaskConst:
     """
     冷启动任务常量配置
     """
@@ -12,6 +12,44 @@ class coldStartTaskConst:
     INIT_STATUS = 1  # 文章初始状态
     BAD_STATUS = 0  # 低质量文章状态
 
+    # 常量
+    ACCOUNT_GOOD_STATUS = 1
+
+    # 账号是否每日抓取
+    ACCOUNT_DAILY_SCRAPE = 1
+    ACCOUNT_NOT_DAILY_SCRAPE = 0
+
+    # 默认值
+    DEFAULT_VIEW_COUNT = 0
+    DEFAULT_LIKE_COUNT = 0
+    DEFAULT_ARTICLE_STATUS = 1
+    DEFAULT_TIMESTAMP = 1717171200
+
+    # 标题sensitivity
+    TITLE_SENSITIVE = 1
+    TITLE_NOT_SENSITIVE = 0
+
+    # 文章联想深度
+    ARTICLE_ASSOCIATION_MAX_DEPTH = 4
+
+    # 相关分百分位阈值
+    PERCENT_THRESHOLD = 95
+
+    # 相关性分阈值
+    CORRELATION_THRESHOLD = 0.5
+
+    # 阅读量阈值
+    READ_COUNT_THRESHOLD = 1000
+
+    # 阅读均值倍数阈值
+    READ_AVG_THRESHOLD = 1.3
+
+    # 群发类型
+    BULK_PUBLISH_TYPE = 9
+
+    # 种子文章数量
+    SEED_ARTICLE_LIMIT_NUM = 30
+
 
 class updatePublishedMsgTaskConst:
     """
@@ -208,3 +246,28 @@ class ArticleCollectorConst:
     ARTICLE_SUCCESS_CODE = 0
     ARTICLE_UNKNOWN_CODE = 10000
 
+
+class BaiduVideoCrawlerConst:
+    """
+    const for baidu video crawler
+    """
+    # account status
+    BAIDU_ACCOUNT_GOOD_STATUS = 1
+    BAIDU_ACCOUNT_BAD_STATUS = 0
+
+    # earliest cursor, 2024-01-01 00:00:00
+    DEFAULT_CURSOR = 17040384000000
+
+    # no source account
+    NO_SOURCE_ACCOUNT_STATUS = 0
+
+    # timestamp To Cursor
+    TIMESTAMP_TO_CURSOR = 10000
+
+    # local path dir
+    LOCAL_PATH_DIR = "static"
+
+
+
+
+

+ 12 - 8
applications/db/__init__.py

@@ -30,12 +30,12 @@ class DatabaseConnector:
         """
         try:
             self.connection = pymysql.connect(
-                host=self.db_config.get('host', 'localhost'),
-                user=self.db_config['user'],
-                password=self.db_config['password'],
-                db=self.db_config['db'],
-                port=self.db_config.get('port', 3306),
-                charset=self.db_config.get('charset', 'utf8mb4')
+                host=self.db_config.get("host", "localhost"),
+                user=self.db_config["user"],
+                password=self.db_config["password"],
+                db=self.db_config["db"],
+                port=self.db_config.get("port", 3306),
+                charset=self.db_config.get("charset", "utf8mb4"),
             )
         except pymysql.MySQLError as e:
             raise ConnectionError(f"无法连接到数据库: {e}")
@@ -48,9 +48,10 @@ class DatabaseConnector:
             self.connection.close()
             self.connection = None
 
-    def fetch(self, query, cursor_type=None):
+    def fetch(self, query, cursor_type=None, params=None):
         """
         执行单条查询语句,并返回结果。
+        :param params: 查询传参
         :param cursor_type: 输出的返回格式
         :param query: 查询语句
         :return: 查询结果列表
@@ -61,7 +62,10 @@ class DatabaseConnector:
 
         try:
             with self.connection.cursor(cursor_type) as cursor:
-                cursor.execute(query)
+                if params:
+                    cursor.execute(query, params)
+                else:
+                    cursor.execute(query)
                 result = cursor.fetchall()
                 return result
         except pymysql.MySQLError as e:

+ 4 - 3
applications/exception/spider_error.py

@@ -7,9 +7,9 @@ from applications import log
 
 
 class SpiderError(Exception):
-    """数据库查询异常"""
+    """spider_task_error"""
 
-    def __init__(self, error=None, spider=None, url=None):
+    def __init__(self, platform=None, error=None, spider=None, url=None):
         """
         :param error: 异常对象,可选,用于提供更详细的错误信息。
         :param spider: 爬虫任务
@@ -22,7 +22,8 @@ class SpiderError(Exception):
         }
         log(
             task="spider_task",
-            function="log_spider_error",
+            function="{}".format(platform),
+            message="{} 抓取失败".format(spider),
             data=error_obj
         )
         super().__init__(json.dumps(error_obj, ensure_ascii=False, indent=4))

+ 25 - 0
applications/functions.py

@@ -296,3 +296,28 @@ class Functions(object):
         params = parse_qs(urlparse(url).query)
         info = params.get(key, [])
         return info[0] if info else None
+
+    @classmethod
+    def download_baidu_videos(cls, video_url, save_path):
+        """
+        :param video_url: baidu video url
+        :param save_path: save path
+        """
+        if os.path.exists(save_path):
+            return save_path
+
+        response = requests.get(
+            video_url,
+            headers={
+                'User-Agent': FakeUserAgent().chrome,
+                "Accept": "*/*",
+                "Accept-Language": "zh-CN,zh;q=0.9"
+            }
+        )
+        with open(save_path, 'wb') as f:
+            f.write(response.content)
+        TEN_KB = 1024 * 10
+        if os.path.getsize(save_path) > TEN_KB:
+            return save_path
+        else:
+            return None

+ 3 - 3
applications/llm_sensitivity.py

@@ -8,8 +8,8 @@ from openai import OpenAI
 
 def request_llm_api(prompt, text):
     client = OpenAI(
-        api_key='sk-c1b18099dadc4dd1b48239bdde184f6c',
-        base_url="https://api.deepseek.com"
+        api_key='5e275c38-44fd-415f-abcf-4b59f6377f72',
+        base_url="https://ark.cn-beijing.volces.com/api/v3"
     )
     chat_completion = client.chat.completions.create(
         messages=[
@@ -18,7 +18,7 @@ def request_llm_api(prompt, text):
                 "content": prompt + text,
             }
         ],
-        model="deepseek-chat",
+        model="ep-20250213194558-rrmr2", # deepseek-v3
         temperature=0.2,
         response_format={"type": "json_object"}
     )

+ 4 - 0
applications/utils/__init__.py

@@ -0,0 +1,4 @@
+"""
+utils
+"""
+from .cold_start import *

+ 30 - 0
applications/utils/cold_start.py

@@ -0,0 +1,30 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import aiditApi
+from config import apolloConfig
+
+config = apolloConfig()
+sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))
+
+
+def whether_title_sensitive(title: str) -> bool:
+    """
+    : param title:
+    判断视频是否的标题是否包含敏感词
+    """
+    for word in sensitive_word_list:
+        if word in title:
+            return True
+    return False
+
+
+def get_inner_account_set() -> set:
+    """
+    get inner account set
+    """
+    accounts = aiditApi.get_publish_account_from_aigc()
+    gh_id_list = [i['ghId'] for i in accounts]
+    return set(gh_id_list)

+ 59 - 23
applications/wxSpiderApi.py

@@ -1,9 +1,12 @@
 """
 @author: luojunhui
 """
+
 import json
+import time
 import requests
 
+from applications.aliyunLogApi import log
 from applications.decoratorApi import retryOnNone
 
 
@@ -11,13 +14,12 @@ class WeixinSpider(object):
     """
     Update account articles
     """
+
     # ip = "8.217.190.241"
     # ip = "47.98.154.124"
     # port = "8888"
     base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
-    headers = {
-        "Content-Type": "application/json"
-    }
+    headers = {"Content-Type": "application/json"}
 
     @classmethod
     @retryOnNone()
@@ -27,11 +29,10 @@ class WeixinSpider(object):
         :return:
         """
         url = "{}/keyword".format(cls.base_url)
-        payload = json.dumps({
-            "keyword": title,
-            "cursor": page
-        })
-        response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
+        payload = json.dumps({"keyword": title, "cursor": page})
+        response = requests.request(
+            "POST", url, headers=cls.headers, data=payload, timeout=120
+        )
         return response.json()
 
     @classmethod
@@ -45,13 +46,17 @@ class WeixinSpider(object):
         :return:
         """
         url = "{}/detail".format(cls.base_url)
-        payload = json.dumps({
-            "content_link": content_link,
-            "is_count": is_count,
-            "is_ad": False,
-            "is_cache": is_cache
-        })
-        response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
+        payload = json.dumps(
+            {
+                "content_link": content_link,
+                "is_count": is_count,
+                "is_ad": False,
+                "is_cache": is_cache,
+            }
+        )
+        response = requests.request(
+            "POST", url, headers=cls.headers, data=payload, timeout=120
+        )
         return response.json()
 
     @classmethod
@@ -60,12 +65,14 @@ class WeixinSpider(object):
         """
         :return:
         """
-        url = '{}/blogger'.format(cls.base_url)
+        url = "{}/blogger".format(cls.base_url)
         payload = {
-            'account_id': ghId,
-            'cursor': index,
+            "account_id": ghId,
+            "cursor": index,
         }
-        response = requests.post(url=url, headers=cls.headers, data=json.dumps(payload), timeout=120)
+        response = requests.post(
+            url=url, headers=cls.headers, data=json.dumps(payload), timeout=120
+        )
         return response.json()
 
     @classmethod
@@ -76,9 +83,11 @@ class WeixinSpider(object):
         :param content_url:
         :return:
         """
-        url = '{}/account_info'.format(cls.base_url)
+        url = "{}/account_info".format(cls.base_url)
         data = {"content_link": content_url}
-        response = requests.request("POST", url=url, headers=cls.headers, json=data, timeout=120)
+        response = requests.request(
+            "POST", url=url, headers=cls.headers, json=data, timeout=120
+        )
         return response.json()
 
     @classmethod
@@ -89,8 +98,35 @@ class WeixinSpider(object):
         :return:
         """
         url = "{}/recommend".format(cls.base_url)
+        payload = json.dumps({"content_link": content_link})
+        response = requests.request(
+            "POST", url=url, headers=cls.headers, data=payload, timeout=120
+        )
+        response_json = response.json()
+        if response_json["code"] != 0:
+            return cls.get_recommend_articles(content_link)
+        time.sleep(3)
+        return response.json()
+
+    @classmethod
+    def get_recommend_articles_v2(cls, content_link) -> dict:
+        """
+        use content link to get recommend articles
+        :param content_link:
+        :return:
+        """
+        url = "http://datapi.top/wxapi/relatedarticle"
         payload = json.dumps(
-            {"content_link": content_link}
+            {"content_link": content_link, "token": "401e4d3c85068bb5"}
+        )
+        response = requests.request(
+            "POST", url=url, headers=cls.headers, data=payload, timeout=120
+        )
+        log(
+            task="article_association_crawler",
+            function="get_recommend_articles_v2",
+            message="获取推荐链接,付费接口",
+            data={"content_link": content_link, "response": response.json()},
         )
-        response = requests.request("POST", url=url, headers=cls.headers, data=payload, timeout=120)
+        time.sleep(3)
         return response.json()

+ 27 - 0
article_association_task.py

@@ -0,0 +1,27 @@
+"""
+@author: luojunhui
+"""
+from argparse import ArgumentParser
+
+from coldStartTasks.crawler.wechat import ArticleAssociationCrawler
+
+
+def main():
+    """
+    main function
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--biz_date", type=str, help="format 2025-01-01")
+    args = parser.parse_args()
+
+    if args.biz_date:
+        biz_date = args.biz_date
+    else:
+        biz_date = None
+
+    article_association_crawler = ArticleAssociationCrawler()
+    article_association_crawler.deal(biz_date=biz_date)
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 0
coldStartTasks/crawler/baidu/__init__.py

@@ -0,0 +1 @@
+from .video_crawler import BaiduVideoCrawler

+ 1 - 1
coldStartTasks/publish/publishArticleAssociationArticles.py → coldStartTasks/crawler/baidu/account_crawler.py

@@ -1,4 +1,4 @@
 """
 @author: luojunhui
-发布i2i文章
 """
+

+ 96 - 0
coldStartTasks/crawler/baidu/baidu_spider.py

@@ -0,0 +1,96 @@
+"""
+@author: luojunhui
+"""
+import base64
+import uuid
+
+import requests
+
+from fake_useragent import FakeUserAgent
+
+from applications.exception import SpiderError
+from applications import Functions
+
+functions = Functions()
+
+
+def baidu_account_video_crawler(account_id, cursor=None):
+    """
+    baidu account video crawler
+    :param account_id: 百度账号id
+    :param cursor: 游标, 默认为None,表示从最新的开始爬取
+    success requests:
+    """
+    cookie_str = uuid.uuid4().__str__().replace('-', '').upper()
+    url = "https://haokan.baidu.com/web/author/listall?"
+    params = {
+        'app_id': account_id,
+        'ctime': cursor,
+        'rn': 10,
+        'searchAfter': '',
+        '_api': 1
+    }
+    headers = {
+        'Accept': '*/*',
+        'Accept-Language': 'zh,zh-CN;q=0.9',
+        'Connection': 'keep-alive',
+        'Referer': 'https://haokan.baidu.com/author/{}'.format(account_id),
+        'User-Agent': FakeUserAgent().chrome,
+        'x-requested-with': 'xmlhttprequest',
+        'Cookie': 'BAIDUID={}:FG=1; BAIDUID_BFESS={}:FG=1'.format(cookie_str, cookie_str)
+    }
+    try:
+        response = requests.request("GET", url, headers=headers, params=params, proxies=functions.proxy())
+        response_json = response.json()
+        if response_json['errmsg'] == '成功':
+            response_data = response_json['data']
+            return response_data
+        else:
+            raise SpiderError(
+                platform="baidu",
+                spider="account_video_crawler",
+                error=response_json['errmsg'],
+                url=url
+            )
+
+    except Exception as e:
+        raise SpiderError(
+            platform="baidu",
+            spider="account_video_crawler",
+            error=str(e),
+            url=url
+        )
+
+
+def baidu_single_video_crawler(video_id):
+    """
+    baidu video crawler
+    :param video_id: 视频id
+    """
+    url = "https://haokan.baidu.com/v"
+    params = {
+        'vid': video_id,
+        '_format': 'json'
+    }
+    base_64_string = base64.b64encode(str(uuid.uuid4()).encode()).decode()
+    headers = {
+        'Accept': '*/*',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': 'https://haokan.baidu.com',
+        'User-Agent': FakeUserAgent().chrome,
+    }
+    try:
+        response = requests.request("GET", url, headers=headers, params=params, proxies=functions.proxy())
+        response_json = response.json()
+        return response_json['data']['apiData']['curVideoMeta']
+    except Exception as e:
+        raise SpiderError(
+            platform="baidu",
+            spider="single_video_crawler",
+            error=str(e),
+            url=url
+        )

+ 269 - 0
coldStartTasks/crawler/baidu/video_crawler.py

@@ -0,0 +1,269 @@
+"""
+@author: luojunhui
+@description: video crawler
+"""
+
+import os
+import json
+import time
+import traceback
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import Functions
+from applications import bot, log
+from applications.const import BaiduVideoCrawlerConst
+from applications.db import DatabaseConnector
+from applications.exception import SpiderError
+from config import long_articles_config
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_single_video_crawler
+
+const = BaiduVideoCrawlerConst()
+empty_list = []
+functions = Functions()
+
+
+class BaiduVideoCrawler(object):
+    """
+    baidu video crawler
+    """
+
+    def __init__(self):
+        self.db = None
+        self.success_crawler_video_count = 0
+        self.connect_db()
+
+    def connect_db(self) -> None:
+        """
+        connect db
+        """
+        self.db = DatabaseConnector(db_config=long_articles_config)
+        self.db.connect()
+
+    def get_account_list(self) -> List[Dict]:
+        """
+        get account list
+        """
+        sql = f"""
+            select account_id, account_name, max_cursor 
+            from baidu_account_for_videos
+            where status = {const.BAIDU_ACCOUNT_GOOD_STATUS};
+        """
+        account_list = self.db.fetch(query=sql, cursor_type=DictCursor)
+        return account_list
+
+    def whether_video_exists(self, title: str) -> bool:
+        """
+        whether video exists, use video_id && title
+        """
+        # check title
+        sql = f"""
+            select id from publish_single_video_source
+            where article_title = %s;
+        """
+        duplicate_id = self.db.fetch(query=sql, params=(title,))
+        if duplicate_id:
+            print(title + " video exists")
+            return True
+
+        return False
+
+    def save_each_video(self, video: Dict, account_id: str, account_name: str) -> None:
+        """
+        download and save each video
+        """
+        # print(json.dumps(video, ensure_ascii=False, indent=4))
+        video_id = video["id"]
+        title = video["title"]
+
+        # judge whether video exists
+        if self.whether_video_exists(title):
+            return
+
+        read_cnt = video.get("playcnt", 0)
+        like_cnt = video.get("like_num", 0)
+        publish_timestamp = video["publish_time"]
+        # duration = video['duration']
+        cover_url = video["poster"]
+        video_url = video["playurl"]
+        # sensitive_flag = video.get('sensitive_flag')
+        video_more_info = video.get("contentcms_intervene_data")
+        if video_more_info:
+            video_category_list = video_more_info.get("category_v2")
+            if video_category_list:
+                video_category = video_category_list[0]
+            else:
+                video_category = None
+        else:
+            video_category = None
+        manual_tags = video.get("manual_tags")
+
+        video_path = os.path.join(const.LOCAL_PATH_DIR, "{}.mp4".format(video_id))
+        download_path = functions.download_baidu_videos(video_url, video_path)
+        if download_path:
+            oss_path = functions.upload_to_oss(local_video_path=download_path)
+            insert_sql = f"""
+                INSERT INTO publish_single_video_source
+                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account)
+                values
+                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            try:
+                self.db.save(
+                    query=insert_sql,
+                    params=(
+                        "video{}".format(functions.str_to_md5(video_id)),
+                        title,
+                        account_id,
+                        account_name,
+                        read_cnt,
+                        like_cnt,
+                        video_url,
+                        cover_url,
+                        oss_path,
+                        publish_timestamp,
+                        int(time.time()),
+                        video_id,
+                        video_category,
+                        (
+                            json.dumps(manual_tags, ensure_ascii=False)
+                            if manual_tags
+                            else None
+                        ),
+                        "hksp",
+                        const.NO_SOURCE_ACCOUNT_STATUS,
+                    ),
+                )
+                self.success_crawler_video_count += 1
+            except Exception as e:
+                log(
+                    task="baidu_video_crawler",
+                    function="save_each_video",
+                    message="save video failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "video_id": video_id,
+                        "oss_path": oss_path,
+                    },
+                )
+        else:
+            print(f"download video failed, video_id: {video_id}")
+
+    def save_video_list(
+        self, account_id: str, account_name: str, video_list: List[Dict]
+    ) -> None:
+        """
+        save video list
+        """
+        progress_bar = tqdm(video_list, desc="crawler account: {}".format(account_name))
+        for video_obj in progress_bar:
+            if video_obj["type"] == "video":
+                video_id = video_obj["content"]["vid"]
+                try:
+                    video_detail = baidu_single_video_crawler(video_id)
+                    self.save_each_video(
+                        video=video_detail,
+                        account_id=account_id,
+                        account_name=account_name,
+                    )
+                    progress_bar.set_postfix({"videoId": video_id})
+                except SpiderError as e:
+                    print("save single video fail", e)
+                    continue
+            else:
+                continue
+
+    def crawler_each_account(self, account: Dict, cursor=None) -> None:
+        """
+        crawler each account
+        response_strategy
+        """
+        account_id = account["account_id"]
+        max_cursor = account["max_cursor"]
+        if not max_cursor:
+            max_cursor = const.DEFAULT_CURSOR
+        account_name = account["account_name"]
+        try:
+            response_json = baidu_account_video_crawler(account_id, cursor=cursor)
+
+            video_list = response_json.get("results", empty_list)
+            if video_list:
+                self.save_video_list(
+                    account_id=account_id,
+                    account_name=account_name,
+                    video_list=video_list,
+                )
+            # check next page
+            has_next_page = response_json.get("has_more", False)
+            if has_next_page:
+                next_cursor = response_json.get("ctime", const.DEFAULT_CURSOR)
+                if next_cursor < max_cursor:
+                    print("No more videos after 2024-01-01")
+                    return
+                else:
+                    return self.crawler_each_account(account, next_cursor)
+        except SpiderError as e:
+            print(e)
+            return
+
+    def update_cursor(self, account_id: str) -> None:
+        """
+        update cursor for each account
+        """
+        select_sql = f"""
+            select max(publish_timestamp) as max_cursor from publish_single_video_source where out_account_id = '{account_id}';
+        """
+        response_mysql = self.db.fetch(query=select_sql)
+        max_publish_timestamp = response_mysql[0][0]
+        if max_publish_timestamp:
+            max_cursor = max_publish_timestamp * const.TIMESTAMP_TO_CURSOR
+            update_sql = f"""
+                update baidu_account_for_videos
+                set max_cursor = %s
+                where account_id = %s;
+            """
+            self.db.save(query=update_sql, params=(max_cursor, account_id))
+
+    def deal(self) -> None:
+        """
+        deal
+        """
+        account_list = self.get_account_list()
+        success_cnt = 0
+        fail_cnt = 0
+        account_list_process_bar = tqdm(account_list, desc="process account list")
+        for account in account_list_process_bar:
+            try:
+                account_list_process_bar.set_postfix(
+                    {"account_name": account["account_name"]}
+                )
+                self.crawler_each_account(account)
+                self.update_cursor(account["account_id"])
+                success_cnt += 1
+            except Exception as e:
+                fail_cnt += 1
+                log(
+                    task="baidu_video_crawler",
+                    function="deal",
+                    message="crawler each account failed",
+                    data={
+                        "account_id": account["account_id"],
+                        "account_name": account["account_name"],
+                        "error": str(e),
+                        "trace_back": traceback.format_exc(),
+                    },
+                )
+        bot(
+            title="baidu video crawler task finished",
+            detail={
+                "success_crawl_account_num": success_cnt,
+                "fail_crawl_account_num": fail_cnt,
+                "success_crawl_video_num": self.success_crawler_video_count,
+                "success_crawl_account_rate": success_cnt / (success_cnt + fail_cnt),
+            },
+            mention=False,
+        )

+ 4 - 0
coldStartTasks/crawler/wechat/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .article_association import ArticleAssociationCrawler

+ 210 - 0
coldStartTasks/crawler/wechat/article_association.py

@@ -0,0 +1,210 @@
+"""
+@author: luojunhui
+"""
+
+import time
+import traceback
+from datetime import datetime
+
+import numpy as np
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+
+from applications import WeixinSpider, log
+from applications.api import similarity_between_title_list
+from applications.const import ColdStartTaskConst
+from applications.db import DatabaseConnector
+from applications.functions import Functions
+from applications.utils import get_inner_account_set
+from applications.utils import whether_title_sensitive
+from config import long_articles_config
+
+spider = WeixinSpider()
+functions = Functions()
+const = ColdStartTaskConst()
+
+
+class ArticleAssociationCrawler(object):
+    """
+    article association crawler task
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+        self.inner_account_set = get_inner_account_set()
+
+    def get_seed_url_list(self, biz_date):
+        """
+        获取种子url列表
+        """
+        sql = f"""
+            select gh_id, title, link
+            from datastat_sort_strategy
+            where date_str > DATE_FORMAT(DATE_SUB('{biz_date}', INTERVAL 2 DAY), '%Y%m%d') 
+                and view_count > {const.READ_COUNT_THRESHOLD} 
+                and read_rate > {const.READ_AVG_THRESHOLD} 
+                and type = {const.BULK_PUBLISH_TYPE}
+            order by read_rate desc 
+            limit {const.SEED_ARTICLE_LIMIT_NUM};
+        """
+        seed_article_list = self.db_client.fetch(query=sql, cursor_type=DictCursor)
+        return seed_article_list
+
+    def get_level_up_title_list(self):
+        """
+        获取晋级文章标题列表
+        status: 1 表示文章已经溯源完成
+        deleted: 0 表示文章正常
+        level = 'autoArticlePoolLevel1' 表示头条
+        """
+        sql = f"""
+            select distinct title 
+            from article_pool_promotion_source 
+            where level = 'autoArticlePoolLevel1' and status = 1 and deleted = 0;
+        """
+        mysql_response = self.db_client.fetch(query=sql)
+        title_list = [i[0] for i in mysql_response]
+        return title_list
+
+    def get_recommend_url_list_with_depth(
+        self, seed_url, source_title, source_account, base_title_list, depth=1
+    ):
+        """
+        @param seed_url: good url from data_sort_strategy
+        @param depth: association depth
+        @param source_title: article title
+        @param source_account: article account
+        """
+        if depth > const.ARTICLE_ASSOCIATION_MAX_DEPTH:
+            return
+
+        res = spider.get_recommend_articles(content_link=seed_url)
+        related_articles = res["data"]["data"]["list"]
+        if related_articles:
+            title_list = [i["title"] for i in related_articles]
+            similarity_array = similarity_between_title_list(
+                title_list, base_title_list
+            )
+
+            recommend_articles = []
+            for index, score_list in enumerate(similarity_array):
+                sorted_score_list = sorted(score_list)
+                percent_threshold_score = np.percentile(
+                    sorted_score_list, const.PERCENT_THRESHOLD
+                )
+                if percent_threshold_score < const.CORRELATION_THRESHOLD:
+                    continue
+
+                else:
+                    article_obj = related_articles[index]
+                    article_obj["score"] = percent_threshold_score
+                    recommend_articles.append(article_obj)
+
+            recommend_process_bar = tqdm(
+                recommend_articles, desc="save recommend articles"
+            )
+            for article in recommend_process_bar:
+                obj = {
+                    "title": article["title"],
+                    "url": article["url"],
+                    "gh_id": article["username"],
+                    "index": article["idx"],
+                    "send_time": article["send_time"],
+                    "read_cnt": article["read_num"],
+                    "depth": depth,
+                    "source_article_title": source_title,
+                    "source_account": source_account,
+                }
+                self.insert_recommend_article(obj)
+                recommend_process_bar.set_postfix(
+                    {"title": article["title"], "depth": depth}
+                )
+                self.get_recommend_url_list_with_depth(
+                    seed_url=obj["url"],
+                    source_title=obj["title"],
+                    source_account=obj["gh_id"],
+                    base_title_list=base_title_list,
+                    depth=depth + 1,
+                )
+        else:
+            return
+
+    def insert_recommend_article(self, obj):
+        """
+        insert recommend article
+        """
+        # whether account inside
+        if obj["gh_id"] in self.inner_account_set:
+            return
+
+        # whether article title exists
+        title = obj["title"]
+        select_sql = "select article_id from crawler_meta_article where title = %s;"
+        res = self.db_client.fetch(query=select_sql, params=(title,))
+        if res:
+            return
+
+        # whether title sensitive
+        title_sensitivity = (
+            const.TITLE_SENSITIVE
+            if whether_title_sensitive(title)
+            else const.TITLE_NOT_SENSITIVE
+        )
+
+        # insert this article
+        insert_sql = f"""
+            insert into crawler_meta_article 
+            (platform, mode, category, out_account_id, article_index, title, link, read_cnt, publish_time, crawler_time, status, unique_index, source_article_title, source_account, title_sensitivity)
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        self.db_client.save(
+            query=insert_sql,
+            params=(
+                "weixin",
+                "recommend",
+                "article_association",
+                obj["gh_id"],
+                obj["index"],
+                obj["title"],
+                obj["url"],
+                obj["read_cnt"],
+                obj["send_time"],
+                int(time.time()),
+                const.DEFAULT_ARTICLE_STATUS,
+                functions.generateGzhId(obj["url"]),
+                obj["source_article_title"],
+                obj["source_account"],
+                title_sensitivity,
+            ),
+        )
+
+    def deal(self, biz_date=None):
+        """
+        class entrance
+        :param biz_date:
+        """
+        if biz_date is None:
+            biz_date = datetime.today().strftime("%Y-%m-%d")
+
+        seed_article_list = self.get_seed_url_list(biz_date)
+        deal_bar = tqdm(seed_article_list, desc="article association crawler")
+        base_title_list = self.get_level_up_title_list()
+        for article in deal_bar:
+            try:
+                self.get_recommend_url_list_with_depth(
+                    seed_url=article["link"],
+                    source_title=article["title"],
+                    source_account=article["gh_id"],
+                    base_title_list=base_title_list,
+                )
+                deal_bar.set_postfix({"article_title": article["title"]})
+            except Exception as e:
+                log(
+                    task="article_association_crawler",
+                    function="deal",
+                    message=f"article association crawler error, article title: {article['title']}, error: {e}",
+                    data={"article": article, "traceback": traceback.format_exc()},
+                )

+ 1 - 25
coldStartTasks/crawler/weixinCategoryCrawler.py

@@ -8,7 +8,7 @@ import time
 from tqdm import tqdm
 from pymysql.cursors import DictCursor
 
-from applications import WeixinSpider, Functions, llm_sensitivity, log
+from applications import WeixinSpider, Functions, log
 from coldStartTasks.filter import article_crawler_duplicate_filter
 from config import apolloConfig
 
@@ -158,18 +158,6 @@ class weixinCategory(object):
                     print(e)
         return success_records
 
-    def update_article_sensitive_status(self, category, unique_index, status):
-        """
-        更新文章敏感状态
-        :return:
-        """
-        update_sql = f"""
-            update crawler_meta_article
-            set llm_sensitivity = %s
-            where category = %s and unique_index = %s;
-        """
-        self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))
-
     def update_latest_account_timestamp(self, gh_id):
         """
         更新账号的最新时间戳
@@ -242,18 +230,6 @@ class weixinCategory(object):
                 print("success")
             except Exception as e:
                 print("fail because of {}".format(e))
-        success_titles = [x['title'] for x in success_records]
-        if success_titles:
-            try:
-                sensitive_results = llm_sensitivity.check_titles(success_titles)
-                for record, sensitive_result in zip(success_records, sensitive_results):
-                    self.update_article_sensitive_status(
-                        category=category,
-                        unique_index=record['unique_index'],
-                        status=sensitive_result['hit_rule']
-                    )
-            except Exception as e:
-                print("failed to update sensitive status: {}".format(e))
 
     def deal(self, category_list, date_str):
         """

+ 276 - 0
coldStartTasks/publish/basic.py

@@ -0,0 +1,276 @@
+"""
+@author: luojunhui
+"""
+
+import json
+import time
+import datetime
+import pandas as pd
+import traceback
+
+from pandas import DataFrame
+from tqdm import tqdm
+
+from applications import log, aiditApi, bot
+from applications.const import ColdStartTaskConst
+from config import apolloConfig
+
+const = ColdStartTaskConst()
+config = apolloConfig()
+
+category_cold_start_threshold = json.loads(
+    config.getConfigValue("category_cold_start_threshold")
+)
+READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
+READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
+LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
+TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
+
+
+def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame:
+    """
+    get article from meta data
+    :param db_client: database connector
+    :param category: article category
+    :param platform: article platform
+    :return: article dataframe
+    """
+    sql = f"""
+        select 
+            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
+        from crawler_meta_article
+        where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
+        order by score desc;
+    """
+    article_list = db_client.fetch(sql)
+    log(
+        task="category_publish_task",
+        function="get_articles_from_meta_table",
+        message="获取品类文章总数",
+        data={"total_articles": len(article_list), "category": category},
+    )
+    article_df = pd.DataFrame(
+        article_list,
+        columns=[
+            "article_id",
+            "gh_id",
+            "position",
+            "title",
+            "link",
+            "read_cnt",
+            "status",
+            "llm_sensitivity",
+            "score",
+        ],
+    )
+    return article_df
+
+
+def update_published_articles_status(db_client) -> None:
+    """
+    filter published articles
+    """
+    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
+    category_list = list(category_map.keys())
+    processing_bar = tqdm(category_list, desc="update_published_articles")
+    for category in processing_bar:
+        plan_id = category_map.get(category)
+        if plan_id:
+            article_list = aiditApi.get_generated_article_list(plan_id)
+            title_list = [i[1] for i in article_list]
+            if title_list:
+                update_sql = f"""
+                        update crawler_meta_article
+                        set status = %s 
+                        where title in %s and status = %s;
+                """
+                affected_rows = db_client.save(
+                    query=update_sql,
+                    params=(
+                        const.PUBLISHED_STATUS,
+                        tuple(title_list),
+                        const.INIT_STATUS,
+                    ),
+                )
+                processing_bar.set_postfix(
+                    {"category": category, "affected_rows": affected_rows}
+                )
+        else:
+            return
+
+
+def filter_by_read_times(article_df: DataFrame) -> DataFrame:
+    """
+    filter by read times
+    """
+    article_df["average_read"] = article_df.groupby(["gh_id", "position"])[
+        "read_cnt"
+    ].transform("mean")
+    article_df["read_times"] = article_df["read_cnt"] / article_df["average_read"]
+    filter_df = article_df[article_df["read_times"] >= READ_TIMES_THRESHOLD]
+    return filter_df
+
+
+def filter_by_status(article_df: DataFrame) -> DataFrame:
+    """
+    filter by status
+    """
+    filter_df = article_df[article_df["status"] == const.INIT_STATUS]
+    return filter_df
+
+
+def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
+    """
+    filter by read cnt
+    """
+    filter_df = article_df[article_df["read_cnt"] >= READ_THRESHOLD]
+    return filter_df
+
+
+def filter_by_title_length(article_df: DataFrame) -> DataFrame:
+    """
+    filter by title length
+    """
+    filter_df = article_df[
+        (article_df["title"].str.len() >= LIMIT_TITLE_LENGTH)
+        & (article_df["title"].str.len() <= TITLE_LENGTH_MAX)
+    ]
+    return filter_df
+
+
+def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
+    """
+    filter by sensitive words
+    """
+    filter_df = article_df[
+        (~article_df["title"].str.contains("农历"))
+        & (~article_df["title"].str.contains("太极"))
+        & (~article_df["title"].str.contains("节"))
+        & (~article_df["title"].str.contains("早上好"))
+        & (~article_df["title"].str.contains("赖清德"))
+        & (~article_df["title"].str.contains("普京"))
+        & (~article_df["title"].str.contains("俄"))
+        & (~article_df["title"].str.contains("南海"))
+        & (~article_df["title"].str.contains("台海"))
+        & (~article_df["title"].str.contains("解放军"))
+        & (~article_df["title"].str.contains("蔡英文"))
+        & (~article_df["title"].str.contains("中国"))
+    ]
+    return filter_df
+
+
+def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
+    """
+    filter by similarity score
+    """
+    filter_df = article_df[article_df["score"] >= score]
+    return filter_df
+
+
+def insert_into_article_crawler_plan(
+    db_client, crawler_plan_id, crawler_plan_name, create_timestamp
+):
+    """
+    insert into article crawler plan
+    """
+    insert_sql = f"""
+        insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
+        values (%s, %s, %s);
+    """
+    try:
+        db_client.save(
+            query=insert_sql,
+            params=(crawler_plan_id, crawler_plan_name, create_timestamp),
+        )
+    except Exception as e:
+        bot(
+            title="品类冷启任务,记录抓取计划id失败",
+            detail={
+                "error": str(e),
+                "error_msg": traceback.format_exc(),
+                "crawler_plan_id": crawler_plan_id,
+                "crawler_plan_name": crawler_plan_name,
+            },
+        )
+
+
+def create_crawler_plan(url_list, plan_tag, platform) -> tuple:
+    """
+    create crawler plan
+    """
+    crawler_plan_response = aiditApi.auto_create_crawler_task(
+        plan_id=None,
+        plan_name="自动绑定-{}--{}--{}".format(
+            plan_tag, datetime.date.today().__str__(), len(url_list)
+        ),
+        plan_tag=plan_tag,
+        article_source=platform,
+        url_list=url_list,
+    )
+    log(
+        task="category_publish_task",
+        function="publish_filter_articles",
+        message="成功创建抓取计划",
+        data=crawler_plan_response,
+    )
+    # save to db
+    create_timestamp = int(time.time()) * 1000
+    crawler_plan_id = crawler_plan_response["data"]["id"]
+    crawler_plan_name = crawler_plan_response["data"]["name"]
+    return crawler_plan_id, crawler_plan_name, create_timestamp
+
+
+def bind_to_generate_plan(category, crawler_plan_id, crawler_plan_name, platform):
+    """
+    auto bind to generate plan
+    """
+    match platform:
+        case "weixin":
+            input_source_channel = 5
+        case "toutiao":
+            input_source_channel = 6
+        case _:
+            input_source_channel = 5
+
+    new_crawler_task_list = [
+        {
+            "contentType": 1,
+            "inputSourceType": 2,
+            "inputSourceSubType": None,
+            "fieldName": None,
+            "inputSourceValue": crawler_plan_id,
+            "inputSourceLabel": crawler_plan_name,
+            "inputSourceModal": 3,
+            "inputSourceChannel": input_source_channel,
+        }
+    ]
+    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
+    generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+        crawler_task_list=new_crawler_task_list, generate_task_id=category_map[category]
+    )
+    log(
+        task="category_publish_task",
+        function="publish_filter_articles",
+        message="成功绑定到生成计划",
+        data=generate_plan_response,
+    )
+
+
+def update_article_status_after_publishing(db_client, article_id_list):
+    """
+    update article status after publishing
+    """
+    update_sql = f"""
+        update crawler_meta_article
+        set status = %s
+        where article_id in %s and status = %s;
+    """
+    affect_rows = db_client.save(
+        query=update_sql,
+        params=(const.PUBLISHED_STATUS, tuple(article_id_list), const.INIT_STATUS),
+    )
+    if affect_rows != len(article_id_list):
+        bot(
+            title="品类冷启任务中,出现更新状文章状态失败异常",
+            detail={"affected_rows": affect_rows, "task_rows": len(article_id_list)},
+        )

+ 29 - 1
coldStartTasks/publish/publishCategoryArticles.py

@@ -9,7 +9,7 @@ import traceback
 
 from pandas import DataFrame
 
-from applications import aiditApi, log, bot
+from applications import aiditApi, log, bot, llm_sensitivity
 from config import apolloConfig
 
 apollo = apolloConfig()
@@ -295,6 +295,18 @@ class CategoryColdStartTask(object):
         )
         return zero_level_funnel_df
 
+    def update_article_sensitive_status(self, article_id, status):
+        """
+        更新文章敏感状态
+        :return:
+        """
+        update_sql = f"""
+            update crawler_meta_article
+            set llm_sensitivity = %s
+            where article_id = %s;
+        """
+        self.db_client.update(sql=update_sql, params=(status, article_id))
+
     def publish_filter_articles(self, category, articles_df, article_source):
         """
         过滤文章
@@ -313,6 +325,22 @@ class CategoryColdStartTask(object):
             case _:
                 return
 
+        success_titles = filtered_articles_df['title'].values.tolist()
+        article_id_list = filtered_articles_df['article_id'].values.tolist()
+        if success_titles:
+            try:
+                sensitive_results = llm_sensitivity.check_titles(success_titles)
+                for article_id, sensitive_result in zip(article_id_list, sensitive_results):
+                    self.update_article_sensitive_status(
+                        article_id=article_id,
+                        status=sensitive_result['hit_rule']
+                    )
+                    if sensitive_result['hit_rule'] > TITLE_NOT_SENSITIVE:
+                        filtered_articles_df = filtered_articles_df[filtered_articles_df['article_id'] != article_id]
+
+            except Exception as e:
+                print("failed to update sensitive status: {}".format(e))
+
         url_list = filtered_articles_df['link'].values.tolist()
         if url_list:
             # create_crawler_plan

+ 125 - 0
coldStartTasks/publish/publish_article_association_articles.py

@@ -0,0 +1,125 @@
+"""
+@author: luojunhui
+"""
+
+from pandas import DataFrame
+
+from applications import bot
+from applications.const import ColdStartTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+from coldStartTasks.publish.basic import filter_by_status
+from coldStartTasks.publish.basic import filter_by_sensitive_words
+from coldStartTasks.publish.basic import filter_by_title_length
+from coldStartTasks.publish.basic import update_published_articles_status
+from coldStartTasks.publish.basic import get_article_from_meta_table
+from coldStartTasks.publish.basic import update_article_status_after_publishing
+from coldStartTasks.publish.basic import create_crawler_plan
+from coldStartTasks.publish.basic import insert_into_article_crawler_plan
+from coldStartTasks.publish.basic import bind_to_generate_plan
+
+const = ColdStartTaskConst()
+
+
+def filter_articles_before_create_plan(article_df: DataFrame) -> DataFrame:
+    """
+    filter articles before create plan
+    """
+    total_length = article_df.shape[0]
+
+    # filter by status
+    filter_df = filter_by_status(article_df)
+    filter_length0 = filter_df.shape[0]
+
+    # filter by sensitive words
+    filter_df = filter_by_sensitive_words(filter_df)
+    filter_length1 = filter_df.shape[0]
+
+    # filter by title length
+    filter_df = filter_by_title_length(filter_df)
+    filter_length2 = filter_df.shape[0]
+
+    bot(
+        title="文章联想任务,开始创建抓取计划",
+        detail={
+            "文章总数": total_length,
+            "发布状态过滤": "过滤: {}, 剩余: {}".format(
+                total_length - filter_length0, filter_length0
+            ),
+            "敏感词过滤": "过滤: {}, 剩余: {}".format(
+                filter_length0 - filter_length1, filter_length1
+            ),
+            "标题长度过滤": "过滤: {}, 剩余: {}".format(
+                filter_length1 - filter_length2, filter_length2
+            ),
+        },
+        mention=False,
+    )
+
+    return filter_df
+
+
+class ArticleAssociationPublish(object):
+    """
+    publish i2i articles
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def deal(self):
+        """
+        class entrance
+        """
+        # update published articles
+        update_published_articles_status(db_client=self.db_client)
+
+        # get data from meta table
+        article_dataframe = get_article_from_meta_table(
+            db_client=self.db_client, category="article_association", platform="weixin"
+        )
+
+        # fileter articles
+        filter_dataframe = filter_articles_before_create_plan(article_dataframe)
+
+        # create crawler plan
+        url_list = filter_dataframe["link"].values.tolist()
+        if url_list:
+            crawler_plan_id, crawler_plan_name, create_timestamp = create_crawler_plan(
+                url_list=url_list, plan_tag="article_association", platform="weixin"
+            )
+
+            # insert crawler plan
+            insert_into_article_crawler_plan(
+                db_client=self.db_client,
+                crawler_plan_id=crawler_plan_id,
+                crawler_plan_name=crawler_plan_name,
+                create_timestamp=create_timestamp,
+            )
+
+            # bind to generate plan
+            bind_to_generate_plan(
+                category="article_association",
+                crawler_plan_id=crawler_plan_id,
+                crawler_plan_name=crawler_plan_name,
+                platform="weixin",
+            )
+
+            # update status
+            article_id_list = filter_dataframe["article_id"].values.tolist()
+            update_article_status_after_publishing(
+                db_client=self.db_client, article_id_list=article_id_list
+            )
+
+            bot(
+                title="文章联想任务,创建抓取计划成功",
+                detail={
+                    "抓取计划id": crawler_plan_id,
+                    "抓取计划名称": crawler_plan_name,
+                    "抓取条数": len(url_list),
+                    "冷启动类型": "article_association",
+                },
+                mention=False,
+            )

+ 1 - 1
requirements.txt

@@ -21,4 +21,4 @@ openai~=1.17.0
 oss2~=2.19.1
 fake-useragent~=1.5.1
 playwright~=1.49.1
-tenacity~=9.0.0
+volcengine-python-sdk[ark]

+ 8 - 0
run_baidu_video_crawler.py

@@ -0,0 +1,8 @@
+"""
+@author: luojunhui
+"""
+from coldStartTasks.crawler.baidu import BaiduVideoCrawler
+
+if __name__ == '__main__':
+    task = BaiduVideoCrawler()
+    task.deal()

+ 26 - 0
sh/run_baidu_video_crawler.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/baidu_video_crawler_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_baidu_video_crawler.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_baidu_video_crawler.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_baidu_video_crawler.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_baidu_video_crawler.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_baidu_video_crawler.py"
+fi