View source code

Update
1. Daily task to update mini-program performance metrics
2. Daily task to crawl articles published the same day; request the pw interface directly, with an extra round of validation
3. Replace the ad database with the long-articles database

罗俊辉 7 months ago
Parent
Commit
714d51efb5

+ 1 - 1
applications/__init__.py

@@ -3,7 +3,7 @@
 """
 from .aiditApi import AIDTApi
 from .denetMysql import DeNetMysql
-from .adMysql import AdMySQL
+from .longArticlesMysql import longArticlesMySQL
 from .pqMysql import PQMySQL
 from .functions import Functions
 from .data_works import ODPSApi

+ 12 - 22
applications/functions.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+import threading
 from datetime import datetime, timezone
 import hashlib
 import requests
@@ -58,27 +59,6 @@ class Functions(object):
             L.append([account, account_score])
         return L
 
-    @classmethod
-    def matchLinkById(cls, channel_content_id):
-        """
-        Use channelContentId to match articleUrl
-        :param channel_content_id:
-        :return:
-        """
-        connection = pymysql.connect(
-            host='rm-bp12k5fuh5zyx31d28o.mysql.rds.aliyuncs.com',
-            port=3306,
-            user='wx2023_ad',
-            password='wx2023_adP@assword1234',
-            db='adplatform',
-            charset='utf8mb4'
-        )
-        sql = f"""select account_id, link, item_index from changwen_article where id = '{channel_content_id}';"""
-        cursor = connection.cursor()
-        cursor.execute(sql)
-        article_link = cursor.fetchone()
-        return article_link
-
     @classmethod
     def matchLinkByIdTuple(cls, channel_id_tuple):
         """
@@ -219,4 +199,14 @@ class Functions(object):
         """
         dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
         date_string = dt_object.strftime('%Y-%m-%d %H:%M:%S')
-        return date_string
+        return date_string
+
+    @classmethod
+    def job_with_thread(cls, job_func):
+        """
+        Run each job in its own thread
+        :param job_func:
+        :return:
+        """
+        job_thread = threading.Thread(target=job_func)
+        job_thread.start()
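
Why job_with_thread exists: schedule runs jobs synchronously inside run_pending(), so one long crawl would delay every other scheduled job. A minimal usage sketch of the pattern (slow_job is a hypothetical placeholder; updateMinigramInfoDaily.py below uses the helper the same way):

import time
import schedule
from applications import Functions

def slow_job():
    # stand-in for a long-running task; on the main thread it would block the scheduler loop
    time.sleep(600)

schedule.every().day.at("01:00").do(Functions().job_with_thread, slow_job)

while True:
    schedule.run_pending()
    time.sleep(1)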

+ 5 - 5
applications/adMysql.py → applications/longArticlesMysql.py

@@ -4,16 +4,16 @@
 import pymysql
 
 
-class AdMySQL(object):
+class longArticlesMySQL(object):
     """
     Long articles MySQL
     """
     connection = pymysql.connect(
-        host='rm-bp12k5fuh5zyx31d28o.mysql.rds.aliyuncs.com',
+        host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
         port=3306,
-        user='wx2023_ad',
-        password='wx2023_adP@assword1234',
-        db='adplatform',
+        user='changwen_admin',
+        password='changwen@123456',
+        db='long_articles',
         charset='utf8mb4'
     )
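
A note on this design: the connection is a class attribute, so it is opened once at import time and shared by every caller; if MySQL drops it, queries fail until the process restarts. A hedged alternative sketch (not what the module does) that opens a fresh connection per use, with the same credentials as above:

import pymysql

def open_long_articles_conn():
    # reconnects on every call instead of sharing one import-time connection
    return pymysql.connect(
        host='rm-bp14529nwwcw75yr1ko.mysql.rds.aliyuncs.com',
        port=3306,
        user='changwen_admin',
        password='changwen@123456',
        db='long_articles',
        charset='utf8mb4'
    )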
 

+ 9 - 9
applications/wxSpiderApi.py

@@ -11,6 +11,9 @@ class WeixinSpider(object):
     """
     Update account articles
     """
+    # ip = "8.217.190.241"
+    ip = "47.98.154.124"
+    port = "8888"
 
     @classmethod
     @retryOnNone()
@@ -19,7 +22,7 @@ class WeixinSpider(object):
         search articles in wx
         :return:
         """
-        url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
+        url = "http://{}:{}/crawler/wei_xin/keyword".format(cls.ip, cls.port)
         payload = json.dumps({
             "keyword": title,
             "cursor": "1"
@@ -39,7 +42,7 @@ class WeixinSpider(object):
         :param content_link:
         :return:
         """
-        url = "http://47.98.154.124:8888/crawler/wei_xin/detail"
+        url = "http://{}:{}/crawler/wei_xin/detail".format(cls.ip, cls.port)
         payload = json.dumps({
             "content_link": content_link,
             "is_count": False,
@@ -48,10 +51,7 @@ class WeixinSpider(object):
         headers = {
             'Content-Type': 'application/json'
         }
-        # print(url)
-        # print(payload)
         response = requests.request("POST", url, headers=headers, data=payload)
-        print("info", response.text)
         return response.json()
 
     @classmethod
@@ -60,12 +60,11 @@ class WeixinSpider(object):
         """
         :return:
         """
-        url = 'http://47.98.154.124:8888/crawler/wei_xin/blogger'
+        url = 'http://{}:{}/crawler/wei_xin/blogger'.format(cls.ip, cls.port)
         payload = {
             'account_id': ghId,
             'cursor': index,
         }
-        print(payload)
         headers = {
             'Content-Type': 'application/json'
         }
@@ -82,8 +81,9 @@ class WeixinSpider(object):
         """
         response = requests.request(
             "POST",
-            url='http://47.98.154.124:8888/crawler/wei_xin/account_info',
+            url='http://{}:{}/crawler/wei_xin/account_info'.format(cls.ip, cls.port),
             headers={'Content-Type': 'application/json'},
             json={"content_link": content_url}
         )
-        return response.json()
+        return response.json()
+
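
Hoisting the host and port into class attributes means all four endpoints can be pointed at a new crawler server with a one-line change. The commit message also mentions an extra round of validation when requesting the pw interface; a hedged sketch of what that could look like, assuming a response is valid when its 'code' field equals 0 (the field name is an assumption, not confirmed by this diff):

from applications import WeixinSpider

def get_article_text_checked(content_link, max_tries=2):
    # re-request once if the payload is empty or flags an error
    spider = WeixinSpider()
    response = None
    for _ in range(max_tries):
        response = spider.get_article_text(content_link)
        if response and response.get('code') == 0:
            break
    return response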

+ 48 - 0
coldStartTasks/publishAccountAssociationArticles.py

@@ -0,0 +1,48 @@
+"""
+@author: luojunhui
+Publish i2u2i articles
+"""
+import pandas as pd
+from applications import DeNetMysql
+
+
+class I2U2I(object):
+    """
+    Publish account-association articles
+    """
+    db = DeNetMysql()
+
+    @classmethod
+    def getAccountPositionArticles(cls, gh_id, position):
+        """
+        Fetch all articles at a given position from an associated account
+        :return:
+        """
+        sql = f"""
+            select title, read_cnt, link 
+            from crawler_meta_article
+            where out_account_id = '{gh_id}' and article_index = {position};
+        """
+        article_list = cls.db.select(sql)
+        # df = pd.DataFrame(article_list, columns=['title', 'read_cnt', 'link'])
+        # read_mean = df['read_cnt'].mean()
+        # filter_response = df[
+        #     (df['read_cnt'] > read_mean * 1.3)
+        #     & (df['read_cnt'] > 5000)
+        #     ]
+        # return filter_response
+        return article_list
+
+    @classmethod
+    def filter(cls):
+        """
+        :return:
+        """
+        return
+
+if __name__ == '__main__':
+    job = I2U2I()
+    article_list = job.getAccountPositionArticles(gh_id='gh_e6be5a12e83c', position=1)
+    for article in article_list:
+        print(article)
+
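
A runnable version of the read-count filter kept in the comments above, assuming article_list rows arrive as (title, read_cnt, link) tuples in the order the SQL selects them:

import pandas as pd

def filter_high_read_articles(article_list):
    # keep articles that beat 1.3x the account mean and an absolute floor of 5000 reads
    df = pd.DataFrame(article_list, columns=['title', 'read_cnt', 'link'])
    read_mean = df['read_cnt'].mean()
    return df[(df['read_cnt'] > read_mean * 1.3) & (df['read_cnt'] > 5000)]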

+ 4 - 0
coldStartTasks/publishArticleAssociationArticles.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+Publish i2i articles
+"""

+ 4 - 0
coldStartTasks/publishCategoryArticles.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+Publish category articles to the AIGC system
+"""

+ 30 - 116
dev/hurry_up.py

@@ -1,6 +1,6 @@
 import json
 
-import pandas as pd
+from concurrent.futures.thread import ThreadPoolExecutor
 from tqdm import tqdm
 from datetime import datetime, timedelta
 
@@ -20,15 +20,9 @@ class DailyDataManager(object):
         Fetch info on already-published articles
         :return:
         """
-        # sql = f"""
-        # SELECT ContentUrl, wx_sn, createTime
-        # FROM official_articles_v2
-        # WHERE createTime > 1719763200;
-        # """
         sql2 = f"""
         select ContentUrl, wx_sn, createTime from official_articles_v2 where createTime > 1719763200 and accountName in (
-    select distinct account_name from account_avg_info_v2
-    );
+        select distinct account_name from account_avg_info_v2);
         """
         result_list = cls.pq_mysql.select(sql2)
         return result_list
@@ -43,7 +37,6 @@ class DailyDataManager(object):
         article_detail = cls.wx_spider.get_article_text(url)
         print(url)
         print(article_detail)
-        # print(json.dumps(article_detail, ensure_ascii=False, indent=4))
         mini_info = article_detail['data']['data']['mini_program']
         return data_info[1].decode(), mini_info, data_info[2]
 
@@ -56,7 +49,7 @@ class DailyDataManager(object):
         """
         sql = f"""
         select type, machinecode, create_time, first_level_dt 
-        from changwen_data_base
+        from changwen_data_base_v2
         where rootsourceid = '{rootSourceId}';
         """
         result_list = cls.ad_mysql.select(sql)
@@ -72,22 +65,30 @@ class DailyDataManager(object):
             for line in values:
                 # tally the first-layer ('首层') rows first
                 if line[0] == '首层':
-                    c_time = line[-2].__str__().split(" ")[0]
-                    if first_level.get(c_time):
-                        first_level[c_time].add(line[1])
-                    else:
-                        first_level[c_time] = {line[1]}
+                    try:
+                        dt = str(line[-1])
+                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
+                        if first_level.get(key_dt):
+                            first_level[key_dt].add(line[1])
+                        else:
+                            first_level[key_dt] = {line[1]}
+                    except Exception as e:
+                        continue
                 else:
-                    dt = str(line[-1])
-                    first_level_dt = datetime.strptime(dt, '%Y%m%d')
-                    create_level_dt = line[-2]
-                    delta = create_level_dt - first_level_dt
-                    days = int(delta.days)
-                    key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
-                    if fission_level.get(key_dt):
-                        fission_level[key_dt].append((line[1], days))
-                    else:
-                        fission_level[key_dt] = [(line[1], days)]
+                    try:
+                        dt = str(line[-1])
+                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
+                        create_level_dt = line[-2]
+                        delta = create_level_dt - first_level_dt
+                        days = int(delta.days)
+                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
+                        if fission_level.get(key_dt):
+                            fission_level[key_dt].append((line[1], days))
+                        else:
+                            fission_level[key_dt] = [(line[1], days)]
+                    except Exception as e:
+                        continue
+                        # print("first level dt is NULL")
 
             tt = {}
             for key in fission_level:
@@ -134,7 +135,7 @@ class DailyDataManager(object):
         """
         sql = f"""
         select distinct root_source_id
-        from long_articles_detail_info
+        from long_articles_detail_info;
         """
         source_id_list = cls.pq_mysql.select(sql)
         for item in tqdm(source_id_list):
@@ -166,96 +167,9 @@ class DailyDataManager(object):
                 print(e)
 
 
+
 if __name__ == '__main__':
     DM = DailyDataManager()
-    # DM.updateDetail()
-    publishArticles = DM.getPublishedArticles()
-    print(len(publishArticles))
-    for line in tqdm(publishArticles):
-        try:
-            wx_sn, mini_info, create_time = DM.getRootSourceIds(line)
-            dt_object = datetime.fromtimestamp(create_time)
-            publish_dt = dt_object.strftime('%Y-%m-%d')
-            one_day = timedelta(days=1)
-            two_day = timedelta(days=2)
-            next_day = dt_object + one_day
-            next_next_day = dt_object + two_day
-            recall_dt_list = [dt_object, next_day, next_next_day]
-            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
-            for dt_str in recall_dt_str_list:
-                for index, item in enumerate(mini_info, 1):
-                    image_url = item['image_url']
-                    nick_name = item['nike_name']
-                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
-                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
-                    kimi_title = item['title']
-                    insert_sql = f"""
-                        INSERT INTO long_articles_detail_info
-                        (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
-                    """
-                    DM.pq_mysql.update(
-                        sql=insert_sql,
-                        params=(
-                            wx_sn,
-                            kimi_title,
-                            nick_name,
-                            image_url,
-                            index,
-                            root_source_id,
-                            video_id,
-                            publish_dt,
-                            dt_str
-                        )
-                    )
-        except Exception as e:
-            print(e)
-            pass
-
-
-
-
-
-    # for line in DailyIds:
-    #     try:
-    #         source_id_tuple = DM.getRootSourceIds(trace_id=line)
-    #         result = DM.getMinigramInfo(source_id_tuple)
-    #         print(line)
-    #         print(result)
-    #         print("\n")
-    #     except Exception as e:
-    #         print(e)
-    #     print(line)
-    # L = {}
-    # trace_id = "search-a9bb246a-57fa-49f4-88d7-eec575813130-1723608633"
-    # source_id_tuple = DM.getRootSourceIds(trace_id=trace_id)
-    # result = DM.getMinigramInfo(source_id_tuple)
+    # result = DM.getMinigramInfo("longArticles_d409f27d9d64501d6811b47a3779d2d7")
     # print(result)
-    # for t_id in tqdm(DailyIds):
-    #     s_tuple = DM.getRootSourceIds(trace_id=t_id)
-    #     first_, fission_ = DM.getMinigramInfo(s_tuple)
-    #     obj = {
-    #         "first_": first_,
-    #         "fission_": fission_,
-    #         "rate": fission_ / first_ if first_ > 0 else 0
-    #     }
-    #     L[t_id] = obj
-    # Df = []
-    # with open("t.json", encoding="utf-8") as f:
-    #     L = json.loads(f.read())
-    # for key in L:
-    #     print(key)
-    #     value = L[key]
-    #     result = DM.getArticleInfo(trace_id=key)
-    #     account_name, article_title = result
-    #     temp = [
-    #         account_name,
-    #         article_title,
-    #         value['first_'],
-    #         value['fission_'],
-    #         value['rate']
-    #     ]
-    #     Df.append(temp)
-    # df = pd.DataFrame(Df, columns=['account_name', 'article_title', 'first_', 'fission_', 'rate0'])
-    # df.to_excel("0825.xlsx", index=False)
+    # DM.updateDetail()
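
The reworked branches key both aggregates on first_level_dt (a yyyymmdd value from changwen_data_base_v2) and use try/except to skip rows where it is NULL. A toy illustration of the two branches, with made-up rows in the (type, machinecode, create_time, first_level_dt) order the SQL selects; the non-first-layer type label is hypothetical:

from datetime import datetime

rows = [
    ('首层', 'device-a', datetime(2024, 7, 1, 10, 0), 20240701),
    ('裂变', 'device-b', datetime(2024, 7, 2, 9, 0), 20240701),   # hypothetical type label
    ('首层', 'device-c', datetime(2024, 7, 1, 12, 0), None),      # NULL first_level_dt -> skipped
]

first_level, fission_level = {}, {}
for type_, machinecode, create_time, first_level_dt in rows:
    try:
        key_dt = datetime.strptime(str(first_level_dt), '%Y%m%d').strftime('%Y-%m-%d')
    except ValueError:
        continue
    if type_ == '首层':
        first_level.setdefault(key_dt, set()).add(machinecode)
    else:
        days = (create_time - datetime.strptime(str(first_level_dt), '%Y%m%d')).days
        fission_level.setdefault(key_dt, []).append((machinecode, days))

print(first_level)    # {'2024-07-01': {'device-a'}}
print(fission_level)  # {'2024-07-01': [('device-b', 1)]}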

+ 7 - 39
dev/test.py

@@ -1,44 +1,12 @@
 import json
+import requests
 
-with open("pool_.txt", encoding="utf-8") as f:
-    data = f.readlines()
+from applications import WeixinSpider
 
-L = {}
-for line in data[1:]:
-    line = line.strip().split("\t")
-    # print(line)
-    gh_id = line[3]
-    position = line[5]
-    account_level = line[-2]
-    if account_level == "一级":
-        acc_l = "1"
-    elif account_level == "二级":
-        acc_l = "2"
-    elif account_level == "三级":
-        acc_l = "3"
-    else:
-        continue
-    key = "{}_{}".format(gh_id, position)
-    print(gh_id, position, account_level)
-    L[key] = acc_l
 
+w = WeixinSpider()
 
-#     account_name = line[0]
-#     gh_id = line[1]
-#     fans = int(line[2])
-#     level = line[3]
-#     read_avg = float(line[4])
-#     like_avg = float(line[5])
-#     key = "{}_{}".format(gh_id, level)
-#     obj = {
-#         "accountName": account_name,
-#         "ghId": gh_id,
-#         "fans": fans,
-#         "position": level,
-#         "readAvg": read_avg,
-#         "likeAvg": like_avg
-#     }
-#     L[key] = obj
-#
-with open("pool_detail.json", "w", encoding="utf-8") as f:
-    f.write(json.dumps(L, ensure_ascii=False, indent=4))
+response = w.get_article_text(
+    "http://mp.weixin.qq.com/s?__biz=MzkzNjYxMjY0Nw==&mid=2247494372&idx=5&sn=18c437aa9bb3da67a3ce6665872fae58&chksm=c3a4dc202b0ee83d74f9bc678e6c97ff0ea4e9b4b82c158d366515c9d96e8f468f7948f945e0&scene=126&sessionid=1679649075#rd")
+
+print(response)

+ 0 - 31
longArticleJob.py

@@ -1,31 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks import *
-
-
-class Job(object):
-    """
-    Long articles job
-    """
-    @classmethod
-    def initColdPool(cls):
-        """
-        Initialize the cold-start pool daily
-        :return:
-        """
-        cold_pool = ColdStartPool()
-        cold_pool.deal()
-
-    @classmethod
-    def sendToLevel3(cls):
-        """
-        Distribute crawl plans to level 3 according to the configured ratio
-        :return:
-        """
-        cold_start = ColdStartTask()
-        cold_start.sendToColdPool()
-
-
-
-

+ 4 - 4
tasks/task2.py

@@ -69,7 +69,6 @@ class ColdStartTask(object):
         :return:
         """
 
-
     @classmethod
     def splitCategoryToAccount(cls, cate_list):
         """
@@ -117,7 +116,8 @@ class ColdStartTask(object):
         L_map = {}
         for account_id in account_list:
             account_name = account_dict[account_id]
-            score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name]['score_list']
+            score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name][
+                'score_list']
             for index, score in enumerate(score_list):
                 channel_content_id = cate_list[index]['id']
                 item = tuple([account_id, score])
@@ -160,7 +160,8 @@ class ColdStartTask(object):
             (%s, %s, %s);
             """
             try:
-                PQMySQL.update(sql=insert_sql, params=(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
+                PQMySQL.update(sql=insert_sql, params=(
+                account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
             except Exception as e:
                 print("插入出现问题----{}".format(e))
 
@@ -361,4 +362,3 @@ class ColdStartTask(object):
 if __name__ == '__main__':
     CT = ColdStartTask()
     CT.sendToColdPool()
-

+ 0 - 271
tasks/task2_dev.py

@@ -1,271 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-import json
-
-from tqdm import tqdm
-
-from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider
-from config import cateMap, coldPoolArticlesNum, accountBaseInfo
-
-
-class ColdStartTask(object):
-    """
-    Cold-start distribution task
-    """
-    AidApi = AIDTApi()
-    DeMysql = DeNetMysql()
-    PqMysql = PQMySQL()
-    Fun = Functions()
-    Spider = WeixinSpider()
-    pool3 = "autoArticlePoolLevel3"
-
-    @classmethod
-    def generate_account_dict(cls):
-        """
-        Build the account_list
-        :return:
-        """
-        account_dict = {}
-        for key in accountBaseInfo:
-            account_name = accountBaseInfo[key]['accountName']
-            account_gh_id = accountBaseInfo[key]['ghId']
-            account_dict[account_gh_id] = account_name
-        return account_dict
-
-    @classmethod
-    def usedArticle(cls, content_id):
-        """
-        For a used article, set its status to 0
-        :return:
-        """
-        print("useful", content_id)
-
-    @classmethod
-    def badArticle(cls, content_id):
-        """
-        For a low-scoring article, set its status to 2
-        :return:
-        """
-        print("bad", content_id)
-
-    @classmethod
-    def getTopArticles(cls, category, limit_count):
-        """
-        Fetch the list of high-share articles
-        :return:
-        """
-        sql = f"""
-            select content_channel_id, content_link, title 
-            from cold_start_article_pool
-            where category = '{category}' and status = 1
-            order by view_count DESC, publish_time_stamp DESC
-            limit {limit_count};
-        """
-        result = cls.PqMysql.select(sql)
-        return result
-
-    @classmethod
-    def splitCategoryToAccount(cls, cate_list):
-        """
-        split articles to each account
-        :return:
-        """
-        account_index_info = {
-            "gh_058e41145a0c": 30,
-            "gh_0e4fd9e88386": 30,
-            "gh_744cb16f6e16": 30,
-            "gh_ac43eb24376d": 30,
-            "gh_970460d9ccec": 30,
-            "gh_56ca3dae948c": 30,
-            "gh_c91b42649690": 30,
-            "gh_6d205db62f04": 30,
-            "gh_e24da99dc899": 30,
-            "gh_4c058673c07e": 30,
-            "gh_03d32e83122f": 30,
-            "gh_c69776baf2cd": 30,
-            "gh_30816d8adb52": 30,
-            "gh_789a40fe7935": 30,
-            "gh_95ed5ecf9363": 30,
-            "gh_3e91f0624545": 30,
-            "gh_57573f01b2ee": 30,
-            "gh_9877c8541764": 30,
-            "gh_6cfd1132df94": 30,
-            "gh_008ef23062ee": 30,
-            "gh_5ae65db96cb7": 30,
-            "gh_be8c29139989": 30,
-            "gh_51e4ad40466d": 30,
-            "gh_d4dffc34ac39": 30,
-            "gh_89ef4798d3ea": 30,
-            "gh_b15de7c99912": 30,
-            "gh_9f8dc5b0c74e": 30,
-            "gh_7b4a5f86d68c": 30,
-            "gh_c5cdf60d9ab4": 5,
-            "gh_0c89e11f8bf3": 5,
-            "gh_e0eb490115f5": 5,
-            "gh_a2901d34f75b": 5,
-            "gh_d5f935d0d1f2": 30
-        }
-        account_dict = cls.generate_account_dict()
-        account_list = list(account_index_info.keys())
-        title_list = [i['title'] for i in cate_list]
-        L_map = {}
-        for account_id in account_list:
-            account_name = account_dict[account_id]
-            score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name]['score_list']
-            for index, score in enumerate(score_list):
-                channel_content_id = cate_list[index]['id']
-                item = tuple([account_id, score])
-                if L_map.get(channel_content_id):
-                    L_map[channel_content_id].append(item)
-                else:
-                    L_map[channel_content_id] = [item]
-        for key in L_map:
-            L_map[key] = sorted(L_map[key], reverse=True, key=lambda x: x[1])
-
-        content_account = []
-        for item in cate_list:
-            content_id = item['id']
-            account_list = L_map[content_id]
-            for account_tuple in account_list:
-                gh_id, score = account_tuple[0], account_tuple[1]
-                if account_index_info[gh_id] > 0:
-                    sub_item = tuple([content_id, gh_id, score])
-                    content_account.append(sub_item)
-                    account_index_info[gh_id] -= 1
-                    break
-        # return content_account
-        account_article_dict = {}
-        for item in content_account:
-            content_id, gh_id, score = item
-            sub_i = tuple([content_id, score])
-            if account_article_dict.get(gh_id):
-                account_article_dict[gh_id].append(sub_i)
-            else:
-                account_article_dict[gh_id] = [sub_i]
-
-        for account in tqdm(account_article_dict):
-            date_str = datetime.datetime.today().strftime("%Y-%m-%d")
-            print(account)
-            print(account_article_dict[account])
-            # insert_sql = f"""
-            # INSERT INTO article_pre_distribute_account
-            # (gh_id, date, article_list)
-            # VALUES
-            # (%s, %s, %s);
-            # """
-            # try:
-            #     PQMySQL.update(sql=insert_sql, params=(
-            #     account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
-            # except Exception as e:
-            #     print("插入出现问题----{}".format(e))
-
-        print("成功更新完成")
-
-    @classmethod
-    def findCategoryArticlesDaily(cls):
-        """
-        Compute relevance scores against each account
-        :return:
-        """
-        category_list = [
-            "军事政法",
-            "健康养生",
-            "宗教历史",
-            "情感生活",
-            "娱乐八卦",
-            "新闻媒体",
-        ]
-        L = []
-        for category in tqdm(category_list):
-            print("{} is processing......".format(category))
-            category_total = coldPoolArticlesNum * cateMap.get(category, 0.1)
-            category_count = 0
-            while category_count < category_total:
-                article_list = cls.getTopArticles(category, 10)
-                if len(article_list) == 0:
-                    print("{}:  该品类没有数据了!".format(category))
-                    break
-                title_list = [article[2] for article in article_list]
-                score_list = cls.Fun.getTitleScore(title_list, "指尖奇文")['指尖奇文']['score_list']
-                for index, score in enumerate(score_list):
-                    content_id = article_list[index][0]
-                    if score >= 0.35:
-                        obj = {
-                            "id": article_list[index][0],
-                            "url": article_list[index][1],
-                            "title": article_list[index][2],
-                            "cate": category,
-                            "score": score
-                        }
-                        category_count += 1
-                        # cls.usedArticle(content_id=content_id)
-                        print("used_article")
-                        L.append(obj)
-                    else:
-                        # cls.badArticle(content_id=content_id)
-                        print("bad article")
-                    print(category_count)
-
-        return L
-
-    @classmethod
-    def findAssociationArticlesDaily(cls):
-        """
-        获取相关文章
-        :return:
-        """
-        target_num = int(0.4 * coldPoolArticlesNum)
-        sql = f"""
-            select id, ori_account_name, association_url, association_title, url_md5
-            from association_articles
-            where status = 1
-            order by read_count DESC
-            limit {target_num};"""
-        temp_list = cls.PqMysql.select(sql)
-        article_list = []
-        for i in tqdm(temp_list):
-            try:
-                o = {
-                    "related_account_name": i[1],
-                    "url": i[2],
-                    "title": i[3],
-                    "url_md5": i[4],
-                    # "id": i[4]
-                    "id": cls.Spider.get_article_text(i[2])['data']['data']['channel_content_id']
-                }
-            except:
-                o = {
-                    "related_account_name": i[1],
-                    "url": i[2],
-                    "title": i[3],
-                    "url_md5": i[4],
-                    "id": i[4]
-                }
-            article_list.append(o)
-        return article_list
-
-    @classmethod
-    def sendToColdPool(cls, plan_id=None):
-        """
-        Send articles to level 4
-        :return:
-        """
-        # fetch data for the 6 categories
-        category_list = cls.findCategoryArticlesDaily()
-        # cls.splitCategoryToAccount(category_list)
-        # for line in category_list:
-        #     print(line)
-        association_list = cls.findAssociationArticlesDaily()
-        # for line in association_list:
-        #     print(line)
-        # pre-assign accounts
-        d_list = association_list + category_list
-        cls.splitCategoryToAccount(d_list)
-
-
-if __name__ == '__main__':
-    CT = ColdStartTask()
-    CT.sendToColdPool()
-

+ 15 - 1
tasks/task4.py

@@ -1,6 +1,8 @@
 """
 @author: luojunhui
 """
+import time
+
 import requests
 
 
@@ -9,9 +11,21 @@ def update_articles(gh_id):
     :param gh_id:
     :return:
     """
-    url = "http://61.48.133.26:6060/article_crawler"
+    # url = "http://47.98.136.48:6060/article_crawler"
+    url = "http://localhost:6060/article_crawler"
     headers = {"Content-Type": "application/json"}
     body = {"ghId": gh_id}
     response = requests.request("POST", url=url, headers=headers, json=body, timeout=120)
+    print(response.status_code)
+    print("info", response.text)
     print(response.json())
 
+
+# gh_id_list = ["gh_ac43e43b253b", "gh_93e00e187787", 'gh_68e7fdc09fe4', 'gh_77f36c109fb1', 'gh_7c66e0dbd2cf', 'gh_b181786a6c8c']
+#
+# for item in gh_id_list:
+#     try:
+#         update_articles(item)
+#         time.sleep(5)
+#     except Exception as e:
+#         print(e)

+ 15 - 25
tasks/task6.py

@@ -5,40 +5,29 @@
 import json
 
 from pandas import DataFrame
-
+from tqdm import tqdm
 from applications import DeNetMysql
 from applications import AIDTApi
 
 D = DeNetMysql()
 
 
-def get_accounts():
-    """
-    Fetch accounts
-    :return:
-    """
-    sql = f"""select account_id from long_article_accounts_outside where category = '军事政法';"""
-    account_list = D.select(sql)
-    account_list_ = []
-    for account in account_list:
-        account_list_.append(account[0])
-    return account_list_
-
-
 def get_account_avg():
     """
     Fetch accounts
     :return:
     """
-    with open("avg.json", encoding="utf-8") as f:
+    with open("/Users/luojunhui/cyber/LongArticlesJob/dev/avg_new_health.json", encoding="utf-8") as f:
         avg_dict = json.loads(f.read())
 
-    account_list = get_accounts()
+    account_position_list = list(avg_dict.keys())
     L = []
-    for account in account_list:
+    for account in tqdm(account_position_list):
+        gh_id = account[:-2]
+        index = int(account[-1:])
         select_sql = f"""
         select title, read_cnt, link from crawler_meta_article
-        where out_account_id = '{account}';
+        where out_account_id = '{gh_id}' and article_index = {index} and status = 1;
         """
         result_list = D.select(select_sql)
         try:
@@ -58,21 +47,22 @@ def get_account_avg():
     for line in sl:
         title = line[0]
         read_cnt = line[2]
-        if "农历" in title or "节" in title or line[3] < 1.3 or len(title) < 15 or read_cnt < 1000:
+        if "农历" in title or '太极' in title or "节" in title or line[3] < 1.3 or len(title) < 15 or read_cnt < 5000:
             a += 1
             continue
         else:
             b += 1
+            print(line)
             LL.append(line)
-    # print(a)
-    # print(b)
-    # df = DataFrame(LL, columns=["title", "link", "read", "read_avg"])
-    # df.to_excel("test.xlsx", index=False)
-    # url_list = [i[1] for i in LL]
+    print(a)
+    print(b)
+    df = DataFrame(LL, columns=["title", "link", "read", "read_avg"])
+    df.to_excel("health_2.xlsx", index=False)
+    # url_list = [i[1] for i in LL[3:]]
     # try:
     #     AIDTApi().updateArticleIntoCrawlerPlan(
     #         plan_id=None,
-    #         plan_name="军事政法类冷启-0805-new",
+    #         plan_name="历史冷启-0816-new",
     #         plan_tag="autoArticlePoolLevel1",
     #         url_list=url_list
     #     )
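
The avg-dict keys now pack a gh_id and an article position into one string, which the two slices above unpack. A one-line illustration with a hypothetical key (note the slice assumes a single-digit position):

key = 'gh_e6be5a12e83c_1'
gh_id, index = key[:-2], int(key[-1:])
print(gh_id, index)  # gh_e6be5a12e83c 1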

+ 0 - 23
tasks/task6_dev.py

@@ -1,23 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-from applications import DeNetMysql
-
-D = DeNetMysql()
-
-with open("avg.json", encoding="utf-8") as f:
-    data = json.loads(f.read())
-
-for key in data:
-    print(key)
-    print(data[key])
-    sql = f"""
-    UPDATE long_article_accounts_outside
-    SET read_avg = %s 
-    where account_id = %s;
-    """
-    D.update(
-        sql=sql,
-        params=(data[key], key)
-    )

+ 109 - 40
updateMinigramInfoDaily.py

@@ -1,37 +1,94 @@
-import json
+"""
+@author luojunhui
+@description Update Daily
+"""
+import time
 
-from concurrent.futures.thread import ThreadPoolExecutor
 from tqdm import tqdm
 from datetime import datetime, timedelta
+import schedule
 
-from applications import AdMySQL, PQMySQL, WeixinSpider
+from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions
 
 
 class DailyDataManager(object):
     """
     Daily data update
     """
-    ad_mysql = AdMySQL()
-    pq_mysql = PQMySQL()
-    wx_spider = WeixinSpider()
+    laMysql = longArticlesMySQL()
+    pqMysql = PQMySQL()
+    wxSpider = WeixinSpider()
+    functions = Functions()
 
     @classmethod
     def getPublishedArticles(cls):
         """
-        获取已经发布的文章的信息
+        Fetch published articles; the createTime cutoff is yesterday at 00:00, converted to a timestamp
         :return:
         """
+        today = datetime.today()
+        # yesterday's date
+        yesterday = today - timedelta(days=1)
+        yesterday_midnight = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
+        yesterday_timestamp = yesterday_midnight.timestamp()
         sql2 = f"""
-        select ContentUrl, wx_sn, createTime 
-        from official_articles_v2 
-        where createTime >= 1724774400
+        select ContentUrl, wx_sn, createTime
+        from official_articles_v2
+        where createTime >= {yesterday_timestamp}
         and accountName in (
                         select distinct account_name from account_avg_info_v2
                         );
         """
-        result_list = cls.pq_mysql.select(sql2)
+        result_list = cls.pqMysql.select(sql2)
         return result_list
 
+    @classmethod
+    def updateInfo(cls, line):
+        """
+        update info into mysql
+        :return:
+        """
+        try:
+            wx_sn, mini_info, create_time = cls.getRootSourceIds(line)
+            dt_object = datetime.fromtimestamp(create_time)
+            publish_dt = dt_object.strftime('%Y-%m-%d')
+            one_day = timedelta(days=1)
+            two_day = timedelta(days=2)
+            next_day = dt_object + one_day
+            next_next_day = dt_object + two_day
+            recall_dt_list = [dt_object, next_day, next_next_day]
+            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
+            for dt_str in recall_dt_str_list:
+                for index, item in enumerate(mini_info, 1):
+                    image_url = item['image_url']
+                    nick_name = item['nike_name']
+                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
+                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
+                    kimi_title = item['title']
+                    insert_sql = f"""
+                            INSERT INTO long_articles_detail_info
+                            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+                        """
+                    cls.pqMysql.update(
+                        sql=insert_sql,
+                        params=(
+                            wx_sn,
+                            kimi_title,
+                            nick_name,
+                            image_url,
+                            index,
+                            root_source_id,
+                            video_id,
+                            publish_dt,
+                            dt_str
+                        )
+                    )
+        except Exception as e:
+            print(e)
+            pass
+
     @classmethod
     def getRootSourceIds(cls, data_info):
         """
@@ -39,9 +96,7 @@ class DailyDataManager(object):
         :return:
         """
         url = data_info[0]
-        article_detail = cls.wx_spider.get_article_text(url)
-        print(url)
-        print(article_detail)
+        article_detail = cls.wxSpider.get_article_text(url)
         mini_info = article_detail['data']['data']['mini_program']
         return data_info[1].decode(), mini_info, data_info[2]
 
@@ -49,7 +104,7 @@ class DailyDataManager(object):
     def getMinigramInfo(cls, rootSourceId):
         """
 
-        :param rootIdTuple:
+        :param rootSourceId:
         :return:
         """
         sql = f"""
@@ -57,7 +112,7 @@ class DailyDataManager(object):
         from changwen_data_base_v2
         where rootsourceid = '{rootSourceId}';
         """
-        result_list = cls.ad_mysql.select(sql)
+        result_list = cls.laMysql.select(sql)
 
         def summarize(values):
             """
@@ -117,32 +172,21 @@ class DailyDataManager(object):
 
         return summarize(result_list)
 
-    @classmethod
-    def getArticleInfo(cls, trace_id):
-        """
-        Fetch article info by trace_id
-        :param trace_id:
-        :return:
-        """
-        sql = f"""
-        SELECT account_name, article_title
-        FROM long_articles_video
-        WHERE trace_id = '{trace_id}';
-        """
-        info = cls.pq_mysql.select(sql)
-        return info[0]
-
     @classmethod
     def updateDetail(cls):
         """
-
         :return:
         """
+        today = datetime.today()
+        # date three days ago (look-back window for recent publish dates)
+        yesterday = today - timedelta(days=3)
+        yesterday_str = yesterday.__str__().split(" ")[0]
         sql = f"""
         select distinct root_source_id
-        from long_articles_detail_info;
+        from long_articles_detail_info
+        where publish_dt >= '{yesterday_str}';
         """
-        source_id_list = cls.pq_mysql.select(sql)
+        source_id_list = cls.pqMysql.select(sql)
         for item in tqdm(source_id_list):
             s_id = item[0]
             try:
@@ -153,14 +197,13 @@ class DailyDataManager(object):
                     fission_0 = result[key][1]
                     fission_1 = result[key][2]
                     fission_2 = result[key][3]
-                    print(key, first_level, fission_0, fission_1, fission_2)
                     update_sql = f"""
                     UPDATE long_articles_detail_info
                     set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
                     where root_source_id = %s and recall_dt = %s;
                     """
                     try:
-                        cls.pq_mysql.update(
+                        cls.pqMysql.update(
                             sql=update_sql,
                             params=(
                                 first_level, fission_0, fission_1, fission_2, s_id, recall_dt
@@ -172,8 +215,34 @@ class DailyDataManager(object):
                 print(e)
 
 
+def updateArticlesJob():
+    """
+    Update article data
+    :return:
+    """
+    DDM = DailyDataManager()
+    article_list = DDM.getPublishedArticles()
+    for article in tqdm(article_list):
+        DDM.updateInfo(article)
+    print("文章更新完成---{}".format(datetime.today().__str__()))
+
+
+def updateMinigramInfoJob():
+    """
+    Update mini-program data for the past three days
+    :return:
+    """
+    DDM = DailyDataManager()
+    DDM.updateDetail()
+    print("小程序更新完成---{}".format(datetime.today().__str__()))
+
+
 if __name__ == '__main__':
-    DM = DailyDataManager()
-    # result = DM.getMinigramInfo("longArticles_d409f27d9d64501d6811b47a3779d2d7")
-    # print(result)
-    # DM.updateDetail()
+
+    schedule.every().day.at("01:00").do(Functions().job_with_thread, updateArticlesJob)
+
+    schedule.every().day.at("04:30").do(Functions().job_with_thread, updateMinigramInfoJob)
+
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
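
updateInfo recovers rootSourceId and the video id from the mini-program path, which arrives URL-encoded. A small illustration with a made-up path value (the split logic is the one used above):

path = "pages/videos%3Fid%3D12345%26su%3Dabc%26rootSourceId%3DlongArticles_xyz"
root_source_id = path.split("rootSourceId%3D")[-1]                # 'longArticles_xyz'
video_id = path.split("videos%3Fid%3D")[1].split("%26su%3D")[0]   # '12345'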

+ 8 - 1
updatePublishedMsgDaily.py

@@ -286,6 +286,7 @@ def updateJob():
     sub_accounts, server_accounts = getAccounts()
 
     for sub_item in tqdm(sub_accounts):
+        # updateSingleJob(db_client, sub_item['ghId'])
         try:
             updateSingleJob(db_client, sub_item['ghId'])
             time.sleep(2)
@@ -312,6 +313,12 @@ def checkJob():
     sub_accounts, server_accounts = getAccounts()
     fail_list = []
     account_list = sub_accounts + server_accounts
+    # first pass: re-run the update for any account that fails the check
+    for sub_item in tqdm(account_list):
+        res = checkSingleAccount(db_client, sub_item)
+        if not res:
+            updateSingleJob(db_client, sub_item['ghId'])
+    # second pass: send a bot alert for accounts that still fail
     for sub_item in tqdm(account_list):
         res = checkSingleAccount(db_client, sub_item)
         if not res:
@@ -335,7 +342,7 @@ def job_with_thread(job_func):
 
 if __name__ == '__main__':
 
-    schedule.every().day.at("21:00").do(job_with_thread, updateJob)
+    schedule.every().day.at("20:50").do(job_with_thread, updateJob)
 
     schedule.every().day.at("21:30").do(job_with_thread, checkJob)
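
checkJob now gives every failing account one silent rework pass before alerting. A compact sketch of the two-pass flow, assuming checkSingleAccount returns a truthy value on success, as the surrounding code implies:

def check_and_rework(db_client, account_list):
    # pass 1: silently re-run the update for accounts that fail the check
    for item in account_list:
        if not checkSingleAccount(db_client, item):
            updateSingleJob(db_client, item['ghId'])
    # pass 2: whatever still fails goes to the bot alert
    return [item for item in account_list if not checkSingleAccount(db_client, item)]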
 

+ 0 - 161
update_msg.py

@@ -1,161 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-import time
-import json
-import threading
-
-import requests
-import schedule
-from tqdm import tqdm
-from datetime import datetime
-
-from config import accountBaseInfo
-from applications import PQMySQL
-from tasks.task4 import update_articles
-from applications.decoratorApi import retryOnTimeout
-
-
-class UpdateMsgDaily(object):
-    """
-    Daily article update
-    """
-
-    db_client = PQMySQL()
-
-    @classmethod
-    def getAccountIdDict(cls):
-        """
-        Fetch the ids of all internal accounts
-        :return:
-        """
-        gh_id_dict = {}
-        for key in accountBaseInfo:
-            gh_id = accountBaseInfo[key]["ghId"]
-            name = accountBaseInfo[key]["accountName"]
-            gh_id_dict[gh_id] = name
-        return gh_id_dict
-
-    @classmethod
-    @retryOnTimeout()
-    def bot(cls, account_list):
-        """
-        Alert bot
-        """
-        url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-        headers = {"Content-Type": "application/json"}
-        payload = {
-            "msg_type": "interactive",
-            "card": {
-                "elements": [
-                    {
-                        "tag": "div",
-                        "text": {
-                            "content": "存在文章更新失败<at id=all></at>\n",
-                            "tag": "lark_md",
-                        },
-                    },
-                    {
-                        "tag": "div",
-                        "text": {
-                            "content": json.dumps(
-                                account_list, ensure_ascii=False, indent=4
-                            ),
-                            "tag": "lark_md",
-                        },
-                    },
-                ],
-                "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
-            },
-        }
-        requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
-
-    @classmethod
-    def checkEachAccount(cls, gh_id):
-        """
-        Check whether a single account has published updates today
-        :param gh_id:
-        :return:
-        """
-        today_str = datetime.today().strftime("%Y-%m-%d")
-        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
-        today_timestamp = today_date_time.timestamp()
-        sql = f"""
-            select updateTime
-            from official_articles_v2
-            where ghId = '{gh_id}'
-            order by updateTime
-            desc
-        """
-        try:
-            latest_update_time = cls.db_client.select(sql)[0][0]
-            # check whether the account's articles published today have been collected
-            if int(latest_update_time) > int(today_timestamp):
-                return True
-            else:
-                return False
-        except Exception as e:
-            print("updateTime Error -- {}".format(e))
-            return False
-
-    @classmethod
-    def updateJob(cls):
-        """
-        Article update task
-        :return:
-        """
-        account_dict = cls.getAccountIdDict()
-        account_list = list(account_dict.keys())
-        for account_id in tqdm(account_list):
-            try:
-                update_articles(gh_id=account_id)
-            except Exception as e:
-                response = {
-                    "code": 1001,
-                    "info": "单个账号更新失败",
-                    "error": str(e),
-                    "time_stamp": datetime.now().__str__(),
-                }
-                print(response)
-
-    @classmethod
-    def checkJob(cls):
-        """
-        Check that every account already has updated data
-        :return:
-        todo: banned accounts && service accounts need separate handling
-        """
-        account_dict = cls.getAccountIdDict()
-        error_account_list = []
-        for account_id in tqdm(account_dict):
-            if not cls.checkEachAccount(account_id):
-                name = account_dict[account_id]
-                error_account_list.append(name)
-        if error_account_list:
-            try:
-                cls.bot(error_account_list)
-            except Exception as e:
-                print("Timeout Error: {}".format(e))
-
-
-def job_with_thread(job_func):
-    """
-    Run each job in its own thread
-    :param job_func:
-    :return:
-    """
-    job_thread = threading.Thread(target=job_func)
-    job_thread.start()
-
-
-if __name__ == "__main__":
-    UMD = UpdateMsgDaily()
-
-    schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob)
-
-    schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob)
-
-    while True:
-        schedule.run_pending()
-        time.sleep(1)