| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import json
- import xml.etree.ElementTree as ET
- from urllib.parse import unquote, parse_qs
- from pandas import DataFrame
def parse_xml(xml_text):
    """Extract mini-program card info from a WeChat message XML payload.

    Args:
        xml_text: Raw XML string expected to contain ``pagepath``,
            ``title`` and ``sourcedisplayname`` elements.

    Returns:
        A dict with keys ``page_path``, ``title``, ``mini_program`` and
        ``params`` (the parsed query string of the page path), or ``None``
        when the XML is malformed or any expected piece is missing.
    """
    try:
        root = ET.fromstring(xml_text)
        page_path = root.find(".//pagepath").text
        # The page path embeds the card parameters in its query string.
        _, query = page_path.split("?", 1)
        params = parse_qs(unquote(query))
        return {
            "page_path": page_path,
            "title": root.find(".//title").text,
            "mini_program": root.find(".//sourcedisplayname").text,
            "params": params,
        }
    except (ET.ParseError, AttributeError, ValueError):
        # ParseError: malformed XML; AttributeError: a find() returned
        # None (missing element); ValueError: page path has no "?".
        # All three mean "cannot parse" — callers check for None.
        return None
class AutoReplyCardsMonitorConst:
    """Status-code constants shared by the auto-reply cards monitor."""

    # fetch_status: lifecycle of the upstream fetch job, as stored in
    # gzh_msg_record.task_status (matched against in deal()).
    FETCH_INIT_STATUS = 0
    FETCH_PROCESSING_STATUS = 1
    FETCH_SUCCESS_STATUS = 2
    FETCH_FAIL_STATUS = 3
    # task_status: lifecycle of the local monitoring task.
    # NOTE(review): these four are not referenced in this chunk — presumably
    # used by callers elsewhere; confirm before removing.
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99
- class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
- def __init__(self, pool, log_client):
- self.pool = pool
- self.log_client = log_client
- async def get_tasks(self):
- query = """
- SELECT * FROM auto_reply_tasks;
- """
- return await self.pool.async_fetch(query)
- async def get_result_from_aigc(self, task_id):
- query = """
- SELECT gzh_name, task_status, task_result, from_unixtime(create_timestamp / 1000) as create_time,
- from_unixtime(update_timestamp / 1000) as update_time,
- err_msg
- FROM gzh_msg_record
- WHERE task_id = %s;
- """
- return await self.pool.async_fetch(query=query, params=(task_id,), db_name="aigc")
- # async def save_to_each_record(self, record: dict):
- # update_query = """
- #
- # """
- # pass
- async def deal(self):
- # tasks = await self.get_tasks()
- task_ids = [i for i in range(1, 9)]
- L = []
- for task_id in task_ids:
- record = await self.get_result_from_aigc(task_id)
- if not record:
- continue
- task_status = record[0]['task_status']
- match task_status:
- case self.FETCH_INIT_STATUS:
- continue
- case self.FETCH_PROCESSING_STATUS:
- continue
- case self.FETCH_SUCCESS_STATUS:
- fetch_result = record[0]['task_result']
- fetch_detail_list = json.loads(fetch_result)
- for index, xml_txt in enumerate(fetch_detail_list, 1):
- extract_info = parse_xml(xml_txt)
- if not extract_info:
- continue
- _id = extract_info.get('params', {}).get('id')
- _video_id = extract_info.get('params', {}).get('video_id')
- if _id:
- vid = _id[0]
- elif _video_id:
- vid = _video_id[0]
- else:
- vid = ''
- temp = [
- record[0]['gzh_name'],
- extract_info.get('title'),
- extract_info.get('mini_program'),
- vid,
- extract_info.get('params', {}).get('rootSourceId', [''])[0],
- extract_info.get('params', {}).get('rootShareId', [''])[0],
- index,
- record[0]['create_time'],
- record[0]['update_time'],
- extract_info['page_path']
- ]
- L.append(temp)
- case self.FETCH_FAIL_STATUS:
- error_msg = record['err_msg']
- print(error_msg)
- df = DataFrame(L, columns=['账号名称', '标题', '小程序', '视频 id', 'rootSourceId', 'rootShareId', 'index', 'create_time', 'update_time', 'page_path'])
- df.to_csv("local_data.csv", index=False)
|