12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import threading
- import traceback
- import json
- import time
- import uuid
- from application.common import AliyunLogger, get_consumer, ack_message
- from application.common.log import Local
- from application.spiders.universal_crawler import UniversalCrawler
- from application.config import TopicGroup
- from application.service.user_service import get_user_list
- from application.service.rule_service import get_rule_dict
def generate_trace_id():
    """Build a trace id for one consumed message.

    The id is the 32-character hex form of a random UUID4 immediately
    followed by the current Unix time in whole milliseconds.
    """
    random_part = uuid.uuid4().hex
    millis = int(time.time() * 1000)
    return random_part + str(millis)
def handle_message(topic: str):
    """Consume messages from one topic forever and run a crawler for each.

    For every message the JSON body is parsed, the user list and rule dict
    for its platform/mode are loaded, and a ``UniversalCrawler`` is run
    synchronously. The message is acknowledged only after the crawler
    finishes without raising; on any failure it is left un-acked.

    Args:
        topic: Name of the topic to consume from. It is also used as the
            ``platform`` of the topic-level logger.

    This function never returns; it is meant to be a thread target.
    """
    consumer = get_consumer(topic)
    # Topic-level logger. It is kept separate from the per-message logger so
    # that consume failures are never attributed to whichever platform/mode
    # the previously handled message happened to carry.
    topic_logger = AliyunLogger(platform=topic, mode="unknown")

    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue

            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                # Until the payload has been parsed there is no platform/mode
                # to log under, so early failures go to the topic logger.
                task_logger = topic_logger

                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload.get("mode", "recommend")

                    task_logger = AliyunLogger(platform=platform, mode=mode)
                    Local.logger(platform, mode).info(f"[trace_id={trace_id}] 收到任务: {body}")

                    # Load the user list and rule dict for this platform/mode.
                    user_list = get_user_list(platform, mode)
                    rule_dict = get_rule_dict(platform, mode)

                    # Run the crawler synchronously in this thread.
                    crawler = UniversalCrawler(platform, mode, rule_dict, user_list)
                    crawler.run()

                    # Acknowledge only after the crawler completed.
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    task_logger.logging(code="1000", message="任务成功完成并确认消息", trace_id=trace_id)
                except Exception as e:
                    # Deliberately no ack here: the message is left
                    # unacknowledged so it can be retried later.
                    task_logger.logging(
                        code="9001",
                        message=f"处理消息失败(未确认 ack): {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=body,
                    )
        except Exception as err:
            topic_logger.logging(code="9002", message=f"消费失败: {err}\n{traceback.format_exc()}")
            # Back off briefly before polling again.
            # NOTE(review): the source indentation was lost; this sleep is
            # assumed to belong to the consume-failure handler - confirm.
            time.sleep(2)
def main():
    """Start one ``handle_message`` thread per configured topic and wait on them.

    Topics are read from ``TopicGroup().topics``. Each thread is started as
    soon as it is created; afterwards every thread is joined in creation
    order.
    """
    topics = TopicGroup().topics
    print(f"监听 Topics:{topics}")

    workers = []
    for topic_name in topics:
        worker = threading.Thread(target=handle_message, args=(topic_name,))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()
# Run the consumers only when this file is executed as a script.
if __name__ == "__main__":
    main()
|