# auto_reply_cards_monitor.py
import json
import xml.etree.ElementTree as ET
from urllib.parse import parse_qs, unquote

from pandas import DataFrame
  5. def parse_xml(xml_text):
  6. # 1. 解析 XML
  7. try:
  8. root = ET.fromstring(xml_text)
  9. page_path = root.find(".//pagepath").text
  10. path, query = page_path.split("?", 1)
  11. query_decoded = unquote(query)
  12. params = parse_qs(query_decoded)
  13. card_title = root.find(".//title").text
  14. mini_program = root.find(".//sourcedisplayname").text
  15. obj = {
  16. "page_path": page_path,
  17. "title": card_title,
  18. "mini_program": mini_program,
  19. "params": params,
  20. }
  21. return obj
  22. except Exception:
  23. return None
  24. class AutoReplyCardsMonitorConst:
  25. # fetch_status
  26. FETCH_INIT_STATUS = 0
  27. FETCH_PROCESSING_STATUS = 1
  28. FETCH_SUCCESS_STATUS = 2
  29. FETCH_FAIL_STATUS = 3
  30. # task_status
  31. INIT_STATUS = 0
  32. PROCESSING_STATUS = 1
  33. SUCCESS_STATUS = 2
  34. FAIL_STATUS = 99
  35. class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
  36. def __init__(self, pool, log_client):
  37. self.pool = pool
  38. self.log_client = log_client
  39. async def get_tasks(self):
  40. query = """
  41. SELECT * FROM auto_reply_tasks;
  42. """
  43. return await self.pool.async_fetch(query)
  44. async def get_result_from_aigc(self, task_id):
  45. query = """
  46. SELECT gzh_name, task_status, task_result, from_unixtime(create_timestamp / 1000) as create_time,
  47. from_unixtime(update_timestamp / 1000) as update_time,
  48. err_msg
  49. FROM gzh_msg_record
  50. WHERE task_id = %s;
  51. """
  52. return await self.pool.async_fetch(query=query, params=(task_id,), db_name="aigc")
  53. # async def save_to_each_record(self, record: dict):
  54. # update_query = """
  55. #
  56. # """
  57. # pass
  58. async def deal(self):
  59. # tasks = await self.get_tasks()
  60. task_ids = [i for i in range(1, 9)]
  61. L = []
  62. for task_id in task_ids:
  63. record = await self.get_result_from_aigc(task_id)
  64. if not record:
  65. continue
  66. task_status = record[0]['task_status']
  67. match task_status:
  68. case self.FETCH_INIT_STATUS:
  69. continue
  70. case self.FETCH_PROCESSING_STATUS:
  71. continue
  72. case self.FETCH_SUCCESS_STATUS:
  73. fetch_result = record[0]['task_result']
  74. fetch_detail_list = json.loads(fetch_result)
  75. for index, xml_txt in enumerate(fetch_detail_list, 1):
  76. extract_info = parse_xml(xml_txt)
  77. if not extract_info:
  78. continue
  79. _id = extract_info.get('params', {}).get('id')
  80. _video_id = extract_info.get('params', {}).get('video_id')
  81. if _id:
  82. vid = _id[0]
  83. elif _video_id:
  84. vid = _video_id[0]
  85. else:
  86. vid = ''
  87. temp = [
  88. record[0]['gzh_name'],
  89. extract_info.get('title'),
  90. extract_info.get('mini_program'),
  91. vid,
  92. extract_info.get('params', {}).get('rootSourceId', [''])[0],
  93. extract_info.get('params', {}).get('rootShareId', [''])[0],
  94. index,
  95. record[0]['create_time'],
  96. record[0]['update_time'],
  97. extract_info['page_path']
  98. ]
  99. L.append(temp)
  100. case self.FETCH_FAIL_STATUS:
  101. error_msg = record['err_msg']
  102. print(error_msg)
  103. df = DataFrame(L, columns=['账号名称', '标题', '小程序', '视频 id', 'rootSourceId', 'rootShareId', 'index', 'create_time', 'update_time', 'page_path'])
  104. df.to_csv("local_data.csv", index=False)