conf_task.py

import copy
import logging
import os
import sys
import time

import requests
from flask import Flask, request
from flask import jsonify
from dotenv import load_dotenv

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from conf.config import get_config
from common.db.mysql_help import MysqlHelper

load_dotenv(verbose=True)
env = os.getenv('env')

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False
# MySQL helper instance
mysql_con = MysqlHelper()
conf = get_config()


@app.route("/v1/crawler/task/dellink", methods=["POST"])
def delSpiderLink():
    data = request.json
    spider_links = data['spider_link']
    task_id = data['task_id']
    del_link = []
    for link in spider_links:
        up_sql = f'update crawler_author_map set is_del=0 where spider_link="{link}"'
        mysql_con.update_values(up_sql)  # use the shared instance, not the MysqlHelper class
        del_link.append(link)
    sql = f'select spider_link from crawler_task where task_id={task_id}'
    task = mysql_con.get_values(sql)
    spider_link = eval(task[0]['spider_link'])
    # remove the deleted links from the task's stored spider_link list
    for link in del_link:
        if link in spider_link:
            spider_link.remove(link)
    u_sql = f'update crawler_task set spider_link="{spider_link}" where task_id={task_id}'
    mysql_con.update_values(u_sql)
    if del_link:
        return jsonify({'code': 200, 'message': '抓取名单删除成功', 'del_link': del_link})
    else:
        return jsonify({'code': 400, 'message': '抓取名单删除失败', 'del_link': del_link, 'spider_link': spider_links})


@app.route("/v1/crawler/task/getcategory", methods=["GET"])
def getCategory():
    sql = 'select id, content_category from crawler_content_category'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodename", methods=["GET"])
def getModeName():
    sql = 'select id, mode_name from crawler_mode'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodeboard", methods=["GET"])
def getModeBoard():
    sql = 'select id, mode_board from crawler_board'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/user/findmedia", methods=["GET"])
def getMediaInfo():
    data = request.args.to_dict()
    task_id = data['task_id']
    sql = f'select * from crawler_author_map where task_id={task_id}'
    result = mysql_con.get_values(sql)
    task_user_info = []
    for task_info in result:
        media_id = task_info['media_id']
        media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
        media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
        nick_name = task_info['nick_name']
        spider_link = task_info['spider_link']
        create_user_time = task_info['create_user_time']
        media_data = dict(
            media_name=media_name,
            nick_name=nick_name,
            spider_link=spider_link,
            media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
            create_user_time=create_user_time * 1000
        )
        task_user_info.append(media_data)
    return jsonify({'code': 200, 'data': task_user_info})


@app.route("/v1/crawler/task/findtask", methods=["GET"])
def getTaskUserInfo():
    # look up a task by the given query conditions
    data = request.args.to_dict()
    values = ''
    for k, v in data.items():
        if isinstance(v, int):
            values += f'{k}={v} and '
        else:
            values += f'{k}="{v}" and '
    sql = f"select task_id from crawler_author_map where {values[:-4]}"  # [:-4] strips the trailing ' and '
    res = mysql_con.get_values(sql)
    task_id = res[0]['task_id']  # get_values returns a list of rows; take the first match
    sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id}'
    task_info = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': task_info})


# accepts GET requests only
@app.route("/v1/crawler/source/getall", methods=["GET"])
def getSource():
    try:
        # read the incoming query parameters
        get_data = request.args.to_dict()
        sql = 'select source, task_type, spider_name, machine, source_desc, task_type_desc, spider_name_desc from crawler_source'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
        source_list = list()
        for source_info in result:
            source_dict = {
                'task_type': [
                    {
                        'type': source_info['task_type'],
                        'description': source_info['task_type_desc'],
                        'spider': {
                            'spider_name': source_info['spider_name'],
                            'description': source_info['spider_name_desc']
                        }
                    }
                ],
                'description': source_info['source_desc'],
                'source': source_info['source'],
                'machine': source_info['machine']
            }
            source_list.append(source_dict)
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': source_list})


@app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
def get_repeat_list():
    data = request.json
    # extract the submitted links
    spider_links = data.get('spider_link')
    repeat_list = list()
    # check whether any of the submitted links already exist
    for spider_link in spider_links:
        if isinstance(spider_link, int):
            s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
        else:
            s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
        result = mysql_con.get_values(s_sql)
        if result:
            repeat_list.append(spider_link)
    if repeat_list:
        return jsonify({'code': 400, 'message': '名单重复', 'repeat_list': repeat_list})
    else:
        return jsonify({'code': 200, 'message': '抓取名单校验通过', 'repeat_list': repeat_list})

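# For reference, a minimal client-side sketch of how the duplicate check above
# might be exercised (assumes the service runs locally on port 5050; the sample
# link is a placeholder, not real data):
#
#   import requests
#   resp = requests.post(
#       "http://127.0.0.1:5050/v1/crawler/task/checkrepeat",
#       json={"spider_link": ["https://example.com/author/1"]},
#   )
#   print(resp.json())  # e.g. {'code': 200, 'message': '抓取名单校验通过', 'repeat_list': []}

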
@app.route("/v1/crawler/task/insert", methods=["POST"])
def insertTask():
    try:
        data = request.json
        user_data = copy.deepcopy(data)
        tag_name_list = []
        content_tag_list = []
        user_tag = data['user_tag']
        user_content_tag = data['user_content_tag']
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        if data['min_publish_time']:
            data['min_publish_time'] = int(data['min_publish_time'] / 1000)
        else:
            data['min_publish_time'] = 0
        # if data['min_publish_day']:
        #     data['min_publish_day'] = 0
        data['next_time'] = int(data['next_time'] / 1000)
        data['insert_time'] = int(time.time())
        data['update_time'] = int(time.time())
        data['spider_link'] = str(data['spider_link'])
        data['spider_rule'] = str(data['spider_rule'])
        data['user_tag_info'] = str(user_tag)
        data['content_tag_info'] = str(user_content_tag)
        # data['crawler_interval'] = data.pop('interval')
        # build a comma-separated column list and a matching placeholder string
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
        task_id = mysql_con.insert_values(sql, tuple(data.values()))
        if task_id:
            success_list, fail_list = create_uid(user_data, task_id)
            return jsonify(
                {'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
        # insert returned no task_id: report failure instead of returning None
        return jsonify({'code': 400, 'message': 'task create fail'})
    except Exception as e:
        return jsonify({'code': 500, 'message': f'任务写入失败,原因:{e}'})


@app.route("/v1/crawler/task/gettask", methods=["POST"])
def getAllTask():
    try:
        get_data = request.json
        page = int(get_data.get('page', 1))
        offset = int(get_data.get('offset', 10))
        start_count = (page * offset) - offset
        end_count = page * offset
        if get_data.get('fields'):
            select_data = get_data['fields']
            values = ''
            for k, v in select_data.items():
                if isinstance(v, int):
                    values += f'{k}={v} and '
                else:
                    values += f'{k}="{v}" and '
            sql = f"select task_id from crawler_author_map where {values[:-4]} and is_del=1"  # [:-4] strips the trailing ' and '
            res = mysql_con.get_values(sql)
            task_list = list()
            for author_info in res:
                task_id = author_info['task_id']
                sql = f'select * from crawler_task where task_id={task_id} order by update_time desc limit {start_count}, {end_count}'
                task_info = mysql_con.get_values(sql)[0]
                task_data = dict(
                    task_id=task_info['task_id'],
                    task_name=task_info['task_name'],
                    source=task_info['source'],
                    task_type=task_info['task_type'],
                    create_task_user=task_info['create_task_user'],
                    insert_time=task_info['insert_time'] * 1000,
                    update_task_user=task_info['update_task_user'],
                    update_time=task_info['update_time'] * 1000
                )
                task_list.append(task_data)
            return jsonify({'code': 200, 'data': task_list})
        sql = f"""select * from crawler_task order by update_time desc limit {start_count}, {end_count}"""
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
        task_list = list()
        for task_info in result:
            task_data = dict(
                task_id=task_info['task_id'],
                task_name=task_info['task_name'],
                source=task_info['source'],
                task_type=task_info['task_type'],
                create_task_user=task_info['create_task_user'],
                insert_time=task_info['insert_time'] * 1000,
                update_task_user=task_info['update_task_user'],
                update_time=task_info['update_time'] * 1000
            )
            task_list.append(task_data)
        t_sql = """select count(*) from crawler_task"""
        t_res = mysql_con.get_values(t_sql)
        total = t_res[0]['count(*)']
    except Exception as e:
        return jsonify({"code": "400", 'message': "任务列表获取失败"})
    return jsonify({'code': '200', 'result': task_list, 'total': total})


@app.route("/v1/crawler/task/getone", methods=["GET"])
def getOneTask():
    try:
        get_data = request.args.to_dict()
        task_id = get_data['task_id']
        sql = f'select * from crawler_task where task_id={task_id}'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '400', 'result': [], 'message': 'no data'})
        data = result[0]
        if data['min_publish_time']:
            data['min_publish_time'] = data['min_publish_time'] * 1000
        else:
            data['min_publish_time'] = 0
        data['next_time'] = data['next_time'] * 1000
        data['spider_link'] = eval(data['spider_link'])
        data['spider_rule'] = eval(data['spider_rule'])
        data['user_tag_info'] = eval(data['user_tag_info'])
        data['content_tag_info'] = eval(data['content_tag_info'])
    except Exception as e:
        return jsonify({'code': '500', "message": "获取任务信息失败"})
    return jsonify({'code': '200', 'result': result})


@app.route("/v1/crawler/task/update", methods=["POST"])
def updateTask():
    try:
        data = request.json
        task_id = data.get('task_id')
        task_info = data.get('task_info')
        values = ''
        # convert millisecond timestamps to seconds, as done on insert
        task_info['min_publish_time'] = int(task_info['min_publish_time'] / 1000)
        task_info['next_time'] = int(task_info['next_time'] / 1000)
        for k, v in task_info.items():
            if isinstance(v, int):
                values += f'{k}={v},'
            else:
                values += f'{k}="{v}",'
        sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'  # [:-1] strips the trailing comma
        result = mysql_con.update_values(sql)
        if result:
            return jsonify({'code': 200, 'message': 'task update success'})
        else:
            return jsonify({'code': 400, 'message': 'task update failed'})
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务更新失败'})


def create_uid(task, task_id):
    spider_link = task.get('spider_link')
    source = task.get('source')
    task_type = task.get('task_type')
    applets_status = task.get('applets_status')
    app_status = task.get('app_status')
    user_tag = task.get('user_tag')
    user_content_tag = task.get('user_content_tag')
    mode_name_id = task.get('mode_name_id', 0)
    mode_board_id = task.get('mode_board_id', 0)
    content_category_id = task.get('content_category_id', 0)
    mn_sql = f'select * from crawler_mode_name where id={mode_name_id}'
    mode_name_list = mysql_con.get_values(mn_sql)
    mb_sql = f'select * from crawler_mode_board where id={mode_board_id}'
    mode_board_list = mysql_con.get_values(mb_sql)
    cc_sql = f'select * from crawler_content_category where id={content_category_id}'
    content_category_list = mysql_con.get_values(cc_sql)
    if mode_name_list:
        task['mode_name_str'] = mode_name_list[0]['mode_name']
    else:
        task['mode_name_str'] = ''
    if mode_board_list:
        task['mode_board_str'] = mode_board_list[0]['mode_board']
    else:
        task['mode_board_str'] = ''
    if content_category_list:
        task['content_category_str'] = content_category_list[0]['content_category']
    else:
        task['content_category_str'] = ''
    success_list = list()
    fail_list = list()
    tag_name_list = list()
    content_tag_list = list()
    for tag in user_tag:
        tag_name_list.append(tag['tagName'])
    for tag in user_content_tag:
        content_tag_list.append(tag['tagName'])
    user_tags = ','.join(str(i) for i in tag_name_list)
    user_content_tags = ','.join(str(i) for i in content_tag_list)
    for author_url in spider_link:
        now_time = int(time.time())
        time_array = time.localtime(now_time)
        str_time = time.strftime("%Y-%m-%d", time_array)
        # build the tag string for the user to be created
        tags = ""
        tags_list = ['spider', user_tags, task['mode_name_str'], task['mode_board_str'],
                     task['content_category_str'], str_time]
        for v in tags_list:
            if v:
                tags += str(v) + ','
        post_data = {
            # 'count': 1,  # (required) number of accounts: pass 1
            # 'accountType': 4,  # (required) account type: pass 4 for app virtual accounts
            'pwd': '',  # password, defaults to 12346
            'nickName': '',  # nickname, defaults to vuser......
            'avatarUrl': '',
            # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
            'tagName': tags[:-1],  # multiple values separated by commas
        }
        try:
            response = requests.post(url=conf['media_url'], params=post_data)
            media_id = response.json()['data']
        except Exception as e:
            logging.warning(f'创建账户:{spider_link},失败,原因:{e}')
            fail_list.append(author_url)
            continue
        data = dict(
            spider_link=author_url,
            media_id=media_id,
            source=source,
            task_type=task_type,
            applets_status=applets_status,
            app_status=app_status,
            user_tag=user_tags,
            user_content_tag=user_content_tags,
            insert_time=int(time.time()),
            update_time=int(time.time()),
            create_user_time=now_time,
            mode_name_str=task['mode_name_str'],
            mode_board_str=task['mode_board_str'],
            content_category_str=task['content_category_str'],
            # mode_value_str=mode_value_str,
            task_id=task_id,
            media_main_url=conf['media_main_url'].format(media_id)
        )
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        table = 'crawler_author_map'
        sql = f"""insert into {table}({keys}) VALUES({values})"""
        mysql_con.insert_values(sql, tuple(data.values()))
        user_info = dict(
            outer_id=author_url,
            uid=media_id
        )
        success_list.append(user_info)
    return success_list, fail_list


@app.route("/v1/crawler/author/create", methods=["POST"])
def createUser():
    spider_link = request.json.get('spider_link')
    source = request.json.get('source')
    task_type = request.json.get('task_type')
    applets_status = request.json.get('applets_status')
    app_status = request.json.get('app_status')
    user_tag = request.json.get('user_tag')
    user_content_tag = request.json.get('user_content_tag')
    success_list = list()
    fail_list = list()
    for author_url in spider_link:
        try:
            f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
            result = mysql_con.get_values(f_sql)
            if result:
                success_list.append(author_url)
                continue
            else:
                tag_name_list = []
                content_tag_list = []
                for tag in user_tag:
                    tag_name_list.append(tag['tagName'])
                for tag in user_content_tag:
                    content_tag_list.append(tag['tagName'])
                user_tags = ','.join(str(i) for i in tag_name_list)
                user_content_tags = ','.join(str(i) for i in content_tag_list)
                post_data = {
                    # 'count': 1,  # (required) number of accounts: pass 1
                    # 'accountType': 4,  # (required) account type: pass 4 for app virtual accounts
                    'pwd': '',  # password, defaults to 12346
                    'nickName': '',  # nickname, defaults to vuser......
                    'avatarUrl': '',
                    # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple values separated by commas
                }
                response = requests.post(url=conf['media_url'], params=post_data)
                media_id = response.json()['data']
                data = dict(
                    spider_link=author_url,
                    media_id=media_id,
                    source=source,
                    task_type=task_type,
                    applets_status=applets_status,
                    app_status=app_status,
                    user_tag=user_tags,
                    user_content_tag=user_content_tags,
                    insert_time=int(time.time()),
                    update_time=int(time.time())
                )
                keys = ','.join(data.keys())
                values = ','.join(['%s'] * len(data))
                table = 'crawler_author_map'
                sql = f"""insert into {table}({keys}) VALUES({values})"""
                result = mysql_con.insert_values(sql, tuple(data.values()))
                if not result:
                    fail_list.append(author_url)
                else:
                    success_list.append(author_url)
        except Exception as e:
            fail_list.append(author_url)
            continue
    return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})


if __name__ == "__main__":
    app.run(debug=True, port=5050)
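
# A minimal sketch of how the source listing endpoint might be called once the
# app is running (host/port assumed from app.run above; purely illustrative):
#
#   import requests
#   sources = requests.get("http://127.0.0.1:5050/v1/crawler/source/getall").json()
#   print(sources.get("result"))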