浏览代码

重复视频增加appType 888880
新建两个任务task3, task4和新视频抓取解耦
pq_functions请求转化为async_post

response给票圈视频随机分配60%的流量

罗俊辉 8 月之前
父节点
当前提交
f48fd998ae

+ 12 - 5
applications/deal/response.py

@@ -223,17 +223,24 @@ class Response(object):
         """
         L = []
         if self.mini_program_type == 1:
-            mini_id_list = [25, 29, 31]
-            video_count = len(video_list)
-            mini_choice_index = random.sample(range(3), video_count)
-            mini_choice_item = [mini_id_list[i] for i in mini_choice_index]
+            # mini_id_list = [25, 29, 31]
+            # video_count = len(video_list)
+            # mini_choice_index = random.sample(range(3), video_count)
+            # mini_choice_item = [mini_id_list[i] for i in mini_choice_index]
             for index, video_id in enumerate(video_list, 1):
+                random_num = random.randint(1, 10)
+                if random_num in [1, 2, 3, 4, 5, 6]:
+                    mini_id = 25
+                elif random_num in [7, 8]:
+                    mini_id = 29
+                else:
+                    mini_id = 31
                 result = await self.generate_card(
                     video_id=video_id,
                     kimi_title=kimi_title,
                     index=index,
                     gh_id=gh_id,
-                    mini_id=mini_choice_item[index - 1],
+                    mini_id=mini_id,
                 )
                 L.append(result)
         elif self.mini_program_type == 2:

+ 10 - 13
applications/functions/pqFunctions.py

@@ -2,7 +2,6 @@
 @author: luojunhui
 """
 import json
-import requests
 
 from applications.functions.common import async_post
 
@@ -15,9 +14,6 @@ async def publishToPQ(video_obj):
     oss_path = video_obj['videoPath']
     uid = video_obj['uid']
     title = video_obj['title']
-    cover = video_obj['coverPath']
-    # if cover == "None":
-    cover = None
     url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
     headers = {
         "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
@@ -28,7 +24,6 @@ async def publishToPQ(video_obj):
         "Content-Type": "application/x-www-form-urlencoded",
     }
     payload = {
-        "coverImgPath": cover,
         "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
         "fileExtensions": "MP4",
         "loginUid": uid,
@@ -45,15 +40,17 @@ async def publishToPQ(video_obj):
         "videoFromScene": "1",
         "videoPath": oss_path,
         "viewStatus": "1",
+        "appType": 888880,
+        "repeatStatus": 1
     }
-    response = requests.post(
-        url=url,
-        headers=headers,
-        data=payload,
-    )
-    return response.json()
-    # response = await async_post(url, headers, payload)
-    # return response
+    # response = requests.post(
+    #     url=url,
+    #     headers=headers,
+    #     data=payload,
+    # )
+    # return response.json()
+    response = await async_post(url, headers, payload)
+    return response
 
 
 async def getPQVideoDetail(video_id):

+ 5 - 5
tt.py → matchVideoFromHistoryArticleASC.py

@@ -1,12 +1,11 @@
 """
 @author: luojunhui
 """
-import time
-
+import datetime
 import aiomysql
 import asyncio
 
-from tasks.task3 import MatchTask3
+from tasks import MatchTask3
 
 
 class TaskMySQLClient(object):
@@ -80,5 +79,6 @@ async def main():
 if __name__ == '__main__':
     while True:
         asyncio.run(main())
-        print("执行完成")
-        time.sleep(10)
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待5s".format(now_str))
+        asyncio.sleep(5)

+ 84 - 0
matchVideoFromHistoryArticleDESC.py

@@ -0,0 +1,84 @@
+"""
+@author: luojunhui
+"""
+import datetime
+import aiomysql
+import asyncio
+
+from tasks import MatchTask4
+
+
+class TaskMySQLClient(object):
+    """
+    Async MySQL
+    """
+
+    def __init__(self):
+        self.mysql_pool = None
+
+    async def init_pool(self):
+        """
+        初始化连接
+        :return:
+        """
+        self.mysql_pool = await aiomysql.create_pool(
+            host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
+            port=3306,
+            user='crawler',
+            password='crawler123456@',
+            db='piaoquan-crawler',
+            charset='utf8mb4',
+            connect_timeout=120,
+        )
+        print("mysql init successfully")
+
+    async def close_pool(self):
+        """
+        关闭 mysql 连接
+        :return:
+        """
+        self.mysql_pool.close()
+        await self.mysql_pool.wait_closed()
+
+    async def async_select(self, sql):
+        """
+        select method
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as conn:
+            async with conn.cursor() as cursor:
+                await cursor.execute(sql)
+                result = await cursor.fetchall()
+                return result
+
+    async def async_insert(self, sql, params):
+        """
+        insert and update method
+        :param params:
+        :param sql:
+        :return:
+        """
+        async with self.mysql_pool.acquire() as coon:
+            async with coon.cursor() as cursor:
+                await cursor.execute(sql, params)
+                await coon.commit()
+
+
+async def main():
+    """
+    main2
+    :return:
+    """
+    TMC = TaskMySQLClient()
+    await TMC.init_pool()
+    PD = MatchTask4(TMC)
+    await PD.deal()
+
+
+if __name__ == '__main__':
+    while True:
+        asyncio.run(main())
+        now_str = datetime.datetime.now().__str__()
+        print("{}    请求执行完成, 等待5s".format(now_str))
+        asyncio.sleep(5)

+ 4 - 18
match_video_task.py

@@ -6,7 +6,7 @@ import datetime
 import asyncio
 
 import aiomysql
-from tasks import MatchTask1, MatchTask3
+from tasks import MatchTask1
 
 
 class TaskMySQLClient(object):
@@ -77,24 +77,10 @@ async def main():
     await PD.deal()
 
 
-async def main2():
-    """
-    main2
-    :return:
-    """
-    TMC = TaskMySQLClient()
-    await TMC.init_pool()
-    PD = MatchTask3(TMC)
-    await PD.deal()
-
-
 if __name__ == '__main__':
     while True:
         asyncio.run(main())
         now_str = datetime.datetime.now().__str__()
-        print("{}    请求执行完成, 等待60s".format(now_str))
-        time.sleep(60)
-        asyncio.run(main2())
-        now_str = datetime.datetime.now().__str__()
-        print("查找历史数据{}    请求执行完成, 等待60s".format(now_str))
-        time.sleep(60)
+        print("{}    请求执行完成, 等待10s".format(now_str))
+        asyncio.sleep(10)
+

+ 1 - 1
static/config.py

@@ -794,4 +794,4 @@ db_video = "article_match_videos"
 spider_coroutines = 10
 
 # mysql coroutines
-mysql_coroutines = 40
+mysql_coroutines = 100

+ 2 - 1
tasks/__init__.py

@@ -4,4 +4,5 @@
 """
 from .task1 import MatchTask1
 from .task2 import MatchTask2
-from .task3 import MatchTask3
+from .task3 import MatchTask3
+from .task4 import MatchTask4

+ 22 - 14
tasks/task3.py

@@ -25,11 +25,23 @@ class MatchTask3(object):
         :return:
         """
         select_sql1 = f"""
-            SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
-            FROM {db_article} 
-            WHERE content_status = 0 and process_times <= 3
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.gh_id, 
+                ART.article_title, 
+                ART.article_text, 
+                ART.content_status, 
+                ART.process_times
+            FROM {db_article} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {db_video}
+                where oss_status = 1
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3
             ORDER BY request_time_stamp
-            ASC
             LIMIT {mysql_coroutines};
         """
         tasks = await self.mysql_client.async_select(sql=select_sql1)
@@ -140,7 +152,10 @@ class MatchTask3(object):
                 info="存在历史文章",
                 trace_id=trace_id
             )
-            await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+            try:
+                await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+            except Exception as e:
+                print(e)
         else:
             pass
 
@@ -150,15 +165,8 @@ class MatchTask3(object):
         :return:
         """
         task_list = await self.getTaskList()
-        task_dict = {}
-        for task in task_list:
-            key = task['content_id']
-            task_dict[key] = task
-        process_list = []
-        for item in task_dict:
-            process_list.append(task_dict[item])
-        if process_list:
-            tasks = [self.processTask(params) for params in process_list]
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
             await asyncio.gather(*tasks)
         else:
             logging(

+ 177 - 0
tasks/task4.py

@@ -0,0 +1,177 @@
+"""
+@author: luojunhui
+"""
+import asyncio
+
+from static.config import db_article, db_video, mysql_coroutines
+from applications.functions.log import logging
+from applications.functions.pqFunctions import *
+
+
+class MatchTask4(object):
+    """
+    处理已经匹配过小程序的文章
+    """
+
+    def __init__(self, mysql_client):
+        """
+        :param mysql_client:
+        """
+        self.mysql_client = mysql_client
+
+    async def getTaskList(self):
+        """
+        获取任务
+        :return:
+        """
+        select_sql1 = f"""
+            SELECT 
+                ART.trace_id, 
+                ART.content_id, 
+                ART.gh_id, 
+                ART.article_title, 
+                ART.article_text, 
+                ART.content_status, 
+                ART.process_times
+            FROM {db_article} ART
+            JOIN (
+                select content_id, count(1) as cnt 
+                from {db_video}
+                where oss_status = 1
+                group by content_id
+            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
+            WHERE ART.content_status = 0 and ART.process_times <= 3
+            ORDER BY request_time_stamp
+            DESC
+            LIMIT {mysql_coroutines};
+        """
+        tasks = await self.mysql_client.async_select(sql=select_sql1)
+        task_obj_list = [
+            {
+                "trace_id": item[0],
+                "content_id": item[1],
+                "gh_id": item[2],
+                "title": item[3],
+                "text": item[4],
+                "content_status": item[5],
+                "process_times": item[6]
+            } for item in tasks
+        ]
+        logging(
+            code="9001",
+            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
+            data=task_obj_list
+        )
+        return task_obj_list
+
+    async def getHistoryVideoOssPath(self, content_id):
+        """
+        check whether the contents videos exists
+        :param content_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT video_title, uid, video_path, cover_path
+            FROM {db_video}
+            where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
+        """
+        content_videos = await self.mysql_client.async_select(select_sql)
+        video_list = [
+            {
+                "title": line[0],
+                "uid": line[1],
+                "videoPath": line[2],
+                "coverPath": line[3]
+            }
+            for line in content_videos
+        ]
+        if len(video_list) >= 3:
+            return video_list
+        else:
+            return None
+
+    async def useExistOssPath(self, video_info_list, params):
+        """
+        使用已经存在的视频id
+        :return:
+        """
+        trace_id = params['trace_id']
+        content_id = params['content_id']
+        select_sql = f"""
+            SELECT kimi_title
+            FROM {db_article}
+            WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
+        """
+        info = await self.mysql_client.async_select(sql=select_sql)
+        kimi_title = info[0]
+        video_id_list = await getNewVideoIds(video_info_list)
+        vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
+        update_sql = f"""
+            UPDATE {db_article}
+            SET 
+                kimi_title=%s,
+                recall_video_id1=%s, 
+                recall_video_id2=%s, 
+                recall_video_id3=%s,
+                content_status=%s,
+                process_times = %s
+            WHERE  trace_id = %s
+        """
+
+        await self.mysql_client.async_insert(
+            sql=update_sql,
+            params=(
+                kimi_title,
+                vid1,
+                vid2,
+                vid3,
+                2,
+                int(params['process_times']) + 1,
+                trace_id
+            )
+        )
+        logging(
+            code="9002",
+            info="已从历史文章更新,文章id: {}".format(content_id),
+            trace_id=trace_id
+        )
+
+    async def processTask(self, params):
+        """
+        异步执行
+        :param params:
+        :return:
+        """
+        content_id = params['content_id']
+        trace_id = params['trace_id']
+        # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
+        oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
+        if oss_path_list:
+            # 说明已经存在了结果, 将该条记录下的video_oss拿出来
+            logging(
+                code="9001",
+                info="存在历史文章",
+                trace_id=trace_id
+            )
+            try:
+                await self.useExistOssPath(video_info_list=oss_path_list, params=params)
+            except Exception as e:
+                print(e)
+        else:
+            pass
+
+    async def deal(self):
+        """
+        处理
+        :return:
+        """
+        task_list = await self.getTaskList()
+        if task_list:
+            tasks = [self.processTask(params) for params in task_list]
+            await asyncio.gather(*tasks)
+        else:
+            logging(
+                code="9008",
+                info="没有要处理的请求"
+            )
+