|
@@ -24,6 +24,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
log_service: LogService,
|
|
log_service: LogService,
|
|
|
config: GlobalConfigSettings,
|
|
config: GlobalConfigSettings,
|
|
|
):
|
|
):
|
|
|
|
|
+ self.log_service: LogService = log_service
|
|
|
self.mapper: AutoReplyCardsMonitorMapper = AutoReplyCardsMonitorMapper(
|
|
self.mapper: AutoReplyCardsMonitorMapper = AutoReplyCardsMonitorMapper(
|
|
|
pool=pool, log_service=log_service
|
|
pool=pool, log_service=log_service
|
|
|
)
|
|
)
|
|
@@ -32,15 +33,52 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
# 存储卡片信息
|
|
# 存储卡片信息
|
|
|
- async def store_card(self, task_id, index, msg_type, xml_obj):
|
|
|
|
|
- video_id, root_source_id = self.tool.extract_page_path(xml_obj["page_path"])
|
|
|
|
|
- cover_obj = await self.tool.get_cover_url(
|
|
|
|
|
- xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ async def store_card(self, task_id, index, msg_type, xml_obj) -> bool:
|
|
|
|
|
+ page_path = xml_obj.get("page_path")
|
|
|
|
|
+ if not page_path:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card missing page_path, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
+ video_id, root_source_id = self.tool.extract_page_path(page_path)
|
|
|
|
|
+
|
|
|
|
|
+ aes_key = xml_obj.get("aes_key")
|
|
|
|
|
+ file_id = xml_obj.get("file_id")
|
|
|
|
|
+ if not all([aes_key, file_id]):
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card missing cover params, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ cover_obj = await self.tool.get_cover_url(aes_key, file_id)
|
|
|
|
|
+ if not cover_obj:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card get_cover_url failed, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
file_name = f"{task_id}_{index}.jpg"
|
|
file_name = f"{task_id}_{index}.jpg"
|
|
|
save_path = self.tool.download_cover(file_name, cover_obj)
|
|
save_path = self.tool.download_cover(file_name, cover_obj)
|
|
|
if save_path is None:
|
|
if save_path is None:
|
|
|
- return
|
|
|
|
|
|
|
+ return False
|
|
|
exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
|
|
exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
|
|
|
if exist_covers:
|
|
if exist_covers:
|
|
|
exist_cover_id, oss_key = self.tool.check_cover(save_path, exist_covers)
|
|
exist_cover_id, oss_key = self.tool.check_cover(save_path, exist_covers)
|
|
@@ -58,7 +96,16 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
)
|
|
)
|
|
|
else:
|
|
else:
|
|
|
self.tool.remove_local_cover(save_path)
|
|
self.tool.remove_local_cover(save_path)
|
|
|
- return
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card upload_cover failed, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
else:
|
|
else:
|
|
|
# upload to oss
|
|
# upload to oss
|
|
|
oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
|
|
oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
|
|
@@ -68,14 +115,23 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
)
|
|
)
|
|
|
else:
|
|
else:
|
|
|
self.tool.remove_local_cover(save_path)
|
|
self.tool.remove_local_cover(save_path)
|
|
|
- return
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_card",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_card upload_cover failed, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
|
|
|
query = """
|
|
query = """
|
|
|
INSERT INTO cooperate_auto_reply_detail
|
|
INSERT INTO cooperate_auto_reply_detail
|
|
|
(
|
|
(
|
|
|
- task_id, position, msg_type, card_title, card_cover_id, card_cover,
|
|
|
|
|
|
|
+ task_id, position, msg_type, card_title, card_cover_id, card_cover,
|
|
|
video_id, root_source_id, mini_program_name, task_result
|
|
video_id, root_source_id, mini_program_name, task_result
|
|
|
- ) VALUES
|
|
|
|
|
|
|
+ ) VALUES
|
|
|
(
|
|
(
|
|
|
%s, %s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s, %s,
|
|
|
%s, %s, %s, %s
|
|
%s, %s, %s, %s
|
|
@@ -85,34 +141,39 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
task_id,
|
|
task_id,
|
|
|
index,
|
|
index,
|
|
|
msg_type,
|
|
msg_type,
|
|
|
- xml_obj["title"],
|
|
|
|
|
|
|
+ xml_obj.get("title", ""),
|
|
|
cover_id,
|
|
cover_id,
|
|
|
oss_key,
|
|
oss_key,
|
|
|
video_id,
|
|
video_id,
|
|
|
root_source_id,
|
|
root_source_id,
|
|
|
- xml_obj["mini_program"],
|
|
|
|
|
|
|
+ xml_obj.get("mini_program", ""),
|
|
|
json.dumps(xml_obj, ensure_ascii=False),
|
|
json.dumps(xml_obj, ensure_ascii=False),
|
|
|
)
|
|
)
|
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
# 存储文章信息
|
|
# 存储文章信息
|
|
|
- async def store_article(self, task_id, index, msg_type, xml_obj):
|
|
|
|
|
|
|
+ async def store_article(self, task_id, index, msg_type, xml_obj) -> bool:
|
|
|
article_title = xml_obj.get("title")
|
|
article_title = xml_obj.get("title")
|
|
|
article_link = xml_obj.get("url")
|
|
article_link = xml_obj.get("url")
|
|
|
article_cover = xml_obj.get("cover_url")
|
|
article_cover = xml_obj.get("cover_url")
|
|
|
article_desc = xml_obj.get("desc")
|
|
article_desc = xml_obj.get("desc")
|
|
|
|
|
|
|
|
fetch_fail_status = False
|
|
fetch_fail_status = False
|
|
|
|
|
+ fetch_fail_reason = "获取文章详情失败"
|
|
|
|
|
|
|
|
fetch_response = await get_article_detail(
|
|
fetch_response = await get_article_detail(
|
|
|
article_link=article_link, is_cache=False, is_count=True
|
|
article_link=article_link, is_cache=False, is_count=True
|
|
|
)
|
|
)
|
|
|
if not fetch_response:
|
|
if not fetch_response:
|
|
|
fetch_fail_status = True
|
|
fetch_fail_status = True
|
|
|
|
|
+ fetch_fail_reason = "获取文章详情失败: 响应为空"
|
|
|
|
|
|
|
|
if not fetch_fail_status:
|
|
if not fetch_fail_status:
|
|
|
- if fetch_response.get("code") != 0:
|
|
|
|
|
|
|
+ code = fetch_response.get("code")
|
|
|
|
|
+ if code is None or code != 0:
|
|
|
fetch_fail_status = True
|
|
fetch_fail_status = True
|
|
|
|
|
+ fetch_fail_reason = f"获取文章详情失败: code={code}"
|
|
|
|
|
|
|
|
if fetch_fail_status:
|
|
if fetch_fail_status:
|
|
|
query = """
|
|
query = """
|
|
@@ -121,7 +182,6 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
VALUES
|
|
VALUES
|
|
|
(%s, %s, %s, %s, %s, %s, %s, %s);
|
|
(%s, %s, %s, %s, %s, %s, %s, %s);
|
|
|
"""
|
|
"""
|
|
|
- remark = "获取文章详情失败"
|
|
|
|
|
insert_row = (
|
|
insert_row = (
|
|
|
task_id,
|
|
task_id,
|
|
|
index,
|
|
index,
|
|
@@ -130,34 +190,61 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
article_link,
|
|
article_link,
|
|
|
article_cover,
|
|
article_cover,
|
|
|
article_desc,
|
|
article_desc,
|
|
|
- remark,
|
|
|
|
|
|
|
+ fetch_fail_reason,
|
|
|
)
|
|
)
|
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
await self.mapper.store_extract_result(query, insert_row)
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_article",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_article fetch failed, task_id={task_id}, reason={fetch_fail_reason}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
|
|
|
else:
|
|
else:
|
|
|
- article_detail = fetch_response["data"]["data"]
|
|
|
|
|
|
|
+ article_detail = fetch_response.get("data", {}).get("data", {})
|
|
|
|
|
+ if not article_detail:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_article",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_article missing data.data, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
article_text = article_detail.get("body_text", "")
|
|
article_text = article_detail.get("body_text", "")
|
|
|
article_images = article_detail.get("image_url_list", [])
|
|
article_images = article_detail.get("image_url_list", [])
|
|
|
read_cnt = article_detail.get("view_count") or 0
|
|
read_cnt = article_detail.get("view_count") or 0
|
|
|
like_cnt = article_detail.get("like_count") or 0
|
|
like_cnt = article_detail.get("like_count") or 0
|
|
|
pt = article_detail.get("publish_timestamp")
|
|
pt = article_detail.get("publish_timestamp")
|
|
|
publish_timestamp = int(pt / 1000) if pt is not None else 0
|
|
publish_timestamp = int(pt / 1000) if pt is not None else 0
|
|
|
- parsed = urlparse(article_detail["content_link"])
|
|
|
|
|
- params = parse_qs(parsed.query)
|
|
|
|
|
- wx_sn = params.get("sn", [None])[0]
|
|
|
|
|
|
|
+
|
|
|
|
|
+ content_link = article_detail.get("content_link", "")
|
|
|
|
|
+ if content_link:
|
|
|
|
|
+ parsed = urlparse(content_link)
|
|
|
|
|
+ params = parse_qs(parsed.query)
|
|
|
|
|
+ wx_sn = params.get("sn", [None])[0]
|
|
|
|
|
+ else:
|
|
|
|
|
+ wx_sn = None
|
|
|
|
|
+
|
|
|
mini_info = article_detail.get("mini_program")
|
|
mini_info = article_detail.get("mini_program")
|
|
|
if not mini_info:
|
|
if not mini_info:
|
|
|
- # video_id, root_source_id = None, None
|
|
|
|
|
query = """
|
|
query = """
|
|
|
- INSERT INTO cooperate_auto_reply_detail
|
|
|
|
|
|
|
+ INSERT INTO cooperate_auto_reply_detail
|
|
|
(
|
|
(
|
|
|
- task_id, position, msg_type, article_title, article_link,
|
|
|
|
|
- article_cover, article_text, article_images, article_desc, read_cnt,
|
|
|
|
|
|
|
+ task_id, position, msg_type, article_title, article_link,
|
|
|
|
|
+ article_cover, article_text, article_images, article_desc, read_cnt,
|
|
|
like_cnt, publish_timestamp, task_result, wx_sn
|
|
like_cnt, publish_timestamp, task_result, wx_sn
|
|
|
- ) VALUES
|
|
|
|
|
|
|
+ ) VALUES
|
|
|
(
|
|
(
|
|
|
- %s, %s, %s, %s, %s,
|
|
|
|
|
- %s, %s, %s, %s, %s,
|
|
|
|
|
|
|
+ %s, %s, %s, %s, %s,
|
|
|
|
|
+ %s, %s, %s, %s, %s,
|
|
|
%s, %s, %s, %s
|
|
%s, %s, %s, %s
|
|
|
);
|
|
);
|
|
|
"""
|
|
"""
|
|
@@ -178,27 +265,40 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
wx_sn,
|
|
wx_sn,
|
|
|
)
|
|
)
|
|
|
await self.mapper.store_extract_result(query, values)
|
|
await self.mapper.store_extract_result(query, values)
|
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
else:
|
|
else:
|
|
|
- for card_index, i in enumerate(mini_info, 1):
|
|
|
|
|
|
|
+ for card_index, card_item in enumerate(mini_info, 1):
|
|
|
|
|
+ card_path = card_item.get("path")
|
|
|
|
|
+ if not card_path:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_article",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_article mini_program missing path, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ continue
|
|
|
try:
|
|
try:
|
|
|
video_id, root_source_id = self.tool.extract_page_path(
|
|
video_id, root_source_id = self.tool.extract_page_path(
|
|
|
- i["path"]
|
|
|
|
|
|
|
+ card_path
|
|
|
)
|
|
)
|
|
|
- card_title = i["title"]
|
|
|
|
|
- card_cover = i["image_url"]
|
|
|
|
|
- mini_name = i["nike_name"]
|
|
|
|
|
|
|
+ card_title = card_item.get("title", "")
|
|
|
|
|
+ card_cover = card_item.get("image_url", "")
|
|
|
|
|
+ mini_name = card_item.get("nike_name", "")
|
|
|
query = """
|
|
query = """
|
|
|
- INSERT INTO cooperate_auto_reply_detail
|
|
|
|
|
|
|
+ INSERT INTO cooperate_auto_reply_detail
|
|
|
(
|
|
(
|
|
|
- task_id, position, msg_type, card_title, card_cover,
|
|
|
|
|
- video_id, root_source_id, mini_program_name, article_title, article_link,
|
|
|
|
|
- article_cover, article_text, article_images, article_desc, read_cnt,
|
|
|
|
|
|
|
+ task_id, position, msg_type, card_title, card_cover,
|
|
|
|
|
+ video_id, root_source_id, mini_program_name, article_title, article_link,
|
|
|
|
|
+ article_cover, article_text, article_images, article_desc, read_cnt,
|
|
|
like_cnt, publish_timestamp, task_result, wx_sn, card_index
|
|
like_cnt, publish_timestamp, task_result, wx_sn, card_index
|
|
|
) VALUES
|
|
) VALUES
|
|
|
(
|
|
(
|
|
|
- %s, %s, %s, %s, %s,
|
|
|
|
|
- %s, %s, %s, %s, %s,
|
|
|
|
|
|
|
+ %s, %s, %s, %s, %s,
|
|
|
|
|
+ %s, %s, %s, %s, %s,
|
|
|
%s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s,
|
|
|
%s, %s, %s, %s, %s
|
|
%s, %s, %s, %s, %s
|
|
|
);
|
|
);
|
|
@@ -229,16 +329,64 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
print(traceback.format_exc())
|
|
print(traceback.format_exc())
|
|
|
print(e)
|
|
print(e)
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "store_article",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"store_article mini_program insert failed, task_id={task_id}, card_index={card_index}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
# 创建单个关注公众号任务
|
|
# 创建单个关注公众号任务
|
|
|
async def create_follow_single_account_task(self, gh_id):
|
|
async def create_follow_single_account_task(self, gh_id):
|
|
|
response = await get_article_list_from_account(account_id=gh_id)
|
|
response = await get_article_list_from_account(account_id=gh_id)
|
|
|
|
|
+ if not response:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_follow_single_account_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"create_follow_single_account_task response is None, gh_id={gh_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
code = response.get("code")
|
|
code = response.get("code")
|
|
|
match code:
|
|
match code:
|
|
|
case 0:
|
|
case 0:
|
|
|
- recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ recent_articles = (
|
|
|
|
|
+ response.get("data", {})
|
|
|
|
|
+ .get("data", [{}])[0]
|
|
|
|
|
+ .get("AppMsg", {})
|
|
|
|
|
+ .get("DetailInfo", [])
|
|
|
|
|
+ )
|
|
|
|
|
+ except (IndexError, TypeError, AttributeError) as e:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_follow_single_account_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"create_follow_single_account_task parse response failed, gh_id={gh_id}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ if not recent_articles:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_follow_single_account_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": f"create_follow_single_account_task no recent articles, gh_id={gh_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
article_url = await self.tool.get_sample_url(recent_articles)
|
|
article_url = await self.tool.get_sample_url(recent_articles)
|
|
|
- print(article_url)
|
|
|
|
|
if article_url:
|
|
if article_url:
|
|
|
await self.mapper.set_sample_url(gh_id, article_url)
|
|
await self.mapper.set_sample_url(gh_id, article_url)
|
|
|
|
|
|
|
@@ -258,11 +406,17 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
await self.mapper.set_account_as_invalid(gh_id)
|
|
await self.mapper.set_account_as_invalid(gh_id)
|
|
|
|
|
|
|
|
case _:
|
|
case _:
|
|
|
- pass
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_follow_single_account_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": f"create_follow_single_account_task unexpected code={code}, gh_id={gh_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
# 创建单个账号自动回复任务
|
|
# 创建单个账号自动回复任务
|
|
|
async def create_auto_reply_single_account_task(self, gh_id, account_name):
|
|
async def create_auto_reply_single_account_task(self, gh_id, account_name):
|
|
|
- print(account_name)
|
|
|
|
|
task_id = self.tool.generate_task_id(task_name="auto_reply", gh_id=gh_id)
|
|
task_id = self.tool.generate_task_id(task_name="auto_reply", gh_id=gh_id)
|
|
|
# 先插入 task, 再创建自动回复任务
|
|
# 先插入 task, 再创建自动回复任务
|
|
|
create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
|
|
create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
|
|
@@ -271,66 +425,157 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
task_id, gh_id
|
|
task_id, gh_id
|
|
|
)
|
|
)
|
|
|
if not affected_rows:
|
|
if not affected_rows:
|
|
|
- print("发布任务至 AIGC 失败")
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_auto_reply_single_account_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"发布任务至 AIGC 失败, task_id={task_id}, account={account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
else:
|
|
else:
|
|
|
await self.mapper.update_auto_reply_task_status(
|
|
await self.mapper.update_auto_reply_task_status(
|
|
|
task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
|
|
task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
)
|
|
)
|
|
|
else:
|
|
else:
|
|
|
- print("创建任务至 DB 失败")
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "create_auto_reply_single_account_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"创建任务至 DB 失败, task_id={task_id}, account={account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
async def follow_gzh_task(self):
|
|
async def follow_gzh_task(self):
|
|
|
- account_list = self.tool.get_monitor_account_list()
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ account_list = self.tool.get_monitor_account_list()
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"follow_gzh_task get_monitor_account_list failed, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ if not account_list:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": "follow_gzh_task: no accounts from ODPS",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
for account in account_list:
|
|
for account in account_list:
|
|
|
- try:
|
|
|
|
|
- fetch_response = await self.mapper.fetch_account_status(
|
|
|
|
|
- account.公众号名
|
|
|
|
|
|
|
+ account_name = getattr(account, "公众号名", None)
|
|
|
|
|
+ if not account_name:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": "follow_gzh_task account missing 公众号名, skip",
|
|
|
|
|
+ }
|
|
|
)
|
|
)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ fetch_response = await self.mapper.fetch_account_status(account_name)
|
|
|
if not fetch_response:
|
|
if not fetch_response:
|
|
|
affected_rows = await self.mapper.fetch_cooperate_accounts(
|
|
affected_rows = await self.mapper.fetch_cooperate_accounts(
|
|
|
- account.公众号名
|
|
|
|
|
|
|
+ account_name
|
|
|
)
|
|
)
|
|
|
if affected_rows:
|
|
if affected_rows:
|
|
|
fetch_response = await self.mapper.fetch_account_status(
|
|
fetch_response = await self.mapper.fetch_account_status(
|
|
|
- account.公众号名
|
|
|
|
|
|
|
+ account_name
|
|
|
)
|
|
)
|
|
|
-
|
|
|
|
|
else:
|
|
else:
|
|
|
- print(f"系统中无账号,跳过: {account.公众号名}")
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": f"系统中无账号,跳过: {account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- account_detail = fetch_response[0]
|
|
|
|
|
- status = account_detail["status"]
|
|
|
|
|
|
|
+ if not fetch_response:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": f"fetch_account_status 返回空, 跳过: {account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ continue
|
|
|
|
|
|
|
|
|
|
+ account_detail = fetch_response[0]
|
|
|
|
|
+ status = account_detail.get("status")
|
|
|
if not status:
|
|
if not status:
|
|
|
- print("账号已经迁移或者封禁")
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "message": f"账号已经迁移或者封禁: {account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- # 新逻辑,无需考虑账号是否关注
|
|
|
|
|
- await self.create_auto_reply_single_account_task(
|
|
|
|
|
- account_detail["gh_id"], account.公众号名
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ gh_id = account_detail.get("gh_id")
|
|
|
|
|
+ if not gh_id:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"账号缺少 gh_id: {account_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ await self.create_auto_reply_single_account_task(gh_id, account_name)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"处理账号{account.公众号名}异常", e)
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "follow_gzh_task",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"处理账号异常 account={account_name}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
- # 异步获取关注结果
|
|
|
|
|
|
|
+ # 异步获取自动回复结果
|
|
|
async def get_auto_reply_response(self):
|
|
async def get_auto_reply_response(self):
|
|
|
task_list = await self.mapper.fetch_auto_replying_tasks()
|
|
task_list = await self.mapper.fetch_auto_replying_tasks()
|
|
|
if not task_list:
|
|
if not task_list:
|
|
|
- print("No processing task yet")
|
|
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
for task in tqdm(task_list):
|
|
for task in tqdm(task_list):
|
|
|
|
|
+ task_id = task.get("task_id")
|
|
|
|
|
+ if not task_id:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
try:
|
|
try:
|
|
|
- task_id = task["task_id"]
|
|
|
|
|
response = await self.mapper.get_auto_reply_task_result(task_id)
|
|
response = await self.mapper.get_auto_reply_task_result(task_id)
|
|
|
if not response:
|
|
if not response:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- task_status = response[0]["task_status"]
|
|
|
|
|
- task_result = response[0]["task_result"]
|
|
|
|
|
- update_timestamp = response[0]["update_timestamp"]
|
|
|
|
|
|
|
+ task_record = response[0]
|
|
|
|
|
+ task_status = task_record.get("task_status")
|
|
|
|
|
+ task_result = task_record.get("task_result")
|
|
|
|
|
+ update_timestamp = task_record.get("update_timestamp")
|
|
|
|
|
|
|
|
match task_status:
|
|
match task_status:
|
|
|
case self.FETCH_FAIL_STATUS:
|
|
case self.FETCH_FAIL_STATUS:
|
|
@@ -347,12 +592,31 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(e)
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "get_auto_reply_response",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"get_auto_reply_response failed, task_id={task_id}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
# 解析单个xml
|
|
# 解析单个xml
|
|
|
async def extract_single_xml(self, task):
|
|
async def extract_single_xml(self, task):
|
|
|
- task_id = task["task_id"]
|
|
|
|
|
- result = task["result"]
|
|
|
|
|
|
|
+ task_id = task.get("task_id")
|
|
|
|
|
+ result = task.get("result")
|
|
|
|
|
+ if not task_id or not result:
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": "extract_single_xml missing task_id or result",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
|
|
|
# acquire lock
|
|
# acquire lock
|
|
|
acquire_lock = await self.mapper.update_auto_reply_task_status(
|
|
acquire_lock = await self.mapper.update_auto_reply_task_status(
|
|
@@ -363,35 +627,126 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
# parse xml
|
|
# parse xml
|
|
|
- xml_list = json.loads(result) if isinstance(result, str) else result
|
|
|
|
|
|
|
+ if isinstance(result, str):
|
|
|
|
|
+ xml_list = json.loads(result)
|
|
|
|
|
+ else:
|
|
|
|
|
+ xml_list = result
|
|
|
|
|
+
|
|
|
|
|
+ if not isinstance(xml_list, list):
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"extract_single_xml result is not a list, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ await self.mapper.update_auto_reply_task_status(
|
|
|
|
|
+ task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
|
|
|
+ )
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
index = 0
|
|
index = 0
|
|
|
|
|
+ success_count = 0
|
|
|
|
|
+ fail_count = 0
|
|
|
|
|
+
|
|
|
for item in xml_list:
|
|
for item in xml_list:
|
|
|
xml_obj_list = self.tool.extract_callback_xml(item)
|
|
xml_obj_list = self.tool.extract_callback_xml(item)
|
|
|
if xml_obj_list:
|
|
if xml_obj_list:
|
|
|
for xml_obj in xml_obj_list:
|
|
for xml_obj in xml_obj_list:
|
|
|
index += 1
|
|
index += 1
|
|
|
msg_type = xml_obj.get("msg_type", None)
|
|
msg_type = xml_obj.get("msg_type", None)
|
|
|
- match msg_type:
|
|
|
|
|
- case "33":
|
|
|
|
|
- await self.store_card(task_id, index, msg_type, xml_obj)
|
|
|
|
|
-
|
|
|
|
|
- case "5":
|
|
|
|
|
- await self.store_article(
|
|
|
|
|
- task_id, index, msg_type, xml_obj
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- case _:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- await asyncio.sleep(5)
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ result = False
|
|
|
|
|
+ match msg_type:
|
|
|
|
|
+ case "33":
|
|
|
|
|
+ result = await self.store_card(
|
|
|
|
|
+ task_id, index, msg_type, xml_obj
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case "5":
|
|
|
|
|
+ result = await self.store_article(
|
|
|
|
|
+ task_id, index, msg_type, xml_obj
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ case _:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if result:
|
|
|
|
|
+ success_count += 1
|
|
|
|
|
+ else:
|
|
|
|
|
+ fail_count += 1
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ fail_count += 1
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"处理 xml_obj 异常, task_id={task_id}, index={index}, msg_type={msg_type}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ await asyncio.sleep(5)
|
|
|
|
|
+
|
|
|
|
|
+ # 根据成功/失败数量决定最终状态
|
|
|
|
|
+ if fail_count == 0 and success_count > 0:
|
|
|
|
|
+ final_status = self.SUCCESS_STATUS
|
|
|
|
|
+ elif success_count > 0 and fail_count > 0:
|
|
|
|
|
+ # 部分成功,记录警告但仍标记为成功
|
|
|
|
|
+ final_status = self.SUCCESS_STATUS
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"任务部分成功, task_id={task_id}, success={success_count}, fail={fail_count}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ elif success_count == 0 and fail_count > 0:
|
|
|
|
|
+ # 全部失败
|
|
|
|
|
+ final_status = self.FAIL_STATUS
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"任务全部失败, task_id={task_id}, fail={fail_count}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ # success_count == 0 and fail_count == 0,没有处理任何数据
|
|
|
|
|
+ final_status = self.SUCCESS_STATUS
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "warn",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"任务无数据处理, task_id={task_id}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
await self.mapper.update_auto_reply_task_status(
|
|
await self.mapper.update_auto_reply_task_status(
|
|
|
- task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
|
|
|
|
|
|
|
+ task_id, "extract", self.PROCESSING_STATUS, final_status
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(e)
|
|
|
|
|
print(traceback.format_exc())
|
|
print(traceback.format_exc())
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "extract_single_xml",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "trace_id": task_id,
|
|
|
|
|
+ "message": f"extract_single_xml failed, task_id={task_id}, error={e}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
await self.mapper.update_auto_reply_task_status(
|
|
await self.mapper.update_auto_reply_task_status(
|
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
|
)
|
|
)
|
|
@@ -408,15 +763,18 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
|
|
|
case "extract_task":
|
|
case "extract_task":
|
|
|
task_list = await self.mapper.get_extract_tasks()
|
|
task_list = await self.mapper.get_extract_tasks()
|
|
|
if not task_list:
|
|
if not task_list:
|
|
|
- print("No tasks to extract now")
|
|
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
for task in tqdm(task_list, desc="解析任务"):
|
|
for task in tqdm(task_list, desc="解析任务"):
|
|
|
await self.extract_single_xml(task)
|
|
await self.extract_single_xml(task)
|
|
|
await asyncio.sleep(10)
|
|
await asyncio.sleep(10)
|
|
|
|
|
|
|
|
- case "re_extract_task":
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
case _:
|
|
case _:
|
|
|
- print("task_error")
|
|
|
|
|
|
|
+ await self.log_service.log(
|
|
|
|
|
+ contents={
|
|
|
|
|
+ "task": "auto_reply_cards_monitor",
|
|
|
|
|
+ "function": "deal",
|
|
|
|
|
+ "status": "fail",
|
|
|
|
|
+ "message": f"unknown task_name: {task_name}",
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|