浏览代码

aiditApi.py 新增一些与 aigc 系统自动交互的功能

updatePublishedMsgDaily.py
修改 1:每天从 aigc  系统获取正在发布的账号

修改 2:status字段,当公众号接入系统时间小于 14 天的时间,将该 status  设置为 0,在晋级的时候不会使用这些文章
罗俊辉 1 年之前
父节点
当前提交
0bd80fff97
共有 3 个文件被更改,包括 158 次插入444 次删除
  1. 0 1
      applications/__init__.py
  2. 82 386
      applications/aiditApi.py
  3. 76 57
      updatePublishedMsgDaily.py

+ 0 - 1
applications/__init__.py

@@ -1,7 +1,6 @@
 """
 @author: luojunhui
 """
-from .aiditApi import AIDTApi
 from .denetMysql import DeNetMysql
 from .longArticlesMysql import longArticlesMySQL
 from .pqMysql import PQMySQL

文件差异内容过多而无法显示
+ 82 - 386
applications/aiditApi.py


+ 76 - 57
updatePublishedMsgDaily.py

@@ -11,10 +11,12 @@ import schedule
 from tqdm import tqdm
 from datetime import datetime
 
-from applications import PQMySQL, WeixinSpider, Functions, log, bot
+from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi
 
+ARTICLE_TABLE = "official_articles_v2"
 
-def getAccounts():
+
+def get_accounts_v1():
     """
     获取账号信息
     :return: [{}, {},...], [{}, {}, {}...]
@@ -26,15 +28,34 @@ def getAccounts():
     return subscription_account, server_account
 
 
-def insertEachMsg(db_client, gh_id, account_name, msg_list):
+def get_accounts():
+    """
+    从 aigc 数据库中获取目前处于发布状态的账号
+    :return:
+    "name": line[0],
+    "ghId": line[1],
+    "follower_count": line[2],
+    "account_init_time": int(line[3] / 1000),
+    "account_type": line[4],
+    "account_auth": line[5]
+    """
+    account_list = aiditApi.get_publish_account_from_aigc()
+    subscription_account = [i for i in account_list if i['account_type'] in {0, 1}]
+    server_account = [i for i in account_list if i['account_type'] == 2]
+    return subscription_account, server_account
+
+
+def insert_each_msg(db_client, item, account_name, msg_list):
     """
     把消息数据更新到数据库中
+    :param item:
     :param db_client:
     :param account_name:
-    :param gh_id:
     :param msg_list:
     :return:
     """
+    gh_id = item['ghId']
+    account_init_timestamp = item['account_init_timestamp']
     for info in msg_list:
         baseInfo = info.get("BaseInfo", {})
         appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
@@ -62,6 +83,7 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                 show_zs_count = show_stat.get("show_zs_count", 0)
                 show_pay_count = show_stat.get("show_pay_count", 0)
                 wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
+                status = 1 if int(time.time()) - account_init_timestamp > 14 * 24 * 3600 else 0
                 info_tuple = (
                     gh_id,
                     account_name,
@@ -87,19 +109,20 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                     show_pay_count,
                     wx_sn,
                     json.dumps(baseInfo, ensure_ascii=False),
-                    Functions().str_to_md5(title)
+                    Functions().str_to_md5(title),
+                    status
                 )
                 try:
                     insert_sql = f"""
-                        INSERT INTO official_articles_v2
-                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5)
+                        INSERT INTO {ARTICLE_TABLE}
+                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                         values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
                     db_client.update(sql=insert_sql, params=info_tuple)
                     log(
                         task="updatePublishedMsgDaily",
-                        function="insertEachMsg",
+                        function="insert_each_msg",
                         message="插入文章数据成功",
                         data={
                             "info": info_tuple
@@ -109,7 +132,7 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                 except Exception as e:
                     try:
                         update_sql = f"""
-                        UPDATE official_articles_v2
+                        UPDATE {ARTICLE_TABLE}
                         SET show_view_count = %s, show_like_count=%s
                         WHERE wx_sn = %s;
                         """
@@ -117,7 +140,7 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                                          params=(show_view_count, show_like_count, wx_sn))
                         log(
                             task="updatePublishedMsgDaily",
-                            function="insertEachMsg",
+                            function="insert_each_msg",
                             message="更新文章数据成功",
                             data={
                                 "wxSn": wx_sn,
@@ -129,23 +152,24 @@ def insertEachMsg(db_client, gh_id, account_name, msg_list):
                     except Exception as e:
                         log(
                             task="updatePublishedMsgDaily",
-                            function="insertEachMsg",
+                            function="insert_each_msg",
                             message="更新文章失败, 报错原因是: {}".format(e),
                             status="fail"
                         )
                         continue
 
 
-def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor=None):
+def update_each_account(db_client, item, account_name, latest_update_time, cursor=None):
     """
     更新每一个账号信息
+    :param item:
     :param account_name:
     :param cursor:
     :param latest_update_time: 最新更新时间
     :param db_client: 数据库连接信息
-    :param gh_id: 公众号 gh_id
     :return: None
     """
+    gh_id = item['ghId']
     response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
     msg_list = response.get("data", {}).get("data", {})
     if msg_list:
@@ -156,31 +180,31 @@ def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor
         resdata = WeixinSpider().get_account_by_url(last_url)
         check_id = resdata['data'].get('data', {}).get('wx_gh')
         if check_id == gh_id:
-            insertEachMsg(
+            insert_each_msg(
                 db_client=db_client,
-                gh_id=gh_id,
+                item=item,
                 account_name=account_name,
                 msg_list=msg_list
             )
             if last_time_stamp_in_this_msg > latest_update_time:
                 next_cursor = response['data']['next_cursor']
-                return updateEachAccount(
+                return update_each_account(
                     db_client=db_client,
-                    gh_id=gh_id,
+                    item=item,
                     account_name=account_name,
                     latest_update_time=latest_update_time,
                     cursor=next_cursor
                 )
             log(
                 task="updatePublishedMsgDaily",
-                function="updateEachAccount",
+                function="update_each_account",
                 message="账号文章更新成功",
                 data=response
             )
     else:
         log(
             task="updatePublishedMsgDaily",
-            function="updateEachAccount",
+            function="update_each_account",
             message="账号文章更新失败",
             status="fail",
             data=response
@@ -188,7 +212,7 @@ def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor
         return
 
 
-def checkAccountInfo(db_client, gh_id):
+def check_account_info(db_client, gh_id):
     """
     通过 gh_id查询视频信息
     :param db_client:
@@ -196,10 +220,10 @@ def checkAccountInfo(db_client, gh_id):
     :return:
     """
     sql = f"""
-        select accountName, updateTime 
-        from official_articles_v2 
-        where ghId = '{gh_id}' 
-        order by updateTime DESC;
+        SELECT accountName, updateTime 
+        FROM {ARTICLE_TABLE}
+        WHERE ghId = '{gh_id}' 
+        ORDER BY updateTime DESC;
         """
     result = db_client.select(sql)
     if result:
@@ -217,25 +241,26 @@ def checkAccountInfo(db_client, gh_id):
         }
 
 
-def updateSingleJob(db_client, gh_id):
+def update_single_account(db_client, sub_item):
     """
 
+    :param sub_item:
     :param db_client:
-    :param gh_id:
     :return:
     """
-    account_info = checkAccountInfo(db_client, gh_id)
+    gh_id = sub_item['ghId']
+    account_info = check_account_info(db_client, gh_id)
     account_name = account_info['account_name']
     update_time = account_info['update_time']
-    updateEachAccount(
+    update_each_account(
         db_client=db_client,
-        gh_id=gh_id,
+        item=sub_item,
         account_name=account_name,
         latest_update_time=update_time
     )
 
 
-def checkSingleAccount(db_client, account_item):
+def check_single_account(db_client, account_item):
     """
     校验每个账号是否更新
     :param db_client:
@@ -243,21 +268,21 @@ def checkSingleAccount(db_client, account_item):
     :return: True / False
     """
     gh_id = account_item['ghId']
-    account_type = account_item['type']
+    account_type = account_item['account_type']
     today_str = datetime.today().strftime("%Y-%m-%d")
     today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
     today_timestamp = today_date_time.timestamp()
     sql = f"""
-                select updateTime
-                from official_articles_v2
-                where ghId = '{gh_id}'
-                order by updateTime
-                desc;
+            SELECT updateTime
+            FROM {ARTICLE_TABLE}
+            WHERE ghId = '{gh_id}'
+            ORDER BY updateTime
+            DESC;
             """
     try:
         latest_update_time = db_client.select(sql)[0][0]
         # 判断该账号当天发布的文章是否被收集
-        if account_type == "订阅号":
+        if account_type in {0, 1}:
             if int(latest_update_time) > int(today_timestamp):
                 return True
             else:
@@ -272,31 +297,31 @@ def checkSingleAccount(db_client, account_item):
         return False
 
 
-def updateJob():
+def update_job():
     """
     更新任务
     :return:
     """
     db_client = PQMySQL()
-    sub_accounts, server_accounts = getAccounts()
+    sub_accounts, server_accounts = get_accounts()
     s_count = 0
     f_count = 0
     for sub_item in tqdm(sub_accounts):
         try:
-            updateSingleJob(db_client, sub_item['ghId'])
+            update_single_account(db_client, sub_item)
             s_count += 1
             time.sleep(5)
         except Exception as e:
             f_count += 1
             log(
                 task="updatePublishedMsgDaily",
-                function="updateJob",
+                function="update_job",
                 message="单个账号文章更新失败, 报错信息是: {}".format(e),
                 status="fail",
             )
     log(
         task="updatePublishedMsgDaily",
-        function="updateJob",
+        function="update_job",
         message="订阅号更新完成",
         data={
             "success": s_count,
@@ -316,30 +341,30 @@ def updateJob():
 
     for sub_item in tqdm(server_accounts):
         try:
-            updateSingleJob(db_client, sub_item['ghId'])
+            update_single_account(db_client, sub_item)
             time.sleep(5)
         except Exception as e:
             print(e)
 
 
-def checkJob():
+def check_job():
     """
     校验任务
     :return:
     """
     db_client = PQMySQL()
-    sub_accounts, server_accounts = getAccounts()
+    sub_accounts, server_accounts = get_accounts()
     fail_list = []
     # account_list = sub_accounts + server_accounts
     account_list = sub_accounts
     # check and rework if fail
     for sub_item in tqdm(account_list):
-        res = checkSingleAccount(db_client, sub_item)
+        res = check_single_account(db_client, sub_item)
         if not res:
-            updateSingleJob(db_client, sub_item['ghId'])
+            update_single_account(db_client, sub_item)
     # check whether success and bot if fails
     for sub_item in tqdm(account_list):
-        res = checkSingleAccount(db_client, sub_item)
+        res = check_single_account(db_client, sub_item)
         if not res:
             fail_list.append(sub_item)
     if fail_list:
@@ -363,16 +388,10 @@ def job_with_thread(job_func):
 
 
 if __name__ == '__main__':
-    schedule.every().day.at("20:50").do(job_with_thread, updateJob)
+    schedule.every().day.at("20:50").do(job_with_thread, update_job)
 
-    schedule.every().day.at("21:45").do(job_with_thread, checkJob)
+    schedule.every().day.at("21:45").do(job_with_thread, check_job)
 
     while True:
         schedule.run_pending()
         time.sleep(1)
-        # log(
-        #     task="updatePublishedMsgDaily",
-        #     function="main",
-        #     message="更新公众号文章信息任务正常执行"
-        # )
-

部分文件因为文件数量过多而无法显示