|
|
@@ -154,6 +154,7 @@ class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
|
|
|
GROUP BY 公众号名, ghid
|
|
|
HAVING uv > 100
|
|
|
ORDER BY uv DESC
|
|
|
+ LIMIT 10
|
|
|
;
|
|
|
"""
|
|
|
result = fetch_from_odps(query)
|
|
|
@@ -173,18 +174,10 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
|
|
|
self.pool = pool
|
|
|
self.log_client = log_client
|
|
|
|
|
|
- # 获取关注公众号任务结果
|
|
|
- async def get_follow_account_task_result(self, task_id):
|
|
|
- pass
|
|
|
-
|
|
|
- # 创建自动回复任务
|
|
|
- async def create_auto_reply_task(self):
|
|
|
- pass
|
|
|
-
|
|
|
# 获取自动回复任务结果
|
|
|
async def get_auto_reply_task_result(self, task_id):
|
|
|
query = """
|
|
|
- SELECT task_result, task_status, err_msg,from_unixtime(update_timestamp / 1000) AS update_time
|
|
|
+ SELECT task_result, task_status, err_msg, update_timestamp
|
|
|
FROM gzh_msg_record
|
|
|
WHERE task_id = %s;
|
|
|
"""
|
|
|
@@ -192,18 +185,6 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
|
|
|
query=query, params=(task_id,), db_name="aigc"
|
|
|
)
|
|
|
|
|
|
- # 获取关注公众号任务列表
|
|
|
- async def get_follow_account_task_list(self):
|
|
|
- pass
|
|
|
-
|
|
|
- # 获取自动回复任务列表
|
|
|
- async def get_auto_reply_task_list(self):
|
|
|
- pass
|
|
|
-
|
|
|
- # 插入待关注公众号
|
|
|
- async def insert_accounts_task(self, account_name, gh_id):
|
|
|
- pass
|
|
|
-
|
|
|
# 查询账号
|
|
|
async def fetch_account_status(self, account_name):
|
|
|
query = """
|
|
|
@@ -271,13 +252,69 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
|
|
|
query=query, params=(f"follow_{gh_id}",), db_name="aigc"
|
|
|
)
|
|
|
|
|
|
+ # 创建自动回复任务
|
|
|
+ async def create_auto_reply_task(self, task_id, gh_id):
|
|
|
+ query = """
|
|
|
+ INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
|
|
|
+ """
|
|
|
+ return await self.pool.async_save(query=query, params=(task_id, gh_id))
|
|
|
+
|
|
|
+ async def update_auto_reply_task_status(
|
|
|
+ self, task_id, status_type, ori_status, new_status
|
|
|
+ ):
|
|
|
+ task_query = """
|
|
|
+ UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
|
|
|
+ """
|
|
|
+ extract_query = """
|
|
|
+ UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
|
|
|
+ """
|
|
|
+ match status_type:
|
|
|
+ case "task":
|
|
|
+ return await self.pool.async_save(
|
|
|
+ query=task_query, params=(new_status, task_id, ori_status)
|
|
|
+ )
|
|
|
+ case "extract":
|
|
|
+ return await self.pool.async_save(
|
|
|
+ query=extract_query, params=(new_status, task_id, ori_status)
|
|
|
+ )
|
|
|
+ case _:
|
|
|
+ print("status_type_error")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 获取正在自动回复卡片的任务 id
|
|
|
+ async def fetch_auto_replying_tasks(self):
|
|
|
+ query = """
|
|
|
+ SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
|
|
|
+ """
|
|
|
+ return await self.pool.async_fetch(
|
|
|
+ query=query, params=(self.PROCESSING_STATUS,)
|
|
|
+ )
|
|
|
+
|
|
|
+ # 设置自动回复结果
|
|
|
+ async def set_auto_reply_result(self, task_id, finish_timestamp, result):
|
|
|
+ query = """
|
|
|
+ UPDATE cooperate_accounts_task
|
|
|
+ SET finish_timestamp = %s, result = %s, task_status = %s
|
|
|
+ WHERE task_id = %s and task_status = %s;
|
|
|
+ """
|
|
|
+ return await self.pool.async_save(
|
|
|
+ query=query,
|
|
|
+ params=(
|
|
|
+ finish_timestamp,
|
|
|
+ result,
|
|
|
+ self.SUCCESS_STATUS,
|
|
|
+ task_id,
|
|
|
+ self.PROCESSING_STATUS,
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
|
|
|
class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
def __init__(self, pool, log_client):
|
|
|
super().__init__(pool, log_client)
|
|
|
|
|
|
# 创建单个关注公众号任务
|
|
|
- async def create_follow_account_task(self, gh_id):
|
|
|
+ async def create_follow_single_account_task(self, gh_id):
|
|
|
response = await get_article_list_from_account(account_id=gh_id)
|
|
|
code = response.get("code")
|
|
|
match code:
|
|
|
@@ -304,6 +341,24 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
case _:
|
|
|
pass
|
|
|
|
|
|
+ # 创建单个账号自动回复任务
|
|
|
+ async def create_auto_reply_single_account_task(self, gh_id, account_name):
|
|
|
+ task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
|
|
|
+ # 先插入 task, 再创建自动回复任务
|
|
|
+ create_row = await self.create_auto_reply_task(task_id, gh_id)
|
|
|
+ if create_row:
|
|
|
+ affected_rows = await self.insert_aigc_auto_reply_task(
|
|
|
+ task_id, account_name
|
|
|
+ )
|
|
|
+ if not affected_rows:
|
|
|
+ print("发布任务至 AIGC 失败")
|
|
|
+ else:
|
|
|
+ await self.update_auto_reply_task_status(
|
|
|
+ task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ print("创建任务至 DB 失败")
|
|
|
+
|
|
|
async def follow_gzh_task(self):
|
|
|
account_list = self.get_monitor_account_list()
|
|
|
for account in account_list:
|
|
|
@@ -326,7 +381,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
|
|
|
match follow_status:
|
|
|
case self.INIT_STATUS:
|
|
|
- await self.create_follow_account_task(
|
|
|
+ await self.create_follow_single_account_task(
|
|
|
account_detail["gh_id"]
|
|
|
)
|
|
|
|
|
|
@@ -361,7 +416,10 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
)
|
|
|
|
|
|
case self.SUCCESS_STATUS:
|
|
|
- continue
|
|
|
+ # 账号已经关注,创建获取自动回复任务
|
|
|
+ await self.create_auto_reply_single_account_task(
|
|
|
+ account_detail["gh_id"], account.公众号名
|
|
|
+ )
|
|
|
|
|
|
case _:
|
|
|
print(f"{account.公众号名}账号状态异常")
|
|
|
@@ -369,11 +427,54 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
|
|
|
except Exception as e:
|
|
|
print(f"处理账号{account.公众号名}异常", e)
|
|
|
|
|
|
+ # 异步获取关注结果
|
|
|
+ async def get_auto_reply_response(self):
|
|
|
+ task_list = await self.fetch_auto_replying_tasks()
|
|
|
+ if not task_list:
|
|
|
+ return
|
|
|
+
|
|
|
+ for task in task_list:
|
|
|
+ try:
|
|
|
+ task_id = task["task_id"]
|
|
|
+ response = await self.get_auto_reply_task_result(task_id)
|
|
|
+ if not response:
|
|
|
+ continue
|
|
|
+
|
|
|
+ task_status = response[0]["task_status"]
|
|
|
+ task_result = response[0]["task_result"]
|
|
|
+ update_timestamp = response[0]["update_timestamp"]
|
|
|
+ match task_status:
|
|
|
+ case self.FETCH_FAIL_STATUS:
|
|
|
+ await self.update_auto_reply_task_status(
|
|
|
+ task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
|
|
|
+ )
|
|
|
+
|
|
|
+ case self.FETCH_SUCCESS_STATUS:
|
|
|
+ await self.set_auto_reply_result(
|
|
|
+ task_id, update_timestamp, task_result
|
|
|
+ )
|
|
|
+
|
|
|
+ case _:
|
|
|
+ continue
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+
|
|
|
+ # 解析 xml 并且更新数据
|
|
|
+ async def extract_task(self):
|
|
|
+ pass
|
|
|
+
|
|
|
# main function
|
|
|
async def deal(self, task_name):
|
|
|
match task_name:
|
|
|
case "follow_gzh_task":
|
|
|
await self.follow_gzh_task()
|
|
|
|
|
|
+ case "get_auto_reply_task":
|
|
|
+ await self.get_auto_reply_response()
|
|
|
+
|
|
|
+ case "extract_task":
|
|
|
+ await self.extract_task()
|
|
|
+
|
|
|
case _:
|
|
|
print("task_error")
|