zhangyong 3 months ago
Commit
cebef9eaf5

+ 16 - 0
docker-compose.yml

@@ -24,6 +24,22 @@ services:
     deploy:
       replicas: 15
     entrypoint: sh /app/sh/select.sh
+  select_spider:
+    image: content_job
+    restart: unless-stopped
+    environment:
+      - ENV=prod
+    networks:
+      - content_net
+    entrypoint: "python /app/workers/select_spider_work.py"
+#  consumption_spider:
+#    image: content_job
+#    restart: unless-stopped
+#    environment:
+#      - ENV=prod
+#    networks:
+#      - content_net
+#    entrypoint: "python /app/workers/consumption_spider_work.py"
 networks:
   content_net:
     name: content_net

+ 48 - 0
utils/gpt4o_mini_help.py

@@ -0,0 +1,48 @@
+import json
+
+import requests
+
+
+class GPT4oMini:
+
+    @classmethod
+    def get_ai_mini_early(cls, title):
+        url = "http://aigc-api.cybertogether.net//aigc/dev/test/gpt"
+        payload = json.dumps({
+            "imageList": [],
+            "model": "gpt-4o-mini-2024-07-18",
+            "prompt": (
+                "判断标题是否适合早上发,比如包含早上好、早安等....如果该标题包含日期是否为当天日期 如果符合则返回是,如果不符合则返回否"
+                f"请严格按照示例输出,是否符合早上发:{title},返回是否结果。"
+            ),
+            "responseFormat": {
+                "type": "json_schema",
+                "json_schema": {
+                    "strict": True,
+                    "name": "share_script_result",
+                    "schema": {
+                        "type": "object",
+                        "properties": {
+                            "是否": {
+                                "type": "string",
+                                "description": "是否"
+                            }
+                        },
+                        "required": ["是否"],
+                        "additionalProperties": False
+                    }
+                }
+            }
+        })
+        headers = {'Content-Type': 'application/json'}
+        try:
+            response = requests.post(url, headers=headers, data=payload, timeout=30)
+            response_data = response.json()
+
+            data = json.loads(response_data.get('data', '{}'))
+            return data["是否"]  # "是" (yes) or "否" (no)
+        except Exception:
+            # Any failure (network error, bad JSON, missing key) defaults to "否" (no)
+            return "否"
+
+if __name__ == '__main__':
+    result = GPT4oMini.get_ai_mini_early("❤️早上好!今天是3月3日!春暖花开鸟语花香,愿我们都幸福安康!")
+    print(result)
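Note: the parsing above implies the endpoint returns the schema-constrained answer as a JSON string under the data key. A sketch of the assumed round trip (response shape inferred from the parsing code, not verified against the API):

    import json

    # Assumed response body: {"code": ..., "data": "{\"是否\": \"是\"}"}
    response_data = {"data": "{\"是否\": \"是\"}"}
    data = json.loads(response_data.get("data", "{}"))
    assert data["是否"] in ("是", "否")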

+ 4 - 4
utils/mysql_db.py

@@ -9,12 +9,12 @@ class MysqlHelper:
     def connect_mysql(cls):
         # Create a Connection object representing a single database connection
         connection = pymysql.connect(
-            host="rm-bp12k5fuh5zyx31d2.mysql.rds.aliyuncs.com",  # database host (internal address)
+            host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database host (internal address)
             # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",  # database host (external address)
             port=3306,  # port
-            user="wx2023_ad",  # MySQL username
-            passwd="wx2023_adP@assword1234",  # MySQL password
-            db="adplatform",  # database name
+            user="crawler",  # MySQL username
+            passwd="crawler123456@",  # MySQL password
+            db="piaoquan-crawler",  # database name
             # charset is utf8 when the text stored in the database is utf8-encoded
             charset="utf8")
         return connection

+ 16 - 0
utils/piaoquan.py

@@ -192,6 +192,22 @@ class PQ:
         return code
 
 
+    @classmethod
+    def pq_recommended(cls, video_id: str):
+        """Submit a video to the recommend endpoint; True if the API returns code 0."""
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        url = "http://8.217.192.46:8889/api/piao_quan/backend/recommend"
+        payload = json.dumps({
+            "content_id": video_id
+        })
+        response = requests.post(url, headers=headers, data=payload, timeout=30)
+        result = response.json()
+        return result['code'] == 0
+
 
 if __name__ == '__main__':
     PQ.get_pq_oss(47377130)

+ 1 - 2
utils/redis.py

@@ -29,9 +29,8 @@ class RedisHelper(object):
         if self._pool:
             self._pool.disconnect(inuse_connections=True)
 
-def content_video_data(ret):
+def content_video_data(ret, task):
     """分析失败视频重新写入redis"""
-    task = f"task:video_ai_pq"
     helper = RedisHelper()
     client = helper.get_client()
     client.rpush(task, ret)
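Since the queue name is now a parameter, each caller chooses its own task list: the consumption_work.py change below passes task:video_ai_pq explicitly, and the new spider worker writes to task:spider_early_data. A minimal usage sketch (payload values illustrative):

    import json
    from utils.redis import content_video_data

    # Re-queue a failed item onto an explicit task list
    content_video_data(json.dumps({"video_id": 123}), "task:video_ai_pq")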

+ 17 - 10
utils/sql_help.py

@@ -1,16 +1,23 @@
+from loguru import logger
+
 from utils.mysql_db import MysqlHelper
-from datetime import datetime
 
 
 class sqlCollect():
     """查询该账号是否存在"""
     @classmethod
-    def select_ad_list(cls):
-        # 获取当前日期和时间
-        current_date = datetime.now()
-        formatted_date = current_date.strftime("%Y-%m-%d")
-        sql = """SELECT `ad_id` ,`creative_code` , `creative_title` ,`material_address` , `click_button_text` ,`creative_logo_address`,`update_time` FROM `creative`  WHERE `update_time` >=  %s"""
-        data = MysqlHelper.get_values(sql,formatted_date)
-        if data:
-            return data
-        return None
+    def select_hour_data(cls):
+        """Fetch videos created within the last hour whose title contains '早'."""
+        sql = """SELECT video_id, video_title
+                FROM crawler_video
+                WHERE create_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
+                  AND create_time < NOW() AND video_title LIKE '%早%';"""
+        try:
+            data = MysqlHelper.get_values(sql)
+            if data:
+                data_list = [{"video_id": row[0], "video_title": row[1]} for row in data]
+                return data_list
+            return None
+        except Exception as e:
+            logger.error(f"Exception while querying data: {e}")
+            return None
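If the '早' filter ever needs to vary, a parameterized variant would pass the pattern through the driver instead of embedding it in the SQL. A sketch, assuming MysqlHelper.get_values forwards a parameter to cursor.execute (as the removed select_ad_list did):

    from utils.mysql_db import MysqlHelper

    sql = """SELECT video_id, video_title
             FROM crawler_video
             WHERE create_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
               AND create_time < NOW() AND video_title LIKE %s;"""
    data = MysqlHelper.get_values(sql, "%早%")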

+ 68 - 0
workers/consumption_spider_work.py

@@ -0,0 +1,68 @@
+import asyncio
+import json
+import sys
+
+import orjson
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+from loguru import logger
+
+sys.path.append('/app')  # must precede the utils imports inside the container
+from utils.gpt4o_mini_help import GPT4oMini
+from utils.piaoquan import PQ
+from utils.redis import RedisHelper, content_video_data
+
+
+class ConsumptionSpiderRecommend(object):
+    @classmethod
+    async def run(cls):
+        logger.info(f"[处理] 开始获取redis数据")
+
+        task = RedisHelper().get_client().rpop(name = 'task:pq_hour_early')
+        if not task:
+            logger.info('[处理] 无待处理的数据')
+            return
+        task = orjson.loads(task)
+        logger.info(f"[处理] 获取redis数据{task}")
+        video_title = task['video_title']
+        logger.info(f"[处理] 开始分析标题是否符合早上发送,标题:{video_title}")
+        title_type = GPT4oMini.get_ai_mini_early(video_title)
+        content_video_data(json.dumps({
+            "video_id": task['video_id'],
+            "video_title": task['video_title'],
+            "status": title_type
+        }), "task:spider_early_data")
+        if title_type == "否":
+            return
+        logger.info('[处理] 开始上推荐处理')
+        code = PQ.pq_recommended(task['video_id'])
+        if code:
+            logger.info('[处理] 上推荐处理成功')
+            PQ.video_tag(task['video_id'], "lev-供给,rol-机器,#str-早安内容固定时段上推荐_62")
+            logger.info('[处理] 视频打固定标签成功')
+
+
+
+async def run():
+    scheduler = AsyncIOScheduler()
+    try:
+        logger.info(f"[处理] 开始启动")
+        scheduler.add_job(
+            ConsumptionSpiderRecommend.run,
+            trigger=CronTrigger(hour='2-6', minute='*')  # 2:00-6:59 每分钟执行一次
+        )
+        scheduler.start()
+        await asyncio.Event().wait()
+    except KeyboardInterrupt:
+        pass
+    except Exception as e:
+        logger.error(f"[处理] 启动异常,异常信息:{e}")
+        pass
+    finally:
+        scheduler.shutdown()
+
+
+if __name__ == '__main__':
+    # asyncio.run(ConsumptionSpiderRecommend.run())  # one-off run for debugging
+    asyncio.run(run())  # asyncio.get_event_loop() is deprecated on modern Python

+ 1 - 1
workers/consumption_work.py

@@ -42,7 +42,7 @@ class ConsumptionRecommend(object):
         logger.info(f"[处理] 使用的API_KEY:{api_key}")
         text = GoogleAI.run(api_key, video_url)
         if "[异常]" in text:
-            content_video_data(json.dumps(task))
+            content_video_data(json.dumps(task), "task:video_ai_pq")
         AliyunLogger.logging(str(video_id), task['title'], task['video_path'], "", task['type'], task['partition'], orjson.dumps(text).decode())
         logger.info(f"[处理] 写入日志成功")
 

+ 40 - 0
workers/select_spider_work.py

@@ -0,0 +1,40 @@
+import json
+import sys
+import time
+
+import schedule
+from loguru import logger
+
+sys.path.append('/app')  # must precede the utils imports inside the container
+from utils.sql_help import sqlCollect
+from utils.redis import RedisHelper
+
+
+def bot_get_video_id():
+    try:
+        logger.info("[R] starting the good-morning data task")
+        redis_task = 'task:pq_hour_early'
+        data = sqlCollect.select_hour_data()
+        if not data:
+            return
+        RedisHelper().get_client().rpush(redis_task, *(json.dumps(item) for item in data))
+        logger.info(f"[R] good-morning data written to Redis, {len(data)} items in total")
+    except Exception as e:
+        logger.error(f"[R] failed to write good-morning data to Redis: {e}")
+
+def schedule_tasks():
+    schedule.every().hour.at(":05").do(bot_get_video_id)  # at minute :05 of every hour
+
+
+if __name__ == "__main__":
+    schedule_tasks()  # register the scheduled job
+    while True:
+        schedule.run_pending()
+        time.sleep(1)  # check once per second
+
+    # bot_get_video_id()
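The contract between this producer and consumption_spider_work.py is implicit: every item pushed to task:pq_hour_early is a JSON object with video_id and video_title, which the consumer pops, runs through the title check, and forwards to the recommend call. A sketch of one queued item (values illustrative):

    import json

    # Shape built by select_hour_data() and consumed by ConsumptionSpiderRecommend.run()
    item = {"video_id": 47377130, "video_title": "❤️早上好!..."}
    payload = json.dumps(item)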