import json
import threading
import time
import traceback
import uuid

from application.common import AliyunLogger, ack_message, get_consumer
from application.common.log import Local
from application.config import TopicGroup
from application.service.rule_service import get_rule_dict
from application.service.user_service import get_user_list
from application.spiders.universal_crawler import UniversalCrawler


def generate_trace_id() -> str:
    """Return a unique trace id: random UUID hex + millisecond timestamp."""
    return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"


def handle_message(topic: str) -> None:
    """Consume messages from *topic* forever and run one crawl per message.

    Each message body is expected to be JSON containing at least a
    ``platform`` key and an optional ``mode`` key (defaults to
    ``"recommend"``).  The message is ack'ed only after the crawler run
    completes, so a failed task stays in the queue and is redelivered.

    Args:
        topic: Name of the message-queue topic to consume from.
    """
    consumer = get_consumer(topic)
    # Placeholder logger until the first message reveals platform/mode;
    # also used by the outer except before any message has been parsed.
    logger = AliyunLogger(platform=topic, mode="unknown")
    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue
            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload.get("mode", "recommend")
                    # Re-bind the logger to the task's real platform/mode.
                    logger = AliyunLogger(platform=platform, mode=mode)
                    Local.logger(platform, mode).info(f"[trace_id={trace_id}] 收到任务: {body}")
                    # Load the user list and rule dict for this task.
                    user_list = get_user_list(platform, mode)
                    rule_dict = get_rule_dict(platform, mode)
                    # Run UniversalCrawler synchronously in this thread.
                    crawler = UniversalCrawler(platform, mode, rule_dict, user_list)
                    crawler.run()
                    # Ack only after a successful run.
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
                except Exception as e:
                    # Broad catch at the per-message boundary: log and skip
                    # the ack so the queue redelivers the message later.
                    logger.logging(
                        code="9001",
                        message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=body,
                    )
        except Exception as err:
            # Consumer-level failure (e.g. network): log, back off, retry.
            logger.logging(code="9002", message=f"消费失败: {err}\n{traceback.format_exc()}")
            time.sleep(2)


def main() -> None:
    """Spawn one consumer thread per configured topic and wait on them."""
    topic_list = TopicGroup().topics
    print(f"监听 Topics:{topic_list}")
    threads = []
    for topic in topic_list:
        # Named threads make stack dumps / monitoring easier to read.
        t = threading.Thread(
            target=handle_message,
            args=(topic,),
            name=f"consumer-{topic}",
        )
        t.start()
        threads.append(t)
    # handle_message loops forever, so join() blocks indefinitely; this
    # keeps the main thread alive alongside the consumers.
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()