Elastic Stack

来自tomtalk
跳转至: 导航搜索

通用

  • 安装java8,java9不兼容。
  • ElasticSearch、Kinaba在config里默认是用localhost,指定IP,否则用IP无法访问。

Beats

Logstash

Hello World

# bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
 
{
    "@timestamp" => 2017-05-03T05:34:41.343Z,
      "@version" => "1",
          "host" => "elk",
       "message" => "hello world"
}

Logstash会给事件添加一些额外信息。最重要的就是 @timestamp,用来标记事件的发生时间。

此外,大多数时候,还可以见到另外几个:

  • host 标记事件发生在哪里。
  • type 标记事件的唯一类型。
  • tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。

每个logstash过滤插件,都会有四个方法:

  1. add_tag
  2. remove_tag
  3. add_field
  4. remove_field

que360

input { 
    file { 
        path => ["/var/log/apache2/*.log"] 
        type => "system" 
        start_position => "beginning" 
    } 
 
    beats { 
      host => "192.168.10.11" 
      port => 5044 
    } 
} 
 
filter { 
    grok { 
        match => {
                "message" => "%{IP:remove_addr} - (%{USERNAME:remote_user}|-) \[%{HTTPDATE:time_local}\] (%{HOSTNAME:http_host}|-) \"(?:%{WORD:verb} (?<request>[^\?]*)(?:\?%{NOTSPACE:args})? (HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:status:int} %{NUMBER:body_bytes_sent:int} %{NUMBER:request_time:float} \"(?<http_referer_host>(http://|https://)?[A-Za-z0-9:\-\.]*)(?<http_referer_uri>.*)\" \"(%{GREEDYDATA:http_user_agent}|-)\""
                "remove_field" => ["message", "tags", "beat", "body_bytes_sent", "verb", "http_referer_uri", "offset", "input_type", "args", "remote_user", "remove_addr", "http_version", "source", "http_user_agent", "@version", "host"]
            }
    } 
 
    date { 
        match => ["time_local", "dd/MMM/yyyy:hh:mm:ss Z"] 
        locale => "en" 
    } 
}                    
 
output { 
    elasticsearch { 
        hosts => ["192.168.10.11:9200"] 
        user => "elastic"
        password => "changeme"
        index => "%{type}-%{+YYYY.MM.dd}" 
        document_type => "%{type}" 
        flush_size => 200 
        idle_flush_time => 1 
        sniffing => true 
        template_overwrite => true 
    } 
}

配置语法

Logstash社区通常习惯用shipper,broker和indexer来描述数据流中不同进程各自的角色。

不过我见过很多运用场景里都没有用logstash作为shipper,或者说没有用elasticsearch作为数据存储也就是说也没有indexer。所以,我们其实不需要这些概念。只需要学好怎么使用和配置logstash进程,然后把它运用到你的日志管理架构中最合适它的位置就够了。

./logstash -e 'input{stdin{}}output{stdout{}}' 
 
2017-05-03T06:15:51.690Z elk hello world
 
./logstash -e ''  
 
{
    "@timestamp" => 2017-05-03T06:16:38.373Z,
      "@version" => "1",
          "host" => "elk",
       "message" => "hello world",
          "type" => "stdin"
}


命令行参数
bin/logstash -e ''               #-e 意即执行。
bin/logstash -f agent.conf       #-f 意即文件。
bin/logstash -f /etc/logstash.d/ #读取目录下所有文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。

logstash列出目录下所有文件时,是字母排序的。而logstash配置段的filter和output都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件。


设置文件

从Logstash 5.0开始,新增了$LS_HOME/config/logstash.yml文件,可以将所有的命令行参数都通过YAML文件方式设置。同时为了反映命令行配置参数的层级关系,参数也都改成用.而不是-了。

pipeline:
    workers: 24
    batch:
        size: 125
        delay: 5

plugin的安装

从logstash 1.5.0版本开始,logstash将所有的插件都独立拆分成gem包。这样,每个插件都可以独立更新,不用等待logstash自身做整体更新的时候才能使用了。

为了达到这个目标,logstash配置了专门的plugins管理命令。

bin/logstash-plugin list                                       查看本机现在有多少插件可用
bin/logstash-plugin install logstash-output-webhdfs            安装插件
bin/logstash-plugin update logstash-input-tcp                  更新插件
bin/logstash-plugin install /path/to/logstash-filter-crash.gem 安装本地插件

长期运行

  • 标准的service方式 : service logstash start
  • 最基础的nohup方式 : nohup command &> /dev/null
  • 更优雅的SCREEN方式
  • 最推荐的daemontools方式

插件配置

input

请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用"hello world"里我们已经演示过的input/stdin ,同理,没有写明的output就是output/stdout。


标准输入(Stdin)

我们已经见过好几个示例使用 stdin 了。这也应该是 logstash 里最简单和基础的插件了。 所以,在这段中,我们可以学到一些未来每个插件都会有的一些方法。 配置示例

input {

   stdin {
       add_field => {"key" => "value"}
       codec => "plain"
       tags => ["add"]
       type => "std"
   }

}

output {

   stdout {
       codec=>rubydebug
   }

}

{

   "@timestamp" => 2017-05-03T07:21:24.631Z,
     "@version" => "1",
         "host" => "elk",
      "message" => "hello world",
         "type" => "std",
          "key" => "value",
         "tags" => [
       [0] "add"
   ]

}


读取文件(File)

input {

   file {
       path => ["/var/log/apache2/*.log"]
       type => "system"
       start_position => "beginning"
   }

}

output {

   stdout {
       codec=>rubydebug
   }

}

{

         "path" => "/var/log/apache2/access.log",
   "@timestamp" => 2017-05-03T07:28:04.825Z,
     "@version" => "1",
         "host" => "elk",
      "message" => "192.168.10.1 - - [03/May/2017:15:28:04 +0800] \"GET / HTTP/1.1\" 200 3524 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\"",
         "type" => "system"

}


解释
discover_interval #logstash每隔多久去检查一次被监听的path下是否有新文件。默认值是15秒。
exclude           #不想被监听的文件可以排除出去。
close_older       #一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600秒,即一小时。
ignore_older      #在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86400秒,即一天。
stat_interval     #logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。
start_position    #logstash从什么位置开始读取文件数据,默认是结束位置。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取。


注意
  • 通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp" 字段值。
  • FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
  • 因为windows平台上没有inode的概念,Logstash某些版本在windows平台上监听文件不是很靠谱。windows平台上,推荐考虑使用nxlog作为收集端。

codec

Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)。

在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,这全是因为有了 codec 设置。

所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。

codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等。

事实上,我们在第一个 "hello world" 用例中就已经用过 codec 了 —— rubydebug 就是一种 codec!虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具。


采用 JSON 编码

在早期的版本中,有一种降低 logstash 过滤器的 CPU 负载消耗的做法盛行于社区(在当时的 cookbook 上有专门的一节介绍):直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!+

这个建议依然有效,不过在当前版本中需要稍微做一点配置变动 —— 因为现在有专门的 codec 设置。


合并多行数据(Multiline)

有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

而 logstash 正为此准备好了 codec/multiline 插件!

其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,,直到新进的当前行匹配 ^\[ 正则为止。


collectd简述

collectd 是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析 performance analysis)和预测系统未来的 load(如能力部署capacity planning)等。

filter

丰富的过滤器插件的存在是 logstash 威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。在本章我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!

每个 logstash 过滤插件,都会有四个方法叫 add_tag, remove_tag, add_field 和 remove_field。它们在插件过滤匹配成功时生效。

时间处理(Date)

之前章节已经提过,filters/date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。

filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}


Grok 正则捕获

Grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

filter {
    grok {
        match => {
            "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
        }
    }
}
 
{
         "message" => "begin 123.456 end",
        "@version" => "1",
      "@timestamp" => "2014-08-09T12:23:36.634Z",
            "host" => "raochenlindeMacBook-Air.local",
    "request_time" => 123.456
}


dissect

grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数时候,日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。

当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用dissect插件更快的完成解析工作。

比如对http://rizhiyi.com/index.do?id=123写这么一段配置:

http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

则最终生成的Event内容是这样的:

{
  domain => "rizhiyi.com",
  id => "123"
}


GeoIP 地址查询归类

GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。


JSON编解码

在上一章,已经讲过在codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只是一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。

filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}
 
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}

如果不打算使用多层结构的话,删掉 target 配置即可。新的结果如下:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "uid": 3081609001,
    "type": "signal"
}


Key-Value 切分

在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。logstash提供filters/kv插件,帮助处理不同样式的key-value日志。


数值统计(Metrics)

filters/metrics 插件是使用 Ruby 的 Metriks 模块来实现在内存里实时的计数和采样分析。该模块支持两个类型的数值分析:meter 和 timer。


数据修改(Mutate)

filters/mutate插件是Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。

  • 类型转换
  • 字符串处理
  • 字段处理


随心所欲的 Ruby 处理

如果你稍微懂那么一点点Ruby语法的话,filters/ruby插件将会是一个非常有用的工具。

比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用filters/ruby插件绝对感觉良好。


split 拆分事件

上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。

filter {
    split {
        field => "message"
        terminator => "#"
    }
}

这个测试中,我们在 intputs/stdin的终端中输入一行数据:"test1#test2",结果看到输出两个事件:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test1"
}
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "test2"
}

output

保存进Elasticsearch

Logstash可以使用不同的协议实现完成将数据写入Elasticsearch的工作。在不同时期,也有不同的插件实现方式。本节以最新版为准,即主要介绍HTTP方式。

output {
    elasticsearch {
        hosts => ["192.168.0.2:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
        sniffing => true
        template_overwrite => true
    }
}

Logstash会努力攒到20000条数据一次性发送出去,但是如果10秒钟内也没攒够20000条,Logstash还是会以当前攒到的数据量发一次。

默认情况下,flush_size是500条,idle_flush_time是1秒。


发送邮件(Email)
output {
    email {
        port           =>    "25"
        address        =>    "smtp.126.com"
        username       =>    "test@126.com"
        password       =>    ""
        authentication =>    "plain"
        use_tls        =>    true
        from           =>    "test@126.com"
        subject        =>    "Warning: %{title}"
        to             =>    "test@qq.com"
        via            =>    "smtp"
        body           =>    "%{message}"
    }
}


调用命令执行(Exec)

outputs/exec插件的运用也非常简单,如下所示,将logstash切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。

output {
    exec {
        command => "sendsms.pl \"%{message}\" -t %{user}"
    }
}

需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用nagios的其他报警方式。示例就是通过短信发送消息。


保存成文件(File)

通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。

output {
    file {
        path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}
报警到Nagios

Logstash中有两个output插件是nagios有关的。outputs/nagios插件发送数据给本机的nagios.cmd管道命令文件,outputs/nagios_nsca插件则是调用send_nsca命令以NSCA协议格式把数据发送给nagios服务器(远端或者本地皆可)。


输出到Statsd

Statsd最早是2008年Flickr公司用Perl写的针对Graphite、datadog等监控数据后端存储开发的前端网络应用,2011年Etsy公司用node.js重构。用于接收(默认UDP)、写入、读取和聚合时间序列数据,包括即时值和累积值等。 Graphite是用Python模仿RRDtools写的时间序列数据库套件。包括三个部分:

  • carbon:是一个Twisted守护进程,监听处理数据;
  • whisper:存储时间序列的数据库;
  • webapp:一个用Django框架实现的网页应用。


标准输出(Stdout)

和之前inputs/stdin插件一样,outputs/stdout插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。

output {
    stdout {
        codec => rubydebug {
        }
        workers => 2
    }
}

可能除了codecs/multiline,其他codec插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。


发送网络数据(TCP)

虽然之前我们已经提到过不建议直接使用LogStash::Inputs::TCP和LogStash::Outputs::TCP做转发工作,不过在实际交流中,发现确实有不少朋友觉得这种简单配置足够使用,因而不愿意多加一层消息队列的。所以,还是把Logstash如何直接发送TCP数据也稍微提点一下。

output {
    tcp {
        host  => "192.168.0.2"
        port  => 8888
        codec => json_lines
    }
}

性能与测试

任何软件都需要掌握其性能瓶颈,以及线上运行时的性能状态。Logstash也不例外。

长久以来,Logstash在这方面一直处于比较黑盒的状态。因为其内部队列使用的是标准的stud库,并非自己实现,在Logstash本身源代码里是找不出来什么问题的。我们只能按照其pipeline原理,总结出来一些模拟检测的手段。

在Logstash-5.0.0中,一大改进就是学习Elasticsearch的方式,通过API提供了一部分运行性能指标!本节就会介绍这方面的内容。同时,作为极限压测的方式,依然会介绍一些模拟数据的生成和JVM指标观测方法。

扩展方案

之前章节中,讲述的都是单个logstash进程,如何配置实现对数据的读取、解析和输出处理。但是在生产环境中,从每台应用服务器运行logstash进程并将数据直接发送到Elasticsearch里,显然不是第一选择:第一,过多的客户端连接对Elasticsearch是一种额外的压力;第二,网络抖动会影响到logstash进程,进而影响生产应用;第三,运维人员未必愿意在生产服务器上部署Java,或者让logstash跟业务代码争夺Java资源。

所以,在实际运用中,logstash进程会被分为两个不同的角色。运行在应用服务器上的,尽量减轻运行压力,只做读取和转发,这个角色叫做shipper;运行在独立服务器上,完成数据解析处理,负责写入Elasticsearch的角色,叫indexer。


利用Redis队列扩展logstash

input {
    redis {
        data_type => "pattern_channel"
        key => "logstash-*"
        host => "192.168.0.2"
        port => 6379
        threads => 5
    }
}

通过kafka传输

Kafka是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用Redis做轻量级消息队列不同,Kafka利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有Kafka服务在运行,logstash也可以快速接入,免去重复建设的麻烦。

如果打算新建Kafka系统的,请参考Kafka官方入门文档:http://kafka.apache.org/documentation.html#quickstart

Rsyslog

Rsyslog是RHEL6开始的默认系统syslog应用软件(当然,RHEL自带的版本较低,实际官方稳定版本已经到v8了)。官网地址:http://www.rsyslog.com

rsyslog是一个开源工具,被广泛用于Linux系统以通过TCP/UDP协议转发或接收日志消息。rsyslog守护进程可以被配置成两种环境,一种是配置成日志收集服务器,rsyslog进程可以从网络中收集其它主机上的日志数据,这些主机会将日志配置为发送到另外的远程服务器。rsyslog的另外一个用法,就是可以配置为客户端,用来过滤和发送内部日志消息到本地文件夹(如/var/log)或一台可以路由到的远程rsyslog服务器上。

rsyslog负责写入日志,logrotate负责备份和删除旧日志,以及更新日志文件。

Input模块包括imklg、imsock、imfile、imtcp等,是消息来源。

Filetr模块处理消息的分析和过滤,rsyslog可以根据消息的任何部分进行过滤,后面会介绍到具体的做法。

Output模块包括omfile、omprog、omtcp、ommysql等。是消息的目的地。

Queue模块负责消息的存储,从Input传入的未经过滤的消息放在主队列中,过滤后的消息放入到不同action queue中,再由action queue送到各个输出模块。

nxlog

nxlog是用C语言写的一个跨平台日志收集处理软件。其内部支持使用Perl正则和语法来进行数据结构化和逻辑判断操作。不过,其最常用的场景。是在windows服务器上,作为logstash的替代品运行。

nxlog的windows安装文件下载url见:http://nxlog.org/system/files/products/files/1/nxlog-ce-2.8.1248.msi

Heka

heka是Mozilla公司仿造logstash设计,用Golang重写的一个开源项目。同样采用了input->decoder->filter->encoder->output的流程概念。其特点在于,在中间的decoder/filter/encoder部分,设计了sandbox概念,可以采用内嵌lua脚本做这一部分的工作,降低了全程使用静态Golang编写的难度。此外,其filter阶段还提供了一些监控和统计报警功能。

官网地址见:http://hekad.readthedocs.org/

Mozilla员工已经在2016年中宣布放弃对heka项目的维护,但是社区依然坚持推动了部分代码更新和新版发布。所以本书继续保留heka的使用介绍。

经测试,其处理性能跟开启了多线程filters的logstash进程类似,都在每秒30000条。

fluentd

Fluentd是另一个Ruby语言编写的日志收集系统。和Logstash不同的是,Fluentd是基于MRI实现的,并不是利用多线程,而是利用事件驱动。

Fluentd的开发和使用者,大多集中在日本。

Elastic Search

Elasticsearch来源于作者Shay Banon的第一个开源项目Compass库,而这个Java库最初的目的只是为了给Shay当时正在学厨师的妻子做一个菜谱的搜索引擎。2010年,Elasticsearch正式发布。至今已经成为GitHub上最流行的Java项目,不过Shay承诺给妻子的菜谱搜索依然没有面世……

2015年初,Elasticsearch公司召开了第一次全球用户大会Elastic{ON}15。诸多IT巨头纷纷赞助,参会,演讲。会后,Elasticsearch公司宣布改名Elastic,公司官网也变成http://elastic.co/。这意味着Elasticsearch的发展方向,不再限于搜索业务,也就是说,Elastic Stack等机器数据和IT服务领域成为官方更加注意的方向。随后几个月,专注监控报警的Watcher发布beta版,社区有名的网络抓包工具Packetbeat、多年专注于基于机器学习的异常探测Prelert等ITOA周边产品纷纷被Elastic公司收购。

增删改查

GET /_cat/indices
GET /fgo/servant/1
GET /fgo/servant/1/_source
GET /fgo/servant/1?_source_include=name,class
GET /fgo/servant/1/_source?_source_include=name,class
 
POST /fgo/servant/
{
    "name": "tom",
    "class": "human",
    "description": "one people"
}
 
POST /fgo/servant/1
{
    "name": "tom",
    "class": "human",
    "description": "one people"
}
 
POST http://127.0.0.1:9200/logstash-2015.06.21/testlog/AU4ew3h2nBE6n0qcyVJK 
{
    "date" : "1434966686000",
    "user" : "chenlin7",
    "mesg" " "first message into Elasticsearch but version 2"
}
 
POST http://127.0.0.1:9200/logstash-2015.06.21/testlog/AU4ew3h2nBE6n0qcyVJK/_update 
{
    "doc" : {
        "user" : "someone"
    }
}
 
DELETE http://127.0.0.1:9200/logstash-2015.06.21/testlog/AU4ew3h2nBE6n0qcyVJK
DELETE http://127.0.0.1:9200/logstash-2015.06.0*
 
根据查询删除
POST /access-log-2017.06.29/_delete_by_query?conflicts=proceed
{
  "query": {
    "match": {
      "tags": {
        "query": "_grokparsefailure"
      }
    }
  }
}

组合多查询

POST http://192.168.1.242:9200/ucpin_linkedin/cv_parse/_search
{
  "query": {
    "bool": {
      "must": {
        "match_all": {}
      }
    }
  },
  "_source": [
    "linkedin_recruiter_url"
  ],
  "size": 200
}

搜索请求

Make elasticsearch only return certain fields?

{
    "_source": ["user", "message", ...],
    "query": ...,
    "size": ...
}

全文搜索

querystring语法

GET /fgo/_search?q=elle

完整语法

GET /fgo/_search
{
  "query": {
    "match": {
      "name": "不"
    }
  }
}

query_string查询

GET logstash-nginxacclog-2017.06.21/_search
{
  "size": 100, 
  "_source": ["request","http_host"],
  "query": {
    "query_string": {
      "query":"+http_host:local.admin.com +status:200"
    }
  }
}

使用聚合的查询

GET logstash-nginxacclog-2017.06.21/_search
{
  "size": 0,
  "_source": [
    "request",
    "http_host"
  ],
  "query": {
    "query_string": {
      "query": "+http_host:local.sms.com +status:200"
    }
  },
  "aggs": {
    "avg_request_time": {
      "terms": {
        "field": "request.keyword",
        "size": 2
      }
    }
  }
}

聚合请求

GET /logstash-nginxacclog-2017.05.19/_search
{
  "size": 0,
  "aggs": {
    "status": {
      "stats": {
        "field": "status"
      }
    }
  }
}
 
GET /logstash-nginxacclog-2017.05.19/_search
{
  "size": 0,
  "aggs": {
    "http_host": {
      "terms": {
        "field": "http_host.keyword"
      }
    },
    "request": {
      "terms": {
        "field": "request.keyword"
      }
    }
  }
}
 
GET /logstash-nginxacclog-2017.05.19/_search
{
  "size": 0, 
    "aggs" : {
        "request" : {
            "terms" : { 
              "field" : "request.keyword" ,
              "size": 150
            }
        }
    }
}
 
嵌套聚合
GET /logstash-nginxacclog-2017.05.19/_search
{
  "size": 0, 
    "aggs" : {
        "request" : {
            "terms" : { 
              "field" : "request.keyword" ,
              "size": 150
            },
             "aggs" : {
                "avg_request_time" : { 
                  "avg" : { 
                    "field" : "request_time" 
                  } 
                }
            }
        }
    }
}
 
按度量排序
GET /logstash-nginxacclog-2017.06.*/_search
{
  "size": 0,
  "aggs": {
    "request": {
      "terms": {
        "field": "request.keyword",
        "size": 10,
        "order": {
          "avg_request_time": "desc"
        }
      },
      "aggs": {
        "avg_request_time": {
          "avg": {
            "field": "request_time"
          }
        }
      }
    }
  }
}
 
聚合字符字段
GET /logstash-nginxacclog-2017.06*/_search
{
  "size": 0,
  "aggs": {
    "request": {
      "terms": {
        "field": "status"
      },
      "aggs": {
        "http_host": {
          "terms": {
            "field": "http_host.keyword"
          }
        }
      }
    }
  }
}
 
多层嵌套聚合
GET /logstash-nginxacclog-2017.06*/_search
{
  "size": 0,
  "aggs": {
    "request": {
      "terms": {
        "field": "request.keyword"
      },
      "aggs": {
        "http_referer": {
          "terms": {
            "field": "http_referer.keyword"
          },
          "aggs": {
            "avg_request_time": {
              "avg": {
                "field": "request_time"
              }
            }
          }
        }
      }
    }
  }
}

search 请求参数

GET /fgo/_search
{
  "from": 2, 
  "size": 2  , 
  "timeout": "1ms", 
  "query": {
    "match": {
      "name": "tom"
    }
  }
}

search示例

GET /logstash-nginxacclog-*/_search
{
  "query": {
    "match": {
      "http_host": {
        "query": "local.shebao520.com",
        "type": "phrase"
      }
    }
  }
}
 
GET /logstash-nginxacclog-2017.05.19/_search
{
  "query": {
    "match": {
      "http_host": "local.milamall.com"
    }
  }
}
 
GET /logstash-nginxacclog-2017.05.19/_search
{
  "size": 20,
  "query": {
    "match": {
      "request.keyword": "/oauth/check?access_token=fnLHShyrSOiiE3dcDrnfQrC5NExrs3PnsmN48fNn"
    }
  }
}

script

Elasticsearch中,可以使用自定义脚本扩展功能。包括评分、过滤函数和聚合字段等方面。内置脚本引擎历经MVEL、Groovy、Lucene expression的变换后,Elastic.co最终决定实现一个自己专用的Painless脚本语言,并在5.0版正式发布。

动态提交

最简单易用的方式,就是在正常的请求体中,把field换成script提交。

固定文件

为了和动态提交的语法有区别,调用固定文件

其他语言

ES支持通过插件方式,扩展脚本语言的支持,目前默认自带的语言包括:

  • painless
  • lucene expression
  • groovy
  • mustache

而github上目前已有以下语言插件支持,基本覆盖了所有JVM上的可用语言:

reindex

Elasticsearch本身不提供对索引的rename,mapping的alter等操作。所以,如果有需要对全索引数据进行导出,或者修改某个已有字段的mapping设置等情况下,我们只能通过scroll API导出全部数据,然后重新做一次索引写入。这个过程,叫做reindex。

之前完成这个过程只能自己写程序或者用logstash。5.0中,Elasticsearch将这个过程内置为reindex API,但是要注意:这个接口并没有什么黑科技,其本质仅仅是将这段相同逻辑的代码预置分发而已。如果有复杂的数据变更操作等细节需求,依然需要自己编程完成。

下面分别给出这三种方法的示例:

  • Perl客户端
  • Logstash做reindex
  • reindex API

ES在运维监控领域的其他玩法

percolator接口

在运维体系中,监控和报警总是成双成对的出现。Elastic Stack在时序统计方面的便捷,在很多时候被作为监控的一种方式在使用。那么,自然就引申出一个问题:Elastic Stack如何做报警?

对于简单而且固定需求的模式,我们可以在Logstash中利用filter/metric和filter/ruby等插件做预处理,直接output/nagios或output/zabbix来报警;但是对于针对全局的、更复杂的情况,Logstash就无能为力了。

目前比较通行的办法。有两种:

  • 对于匹配报警,采用ES的Percolator接口做响应报警;
  • 对于时序统计,采用定时任务方式,发送ES aggs请求,分析响应体报警。

针对报警的需求,ES官方也在最近开发了Watcher商业产品,和Shield一样以ES插件形式存在。

设置内存大小

config/jvm.options
-Xms1g
-Xmx1g

Linux max_map_count参数修改的问题

vm.max_map_count=200000直接写到/etc/sysctl.conf中,然后执行sysctl -p。

max file descriptors [4096] for elasticsearch process likely too low, increase to at least [65536]

加入以下两行,redhat这里为用户名。

sudo vim /etccuritymits.conf
redhat hard nofile 65536
redhat soft nofile 65536

Elasticsearch 5.0.0. cluster node not joining

Exception: failed to send join request to master, reason RemoteTransportException can't add node found existing node with the same id but is a different node instance]

Ok so the issue was copying the elasticsearch folder from one node to another over scp. Elasticsearch saves the node id in elasticsearch/data/ folder. Deleted the data folder on one node and restarted it. The cluster is up and running. Hope this saves someone the hassle.

kibana

xpack

在运维体系中,监控和报警总是成双成对的出现。Elastic Stack在时序统计方面的便捷,在很多时候被作为监控的一种方式在使用。那么,自然就引申出一个问题:Elastic Stack如何做报警?+

对于简单而且固定需求的模式,我们可以在Logstash中利用filter/metric和filter/ruby等插件做预处理,直接output/nagios或output/zabbix来报警;但是对于针对全局的、更复杂的情况,Logstash就无能为力了。

目前比较通行的办法。有两种:

  • 对于匹配报警,采用ES的Percolator接口做响应报警;
  • 对于时序统计,采用定时任务方式,发送ES aggs请求,分析响应体报警。

授权

GET /_xpack/license    #查看授权信息

申请授权页面:https://www.elastic.co/subscriptions

一般申请免费授权就够用了。

curl -XGET -u elastic:changeme 'http://192.168.10.11:9200/_xpack/license'
curl -XPUT -u elastic:changeme 'http://192.168.10.11:9200/_xpack/license?acknowledge=true' -H "Content-Type: application/json" -d @tom-xie-68d464d3-1a5f-4041-b7c2-0beb5e0d26e3-v5.json

watcher

Managing Watches

  • Use the Put Watch API to add or update watches
  • Use the Get Watch API to retrieve watches
  • Use the Delete Watch API to delete watches
  • Use the Activate Watch API to activate watches
  • Use the Deactivate Watch API to deactivate watches
  • Use the Ack Watch API to acknowledge watches

相关命令

GET _xpack/watcher/watch/my-watch
GET _xpack/watcher/stats
DELETE _xpack/watcher/watch/my-watch
 
GET .watches/_search
{
  "size" : 100
}
 
PUT _xpack/watcher/watch/my-watch
{
  "trigger" : {
    "schedule" : { 
        "hourly" : { "minute" : 1 }
    }
  },
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logstash-*" ],
        "body" : {
          "query" : { "match_all" : {}}
        }
      }
    }
  },
  "condition" : {
     "always" : {}
  },
  "actions" : {
    "email_admin" : {
      "email" : {
        "to" : "tom.xie@que360.com",
        "subject" : "200 tom"
      }
    },
    "log" : { 
        "logging" : {
            "text" : "executed at tom" 
        }
    },
    "my_webhook" : { 
        "webhook" : {
            "method" : "GET", 
            "host" : "http://www.tomtalk.net", 
            "port" : 8080, 
            "path": "/receive/unit_data?value=a&param=b&return=17&code=uhlk2881yor80s1g"
        }
    }
  }
}

一个简单的watcher logging

PUT _xpack/watcher/watch/tom
{
  "trigger" : {
    "schedule" : { 
        "interval" : "1s" 
    }
  },
  "input" : {
    "simple" : {
      "name" : "John"
    }
  },
  "condition" : {
     "always" : {}
  },
  "actions" : {
    "log" : { 
        "logging" : {
            "text" : "executed at tomtom. first ok!" 
        }
    }
  }
}

一个简单的watcher webhook

PUT _xpack/watcher/watch/my-watch
{
  "trigger" : {
    "schedule" : { 
        "interval" : "1s" 
    }
  },
  "input" : {
    "simple" : {
      "name" : "John"
    }
  },
  "condition" : {
     "always" : {}
  },
  "actions" : {    
    "my_webhook" : { 
        "webhook" : {
            "method" : "GET", 
            "url" : "http://www.tomtalk.net", 
            "port" : 8080, 
            "path": "/receive/unit_data?value=a&param=b&return=17&code=uhlk2881yor80s1g"
        }
    }
  }
}

一个简单的watcher email

PUT _xpack/watcher/watch/my-watch
{
  "trigger" : {
    "schedule" : { 
        "interval" : "1s" 
    }
  },
  "input" : {
    "simple" : {
      "name" : "John"
    }
  },
  "condition" : {
     "always" : {}
  },
  "actions" : {
    "email_admin" : {
      "email" : {
        "to" : "tom.xie@que360.com",
        "from": "sky007_tom@163.com",    #163邮箱需要指定from,否则报错。
        "subject" : "subject 200 tom",
        "body" : "body 200 tom"
      }
    }
  }
}

config/elasticsearch.yml 邮箱配置

xpack.notification.email.account:
    tom_account:
        smtp:
            host: smtp.163.com
            port: 25
            user: xxxx@xxxx.com
            password: xxxxxx