# es_api.py — Elasticsearch index management and MySQL → ES backfill utilities.
  1. import ssl
  2. import datetime
  3. import time
  4. import requests
  5. import json
  6. from pymysql.cursors import DictCursor
  7. from elasticsearch import Elasticsearch, ApiError
  8. from elasticsearch import helpers
  9. from applications.db import DatabaseConnector
  10. from config import long_articles_config
  11. from config.es_mappings import index_name, mappings, settings
  12. db_client = DatabaseConnector(long_articles_config)
  13. db_client.connect()
  14. ctx = ssl.create_default_context(cafile="es_certs.crt")
  15. es_password = 'nkvvASQuQ0XUGRq5OLvm'
  16. es = Elasticsearch(
  17. "https://192.168.205.85:9200",
  18. # 如果启用了用户名密码
  19. basic_auth=("elastic", es_password),
  20. ssl_context=ctx
  21. )
  22. def create_index():
  23. # 2. 防守式删除旧索引(可选)
  24. if es.indices.exists(index=index_name):
  25. es.indices.delete(index=index_name)
  26. # 3. 创建索引
  27. try:
  28. es.indices.create(
  29. index=index_name,
  30. settings=settings,
  31. mappings=mappings
  32. )
  33. print(f"✅ Index <{index_name}> created.")
  34. except ApiError as e:
  35. # 打印 ES 返回的具体错误
  36. print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}")
  37. def get_max_article_id():
  38. response = es.search(
  39. index=index_name,
  40. size=1,
  41. sort="article_id:desc",
  42. _source=["article_id"]
  43. )
  44. # print(json.dumps(response.body, indent=4))
  45. return response['hits']['hits'][0]['_source']['article_id']
  46. def get_articles(id_):
  47. fetch_query = f"""
  48. select article_id, platform, out_account_id, title
  49. from crawler_meta_article
  50. where status = 1 and article_id > %s
  51. order by article_id limit 10000;
  52. """
  53. # 执行查询
  54. results = db_client.fetch(fetch_query, cursor_type=DictCursor, params=(id_,))
  55. docs = [
  56. {
  57. "_index": index_name,
  58. "_id": item['article_id'],
  59. "_source": {
  60. "article_id": item['article_id'],
  61. "platform": item['platform'],
  62. "out_account_id": item['out_account_id'],
  63. "title": item['title']
  64. }
  65. } for item in results
  66. ]
  67. return docs
  68. def search():
  69. query = {
  70. "query": {
  71. "match": {
  72. "title": "刘伯承元帅"
  73. }
  74. },
  75. "_source": ["article_id", "title"]
  76. }
  77. a = time.time()
  78. resp = es.search(index=index_name, body=query)
  79. b = time.time()
  80. for hit in resp["hits"]["hits"]:
  81. print(hit["_source"])
  82. print(b - a)
  83. def get_cluster_docs_stats():
  84. """获取集群文档统计信息"""
  85. stats = es.nodes.stats()
  86. # print(stats)
  87. # print(type(stats))
  88. print(json.dumps(stats.body, indent=4, ensure_ascii=False))
  89. if __name__ == "__main__":
  90. max_id = get_max_article_id()
  91. i = 0
  92. while int(max_id) < 27492350:
  93. articles = get_articles(max_id)
  94. res = helpers.bulk(es, articles)
  95. print(es.count(index=index_name))
  96. max_id = get_max_article_id()
  97. i += 1
  98. print(i)