|
@@ -6,7 +6,7 @@ description: 票圈线上代码控制程序,分布式抓取消费核心
|
|
|
import asyncio
|
|
|
import json
|
|
|
import time
|
|
|
-
|
|
|
+import uuid
|
|
|
from mq_http_sdk.mq_consumer import *
|
|
|
from mq_http_sdk.mq_exception import MQExceptionBase
|
|
|
|
|
@@ -27,14 +27,26 @@ async def run(task_id, mode, platform):
|
|
|
"""
|
|
|
# 创建一个aliyun日志对象
|
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
|
- logger.logging(code=1005, message="{}: 启动进程".format(platform))
|
|
|
# 创建并一个子进程
|
|
|
- await asyncio.create_subprocess_shell(
|
|
|
- "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
|
|
|
- task_id, mode, platform
|
|
|
+ try:
|
|
|
+ process = await asyncio.create_subprocess_shell(
|
|
|
+ "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
|
|
|
+ task_id, mode, platform
|
|
|
+ )
|
|
|
)
|
|
|
- )
|
|
|
- logger.logging(code=5002, message="successfully run spider")
|
|
|
+ # 检查进程是否已经启动
|
|
|
+ if process.returncode is None:
|
|
|
+ logger.logging(code=1005, message="{}: 启动进程成功,进程ID:{}".format(platform,process.pid))
|
|
|
+ else:
|
|
|
+ logger.logging(code=1006, message="{}: 启动进程失败".format(platform))
|
|
|
+ # logger.logging(code=5002, message="successfully run spider")
|
|
|
+ except Exception as e:
|
|
|
+ logger.logging(code=1007,message= f"发生未知错误: {e}")
|
|
|
+
|
|
|
+# def generate_trace_id():
|
|
|
+# timestamp = str(int(time.time() * 1000))
|
|
|
+# unique_id = str(uuid.uuid4())
|
|
|
+# return f"{timestamp}-{unique_id}"
|
|
|
|
|
|
|
|
|
async def consume_single_message(spider):
|
|
@@ -49,7 +61,11 @@ async def consume_single_message(spider):
|
|
|
logger = AliyunLogger(platform=platform, mode=mode)
|
|
|
consumer = get_consumer(topic, group)
|
|
|
try:
|
|
|
- messages = consumer.consume_message(wait_seconds=10, batch_size=1)
|
|
|
+ try:
|
|
|
+ messages = consumer.consume_message(wait_seconds=10, batch_size=1)
|
|
|
+ except Exception as err:
|
|
|
+ logger.logging(code="5005", message=f"Consume Message Fail! Exception:{err}")
|
|
|
+ raise
|
|
|
if messages:
|
|
|
# 在这里消费消息,做一些数据处理分析
|
|
|
for single_message in messages:
|
|
@@ -64,6 +80,8 @@ async def consume_single_message(spider):
|
|
|
)
|
|
|
message_body = single_message.message_body
|
|
|
task_id = json.loads(message_body)["id"]
|
|
|
+ Local.logger(platform, mode).info(
|
|
|
+ f"task_id=={task_id}")
|
|
|
# 创建爬虫task
|
|
|
await asyncio.create_task(
|
|
|
run(task_id, spider["mode"], spider["platform"])
|