浏览代码

分表,修改task1, task2, search_schedule.py

罗俊辉 9 月之前
父节点
当前提交
883b2dc928
共有 3 个文件被更改,包括 95 次插入和 230 次删除
  1. 4 2
      applications/schedule/search_schedule.py
  2. 48 71
      tasks/task1.py
  3. 43 157
      tasks/task2.py

+ 4 - 2
applications/schedule/search_schedule.py

@@ -234,10 +234,11 @@ class SearchMethod(object):
             return L
 
 
-async def video_sender(video_obj, user, trace_id, platform, index):
+async def video_sender(video_obj, user, trace_id, platform, content_id, index):
     """
     异步处理微信 video_obj
     公众号和站内账号一一对应
+    :param content_id: 文章id
     :param index:
     :param platform:
     :param user:
@@ -274,6 +275,7 @@ async def video_sender(video_obj, user, trace_id, platform, index):
         mq_obj = {}
     mq_obj['index'] = index
     mq_obj['trace_id'] = trace_id
+    mq_obj['content_id'] = content_id
     header = {
         "Content-Type": "application/json",
     }
@@ -326,7 +328,6 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
     )
     # 按照标题相似度排序
     ranked_list = title_similarity_rank(content_title=params['title'].split("@@")[-1], recall_list=recall_list)
-    # print(params['title'].split("@@")[-1])
     for i in ranked_list:
         print(i['title'], i['score'])
     index = 0
@@ -341,6 +342,7 @@ async def search_videos(params, trace_id, gh_id, mysql_client):
                     user=gh_id_dict.get(gh_id),
                     trace_id=trace_id,
                     platform=platform,
+                    content_id=params['content_id'],
                     index=index
                 )
                 logging(

+ 48 - 71
tasks/task1.py

@@ -3,7 +3,7 @@
 """
 import asyncio
 
-from static.config import db_article
+from static.config import db_article, db_video
 from applications.schedule import search_videos
 from applications.functions.log import logging
 from static.config import spider_coroutines
@@ -68,26 +68,21 @@ class MatchTask1(object):
         else:
             return []
 
-    async def get_history_contents(self, content_id):
+    async def get_history_videos(self, content_id):
         """
-        check whether the content id exists
-        :return: trace_id or None
+        check whether the contents videos exists
+        :param content_id:
+        :return:
         """
         select_sql = f"""
-               SELECT trace_id, content_status
-               FROM {db_article}
-               WHERE content_id = '{content_id}'
-               ORDER BY id DESC;
-           """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status == 2:
-                    return trace_id
-                else:
-                    continue
-            return None
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        if len(videos) >= 3:
+            return videos
         else:
             return None
 
@@ -113,70 +108,48 @@ class MatchTask1(object):
         else:
             return True
 
-    async def insert_history_contents_videos(self, history_trace_id, params):
+    async def use_exists_contents_videos(self, video_id_list, params):
         """
-        插入历史视频id
+        使用已经存在的视频id
         :return:
         """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
         select_sql = f"""
-            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
+            SELECT kimi_title
             FROM {db_article}
-            WHERE trace_id = '{history_trace_id}';
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
         """
         info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title, vid1, vid2, vid3 = info[0]
+        kimi_title = info[0]
         update_sql = f"""
-        UPDATE {db_article}
-        SET 
-            kimi_title=%s,
-            recall_video_id1=%s, 
-            recall_video_id2=%s, 
-            recall_video_id3=%s,
-            content_status=%s,
-            process_times = %s
-        WHERE  trace_id = %s
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
         """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
         await self.mysql_client.async_insert(
             sql=update_sql,
             params=(
                 kimi_title,
-                vid1,
+                video_id_list[0],
                 "NULL" if vid2 is None else vid2,
                 "NULL" if vid3 is None else vid3,
                 2,
                 int(params['process_times']) + 1,
-                params['trace_id']
+                trace_id
             )
         )
         logging(
             code="9002",
-            info="已从历史文章更新,历史id: {}".format(history_trace_id),
-            trace_id=params['trace_id']
-        )
-
-    async def process_video_id(self, title, trace_id, process_times):
-        """
-        如果video_id在标题中,则做特殊处理
-        :return:
-        """
-        video_id = title.split("video_id=")[-1]
-        update_sql = f"""
-            UPDATE  
-                {db_article}
-            SET 
-                recall_video_id1 = %s,
-                content_status = %s,
-                process_times = %s
-            WHERE  
-                trace_id = %s;"""
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(
-                video_id,
-                2,
-                {int(process_times) + 1},
-                trace_id
-            )
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
         )
 
     async def start_process(self, params):
@@ -201,19 +174,23 @@ class MatchTask1(object):
         )
         try:
             await search_videos(
-                params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
+                params={
+                    "title": params['title'],
+                    "content": params['text'],
+                    "trace_id": params['trace_id'],
+                    "content_id": params['content_id']
+                },
                 trace_id=params['trace_id'],
                 gh_id=params['gh_id'],
                 mysql_client=self.mysql_client
             )
-            # 执行完成之后,判断是否存在视频id
             select_sql = f"""
-                SELECT recall_video_id1, recall_video_id2, recall_video_id3
-                FROM {db_article}
-                WHERE trace_id = '{params["trace_id"]}';
+                SELECT video_id
+                FROM article_match_videos
+                WHERE content_id = '{params['content_id']}'
             """
             result = await self.mysql_client.async_select(sql=select_sql)
-            vid1, vid2, vid3 = result[0]
+            vid1, vid2, vid3 = result[0], result[1], result[2]
             if vid1 or vid2 or vid3:
                 update_sql2 = f"""
                     UPDATE {db_article}
@@ -312,16 +289,16 @@ class MatchTask1(object):
         """
         content_id = params['content_id']
         trace_id = params['trace_id']
-        # 判断该文章是否已经生成了
-        history_trace_id = await self.get_history_contents(content_id)
-        if history_trace_id:
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        if video_id_list:
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
             logging(
                 code="9001",
                 info="存在历史文章",
                 trace_id=trace_id
             )
-            await self.insert_history_contents_videos(history_trace_id, params)
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
         else:
             flag = await self.judge_content_processing(content_id)
             if flag:

+ 43 - 157
tasks/task2.py

@@ -3,8 +3,7 @@
 """
 import asyncio
 
-from static.config import db_article
-from applications.schedule import search_videos
+from static.config import db_article, db_video
 from applications.functions.log import logging
 from static.config import mysql_coroutines
 
@@ -52,75 +51,66 @@ class MatchTask2(object):
         )
         return task_obj_list
 
-    async def get_history_contents(self, content_id):
+    async def get_history_videos(self, content_id):
         """
-        check whether the content id exists
-        :return: trace_id or None
+        check whether the contents videos exists
+        :param content_id:
+        :return:
         """
         select_sql = f"""
-               SELECT trace_id, content_status
-               FROM {db_article}
-               WHERE content_id = '{content_id}'
-               ORDER BY id DESC;
-           """
-        result = await self.mysql_client.async_select(select_sql)
-        if result:
-            for item in result:
-                trace_id, content_status = item
-                if content_status == 2:
-                    return trace_id
-                elif content_status == 3:
-                    update_sql = f"""
-                    UPDATE {db_article}
-                    SET content_status = 3
-                    WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(update_sql, trace_id)
-                else:
-                    continue
-            return None
+            SELECT video_id
+            FROM {db_video}
+            where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        videos = [vid for vid in content_videos]
+        if len(videos) >= 3:
+            return videos
         else:
             return None
 
-    async def insert_history_contents_videos(self, history_trace_id, params):
+    async def use_exists_contents_videos(self, video_id_list, params):
         """
-        插入历史视频id
+        使用已经存在的视频id
         :return:
         """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
         select_sql = f"""
-            SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
+            SELECT kimi_title
             FROM {db_article}
-            WHERE trace_id = '{history_trace_id}';
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
         """
         info = await self.mysql_client.async_select(sql=select_sql)
-        kimi_title, vid1, vid2, vid3 = info[0]
+        kimi_title = info[0]
         update_sql = f"""
-        UPDATE {db_article}
-        SET 
-            kimi_title=%s,
-            recall_video_id1=%s, 
-            recall_video_id2=%s, 
-            recall_video_id3=%s,
-            content_status=%s,
-            process_times = %s
-        WHERE  trace_id = %s;
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
         """
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
         await self.mysql_client.async_insert(
-            update_sql,
+            sql=update_sql,
             params=(
                 kimi_title,
-                vid1,
-                {"NULL" if vid2 is None else vid2},
-                {"NULL" if vid2 is None else vid3},
+                video_id_list[0],
+                "NULL" if vid2 is None else vid2,
+                "NULL" if vid3 is None else vid3,
                 2,
                 int(params['process_times']) + 1,
-                params['trace_id']
+                trace_id
             )
         )
         logging(
             code="9002",
-            info="已从历史文章更新,历史id: {}".format(history_trace_id),
-            trace_id=params['trace_id']
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
         )
 
     async def process_video_id(self, title, trace_id, process_times):
@@ -143,104 +133,6 @@ class MatchTask2(object):
             params=(video_id, 2, int(process_times) + 1, trace_id)
         )
 
-    async def start_process(self, params):
-        """
-        开始处理
-        :param params:
-        :return:
-        """
-        # 更新文章contentId为1, 说明该文章正在处理中
-        update_sql = f"""
-            UPDATE {db_article}
-            SET 
-                content_status = %s
-            WHERE 
-                trace_id = %s;
-        """
-        await self.mysql_client.async_insert(
-            sql=update_sql,
-            params=(1, params['trace_id'])
-        )
-        try:
-            # 判断标题中是否包含video_id
-            if "video_id=" in params['title']:
-                logging(
-                    code="9006",
-                    info="视频生成文本测试",
-                    trace_id=params['trace_id']
-                )
-                await self.process_video_id(
-                    title=params['title'],
-                    trace_id=params['trace_id'],
-                    process_times=params['process_times']
-                )
-            else:
-                await search_videos(
-                    params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
-                    trace_id=params['trace_id'],
-                    gh_id=params['gh_id'],
-                    mysql_client=self.mysql_client
-                )
-                # 执行完成之后,判断是否存在视频id
-                select_sql = f"""
-                    SELECT recall_video_id1, recall_video_id2, recall_video_id3
-                    FROM {db_article}
-                    WHERE trace_id = '{params["trace_id"]}';
-                """
-                result = await self.mysql_client.async_select(sql=select_sql)
-                vid1, vid2, vid3 = result[0]
-                if vid1:
-                    update_sql2 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                           WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql2,
-                        params=(2, int(params['process_times']) + 1, params['trace_id'])
-                    )
-                    logging(
-                        code="9008",
-                        info="视频搜索成功, 状态修改为2",
-                        trace_id=params['trace_id']
-                    )
-                else:
-                    update_sql3 = f"""
-                        UPDATE {db_article}
-                        SET 
-                           content_status = %s,
-                           process_times = %s
-                        WHERE trace_id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql3,
-                        params=(0, int(params['process_times']) + 1, params["trace_id"])
-                    )
-                    logging(
-                        code="9018",
-                        info="视频搜索失败,回退状态为0",
-                        trace_id=params['trace_id']
-                    )
-        except Exception as e:
-            logging(
-                code="9018",
-                info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
-                trace_id=params['trace_id']
-            )
-            update_sql4 = f"""
-                UPDATE {db_article}
-                SET 
-                   content_status = %s,
-                   process_times = %s
-                WHERE trace_id = %s;
-            """
-            await self.mysql_client.async_insert(
-                sql=update_sql4,
-                params=(0, int(params['process_times']) + 1, params["trace_id"])
-            )
-
     async def process_task(self, params):
         """
         异步执行
@@ -249,24 +141,18 @@ class MatchTask2(object):
         """
         content_id = params['content_id']
         trace_id = params['trace_id']
-        # 判断该文章是否已经生成了
-        history_trace_id = await self.get_history_contents(content_id)
-        if history_trace_id:
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        video_id_list = await self.get_history_videos(content_id=content_id)
+        if video_id_list:
             # 说明已经存在了结果, 将该条记录下的video_id拿出来
             logging(
                 code="9001",
                 info="存在历史文章",
-                trace_id=trace_id,
-                function="find_history_article"
+                trace_id=trace_id
             )
-            await self.insert_history_contents_videos(history_trace_id, params)
+            await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
         else:
-            logging(
-                code="9003",
-                info="未找到历史文章",
-                trace_id=trace_id,
-                function="find_history_article"
-            )
+            pass
 
     async def deal(self):
         """