Python 批量插入ES

摘要:
“Xx”}])print(e.info())#添加timestamp_now=int(time.time())time_Local=time。localtime(time_now)timestamp=时间。strftime(“%Y-%m-%d%H:time_t)print(time_format)ip_ports=[]#从mascan.json提取ip:

  使用Python批量插入数据到ES中,如果是一条条插入,会发现效率很低,这时需要使用ES的批量插入bulk的功能。

  以下示例代码,是将masscan输出的结果文件,抽取ip,port,和时间戳,插入到es中的。

#!/usr/bin/python
# coding=utf-8

import json
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import ssl

es = Elasticsearch(
    [{"host": "xx.xx.xx.xx", "port": "xx"}])

print(es.info())


# 添加timestamp
time_now = int(time.time())
time_local = time.localtime(time_now)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
date_t, time_t = timestamp.split(' ')
time_format = '{}T{}.000Z'.format(date_t, time_t)
print(time_format)


ip_ports = []
# 提取 masscan.json 中的 ip:port 信息


def handle_masscan(target):
    index = 0
    with open(target, 'r') as f:
        for line in f:
            index += 1
            if line.startswith('{ '):
                temp = json.loads(line[:-2])
                ip = str(temp["ip"]).strip()
                port = str(temp["ports"][0]["port"]).strip()
                ip_port = [ip, port]
                ip_ports.append(ip_port)


def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        print('共耗时约 {:.2f} 秒'.format(time.time() - start))
        return res

    return wrapper

@timer
def gen():
    actions = []
    for line in ip_ports:
        # 拼接插入数据结构
        action = {
            "_index": "server_port_info_2020_q4",
            "_type": "doc",
            "_source": {
                "ip": line[0],
                "port": line[1],
                "@timestamp": time_format,
            }
        }
        actions.append(action)
    g(es, actions)


if __name__ == '__main__':
    target = '../port_info_2_es/masscan.json'
    handle_masscan(target)
    gen()
    pass

参考:

  Elasticsearch - 使用Python批量写入数据:

    https://www.cnblogs.com/Neeo/articles/10788573.html

  使用Python-elasticsearch-bulk批量快速向elasticsearch插入数据:
    https://blog.csdn.net/weixin_39198406/article/details/82983256

  Bulk helpers:

    https://elasticsearch-py.readthedocs.io/en/7.10.0/helpers.html

免责声明:文章转载自《Python 批量插入ES》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇安装配置管理 之 NVIDIA nForce Linux Drivers 集成声网卡和声卡的安装说明python+requests接口自动化--请求方法封装下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

springboot---redis

<!--redis依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependen...

博客迁移到亚马逊云端1

将我的博客迁移到亚马逊云端(1)  Octopress已经被公认为Geeker的博客框架。它所拥有的特性都很符合Geeker的癖好:强大的命令行操作方式、简洁的MarkDown语法、灵活的插件配置、美轮美奂的theme(自带响应式设计哦)、完全可定义的部署…… 一般大家都喜欢把博客部署到github pages上,免费速度快,与Octopress无缝...

Solr搜索引擎入门知识汇总

1.技术选型,为什么用solr而不用lucene,或者其他检索工具 lucene:需要开发者自己维护索引文件,在多机环境中备份同步索引文件很是麻烦 Lucene本质上是搜索库,不是独立的应用程序。而Solr是。 Lucene专注于搜索底层的建设,而Solr专注于企业应用。 Lucene不负责支撑搜索服务所必须的管理,而Solr负责。 一句话概括Solr:...

Systemd自定义开机启动服务(转载)

一、开机启动 对于那些支持 Systemd 的软件,安装的时候,会自动在/usr/lib/systemd/system目录添加一个配置文件。 如果你想让该软件开机启动,就执行下面的命令(以httpd.service为例)。 $ sudo systemctl enable httpd 上面的命令相当于在/etc/systemd/system目录添加一个符号链...

vue+element-ui el-table表格(含表头)内容溢出省略,鼠标悬浮提示

第一种:参考:https://my.oschina.net/u/3455362/blog/4674804 <template> <div class="test"> <el-table :data="gridData" border stripe style=" 100%"> &...

JQuery表格操作的常用技巧总结

JQuery对表格进行操作的常用技巧。 1、表格奇数行和偶数行分别添加样式  复制代码代码如下: $(function(){  $('tr:odd').addClass("odd");  $('tr:even').addClass("even");  });  不算表的头部  复制代码代码如下: $(function(){  $('tbody>...