# 3_handle.py (3.9 KB)

  1. import os
  2. import json
  3. import time
  4. import sys
  5. import argparse
  6. from typing import Dict, Any, List, Optional, Tuple
  7. # 导入自定义模块
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from gemini import GeminiProcessor
  11. from utils.file import File
  12. class Handler:
  13. def __init__(self):
  14. # 初始化处理器
  15. self.processor = GeminiProcessor()
  16. self.system_prompt = File.read_file('prompt/handle.md')
  17. def build_query_conditions(self, query_word: Optional[str],
  18. source_type: Optional[str],
  19. source_channel: Optional[str]) -> Tuple[str, Tuple]:
  20. """构建查询条件和参数"""
  21. conditions = ["multimodal_recognition is not null", "structured_data is null"]
  22. params = []
  23. if query_word is not None:
  24. conditions.append("query_word = %s")
  25. params.append(query_word)
  26. if source_type is not None:
  27. conditions.append("source_type = %s")
  28. params.append(source_type)
  29. if source_channel is not None:
  30. conditions.append("source_channel = %s")
  31. params.append(source_channel)
  32. where_clause = " AND ".join(conditions)
  33. return where_clause, tuple(params)
  34. def process_all_records(self, query_word: Optional[str],
  35. source_type: Optional[str],
  36. source_channel: Optional[str]):
  37. """处理所有记录"""
  38. total_processed = 0
  39. total_success = 0
  40. try:
  41. # 构建查询条件和参数
  42. where_clause, params = self.build_query_conditions(query_word, source_type, source_channel)
  43. sql = f"""
  44. SELECT id, multimodal_recognition
  45. FROM knowledge_search_content
  46. WHERE {where_clause}
  47. """
  48. # 查询记录
  49. records = MysqlHelper.get_values(sql, params)
  50. print(f"获取到 {len(records)} 条记录")
  51. # 处理每条记录
  52. for row in records:
  53. total_processed += 1
  54. try:
  55. # 处理内容
  56. result = self.processor.process(row[1], self.system_prompt)
  57. print(result)
  58. # 更新数据库
  59. update_sql = """
  60. UPDATE knowledge_search_content
  61. SET structured_data = %s
  62. WHERE id = %s
  63. """
  64. affected_rows = MysqlHelper.update_values(update_sql, (result, row[0]))
  65. total_success += 1
  66. # 添加延迟避免API限制
  67. time.sleep(5)
  68. except Exception as e:
  69. print(f"处理记录 {row[0]} 失败: {str(e)}")
  70. finally:
  71. print(f"处理完成!总数据量 {len(records)},已处理 {total_processed} ,成功 {total_success} ")
  72. except Exception as e:
  73. print(f"处理过程中发生错误: {str(e)}")
  74. def main():
  75. """主函数"""
  76. parser = argparse.ArgumentParser(description='内容识别脚本')
  77. parser.add_argument('--query_word', default=None, help='query词')
  78. parser.add_argument('--source_type', default=None, help='数据源类型')
  79. parser.add_argument('--source_channel', default=None, help='数据源渠道')
  80. args = parser.parse_args()
  81. try:
  82. handler = Handler()
  83. handler.process_all_records(
  84. query_word=args.query_word,
  85. source_type=args.source_type,
  86. source_channel=args.source_channel
  87. )
  88. except Exception as e:
  89. print(f"程序执行失败: {str(e)}")
  90. sys.exit(1)
  91. if __name__ == "__main__":
  92. main()