
feat: add video deconstruction and script generation plus supporting infrastructure

refactor: restructure the code and improve module organization

build: update dependencies to support the new features

docs: add documentation and comments for the new modules

test: add tests for task scheduling and database operations

chore: remove unused files and update .gitignore
max_liu committed 6 days ago
commit e980dfb014
48 changed files with 963 additions and 2 deletions
  1. .gitignore (+1 -1)
  2. decode_task/decodeTask.py (+137 -0)
  3. decode_task/scriptTask.py (+150 -0)
  4. examples/html 2.zip (binary)
  5. loggers/sls.py (+210 -0)
  6. main.py (+107 -0)
  7. models/decode_record.py (+32 -0)
  8. models/decode_result_record.py (+31 -0)
  9. requirements.txt (+11 -1)
  10. src/__pycache__/__init__.cpython-313.pyc (binary)
  11. src/components/__pycache__/__init__.cpython-313.pyc (binary)
  12. src/components/agents/__pycache__/__init__.cpython-313.pyc (binary)
  13. src/components/agents/__pycache__/base.cpython-313.pyc (binary)
  14. src/components/agents/__pycache__/inspiration_points_agent.cpython-313.pyc (binary)
  15. src/components/agents/__pycache__/key_points_agent.cpython-313.pyc (binary)
  16. src/components/agents/__pycache__/purpose_point_agent.cpython-313.pyc (binary)
  17. src/components/agents/__pycache__/script_form_extraction_agent.cpython-313.pyc (binary)
  18. src/components/agents/__pycache__/script_section_division_agent.cpython-313.pyc (binary)
  19. src/components/agents/__pycache__/script_substance_extraction_agent.cpython-313.pyc (binary)
  20. src/components/agents/__pycache__/search_keyword_agent.cpython-313.pyc (binary)
  21. src/components/agents/__pycache__/structure_agent.cpython-313.pyc (binary)
  22. src/components/agents/__pycache__/topic_agent_v2.cpython-313.pyc (binary)
  23. src/components/agents/__pycache__/topic_selection_understanding_agent.cpython-313.pyc (binary)
  24. src/components/functions/__pycache__/__init__.cpython-313.pyc (binary)
  25. src/components/functions/__pycache__/base.cpython-313.pyc (binary)
  26. src/components/functions/__pycache__/json_utils.cpython-313.pyc (binary)
  27. src/components/functions/__pycache__/result_aggregation_function.cpython-313.pyc (binary)
  28. src/components/functions/__pycache__/video_upload_function.cpython-313.pyc (binary)
  29. src/components/tools/__pycache__/__init__.cpython-313.pyc (binary)
  30. src/components/tools/__pycache__/base.cpython-313.pyc (binary)
  31. src/components/tools/__pycache__/knowledge_retrieval_tools.cpython-313.pyc (binary)
  32. src/components/tools/__pycache__/nanobanana_tools.cpython-313.pyc (binary)
  33. src/components/tools/__pycache__/segment_tools.cpython-313.pyc (binary)
  34. src/states/__pycache__/__init__.cpython-313.pyc (binary)
  35. src/states/__pycache__/script_state.cpython-313.pyc (binary)
  36. src/states/__pycache__/what_deconstruction_state.cpython-313.pyc (binary)
  37. src/utils/__pycache__/__init__.cpython-313.pyc (binary)
  38. src/utils/__pycache__/json_extractor.cpython-313.pyc (binary)
  39. src/utils/__pycache__/llm_invoker.cpython-313.pyc (binary)
  40. src/utils/__pycache__/logger.cpython-313.pyc (binary)
  41. src/workflows/__pycache__/__init__.cpython-313.pyc (binary)
  42. src/workflows/__pycache__/script_workflow.cpython-313.pyc (binary)
  43. src/workflows/__pycache__/what_deconstruction_workflow.cpython-313.pyc (binary)
  44. task_schedule.py (+46 -0)
  45. utils/ali_oss.py (+29 -0)
  46. utils/general.py (+108 -0)
  47. utils/params.py (+15 -0)
  48. utils/sync_mysql_help.py (+86 -0)

+ 1 - 1
.gitignore

@@ -6,7 +6,7 @@ logs/
 examples/html/
 examples/output_*.json
 examples/*.zip
-
+*.pyc
 # C extensions
 *.so
 

+ 137 - 0
decode_task/decodeTask.py

@@ -0,0 +1,137 @@
+import json
+from loguru import logger
+import sys
+import time
+from utils.sync_mysql_help import mysql
+from typing import List
+from datetime import datetime
+from utils.params import DecodeParam
+from examples.run_batch import process_single_video
+
+
+from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
+
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
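+# decode_record.task_status lifecycle used by this module:
+#   1 = pending decode, 2 = decode complete (ready for script generation), 3 = failed or timed out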
+
+def save_decode_result(results, timestamp, total, success_count, fail_count):
+    output_data = {
+        "timestamp": timestamp,
+        "total": total,
+        "success_count": success_count,
+        "fail_count": fail_count,
+        "results": results
+    }
+    return output_data
+   
+
+
+def invoke_decode_workflow(video_list: List[DecodeParam]):
+    """主函数"""
+    print("=" * 80)
+    print("批量处理视频 - What 解构工作流(视频分析版本)")
+    print("=" * 80)
+
+    # 1. 读取视频列表
+    print(f"\n[1] 读取视频列表...{video_list}")
+
+    # 2. Initialize the workflow
+    print("\n[2] Initializing workflow...")
+    try:
+        workflow = WhatDeconstructionWorkflow(
+            model_provider="google_genai",
+            max_depth=10
+        )
+        print("✅ Workflow initialized")
+    except Exception as e:
+        print(f"❌ Workflow initialization failed: {e}")
+        import traceback
+        traceback.print_exc()
+        return None
+
+    # 3. Process the videos one by one, aggregating the results as we go
+    print("\n[3] Starting batch processing...")
+    results = []
+    output_data = None
+    total = len(video_list)
+    success_count = 0
+    fail_count = 0
+
+    for index, video_data in enumerate(video_list, 1):
+        # Process a single video
+        result = process_single_video(workflow, video_data, index, total)
+        results.append(result)
+
+        # Update counters
+        if result["success"]:
+            success_count += 1
+        else:
+            fail_count += 1
+
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+
+        # Aggregate the results so far; the caller persists them to MySQL
+        output_data = save_decode_result(results, timestamp, total, success_count, fail_count)
+        print(f"   progress: {success_count} succeeded / {fail_count} failed / {total} total")
+
+    return output_data
+
+
+def get_decode_result_by_id(task_id:str):
+    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 2 "
+    tasks = mysql.fetchone(sql, (task_id,))
+    if not tasks:
+        logger.info(f"task_id = {task_id} , 任务不存在")
+        return None
+    return tasks['decode_result'], tasks['task_status']
+
+
+def decode_task_status_handler():
+    # Fetch all pending decode tasks (task_status = 1); the scheduler calls this without arguments
+    sql = "SELECT * FROM decode_record WHERE task_status = 1 "
+    tasks = mysql.fetchall(sql)
+
+    if not tasks:
+        logger.info("no pending decode tasks")
+        return
+    for task in tasks:
+        task_id = task['task_id']
+        task_params = json.loads(task['task_params'])
+        logger.info(f"task_id = {task_id} , task_params = {task_params}")
+        # A task that has produced no result for over an hour is considered timed out and marked as failed (status 3)
+        task_create_timestamp = task['create_timestamp']
+        current_timestamp = int(time.time() * 1000)
+
+        # Run the decode workflow and persist the result
+        try:
+            # task_params was stored as a DecodeListParam dump, so the videos live under "video_list"
+            decode_result = invoke_decode_workflow(task_params.get("video_list", []))
+
+            if decode_result:
+                # Mark the task as completed (status 2) and store the result
+                sql = "UPDATE decode_record SET task_status = 2, decode_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
+                logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
+            elif current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+                # No result and the task is older than one hour: mark it as failed (status 3)
+                sql = "UPDATE decode_record SET task_status = 3, decode_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (json.dumps(decode_result, ensure_ascii=False), task_id))
+                logger.info(f"task_id = {task_id} : task timed out")
+        except Exception as e:
+            logger.error(f"task_id = {task_id} , error = {e}")
+            sql = "UPDATE decode_record SET task_status = 3, decode_result = 'task failed' WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id} : task failed")
+            continue
+

+ 150 - 0
decode_task/scriptTask.py

@@ -0,0 +1,150 @@
+import json
+from loguru import logger
+import sys
+import time
+from typing import Dict, Any, List
+from datetime import datetime
+from utils.sync_mysql_help import mysql
+from examples.run_batch_script import build_script_input
+
+from src.workflows.script_workflow import ScriptWorkflow
+
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
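+# This module picks up records whose decode step is complete (task_status = 2), runs the script workflow,
+# and advances them to task_status = 4 on success (3 on failure or timeout).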
+
+def invoke_script_workflow(decode_result: Dict[str, Any], script_result: Dict[str, Any]):
+    """主函数"""
+   
+
+    # Read the raw three-point deconstruction results
+    raw_results: List[Dict[str, Any]] = decode_result.get("results", []) or []
+
+    # Load any existing script-understanding output so new results can be appended incrementally
+    output_data = script_result
+    if not output_data:
+        output_data = {
+            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
+            "total": 0,
+            "success_count": 0,
+            "fail_count": 0,
+            "results": [],
+        }
+
+    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
+    # Deduplicate on channel_content_id + video URL to avoid reprocessing
+    processed_keys = {
+        f"{item.get('video_data', {}).get('channel_content_id','')}|"
+        f"{item.get('video_data', {}).get('video','')}"
+        for item in existing_results
+    }
+
+    workflow = ScriptWorkflow()
+
+    for item in raw_results:
+        video_data = item.get("video_data", {}) or {}
+        result = item.get("result", {}) or {}
+
+        key = f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"
+        if key in processed_keys:
+            logger.info(f"已处理过该视频,跳过: {key}")
+            continue
+
+        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")
+
+        try:
+            script_input = build_script_input(video_data, result)
+            script_output = workflow.invoke(script_input)
+
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": result,
+                "script_result": script_output,
+                "success": True,
+                "error": None,
+            }
+
+            output_data["success_count"] = output_data.get("success_count", 0) + 1
+
+        except Exception as e:
+            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": result,
+                "script_result": None,
+                "success": False,
+                "error": str(e),
+            }
+            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
+
+        output_data["results"].append(record)
+        output_data["total"] = output_data.get("total", 0) + 1
+
+    # All videos processed: serialize the aggregated output; the caller writes it back to MySQL
+    return json.dumps(output_data, ensure_ascii=False, indent=2)
+
+
+
+def get_script_result_by_id(task_id:str):
+    sql = "SELECT * FROM decode_record WHERE task_id = %s AND task_status = 4 "
+    tasks = mysql.fetchone(sql, (task_id,))
+    if not tasks:
+        logger.info(f"task_id = {task_id} , 任务不存在")
+        return None
+    return tasks['script_result'], tasks['decode_result'], tasks['task_status']
+
+
+def script_task_status_handler():
+    # Fetch all tasks whose decode step is complete (task_status = 2); the scheduler calls this without arguments
+    sql = "SELECT * FROM decode_record WHERE task_status = 2 "
+    tasks = mysql.fetchall(sql)
+
+    if not tasks:
+        logger.info("no tasks ready for script generation")
+        return
+    for task in tasks:
+        task_id = task['task_id']
+        decode_result = json.loads(task['decode_result'])
+        # script_result is NULL until the first script run, so guard the json.loads call
+        script_result = json.loads(task['script_result']) if task['script_result'] else None
+
+        logger.info(f"task_id = {task_id} , decode_result = {decode_result}")
+        # A task stuck for over an hour is considered timed out and marked as failed (status 3)
+        # Assumes the decode_record table has a script_timestamp column (it is not part of the DecodeRecord model)
+        task_create_timestamp = task['script_timestamp']
+        current_timestamp = int(time.time() * 1000)
+
+
+        try:
+            # Claim the record before processing so the next scheduler run does not pick it up again
+            # (note: status 3 is also used as the failure status elsewhere in this module)
+            sql = "UPDATE decode_record SET task_status = 3 WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+        except Exception as e:
+            logger.error(f"task_id = {task_id} , error = {e}")
+            sql = "UPDATE decode_record SET task_status = 3, script_result = 'task failed' WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id} : task failed")
+            continue
+
+        try:
+            result = invoke_script_workflow(decode_result, script_result)
+            if result:
+                # Mark the task as completed (status 4) and store the serialized script result
+                sql = "UPDATE decode_record SET task_status = 4, script_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (result, task_id))
+                logger.info(f"task_id = {task_id} , script_result = {result}")
+            elif current_timestamp - task_create_timestamp > 1000 * 60 * 60:
+                # No result and the task is older than one hour: mark it as failed (status 3)
+                sql = "UPDATE decode_record SET task_status = 3, script_result = %s WHERE task_id = %s"
+                mysql.execute(sql, (result, task_id))
+                logger.info(f"task_id = {task_id} : task timed out")
+        except Exception as e:
+            logger.error(f"task_id = {task_id} , error = {e}")
+            sql = "UPDATE decode_record SET task_status = 3, script_result = 'task failed' WHERE task_id = %s"
+            mysql.execute(sql, (task_id,))
+            logger.info(f"task_id = {task_id} : task failed")
+            continue
+

BIN
examples/html 2.zip


+ 210 - 0
loggers/sls.py

@@ -0,0 +1,210 @@
+# import json
+# import time
+# import traceback
+# from syslog import syslog
+# from typing import Optional, List
+
+# from aliyun.log import LogClient, LogItem, PutLogsRequest, GetLogsRequest
+# from loguru import logger
+# from tornado.process import task_id
+
+
+# from utils import get_global_config
+# from datetime import datetime
+
+# _config = get_global_config().log.aliyun
+
+
+# class AliyunLog(object):
+#     client = LogClient(endpoint=_config.endpoint,
+#                        accessKey=_config.access_key_secret,
+#                        accessKeyId=_config.access_key_id)
+#     project_name = 'cyber-crawler-prod'
+#     logstore_name = 'error-log'
+#     process_logstore_name = 'process-log'
+
+#     @classmethod
+#     def record(cls, task: CrawlerTask, stacktrace: str):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('task_id', task.task_id),
+#             ('plan_id', task.plan_id),
+#             ('plan_type', str(task.plan_type.value.id)),
+#             ('channel', str(task.channel.value.id)),
+#             ('crawler_mode', str(task.crawler_mode.value.id)),
+#             ('task_params', task.task_params),
+#             ('stacktrace', stacktrace),
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore=cls.logstore_name,
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def process(cls, task: CrawlerTask, process_step: str, log_type: str, message: str,
+#                     content: Optional[AiDitContent],
+#                     account: Optional[AiDitAccount],
+#                     content_portrait: Optional[List[CrawlerContentPortrait]],
+#                     account_portrait: Optional[List[CrawlerAccountPortrait]]):
+#         """
+#         记录任务执行&爬取过程
+#         process_step: crawler、skip、filter、after_filter
+#         log_type: content、content_portrait、account_portrait
+#         """
+#         try:
+#             # 序列化
+#             # 只有在对象不为 None 时才进行序列化,否则为 None
+#             content_str = content.model_dump_json() if content else None
+#             account_str = account.model_dump_json() if account else None
+#             # 序列化
+#             if content_portrait:
+#                 # 使用列表推导式将每个对象转换为字典,然后序列化整个列表
+#                 content_portrait_str = json.dumps([item.model_dump() for item in content_portrait])
+#             else:
+#                 content_portrait_str = None
+
+#             if account_portrait:
+#                 # 使用列表推导式将每个对象转换为字典,然后序列化整个列表
+#                 account_portrait_str = json.dumps([item.model_dump() for item in account_portrait])
+#             else:
+#                 account_portrait_str = None
+
+#             log_item = LogItem()
+#             task_id = task.task_id
+#             plan_id = task.plan_id
+#             plan_type = ''
+#             if task.plan_type is not None:
+#                 plan_type = str(task.plan_type.value.id)
+#             channel = ''
+#             if task.channel is not None:
+#                 channel = str(task.channel.value.id)
+#             crawler_mode = ''
+#             if task.crawler_mode is not None:
+#                 crawler_mode = str(task.crawler_mode.value.id)
+#             task_params = ''
+#             if task.task_params is not None:
+#                 task_params = json.dumps(task.task_params)
+
+#             log_item.set_contents([
+#                 # ('task_id', task.task_id),
+#                 # ('plan_id', task.plan_id),
+#                 # ('plan_type', str(task.plan_type.value.id)),
+#                 # ('channel', str(task.channel.value.id)),
+#                 # ('crawler_mode', str(task.crawler_mode.value.id)),
+#                 # ('task_params', task.task_params),
+#                 ('task_id', task_id),
+#                 ('plan_id', plan_id),
+#                 ('plan_type', plan_type),
+#                 ('channel', channel),
+#                 ('crawler_mode', crawler_mode),
+#                 ('task_params', task_params),
+#                 ('process_step', process_step),
+#                 ('log_type', log_type),
+#                 ('message', message),
+#                 ('content', content_str or ''),
+#                 ('account', account_str or ''),
+#                 ('content_portrait', content_portrait_str or ''),
+#                 ('account_portrait', account_portrait_str or ''),
+#                 ('timestamp', str(time.time())),
+#             ])
+#             request = PutLogsRequest(project=cls.project_name,
+#                                      logstore=cls.process_logstore_name,
+#                                      logitems=[log_item],
+#                                      compress=False)
+#             cls.client.put_logs(request)
+#         except Exception as e:
+#             traceback.print_exc()
+
+#     @classmethod
+#     def info(cls, path: str, channel: int, params: str, response: str, status_code: int, msg: str = '',
+#              token: str = ''):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('path', path),
+#             ('channel', channel),
+#             ('params', params),
+#             ('response', response),
+#             ('status_code', status_code),
+#             ('msg', msg),
+#             ('token', token)
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore='request-log',
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def req_info(cls, channel: str, params: str, response: str, source: str, path: str = '/', status_code: int = 0,
+#                  token: str = ''):
+#         try:
+#             log_item = LogItem()
+#             log_item.set_contents([
+#                 ('channel', channel),
+#                 ('params', str(params)),
+#                 ('response', str(response)),
+#                 ('path', path),
+#                 ('source', source),
+#                 ('status_code', str(status_code)),
+#                 ('token', token)
+#             ])
+#             request = PutLogsRequest(project=cls.project_name,
+#                                      logstore='info-log',
+#                                      logitems=[log_item],
+#                                      compress=False)
+#             cls.client.put_logs(request)
+#         except Exception as e:
+#             logger.error(f"AliyunLog.req_info error: {e}")
+#             pass
+
+#     @classmethod
+#     def http_req_info(cls, path: str, params: str, response: str, status_code: int = 0):
+#         log_item = LogItem()
+#         log_item.set_contents([
+#             ('path', path),
+#             ('params', params),
+#             ('response', response),
+#             ('status_code', status_code)
+#         ])
+#         request = PutLogsRequest(project=cls.project_name,
+#                                  logstore='info-log',
+#                                  logitems=[log_item],
+#                                  compress=False)
+#         cls.client.put_logs(request)
+
+#     @classmethod
+#     def get_log(cls):
+#         from_time = int(datetime.now().timestamp() * 1000) - 1000 * 60 * 60 * 24
+#         to_time = int(datetime.now().timestamp() * 1000)
+
+#         response = cls.client.get_logs(GetLogsRequest(project='cyber-crawler-prod',
+#                                                       logstore='request-log',
+#                                                       topic='',
+#                                                       fromTime=from_time,
+#                                                       toTime=to_time,
+#                                                       query='path: /crawler/moonshot/kimi and status_code :10000'))
+#         print(response.body)
+#         return response
+
+
+# class AliyunHkLog(object):
+#     client = LogClient(endpoint='cn-hongkong.log.aliyuncs.com',
+#                        accessKey=_config.access_key_secret,
+#                        accessKeyId=_config.access_key_id)
+#     project_name = 'cyber-crawler-prod'
+
+#     @classmethod
+#     def get_log(cls, query: str, project_name: str = 'cyber-crawler-prod', logstore_name: str = 'request-log'):
+#         today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+#         from_time = int(today.timestamp()) - 24 * 60 * 60
+#         to_time = int(today.timestamp())
+
+#         response = cls.client.get_logs(GetLogsRequest(project=cls.project_name,
+#                                                       logstore=logstore_name,
+#                                                       topic='',
+#                                                       fromTime=from_time,
+#                                                       toTime=to_time,
+#                                                       query=query))
+
+#         return response.body

+ 107 - 0
main.py

@@ -0,0 +1,107 @@
+import json
+import uuid
+from fastapi import FastAPI, HTTPException, Request
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+from utils.params import TaskStatusParam, DecodeListParam
+from dotenv import load_dotenv
+
+
+from decode_task.decodeTask import get_decode_result_by_id
+from decode_task.scriptTask import get_script_result_by_id
+
+
+
+from typing import List
+from models.decode_record import DecodeRecord
+from task_schedule import TaskScheduler
+
+from loguru import logger
+import sys
+
+logger.add(sink=sys.stderr, level="ERROR", backtrace=True, diagnose=True)
+
+load_dotenv()
+app = FastAPI()
+
+scheduler = TaskScheduler()
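+# TaskScheduler registers two background jobs (see task_schedule.py) that poll decode_record every 60 seconds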
+
+
+@app.exception_handler(HTTPException)
+async def http_exception_handler(request: Request, exc: HTTPException):
+    return JSONResponse(
+        status_code=200,
+        content={"code": exc.status_code, "message": exc.message, "data": None}
+    )
+
+@app.on_event("startup")
+def startup_event():
+    scheduler.start()
+
+
+
+@app.post("/decodeVideo/create")
+def decode_video(param:DecodeListParam):
+    task_id = str(uuid.uuid4())
+    DecodeRecord(
+        task_id=task_id,
+        task_params=json.dumps(param.model_dump()),
+        decode_result=None,
+        script_result=None,
+        task_status = 1
+    ).save()
+    return {
+        "code": 0,
+        "message": "success",
+        "data": {
+            "task_id": task_id
+        }
+    }
+
+
+@app.post("/decode/result")
+def get_decode_result(param: TaskStatusParam):
+    record = get_decode_result_by_id(param.task_id)
+    if record:
+        decode_result, task_status = record
+        return {
+            "code": 0,
+            "message": "success",
+            "data": {
+                "decode_result": decode_result,
+                "task_status": task_status
+            }
+        }
+    else:
+        return {
+            "code": -1,
+            "message": "error",
+            "data": None
+        }
+
+@app.post("/script/result")
+def get_script_result(param: TaskStatusParam):
+    record = get_script_result_by_id(param.task_id)
+    if record:
+        # get_script_result_by_id returns (script_result, decode_result, task_status)
+        script_result, decode_result, task_status = record
+        return {
+            "code": 0,
+            "message": "success",
+            "data": {
+                "script_result": script_result,
+                "decode_result": decode_result,
+                "task_status": task_status
+            }
+        }
+    else:
+        return {
+            "code": -1,
+            "message": "error",
+            "data": None
+        }
+
+
+
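A minimal client-side sketch for these endpoints, assuming the app is served locally (e.g. uvicorn main:app on port 8000); the host, port, sample video fields, and polling interval are illustrative assumptions, not part of the commit:

    import time
    import requests

    BASE_URL = "http://localhost:8000"  # assumption: local uvicorn instance

    # Create a decode task for one video (fields follow DecodeParam in utils/params.py)
    payload = {"video_list": [{"channel_content_id": "demo-id", "video_url": "https://example.com/demo.mp4", "title": "demo"}]}
    task_id = requests.post(f"{BASE_URL}/decodeVideo/create", json=payload).json()["data"]["task_id"]

    # Poll for the result; the background scheduler picks up pending tasks every 60 seconds
    while True:
        resp = requests.post(f"{BASE_URL}/decode/result", json={"task_id": task_id}).json()
        if resp["code"] == 0:
            print(resp["data"]["decode_result"])
            break
        time.sleep(30)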

+ 32 - 0
models/decode_record.py

@@ -0,0 +1,32 @@
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from utils.general import get_now_ts
+from utils.sync_mysql_help import mysql
+
+class DecodeRecord(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='table name', exclude=True)] = 'decode_record'
+
+    task_id:          Annotated[str, Field(description='task ID')]
+    model:            Annotated[Optional[str], Field(description='model name', default=None)]  # optional: /decodeVideo/create does not supply it
+    task_params:      Annotated[Optional[str], Field(description='task parameters', default=None)]
+    task_status:      Annotated[Optional[int], Field(description='task status', default=1)]  # 1: pending decode, 2: decode complete, 3: failed, 4: script complete
+    decode_result:    Annotated[Optional[str], Field(description='deconstruction result', default=None)]
+    script_result:    Annotated[Optional[str], Field(description='script result', default=None)]
+    create_timestamp: Annotated[Optional[int], Field(description='task creation timestamp (ms)', default_factory=get_now_ts)]
+
+    def save(self):
+
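+        # Build a parameterized INSERT whose column list mirrors the model's field names (table_name itself is excluded)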
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple([record[key] for key in keys]))
+
+    async def async_save(self):
+        # mysql is the synchronous helper, so the insert runs inline; kept async so callers can await it uniformly
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple([record[key] for key in keys]))

+ 31 - 0
models/decode_result_record.py

@@ -0,0 +1,31 @@
+from typing import Optional
+
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from utils.general import get_now_ts
+from utils.sync_mysql_help import mysql
+
+class DecodeResultRecord(BaseModel):
+    table_name:       Annotated[Optional[str], Field(description='table name', exclude=True)] = 'decode_result_record'
+
+    task_id:          Annotated[str, Field(description='task ID')]
+    model:            Annotated[str, Field(description='model name')]
+    task_params:      Annotated[Optional[str], Field(description='task parameters', default=None)]
+    task_status:      Annotated[Optional[int], Field(description='task status', default=1)]  # 1: in progress, 2: completed, 3: failed
+    task_result:      Annotated[Optional[str], Field(description='task result', default=None)]
+    create_timestamp: Annotated[Optional[int], Field(description='task creation timestamp (ms)', default_factory=get_now_ts)]
+
+    def save(self):
+
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple([record[key] for key in keys]))
+
+    async def async_save(self):
+        # mysql is the synchronous helper, so the insert runs inline; kept async so callers can await it uniformly
+        record = self.model_dump(exclude={'table_name'})
+        keys = record.keys()
+        sql = f'''INSERT INTO {self.table_name} ({", ".join(keys)}) VALUES ({", ".join(["%s"] * len(keys))})'''
+        mysql.execute(sql, tuple([record[key] for key in keys]))

+ 11 - 1
requirements.txt

@@ -12,4 +12,14 @@ requests>=2.32.0
 aiohttp>=3.9.0
 pytest>=8.0.0
 pytest-asyncio>=0.24.0
-aiofiles>=24.0.0
+aiofiles>=24.0.0
+fastapi>=0.111.0
+uvicorn>=0.30.0
+loguru>=0.7.0
+APScheduler>=3.10.4
+redis>=5.0.0
+PyMySQL>=1.1.0
+DBUtils>=3.0.3
+dashscope>=1.14.0
+oss2>=2.17.0
+python-dotenv

BIN
src/__pycache__/__init__.cpython-313.pyc


BIN
src/components/__pycache__/__init__.cpython-313.pyc


BIN
src/components/agents/__pycache__/__init__.cpython-313.pyc


BIN
src/components/agents/__pycache__/base.cpython-313.pyc


BIN
src/components/agents/__pycache__/inspiration_points_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/key_points_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/purpose_point_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/script_form_extraction_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/script_section_division_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/script_substance_extraction_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/search_keyword_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/structure_agent.cpython-313.pyc


BIN
src/components/agents/__pycache__/topic_agent_v2.cpython-313.pyc


BIN
src/components/agents/__pycache__/topic_selection_understanding_agent.cpython-313.pyc


BIN
src/components/functions/__pycache__/__init__.cpython-313.pyc


BIN
src/components/functions/__pycache__/base.cpython-313.pyc


BIN
src/components/functions/__pycache__/json_utils.cpython-313.pyc


BIN
src/components/functions/__pycache__/result_aggregation_function.cpython-313.pyc


BIN
src/components/functions/__pycache__/video_upload_function.cpython-313.pyc


BIN
src/components/tools/__pycache__/__init__.cpython-313.pyc


BIN
src/components/tools/__pycache__/base.cpython-313.pyc


BIN
src/components/tools/__pycache__/knowledge_retrieval_tools.cpython-313.pyc


BIN
src/components/tools/__pycache__/nanobanana_tools.cpython-313.pyc


BIN
src/components/tools/__pycache__/segment_tools.cpython-313.pyc


BIN
src/states/__pycache__/__init__.cpython-313.pyc


BIN
src/states/__pycache__/script_state.cpython-313.pyc


BIN
src/states/__pycache__/what_deconstruction_state.cpython-313.pyc


BIN
src/utils/__pycache__/__init__.cpython-313.pyc


BIN
src/utils/__pycache__/json_extractor.cpython-313.pyc


BIN
src/utils/__pycache__/llm_invoker.cpython-313.pyc


BIN
src/utils/__pycache__/logger.cpython-313.pyc


BIN
src/workflows/__pycache__/__init__.cpython-313.pyc


BIN
src/workflows/__pycache__/script_workflow.cpython-313.pyc


BIN
src/workflows/__pycache__/what_deconstruction_workflow.cpython-313.pyc


+ 46 - 0
task_schedule.py

@@ -0,0 +1,46 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from decode_task.decodeTask import decode_task_status_handler
+from decode_task.scriptTask import script_task_status_handler
+from loguru import logger
+
+
+class TaskScheduler:
+    def __init__(self):
+        self.scheduler = BackgroundScheduler()
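+        # BackgroundScheduler runs the jobs in worker threads of this process; main.py starts it from the FastAPI startup event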
+        self.setup_jobs()
+
+    def setup_jobs(self):
+        """设置定时任务"""
+
+        self.scheduler.add_job(
+            func=decode_task_status_handler,
+            trigger=IntervalTrigger(seconds=60),
+            id='decode_task_status_handler',
+            name='任务状态刷新',
+            max_instances=5  # 防止重复执行
+        )
+
+        self.scheduler.add_job(
+            func=script_task_status_handler,
+            trigger=IntervalTrigger(seconds=60),
+            id='script_task_status_handler',
+            name='script_task_status_handler',
+            max_instances=5  # 防止重复执行
+        )
+
+    def start(self):
+        """启动调度器"""
+        logger.info("定时任务调度器启动")
+        try:
+            self.scheduler.start()
+        except KeyboardInterrupt:
+            logger.info("收到停止信号,正在关闭调度器...")
+            self.shutdown()
+
+    def shutdown(self):
+        """关闭调度器"""
+        self.scheduler.shutdown()
+        logger.info("定时任务调度器已关闭")
+
+

+ 29 - 0
utils/ali_oss.py

@@ -0,0 +1,29 @@
+import os
+from hashlib import md5
+import oss2
+from oss2.credentials import EnvironmentVariableCredentialsProvider
+
+
+os.environ['OSS_ACCESS_KEY_ID'] = 'LTAI5tEYvefc4U3fyU5du225'
+os.environ['OSS_ACCESS_KEY_SECRET'] = 'Z1gZtAGe8NwRtXgPzVgRMkRez4Ex4K'
+OSS_BUCKET_ENDPOINT = 'oss-accelerate.aliyuncs.com'
+OSS_BUCKET_NAME = 'aigc-admin'
+OSS_BUCKET_PATH = 'crawler'
+OSS_BUCKET_REGION = 'ap-southeast-1'
+
+def local_file_upload(local_file_path: str):
+    """
+    Upload a local file to OSS and return its public URL
+    """
+    
+    auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
+    # Create the Bucket instance for the target storage space and region
+    bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, OSS_BUCKET_NAME, region=OSS_BUCKET_REGION)
+
+    # Derive the object key from the MD5 of the local path, keeping the original file extension
+    file_suffix = local_file_path.split('.')[-1]
+    objectName = f'crawler/video/{md5(local_file_path.encode("utf-8")).hexdigest()}.{file_suffix}'
+
+    # Upload the local file to OSS with put_object_from_file
+    bucket.put_object_from_file(objectName, local_file_path)
+    return f'https://res.cybertogether.net/{objectName}'
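A minimal usage sketch (the local path is a placeholder):

    from utils.ali_oss import local_file_upload

    url = local_file_upload("/tmp/example.mp4")
    # -> https://res.cybertogether.net/crawler/video/<md5-of-path>.mp4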

+ 108 - 0
utils/general.py

@@ -0,0 +1,108 @@
+
+import os
+import re
+from datetime import datetime, timedelta
+from pathlib import Path
+from random import randint
+from typing import Any, List, Literal, Optional, Tuple
+
+
+_STR_DIGITAL_MAP = {
+    'k': 1e3,
+    '千': 1e3,
+    'w': 1e4,
+    '万': 1e4,
+    '亿': 1e8,
+}
+
+def get_root_dir() -> str:
+    """Return the absolute path of the project root (the directory containing requirements.txt)"""
+    current_path = Path(__file__).resolve()
+    root_path = current_path.parent if not current_path.is_dir() else current_path
+    while True:
+        if 'requirements.txt' in os.listdir(root_path):
+            return str(root_path.absolute())
+        if root_path.parent == root_path:
+            raise FileNotFoundError('requirements.txt not found in any parent directory')
+        root_path = root_path.parent
+
+
+def get_abs_path(relative_path: str) -> str:
+    """获取目标文件或目录的相对路径在系统中的绝对路径"""
+    return os.path.join(get_root_dir(), relative_path)
+
+
+
+
+def get_now_ts(length: Literal[10, 13] = 13) -> int:
+    """获取当前时间的10位或13位时间戳"""
+    now_ts = datetime.now().timestamp()
+    if length == 10:
+        return int(now_ts)
+    else:
+        return int(now_ts * 1000)
+
+
+def pascal_to_snake(pascal_str: str) -> str:
+    """将Pascal字符串转为蛇形字符串"""
+    snake_str = re.sub(r'([a-z])([A-Z])', r'\1_\2', pascal_str)
+    return snake_str.lower()
+
+
+def snake_to_pascal(snake_str: str) -> str:
+    """将蛇形字符串转为Pascal字符串"""
+    return ''.join([item.title() for item in snake_str.split('_')])
+
+
+def generate_task_id():
+    """生成task_id"""
+    now = datetime.now().strftime('%Y%m%d%H%M%S%f')[:-3]
+    return f'{now}{randint(100000, 999999)}'
+
+
+def get_x_day_ago_timestamp(x_days_ago: int = 0, length: Literal[13, 10] = 13):
+    now = datetime.now()
+    yesterday = now - timedelta(days=x_days_ago)
+    yesterday_midnight = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
+    if length == 10:
+        return int(yesterday_midnight.timestamp())
+    else:
+        return int(yesterday_midnight.timestamp() * 1000)
+
+
+def get_file_size(file_path):
+    with open(file_path, 'rb') as f:
+        return len(f.read())
+
+
+def cookie_str_to_json(cookie: str) -> dict:
+    components = cookie.strip().split(';')
+    short_parsed_request = {component.split('=')[0].strip(): component.split('=')[1].strip() for component in components if
+                            '=' in component}
+    return short_parsed_request
+
+def get_day_timestamp(day):
+    """Return the 10-digit (seconds) timestamp of midnight `day` days ago"""
+    now = datetime.now()
+    target_midnight = (now - timedelta(days=day)).replace(hour=0, minute=0, second=0, microsecond=0)
+    return int(target_midnight.timestamp())
+
+
+def str_digital_to_int(v: str) -> int:
+    """将带有+、k、w字符的数据类字符串转为int"""
+    v = v.rstrip('+')
+    unit = v[-1]
+    if unit in _STR_DIGITAL_MAP:
+        v = float(v.rstrip(unit)) * _STR_DIGITAL_MAP[unit]
+    else:
+        v = float(v)
+    return int(v)
+
+def datetime_to_timestamp(time_str:str):
+    dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
+    # Convert to a millisecond timestamp
+    timestamp = int(dt.timestamp())*1000
+    return timestamp
+
+if __name__ == '__main__':
+    print(datetime_to_timestamp("2024-10-30 19:05:56"))

+ 15 - 0
utils/params.py

@@ -0,0 +1,15 @@
+from pydantic import BaseModel
+from typing import List
+
+
+
+class TaskStatusParam(BaseModel):
+    task_id: str
+
+class DecodeParam(BaseModel):
+    channel_content_id: str
+    video_url: str
+    title: str
+
+class DecodeListParam(BaseModel):
+    video_list: List[DecodeParam]

+ 86 - 0
utils/sync_mysql_help.py

@@ -0,0 +1,86 @@
+import pymysql
+
+
+from typing import Tuple, Any, Dict, Optional
+from dbutils.pooled_db import PooledDB
+from pymysql.cursors import DictCursor
+
+
+class SyncMySQLHelper(object):
+    _pool: PooledDB = None
+    _instance = None
+
+    def __new__(cls, *args, **kwargs):
+        """Singleton: every import shares one instance (and therefore one connection pool)"""
+        if cls._instance is None:
+            # object.__new__ does not accept extra arguments, so pass only the class
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def get_pool(self):
+        if self._pool is None:
+            self._pool = PooledDB(
+                creator=pymysql,
+                mincached=10,
+                maxconnections=20,
+                blocking=True,
+                host='rm-bp1fl9aw4hn6u864a.mysql.rds.aliyuncs.com',
+                port=3306,
+                user='content_rw',
+                password='bC1aH4bA1lB0',
+                database='content-deconstruction')
+
+        return self._pool
+
+    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
+        pool = self.get_pool()
+        with pool.connection() as conn:  
+            with conn.cursor(DictCursor) as cursor: 
+                cursor.execute(sql, data)
+                result = cursor.fetchone()
+                return result
+
+    def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
+        pool = self.get_pool()
+        with pool.connection() as conn: 
+            with conn.cursor(DictCursor) as cursor: 
+                cursor.execute(sql, data)
+                result = cursor.fetchall()
+                return result
+
+    def fetchmany(self,
+                  sql: str,
+                  data: Optional[Tuple[Any, ...]] = None,
+                  size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
+        pool = self.get_pool()
+        with pool.connection() as conn:  
+            with conn.cursor(DictCursor) as cursor: 
+                cursor.execute(sql, data)
+                result = cursor.fetchmany(size=size)
+                return result
+
+    def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
+        pool = self.get_pool()
+        with pool.connection() as conn:  
+            with conn.cursor(DictCursor) as cursor:  
+                try:
+                    cursor.execute(sql, data)
+                    result = conn.commit()
+                    return result
+                except pymysql.err.IntegrityError as e:
+                    if e.args[0] == 1062:  # duplicate key, treated as a no-op
+                        return None
+                    else:
+                        raise e
+                except pymysql.err.OperationalError as e:
+                    if e.args[0] == 1205:  # lock wait timeout exceeded
+                        conn.rollback()
+                        return None
+                    else:
+                        raise e
+
+
+mysql = SyncMySQLHelper()
+
+
+
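For reference, a short sketch of how the pooled helper is used by the task modules in this commit (the task id is a placeholder):

    from utils.sync_mysql_help import mysql

    # Values are always passed as a parameter tuple so pymysql handles quoting and escaping
    row = mysql.fetchone("SELECT * FROM decode_record WHERE task_id = %s", ("demo-task-id",))
    mysql.execute("UPDATE decode_record SET task_status = %s WHERE task_id = %s", (2, "demo-task-id"))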