123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- from pqai_agent.logging_service import logger
- from typing import Dict, Optional, List
- import json
- import time
- import os
- import abc
- import pymysql.cursors
- from pqai_agent import configs
- from pqai_agent.database import MySQLManager
- class UserManager(abc.ABC):
- @abc.abstractmethod
- def get_user_profile(self, user_id) -> Dict:
- pass
- @abc.abstractmethod
- def save_user_profile(self, user_id, profile: Dict) -> None:
- pass
- @abc.abstractmethod
- def list_all_users(self):
- pass
- @abc.abstractmethod
- def get_staff_profile(self, staff_id) -> Dict:
- # FIXME(zhoutian): 重新设计用户和员工数据管理模型
- pass
- @staticmethod
- def get_default_profile(**kwargs) -> Dict:
- default_profile = {
- "name": "",
- "nickname": "",
- "avatar": "",
- "preferred_nickname": "",
- "gender": "未知",
- "age": 0,
- "region": "",
- "interests": [],
- "family_members": {},
- "health_conditions": [],
- "medications": [],
- "reminder_preferences": {
- "medication": True,
- "health": True,
- "weather": True,
- "news": False,
- },
- "interaction_style": "standard", # standard, verbose, concise
- "interaction_frequency": "medium", # low, medium, high
- "last_topics": [],
- "created_at": int(time.time() * 1000),
- "human_intervention_history": [],
- }
- for key, value in kwargs.items():
- if key in default_profile:
- default_profile[key] = value
- return default_profile
- def list_users(self, **kwargs) -> List[Dict]:
- pass
- class UserRelationManager(abc.ABC):
- @abc.abstractmethod
- def list_staffs(self):
- pass
- @abc.abstractmethod
- def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
- pass
- @abc.abstractmethod
- def list_staff_users(self, staff_id: str = None, tag_id: int = None) -> List[Dict]:
- pass
- @abc.abstractmethod
- def get_user_tags(self, user_id: str) -> List[str]:
- pass
- @abc.abstractmethod
- def stop_user_daily_push(self, user_id: str) -> bool:
- pass
- class LocalUserManager(UserManager):
- def get_user_profile(self, user_id) -> Dict:
- """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
- default_profile = self.get_default_profile()
- try:
- with open(f"user_profiles/{user_id}.json", "r", encoding="utf-8") as f:
- profile = json.load(f)
- entry_added = False
- for key, value in default_profile.items():
- if key not in profile:
- logger.debug(
- f"user[{user_id}] add profile key[{key}] value[{value}]"
- )
- profile[key] = value
- entry_added = True
- if entry_added:
- self.save_user_profile(user_id, profile)
- return profile
- except FileNotFoundError:
- # 创建默认用户资料
- self.save_user_profile(user_id, default_profile)
- return default_profile
- def save_user_profile(self, user_id, profile: Dict) -> None:
- if not user_id:
- raise Exception("Invalid user_id: {}".format(user_id))
- with open(f"user_profiles/{user_id}.json", "w", encoding="utf-8") as f:
- json.dump(profile, f, ensure_ascii=False, indent=2)
- def list_all_users(self):
- user_ids = []
- for root, dirs, files in os.walk("../user_profiles/"):
- for file in files:
- if file.endswith(".json"):
- user_ids.append(os.path.splitext(file)[0])
- return user_ids
- def get_staff_profile(self, staff_id) -> Dict:
- try:
- with open(f"user_profiles/{staff_id}.json", "r", encoding="utf-8") as f:
- profile = json.load(f)
- entry_added = False
- if entry_added:
- self.save_user_profile(staff_id, profile)
- return profile
- except Exception as e:
- logger.error("staff profile not found: {}".format(e))
- return {}
- def list_users(self, **kwargs) -> List[Dict]:
- pass
- class MySQLUserManager(UserManager):
- PROFILE_EXCLUDE_ITEMS = [
- "avatar",
- ]
- def __init__(self, db_config, table_name, staff_table):
- self.db = MySQLManager(db_config)
- self.table_name = table_name
- self.staff_table = staff_table
- def get_user_profile(self, user_id) -> Dict:
- sql = (
- f"SELECT name, wxid, profile_data_v1, gender, iconurl as avatar"
- f" FROM {self.table_name} WHERE third_party_user_id = {user_id}"
- )
- data = self.db.select(sql, pymysql.cursors.DictCursor)
- if not data:
- logger.error(f"user[{user_id}] not found")
- return {}
- data = data[0]
- gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"}
- gender = gender_map[data["gender"]]
- default_profile = self.get_default_profile(
- nickname=data["name"], gender=gender, avatar=data["avatar"]
- )
- if not data["profile_data_v1"]:
- logger.warning(f"user[{user_id}] profile not found, create a default one")
- self.save_user_profile(user_id, default_profile)
- return default_profile
- else:
- profile = json.loads(data["profile_data_v1"])
- # 资料条目有增加时,需合并更新
- entry_added = False
- for key, value in default_profile.items():
- if key not in profile:
- # logger.debug(f"user[{user_id}] add profile key[{key}] value[{value}]")
- profile[key] = value
- entry_added = True
- if entry_added:
- self.save_user_profile(user_id, profile)
- return profile
- def save_user_profile(self, user_id, profile: Dict) -> None:
- if not user_id:
- raise Exception("Invalid user_id: {}".format(user_id))
- if configs.get().get("debug_flags", {}).get("disable_database_write", False):
- return
- profile = profile.copy()
- for name in self.PROFILE_EXCLUDE_ITEMS:
- profile.pop(name, None)
- sql = f"UPDATE {self.table_name} SET profile_data_v1 = %s WHERE third_party_user_id = {user_id}"
- self.db.execute(sql, (json.dumps(profile),))
- def list_all_users(self):
- sql = f"SELECT third_party_user_id FROM {self.table_name}"
- data = self.db.select(sql, pymysql.cursors.DictCursor)
- return [user["third_party_user_id"] for user in data]
- def get_staff_profile(self, staff_id) -> Dict:
- if not self.staff_table:
- raise Exception("staff_table is not set")
- return self.get_staff_profile_v3(staff_id)
- def get_staff_profile_v1(self, staff_id) -> Dict:
- sql = (
- f"SELECT agent_name, agent_gender, agent_age, agent_region, agent_profile "
- f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
- )
- data = self.db.select(sql, pymysql.cursors.DictCursor)
- if not data:
- logger.error(f"staff[{staff_id}] not found")
- return {}
- profile = data[0]
- # 转换性别格式
- gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"}
- profile["agent_gender"] = gender_map[profile["agent_gender"]]
- return profile
- def get_staff_profile_v2(self, staff_id) -> Dict:
- sql = (
- f"SELECT agent_name as name, agent_gender as gender, agent_age as age, agent_region as region, agent_profile "
- f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
- )
- data = self.db.select(sql, pymysql.cursors.DictCursor)
- if not data:
- logger.error(f"staff[{staff_id}] not found")
- return {}
- profile = data[0]
- # 转换性别格式
- gender_map = {0: "未知", 1: "男", 2: "女", None: "未知"}
- profile["gender"] = gender_map[profile["gender"]]
- # 合并JSON字段(新版本)数据
- if profile["agent_profile"]:
- detail_profile = json.loads(profile["agent_profile"])
- profile.update(detail_profile)
- # 去除原始字段
- profile.pop("agent_profile", None)
- return profile
- def get_staff_profile_v3(self, staff_id) -> Dict:
- sql = (
- f"SELECT agent_profile "
- f"FROM {self.staff_table} WHERE third_party_user_id = '{staff_id}'"
- )
- data = self.db.select(sql)
- if not data:
- logger.error(f"staff[{staff_id}] not found")
- return {}
- profile_str = data[0][0]
- if not profile_str:
- return {}
- profile = json.loads(profile_str)
- return profile
- def save_staff_profile(self, staff_id: str, profile: Dict):
- # 正常情况下不应该有此操作
- if not self.staff_table:
- raise Exception("staff_table is not set")
- if not staff_id:
- raise Exception("Invalid staff_id: {}".format(staff_id))
- sql = f"UPDATE {self.staff_table} SET agent_profile = %s WHERE third_party_user_id = '{staff_id}'"
- self.db.execute(sql, (json.dumps(profile),))
- def list_users(self, **kwargs) -> List[Dict]:
- user_union_id = kwargs.get("user_union_id", None)
- user_name = kwargs.get("user_name", None)
- if not user_union_id and not user_name:
- raise Exception("user_union_id or user_name is required")
- sql = f"SELECT third_party_user_id, wxid, name, iconurl, gender FROM {self.table_name} WHERE 1=1 "
- if user_name:
- sql += f"AND name = '{user_name}' COLLATE utf8mb4_bin "
- if user_union_id:
- sql += f"AND wxid = '{user_union_id}' "
- data = self.db.select(sql, pymysql.cursors.DictCursor)
- return data
- def get_staff_sessions(
- self,
- staff_id,
- page_id: int = 1,
- page_size: int = 10,
- session_type: str = "default",
- ) -> List[Dict]:
- """
- :param page_size:
- :param page_id:
- :param session_type:
- :param staff_id:
- :return:
- """
- match session_type:
- case "active":
- sql = f"""
- select staff_id, current_state, user_id
- from agent_state
- where staff_id = %s and update_timestamp >= DATE_SUB(NOW(), INTERVAL 2 HOUR)
- order by update_timestamp desc;
- """
- case "human_intervention":
- sql = f"""
- select staff_id, current_state, user_id
- from agent_state
- where staff_id = %s and current_state = 5 order by update_timestamp desc;
- """
- case _:
- sql = f"""
- select t1.staff_id, t1.current_state, t1.user_id, t2.name, t2.iconurl
- from agent_state t1 join third_party_user t2
- on t1.user_id = t2.third_party_user_id
- where t1.staff_id = %s
- order by
- IF(t1.current_state = 5, 0, 1),
- t1.update_timestamp desc
- limit {page_size + 1} offset {page_size * (page_id - 1)};
- """
- staff_sessions = self.db.select(
- sql, cursor_type=pymysql.cursors.DictCursor, args=(staff_id,)
- )
- return staff_sessions
- def get_staff_sessions_summary_v1(
- self, staff_id, page_id: int, page_size: int, status: int
- ) -> Dict:
- """
- :param status: staff status(0: unemployed, 1: employed)
- :param staff_id: staff
- :param page_id: page id
- :param page_size: page size
- :return:
- :todo: 未使用 Mysql 连接池,每次查询均需要与 MySQL 建立连接,性能较低,需要优化
- """
- if not staff_id:
- get_staff_query = f"""
- select third_party_user_id, name from {self.staff_table} where status = %s
- limit %s offset %s;
- """
- staff_id_list = self.db.select(
- sql=get_staff_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(status, page_size + 1, (page_id - 1) * page_size),
- )
- if not staff_id_list:
- return {}
- if len(staff_id_list) > page_size:
- has_next_page = True
- next_page_id = page_id + 1
- staff_id_list = staff_id_list[:page_size]
- else:
- has_next_page = False
- next_page_id = None
- else:
- get_staff_query = f"""
- select third_party_user_id, name from {self.staff_table}
- where status = %s and third_party_user_id = %s;
- """
- staff_id_list = self.db.select(
- sql=get_staff_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(status, staff_id),
- )
- if not staff_id_list:
- return {}
- has_next_page = False
- next_page_id = None
- response_data = [
- {
- "staff_id": staff["third_party_user_id"],
- "staff_name": staff["name"],
- "active_sessions": len(
- self.get_staff_sessions(
- staff["third_party_user_id"], session_type="active"
- )
- ),
- "human_intervention_sessions": len(
- self.get_staff_sessions(
- staff["third_party_user_id"], session_type="human_intervention"
- )
- ),
- }
- for staff in staff_id_list
- ]
- return {
- "has_next_page": has_next_page,
- "next_page_id": next_page_id,
- "data": response_data,
- }
- def get_staff_session_list_v1(self, staff_id, page_id: int, page_size: int) -> Dict:
- """
- :param page_size:
- :param page_id:
- :param staff_id:
- :return:
- """
- session_list = self.get_staff_sessions(staff_id, page_id, page_size)
- if len(session_list) > page_size:
- has_next_page = True
- next_page_id = page_id + 1
- session_list = session_list[:page_size]
- else:
- has_next_page = False
- next_page_id = None
- response_data = []
- for session in session_list:
- temp_obj = {}
- user_id = session["user_id"]
- room_id = ":".join(["private", staff_id, user_id])
- select_query = f"""select content, max(sendtime) as max_timestamp from qywx_chat_history where roomid = %s;"""
- last_message = self.db.select(
- sql=select_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id,),
- )
- if not last_message:
- temp_obj["message"] = ""
- temp_obj["timestamp"] = 0
- else:
- temp_obj["message"] = last_message[0]["content"]
- temp_obj["timestamp"] = last_message[0]["max_timestamp"]
- temp_obj["customer_id"] = user_id
- temp_obj["customer_name"] = session["name"]
- temp_obj["avatar"] = session["iconurl"]
- response_data.append(temp_obj)
- return {
- "staff_id": staff_id,
- "has_next_page": has_next_page,
- "next_page_id": next_page_id,
- "data": response_data,
- }
- def get_staff_list(self, page_id: int, page_size: int) -> Dict:
- """
- :param page_size:
- :param page_id:
- :return:
- """
- sql = f"""
- select t1.third_party_user_id as staff_id, t1.name as staff_name, t2.iconurl as avatar
- from qywx_employee t1 left join third_party_user t2
- on t1.third_party_user_id = t2.third_party_user_id
- limit %s offset %s;
- """
- staff_list = self.db.select(
- sql=sql,
- cursor_type=pymysql.cursors.DictCursor,
- args=(page_size + 1, page_size * (page_id - 1)),
- )
- if len(staff_list) > page_size:
- has_next_page = True
- next_page_id = page_id + 1
- staff_list = staff_list[:page_size]
- else:
- has_next_page = False
- next_page_id = None
- return {
- "has_next_page": has_next_page,
- "next_page": next_page_id,
- "data": staff_list,
- }
- def get_conversation_list_v1(
- self, staff_id: str, customer_id: str, page: Optional[int]
- ):
- """
- :param staff_id:
- :param customer_id:
- :param page: timestamp
- :return:
- """
- room_id = ":".join(["private", staff_id, customer_id])
- page_size = 20
- if not page:
- fetch_query = f"""
- select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type
- from qywx_chat_history t1
- join third_party_user t2 on t1.sender = t2.third_party_user_id
- where roomid = %s
- order by sendtime desc
- limit %s;
- """
- messages = self.db.select(
- sql=fetch_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id, page_size + 1),
- )
- else:
- fetch_query = f"""
- select t1.sender, t2.name, t1.sendtime, t1.content, t2.iconurl, t1.msg_type
- from qywx_chat_history t1
- join third_party_user t2 on t1.sender = t2.third_party_user_id
- where t1.roomid = %s and t1.sendtime <= %s
- order by sendtime desc
- limit %s;
- """
- messages = self.db.select(
- sql=fetch_query,
- cursor_type=pymysql.cursors.DictCursor,
- args=(room_id, page, page_size + 1),
- )
- if messages:
- if len(messages) > page_size:
- has_next_page = True
- next_page = messages[-1]["sendtime"]
- else:
- has_next_page = False
- next_page = None
- response_data = [
- {
- "sender_id": message["sender"],
- "sender_name": message["name"],
- "avatar": message["iconurl"],
- "content": message["content"],
- "timestamp": message["sendtime"],
- "role": "customer" if message["sender"] == customer_id else "staff",
- "message_type": message["msg_type"],
- }
- for message in messages
- ]
- return {
- "staff_id": staff_id,
- "customer_id": customer_id,
- "has_next_page": has_next_page,
- "next_page": next_page,
- "data": response_data,
- }
- else:
- has_next_page = False
- next_page = None
- return {
- "staff_id": staff_id,
- "customer_id": customer_id,
- "has_next_page": has_next_page,
- "next_page": next_page,
- "data": [],
- }
- class LocalUserRelationManager(UserRelationManager):
- def __init__(self):
- pass
- def list_staffs(self):
- return [
- {
- "third_party_user_id": "1688855931724582",
- "name": "",
- "wxid": "ShengHuoLeQu",
- "agent_name": "小芳",
- }
- ]
- def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
- return []
- def list_staff_users(self, staff_id: str = None, tag_id: int = None):
- user_ids = [
- "7881299453089278",
- "7881299453132630",
- "7881299454186909",
- "7881299455103430",
- "7881299455173476",
- "7881299456216398",
- "7881299457990953",
- "7881299461167644",
- "7881299463002136",
- "7881299464081604",
- "7881299465121735",
- "7881299465998082",
- "7881299466221881",
- "7881299467152300",
- "7881299470051791",
- "7881299470112816",
- "7881299471149567",
- "7881299471168030",
- "7881299471277650",
- "7881299473321703",
- ]
- user_ids = user_ids[:5]
- return [
- {"staff_id": "1688855931724582", "user_id": "7881299670930896"},
- *[
- {"staff_id": "1688855931724582", "user_id": user_id}
- for user_id in user_ids
- ],
- ]
- def get_user_tags(self, user_id: str):
- return []
- def stop_user_daily_push(self, user_id: str) -> bool:
- return True
- class MySQLUserRelationManager(UserRelationManager):
- def __init__(
- self,
- agent_db_config,
- wecom_db_config,
- agent_staff_table,
- agent_user_table,
- staff_table,
- relation_table,
- user_table,
- ):
- # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取
- self.agent_db = MySQLManager(agent_db_config)
- self.wecom_db = MySQLManager(wecom_db_config)
- self.agent_staff_table = agent_staff_table
- self.staff_table = staff_table
- self.relation_table = relation_table
- self.agent_user_table = agent_user_table
- self.user_table = user_table
- def list_staffs(self):
- sql = f"SELECT third_party_user_id, name, wxid, agent_name FROM {self.agent_staff_table} WHERE status = 1"
- data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
- return data
- def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
- return []
- def list_staff_users(self, staff_id: str = None, tag_id: int = None):
- sql = f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1"
- if staff_id:
- sql += f" AND third_party_user_id = '{staff_id}'"
- agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
- if not agent_staff_data:
- return []
- ret = []
- for agent_staff in agent_staff_data:
- wxid = agent_staff["wxid"]
- sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'"
- staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
- if not staff_data:
- logger.error(f"staff[{wxid}] not found in wecom database")
- continue
- staff_id = staff_data[0]["id"]
- sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0"
- user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
- if not user_data:
- logger.warning(f"staff[{wxid}] has no user")
- continue
- user_ids = tuple(user["user_id"] for user in user_data)
- sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)} AND union_id is not null"
- if tag_id:
- sql += f" AND id in (SELECT distinct user_id FROM we_com_user_with_tag WHERE tag_id = {tag_id} and is_delete = 0)"
- user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
- if not user_data:
- logger.warning(f"staff[{wxid}] users not found in wecom database")
- continue
- user_union_ids = tuple(user["union_id"] for user in user_data)
- batch_size = 500
- n_batches = (len(user_union_ids) + batch_size - 1) // batch_size
- agent_user_data = []
- for i in range(n_batches):
- idx_begin = i * batch_size
- idx_end = min((i + 1) * batch_size, len(user_union_ids))
- batch_union_ids = user_union_ids[idx_begin:idx_end]
- sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}"
- batch_agent_user_data = self.agent_db.select(
- sql, pymysql.cursors.DictCursor
- )
- if len(agent_user_data) != len(batch_union_ids):
- # logger.debug(f"staff[{wxid}] some users not found in agent database")
- pass
- agent_user_data.extend(batch_agent_user_data)
- staff_user_pairs = [
- {
- "staff_id": agent_staff["third_party_user_id"],
- "user_id": agent_user["third_party_user_id"],
- }
- for agent_user in agent_user_data
- ]
- ret.extend(staff_user_pairs)
- return ret
- def get_user_union_id(self, user_id: str) -> Optional[str]:
- sql = f"SELECT wxid FROM {self.agent_user_table} WHERE third_party_user_id = '{user_id}' AND wxid is not null"
- user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
- if not user_data:
- logger.error(f"user[{user_id}] has no union id")
- return None
- union_id = user_data[0]["wxid"]
- return union_id
- def get_user_tags(self, user_id: str) -> List[str]:
- union_id = self.get_user_union_id(user_id)
- if not union_id:
- return []
- sql = f"""
- select b.tag_id, c.`tag_name` from `we_com_user` as a
- join `we_com_user_with_tag` as b
- join `we_com_tag` as c
- on a.`id` = b.`user_id`
- and b.`tag_id` = c.id
- where a.union_id = '{union_id}' """
- tag_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
- tag_names = [tag["tag_name"] for tag in tag_data]
- return tag_names
- def stop_user_daily_push(self, user_id: str) -> bool:
- try:
- union_id = self.get_user_union_id(user_id)
- if not union_id:
- return False
- sql = f"UPDATE {self.user_table} SET group_msg_disabled = 1 WHERE union_id = %s"
- rows = self.wecom_db.execute(sql, (union_id,))
- if rows > 0:
- return True
- else:
- return False
- except Exception as e:
- logger.error(f"stop_user_daily_push failed: {e}")
- return False
- if __name__ == "__main__":
- config = configs.get()
- user_db_config = config["storage"]["user"]
- staff_db_config = config["storage"]["staff"]
- user_manager = MySQLUserManager(
- user_db_config["mysql"], user_db_config["table"], staff_db_config["table"]
- )
- user_profile = user_manager.get_user_profile("7881301263964433")
- print(user_profile)
- wecom_db_config = config["storage"]["user_relation"]
- user_relation_manager = MySQLUserRelationManager(
- user_db_config["mysql"],
- wecom_db_config["mysql"],
- config["storage"]["staff"]["table"],
- user_db_config["table"],
- wecom_db_config["table"]["staff"],
- wecom_db_config["table"]["relation"],
- wecom_db_config["table"]["user"],
- )
- # all_staff_users = user_relation_manager.list_staff_users()
- user_tags = user_relation_manager.get_user_tags("7881302078008656")
- print(user_tags)
|