zhangyong 5 months ago
parent
commit
e8bfa0055c
8 changed files with 502 additions and 10 deletions
  1. advertisement_job.py (+302 −0)
  2. common/aliyun_log.py (+54 −1)
  3. common/feishu_data.py (+12 −0)
  4. common/mysql_db.py (+67 −0)
  5. common/redis.py (+23 −3)
  6. common/sql_help.py (+16 −0)
  7. docker-compose.yml (+13 −0)
  8. job_redis_data.py (+15 −6)

+ 302 - 0
advertisement_job.py

@@ -0,0 +1,302 @@
+import json
+import os
+import time
+import uuid
+from typing import Literal, Optional, Tuple
+
+import cv2
+import google.generativeai as genai
+import requests
+import schedule
+from google.generativeai.types import (File, GenerateContentResponse,
+                                       HarmBlockThreshold, HarmCategory)
+from loguru import logger
+
+from common.aliyun_log import AliyunLogger
+from common.common_log import Common
+from common.feishu_data import Material
+from common.redis import SyncRedisHelper, ad_in_video_data
+
+ENV = os.getenv('ENV', 'dev')
+API_KEY = os.getenv('API_KEY')
+TASK_TYPE = os.getenv('TASK_TYPE')
+
+PROXY_ADDR = 'http://localhost:1081'
+CACHE_DIR = '/app/cache/' if ENV == 'prod' else os.path.expanduser('~/Downloads/')
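+# SAMPLE_DATA illustrates the JSON structure the model is asked to return; it is
+# appended to the analysis prompt verbatim, so its (Chinese) keys are part of the contract.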
+SAMPLE_DATA = {
+    "一、基础信息": {
+        "视频主题": "",
+        "视频关键词": "",
+        "产品卖点":[]
+    },
+    "二、主体和场景": {
+        "视频主体": "",
+        "人物数量": "",
+        "人物年龄段": [],
+        "人物性别": [],
+        "场景描述": []
+    },
+    "三、情感与风格": {
+        "情感倾向": "",
+        "视频风格": ""
+    },
+    "四、视频传播性与时效": {
+        "片尾引导": "",
+        "传播强度": "",
+        "时效性": "",
+        "适用时间": ""
+    },
+    "五、用户画像": {
+        "价值类型": "",
+        "用户价值点": "",
+        "性别": "",
+        "年龄": "",
+        "观众收入": ""
+    },
+    "六、音画信息": {
+        "背景音类型": "",
+        "背景音风格": "",
+        "语音类型": "",
+        "字幕": "",
+        "颜色": "",
+        "logo": ""
+    },
+    "七、封面信息": {
+        "封面主体": "",
+        "人物个数": "",
+        "文字数量": "",
+        "文字关键字": [],
+        "封面主题": ""
+    },
+    "八、剪辑信息": {
+        "视频开场风格": "",
+        "展现形式": ""
+    },
+    "九、类目": {
+        "视频一级分类": "",
+        "视频二级分类": ["品类- 、分数-","品类- 、分数-","品类- 、分数-"]
+    },
+    "十、视频时长": {
+        "时长": "",
+    },
+    "八、视频宽高": {
+        "宽高": "",
+        "宽高比": ""
+    }
+}
+
+if ENV == 'dev':
+    os.environ['http_proxy'] = PROXY_ADDR
+    os.environ['https_proxy'] = PROXY_ADDR
+
+
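+# Pop one pending task from Redis. Tasks are JSON strings pushed by job_redis_data.py
+# onto the 'task:ad_video_recommend' list.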
+def get_redis_task(task_type: Literal['recommend', 'top']) -> Optional[bytes]:
+    # All task types currently read from the same queue; task_type only affects logging.
+    redis_key = 'task:ad_video_recommend'
+    redis_task: bytes = SyncRedisHelper().get_client().rpop(redis_key)
+    if redis_task:
+        logger.success(f'[+] Fetched {task_type} task: {redis_task.decode()}')
+    else:
+        logger.error(f'[+] No {task_type} task available')
+    return redis_task
+
+
+def get_video_duration(video_link: str) -> int:
+    cap = cv2.VideoCapture(video_link)
+    try:
+        if cap.isOpened():
+            fps = cap.get(cv2.CAP_PROP_FPS)
+            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
+            if fps > 0:
+                return int(frame_count / fps)
+        return 0
+    finally:
+        cap.release()
+
+
+def download_video(video_link: str) -> Optional[str]:
+    file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+    for _ in range(3):
+        try:
+            # Timeout keeps a dead link from blocking the worker indefinitely.
+            response = requests.get(url=video_link, timeout=60)
+            if response.status_code == 200:
+                with open(file_path, 'wb') as f:
+                    f.write(response.content)
+                logger.info(f'[+] Video link: {video_link}, saved to: {file_path}')
+                return file_path
+        except Exception:
+            time.sleep(1)
+    return None
+
+
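+# Upload the local file to the Gemini Files API and poll until processing finishes;
+# returns the File handle and its final state, or None when the upload fails.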
+def upload_video(video_path: str, redis_task) -> Optional[Tuple[File, str]]:
+    try:
+        file = genai.upload_file(path=video_path)
+        # Poll until the Files API finishes processing the upload.
+        while file.state.name == 'PROCESSING':
+            time.sleep(1)
+            file = genai.get_file(name=file.name)
+        return file, file.state.name
+    except Exception as e:
+        AliyunLogger.ad_logging(str(redis_task['ad_id']),
+                                redis_task['creative_code'],
+                                redis_task['creative_title'],
+                                redis_task['material_address'],
+                                redis_task['click_button_text'],
+                                redis_task['creative_logo_address'],
+                                redis_task['update_time'],
+                                f'[+] Video upload failed: {e}')
+        logger.error(f'[+] Video upload failed: {e}')
+        # redis_task is a dict here; serialize it before pushing it back onto the queue.
+        ad_in_video_data(json.dumps(redis_task))
+        return None
+
+
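+# Build the Gemini model used for tagging; JSON output is requested via response_mime_type.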
+def create_model_cache(redis_task) -> Optional[genai.GenerativeModel]:
+    try:
+        model = genai.GenerativeModel(
+            model_name='gemini-1.5-flash',
+            generation_config={'response_mime_type': 'application/json'},
+            safety_settings={HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE},
+        )
+        logger.info('[+] Generation model created')
+        return model
+    except Exception as e:
+        AliyunLogger.ad_logging(str(redis_task['ad_id']),
+                                redis_task['creative_code'],
+                                redis_task['creative_title'],
+                                redis_task['material_address'],
+                                redis_task['click_button_text'],
+                                redis_task['creative_logo_address'],
+                                redis_task['update_time'],
+                                f'[+] Failed to create the generation model: {e}')
+        logger.error(f'Failed to create the generation model: {e}')
+        ad_in_video_data(json.dumps(redis_task))
+        Common.logger('ai').info(f'Failed to create the generation model: {e}')
+        return None
+
+
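+# Send the uploaded video plus the Feishu prompt to the model and return the raw response.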
+def analyze_video(model: genai.GenerativeModel, google_file: File, prompt: str, redis_task) -> Optional[GenerateContentResponse]:
+    try:
+        session = model.start_chat(history=[])
+        content = {
+            'parts': [
+                google_file,
+                f'{prompt}\n输出返回格式样例:\n{SAMPLE_DATA}',
+            ],
+        }
+        return session.send_message(content=content)
+    except Exception as e:
+        AliyunLogger.ad_logging(str(redis_task['ad_id']),
+                                redis_task['creative_code'],
+                                redis_task['creative_title'],
+                                redis_task['material_address'],
+                                redis_task['click_button_text'],
+                                redis_task['creative_logo_address'],
+                                redis_task['update_time'],
+                                f'[+] Video analysis request failed: {e}')
+        ad_in_video_data(json.dumps(redis_task))
+        logger.error(f'Video analysis request failed: {e}')
+        Common.logger('ai').info(f'Video analysis request failed: {e}')
+        return None
+
+
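+# One scheduler tick: pop a task, download and upload the video, run the analysis,
+# write the result to Aliyun Log, then clean up the local file and the remote upload.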
+def run():
+    if not API_KEY:
+        logger.error('[+] Please set API_KEY in the environment')
+        return
+    if not TASK_TYPE:
+        logger.error('[+] Please set TASK_TYPE in the environment, allowed values: recommend | top')
+        return
+    genai.configure(api_key=API_KEY)
+
+    redis_task = get_redis_task(task_type=TASK_TYPE)
+    if not redis_task:
+        time.sleep(10)
+        return
+    redis_task = json.loads(redis_task)
+    mark, prompt = Material.ad_feishu_list()
+    if not prompt:
+        logger.error('[+] Failed to read the prompt from Feishu, re-queueing task')
+        ad_in_video_data(json.dumps(redis_task))
+        return
+
+    # video_duration = get_video_duration(video_link=redis_task[3])
+    # if not video_duration:
+    #     AliyunLogger.logging( str( redis_task[0] ), redis_task[1], redis_task['video_path'], "",
+    #                           redis_task['type'], redis_task['partition'], "[+] 获取视频时长失败, 跳过任务" )
+    #     logger.error('[+] 获取视频时长失败, 跳过任务')
+    #     return
+    # elif video_duration >= 600:
+    #     AliyunLogger.logging( str( redis_task['video_id'] ), redis_task['title'], redis_task['video_path'], "",
+    #                           redis_task['type'], redis_task['partition'], "[+] 视频时长超过10分钟, 跳过任务" )
+    #     logger.error('[+] 视频时长超过10分钟, 跳过任务')
+    #     return
+
+    video_path = download_video(video_link=redis_task['material_address'])
+    if not video_path:
+        AliyunLogger.ad_logging(str(redis_task['ad_id']),
+                                redis_task['creative_code'],
+                                redis_task['creative_title'],
+                                redis_task['material_address'],
+                                redis_task['click_button_text'],
+                                redis_task['creative_logo_address'],
+                                redis_task['update_time'],
+                                '[+] Video download failed, skipping task')
+        logger.error('[+] Video download failed, skipping task')
+        # download_video never created a file on failure, so there is nothing to remove here.
+        return
+
+    upload_result = upload_video(video_path=video_path, redis_task=redis_task)
+    if not upload_result:
+        # upload_video already logged the error and re-queued the task.
+        if os.path.exists(video_path):
+            os.remove(video_path)
+            logger.info(f'File removed: {video_path}')
+        return
+    google_file, google_file_state = upload_result
+    if google_file_state != 'ACTIVE':
+        logger.error('[+] Upload state is not ACTIVE, skipping task')
+        genai.delete_file(google_file)
+        if os.path.exists(video_path):
+            os.remove(video_path)
+            logger.info(f'File removed: {video_path}')
+        return
+
+    model = create_model_cache(redis_task=redis_task)
+    if not model:
+        logger.error('[+] Failed to create the model, skipping task')
+        genai.delete_file(google_file)
+        if os.path.exists(video_path):
+            os.remove(video_path)
+            logger.info(f'File removed: {video_path}')
+        return
+
+    response = analyze_video(model=model, google_file=google_file, prompt=prompt, redis_task=redis_task)
+    if not response:
+        logger.error('[+] Failed to get a model response, skipping task')
+        genai.delete_file(google_file)
+        if os.path.exists(video_path):
+            os.remove(video_path)
+            logger.info(f'File removed: {video_path}')
+        return
+
+    text = response.text.strip()
+    # The model may wrap its JSON in a ```json fenced block; strip the fences before logging.
+    cleaned_text = text.replace("```json", '').replace("```", '').strip()
+    AliyunLogger.ad_logging(str(redis_task['ad_id']),
+                            redis_task['creative_code'],
+                            redis_task['creative_title'],
+                            redis_task['material_address'],
+                            redis_task['click_button_text'],
+                            redis_task['creative_logo_address'],
+                            redis_task['update_time'],
+                            str(cleaned_text))
+    logger.info(f'[+] Model response: {text}')
+    if os.path.exists(video_path):
+        os.remove(video_path)
+        logger.info(f'File removed: {video_path}')
+    genai.delete_file(google_file)
+
+
+if __name__ == '__main__':
+    logger.info(f'[+] Job started -> API_KEY: {API_KEY}, TASK_TYPE: {TASK_TYPE}')
+    schedule.every(interval=1).seconds.do(run)
+    while True:
+        try:
+            schedule.run_pending()
+            time.sleep(1)
+        except KeyboardInterrupt:
+            break
+    logger.info('[+] Job stopped')

+ 54 - 1
common/aliyun_log.py

@@ -73,4 +73,57 @@ class AliyunLogger:
 
             client.put_logs(request)
         except Exception as e:
-            logger.info( f'[+] 日志写入失败: {e}' )
+            logger.info( f'[+] 日志写入失败: {e}' )
+
+    # Write an ad-tagging record to Aliyun Log Service
+    @staticmethod
+    def ad_logging(
+            ad_id: str,
+            creative_code: str,
+            creative_title: str,
+            material_address: str,
+            click_button_text: str,
+            creative_logo_address: str,
+            update_time: str,
+            data: Optional[str] = None):
+        """
+        写入阿里云日志
+        测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
+        正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
+        """
+        accessKeyId = "LTAIWYUujJAm7CbH"
+        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+
+        project = "crawler-log-prod"
+        logstore = "ad_tag_info"
+        endpoint = "cn-hangzhou.log.aliyuncs.com"
+        try:
+            contents = [
+                ("ad_id", ad_id),
+                ("creative_code", creative_code),
+                ("creative_title", creative_title),
+                ("material_address", material_address),
+                ("click_button_text", click_button_text),
+                ("creative_logo_address", creative_logo_address),
+                ("update_time", update_time),
+                ("data", data),
+            ]
+            # Create the LogClient instance
+            client = LogClient(endpoint, accessKeyId, accessKey)
+            log_group = []
+            log_item = LogItem()
+            log_item.set_contents(contents)
+            log_group.append(log_item)
+            # Write the log entries
+            request = PutLogsRequest(
+                project=project,
+                logstore=logstore,
+                topic="",
+                source="",
+                logitems=log_group,
+                compress=False,
+            )
+
+            client.put_logs(request)
+        except Exception as e:
+            logger.info(f'[+] Failed to write log: {e}')

+ 12 - 0
common/feishu_data.py

@@ -17,6 +17,18 @@ class Material():
                 else:
                     time.sleep(5)
 
+    @classmethod
+    def ad_feishu_list(cls):
+        for _ in range(3):
+            data = Feishu.get_values_batch("Q5u4sylPKhTyr4tQV3QcgauInTh", "ccb7ec")
+            for row in data:
+                mark = row[0]
+                prompt = row[1]
+                if mark and prompt:
+                    return mark, prompt
+            # No usable row yet; wait before fetching the sheet again.
+            time.sleep(5)
+        return None, None
+
 if __name__ == '__main__':
     mark, prompt = Material.feishu_list()
     print(mark, prompt)

+ 67 - 0
common/mysql_db.py

@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+"""
+数据库连接及操作
+"""
+import pymysql
+
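+# Minimal pymysql helper for the adplatform database; used by common/sql_help.py.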
+class MysqlHelper:
+    @classmethod
+    def connect_mysql(cls):
+        # Create a Connection object representing one database connection
+        connection = pymysql.connect(
+            host="rm-bp12k5fuh5zyx31d2.mysql.rds.aliyuncs.com",  # database host, internal address
+            # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",  # database host, public address
+            port=3306,  # port
+            user="wx2023_ad",  # MySQL user
+            passwd="wx2023_adP@assword1234",  # MySQL password
+            db="adplatform",  # database name
+            # charset must match the encoding of the stored text (utf8)
+            charset="utf8")
+        return connection
+
+    @classmethod
+    def get_values(cls, sql, params=None):
+        try:
+            # Open a connection and get a cursor
+            connect = cls.connect_mysql()
+            cursor = connect.cursor()
+            if params:
+                cursor.execute(sql, params)
+            else:
+                cursor.execute(sql)
+            # fetchall returns a tuple of rows, each row itself a tuple
+            data = cursor.fetchall()
+            cursor.close()
+            connect.close()
+            return data
+        except Exception as e:
+            print(f"get_values exception: {e}\n")
+
+    @classmethod
+    def update_values(cls, sql):
+        connect = cls.connect_mysql()
+        cursor = connect.cursor()
+        try:
+            res = cursor.execute(sql)
+            # Commit, otherwise the change is not persisted
+            connect.commit()
+            return res
+        except Exception:
+            # Roll back on error
+            connect.rollback()
+        finally:
+            # Always close the connection, including on the successful return path
+            cursor.close()
+            connect.close()
+
+
+
+

+ 23 - 3
common/redis.py

@@ -2,6 +2,7 @@ import json
 
 import redis
 from common.odps_data import OdpsDataCount
+from common.sql_help import sqlCollect
 
 
 class SyncRedisHelper:
@@ -45,19 +46,38 @@ def install_video_data(dt, redis_task, table_name):
     client = helper.get_client()
     client.rpush(redis_task, *data)
 
+def install_ad_video_data(redis_task):
+    """Enqueue the ad videos that need tagging into Redis"""
+    data = sqlCollect.select_ad_list()
+    if not data:
+        return
+    # Field names used to turn each result row into a dict
+    field_names = ["ad_id", "creative_code", "creative_title", "material_address", "click_button_text", "creative_logo_address", "update_time"]
+    # Convert every row into a dict of strings so it can be serialized as JSON
+    data_dicts = [{key: str(value) for key, value in zip(field_names, item)} for item in data]
+    print(f"Queued {len(data_dicts)} ad tasks")
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    # One JSON object per task; advertisement_job.py pops and json.loads each entry.
+    for item in data_dicts:
+        client.rpush(redis_task, json.dumps(item))
+
 
 
 def get_video_data(redis_task):
     """获取一条需要打标签的视频"""
-    # task = redis_task
     helper = SyncRedisHelper()
     client = helper.get_client()
     ret = client.rpop(redis_task)
     return ret
 
-def in_video_data(ret):
+def ad_in_video_data(ret):
     """分析失败视频重新写入redis"""
-    task = f"task:video_ai"
+    task = f"task:ad_video_recommend"
     helper = SyncRedisHelper()
     client = helper.get_client()
     client.rpush(task, ret)

+ 16 - 0
common/sql_help.py

@@ -0,0 +1,16 @@
+from common.mysql_db import MysqlHelper
+from datetime import datetime
+
+
+class sqlCollect():
+    """SQL queries for the ad-tagging job"""
+    @classmethod
+    def select_ad_list(cls):
+        """Select the ad creatives updated since the start of today"""
+        # Today's date, used as the lower bound for update_time
+        formatted_date = datetime.now().strftime("%Y-%m-%d")
+        sql = """SELECT `ad_id`, `creative_code`, `creative_title`, `material_address`, `click_button_text`, `creative_logo_address`, `update_time` FROM `creative` WHERE `update_time` >= %s"""
+        data = MysqlHelper.get_values(sql, (formatted_date,))
+        if data:
+            return data
+        return None

+ 13 - 0
docker-compose.yml

@@ -72,6 +72,19 @@ services:
       - TASK_TYPE=recommend
     networks:
       - google_net
+  worker7:
+    depends_on:
+      - worker1
+    image: google_ai_studio
+    container_name: google_worker7
+    restart: unless-stopped
+    environment:
+      - ENV=prod
+      - API_KEY=AIzaSyDXeugvEaYpKNrLPavMU1U5GtRhSaNLpAc
+      - TASK_TYPE=recommend
+    networks:
+      - google_net
+    entrypoint: "python /app/advertisement_job.py"
 networks:
   google_net:
     name: google_net

+ 15 - 6
job_redis_data.py

@@ -3,7 +3,8 @@ import time
 
 import schedule
 
-from common.redis import install_video_data
+from common.redis import install_video_data, install_ad_video_data
+
 
 def bot_video_ai_top():
     """当日头部"""
@@ -28,17 +29,25 @@ def bot_video_ai_recommend():
     except Exception as e:
         print(f"新推荐异常了{e}")
 
-
+def ab_video_ai_recommend():
+    """Ad videos"""
+    try:
+        redis_task = 'task:ad_video_recommend'
+        install_ad_video_data(redis_task)
+    except Exception as e:
+        print(f"Ad task failed: {e}")
 
 def schedule_tasks():
     schedule.every().hour.at(":22").do(bot_video_ai_recommend)
     schedule.every().day.at("01:25").do(bot_video_ai_top)
+    schedule.every().day.at("23:25").do(ab_video_ai_recommend())
 
 
 if __name__ == "__main__":
-    schedule_tasks()  # 调用任务调度函数
-    while True:
-        schedule.run_pending()
-        time.sleep(1)  # 每秒钟检查一次
+    # schedule_tasks()  # start the task scheduler
+    # while True:
+    #     schedule.run_pending()
+    #     time.sleep(1)  # check once per second
+    ab_video_ai_recommend()
     # bot_video_ai_top()
     # bot_video_ai_recommend()