# main.py (2.6 KB)
  1. import threading
  2. import traceback
  3. import json
  4. import time
  5. import uuid
  6. from application.common import AliyunLogger, get_consumer, ack_message
  7. from application.common.log import Local
  8. from application.spiders.universal_crawler import UniversalCrawler
  9. from application.config import TopicGroup
  10. from application.service.user_service import get_user_list
  11. from application.service.rule_service import get_rule_dict
  12. def generate_trace_id():
  13. return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"
  14. def handle_message(topic: str):
  15. consumer = get_consumer(topic)
  16. logger = AliyunLogger(platform=topic, mode="unknown")
  17. while True:
  18. try:
  19. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  20. if not messages:
  21. continue
  22. for message in messages:
  23. trace_id = generate_trace_id()
  24. body = message.message_body
  25. try:
  26. payload = json.loads(body)
  27. platform = payload["platform"]
  28. mode = payload.get("mode", "recommend")
  29. logger = AliyunLogger(platform=platform, mode=mode)
  30. Local.logger(platform, mode).info(f"[trace_id={trace_id}] 收到任务: {body}")
  31. # 加载 user_list 与 rule_dict
  32. user_list = get_user_list(platform, mode)
  33. rule_dict = get_rule_dict(platform, mode)
  34. # 同步执行 UniversalCrawler
  35. crawler = UniversalCrawler(platform, mode, rule_dict, user_list)
  36. crawler.run()
  37. # 执行成功后 ack
  38. ack_message(mode, platform, message, consumer, trace_id=trace_id)
  39. logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
  40. except Exception as e:
  41. logger.logging(
  42. code="9001",
  43. message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
  44. trace_id=trace_id,
  45. data=body,
  46. )
  47. # 不 ack,等待下次重试
  48. except Exception as err:
  49. logger.logging(code="9002", message=f"消费失败: {err}\n{traceback.format_exc()}")
  50. time.sleep(2)
  51. def main():
  52. topic_list = TopicGroup().topics
  53. print(f"监听 Topics:{topic_list}")
  54. threads = []
  55. for topic in topic_list:
  56. t = threading.Thread(target=handle_message, args=(topic,))
  57. t.start()
  58. threads.append(t)
  59. for t in threads:
  60. t.join()
  61. if __name__ == '__main__':
  62. main()