
Add cold start task

luojunhui 1 month ago
commit 20eaf4fe90

+ 2 - 0
applications/api/__init__.py

@@ -21,6 +21,8 @@ from .aliyun_log_api import log
 
 # aigc system api
 from .async_aigc_system_api import delete_illegal_gzh_articles
+from .async_aigc_system_api import auto_create_crawler_task
+from .async_aigc_system_api import auto_bind_crawler_task_to_generate_task
 
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()

+ 141 - 0
applications/api/async_aigc_system_api.py

@@ -1,5 +1,28 @@
+import json
 from applications.utils import AsyncHttpClient
 
+HEADERS = {
+    "Accept": "application/json",
+    "Accept-Language": "zh,zh-CN;q=0.9",
+    "Content-Type": "application/json",
+    "Origin": "http://admin.cybertogether.net",
+    "Proxy-Connection": "keep-alive",
+    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+}
+PERSON_COOKIE = {
+    "token": "af54cdc404c3464d896745df389b2dce",
+    "appType": 9,
+    "platform": "pc",
+    "appVersionCode": 1000,
+    "clientTimestamp": 1,
+    "fid": 1,
+    "loginUid": 1,
+    "pageSource": 1,
+    "requestId": 1,
+    "rid": 1,
+    "uid": 1,
+}
+
 
 async def delete_illegal_gzh_articles(gh_id: str, title: str):
     """
@@ -17,3 +40,121 @@ async def delete_illegal_gzh_articles(gh_id: str, title: str):
         res = await client.post(url=url, headers=headers, json=payload)
 
     return res
+
+
+async def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list, platform):
+    """
+    Create crawler task
+    """
+    match platform:
+        case "weixin":
+            channel = 5
+        case "toutiao":
+            channel = 6
+        case _:
+            raise RuntimeError(f"Unsupported platform: {platform}")
+
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
+    payload = {
+        "params": {
+            "contentFilters": [],
+            "accountFilters": [],
+            "filterAccountMatchMode": 1,
+            "filterContentMatchMode": 1,
+            "selectModeValues": [],
+            "searchModeValues": [],
+            "contentModal": 3,
+            "analyze": {},
+            "crawlerComment": 0,
+            "inputGroup": None,
+            "inputSourceGroups": [],
+            "modePublishTime": [],
+            "planType": 2,
+            "frequencyType": 2,
+            "planTag": plan_tag,
+            "tagPenetrateFlag": 0,
+            "id": plan_id,
+            "name": plan_name,
+            "channel": channel,
+            "crawlerMode": 5,
+            "inputModeValues": url_list,
+            "modePublishTimeStart": None,
+            "modePublishTimeEnd": None,
+            "executeRate": None,
+            "executeDate": None,
+            "executeWindowStart": None,
+            "executeWindowEnd": None,
+            "executeTimeInterval": None,
+            "executeNum": None,
+            "addModal": None,
+            "addChannel": None,
+            "fileUpload": None,
+            "prompt": None,
+            "acelFlag": None,
+            "tasks": [],
+        },
+        "baseInfo": PERSON_COOKIE,
+    }
+    async with AsyncHttpClient(timeout=600) as client:
+        res = await client.post(url=url, headers=HEADERS, json=payload)
+
+    return res
+
+
+async def auto_bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
+    url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save"
+    plan_info = await get_generate_task_detail(generate_task_id)
+    input_source_groups = plan_info.get("inputSourceGroups")
+    existed_crawler_task = input_source_groups[0].get("inputSources")
+    new_task_list = existed_crawler_task + crawler_task_list
+    input_source_group_0 = input_source_groups[0]
+    input_source_group_0["inputSources"] = new_task_list
+    payload = json.dumps(
+        {
+            "params": {
+                "contentFilters": [],
+                "produceModal": plan_info.get("produceModal"),
+                "inputModal": plan_info.get("inputModal"),
+                "tasks": plan_info.get("tasks", []),
+                "modules": [],
+                "moduleGroups": plan_info.get("moduleGroups"),
+                "inputSourceGroups": [input_source_group_0],
+                "layoutType": plan_info.get("layoutType"),
+                "activeManualReview": plan_info.get("activeManualReview"),
+                "totalProduceNum": plan_info.get("totalProduceNum"),
+                "dailyProduceNum": plan_info.get("dailyProduceNum"),
+                "maxConcurrentNum": plan_info.get("maxConcurrentNum"),
+                "id": generate_task_id,
+                "name": plan_info.get("name"),
+                "planTag": plan_info.get("planTag"),
+                "tagPenetrateFlag": plan_info.get("tagPenetrateFlag"),
+                "inputType": plan_info.get("inputType"),
+                "inputChannel": plan_info.get("inputChannel"),
+                "activeManualReviewCount": plan_info.get("activeManualReviewCount"),
+                "autoComposite": plan_info.get("autoComposite"),
+            },
+            "baseInfo": PERSON_COOKIE,
+        }
+    )
+    async with AsyncHttpClient(timeout=600) as client:
+        response = await client.post(url=url, headers=HEADERS, data=payload)
+    return response
+
+
+async def get_generate_task_detail(generate_task_id):
+    """
+    Given a generate plan id, fetch the plan detail (including its existing crawler-plan input sources)
+    :param generate_task_id:
+    :return:
+    """
+    url = "http://aigc-api.cybertogether.net/aigc/produce/plan/detail"
+    payload = json.dumps(
+        {"params": {"id": generate_task_id}, "baseInfo": PERSON_COOKIE}
+    )
+    async with AsyncHttpClient(timeout=600) as client:
+        res = await client.post(url=url, headers=HEADERS, data=payload)
+
+    if res["msg"] == "success":
+        return res["data"]
+    else:
+        return {}
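For reference, a minimal sketch of how the two new helpers are meant to be chained (the article URL and generate-plan id below are hypothetical; the `["data"]["id"]` / `["data"]["name"]` response shape follows the way the cold-start task consumes it further down):

```python
import asyncio

from applications.api import (
    auto_create_crawler_task,
    auto_bind_crawler_task_to_generate_task,
)


async def demo():
    # 1. create a crawler plan from a list of article URLs (values are hypothetical)
    crawler_plan = await auto_create_crawler_task(
        plan_id=None,
        plan_name="demo-crawler-plan",
        plan_tag="品类冷启动",
        url_list=["https://mp.weixin.qq.com/s/xxxx"],
        platform="weixin",
    )
    crawler_plan_id = crawler_plan["data"]["id"]
    crawler_plan_name = crawler_plan["data"]["name"]

    # 2. bind the new crawler plan to an existing generate plan (id is hypothetical)
    await auto_bind_crawler_task_to_generate_task(
        crawler_task_list=[
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": 5,  # 5 = weixin, 6 = toutiao
            }
        ],
        generate_task_id="demo-generate-plan-id",
    )


asyncio.run(demo())
```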

+ 1 - 0
applications/tasks/cold_start_tasks/__init__.py

@@ -0,0 +1 @@
+from .article_pool_cold_start import ArticlePoolColdStart

+ 395 - 0
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -0,0 +1,395 @@
+from __future__ import annotations
+
+import time
+import datetime
+import traceback
+
+from typing import List, Dict
+from pandas import DataFrame
+
+from applications.api import task_apollo, feishu_robot
+from applications.api import auto_create_crawler_task
+from applications.api import auto_bind_crawler_task_to_generate_task
+from applications.utils import get_titles_from_produce_plan
+
+
+class ArticlePoolColdStartConst:
+    # article
+    DAILY_ARTICLE_NUM = 1000
+    SIMILARITY_SCORE_THRESHOLD = 0.5
+
+    TITLE_NOT_SENSITIVE = 0
+    TITLE_SENSITIVE = 1
+
+    PUBLISHED_STATUS = 2
+    INIT_STATUS = 1
+    BAD_STATUS = 0
+    READ_TIMES_THRESHOLD = 1.3
+    READ_THRESHOLD = 5000
+
+    TITLE_LENGTH_LIMIT = 15
+    TITLE_LENGTH_MAX = 50
+
+    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]
+
+
+class ArticlePoolColdStart(ArticlePoolColdStartConst):
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def get_article_from_meta_table(
+        self, platform: str, crawl_method: str, strategy: str
+    ) -> DataFrame:
+        """
+        @param platform: platform the articles were crawled from
+        @param crawl_method: crawl method / article source
+        @param strategy: supply strategy
+        """
+        match platform:
+            case "weixin":
+                article_list = await self.get_weixin_cold_start_articles(
+                    crawl_method, strategy
+                )
+            case "toutiao":
+                article_list = await self.get_toutiao_cold_start_articles(
+                    crawl_method, strategy
+                )
+            case _:
+                raise ValueError("Invalid platform")
+        return DataFrame(
+            article_list,
+            columns=[
+                "article_id",
+                "title",
+                "link",
+                "llm_sensitivity",
+                "score",
+                "category_by_ai",
+            ],
+        )
+
+    async def get_weixin_cold_start_articles(
+        self, crawl_method: str, strategy: str
+    ) -> List[Dict]:
+        match strategy:
+            case "strategy_v1":
+                query = f"""
+                    select 
+                        article_id, title, link,  llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article t1 
+                    join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
+                    where category = %s 
+                        and platform = %s
+                        and title_sensitivity = %s 
+                        and t1.status = %s
+                        and t1.read_cnt / t2.read_avg >= %s
+                        and t1.read_cnt >= %s
+                        and t2.status = %s
+                    order by score desc;
+                """
+                article_list = await self.pool.async_fetch(
+                    query=query,
+                    params=(
+                        crawl_method,
+                        "weixin",
+                        self.TITLE_NOT_SENSITIVE,
+                        self.INIT_STATUS,
+                        self.READ_TIMES_THRESHOLD,
+                        self.READ_THRESHOLD,
+                        self.INIT_STATUS,
+                    ),
+                )
+                return article_list
+            case _:
+                raise ValueError("Invalid strategy")
+
+    async def get_toutiao_cold_start_articles(
+        self, crawl_method: str, strategy: str
+    ) -> List[Dict]:
+        match strategy:
+            case "strategy_v1":
+                query = f"""
+                    select article_id, title, link,  llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article
+                    where category = %s 
+                        and platform = %s
+                        and status = %s;
+                """
+                article_list = await self.pool.async_fetch(
+                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
+                )
+                return article_list
+            case _:
+                raise ValueError("Invalid strategy")
+
+    async def filter_published_titles(self, plan_id):
+        """
+        Filter out titles that have already been added to the AIGC system
+        """
+        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
+        update_query = f"""
+            update crawler_meta_article set status = %s where title in %s and status = %s;
+        """
+        changed_rows = await self.pool.async_save(
+            query=update_query,
+            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
+        )
+        return changed_rows
+
+    async def filter_weixin_articles(self, dataframe, crawl_method):
+        """微信过滤漏斗"""
+        total_length: int = dataframe.shape[0]
+
+        # filter by title length
+        filter_df = dataframe[
+            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
+            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
+        ]
+        length_level1 = filter_df.shape[0]
+
+        # filter by sensitive keywords
+        sensitive_keywords = [
+            "农历",
+            "太极",
+            "节",
+            "早上好",
+            "赖清德",
+            "普京",
+            "俄",
+            "南海",
+            "台海",
+            "解放军",
+            "蔡英文",
+            "中国",
+        ]
+        # build the regex pattern: keywords joined with "|" (logical OR)
+        pattern = "|".join(sensitive_keywords)
+        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
+        # row count after the keyword filter
+        length_level2 = filter_df.shape[0]
+        # filter by LLM sensitivity
+        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
+        length_level3 = filter_df.shape[0]
+        # filter by relevance (similarity) score
+        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
+        length_level4 = filter_df.shape[0]
+
+        await feishu_robot.bot(
+            title="冷启任务发布通知",
+            detail={
+                "总文章数量": total_length,
+                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(
+                    total_length - length_level1, length_level1
+                ),
+                "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(
+                    length_level1 - length_level2, length_level2
+                ),
+                "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(
+                    length_level2 - length_level3, length_level3
+                ),
+                "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(
+                    length_level3 - length_level4, length_level4
+                ),
+                "渠道": crawl_method,
+                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
+                "阅读量阈值": self.READ_THRESHOLD,
+                "标题长度阈值": self.TITLE_LENGTH_LIMIT,
+            },
+            mention=False,
+        )
+        return filter_df[: self.DAILY_ARTICLE_NUM]
+
+    async def filter_toutiao_articles(self, dataframe, crawl_method):
+        total_length = dataframe.shape[0]
+        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
+
+        await feishu_robot.bot(
+            title="冷启动创建抓取计划",
+            detail={
+                "渠道": crawl_method,
+                "总文章数量": total_length,
+                "相关性分数过滤": filter_df.shape[0],
+            },
+            mention=False,
+        )
+        return filter_df[: self.DAILY_ARTICLE_NUM]
+
+    async def insert_crawler_plan_into_database(
+        self, crawler_plan_id, crawler_plan_name, create_timestamp
+    ):
+        query = f"""
+            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
+            values (%s, %s, %s)
+        """
+        try:
+            await self.pool.async_save(
+                query=query,
+                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
+            )
+        except Exception as e:
+            await feishu_robot.bot(
+                title="品类冷启任务,记录抓取计划id失败",
+                detail={
+                    "error": str(e),
+                    "error_msg": traceback.format_exc(),
+                    "crawler_plan_id": crawler_plan_id,
+                    "crawler_plan_name": crawler_plan_name,
+                },
+            )
+
+    async def change_article_status_while_publishing(self, article_id_list):
+        """
+        :param article_id_list: unique article ids to mark as published
+        :return:
+        """
+        query = f"""
+            update crawler_meta_article
+            set status = %s
+            where article_id in %s and status = %s;
+        """
+        affect_rows = await self.pool.async_save(
+            query=query,
+            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
+        )
+        return affect_rows
+
+    async def create_cold_start_plan(
+        self, platform, crawl_method, plan_id, strategy="strategy_v1"
+    ):
+        article_dataframe = await self.get_article_from_meta_table(
+            platform, crawl_method, strategy
+        )
+        await self.log_client.log(
+            contents={
+                "task": "article_pool_cold_start",
+                "platform": platform,
+                "crawl_method": crawl_method,
+                "status": "success",
+                "trace_id": self.trace_id,
+                "message": "获取文章成功",
+                "data": {"article_length": article_dataframe.shape[0]},
+            }
+        )
+
+        match platform:
+            case "weixin":
+                input_source_channel = 5
+                filter_article_df = await self.filter_weixin_articles(
+                    article_dataframe, crawl_method
+                )
+            case "toutiao":
+                input_source_channel = 6
+                filter_article_df = await self.filter_toutiao_articles(
+                    article_dataframe, crawl_method
+                )
+            case _:
+                raise ValueError("Invalid platform")
+
+        # split article into each category
+        category_list = await task_apollo.get_config_value(key="category_list")
+        for ai_category in category_list:
+            filter_category_df = filter_article_df[
+                filter_article_df["category_by_ai"] == ai_category
+            ]
+            url_list = filter_category_df["link"].values.tolist()
+            if url_list:
+                # create_crawler_plan
+                crawler_plan_response = await auto_create_crawler_task(
+                    plan_id=None,
+                    plan_name="自动绑定-{}-{}-{}--{}".format(
+                        crawl_method,
+                        ai_category,
+                        datetime.date.today().__str__(),
+                        len(url_list),
+                    ),
+                    plan_tag="品类冷启动",
+                    platform=platform,
+                    url_list=url_list,
+                )
+                # save to db
+                create_timestamp = int(time.time()) * 1000
+                crawler_plan_id = crawler_plan_response["data"]["id"]
+                crawler_plan_name = crawler_plan_response["data"]["name"]
+                await self.insert_crawler_plan_into_database(
+                    crawler_plan_id, crawler_plan_name, create_timestamp
+                )
+
+                # auto bind to generate plan
+                new_crawler_task_list = [
+                    {
+                        "contentType": 1,
+                        "inputSourceType": 2,
+                        "inputSourceSubType": None,
+                        "fieldName": None,
+                        "inputSourceValue": crawler_plan_id,
+                        "inputSourceLabel": crawler_plan_name,
+                        "inputSourceModal": 3,
+                        "inputSourceChannel": input_source_channel,
+                    }
+                ]
+                generate_plan_response = await auto_bind_crawler_task_to_generate_task(
+                    crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
+                )
+                await self.log_client.log(
+                    contents={
+                        "task": "article_pool_cold_start",
+                        "platform": platform,
+                        "crawl_method": crawl_method,
+                        "status": "success",
+                        "trace_id": self.trace_id,
+                        "message": "绑定至生成计划成功",
+                        "data": generate_plan_response,
+                    }
+                )
+                # change article status
+                article_id_list = filter_category_df["article_id"].values.tolist()
+                await self.change_article_status_while_publishing(
+                    article_id_list=article_id_list
+                )
+
+    async def deal(self, platform: str, crawl_methods: List[str]) -> None:
+        if not crawl_methods:
+            crawl_methods = self.DEFAULT_CRAWLER_METHODS
+
+        await self.log_client.log(
+            contents={
+                "task": "article_pool_cold_start",
+                "platform": platform,
+                "crawl_methods": crawl_methods,
+                "status": "success",
+                "trace_id": self.trace_id,
+            }
+        )
+
+        crawl_methods_map = await task_apollo.get_config_value(
+            key="category_cold_start_map"
+        )
+
+        for crawl_method in crawl_methods:
+            try:
+                plan_id = crawl_methods_map[crawl_method]
+                affected_rows = await self.filter_published_titles(plan_id)
+                await self.log_client.log(
+                    contents={
+                        "task": "article_pool_cold_start",
+                        "platform": platform,
+                        "crawl_method": crawl_method,
+                        "status": "success",
+                        "trace_id": self.trace_id,
+                        "message": "通过已抓取标题修改文章状态",
+                        "data": {"affected_rows": affected_rows},
+                    }
+                )
+                await self.create_cold_start_plan(platform, crawl_method, plan_id)
+
+            except Exception as e:
+                await feishu_robot.bot(
+                    title="文章冷启动异常",
+                    detail={
+                        "crawl_method": crawl_method,
+                        "error": str(e),
+                        "function": "deal",
+                        "traceback": traceback.format_exc(),
+                    },
+                )
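The cold-start task reads two Apollo config keys that are not part of this commit: `category_cold_start_map` (crawl method → generate plan id) and `category_list` (the AI categories used to split URLs). A minimal invocation sketch, assuming a db pool and log client compatible with the ones `TaskScheduler` injects (see task_scheduler.py below):

```python
from applications.tasks.cold_start_tasks import ArticlePoolColdStart

# hypothetical shapes of the Apollo values read at runtime:
#   category_cold_start_map = {"1030-手动挑号": "<generate-plan-id>", ...}
#   category_list = ["<ai_category_1>", "<ai_category_2>", ...]


async def run_cold_start(pool, log_client):
    # pool / log_client stand in for the db_client and log_service used by TaskScheduler
    task = ArticlePoolColdStart(pool, log_client, trace_id="Task-demo-trace-id")
    # an empty crawl_methods list falls back to DEFAULT_CRAWLER_METHODS
    await task.deal(platform="weixin", crawl_methods=[])
```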

+ 13 - 0
applications/tasks/task_mapper.py

@@ -18,6 +18,7 @@ class Const:
     RECYCLE_DAILY_ARTICLE_TIMEOUT = 3600
     UPDATE_ROOT_SOURCE_ID_TIMEOUT = 3600
     CRAWLER_TOUTIAO_ARTICLES_TIMEOUT = 5 * 3600
+    ARTICLE_POOL_COLD_START_TIMEOUT = 4 * 3600
 
 
 class TaskMapper(Const):
@@ -26,22 +27,34 @@ class TaskMapper(Const):
         match task_name:
             case "check_kimi_balance":
                 expire_duration = self.CHECK_KIMI_BALANCE_TIMEOUT
+
             case "get_off_videos":
                 expire_duration = self.GET_OFF_VIDEO_TIMEOUT
+
             case "check_publish_video_audit_status":
                 expire_duration = self.CHECK_VIDEO_AUDIT_TIMEOUT
+
             case "outside_article_monitor":
                 expire_duration = self.OUTSIDE_ARTICLE_MONITOR_TIMEOUT
+
             case "inner_article_monitor":
                 expire_duration = self.INNER_ARTICLE_MONITOR_TIMEOUT
+
             case "title_rewrite":
                 expire_duration = self.TITLE_REWRITE_TIMEOUT
+
             case "daily_publish_articles_recycle":
                 expire_duration = self.RECYCLE_DAILY_ARTICLE_TIMEOUT
+
             case "update_root_source_id":
                 expire_duration = self.UPDATE_ROOT_SOURCE_ID_TIMEOUT
+
             case "crawler_toutiao_articles":
                 expire_duration = self.CRAWLER_TOUTIAO_ARTICLES_TIMEOUT
+
+            case "article_pool_pool_cold_start":
+                expire_duration = self.ARTICLE_POOL_COLD_START_TIMEOUT
+
             case _:
                 expire_duration = self.DEFAULT_TIMEOUT
 

+ 44 - 4
applications/tasks/task_scheduler.py

@@ -3,8 +3,9 @@ import time
 from datetime import datetime
 
 from applications.api import feishu_robot
-from applications.utils import task_schedule_response
+from applications.utils import task_schedule_response, generate_task_trace_id
 
+from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
@@ -26,6 +27,7 @@ class TaskScheduler(TaskMapper):
         self.log_client = log_service
         self.db_client = db_client
         self.table = "long_articles_task_manager"
+        self.trace_id = generate_task_trace_id()
 
     async def whether_task_processing(self, task_name: str) -> bool:
         """whether task is processing"""
@@ -51,9 +53,10 @@ class TaskScheduler(TaskMapper):
 
     async def record_task(self, task_name, date_string):
         """record task"""
-        query = f"""insert into {self.table} (date_string, task_name, start_timestamp) values (%s, %s, %s);"""
+        query = f"""insert into {self.table} (date_string, task_name, start_timestamp, trace_id) values (%s, %s, %s, %s);"""
         await self.db_client.async_save(
-            query=query, params=(date_string, task_name, int(time.time()))
+            query=query,
+            params=(date_string, task_name, int(time.time()), self.trace_id),
         )
 
     async def lock_task(self, task_name, date_string):
@@ -116,6 +119,15 @@ class TaskScheduler(TaskMapper):
                 error_code="5001", error_message="task is processing"
             )
 
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": task_name,
+                "message": "start processing",
+                "data": self.data,
+            }
+        )
+
         await self.record_task(task_name=task_name, date_string=date_string)
 
         await self.lock_task(task_name, date_string)
@@ -163,7 +175,6 @@ class TaskScheduler(TaskMapper):
 
                 async def background_check_publish_video_audit_status():
                     sub_task = CheckVideoAuditStatus(self.db_client, self.log_client)
-                    print("start processing task status: ")
                     task_status = await sub_task.deal()
                     await self.release_task(
                         task_name=task_name,
@@ -304,6 +315,35 @@ class TaskScheduler(TaskMapper):
                     data={"code": 0, "message": "task started background"},
                 )
 
+            case "article_pool_pool_cold_start":
+
+                async def background_article_pool_pool_cold_start():
+                    sub_task = ArticlePoolColdStart(
+                        self.db_client, self.log_client, self.trace_id
+                    )
+                    crawler_methods = self.data.get("crawler_methods", [])
+                    platform = self.data.get("platform", "weixin")
+                    await sub_task.deal(
+                        platform=platform, crawl_methods=crawler_methods
+                    )
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string
+                    )
+                    await self.log_client.log(
+                        contents={
+                            "trace_id": self.trace_id,
+                            "task": task_name,
+                            "message": "finished processing",
+                            "data": self.data,
+                        }
+                    )
+
+                asyncio.create_task(background_article_pool_pool_cold_start())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
             case _:
                 await self.log_client.log(
                     contents={
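A sketch of the `data` payload the new branch expects; only the two keys read by `background_article_pool_pool_cold_start` are shown, and how `data` reaches `TaskScheduler` is unchanged by this commit:

```python
# hypothetical scheduler payload for the new task
data = {
    "platform": "weixin",  # defaults to "weixin" when omitted; "toutiao" is also supported
    "crawler_methods": ["1030-手动挑号", "account_association"],  # empty list -> DEFAULT_CRAWLER_METHODS
}
```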

+ 3 - 0
applications/utils/__init__.py

@@ -1,6 +1,9 @@
 # import async apollo client
 from .async_apollo_client import AsyncApolloClient
 
+# aigc system
+from .aigc_system_database import *
+
 # import async http client
 from .async_http_client import AsyncHttpClient
 

+ 20 - 0
applications/utils/aigc_system_database.py

@@ -0,0 +1,20 @@
+from datetime import datetime, timedelta
+
+
+async def get_titles_from_produce_plan(pool, plan_id, threshold=None):
+    if not threshold:
+        # default lower bound: the date 30 days ago as YYYYMMDD padded with 15 zeros,
+        # matched against input_source_value below
+        thirty_days_ago = datetime.now() - timedelta(days=30)
+        threshold = thirty_days_ago.strftime("%Y%m%d") + 15 * "0"
+
+    query = f"""
+        select distinct t2.title
+        from produce_plan_input_source t3
+            join crawler_plan_result_rel t1 on t3.input_source_value = t1.plan_id
+            join crawler_content t2 on t1.channel_source_id = t2.channel_content_id
+        where t3.plan_id = %s
+        and t3.input_source_value > %s;
+    """
+    response = await pool.async_fetch(
+        query=query, db_name="aigc", params=(plan_id, threshold)
+    )
+    return tuple([i["title"] for i in response])

+ 7 - 0
applications/utils/common.py

@@ -2,6 +2,8 @@
 @author: luojunhui
 """
 
+import random
+import string
 import hashlib
 
 from datetime import datetime, timezone, date, timedelta
@@ -191,3 +193,8 @@ def days_remaining_in_month():
     remaining_days = (last_day_of_month - today).days
 
     return remaining_days
+
+
+def generate_task_trace_id():
+    random_str = "".join(random.choices(string.ascii_lowercase + string.digits, k=16))
+    return f"Task-{datetime.now().strftime('%Y%m%d%H%M%S')}-{random_str}"
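For reference, the trace ids produced here look like the value below (the timestamp and the 16-character suffix vary per call; the example value is illustrative only):

```python
from applications.utils import generate_task_trace_id

trace_id = generate_task_trace_id()
# e.g. 'Task-20250812093045-k3f9a2b7c1d0e4h6'
```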