# 3_handle.py
  1. import os
  2. import json
  3. import time
  4. import sys
  5. import argparse
  6. from typing import Dict, Any, List, Optional
  7. # 导入自定义模块
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from utils.mysql_db import MysqlHelper
  10. from gemini import GeminiProcessor
  11. from utils.file import File
  12. class Handler:
  13. def __init__(self):
  14. # 初始化飞书客户端
  15. self.processor = GeminiProcessor()
  16. self.system_prompt = File.read_file('prompt/handle.md')
  17. # print(self.system_prompt)
  18. def process_all_records(self, query_word, source_type, source_channel):
  19. """处理所有记录"""
  20. total_processed = 0
  21. total_success = 0
  22. while True:
  23. try:
  24. # 查库 获取记录
  25. sql = """
  26. select id, formatted_content from knowledge_search_content
  27. where formatted_content is not null and multimodal_recognition is null
  28. """
  29. # 添加条件(当参数有值时)
  30. conditions = []
  31. if query_word is not None:
  32. conditions.append(f"query_word='{query_word}'")
  33. if source_type is not None:
  34. conditions.append(f"source_type='{source_type}'")
  35. if source_channel is not None:
  36. conditions.append(f"source_channel='{source_channel}'")
  37. # 如果有条件,添加到SQL中
  38. if conditions:
  39. sql += " and " + " and ".join(conditions)
  40. records = MysqlHelper.get_values(sql)
  41. print(f"获取到 {len(result)} 条记录")
  42. # 处理每条记录
  43. for row in records:
  44. total_processed += 1
  45. """处理单条记录"""
  46. try:
  47. result = self.processor.process(row[1], self.system_prompt)
  48. # 更新数据库
  49. update_sql = """
  50. update knowledge_search_content set multimodal_recognition = %s where id = %s
  51. """
  52. MysqlHelper.update_values(update_sql, (result, row[0]))
  53. # 添加延迟避免API限制
  54. time.sleep(1)
  55. total_success += 1
  56. except Exception as e:
  57. print(f"处理记录 {record.record_id} 失败: {e}")
  58. # 检查是否有下一页
  59. if not result.has_more:
  60. break
  61. page_token = result.page_token
  62. print(f"继续获取下一页,token: {page_token}")
  63. except Exception as e:
  64. print(f"获取记录失败: {e}")
  65. break
  66. print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
  67. def main():
  68. """主函数"""
  69. # 创建命令行参数解析器
  70. parser = argparse.ArgumentParser(description='内容识别脚本')
  71. parser.add_argument('--query_word', default=None, help='query词')
  72. parser.add_argument('--source_type', default=None, help='数据源类型')
  73. parser.add_argument('--source_channel', default=None, help='数据源渠道')
  74. args = parser.parse_args()
  75. try:
  76. # 创建内容识别器实例
  77. handler = Handler()
  78. handler.process_all_records(
  79. query_word=args.query_word,
  80. source_type=args.source_type,
  81. source_channel=args.source_channel
  82. )
  83. except Exception as e:
  84. print(f"程序执行失败: {e}")
  85. sys.exit(1)
  86. if __name__ == "__main__":
  87. main()