Browse Source

Merge branch 'master' of https://git.yishihui.com/weapp/video_decode into decode_prompt

jihuaqiang 2 days ago
parent
commit
7512c2940b
36 changed files with 927 additions and 2 deletions
  1. +1 -0    .gitignore
  2. +88 -0   decode_task/decodeTask.py
  3. +170 -0  decode_task/scriptTask.py
  4. +210 -0  loggers/sls.py
  5. +96 -0   main.py
  6. +30 -0   models/decode_record.py
  7. +31 -0   models/decode_result_record.py
  8. +12 -2   requirements.txt
  9. BIN      src/__pycache__/__init__.cpython-313.pyc
  10. BIN     src/components/__pycache__/__init__.cpython-313.pyc
  11. BIN     src/components/agents/__pycache__/__init__.cpython-313.pyc
  12. BIN     src/components/agents/__pycache__/base.cpython-313.pyc
  13. BIN     src/components/agents/__pycache__/inspiration_points_agent.cpython-313.pyc
  14. BIN     src/components/agents/__pycache__/key_points_agent.cpython-313.pyc
  15. BIN     src/components/agents/__pycache__/purpose_point_agent.cpython-313.pyc
  16. BIN     src/components/agents/__pycache__/search_keyword_agent.cpython-313.pyc
  17. BIN     src/components/agents/__pycache__/topic_agent_v2.cpython-313.pyc
  18. BIN     src/components/agents/__pycache__/topic_selection_understanding_agent.cpython-313.pyc
  19. BIN     src/components/functions/__pycache__/__init__.cpython-313.pyc
  20. BIN     src/components/functions/__pycache__/base.cpython-313.pyc
  21. BIN     src/components/functions/__pycache__/json_utils.cpython-313.pyc
  22. BIN     src/components/tools/__pycache__/__init__.cpython-313.pyc
  23. BIN     src/components/tools/__pycache__/base.cpython-313.pyc
  24. BIN     src/components/tools/__pycache__/knowledge_retrieval_tools.cpython-313.pyc
  25. BIN     src/components/tools/__pycache__/nanobanana_tools.cpython-313.pyc
  26. BIN     src/components/tools/__pycache__/segment_tools.cpython-313.pyc
  27. BIN     src/states/__pycache__/__init__.cpython-313.pyc
  28. BIN     src/states/__pycache__/what_deconstruction_state.cpython-313.pyc
  29. BIN     src/utils/__pycache__/__init__.cpython-313.pyc
  30. BIN     src/utils/__pycache__/logger.cpython-313.pyc
  31. BIN     src/workflows/__pycache__/__init__.cpython-313.pyc
  32. +46 -0  task_schedule.py
  33. +29 -0  utils/ali_oss.py
  34. +108 -0 utils/general.py
  35. +20 -0  utils/params.py
  36. +86 -0  utils/sync_mysql_help.py

+ 1 - 0
.gitignore

@@ -6,6 +6,7 @@ logs/
 examples/html/
 examples/output_*.json
 examples/*.zip
+*.pyc
 examples/videos/
 
 # C extensions

+ 88 - 0
decode_task/decodeTask.py

@@ -0,0 +1,88 @@
+import sys
+import time
+
+from loguru import logger
+
+from utils.sync_mysql_help import mysql
+from src.workflows.decode_workflow import DecodeWorkflow
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+
+def invoke_decode_workflow(task_params: dict):
+    """Run the decode workflow for a single task."""
+    result = DecodeWorkflow(task_params)
+    if result:
+        return result
+    print("❌ Failed to save the result; continuing")
+    return None
+
+
+def get_decode_result_by_id(task_id: str):
+    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2"
+    task = mysql.fetchone(sql, (task_id,))
+    if not task:
+        logger.info(f"task_id = {task_id}, task not found")
+        return None
+    return task['decode_result'], task['task_status']
+
+
+def decode_task_status_handler():
+    # Fetch one pending task (task_status = 0) from the database.
+    sql = "SELECT * FROM decode_record WHERE task_status = 0"
+    task = mysql.fetchone(sql)
+    if not task:
+        logger.info("Task queue is empty")
+        return
+
+    task_id = task['task_id']
+    video_url = task['video_url']
+
+    # Mark the task as running (task_status = 1) before processing.
+    sql = "UPDATE decode_record SET task_status = 1 WHERE task_id = %s"
+    mysql.execute(sql, (task_id,))
+
+    try:
+        video_id = task['video_id']
+        task_params = {'task_id': task_id, 'video_id': video_id, 'video_url': video_url}
+        logger.info(f"task_id = {task_id}, video_id = {video_id}")
+        task_create_timestamp = task['create_timestamp']
+        current_timestamp = int(time.time() * 1000)
+
+        decode_result = invoke_decode_workflow(task_params)
+
+        if decode_result:
+            # Mark the task as completed (task_status = 2).
+            sql = "UPDATE decode_record SET task_status = 2 WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id}, decode_result = {decode_result}")
+        elif current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+            # No result and the task is over an hour old: mark it failed.
+            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id}, task timed out")
+    except Exception as e:
+        logger.error(f"task_id = {task_id}, error = {e}")
+        sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
+        mysql.execute(sql, (task_id,))
+        logger.info(f"task_id = {task_id}, task failed")
+        raise RuntimeError(f"task failed: task_id = {task_id}") from e
+
+
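Taken together, the handler implements a simple polling lifecycle over decode_record: 0 (pending) -> 1 (running) -> 2 (done) or 3 (failed/timed out). A minimal sketch of driving it end to end, assuming the decode_record table exists and the database connection is configured; the IDs and URL below are illustrative:

    # Sketch: enqueue one decode task and run the poller once.
    import uuid

    from models.decode_record import DecodeRecord
    from decode_task.decodeTask import decode_task_status_handler, get_decode_result_by_id

    task_id = str(uuid.uuid4())
    DecodeRecord(
        task_id=task_id,
        video_id='demo-video-id',                 # illustrative values
        video_url='https://example.com/demo.mp4',
        task_status=0,                            # 0 = pending, picked up by the poller
    ).save()

    decode_task_status_handler()                  # one pass: 0 -> 1, then 2 on success
    print(get_decode_result_by_id(task_id))       # None until task_status reaches 2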

+ 170 - 0
decode_task/scriptTask.py

@@ -0,0 +1,170 @@
+import json
+from loguru import logger
+import sys
+import time
+from typing import Dict, Any, List
+from datetime import datetime
+from utils.sync_mysql_help import mysql
+from examples.run_batch_script import build_script_input
+
+from src.workflows.script_workflow import ScriptWorkflow
+
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+
+def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]):
+    """Run script understanding over decode results; supports incremental appends."""
+
+    # Read the raw three-point deconstruction results
+    raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
+
+    # Reuse any existing script-understanding output so runs stay incremental
+    output_data = script_result if isinstance(script_result, dict) else None
+    if not output_data:
+        output_data = {
+            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
+            "total": 0,
+            "success_count": 0,
+            "fail_count": 0,
+            "results": [],
+        }
+
+    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
+    # Deduplicate on channel_content_id + video URL to avoid reprocessing
+    processed_keys = {
+        f"{item.get('video_data', {}).get('channel_content_id','')}|"
+        f"{item.get('video_data', {}).get('video','')}"
+        for item in existing_results
+    }
+
+    workflow = ScriptWorkflow()
+
+    for item in raw_results:
+        video_data = item.get("video_data", {}) or {}
+        result = item.get("result", {}) or {}
+
+        key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
+        if key in processed_keys:
+            logger.info(f"已处理过该视频,跳过: {key}")
+            continue
+
+        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
+
+        try:
+            script_input = build_script_input(video_data, result)
+            script_result = workflow.invoke(script_input)
+
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": result,
+                "script_result": script_result,
+                "success": True,
+                "error": None,
+            }
+
+            output_data["success_count"] = output_data.get("success_count", 0) + 1
+
+        except Exception as e:
+            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": result,
+                "script_result": None,
+                "success": False,
+                "error": str(e),
+            }
+            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
+
+        output_data["results"].append(record)
+        output_data["total"] = output_data.get("total", 0) + 1
+
+    # Return the serialized JSON string (for database storage)
+    return json.dumps(output_data, ensure_ascii=False)
+
+
+def get_script_result_by_id(task_id: str):
+    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4"
+    task = mysql.fetchone(sql, (task_id,))
+    if not task:
+        logger.info(f"task_id = {task_id}, task not found")
+        return None
+    return task['decode_result'], task['script_result'], task['task_status']
+
+
+def script_task_status_handler():
+    # Fetch all decoded tasks (task_status = 2) from the database.
+    sql = "SELECT * FROM decode_record WHERE task_status = 2"
+    tasks = mysql.fetchall(sql)
+
+    if not tasks:
+        logger.info("Script task queue is empty")
+        return
+    for task in tasks:
+        task_id = task['task_id']
+        # Parse decode_result (may be None or already deserialized)
+        decode_result_raw = task.get('decode_result')
+        if isinstance(decode_result_raw, (dict, list)):
+            decode_result = decode_result_raw
+        elif decode_result_raw:
+            try:
+                decode_result = json.loads(decode_result_raw)
+            except Exception as e:
+                logger.error(f"task_id = {task_id}, failed to parse decode_result: {e}")
+                decode_result = {"results": []}
+        else:
+            decode_result = {"results": []}
+
+        # Parse script_result (may be None or already deserialized)
+        script_result_raw = task.get('script_result')
+        if isinstance(script_result_raw, (dict, list)):
+            script_result = script_result_raw
+        elif script_result_raw:
+            try:
+                script_result = json.loads(script_result_raw)
+            except Exception as e:
+                logger.error(f"task_id = {task_id}, failed to parse script_result: {e}")
+                script_result = None
+        else:
+            script_result = None
+
+        logger.info(f"task_id = {task_id}, decode_result = {decode_result}")
+        # Tasks that produce no result within an hour are marked as timed out below.
+        task_create_timestamp = task['create_timestamp']
+        current_timestamp = int(time.time() * 1000)
+
+        try:
+            # Take the task out of the status-2 queue before processing.
+            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+        except Exception as e:
+            logger.error(f"task_id = {task_id}, error = {e}")
+            sql = "UPDATE decode_record SET task_status = 3, script_result = 'task failed' WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id}, task failed")
+            continue
+
+        try:
+            result = invoke_script_workflow(decode_result, script_result)
+            if result:
+                # Mark the task as completed (task_status = 4) and store the result.
+                sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (result, task_id))
+                logger.info(f"task_id = {task_id}, script_result = {result}")
+            elif current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+                sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (result, task_id))
+                logger.info(f"task_id = {task_id}, task timed out")
+        except Exception as e:
+            logger.error(f"task_id = {task_id}, error = {e}")
+            sql = "UPDATE decode_record SET task_status = 3, script_result = 'task failed' WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id}, task failed")
+            continue
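The incremental-append behaviour in invoke_script_workflow hinges on the processed_keys set keyed by channel_content_id plus video URL; a self-contained sketch of that dedup check, using sample data only:

    # Self-contained sketch of the dedup key used by invoke_script_workflow.
    existing_results = [
        {"video_data": {"channel_content_id": "c1", "video": "https://v.example/a.mp4"}},
    ]
    processed_keys = {
        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
        f"{item.get('video_data', {}).get('video', '')}"
        for item in existing_results
    }

    incoming = {"channel_content_id": "c1", "video": "https://v.example/a.mp4"}
    key = f"{incoming['channel_content_id']}|{incoming['video']}"
    print(key in processed_keys)  # True -> this video is skipped on re-runs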

+ 210 - 0
loggers/sls.py

@@ -0,0 +1,210 @@
+# import json
+# import time
+# import traceback
+# from syslog import syslog
+# from typing import Optional, List
+
+# from aliyun.log import LogClient, LogItem, PutLogsRequest, GetLogsRequest
+# from loguru import logger
+# from tornado.process import task_id
+
+
+# from utils import get_global_config
+# from datetime import datetime
+
+# _config = get_global_config().log.aliyun
+
+
+# class AliyunLog(object):
+#     client = LogClient(endpoint=_config.endpoint,
+#                        accessKey=_config.access_key_secret,
+#                        accessKeyId=_config.access_key_id)
+#     project_name = 'cyber-crawler-prod'
+#     logstore_name = 'error-log'
+#     process_logstore_name = 'process-log'
+
+#     @classmethod
+#     def record(cls, task: CrawlerTask, stacktrace: str):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('task_id', task.task_id),
+#             ('plan_id', task.plan_id),
+#             ('plan_type', str(task.plan_type.value.id)),
+#             ('channel', str(task.channel.value.id)),
+#             ('crawler_mode', str(task.crawler_mode.value.id)),
+#             ('task_params', task.task_params),
+#             ('stacktrace', stacktrace),
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore=cls.logstore_name,
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def process(cls, task: CrawlerTask, process_step: str, log_type: str, message: str,
+#                     content: Optional[AiDitContent],
+#                     account: Optional[AiDitAccount],
+#                     content_portrait: Optional[List[CrawlerContentPortrait]],
+#                     account_portrait: Optional[List[CrawlerAccountPortrait]]):
+#         """
+#         Record task execution & crawl progress
+#         process_step: crawler、skip、filter、after_filter
+#         log_type: content、content_portrait、account_portrait
+#         """
+#         try:
+#             # Serialize
+#             # Only serialize when the object is not None; otherwise keep None
+#             content_str = content.model_dump_json() if content else None
+#             account_str = account.model_dump_json() if account else None
+#             # Serialize
+#             if content_portrait:
+#                 # Convert each object to a dict with a list comprehension, then serialize the whole list
+#                 content_portrait_str = json.dumps([item.model_dump() for item in content_portrait])
+#             else:
+#                 content_portrait_str = None
+
+#             if account_portrait:
+#                 # Convert each object to a dict with a list comprehension, then serialize the whole list
+#                 account_portrait_str = json.dumps([item.model_dump() for item in account_portrait])
+#             else:
+#                 account_portrait_str = None
+
+#             log_item = LogItem()
+#             task_id = task.task_id
+#             plan_id = task.plan_id
+#             plan_type = ''
+#             if task.plan_type is not None:
+#                 plan_type = str(task.plan_type.value.id)
+#             channel = ''
+#             if task.channel is not None:
+#                 channel = str(task.channel.value.id)
+#             crawler_mode = ''
+#             if task.crawler_mode is not None:
+#                 crawler_mode = str(task.crawler_mode.value.id)
+#             task_params = ''
+#             if task.task_params is not None:
+#                 task_params = json.dumps(task.task_params)
+
+#             log_item.set_contents([
+#                 # ('task_id', task.task_id),
+#                 # ('plan_id', task.plan_id),
+#                 # ('plan_type', str(task.plan_type.value.id)),
+#                 # ('channel', str(task.channel.value.id)),
+#                 # ('crawler_mode', str(task.crawler_mode.value.id)),
+#                 # ('task_params', task.task_params),
+#                 ('task_id', task_id),
+#                 ('plan_id', plan_id),
+#                 ('plan_type', plan_type),
+#                 ('channel', channel),
+#                 ('crawler_mode', crawler_mode),
+#                 ('task_params', task_params),
+#                 ('process_step', process_step),
+#                 ('log_type', log_type),
+#                 ('message', message),
+#                 ('content', content_str or ''),
+#                 ('account', account_str or ''),
+#                 ('content_portrait', content_portrait_str or ''),
+#                 ('account_portrait', account_portrait_str or ''),
+#                 ('timestamp', str(time.time())),
+#             ])
+#             request = PutLogsRequest(project=cls.project_name,
+#                                      logstore=cls.process_logstore_name,
+#                                      logitems=[log_item],
+#                                      compress=False)
+#             cls.client.put_logs(request)
+#         except Exception as e:
+#             traceback.print_exc()
+
+#     @classmethod
+#     def info(cls, path: str, channel: int, params: str, response: str, status_code: int, msg: str = '',
+#              token: str = ''):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('path', path),
+#             ('channel', channel),
+#             ('params', params),
+#             ('response', response),
+#             ('status_code', status_code),
+#             ('msg', msg),
+#             ('token', token)
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore='request-log',
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def req_info(cls, channel: str, params: str, response: str, source: str, path: str = '/', status_code: int = 0,
+#                  token: str = ''):
+#         try:
+#             log_item = LogItem()
+#             log_item.set_contents([
+#                 ('channel', channel),
+#                 ('params', str(params)),
+#                 ('response', str(response)),
+#                 ('path', path),
+#                 ('source', source),
+#                 ('status_code', str(status_code)),
+#                 ('token', token)
+#             ])
+#             request = PutLogsRequest(project=cls.project_name,
+#                                      logstore='info-log',
+#                                      logitems=[log_item],
+#                                      compress=False)
+#             cls.client.put_logs(request)
+#         except Exception as e:
+#             logger.error(f"AliyunLog.req_info error: {e}")
+#             pass
+
+#     @classmethod
+#     def http_req_info(cls, path: str, params: str, response: str, status_code: int = 0):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('path', path),
+#             ('params', params),
+#             ('response', response),
+#             ('status_code', status_code)
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore='info-log',
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def get_log(cls):
+#         from_time = int(datetime.now().timestamp() * 1000) - 1000 * 60 * 60 * 24
+#         to_time = int(datetime.now().timestamp() * 1000)
+
+#         response = cls.client.get_logs(GetLogsRequest(project='cyber-crawler-prod',
+#                                                       logstore='request-log',
+#                                                       topic='',
+#                                                       fromTime=from_time,
+#                                                       toTime=to_time,
+#                                                       query='path: /crawler/moonshot/kimi and status_code :10000'))
+#         print(response.body)
+#         return response
+
+
+# class AliyunHkLog(object):
+#     client = LogClient(endpoint='cn-hongkong.log.aliyuncs.com',
+#                        accessKey=_config.access_key_secret,
+#                        accessKeyId=_config.access_key_id)
+#     project_name = 'cyber-crawler-prod'
+
+#     @classmethod
+#     def get_log(cls, query: str, project_name: str = 'cyber-crawler-prod', logstore_name: str = 'request-log'):
+#         today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+#         from_time = int(today.timestamp()) - 24 * 60 * 60
+#         to_time = int(today.timestamp())
+
+#         response = cls.client.get_logs(GetLogsRequest(project=cls.project_name,
+#                                                       logstore=logstore_name,
+#                                                       topic='',
+#                                                       fromTime=from_time,
+#                                                       toTime=to_time,
+#                                                       query=query))
+
+#         return response.body
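loggers/sls.py ships fully commented out. Should it be re-enabled, the write path follows the calls already present in the file (LogClient, LogItem, PutLogsRequest, put_logs); a minimal sketch, assuming valid SLS credentials and an existing project/logstore — the endpoint and key placeholders are assumptions:

    # Sketch of the minimal SLS write path, assuming aliyun-log-python-sdk
    # is installed and the endpoint/keys below are replaced with real values.
    from aliyun.log import LogClient, LogItem, PutLogsRequest

    client = LogClient(endpoint='cn-hangzhou.log.aliyuncs.com',  # placeholder endpoint
                       accessKeyId='<access-key-id>',
                       accessKey='<access-key-secret>')

    item = LogItem()
    item.set_contents([('task_id', 'demo'), ('message', 'hello sls')])
    request = PutLogsRequest(project='cyber-crawler-prod',
                             logstore='process-log',
                             logitems=[item],
                             compress=False)
    client.put_logs(request)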

+ 96 - 0
main.py

@@ -0,0 +1,96 @@
+import uuid
+from fastapi import FastAPI, HTTPException, Request
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+from utils.params import TaskStatusParam, DecodeListParam
+from dotenv import load_dotenv
+
+
+from decode_task.decodeTask import get_decode_result_by_id
+from decode_task.scriptTask import get_script_result_by_id
+
+
+
+from models.decode_record import DecodeRecord
+from task_schedule import TaskScheduler
+
+from loguru import logger
+import sys
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+load_dotenv()
+app = FastAPI()
+
+scheduler = TaskScheduler()
+
+
+@app.exception_handler(HTTPException)
+async def http_exception_handler(request: Request, exc: HTTPException):
+    return JSONResponse(
+        status_code=200,
+        content={"code": exc.status_code, "message": exc.message, "data": None}
+    )
+
+@app.on_event("startup")
+def startup_event():
+    scheduler.start()
+
+
+
+@app.post("/decodeVideo/create")
+def decode_video(param: DecodeListParam):
+    video_list = param.video_list
+    data_list = []
+    for video in video_list:
+        video_id = video.channel_content_id
+        video_url = video.video
+        task_id = str(uuid.uuid4())
+        DecodeRecord(
+            task_id=task_id,
+            video_id=video_id,
+            video_url=video_url,
+            task_status=0
+        ).save()
+        data_list.append({
+            "task_id": task_id,
+            "video_id": video_id,
+        })
+    return {
+        "code": 0,
+        "message": "success",
+        "data": data_list
+    }
+
+
+@app.post("/decode/result")
+def get_script_result(param: TaskStatusParam):
+    decode_result,script_result, task_status = get_script_result_by_id(param.task_id)
+    if script_result:
+        return {
+            "code": 0,
+            "message": "success",
+            "data": {
+                "script_result": script_result,
+                "decode_result": decode_result,
+                "task_status": task_status
+            }
+        }
+    else:
+        return {
+            "code": -1,
+            "message": "error",
+            "data": None
+        }
+
+
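With the service running (for example via `uvicorn main:app`), the two endpoints can be exercised as below; host, port, and payload values are illustrative:

    # Illustrative client calls; host/port and payload values are assumptions.
    import requests

    base = "http://127.0.0.1:8000"

    created = requests.post(f"{base}/decodeVideo/create", json={
        "video_list": [{
            "channel_content_id": "c1",
            "video": "https://example.com/demo.mp4",
            "title": "demo",
        }]
    }).json()
    task_id = created["data"][0]["task_id"]

    # Poll for the script result; data is None until task_status reaches 4.
    result = requests.post(f"{base}/decode/result", json={"task_id": task_id}).json()
    print(result["code"], result["data"])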

+ 30 - 0
models/decode_record.py

@@ -0,0 +1,30 @@
+import asyncio
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from utils.general import get_now_ts
+from utils.sync_mysql_help import mysql
+
+class DecodeRecord(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='table name', exclude=True)] = 'decode_record'
+
+    task_id:          Annotated[str, Field(description='task ID')]
+    video_id:         Annotated[str, Field(description='video ID')]
+    video_url:        Annotated[str, Field(description='video URL')]
+    task_status:      Annotated[Optional[int], Field(description='task status', default=1)]  # 0: pending / 1: running / 2: succeeded / 3: failed
+    create_timestamp: Annotated[Optional[int], Field(description='creation timestamp (ms)', default_factory=get_now_ts)]
+
+    def save(self):
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple(record[key] for key in keys))
+
+    async def async_save(self):
+        # The MySQL helper is synchronous; run the insert in a worker thread
+        # so async callers are not blocked.
+        await asyncio.to_thread(self.save)

+ 31 - 0
models/decode_result_record.py

@@ -0,0 +1,31 @@
+import asyncio
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from utils.general import get_now_ts
+from utils.sync_mysql_help import mysql
+
+class DecodeResultRecord(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='table name', exclude=True)] = 'decode_result_record'
+
+    task_id:          Annotated[str, Field(description='task ID')]
+    model:            Annotated[str, Field(description='model name')]
+    task_params:      Annotated[Optional[str], Field(description='task parameters', default=None)]
+    task_status:      Annotated[Optional[int], Field(description='task status', default=1)]  # 1: running, 2: completed, 3: failed
+    task_result:      Annotated[Optional[str], Field(description='task result', default=None)]
+    create_timestamp: Annotated[Optional[int], Field(description='creation timestamp (ms)', default_factory=get_now_ts)]
+
+    def save(self):
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple(record[key] for key in keys))
+
+    async def async_save(self):
+        # The MySQL helper is synchronous; run the insert in a worker thread
+        # so async callers are not blocked.
+        await asyncio.to_thread(self.save)
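Both record models generate their INSERT statements the same way from model_dump(); a sketch that prints the SQL DecodeResultRecord.save() would execute, without touching the database (sample field values):

    # Sketch: show the SQL that DecodeResultRecord.save() would run.
    from models.decode_result_record import DecodeResultRecord

    record_model = DecodeResultRecord(task_id='demo-task', model='demo-model')  # sample values
    record = record_model.model_dump(exclude={'table_name'})
    keys = record.keys()
    sql = f'''INSERT INTO {record_model.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
    print(sql)
    # INSERT INTO decode_result_record (task_id, model, task_params, task_status,
    # task_result, create_timestamp) VALUES (%s, %s, %s, %s, %s, %s)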

+ 12 - 2
requirements.txt

@@ -13,6 +13,16 @@ aiohttp>=3.9.0
 pytest>=8.0.0
 pytest-asyncio>=0.24.0
 aiofiles>=24.0.0
+fastapi>=0.111.0
+uvicorn>=0.30.0
+loguru>=0.7.0
+APScheduler>=3.10.4
+redis>=5.0.0
+PyMySQL>=1.1.0
+DBUtils>=3.0.3
+dashscope>=1.14.0
+oss2>=2.17.0
+python-dotenv
+socksio>=1.0.0
 sqlalchemy>=2.0.0
-pymysql>=1.1.0
-cryptography>=41.0.0
+cryptography>=41.0.0

BIN
src/__pycache__/__init__.cpython-313.pyc


BIN
src/components/__pycache__/__init__.cpython-313.pyc


BIN
src/components/agents/__pycache__/__init__.cpython-313.pyc


BIN
src/components/agents/__pycache__/base.cpython-313.pyc


BIN
src/components/agents/__pycache__/inspiration_points_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/key_points_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/purpose_point_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/search_keyword_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/topic_agent_v2.cpython-313.pyc


BIN
src/components/agents/__pycache__/topic_selection_understanding_agent.cpython-313.pyc


BIN
src/components/functions/__pycache__/__init__.cpython-313.pyc


BIN
src/components/functions/__pycache__/base.cpython-313.pyc


BIN
src/components/functions/__pycache__/json_utils.cpython-313.pyc


BIN
src/components/tools/__pycache__/__init__.cpython-313.pyc


BIN
src/components/tools/__pycache__/base.cpython-313.pyc


BIN
src/components/tools/__pycache__/knowledge_retrieval_tools.cpython-313.pyc


BIN
src/components/tools/__pycache__/nanobanana_tools.cpython-313.pyc


BIN
src/components/tools/__pycache__/segment_tools.cpython-313.pyc


BIN
src/states/__pycache__/__init__.cpython-313.pyc


BIN
src/states/__pycache__/what_deconstruction_state.cpython-313.pyc


BIN
src/utils/__pycache__/__init__.cpython-313.pyc


BIN
src/utils/__pycache__/logger.cpython-313.pyc


BIN
src/workflows/__pycache__/__init__.cpython-313.pyc


+ 46 - 0
task_schedule.py

@@ -0,0 +1,46 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from decode_task.decodeTask import decode_task_status_handler
+from decode_task.scriptTask import script_task_status_handler
+from loguru import logger
+
+
+class TaskScheduler:
+    def __init__(self):
+        self.scheduler = BackgroundScheduler()
+        self.setup_jobs()
+
+    def setup_jobs(self):
+        """Configure the scheduled jobs"""
+
+        self.scheduler.add_job(
+            func=decode_task_status_handler,
+            trigger=IntervalTrigger(seconds=30),
+            id='decode_task_handler',
+            name='decode_task status refresh',
+            max_instances=1  # prevent overlapping runs
+        )
+
+        # self.scheduler.add_job(
+        #     func=script_task_status_handler,
+        #     trigger=IntervalTrigger(seconds=30),
+        #     id='script_task_status_handler',
+        #     name='script_task status refresh',
+        #     max_instances=1  # prevent overlapping runs
+        # )
+
+    def start(self):
+        """Start the scheduler"""
+        logger.info("Task scheduler starting")
+        try:
+            self.scheduler.start()
+        except KeyboardInterrupt:
+            logger.info("Stop signal received, shutting down the scheduler...")
+            self.shutdown()
+
+    def shutdown(self):
+        """Shut down the scheduler"""
+        self.scheduler.shutdown()
+        logger.info("Task scheduler stopped")
+
+
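BackgroundScheduler.start() returns immediately, so the FastAPI startup hook works because uvicorn keeps the process alive. Running the scheduler standalone needs an explicit keep-alive loop; a minimal sketch:

    # Sketch: run the scheduler standalone; in this service it is started
    # from FastAPI's startup hook instead.
    import time

    from task_schedule import TaskScheduler

    if __name__ == '__main__':
        scheduler = TaskScheduler()
        scheduler.start()
        try:
            while True:          # BackgroundScheduler is non-blocking
                time.sleep(1)
        except KeyboardInterrupt:
            scheduler.shutdown()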

+ 29 - 0
utils/ali_oss.py

@@ -0,0 +1,29 @@
+from hashlib import md5
+
+import oss2
+from oss2.credentials import EnvironmentVariableCredentialsProvider
+
+# Credentials are read from the environment by EnvironmentVariableCredentialsProvider;
+# export OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET in the deployment environment
+# instead of hardcoding them in source.
+OSS_BUCKET_ENDPOINT = 'oss-accelerate.aliyuncs.com'
+OSS_BUCKET_NAME = 'aigc-admin'
+OSS_BUCKET_PATH = 'crawler'
+OSS_BUCKET_REGION = 'ap-southeast-1'
+
+
+def local_file_upload(local_file_path: str):
+    """
+    Upload a local file to OSS and return its public URL.
+    """
+    auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
+    # Create a Bucket instance with the bucket name and region.
+    bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME, region=OSS_BUCKET_REGION)
+
+    # Derive the object key from the local path's md5 and the file suffix.
+    file_suffix = local_file_path.split('.')[-1]
+    object_name = f'crawler/video/{md5(local_file_path.encode("utf-8")).hexdigest()}.{file_suffix}'
+
+    # Upload the local file to OSS with put_object_from_file.
+    bucket.put_object_from_file(object_name, local_file_path)
+    return f'https://res.cybertogether.net/{object_name}'
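Usage is a single call; the path below is illustrative and the OSS credentials must already be exported as environment variables:

    # Illustrative upload; assumes OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET are set.
    from utils.ali_oss import local_file_upload

    url = local_file_upload('/tmp/demo.mp4')  # hypothetical local file
    print(url)  # https://res.cybertogether.net/crawler/video/<md5>.mp4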

+ 108 - 0
utils/general.py

@@ -0,0 +1,108 @@
+
+import os
+import re
+from datetime import datetime, timedelta
+from pathlib import Path
+from random import randint
+from typing import Literal
+
+
+_STR_DIGITAL_MAP = {
+    'k': 1e3,
+    '千': 1e3,
+    'w': 1e4,
+    '万': 1e4,
+    '亿': 1e8,
+}
+
+def get_root_dir() -> str:
+    """Return the absolute path of the project root (the directory containing requirements.txt)."""
+    current_path = Path(__file__).resolve()
+    root_path = current_path.parent if not current_path.is_dir() else current_path
+    while True:
+        if 'requirements.txt' in os.listdir(root_path):
+            return str(root_path.absolute())
+        if root_path.parent == root_path:
+            raise FileNotFoundError('requirements.txt not found in any parent directory')
+        root_path = root_path.parent
+
+
+def get_abs_path(relative_path: str) -> str:
+    """Resolve a path relative to the project root into an absolute path."""
+    return os.path.join(get_root_dir(), relative_path)
+
+
+
+
+def get_now_ts(length: Literal[10, 13] = 13) -> int:
+    """Current timestamp, 10-digit (seconds) or 13-digit (milliseconds)."""
+    now_ts = datetime.now().timestamp()
+    if length == 10:
+        return int(now_ts)
+    else:
+        return int(now_ts * 1000)
+
+
+def pascal_to_snake(pascal_str: str) -> str:
+    """Convert a PascalCase string to snake_case."""
+    snake_str = re.sub(r'([a-z])([A-Z])', r'\1_\2', pascal_str)
+    return snake_str.lower()
+
+
+def snake_to_pascal(snake_str: str) -> str:
+    """Convert a snake_case string to PascalCase."""
+    return ''.join([item.title() for item in snake_str.split('_')])
+
+
+def generate_task_id():
+    """Generate a task_id from the current time plus a random suffix."""
+    now = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3]
+    return f'{now}{randint(100000, 999999)}'
+
+
+def get_x_day_ago_timestamp(x_days_ago: int = 0, length: Literal[13, 10] = 13):
+    """Timestamp of midnight `x_days_ago` days ago, 10- or 13-digit."""
+    now = datetime.now()
+    target = now - timedelta(days=x_days_ago)
+    target_midnight = target.replace(hour=0, minute=0, second=0, microsecond=0)
+    if length == 10:
+        return int(target_midnight.timestamp())
+    else:
+        return int(target_midnight.timestamp() * 1000)
+
+
+def get_file_size(file_path):
+    """Return the file size in bytes without reading the file into memory."""
+    return os.path.getsize(file_path)
+
+
+def cookie_str_to_json(cookie: str) -> dict:
+    """Parse a Cookie header string into a dict (values may contain '=')."""
+    components = cookie.strip().split(';')
+    return {
+        component.split('=', 1)[0].strip(): component.split('=', 1)[1].strip()
+        for component in components if '=' in component
+    }
+
+def get_day_timestamp(day):
+    """10-digit timestamp (seconds) of midnight `day` days ago."""
+    now = datetime.now()
+    target = now - timedelta(days=day)
+    target_midnight = target.replace(hour=0, minute=0, second=0, microsecond=0)
+    return int(target_midnight.timestamp())
+
+
+def str_digital_to_int(v: str) -> int:
+    """Convert count strings with +/k/千/w/万/亿 suffixes (e.g. '1.2w+') to int."""
+    v = v.rstrip('+')
+    unit = v[-1]
+    if unit in _STR_DIGITAL_MAP:
+        v = float(v[:-1]) * _STR_DIGITAL_MAP[unit]
+    else:
+        v = float(v)
+    return int(v)
+
+def datetime_to_timestamp(time_str: str):
+    dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
+    # Convert to a 13-digit (millisecond) timestamp
+    timestamp = int(dt.timestamp()) * 1000
+    return timestamp
+
+if __name__ == '__main__':
+    print(datetime_to_timestamp("2024-10-30 19:05:56"))
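A few of the helpers in action; the outputs follow directly from the definitions above:

    from utils.general import pascal_to_snake, snake_to_pascal, str_digital_to_int

    print(pascal_to_snake('DecodeRecord'))    # decode_record
    print(snake_to_pascal('decode_record'))   # DecodeRecord
    print(str_digital_to_int('1.2w+'))        # 12000
    print(str_digital_to_int('3千'))          # 3000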

+ 20 - 0
utils/params.py

@@ -0,0 +1,20 @@
+from pydantic import BaseModel
+from typing import List
+
+
+class TaskStatusParam(BaseModel):
+    task_id: str
+
+class DecodeParam(BaseModel):
+    channel_content_id: str
+    video: str
+    title: str
+
+class DecodeWorkflowParam(BaseModel):
+    video_id: str
+    video_url: str
+    task_id: str
+
+class DecodeListParam(BaseModel):
+    video_list: List[DecodeParam]
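These models validate the request bodies used in main.py; a quick check with Pydantic v2's model_validate, on a sample payload:

    from utils.params import DecodeListParam

    payload = {
        "video_list": [
            {"channel_content_id": "c1", "video": "https://example.com/a.mp4", "title": "demo"},
        ]
    }
    param = DecodeListParam.model_validate(payload)
    print(param.video_list[0].channel_content_id)  # c1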

+ 86 - 0
utils/sync_mysql_help.py

@@ -0,0 +1,86 @@
+import os
+
+import pymysql
+from typing import Tuple, Any, Dict, Optional
+from dbutils.pooled_db import PooledDB
+from pymysql.cursors import DictCursor
+
+
+class SyncMySQLHelper(object):
+    _pool: PooledDB = None
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        """Singleton"""
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def get_pool(self):
+        if self._pool is None:
+            # Connection settings come from the environment so database
+            # credentials are not hardcoded in source.
+            self._pool = PooledDB(
+                creator=pymysql,
+                mincached=10,
+                maxconnections=20,
+                blocking=True,
+                host=os.environ['MYSQL_HOST'],
+                port=int(os.environ.get('MYSQL_PORT', 3306)),
+                user=os.environ['MYSQL_USER'],
+                password=os.environ['MYSQL_PASSWORD'],
+                database=os.environ.get('MYSQL_DATABASE', 'content-deconstruction'))
+
+        return self._pool
+
+    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Optional[Dict[str, Any]]:
+        pool = self.get_pool()
+        with pool.connection() as conn:
+            with conn.cursor(DictCursor) as cursor:
+                cursor.execute(sql, data)
+                return cursor.fetchone()
+
+    def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any], ...]:
+        pool = self.get_pool()
+        with pool.connection() as conn:
+            with conn.cursor(DictCursor) as cursor:
+                cursor.execute(sql, data)
+                return cursor.fetchall()
+
+    def fetchmany(self,
+                  sql: str,
+                  data: Optional[Tuple[Any, ...]] = None,
+                  size: Optional[int] = None) -> Tuple[Dict[str, Any], ...]:
+        pool = self.get_pool()
+        with pool.connection() as conn:
+            with conn.cursor(DictCursor) as cursor:
+                cursor.execute(sql, data)
+                return cursor.fetchmany(size=size)
+
+    def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
+        pool = self.get_pool()
+        with pool.connection() as conn:
+            with conn.cursor(DictCursor) as cursor:
+                try:
+                    cursor.execute(sql, data)
+                    conn.commit()
+                    return cursor.rowcount
+                except pymysql.err.IntegrityError as e:
+                    if e.args[0] == 1062:  # duplicate key
+                        return None
+                    raise
+                except pymysql.err.OperationalError as e:
+                    if e.args[0] == 1205:  # lock wait timeout
+                        conn.rollback()
+                        return None
+                    raise
+
+
+mysql = SyncMySQLHelper()
+
+
+
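Typical use goes through the module-level `mysql` singleton with parameterized SQL; the values below are illustrative and assume the MYSQL_* environment variables are set:

    # Illustrative parameterized queries through the shared helper.
    from utils.sync_mysql_help import mysql

    row = mysql.fetchone(
        "SELECT * FROM decode_record WHERE task_id = %s", ("demo-task",)
    )
    print(row)  # dict (DictCursor) or None

    affected = mysql.execute(
        "UPDATE decode_record SET task_status = %s WHERE task_id = %s", (2, "demo-task")
    )
    print(affected)  # rowcount, or None on duplicate-key / lock-timeout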