Explorar o código

transcode job id

丁云鹏 hai 1 mes
pai
achega
02384fe3cf
Modificáronse 2 ficheiros con 20 adicións e 15 borrados
  1. 0 1
      utils/mysql_db.py
  2. 20 14
      workers/video_insight_consumption_work.py

+ 0 - 1
utils/mysql_db.py

@@ -4,7 +4,6 @@
 """
 """
 import pymysql
 import pymysql
 from loguru import logger
 from loguru import logger
-logger.add("/app/logs/consumption.log", rotation="10 MB")
 
 
 class MysqlHelper:
 class MysqlHelper:
     @classmethod
     @classmethod

+ 20 - 14
workers/video_insight_consumption_work.py

@@ -24,20 +24,26 @@ class ConsumptionRecommend(object):
     async def run(cls):
     async def run(cls):
         logger.info(f"[处理] 开始获取redis数据")
         logger.info(f"[处理] 开始获取redis数据")
 
 
-        task = RedisHelper().get_client().rpop(name = 'task:video_insight')
+        while True:
+            task = RedisHelper().get_client().rpop(name='task:video_insight')
 
 
-        if not task:
-            logger.info('[处理] 无待执行的扫描任务')
-            return
-        task = orjson.loads(task)
-        logger.info(f"[处理] 获取redis数据{task}")
-        video_id = task['video_id']
-
-        count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}"""
-        count = MysqlHelper.get_values(count_sql)
-        if count and count[0][0] > 0:
-            logger.info(f"[处理] 视频重复过滤")
-            return
+            if not task:
+                logger.info('[处理] 无待执行的扫描任务')
+                return
+
+            task = orjson.loads(task)
+            logger.info(f"[处理] 获取redis数据{task}")
+            video_id = task['video_id']
+
+            count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}"""
+            count = MysqlHelper.get_values(count_sql)
+
+            if not count or count[0][0] == 0:
+                logger.info(f"[处理] 视频ID {video_id} 可用")
+                # 这里可以继续处理 video_id
+                break
+            else:
+                logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")
 
 
         logger.info(f"[处理] 开始获取原视频OSS地址")
         logger.info(f"[处理] 开始获取原视频OSS地址")
         video_title, video_path = PQ.get_pq_oss(video_id)
         video_title, video_path = PQ.get_pq_oss(video_id)
@@ -83,7 +89,7 @@ class ConsumptionRecommend(object):
         MysqlHelper.update_values(sql)
         MysqlHelper.update_values(sql)
 
 
         # AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
         # AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
-        logger.info(f"[处理] 写入日志成功")
+        logger.info(f"[处理] 写入数据库成功")
 
 
 
 
 async def run():
 async def run():