123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import time
- import schedule
- import multiprocessing
- from common.scheduling_db import MysqlHelper
- from common.aliyun_log import AliyunLogger
def read_accounts_from_mysql():
    """
    Collect the crawler accounts whose tags match the Changsha team filter.

    Reads every ``(tag, uid)`` row of ``crawler_user_v3`` (newest first) and
    keeps the rows whose comma-separated tag list

    * contains at least five distinct tags,
    * contains both required tags ``454`` and ``459``, and
    * contains none of the forbidden tags ``131``, ``465``, ``1379``, ``160``.

    Rows whose tag column is NULL or empty are skipped.

    :return: dict mapping ``uid`` to a list with one entry per tag of that
        row, in the row's own tag order. Each entry is the label found in the
        local tag-id table, or ``None`` for a tag id the table does not list.
        When the same ``uid`` appears in several matching rows, the row
        processed last (the oldest one, given the query order) wins.
    """
    sql = """select tag, uid from crawler_user_v3 order by create_time desc;"""
    rows = MysqlHelper.get_values(
        log_type="author", crawler="changsha", env="prod", sql=sql
    )

    # Tag id -> label. Built once here instead of once per row.
    tag_labels = {
        "352": "余海涛",
        "353": "罗情",
        "53": "范军",
        "51": "鲁涛",
        "131": "王雪珂",
        "6682": "公众新号",
        "469": "小年糕",
        "464": "快手",
        "5662": "快手账号爬虫",
        "459": "spider",
        "85": "快手爬虫",
        "454": "账号",
        "467": "视频号",
        "106": "⭐️小年糕爬虫",
        "120": "西瓜新爬虫",
        "499": "抖音",
        "2235": "抖音爬虫",
    }
    required_tags = {'454', '459'}
    forbidden_tags = {'131', '465', '1379', '160'}
    min_distinct_tags = 5

    accounts = {}
    for row in rows:
        raw_tags = row['tag']
        if not raw_tags:
            # A NULL tag column has no .split(); an empty one can never reach
            # the minimum tag count. Either way the row cannot qualify.
            continue
        tag_ids = raw_tags.split(",")
        distinct_tags = set(tag_ids)
        if len(distinct_tags) < min_distinct_tags:
            continue
        if not required_tags.issubset(distinct_tags):
            continue
        if not forbidden_tags.isdisjoint(distinct_tags):
            continue
        # Labels follow the original tag order (duplicates included), not the set.
        accounts[row['uid']] = [tag_labels.get(tag_id) for tag_id in tag_ids]
    return accounts
def insert_accounts(account_dict):
    """
    Insert Changsha team accounts into ``changsha_user_accounts``.

    For every account id in *account_dict* that is not yet present in the
    table, an owner name and a platform label are looked up among the
    account's tag labels. When both are found, one row is inserted and the
    insert is reported through ``AliyunLogger``. Accounts that already exist,
    or that lack either an owner or a platform label, are skipped.

    :param account_dict: dict mapping a piaoquan account id to the list of
        that account's tag labels (entries may be ``None``).
    """
    # Labels that identify the owner and the platform of an account.
    name_set = {'鲁涛', '罗情', '余海涛', '范军'}
    platform_set = {'西瓜新爬虫', '快手账号爬虫', '公众新号', '⭐️小年糕爬虫', '抖音爬虫', '视频号'}

    for key, labels in account_dict.items():
        # NOTE(review): both statements below are built by string
        # interpolation. The values come from this project's own tables and
        # the constant sets above; switch to bound parameters if MysqlHelper
        # supports them.
        select_sql = f"""select id from changsha_user_accounts where piaoquan_account_id = {key};"""
        result = MysqlHelper.get_values(
            log_type="author", crawler="changsha", env="prod", sql=select_sql
        )
        if result:
            # Already registered.
            continue

        # Take the first matching label in the account's own tag order, so the
        # choice is deterministic when several owners/platforms are tagged
        # (picking from a set intersection is order-dependent).
        user_name = next((label for label in labels if label in name_set), None)
        platform_name = next(
            (label for label in labels if label in platform_set), None
        )
        if user_name is None or platform_name is None:
            continue

        sql = f"""INSERT INTO changsha_user_accounts (piaoquan_account_id, user_name, platform) VALUES ('{key}', '{user_name}', '{platform_name}');"""
        MysqlHelper.update_values(log_type="author", crawler="changsha", sql=sql, env="prod")
        AliyunLogger.logging(
            code="8888",
            # Log the chosen platform label, not the whole intersection set.
            platform=platform_name,
            mode="author",
            env="prod",
            message="更新账号-{}-{}".format(user_name, key)
        )
def protect_(function):
    """
    Supervise *function* in a child process and restart it whenever it dies.

    The child is checked once per polling cycle. While it is alive the
    supervisor just sleeps for 60 seconds. Once it is found dead, the
    supervisor waits 60 seconds, starts a fresh child, and waits another
    60 seconds before the next check. This function never returns.

    :param function: callable to run inside the supervised process
    """
    def _spawn():
        # Create and start one child process running the target.
        child = multiprocessing.Process(target=function)
        child.start()
        return child

    worker = _spawn()
    while True:
        if worker.is_alive():
            time.sleep(60)
            continue
        # The child has exited: clean up, back off, then bring up a new one.
        worker.terminate()
        time.sleep(60)
        worker = _spawn()
        time.sleep(60)
def process_acc():
    """
    Run one sync pass: read the matching accounts from MySQL, then insert
    the ones that are not stored yet.
    """
    insert_accounts(read_accounts_from_mysql())
def main():
    """
    Run the account sync once a day at 19:07.

    Registers ``process_acc`` with ``schedule`` and then keeps polling the
    scheduler, so this function blocks and does not return.
    """
    schedule.every().day.at("19:07").do(process_acc)
    # Registering a job does not execute anything by itself: schedule only
    # fires jobs from inside run_pending(), so keep polling it.
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Run a single sync pass immediately. The supervised daily-scheduler
    # variant, protect_(main), is currently switched off.
    process_acc()
|