python

超轻量级php框架startmvc

Python对ElasticSearch获取数据及操作

更新时间:2020-07-01 05:00:01 作者:startmvc
使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下VersionPython:2.7ElasticSearch

使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下

Version

Python :2.7

ElasticSearch:6.3

代码:


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
 @Time : 2018/7/4
 @Author : LiuXueWen
 @Site : 
 @File : ElasticSearchOperation.py
 @Software: PyCharm
 @Description: 对elasticsearch数据的操作,包括获取数据,发送数据
"""
import elasticsearch
import json

import Util_Ini_Operation

class elasticsearch_data():
 def __init__(self,hosts,username,password,maxsize,is_ssl):
 # 初始化ini操作脚本,获取配置文件
 try:
 # 判断请求方式是否ssl加密
 if is_ssl == "true":
 # 获取证书地址
 cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
 es_ssl = elasticsearch.Elasticsearch(
 # 地址
 hosts=hosts,
 # 用户名密码
 http_auth=(username,password),
 # 开启ssl
 use_ssl=True,
 # 确认有加密证书
 verify_certs=True,
 # 对应的加密证书地址
 client_cert=cert_pem
 )
 self.es = es_ssl
 elif is_ssl == "false":
 # 创建普通类型的ES客户端
 es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
 self.es = es_ordinary
 except Exception as e:
 print(e)


 def query_data(self,keywords_list,date):
 gte = "now-"+str(date)
 query_data = {
 # 查询语句
 "query": {
 "bool": {
 "must": [
 {
 "query_string": {
 "query": keywords_list,
 "analyze_wildcard": True
 }
 },
 {
 "range": {
 "@timestamp": {
 "gte": gte,
 "lte": "now",
 "format": "epoch_millis"
 }
 }
 }
 ],
 "must_not": []
 }
 }
 }
 return query_data

 # 从es获取数据
 def get_datas_by_query(self,index_name,keywords,param,date):
 '''
 :param index_name: 索引名称
 :param keywords: 关键字词,数组
 :param param: 需要数据条件,例如_source
 :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
 :return: all_datas 返回查询到的所有数据(已经过param过滤)
 '''

 all_datas = []
 # 遍历所有的查询条件
 for keywords_list in keywords:
 # DSL语句
 query_data = self.query_data(keywords_list,date)
 res = self.es.search(
 index=index_name,
 body=query_data
 )
 for hit in res['hits']['hits']:
 # 获取指定的内容
 response = hit[param]
 # 添加所有数据到数据集中
 all_datas.append(response)
 # 返回所有数据内容
 return all_datas

 # 当索引不存在创建索引
 def create_index(self,index_name):
 '''
 :param index_name: 索引名称
 :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
 '''
 # 获取索引的映射
 # index_mapping = IndexMapping.index_mapping
 # # 判断索引是否存在
 # if self.es.indices.exists(index=index_name) is not True:
 # # 创建索引
 # res = self.es.indices.create(index=index_name,body=index_mapping)
 # # 返回结果
 # return res
 # else:
 # # 返回索引名称
 # return index_name
 pass

 # 插入指定的单条数据内容
 def insert_single_data(self,index_name,doc_type,data):
 '''
 :param index_name: 索引名称
 :param doc_type: 文档类型
 :param data: 需要插入的数据内容
 :return: 执行结果
 '''
 res = self.es.index(index=index_name,doc_type=doc_type,body=data)
 return res

 # 向ES中新增数据,批量插入
 def insert_datas(self,index_name):
 '''
 :desc 通过读取指定的文件内容获取需要插入的数据集
 :param index_name: 索引名称
 :return: 插入成功的数据条数
 '''
 insert_datas = []
 # 判断插入数据的索引是否存在
 self.createIndex(index_name=index_name)
 # 获取插入数据的文件地址
 data_file_path = self.ini.get_key_value("datafile","datafilepath")
 # 获取需要插入的数据集
 with open(data_file_path,"r+") as data_file:
 # 获取文件所有数据
 data_lines = data_file.readlines()
 for data_line in data_lines:
 # string to json
 data_line = json.loads(data_line)
 insert_datas.append(data_line)
 # 批量处理
 res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
 return res

 # 从ES中在指定的索引中删除指定数据(根据id判断)
 def delete_data_by_id(self,index_name,doc_type,id):
 '''
 :param index_name: 索引名称
 :param index_type: 文档类型
 :param id: 唯一标识id
 :return: 删除结果信息
 '''
 res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
 return res

 # 根据条件删除数据
 def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
 '''
 :param index_name:索引名称,为空查询所有索引
 :param doc_type:文档类型,为空查询所有文档类型
 :param param:过滤条件值
 :param gt_time:时间范围,大于该时间
 :param lt_time:时间范围,小于该时间
 :return:执行条件删除后的结果信息
 '''
 # DSL语句
 query_data = {
 # 查询语句
 "query": {
 "bool": {
 "must": [
 {
 "query_string": {
 "query": param,
 "analyze_wildcard": True
 }
 },
 {
 "range": {
 "@timestamp": {
 "gte": gt_time,
 "lte": lt_time,
 "format": "epoch_millis"
 }
 }
 }
 ],
 "must_not": []
 }
 }
 }
 res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
 return res

 # 指定index中删除指定时间段内的全部数据
 def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
 '''
 :param index_name:索引名称,为空查询所有索引
 :param doc_type:文档类型,为空查询所有文档类型
 :param gt_time:时间范围,大于该时间
 :param lt_time:时间范围,小于该时间
 :return:执行条件删除后的结果信息
 '''
 # DSL语句
 query_data = {
 # 查询语句
 "query": {
 "bool": {
 "must": [
 {
 "match_all": {}
 },
 {
 "range": {
 "@timestamp": {
 "gte": gt_time,
 "lte": lt_time,
 "format": "epoch_millis"
 }
 }
 }
 ],
 "must_not": []
 }
 }
 }
 res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
 return res

 # 修改ES中指定的数据
 def update_data_by_id(self,index_name,doc_type,id,data):
 '''
 :param index_name: 索引名称
 :param doc_type: 文档类型,为空表示所有类型
 :param id: 文档唯一标识编号
 :param data: 更新的数据
 :return: 更新结果信息
 '''
 res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
 return res

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

Python ElasticSearch 数据