Quellcode durchsuchen

add negative category to old server

luojunhui vor 5 Monaten
Ursprung
Commit
e8abf38af2
5 geänderte Dateien mit 216 neuen und 29 gelöschten Zeilen
  1. 6 0
      applications/deal/response.py
  2. 43 0
      applications/functions/apollo.py
  3. 49 15
      matchVideoFromHistoryArticleASC.py
  4. 7 6
      requirements.txt
  5. 111 8
      tasks/task3.py

+ 6 - 0
applications/deal/response.py

@@ -336,6 +336,12 @@ class Response(object):
                 "code": 0,
                 "code": 0,
                 "error": "匹配失败,处理超过五次,文章敏感",
                 "error": "匹配失败,处理超过五次,文章敏感",
             }
             }
+        elif status_code == 95:
+            result = {
+                "traceId": self.trace_id,
+                "code": 95,
+                "error": "文章与账号不匹配",
+            }
         else:
         else:
             result = {"traceId": self.trace_id, "Message": "UnKnow Error"}
             result = {"traceId": self.trace_id, "Message": "UnKnow Error"}
         logging(
         logging(

+ 43 - 0
applications/functions/apollo.py

@@ -0,0 +1,43 @@
+"""
+@author: luojunhui
+"""
+import pyapollos
+
+
+class Config(object):
+    """
+    apolloConfig
+    """
+
+    def __init__(self, env="pre"):
+        """
+        :param env:
+        """
+        match env:
+            case "prod":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+            case "dev":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+            case "pre":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+
+    def get_config_value(self, key):
+        """
+        通过 key 获取配置的 Config
+        :param key:
+        :return:
+        """
+        response = self.apollo_connection.get_value(key)
+        return response

+ 49 - 15
matchVideoFromHistoryArticleASC.py

@@ -2,7 +2,6 @@
 @author: luojunhui
 @author: luojunhui
 """
 """
 import datetime
 import datetime
-import time
 
 
 import aiomysql
 import aiomysql
 import asyncio
 import asyncio
@@ -15,7 +14,8 @@ class TaskMySQLClient(object):
     Async MySQL
     Async MySQL
     """
     """
 
 
-    def __init__(self):
+    def __init__(self, db_name):
+        self.db_name = db_name
         self.mysql_pool = None
         self.mysql_pool = None
 
 
     async def init_pool(self):
     async def init_pool(self):
@@ -23,15 +23,37 @@ class TaskMySQLClient(object):
         初始化连接
         初始化连接
         :return:
         :return:
         """
         """
-        self.mysql_pool = await aiomysql.create_pool(
-            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='crawler',
-            password='crawler123456@',
-            db='piaoquan-crawler',
-            charset='utf8mb4',
-            connect_timeout=120,
-        )
+        match self.db_name:
+            case "pq":
+                self.mysql_pool = await aiomysql.create_pool(
+                    host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+                    port=3306,
+                    user='crawler',
+                    password='crawler123456@',
+                    db='piaoquan-crawler',
+                    charset='utf8mb4',
+                    connect_timeout=120,
+                )
+            case "denet":
+                self.mysql_pool = await aiomysql.create_pool(
+                    host='rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
+                    port=3306,
+                    user='crawler_admin',
+                    password='cyber#crawler_2023',
+                    db='aigc-admin-prod',
+                    charset='utf8mb4',
+                    connect_timeout=120,
+                )
+            case "long-article":
+                self.mysql_pool = await aiomysql.create_pool(
+                    host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
+                    port=3306,
+                    user='changwen_admin',
+                    password='changwen@123456',
+                    db='long_articles',
+                    charset='utf8mb4',
+                    connect_timeout=120,
+                )
         print("mysql init successfully")
         print("mysql init successfully")
 
 
     async def close_pool(self):
     async def close_pool(self):
@@ -72,10 +94,22 @@ async def main():
     main job
     main job
     :return:
     :return:
     """
     """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = MatchTask3(TMC)
-    await PD.deal()
+    pq_db_client = TaskMySQLClient("pq")
+    await pq_db_client.init_pool()
+
+    denet_db_client = TaskMySQLClient("denet")
+    await denet_db_client.init_pool()
+
+    long_article_db_client = TaskMySQLClient("long-article")
+    await long_article_db_client.init_pool()
+
+    history_task = MatchTask3(
+        pq_client=pq_db_client,
+        denet_client=denet_db_client,
+        long_article_client=long_article_db_client
+    )
+    await history_task.deal()
+
     now_str = datetime.datetime.now().__str__()
     now_str = datetime.datetime.now().__str__()
     print("{}    请求执行完成, 等待1分钟".format(now_str))
     print("{}    请求执行完成, 等待1分钟".format(now_str))
     await asyncio.sleep(1 * 60)
     await asyncio.sleep(1 * 60)

+ 7 - 6
requirements.txt

@@ -1,5 +1,5 @@
 aiofiles==23.2.1
 aiofiles==23.2.1
-aiohttp==3.9.3
+aiohttp~=3.10.4
 aiosignal==1.3.1
 aiosignal==1.3.1
 alembic==1.11.1
 alembic==1.11.1
 aliyun-log-python-sdk==0.9.1
 aliyun-log-python-sdk==0.9.1
@@ -61,7 +61,7 @@ multidict==6.0.5
 mypy-extensions==1.0.0
 mypy-extensions==1.0.0
 numpy==1.24.4
 numpy==1.24.4
 odps==3.5.1
 odps==3.5.1
-openai==1.21.2
+openai~=1.47.1
 oss2==2.18.4
 oss2==2.18.4
 packaging==23.1
 packaging==23.1
 pandas==2.0.3
 pandas==2.0.3
@@ -80,9 +80,9 @@ PyMySQL==1.1.0
 pyodps==0.11.6
 pyodps==0.11.6
 python-dateutil==2.8.2
 python-dateutil==2.8.2
 pytz==2023.3
 pytz==2023.3
-Quart==0.19.5
+Quart~=0.19.6
 regex==2024.4.16
 regex==2024.4.16
-requests==2.31.0
+requests~=2.32.3
 schedule==1.2.1
 schedule==1.2.1
 six==1.16.0
 six==1.16.0
 sniffio==1.3.1
 sniffio==1.3.1
@@ -103,5 +103,6 @@ WTForms==3.1.2
 yarl==1.9.4
 yarl==1.9.4
 zipp==3.16.2
 zipp==3.16.2
 
 
-lxml~=5.2.1
-aiomysql~=0.2.0
+lxml~=5.3.0
+aiomysql~=0.2.0
+pyapollos~=0.1.5

+ 111 - 8
tasks/task3.py

@@ -6,6 +6,7 @@ import asyncio
 from static.config import db_article, db_video, mysql_coroutines
 from static.config import db_article, db_video, mysql_coroutines
 from applications.functions.log import logging
 from applications.functions.log import logging
 from applications.functions.pqFunctions import *
 from applications.functions.pqFunctions import *
+from applications.functions.apollo import Config
 
 
 
 
 class MatchTask3(object):
 class MatchTask3(object):
@@ -13,11 +14,15 @@ class MatchTask3(object):
     处理已经匹配过小程序的文章
     处理已经匹配过小程序的文章
     """
     """
 
 
-    def __init__(self, mysql_client):
+    def __init__(self, pq_client, denet_client, long_article_client):
         """
         """
-        :param mysql_client:
+        初始化HistoryArticleMySQLClient
         """
         """
-        self.mysql_client = mysql_client
+        self.config = Config(env="prod")
+        self.pq_client = pq_client
+        self.denet_client = denet_client
+        self.long_article_client = long_article_client
+        self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
 
 
     async def getTaskList(self):
     async def getTaskList(self):
         """
         """
@@ -44,7 +49,7 @@ class MatchTask3(object):
             ORDER BY request_time_stamp
             ORDER BY request_time_stamp
             LIMIT {mysql_coroutines};
             LIMIT {mysql_coroutines};
         """
         """
-        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        tasks = await self.pq_client.async_select(sql=select_sql1)
         task_obj_list = [
         task_obj_list = [
             {
             {
                 "trace_id": item[0],
                 "trace_id": item[0],
@@ -75,7 +80,7 @@ class MatchTask3(object):
             FROM {db_video}
             FROM {db_video}
             where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
             where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
         """
         """
-        content_videos = await self.mysql_client.async_select(select_sql)
+        content_videos = await self.pq_client.async_select(select_sql)
         video_list = [
         video_list = [
             {
             {
                 "title": line[0],
                 "title": line[0],
@@ -102,7 +107,7 @@ class MatchTask3(object):
             FROM {db_article}
             FROM {db_article}
             WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
             WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
         """
         """
-        info = await self.mysql_client.async_select(sql=select_sql)
+        info = await self.pq_client.async_select(sql=select_sql)
         kimi_title = info[0]
         kimi_title = info[0]
         video_id_list = await getNewVideoIds(video_info_list)
         video_id_list = await getNewVideoIds(video_info_list)
         vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
         vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
@@ -118,7 +123,7 @@ class MatchTask3(object):
             WHERE  trace_id = %s
             WHERE  trace_id = %s
         """
         """
 
 
-        await self.mysql_client.async_insert(
+        await self.pq_client.async_insert(
             sql=update_sql,
             sql=update_sql,
             params=(
             params=(
                 kimi_title,
                 kimi_title,
@@ -136,14 +141,113 @@ class MatchTask3(object):
             trace_id=trace_id
             trace_id=trace_id
         )
         )
 
 
+    async def get_content_pool_level(self, content_id) -> str:
+        """
+        获取文章的内容池等级
+        :param content_id:
+        """
+        select_sql = f"""
+            SELECT produce_plan.plan_tag
+            FROM produce_plan
+            JOIN produce_plan_exe_record
+            ON produce_plan.id = produce_plan_exe_record.plan_id
+            WHERE produce_plan_exe_record.plan_exe_id = '{content_id}';
+        """
+        result = await self.denet_client.async_select(sql=select_sql)
+        if result:
+            return result[0][0]
+        else:
+            logging(
+                code="5858",
+                function="task3.get_content_pool_level",
+                info="没有找到该文章的内容池等级",
+                data={'content_id': content_id}
+            )
+            return "ERROR"
+
+    async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
+        """
+        判断该文章的品类是否属于该账号的品类
+        :param trace_id:
+        :param content_id:
+        :param gh_id:
+        :return:
+        """
+        bad_category_list = self.account_negative_category.get(gh_id, [])
+        logging(
+            code="history1101",
+            info="该账号的 negative 类型列表",
+            trace_id=trace_id,
+            data=bad_category_list
+        )
+        if bad_category_list:
+            sql = f"""
+                SELECT category
+                FROM article_category
+                WHERE produce_content_id = '{content_id}';
+            """
+            result = await self.long_article_client.async_select(sql)
+            if result:
+                category = result[0][0]
+                logging(
+                    code="history1102",
+                    info="文章的品类-{}".format(category),
+                    trace_id=trace_id
+                )
+                if category in bad_category_list:
+                    return True
+        return False
+
     async def processTask(self, params):
     async def processTask(self, params):
         """
         """
         异步执行
         异步执行
         :param params:
         :param params:
         :return:
         :return:
         """
         """
+        MISMATCH_STATUS = 95
+        TASK_INIT_STATUS = 0
         content_id = params['content_id']
         content_id = params['content_id']
         trace_id = params['trace_id']
         trace_id = params['trace_id']
+        gh_id = params['gh_id']
+        flow_pool_level = await self.get_content_pool_level(content_id)
+        if flow_pool_level == "autoArticlePoolLevel4":
+            # 判断文章的品类是否属于该账号的 negative 类型
+            negative_category_status = await self.check_title_category(
+                content_id=content_id,
+                gh_id=gh_id,
+                trace_id=trace_id
+            )
+            if negative_category_status:
+                # 修改状态为品类不匹配状态
+                logging(
+                    code="history1002",
+                    info="文章属于该账号的 negative 类型",
+                    trace_id=trace_id
+                )
+                update_sql = f"""
+                    UPDATE {db_article}
+                    SET content_status = %s
+                    WHERE trace_id = %s and content_status = %s;
+                """
+                affected_rows = await self.pq_client.async_insert(
+                    sql=update_sql,
+                    params=(
+                        MISMATCH_STATUS,
+                        trace_id,
+                        TASK_INIT_STATUS
+                    )
+                )
+                logging(
+                    code="history1003",
+                    info="已经修改该文章状态为 品类不匹配状态",
+                    trace_id=trace_id
+                )
+                if affected_rows == 0:
+                    print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
+                    return
+                # 处理完成之后,直接return
+                return
+
         # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
         # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
         oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
         oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
         if oss_path_list:
         if oss_path_list:
@@ -174,4 +278,3 @@ class MatchTask3(object):
                 code="9008",
                 code="9008",
                 info="没有要处理的请求"
                 info="没有要处理的请求"
             )
             )
-