瀏覽代碼

Update user_manager: add UserRelationManager

StrayWarrior 2 周之前
父節點
當前提交
2bbdcd6764
共有 1 個文件被更改,包括 90 次插入4 次删除
  1. 90 4
      user_manager.py

+ 90 - 4
user_manager.py

@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 # vim:fenc=utf-8
 import logging
-from typing import Dict, Optional, Tuple, Any
+from typing import Dict, Optional, Tuple, Any, List
 import json
 import time
 import os
@@ -64,6 +64,10 @@ class UserRelationManager(abc.ABC):
     def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
         pass
 
+    @abc.abstractmethod
+    def list_staff_users(self) -> List[Dict]:
+        pass
+
 class LocalUserManager(UserManager):
     def get_user_profile(self, user_id) -> Dict:
         """加载用户个人资料,如不存在则创建默认资料。主要用于本地调试"""
@@ -120,8 +124,90 @@ class MySQLUserManager(UserManager):
         data = self.db.select(sql, pymysql.cursors.DictCursor)
         return [user['third_party_user_id'] for user in data]
 
+
+class MySQLUserRelationManager(UserRelationManager):
+    def __init__(self, agent_db_config, wecom_db_config,
+                 agent_staff_table, agent_user_table,
+                 staff_table, relation_table, user_table):
+        # FIXME(zhoutian): 因为现在数据库表不统一,需要从两个库读取
+        self.agent_db = MySQLManager(agent_db_config)
+        self.wecom_db = MySQLManager(wecom_db_config)
+        self.agent_staff_table = agent_staff_table
+        self.staff_table = staff_table
+        self.relation_table = relation_table
+        self.agent_user_table = agent_user_table
+        self.user_table = user_table
+
+    def list_staffs(self):
+        return []
+
+    def list_users(self, staff_id: str, page: int = 1, page_size: int = 100):
+        return []
+
+    def list_staff_users(self):
+        sql = f"SELECT third_party_user_id, wxid FROM {self.agent_staff_table} WHERE status = 1"
+        agent_staff_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
+        if not agent_staff_data:
+            return []
+        ret = []
+        for agent_staff in agent_staff_data:
+            wxid = agent_staff['wxid']
+            sql = f"SELECT id FROM {self.staff_table} WHERE carrier_id = '{wxid}'"
+            staff_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
+            if not staff_data:
+                logging.error(f"staff[{wxid}] not found in wecom database")
+                continue
+            staff_id = staff_data[0]['id']
+            sql = f"SELECT user_id FROM {self.relation_table} WHERE staff_id = '{staff_id}' AND is_delete = 0"
+            user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
+            if not user_data:
+                logging.warning(f"staff[{wxid}] has no user")
+                continue
+            user_ids = tuple(user['user_id'] for user in user_data)
+            sql = f"SELECT union_id FROM {self.user_table} WHERE id IN {str(user_ids)}"
+            user_data = self.wecom_db.select(sql, pymysql.cursors.DictCursor)
+            if not user_data:
+                logging.error(f"staff[{wxid}] users not found in wecom database")
+                continue
+            user_union_ids = tuple(user['union_id'] for user in user_data)
+            batch_size = 100
+            n_batches = (len(user_union_ids) + batch_size - 1) // batch_size
+            agent_user_data = []
+            for i in range(n_batches):
+                idx_begin = i * batch_size
+                idx_end = min((i + 1) * batch_size, len(user_union_ids))
+                batch_union_ids = user_union_ids[idx_begin:idx_end]
+                sql = f"SELECT third_party_user_id, wxid FROM {self.agent_user_table} WHERE wxid IN {str(batch_union_ids)}"
+                batch_agent_user_data = self.agent_db.select(sql, pymysql.cursors.DictCursor)
+                if len(agent_user_data) != len(batch_union_ids):
+                    logging.error(f"staff[{wxid}] some users not found in agent database")
+                agent_user_data.extend(batch_agent_user_data)
+            staff_user_pairs = [
+                {
+                    'staff_id': agent_staff['third_party_user_id'],
+                    'user_id': agent_user['third_party_user_id']
+                }
+                for agent_user in agent_user_data
+            ]
+            ret.extend(staff_user_pairs)
+        return ret
+
+
 if __name__ == '__main__':
-    db_config = configs.get()['storage']['user']
-    user_manager = MySQLUserManager(db_config['mysql'], db_config['table'])
+    config = configs.get()
+    user_db_config = config['storage']['user']
+    user_manager = MySQLUserManager(user_db_config['mysql'], user_db_config['table'])
     user_profile = user_manager.get_user_profile('7881301263964433')
-    print(user_profile)
+    print(user_profile)
+
+    wecom_db_config = config['storage']['user_relation']
+    user_relation_manager = MySQLUserRelationManager(
+        user_db_config['mysql'], wecom_db_config['mysql'],
+        config['storage']['staff']['table'],
+        user_db_config['table'],
+        wecom_db_config['table']['staff'],
+        wecom_db_config['table']['relation'],
+        wecom_db_config['table']['user']
+    )
+    all_staff_users = user_relation_manager.list_staff_users()
+    print(all_staff_users)