luojunhui, 1 week ago
Parent commit
Current commit
b075d98b43

+ 88 - 0
long_articles_job.py

@@ -0,0 +1,88 @@
+from argparse import ArgumentParser
+
+from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import (
+    CrawlerPiaoQuanVideos,
+)
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
+from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import (
+    CrawlerSohuRecommendVideos,
+)
+from tasks.crawler_tasks.crawler_video.crawler_sph_videos import (
+    CrawlerChannelAccountVideos,
+)
+from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
+from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
+from tasks.monitor_tasks.kimi_balance_monitor import check_kimi_balance
+from tasks.publish_tasks.top_article_generalize import (
+    TopArticleGeneralizeFromArticlePool,
+)
+
+
+def run_piaoquan_video_crawler():
+    crawler = CrawlerPiaoQuanVideos()
+    crawler.deal()
+
+
+def run_sohu_video_crawler():
+    # step1, crawl sohu hot videos
+    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
+    crawler_sohu_hot_videos.deal()
+
+    # step2, crawl sohu recommend videos
+    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
+    crawler_sohu_recommend_videos.deal()
+
+
+def run_sph_video_crawler():
+    crawler_channel_account_videos = CrawlerChannelAccountVideos()
+    crawler_channel_account_videos.deal()
+
+
+def run_fwh_data_manager():
+    # 1. process and monitor fwh group-publish records
+    fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
+    fwh_group_publish_record_manager.deal()
+    fwh_group_publish_record_manager.monitor()
+
+    # 2. save fwh data to the database
+    save_fwh_data_to_database = SaveFwhDataToDatabase()
+    save_fwh_data_to_database.deal()
+
+
+def run_top_article_generalize_from_article_pool():
+    TopArticleGeneralizeFromArticlePool().deal()
+
+
+def main():
+    """
+    run long_articles_job
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--task_name", help="which task you want to run")
+    parser.add_argument("--run_date", help="task specify run date")
+
+    args = parser.parse_args()
+
+    task_name = args.task_name
+    if task_name is None:
+        print("task_name cannot be None")
+        return
+    else:
+        match task_name:
+            case "run_piaoquan_video_crawler":
+                run_piaoquan_video_crawler()
+            case "run_sohu_video_crawler":
+                run_sohu_video_crawler()
+            case "run_check_kimi_balance":
+                check_kimi_balance()
+            case "run_fwh_data_manager":
+                run_fwh_data_manager()
+            case "run_sph_video_crawler":
+                run_sph_video_crawler()
+            case "top":
+                run_top_article_generalize_from_article_pool()
+            case _:
+                print(f"unknown task_name: {task_name}")
+
+
+if __name__ == "__main__":
+    main()
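
A minimal smoke test for the new dispatcher, assuming the repository root is on PYTHONPATH and the task's dependencies are configured (the task name below is illustrative; any name handled in the match block works, and running it really starts that task):

import sys

from long_articles_job import main

# simulate the scheduler's command line:
#   python long_articles_job.py --task_name run_piaoquan_video_crawler
sys.argv = ["long_articles_job.py", "--task_name", "run_piaoquan_video_crawler"]
main()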

+ 1 - 1
run_update_articles_from_aigc_system.py

@@ -1,7 +1,7 @@
 """
 @author: luojunhui
 """
-from tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
+from tasks.data_tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
 
 
 if __name__ == "__main__":

+ 0 - 0
tasks/update_article_info_from_aigc.py → tasks/data_tasks/update_article_info_from_aigc.py


+ 52 - 0
tasks/monitor_tasks/kimi_balance_monitor.py

@@ -0,0 +1,52 @@
+"""
+@author: luojunhui
+"""
+import requests
+import traceback
+
+from applications import bot
+from applications.decoratorApi import retryOnTimeout
+
+BALANCE_LIMIT_THRESHOLD = 200.0
+
+
+@retryOnTimeout(retries=5, delay=5)
+def check_kimi_balance():
+    """
+    check the Kimi account balance and alert via bot when it is below the threshold
+    :return:
+    """
+    url = "https://api.moonshot.cn/v1/users/me/balance"
+
+    payload = {}
+    headers = {
+        'Authorization': 'Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q'
+    }
+    response = requests.request("GET", url, headers=headers, data=payload, timeout=10)
+    if response.status_code == 200:
+        response_json = response.json()
+        try:
+            balance = response_json['data']['available_balance']
+            if balance < BALANCE_LIMIT_THRESHOLD:
+                bot(
+                    title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
+                    detail={
+                        "balance": balance
+                    }
+                )
+        except Exception as e:
+            error_stack = traceback.format_exc()
+            bot(
+                title="kimi余额接口处理失败,数据结构异常",
+                detail={
+                    "error": str(e),
+                    "error_msg": error_stack
+                }
+            )
+    else:
+        bot(
+            title="kimi余额接口调用失败",
+            detail={
+                "response": response.text
+            }
+        )
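
For context, check_kimi_balance only reads data.available_balance from the balance endpoint. A hedged sketch of the decision it makes on a sample payload (the payload beyond that one field is illustrative, not the full Moonshot response):

# illustrative payload; only data.available_balance is read by the monitor
response_json = {"data": {"available_balance": 156.3}}

balance = response_json["data"]["available_balance"]
if balance < 200.0:  # BALANCE_LIMIT_THRESHOLD
    print(f"would alert via bot: balance {balance} is below the threshold")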

+ 200 - 0
tasks/publish_tasks/top_article_generalize.py

@@ -0,0 +1,200 @@
+import time
+import datetime
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+
+from applications import aiditApi
+from applications.api import fetch_deepseek_completion
+from applications.api import similarity_between_title_list
+from applications.db import DatabaseConnector
+from config import long_articles_config, denet_config
+
+extract_keywords_prompt = """
+你是一名优秀的中文专家
+## 任务说明
+需要你从输入的标题和总结中提取3个搜索词
+### 输出
+输出结构为JSON,格式如下
+{output_format}
+## 输入
+标题:{title}
+总结:{summary}
+"""
+
+
+class TopArticleGeneralize:
+
+    def __init__(self):
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.long_articles_client.connect()
+
+        self.denet_client = DatabaseConnector(denet_config)
+        self.denet_client.connect()
+
+    def fetch_distinct_top_titles(self) -> List[Dict]:
+        """
+        fetch distinct article titles (and source_id) from the TOP100 produce plan
+        """
+        fetch_query = """
+            select distinct title, source_id
+            from datastat_sort_strategy
+            where produce_plan_name = 'TOP100' and source_id is not null;
+        """
+        return self.long_articles_client.fetch(fetch_query, cursor_type=DictCursor)
+
+    def get_title_read_info_detail(self, title: str) -> bool:
+        """
+        check whether each of the title's 3 most recent articles has a read_rate of at least 1.2
+        """
+        fetch_query = f"""
+            select read_rate
+            from datastat_sort_strategy
+            where produce_plan_name = 'TOP100' and title = '{title}'
+            order by date_str desc limit 3;
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor
+        )
+        read_rate_list = [i["read_rate"] for i in fetch_response]
+        for read_rate in read_rate_list:
+            if read_rate < 1.2:
+                return False
+        return True
+
+    def get_article_summary(self, source_id: str) -> str:
+        """
+        use source_id to get article summary
+        """
+        fetch_query = f"""
+            select output 
+            from produce_plan_module_output 
+            where plan_exe_id = '{source_id}' and produce_module_type = 18;
+        """
+        fetch_response = self.denet_client.fetch(fetch_query, cursor_type=DictCursor)
+        return fetch_response[0]["output"]
+
+    def get_keys_by_ai(self, title_obj: Dict) -> List[str]:
+        """
+        extract search keywords from the title and its summary via LLM
+        """
+        title = title_obj["title"]
+        source_id = title_obj["source_id"]
+        article_summary = self.get_article_summary(source_id)
+        output_format = {"keys": ["key1", "key2", "key3"]}
+        prompt = extract_keywords_prompt.format(
+            output_format=output_format, title=title, summary=article_summary
+        )
+        response = fetch_deepseek_completion(
+            model="deepseek-V3", prompt=prompt, output_type="json"
+        )
+        return response["keys"]
+
+
+class TopArticleGeneralizeFromArticlePool(TopArticleGeneralize):
+
+    def get_candidate_articles(self, key):
+        """
+        fetch candidate articles from the pool whose title contains the keyword
+        """
+        fetch_query = f"""
+            select article_id, title, link, llm_sensitivity, score, category_by_ai
+            from crawler_meta_article
+            where status = 1
+              and title_sensitivity = 0
+              and title like '%{key}%';
+        """
+        fetch_response = self.long_articles_client.fetch(
+            fetch_query, cursor_type=DictCursor
+        )
+        return fetch_response
+
+    def change_article_status_while_publishing(self, article_id_list):
+        """
+        :param article_id_list: unique ids of the articles to mark as publishing
+        :return:
+        """
+        update_sql = """
+                    update crawler_meta_article
+                    set status = %s
+                    where article_id in %s and status = %s;
+        """
+        affect_rows = self.long_articles_client.save(
+            query=update_sql, params=(2, tuple(article_id_list), 1)
+        )
+
+    def deal(self):
+        title_obj_list = self.fetch_distinct_top_titles()
+        publishing_article_list = []
+        for title_obj in title_obj_list:
+            if (
+                title_obj["title"]
+                == "母亲去世136天后,女子回到家,在锅盖上留下一句话,瞬间泪崩!"
+            ):
+                if self.get_title_read_info_detail(title_obj["title"]):
+
+                    temp = []
+                    keys = self.get_keys_by_ai(title_obj)
+                    for key in keys:
+                        candidate_articles = self.get_candidate_articles(key)
+                        temp += candidate_articles
+
+                    if temp:
+                        print(title_obj["title"])
+                        title_list = [i["title"] for i in temp]
+                        # 相关性排序
+                        similarity_array = similarity_between_title_list(
+                            title_list, [title_obj["title"]]
+                        )
+                        print(similarity_array)
+                        print(title_list)
+                        response_with_similarity_list = []
+                        for index, item in enumerate(temp):
+                            item["similarity"] = similarity_array[index][0]
+                            response_with_similarity_list.append(item)
+
+                        sorted_response_with_similarity_list = sorted(
+                            response_with_similarity_list,
+                            key=lambda k: k["similarity"],
+                            reverse=True,
+                        )
+                        publishing_article_list += sorted_response_with_similarity_list[
+                            :10
+                        ]
+
+        url_list = [i["link"] for i in publishing_article_list]
+        if url_list:
+            # create_crawler_plan
+            crawler_plan_response = aiditApi.auto_create_crawler_task(
+                plan_id=None,
+                plan_name="自动绑定-Top内容泛化-{}--{}".format(
+                    str(datetime.date.today()), len(url_list)
+                ),
+                plan_tag="Top内容泛化",
+                article_source="weixin",
+                url_list=url_list,
+            )
+
+            # extract crawler plan id and name from the response
+            crawler_plan_id = crawler_plan_response["data"]["id"]
+            crawler_plan_name = crawler_plan_response["data"]["name"]
+
+            # build the crawler task payload for binding to the generate plan
+            new_crawler_task_list = [
+                {
+                    "contentType": 1,
+                    "inputSourceType": 2,
+                    "inputSourceSubType": None,
+                    "fieldName": None,
+                    "inputSourceValue": crawler_plan_id,
+                    "inputSourceLabel": crawler_plan_name,
+                    "inputSourceModal": 3,
+                    "inputSourceChannel": 5,
+                }
+            ]
+            # bind the crawler task to the generate plan
+            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+                crawler_task_list=new_crawler_task_list,
+                generate_task_id="20250703081329508785665",
+            )
+            # change article status
+            article_id_list = [i["article_id"] for i in generate_plan_response]
+            self.change_article_status_while_publishing(article_id_list=article_id_list)
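
The publishing branch reads only data.id and data.name from the crawler-plan response and then iterates the bind response for article_id values before flipping article status from 1 to 2. A sketch of the shapes the code assumes (values are illustrative, not taken from the real aiditApi):

# shape assumed for aiditApi.auto_create_crawler_task (illustrative values)
crawler_plan_response = {
    "data": {"id": "10001", "name": "自动绑定-Top内容泛化-2025-07-03--10"}
}

# shape assumed for aiditApi.bind_crawler_task_to_generate_task: an iterable of
# dicts carrying article_id, used to mark those articles as publishing (status 1 -> 2)
generate_plan_response = [{"article_id": 123}, {"article_id": 456}]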