瀏覽代碼

开发v0.3

罗俊辉 8 月之前
父節點
當前提交
edccbecc8c
共有 10 個文件被更改,包括 402 次插入和 160 次删除
  1. 1 1
      applications/wx_spider_api.py
  2. 29 29
      config/__init__.py
  3. 2 0
      longArticleJob.py
  4. 3 0
      spider/__init__.py
  5. 61 0
      spider/weixinCategoryCrawler.py
  6. 144 0
      stratrgy/distribution.py
  7. 127 109
      tasks/task2.py
  8. 5 5
      tasks/task3.py
  9. 0 16
      tasks/task4.py
  10. 30 0
      update_msg.py

+ 1 - 1
applications/wx_spider_api.py

@@ -17,7 +17,7 @@ def retryOnNone():
         :param func:
         :return:
         """
-        max_retries = 5
+        max_retries = 10
         wait_seconds = 1
 
         def wrapper(*args, **kwargs):

+ 29 - 29
config/__init__.py

@@ -6,39 +6,39 @@ import json
 planConfigDict = {
     # "20240721063854162433630": "动漫二次元",
     # "20240721064001535851572": "游戏",
-    "20240721064012570747614": "装修设计",
-    "20240721064027033693901": "家居生活",
-    "20240721064037856692958": "创意美学",
-    "20240721064054346901161": "时尚潮流",
-    "20240721065139955704312": "美容美妆",
-    "20240721065144690822816": "服饰穿搭",
-    "20240721065204600967818": "好物种草",
-    "20240721065216328783909": "影视影评",
-    "20240721065222773392809": "娱乐八卦",
-    "20240721065313016877129": "情感生活",
-    "20240721065418085156676": "文化文学",
-    "20240721065444407933304": "生活方式",
+    # "20240721064012570747614": "装修设计",
+    # "20240721064027033693901": "家居生活",
+    # "20240721064037856692958": "创意美学",
+    # "20240721064054346901161": "时尚潮流",
+    # "20240721065139955704312": "美容美妆",
+    # "20240721065144690822816": "服饰穿搭",
+    # "20240721065204600967818": "好物种草",
+    # "20240721065216328783909": "影视影评",
+    # "20240721065222773392809": "娱乐八卦",
+    # "20240721065313016877129": "情感生活",
+    # "20240721065418085156676": "文化文学",
+    # "20240721065444407933304": "生活方式",
     # "20240721065448295911613": "宗教历史",
     # "20240721065456074517643": "军事政法",
-    "20240721065536790163449": "金融财经",
-    "20240721065541597979774": "房产楼市",
-    "20240721065548525430079": "科学科普",
-    "20240721065636638920221": "互联网",
+    # "20240721065536790163449": "金融财经",
+    # "20240721065541597979774": "房产楼市",
+    # "20240721065548525430079": "科学科普",
+    # "20240721065636638920221": "互联网",
     # "20240721065642988552526": "数码3C",
-    "20240721065757870627696": "新闻媒体",
-    "20240721065858360363927": "行业资讯",
-    "20240721065936394197711": "区域生活",
-    "20240721070021406546573": "健康养生",
-    "20240721070027562219631": "体育赛事",
-    "20240721070031464491910": "运动健身",
+    # "20240721065757870627696": "新闻媒体",
+    # "20240721065858360363927": "行业资讯",
+    # "20240721065936394197711": "区域生活",
+    # "20240721070021406546573": "健康养生",
+    # "20240721070027562219631": "体育赛事",
+    # "20240721070031464491910": "运动健身",
     # "20240721070120535645091": "母婴育儿",
-    "20240721070316597585132": "教育培训",
-    "20240721070328794991834": "校园生活",
-    "20240721070341572360754": "职场管理",
-    "20240721070358450257397": "餐饮美食",
-    "20240721070400833119871": "星座命理",
-    "20240721070405096586304": "搞笑幽默",
-    "20240721070427794574827": "旅游出行",
+    # "20240721070316597585132": "教育培训",
+    # "20240721070328794991834": "校园生活",
+    # "20240721070341572360754": "职场管理",
+    # "20240721070358450257397": "餐饮美食",
+    # "20240721070400833119871": "星座命理",
+    # "20240721070405096586304": "搞笑幽默",
+    # "20240721070427794574827": "旅游出行",
     # "20240721070443436973433": "汽车",
     # "20240721070450497257695": "音乐",
     "20240723131249517316218": "军事政法-年龄56-66",

+ 2 - 0
longArticleJob.py

@@ -27,3 +27,5 @@ class Job(object):
         cold_start.sendToColdPool()
 
 
+
+

+ 3 - 0
spider/__init__.py

@@ -0,0 +1,3 @@
+"""
+@author: luojunhui
+"""

+ 61 - 0
spider/weixinCategoryCrawler.py

@@ -0,0 +1,61 @@
+"""
+@author: luojunhui
+抓取全局品类文章
+"""
+import json
+import time
+
+from applications import WeixinSpider
+
+
+class weixinCategory(object):
+    """
+    Crawler for articles of WeChat accounts in the global category pool.
+
+    Work in progress in this revision: fetched messages are only printed,
+    nothing is written back to the database yet.
+    """
+
+    def __init__(self, spider_client):
+        """
+        :param spider_client: database client exposing ``select(sql)``; used by getAccountList
+        """
+        self.spider_client = spider_client
+        self.spider = WeixinSpider()
+
+    def getAccountList(self):
+        """
+        Select the accounts whose latest_article_timestamp is more than 20 hours old.
+        :return: list of dicts with keys gh_id, platform, account_name, category, latest_timestamp
+        """
+        now_time = int(time.time())
+        twenty_hours_ago = now_time - 3600 * 20
+        sql = f"""select * from long_article_accounts_outside where latest_article_timestamp < {twenty_hours_ago};"""
+        account_tuple = self.spider_client.select(sql)
+        # NOTE(review): `select *` rows are mapped by position, so this relies on
+        # the table's column order -- confirm against the schema.
+        result = [
+            {
+                "gh_id": i[0],
+                "platform": i[1],
+                "account_name": i[2],
+                "category": i[3],
+                "latest_timestamp": i[4]
+            } for i in account_tuple
+        ]
+        return result
+
+    def update_data_into_mysql(self, msg_list):
+        """
+        Placeholder for persisting fetched messages; for now it only pretty-prints
+        every entry of msg_list['data']['data'].
+        :param msg_list: response of the spider's update_msg_list call
+        :return:
+        """
+        for obj in msg_list['data']['data']:
+            print(json.dumps(obj, ensure_ascii=False, indent=4))
+
+    def updateEachAccountArticles(self, gh_id, latest_time_stamp):
+        """
+        Fetch an account's message list (index=None) and print the BaseInfo of its last entry.
+        :param gh_id: account id, passed to the spider as ``ghId``
+        :param latest_time_stamp: not used yet in this revision
+        :return:
+        """
+        index = None
+        msg_list = self.spider.update_msg_list(ghId=gh_id, index=index)
+        messages = msg_list['data']['data']
+        if not messages:
+            # Nothing was returned for this account; indexing [-1] would raise IndexError.
+            return
+        latest_time_stamp_in_this_msg = messages[-1]['BaseInfo']
+        print(latest_time_stamp_in_this_msg)
+
+
+if __name__ == '__main__':
+    # Ad-hoc manual run against one hard-coded account; "123" is a dummy
+    # stand-in for the database client, which this code path never touches.
+    crawler = weixinCategory(spider_client="123")
+    crawler.updateEachAccountArticles(gh_id="gh_ddafea4bcc29", latest_time_stamp=1)

+ 144 - 0
stratrgy/distribution.py

@@ -0,0 +1,144 @@
+"""
+@author: luojunhui
+分发逻辑
+"""
+import json
+import datetime
+from applications import PQMySQL, WeixinSpider
+from tqdm import tqdm
+from config import accountBaseInfo
+
+
+class ArticleDistribution(object):
+    """
+    Distribution logic for cold-start articles.
+
+    Assigns association articles to accounts by relevance score and stores the
+    per-account result in article_pre_distribute_account.
+    """
+    # Number of articles each account (keyed by gh_id) may still receive.
+    # NOTE(review): association_split decrements these values in place on the
+    # class, so they are not reset between calls in one process -- confirm intended.
+    account_position_dict = {
+        "gh_058e41145a0c": 30,
+        "gh_0e4fd9e88386": 30,
+        "gh_744cb16f6e16": 30,
+        "gh_ac43eb24376d": 30,
+        "gh_970460d9ccec": 30,
+        "gh_56ca3dae948c": 30,
+        "gh_c91b42649690": 30,
+        "gh_6d205db62f04": 30,
+        "gh_e24da99dc899": 30,
+        "gh_4c058673c07e": 30,
+        "gh_03d32e83122f": 30,
+        "gh_c69776baf2cd": 30,
+        "gh_30816d8adb52": 30,
+        "gh_789a40fe7935": 30,
+        "gh_95ed5ecf9363": 30,
+        "gh_3e91f0624545": 30,
+        "gh_57573f01b2ee": 30,
+        "gh_9877c8541764": 30,
+        "gh_6cfd1132df94": 30,
+        "gh_008ef23062ee": 30,
+        "gh_5ae65db96cb7": 30,
+        "gh_be8c29139989": 30,
+        "gh_51e4ad40466d": 30,
+        "gh_d4dffc34ac39": 30,
+        "gh_89ef4798d3ea": 30,
+        "gh_b15de7c99912": 30,
+        "gh_9f8dc5b0c74e": 30,
+        "gh_7b4a5f86d68c": 30,
+        "gh_c5cdf60d9ab4": 5,
+        "gh_0c89e11f8bf3": 5,
+        "gh_e0eb490115f5": 5,
+        "gh_a2901d34f75b": 5,
+        "gh_d5f935d0d1f2": 30
+    }
+    pq_mysql_client = PQMySQL()
+    Spider = WeixinSpider()
+
+    @classmethod
+    def generate_account_dict(cls):
+        """
+        Build a mapping from account name to gh_id out of accountBaseInfo.
+        :return: dict {accountName: ghId}
+        """
+        return {
+            accountBaseInfo[key]['accountName']: accountBaseInfo[key]['ghId']
+            for key in accountBaseInfo
+        }
+
+    @classmethod
+    def findArticleScoreList(cls, url_md5):
+        """
+        Fetch the (account_score, ori_account) rows stored for one article.
+        :param url_md5: value matched against association_articles.url_md5
+        :return: rows returned by the MySQL client's select
+        """
+        # NOTE(review): url_md5 is interpolated into the SQL text; switch to bound
+        # parameters if the client's select() supports them.
+        sql = f"""
+        select account_score, ori_account from association_articles where url_md5 = '{url_md5}';
+        """
+        response = cls.pq_mysql_client.select(sql=sql)
+        return response
+
+    @classmethod
+    def _fetch_channel_content_id(cls, link, url_md5):
+        """
+        Resolve an article's channel_content_id through the spider; on any
+        failure print the link and fall back to url_md5.
+        """
+        try:
+            return cls.Spider.get_article_text(link)['data']['data']['channel_content_id']
+        except Exception:
+            # Best effort: a failed lookup must not abort the distribution run.
+            print(link)
+            return url_md5
+
+    @classmethod
+    def association_split(cls, article_list):
+        """
+        Distribute association articles to accounts and store the result.
+
+        For every article, candidate accounts are visited in descending score
+        order; an account receives the article while it still has quota in
+        account_position_dict and holds fewer than 10 articles in this run.
+        Example element of article_list:
+        {
+        'url': 'http://mp.weixin.qq.com/s?__biz=MzkxOTUzMTYwNg==&mid=2247490482&idx=1&sn=14553e013cbc15b0448332935f7835df&chksm=c06ad7c3e89bcaf88fda88eaafd66295fc82d1ecea66ab3fd5df5393932a01ad85d11565f9f6&scene=126&sessionid=1679649075#rd',
+        'title': '不和中国合作了?俄罗斯穾然宣布:取消1.7万亿合作项目,绝不接受中国技术',
+        'url_md5': '4dd9ed803305a4ca53139443ec311b27',
+        'id': 'http://mp.weixin.qq.com/s?__biz=MzkxOTUzMTYwNg==&mid=2247490482&idx=1&sn=14553e013cbc15b0448332935f7835df&chksm=c06ad7c3e89bcaf88fda88eaafd66295fc82d1ecea66ab3fd5df5393932a01ad85d11565f9f6&scene=126&sessionid=1679649075#rd'
+        }
+        :param article_list: list of dicts with at least 'url' and 'url_md5'
+        :return: dict {gh_id: [[channel_content_id, score], ...]}
+        """
+        account_name_map = cls.generate_account_dict()
+        distribution = {}
+        for article in tqdm(article_list):
+            link = article['url']
+            url_md5 = article['url_md5']
+            matches = sorted(cls.findArticleScoreList(url_md5), key=lambda row: row[0], reverse=True)
+            # The content id depends only on the article, so it is resolved at most
+            # once per article and only when some account actually takes it.
+            content_id_known = False
+            channel_content_id = None
+            for score, account_name in matches:
+                # Accounts missing from accountBaseInfo map to None and are skipped
+                # by the quota check below instead of raising KeyError.
+                account_gh_id = account_name_map.get(account_name)
+                if cls.account_position_dict.get(account_gh_id, 0) <= 0:
+                    continue
+                if len(distribution.get(account_gh_id, ())) >= 10:
+                    continue
+                if not content_id_known:
+                    channel_content_id = cls._fetch_channel_content_id(link, url_md5)
+                    content_id_known = True
+                distribution.setdefault(account_gh_id, []).append([channel_content_id, score])
+                cls.account_position_dict[account_gh_id] -= 1
+        date_str = datetime.datetime.today().strftime("%Y-%m-%d")
+        insert_sql = """
+            INSERT INTO article_pre_distribute_account
+            (gh_id, date, article_list)
+            VALUES
+            (%s, %s, %s);
+            """
+        for account in tqdm(distribution):
+            article_json = json.dumps(distribution[account], ensure_ascii=False)
+            print(account, date_str, article_json)
+            try:
+                # Use the same client instance as the select above.
+                cls.pq_mysql_client.update(sql=insert_sql, params=(account, date_str, article_json))
+            except Exception as e:
+                print("插入出现问题----{}".format(e))
+        return distribution
+
+
+
+
+

+ 127 - 109
tasks/task2.py

@@ -8,6 +8,7 @@ from tqdm import tqdm
 
 from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider
 from config import cateMap, coldPoolArticlesNum, accountBaseInfo
+from stratrgy.distribution import ArticleDistribution
 
 
 class ColdStartTask(object):
@@ -19,6 +20,7 @@ class ColdStartTask(object):
     PqMysql = PQMySQL()
     Fun = Functions()
     Spider = WeixinSpider()
+    D = ArticleDistribution()
     pool3 = "autoArticlePoolLevel3"
 
     @classmethod
@@ -158,16 +160,17 @@ class ColdStartTask(object):
 
         for account in tqdm(account_article_dict):
             date_str = datetime.datetime.today().strftime("%Y-%m-%d")
-            insert_sql = f"""
-            INSERT INTO article_pre_distribute_account
-            (gh_id, date, article_list)
-            VALUES
-            (%s, %s, %s);
-            """
-            try:
-                PQMySQL.update(sql=insert_sql, params=(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
-            except Exception as e:
-                print("插入出现问题----{}".format(e))
+            print(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False))
+            # insert_sql = f"""
+            # INSERT INTO article_pre_distribute_account
+            # (gh_id, date, article_list)
+            # VALUES
+            # (%s, %s, %s);
+            # """
+            # try:
+            #     PQMySQL.update(sql=insert_sql, params=(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
+            # except Exception as e:
+            #     print("插入出现问题----{}".format(e))
 
         print("成功更新完成")
 
@@ -179,11 +182,11 @@ class ColdStartTask(object):
         """
         category_list = [
             "军事政法",
-            "健康养生",
+            # "健康养生",
             "宗教历史",
-            "情感生活",
-            "娱乐八卦",
-            "新闻媒体",
+            # "情感生活",
+            # "娱乐八卦",
+            # "新闻媒体",
         ]
         L = []
         for category in tqdm(category_list):
@@ -224,13 +227,14 @@ class ColdStartTask(object):
         获取相关文章
         :return:
         """
-        target_num = int(0.4 * coldPoolArticlesNum)
+        # target_num = int(0.8 * coldPoolArticlesNum)
         sql = f"""
-            select id, ori_account_name, association_url, association_title, url_md5
+            select id, publish_timestamp, title, link, title_score, url_md5
             from association_articles
-            where status = 1
-            order by read_count DESC
-            limit {target_num};"""
+            where status = 1 and content_length > 500
+            order by publish_timestamp
+            DESC limit 10000;
+        """
         temp_list = cls.PqMysql.select(sql)
         id_tuple = tuple([i[0] for i in temp_list])
         update_sql = f"""
@@ -239,24 +243,37 @@ class ColdStartTask(object):
             where id in %s
         """
         cls.PqMysql.update(sql=update_sql, params=(0, id_tuple))
+        # url_md5去重
+        L = {}
+        for line in temp_list:
+            key = line[-1]
+            if L.get(key):
+                L[key].append(list(line))
+            else:
+                L[key] = [list(line)]
+
+        LL = []
+        for key in L:
+            value_list = L[key]
+            sorted_k = sorted(value_list, reverse=True, key=lambda x: (x[1], x[4]))
+            LL.append(sorted_k[0])
         article_list = []
-        for i in tqdm(temp_list):
+        LL = sorted(LL, reverse=True, key=lambda x: x[1])
+        for i in tqdm(LL[:int(680 * 0.8)]):
             try:
                 o = {
-                    "related_account_name": i[1],
-                    "url": i[2],
-                    "title": i[3],
-                    "url_md5": i[4],
-                    # "id": i[4]
-                    "id": cls.Spider.get_article_text(i[2])['data']['data']['channel_content_id']
+                    "url": i[3],
+                    "title": i[2],
+                    "url_md5": i[5],
+                    "id": i[3]
+                    # "id": cls.Spider.get_article_text(i[3])['data']['data']['channel_content_id']
                 }
             except:
                 o = {
-                    "related_account_name": i[1],
-                    "url": i[2],
-                    "title": i[3],
-                    "url_md5": i[4],
-                    "id": i[4]
+                    "url": i[3],
+                    "title": i[2],
+                    "url_md5": i[5],
+                    "id": i[3]
                 }
             article_list.append(o)
         return article_list
@@ -264,88 +281,89 @@ class ColdStartTask(object):
     @classmethod
     def sendToColdPool(cls, plan_id=None):
         """
-        把文章send至第
+        把文章send至第
         :return:
         """
         # 获取6个品类的数据
         association_list = cls.findAssociationArticlesDaily()
-        category_list = cls.findCategoryArticlesDaily()
-        d_list = category_list + association_list
-        # 预分配账号
-        cls.splitCategoryToAccount(d_list)
-
-        try:
-            army = [i for i in category_list if i['cate'] == '军事政法']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="军事政法类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in army]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        try:
-            history = [i for i in category_list if i['cate'] == '宗教历史']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="宗教历史类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in history]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        try:
-            news = [i for i in category_list if i['cate'] == '新闻媒体']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="新闻媒体类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in news]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        try:
-            life = [i for i in category_list if i['cate'] == '情感生活']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="生活情感类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in life]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        try:
-            healthy = [i for i in category_list if i['cate'] == '健康养生']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="健康养生类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in healthy]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        try:
-            fun = [i for i in category_list if i['cate'] == '娱乐八卦']
-            cls.AidApi.updateArticleIntoCrawlerPlan(
-                plan_id=plan_id,
-                plan_name="娱乐八卦类冷启",
-                plan_tag=cls.pool3,
-                url_list=[i['url'] for i in fun]
-            )
-        except Exception as e:
-            print("error--{}".format(e))
-
-        cls.AidApi.updateArticleIntoCrawlerPlan(
-            plan_id=plan_id,
-            plan_name="文章账号联想冷启",
-            plan_tag=cls.pool3,
-            url_list=[i['url'] for i in association_list]
-        )
+        cls.D.association_split(association_list)
+        # category_list = cls.findCategoryArticlesDaily()
+        # d_list = category_list + association_list
+        # # # 预分配账号
+        # cls.splitCategoryToAccount(association_list)
+        # #
+        # try:
+        #     army = [i for i in category_list if i['cate'] == '军事政法']
+        #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        #         plan_id=plan_id,
+        #         plan_name="军事政法类冷启",
+        #         plan_tag=cls.pool3,
+        #         url_list=[i['url'] for i in army]
+        #     )
+        # except Exception as e:
+        #     print("error--{}".format(e))
+        #
+        # try:
+        #     history = [i for i in category_list if i['cate'] == '宗教历史']
+        #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        #         plan_id=plan_id,
+        #         plan_name="宗教历史类冷启",
+        #         plan_tag=cls.pool3,
+        #         url_list=[i['url'] for i in history]
+        #     )
+        # except Exception as e:
+        #     print("error--{}".format(e))
+        # #
+        # # try:
+        # #     news = [i for i in category_list if i['cate'] == '新闻媒体']
+        # #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        # #         plan_id=plan_id,
+        # #         plan_name="新闻媒体类冷启",
+        # #         plan_tag=cls.pool3,
+        # #         url_list=[i['url'] for i in news]
+        # #     )
+        # # except Exception as e:
+        # #     print("error--{}".format(e))
+        # #
+        # # try:
+        # #     life = [i for i in category_list if i['cate'] == '情感生活']
+        # #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        # #         plan_id=plan_id,
+        # #         plan_name="生活情感类冷启",
+        # #         plan_tag=cls.pool3,
+        # #         url_list=[i['url'] for i in life]
+        # #     )
+        # # except Exception as e:
+        # #     print("error--{}".format(e))
+        # #
+        # # try:
+        # #     healthy = [i for i in category_list if i['cate'] == '健康养生']
+        # #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        # #         plan_id=plan_id,
+        # #         plan_name="健康养生类冷启",
+        # #         plan_tag=cls.pool3,
+        # #         url_list=[i['url'] for i in healthy]
+        # #     )
+        # # except Exception as e:
+        # #     print("error--{}".format(e))
+        # #
+        # # try:
+        # #     fun = [i for i in category_list if i['cate'] == '娱乐八卦']
+        # #     cls.AidApi.updateArticleIntoCrawlerPlan(
+        # #         plan_id=plan_id,
+        # #         plan_name="娱乐八卦类冷启",
+        # #         plan_tag=cls.pool3,
+        # #         url_list=[i['url'] for i in fun]
+        # #     )
+        # # except Exception as e:
+        # #     print("error--{}".format(e))
+        # #
+        # cls.AidApi.updateArticleIntoCrawlerPlan(
+        #     plan_id=plan_id,
+        #     plan_name="文章账号联想冷启",
+        #     plan_tag=cls.pool3,
+        #     url_list=[i['url'] for i in association_list]
+        # )
 
 
 if __name__ == '__main__':

+ 5 - 5
tasks/task3.py

@@ -27,11 +27,11 @@ class SendToMultiLevels(object):
         :return:
         """
         sql = f"""
-        select article_id, read_count from changwen_article_datastat
-        where article_id in (
-            select id from changwen_article
-            where publish_timestamp >= 1721664000000
-        ) and read_count > 100;
+            select article_id, read_count from changwen_article_datastat
+            where article_id in (
+                select id from changwen_article
+                where publish_timestamp >= 1722268800000
+            ) and read_count > 100;
         """
         result = cls.Ad.select(sql=sql)
         response_list = [

+ 0 - 16
tasks/task4.py

@@ -13,24 +13,8 @@ def update_articles(gh_id):
     :return:
     """
     url = "http://61.48.133.26:6060/article_crawler"
-
     headers = {"Content-Type": "application/json"}
-
     body = {"ghId": gh_id}
     response = requests.request("POST", url=url, headers=headers, json=body)
     print(response.json())
 
-
-if __name__ == '__main__':
-    gh_id_set = set()
-    for key in accountBaseInfo:
-        value = accountBaseInfo[key]['ghId']
-        gh_id_set.add(value)
-
-    for gh_id in tqdm(gh_id_set):
-        try:
-            update_articles(gh_id)
-        except Exception as e:
-            print(e)
-            continue
-

+ 30 - 0
update_msg.py

@@ -0,0 +1,30 @@
+"""
+@author: luojunhui
+"""
+import time
+from config import accountBaseInfo
+from tqdm import tqdm
+from tasks.task4 import update_articles
+import schedule
+
+
+def run():
+    """
+    Call update_articles once for every distinct ghId found in accountBaseInfo.
+
+    An exception raised for one account is printed and the loop moves on to
+    the next account.
+    """
+    unique_gh_ids = {accountBaseInfo[name]['ghId'] for name in accountBaseInfo}
+    for account_gh_id in tqdm(unique_gh_ids):
+        try:
+            update_articles(account_gh_id)
+        except Exception as err:
+            print(err)
+
+
+if __name__ == '__main__':
+    # Register run() as a daily job at "21:00", then poll the scheduler forever
+    # with a one-second pause between polls.
+    daily_job = schedule.every().day.at("21:00")
+    daily_job.do(run)
+    while True:
+        schedule.run_pending()
+        # NOTE(review): this message is printed on every poll, not only when run() fires.
+        print("定时任务正在执行")
+        time.sleep(1)