main.py

import importlib
import threading
import traceback
import json
import time
import uuid

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from crawler_worker.universal_crawler import UniversalCrawler
from application.config import TopicGroup
from application.functions.mysql_service import MysqlService
from utils.config_loader import ConfigLoader


def generate_trace_id():
    """Random hex UUID concatenated with a millisecond timestamp."""
    return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"
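
# A minimal sketch (an assumption, not the documented schema) of the platform
# config returned by ConfigLoader().get_platform_config(topic): handle_message()
# below only relies on it being dict-like, with an optional "custom_class" entry
# holding a dotted import path for import_custom_class(). Roughly:
#
#     {
#         "custom_class": "crawler_worker.universal_crawler.UniversalCrawler",
#         # ... any other platform-specific settings are passed to the crawler as-is
#     }
#
# Only the "custom_class" key and the .get() access are inferred from the code;
# every other key is hypothetical.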

def import_custom_class(class_path):
    """
    Dynamically import a class from a dotted module path,
    e.g. "crawler_worker.universal_crawler.UniversalCrawler".
    """
    module_path, class_name = class_path.rsplit(".", 1)
    print(module_path, class_name)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
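
# The message body consumed in handle_message() is JSON carrying at least the
# three fields read there ("platform", "mode", "id"). A minimal sketch, with
# purely illustrative placeholder values:
#
#     {"platform": "<platform>", "mode": "<mode>", "id": <task_id>}
#
# Any additional fields are only echoed back in the start-of-round log entry.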

def handle_message(topic: str, mode: str):
    consumer = get_consumer(topic_name=topic, group_id=topic)
    logger = AliyunLogger(platform=topic, mode=mode)
    platform_config = ConfigLoader().get_platform_config(topic)
    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue
            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload["mode"]
                    task_id = payload["id"]
                    mysql_service = MysqlService(task_id, mode, platform)
                    logger.logging(
                        1001,
                        "Starting a crawl round",
                        data=payload,
                        trace_id=trace_id
                    )
                    Local.init_logger(platform, mode).info(f"[trace_id={trace_id}] Received task: {body}")
                    # Load user_list and rule_dict for this task
                    user_list = mysql_service.get_user_list()
                    rule_dict = mysql_service.get_rule_dict()
                    custom_class = platform_config.get("custom_class")  # optional custom crawler class
                    try:
                        if custom_class:
                            CrawlerClass = import_custom_class(custom_class)
                        else:
                            CrawlerClass = UniversalCrawler
                        crawler = CrawlerClass(
                            platform_config=platform_config,  # pass the full platform config through
                            rule_dict=rule_dict,
                            user_list=user_list,
                            trace_id=trace_id
                        )
                        crawler.run()
                    except Exception as e:
                        print(f"[{topic}] crawler run failed: {e}")
                        raise  # re-raise so the message is not acknowledged below
                    # Ack only after the crawl succeeded
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    logger.logging(code="1000", message="Task completed and message acknowledged", trace_id=trace_id)
                except Exception as e:
                    logger.logging(
                        code="9001",
                        message=f"Failed to process message (not acked): {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=body,
                    )
                    # No ack: the message will be redelivered and retried later
        except Exception as err:
            logger.logging(code="9002", message=f"Consume failed: {err}\n{traceback.format_exc()}")
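
# main() starts one consumer thread per topic. The mode is derived from the topic
# name via topic.split("_")[1], which assumes names shaped like "<prefix>_<mode>"
# (or "<prefix>_<mode>_<suffix>"); that naming convention is inferred from the
# split, not stated anywhere in this file.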

def main():
    topic_list = TopicGroup()
    print(f"Listening on topics: {topic_list}")
    threads = []
    for topic in topic_list:
        mode = topic.split("_")[1]
        t = threading.Thread(target=handle_message, args=(topic, mode))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()