Elastic Stack
目录
通用
- 安装java8,java9不兼容。
- ElasticSearch、Kinaba在config里默认是用localhost,指定IP,否则用IP无法访问。
Beats
Logstash
Hello World
# bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' { "@timestamp" => 2017-05-03T05:34:41.343Z, "@version" => "1", "host" => "elk", "message" => "hello world" }
Logstash会给事件添加一些额外信息。最重要的就是 @timestamp,用来标记事件的发生时间。
此外,大多数时候,还可以见到另外几个:
- host 标记事件发生在哪里。
- type 标记事件的唯一类型。
- tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。
每个logstash过滤插件,都会有四个方法:
- add_tag
- remove_tag
- add_field
- remove_field
配置语法
Logstash社区通常习惯用shipper,broker和indexer来描述数据流中不同进程各自的角色。
不过我见过很多运用场景里都没有用logstash作为shipper,或者说没有用elasticsearch作为数据存储也就是说也没有indexer。所以,我们其实不需要这些概念。只需要学好怎么使用和配置logstash进程,然后把它运用到你的日志管理架构中最合适它的位置就够了。
./logstash -e 'input{stdin{}}output{stdout{}}' 2017-05-03T06:15:51.690Z elk hello world ./logstash -e '' { "@timestamp" => 2017-05-03T06:16:38.373Z, "@version" => "1", "host" => "elk", "message" => "hello world", "type" => "stdin" }
- 命令行参数
bin/logstash -e '' #-e 意即执行。 bin/logstash -f agent.conf #-f 意即文件。 bin/logstash -f /etc/logstash.d/ #读取目录下所有文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
logstash列出目录下所有文件时,是字母排序的。而logstash配置段的filter和output都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件。
- 设置文件
从Logstash 5.0开始,新增了$LS_HOME/config/logstash.yml文件,可以将所有的命令行参数都通过YAML文件方式设置。同时为了反映命令行配置参数的层级关系,参数也都改成用.而不是-了。
pipeline: workers: 24 batch: size: 125 delay: 5
plugin的安装
从logstash 1.5.0版本开始,logstash将所有的插件都独立拆分成gem包。这样,每个插件都可以独立更新,不用等待logstash自身做整体更新的时候才能使用了。
为了达到这个目标,logstash配置了专门的plugins管理命令。
bin/logstash-plugin list 查看本机现在有多少插件可用 bin/logstash-plugin install logstash-output-webhdfs 安装插件 bin/logstash-plugin update logstash-input-tcp 更新插件 bin/logstash-plugin install /path/to/logstash-filter-crash.gem 安装本地插件
长期运行
- 标准的service方式 : service logstash start
- 最基础的nohup方式 : nohup command &> /dev/null
- 更优雅的SCREEN方式
- 最推荐的daemontools方式
插件配置
input
请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用"hello world"里我们已经演示过的input/stdin ,同理,没有写明的output就是output/stdout。
- 标准输入(Stdin)
我们已经见过好几个示例使用 stdin 了。这也应该是 logstash 里最简单和基础的插件了。 所以,在这段中,我们可以学到一些未来每个插件都会有的一些方法。 配置示例
input {
stdin { add_field => {"key" => "value"} codec => "plain" tags => ["add"] type => "std" }
}
output {
stdout { codec=>rubydebug }
}
{
"@timestamp" => 2017-05-03T07:21:24.631Z, "@version" => "1", "host" => "elk", "message" => "hello world", "type" => "std", "key" => "value", "tags" => [ [0] "add" ]
}
- 读取文件(File)
input {
file { path => ["/var/log/apache2/*.log"] type => "system" start_position => "beginning" }
}
output {
stdout { codec=>rubydebug }
}
{
"path" => "/var/log/apache2/access.log", "@timestamp" => 2017-05-03T07:28:04.825Z, "@version" => "1", "host" => "elk", "message" => "192.168.10.1 - - [03/May/2017:15:28:04 +0800] \"GET / HTTP/1.1\" 200 3524 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36\"", "type" => "system"
}
- 解释
discover_interval #logstash每隔多久去检查一次被监听的path下是否有新文件。默认值是15秒。 exclude #不想被监听的文件可以排除出去。 close_older #一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600秒,即一小时。 ignore_older #在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86400秒,即一天。 stat_interval #logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。 start_position #logstash从什么位置开始读取文件数据,默认是结束位置。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取。
- 注意
- 通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp" 字段值。
- FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
- 因为windows平台上没有inode的概念,Logstash某些版本在windows平台上监听文件不是很靠谱。windows平台上,推荐考虑使用nxlog作为收集端。
codec
Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)。
在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入 期处理不同类型的数据,这全是因为有了 codec 设置。
所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。
codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等。
事实上,我们在第一个 "hello world" 用例中就已经用过 codec 了 —— rubydebug 就是一种 codec!虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具。
- 采用 JSON 编码
在早期的版本中,有一种降低 logstash 过滤器的 CPU 负载消耗的做法盛行于社区(在当时的 cookbook 上有专门的一节介绍):直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!+
这个建议依然有效,不过在当前版本中需要稍微做一点配置变动 —— 因为现在有专门的 codec 设置。
- 合并多行数据(Multiline)
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。
而 logstash 正为此准备好了 codec/multiline 插件!
其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,,直到新进的当前行匹配 ^\[ 正则为止。
- collectd简述
collectd 是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析 performance analysis)和预测系统未来的 load(如能力部署capacity planning)等。
filter
丰富的过滤器插件的存在是 logstash 威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。在本章我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!
- 时间处理(Date)
之前章节已经提过,filters/date 插件可以用来转换你的日志记录中的时间字符串,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里。
filter { grok { match => ["message", "%{HTTPDATE:logdate}"] } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] } }
- Grok 正则捕获
Grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。
filter { grok { match => { "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}" } } } { "message" => "begin 123.456 end", "@version" => "1", "@timestamp" => "2014-08-09T12:23:36.634Z", "host" => "raochenlindeMacBook-Air.local", "request_time" => 123.456 }
- dissect
grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数时候,日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。
当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用dissect插件更快的完成解析工作。
比如对http://rizhiyi.com/index.do?id=123写这么一段配置:
http://%{domain}/%{?url}?%{?arg1}=%{&arg1}
则最终生成的Event内容是这样的:
{ domain => "rizhiyi.com", id => "123" }
- GeoIP 地址查询归类
GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。
- JSON编解码
在上一章,已经讲过在codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只是一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。
filter { json { source => "message" target => "jsoncontent" } } { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "jsoncontent": { "uid": 3081609001, "type": "signal" } }
如果不打算使用多层结构的话,删掉 target 配置即可。新的结果如下:
{ "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "{\"uid\":3081609001,\"type\":\"signal\"}", "uid": 3081609001, "type": "signal" }
- Key-Value 切分
在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。logstash提供filters/kv插件,帮助处理不同样式的key-value日志。
- 数值统计(Metrics)
filters/metrics 插件是使用 Ruby 的 Metriks 模块来实现在内存里实时的计数和采样分析。该模块支持两个类型的数值分析:meter 和 timer。
- 数据修改(Mutate)
filters/mutate插件是Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。
- 类型转换
- 字符串处理
- 字段处理
- 随心所欲的 Ruby 处理
如果你稍微懂那么一点点Ruby语法的话,filters/ruby插件将会是一个非常有用的工具。
比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用filters/ruby插件绝对感觉良好。
- split 拆分事件
上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。
filter { split { field => "message" terminator => "#" } } <source> 这个测试中,我们在 intputs/stdin的终端中输入一行数据:"test1#test2",结果看到输出两个事件: <source lang="bash"> { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test1" } { "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test2" }
output
- 保存进Elasticsearch
Logstash可以使用不同的协议实现完成将数据写入Elasticsearch的工作。在不同时期,也有不同的插件实现方式。本节以最新版为准,即主要介绍HTTP方式。
output { elasticsearch { hosts => ["192.168.0.2:9200"] index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" flush_size => 20000 idle_flush_time => 10 sniffing => true template_overwrite => true } }
Logstash会努力攒到20000条数据一次性发送出去,但是如果10秒钟内也没攒够20000条,Logstash还是会以当前攒到的数据量发一次。
默认情况下,flush_size是500条,idle_flush_time是1秒。
- 发送邮件(Email)
output { email { port => "25" address => "smtp.126.com" username => "test@126.com" password => "" authentication => "plain" use_tls => true from => "test@126.com" subject => "Warning: %{title}" to => "test@qq.com" via => "smtp" body => "%{message}" } }
- 调用命令执行(Exec)
outputs/exec插件的运用也非常简单,如下所示,将logstash切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。
output { exec { command => "sendsms.pl \"%{message}\" -t %{user}" } }
需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用nagios的其他报警方式。示例就是通过短信发送消息。
- 保存成文件(File)
通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。
output { file { path => "/path/to/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log.gz" message_format => "%{message}" gzip => true } }
- 报警到Nagios
Logstash中有两个output插件是nagios有关的。outputs/nagios插件发送数据给本机的nagios.cmd管道命令文件,outputs/nagios_nsca插件则是调用send_nsca命令以NSCA协议格式把数据发送给nagios服务器(远端或者本地皆可)。
- 输出到Statsd
Statsd最早是2008年Flickr公司用Perl写的针对Graphite、datadog等监控数据后端存储开发的前端网络应用,2011年Etsy公司用node.js重构。用于接收(默认UDP)、写入、读取和聚合时间序列数据,包括即时值和累积值等。 Graphite是用Python模仿RRDtools写的时间序列数据库套件。包括三个部分:
- carbon:是一个Twisted守护进程,监听处理数据;
- whisper:存储时间序列的数据库;
- webapp:一个用Django框架实现的网页应用。
- 标准输出(Stdout)
和之前inputs/stdin插件一样,outputs/stdout插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。
output { stdout { codec => rubydebug { } workers => 2 } }
可能除了codecs/multiline,其他codec插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。
- 发送网络数据(TCP)
虽然之前我们已经提到过不建议直接使用LogStash::Inputs::TCP和LogStash::Outputs::TCP做转发工作,不过在实际交流中,发现确实有不少朋友觉得这种简单配置足够使用,因而不愿意多加一层消息队列的。所以,还是把Logstash如何直接发送TCP数据也稍微提点一下。
output { tcp { host => "192.168.0.2" port => 8888 codec => json_lines } }
Elastic Search
设置内存大小
config/jvm.options -Xms1g -Xmx1g
Linux max_map_count参数修改的问题
vm.max_map_count=200000直接写到/etc/sysctl.conf中,然后执行sysctl -p。