curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-8.x.sh | bash
Manual installation
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
dnf install -y logstash
cp /etc/logstash/logstash.yml{,.original}
chown logstash:logstash -R /etc/logstash
systemctl daemon-reload
systemctl enable logstash.service
systemctl start logstash.service
Change the user the service runs as, otherwise startup will fail:
[root@netkiller ~]# vim /usr/lib/systemd/system/logstash.service
Change
User=logstash
Group=logstash
to
User=root
Group=root
apiVersion: v1
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: log
      paths:
        - /tmp/*
      fields:
        project: test
        group: test
        stage: test
        format: json
      multiline:
        pattern: '^\[[^stacktrace]'
        negate: true
        match: after
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    output.logstash:
      hosts: ["172.18.200.10:5044"]
kind: ConfigMap
metadata:
  name: filebeat
  namespace: default
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: bottleneck
  name: bottleneck
  namespace: default
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: bottleneck
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: bottleneck
    spec:
      affinity: {}
      containers:
      - env:
        - name: TZ
          value: Asia/Shanghai
        - name: JAVA_OPTS
          value: -Xms2048m -Xmx4096m
        - name: SPRING_OPTS
          value: --spring.profiles.active=dev --server.undertow.worker-threads=5000
        image: nginx:latest
        imagePullPolicy: IfNotPresent
        name: nginx
        ports:
        - containerPort: 80
          name: http
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /tmp
          name: tmp
      - args:
        - -c
        - /usr/share/filebeat/filebeat.yml
        - -e
        env:
        - name: TZ
          value: Asia/Shanghai
        - name: JAVA_OPTS
        - name: SPRING_OPTS
        image: docker.elastic.co/beats/filebeat:8.6.1
        imagePullPolicy: IfNotPresent
        name: filebeat
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /usr/share/filebeat/filebeat.yml
          name: config
          readOnly: true
          subPath: filebeat.yml
        - mountPath: /tmp
          name: tmp
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          name: filebeat
        name: config
      - emptyDir: {}
        name: tmp
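The output.logstash section of the ConfigMap above ships events to 172.18.200.10:5044, so that host must run a pipeline with a matching beats input. A minimal sketch (the stdout output is only for verification):

input {
  beats {
    port => 5044
  }
}
output {
  stdout { codec => rubydebug }
}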
logstash -e "input {stdin{}} output {stdout{}}"
/usr/share/logstash/bin/logstash -e 'input{file {path => "/etc/centos-release" start_position => "beginning"}} output { stdout {}}'
/usr/share/logstash/bin/logstash -f stdin.conf
/usr/share/logstash/bin/logstash -f jdbc.conf --path.settings /etc/logstash --path.data /tmp
root@netkiller ~/logstash % /usr/share/logstash/bin/logstash -t -f test.conf
WARNING: Default JAVA_OPTS will be overridden by the JAVA_OPTS defined in the environment. Environment JAVA_OPTS are -server -Xms2048m -Xmx4096m
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
## JVM configuration
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms1g
-Xmx4g
################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################
## GC configuration
11-13:-XX:+UseConcMarkSweepGC
11-13:-XX:CMSInitiatingOccupancyFraction=75
11-13:-XX:+UseCMSInitiatingOccupancyOnly
## Locale
# Set the locale language
-Duser.language=zh
# Set the locale country
-Duser.country=CN
# Set the locale variant, if any
#-Duser.variant=
## basic
# set the I/O temp directory
#-Djava.io.tmpdir=$HOME
# set to headless, just in case
-Djava.awt.headless=true
# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8
# use our provided JNA always versus the system one
#-Djna.nosys=true
# Turn on JRuby invokedynamic
-Djruby.compile.invokedynamic=true
## heap dumps
# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError
# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof
## GC logging
#-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m
# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${LS_GC_LOG_FILE}
# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom
# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
[root@netkiller ~]# cat /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
Configure the pipelines.yml file
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: finance
  path.config: "/etc/logstash/conf.finance/*.conf"
- pipeline.id: market
  path.config: "/etc/logstash/conf.market/*.conf"
- pipeline.id: customer
  path.config: "/etc/logstash/conf.customer/*.conf"
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
Helloworld
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
2017-08-03T10:03:38.375Z localhost Helloworld
18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
The rubydebug codec pretty-prints each event to the screen in a JSON-like format.
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
"@timestamp" => 2017-08-03T10:05:02.764Z,
"@version" => "1",
"host" => "localhost",
"message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
input {
file {
type => "syslog"
path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
start_position => "beginning"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
start_position => "beginning" reads the file from the beginning; without this option, only newly appended data is read.
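While testing, you will often also want Logstash to forget the recorded read offset, otherwise a restarted pipeline skips everything it has already seen. A common sketch is to point sincedb_path at /dev/null (paths are illustrative):

input {
  file {
    path => [ "/var/log/messages" ]
    start_position => "beginning"
    # the offset database is discarded, so the file is re-read on every restart
    sincedb_path => "/dev/null"
  }
}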
input {
file { path =>"/var/log/messages" type =>"syslog"}
file { path =>"/var/log/apache/access.log" type =>"apache"}
}
input {
file {
type => "syslog"
path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
}
tcp {
port => "5145"
type => "syslog-network"
}
udp {
port => "5145"
type => "syslog-network"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
input {
redis {
host => "127.0.0.1"
port => "6379"
key => "logstash:demo"
data_type => "list"
codec => "json"
type => "logstash-redis-demo"
tags => ["logstashdemo"]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
Specify database 10
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf
input {
redis {
codec => json
host => "localhost"
port => 6379
db => 10
key => "logstash:redis"
data_type => "list"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-api"
}
}
input {
kafka {
zk_connect => "kafka:2181"
group_id => "logstash"
topic_id => "apache_logs"
consumer_threads => 16
}
}
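The zk_connect and topic_id options belong to the old kafka input that consumed through ZooKeeper; on Logstash 5.x and later the plugin talks to the brokers directly. A rough equivalent under that assumption (broker address illustrative):

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["apache_logs"]
    group_id => "logstash"
    consumer_threads => 16
  }
}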
root@netkiller /etc/logstash/conf.d % cat jdbc.conf
input {
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "123456"
schedule => "* * * * *"
statement => "select * from article where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
record_last_run => true
last_run_metadata_path => "/var/tmp/article.last"
}
jdbc {
jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
jdbc_user => "cms"
jdbc_password => "123456"
schedule => "* * * * *" # cron schedule expression; this one fires every minute
statement => "select * from article where ctime > :sql_last_value"
use_column_value => true
tracking_column => "ctime"
tracking_column_type => "timestamp"
record_last_run => true
last_run_metadata_path => "/var/tmp/article-ctime.last"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "information"
document_type => "article"
document_id => "%{id}"
action => "update"
doc_as_upsert => true
}
}
The default is ISO8601; to convert to yyyy-MM-dd HH:mm:ss, refer to:
filter {
  date {
    match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "zh-CN"
  }
  date {
    match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "zh-CN"
  }
}
date {
locale => "zh-CN"
#match => ["@timestamp", "yyyy-MM-dd HH:mm:ss"]
match => ["@timestamp", "ISO8601"]
timezone => "Asia/Shanghai"
target => ["@timestamp"]
}
Create a pattern file under /usr/share/logstash/patterns:
mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns/nginx
NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
filter {
  if [type] == "nginx-access" {
    grok {
      patterns_dir => [ "/usr/share/logstash/patterns" ]
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}
input {
file {
type => "syslog"
path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
sincedb_path => "/opt/logstash/sincedb-access"
}
syslog {
type => "syslog"
port => "5544"
}
}
filter {
  if [type] == "syslog" {
    grok {
      match => [ "message", "%{SYSLOGBASE2}" ]
      add_tag => [ "syslog", "grokked" ]
    }
  }
}
output {
elasticsearch { hosts => ["elk.netkiller.cn:9200"] }
}
input {
file {
type => "SSRCode"
path => "/SD/2015*/01*/*.csv"
start_position => "beginning"
}
}
filter {
csv {
columns => ["Code","Source"]
separator => ","
}
kv {
source => "uri"
field_split => "&?"
value_split => "="
}
}
# output logs to console and to elasticsearch
output {
stdout {}
elasticsearch {
hosts => ["172.16.1.1:9200"]
}
}
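The kv filter above splits a query string held in a uri field (for example q=logstash&page=2) into individual event fields. A self-contained sketch you can try from stdin (the uri field name follows the example above):

input {
  stdin { codec => json }
}
filter {
  kv {
    source => "uri"
    field_split => "&?"
    value_split => "="
  }
}
output {
  stdout { codec => rubydebug }
}

Feeding it {"uri":"/search?q=logstash&page=2"} yields q and page as top-level fields.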
input {
  stdin {}
}
filter {
  ruby {
    init => "
      require 'csv'
      begin
        @@csv_file = 'output.csv'
        @@csv_headers = ['A','B','C']
        if !File.exist?(@@csv_file) || File.zero?(@@csv_file)
          CSV.open(@@csv_file, 'w') do |csv|
            csv << @@csv_headers
          end
        end
      end
    "
    code => "
      event.set('[@metadata][csv_file]', @@csv_file)
      event.set('[@metadata][csv_headers]', @@csv_headers)
    "
  }
  csv {
    columns => ["a", "b", "c"]
  }
}
output {
  csv {
    fields => ["a", "b", "c"]
    path => "%{[@metadata][csv_file]}"
  }
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
Test
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
Output:
A,B,C
1,2,3
4,5,6
7,8,9
Date formatting: convert ISO 8601 dates to %Y-%m-%d %H:%M:%S.
Save the following content to the configuration file date.conf:
input {
stdin{}
}
filter {
ruby {
init => "require 'time'"
code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
}
ruby {
init => "require 'time'"
code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f date.conf
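Note that event.get('ctime').time only works when ctime is already a Logstash timestamp, as it is for columns read by the jdbc input. If the field arrives as a plain string, parse it into a timestamp first with a date filter; a sketch assuming ISO8601 input:

filter {
  date {
    match => [ "ctime", "ISO8601" ]
    target => "ctime"
  }
  ruby {
    code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}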
Drop log entries whose message contains the string MonthShardingAlgorithm:
root@logging /o/l/p/e/03# cat /srv/logstash/pipeline/filebeat.conf
input {
  beats {
    port => 5044
  }
}
filter {
  if "MonthShardingAlgorithm" in [message] {
    drop {}
  }
}
output {
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "%{message}"}
  }
  #file {
  #  path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz"
  #  codec => json_lines
  #  gzip => true
  #}
  redis {
    host => ["r-bp1d17217fa77e14756.redis.rds.aliyuncs.com"]
    password => "Ejy2016redis"
    key => "filebeat2"
    codec => json_lines
    data_type => "list"
  }
}
http://grokdebug.herokuapp.com
The logs shipped over by filebeat are plain text; we use grok to match each line and put the captured values into named fields.
input {
  beats {
    port => 5044
  }
}
filter {
  if "MonthShardingAlgorithm" in [message] {
    drop {}
  }
  grok {
    match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"]
    #target => "result"
  }
}
output {
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "%{message}"}
  }
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/%{+dd}/spring.%{level}.log"
    codec => line { format => "%{message}"}
  }
  if "ERROR" == [level] {
    file {
      path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/beats.log"
      codec => line { format => "%{message}"}
    }
  }
  file {
    path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
    codec => json_lines
    gzip => true
  }
}
/etc/logstash/conf.d/file.conf
output {
  file {
    path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
    codec => line { format => "%{message}" }
    gzip => true
  }
}
Generate one log file per tag
input {
  tcp {
    port => 4567
    codec => json_lines
  }
}
filter {
  ruby {
    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  if "finance" in [tags] {
    file {
      path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
  } else if "market" in [tags] {
    file {
      path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
  } else {
    file {
      path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
  }
  file {
    path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
    codec => json_lines
    gzip => true
  }
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logging"
}
}
Configure daily index rotation:
index => "logstash-%{+YYYY.MM.dd}"
"_index" : "logstash-2017.03.22"
Customize the index name: logstash-%{type}-%{+YYYY.MM.dd}
input {
redis {
data_type => "list"
key => "logstash:redis"
host => "127.0.0.1"
port => 6379
threads => 5
codec => "json"
}
}
filter {
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
workers => 1
flush_size => 20
idle_flush_time => 1
template_overwrite => true
}
stdout{}
}
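Several options in this output (document_type, workers, flush_size, idle_flush_time) have since been deprecated and removed; on Logstash 7.x/8.x a trimmed sketch of the same output is sufficient:

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
  stdout {}
}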
[root@netkiller log]# cat /etc/logstash/conf.d/file.conf
input {
  tcp {
    port => 4567
    codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    #use_tcp => true
  }
}
filter {
  ruby {
    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  file {
    path => "/opt/log/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "[%{datetime}] %{level} %{message}"}
  }
  file {
    path => "/opt/log/origin.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
    codec => json_lines
    gzip => true
  }
  if "ERROR" in [level] {
    http {
      url => "https://oapi.dingtalk.com/robot/send?access_token=56c27cb761c4a16473db02d9d28734a56cf549f6977ecc281d008f9a239ba3e0"
      http_method => "post"
      content_type => "application/json; charset=utf-8"
      format => "message"
      message => '{"msgtype":"text","text":{"content":"Monitor: %{message}"}}'
    }
  }
}
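A burst of ERROR events will flood the DingTalk robot, which also rate-limits incoming requests. One way to damp this, assuming the logstash-filter-throttle plugin is available, is a throttle filter; the key and limits below are illustrative:

filter {
  throttle {
    key => "%{message}"
    after_count => 3
    period => "60"
    add_tag => ["throttled"]
  }
}

Events tagged throttled can then be excluded from the http output with a condition such as if "ERROR" in [level] and "throttled" not in [tags].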
https://github.com/kmtong/logback-redis-appender
/etc/logstash/conf.d/indexer.conf
input {
redis {
host => "127.0.0.1"
port => "6379"
key => "logstash:demo"
data_type => "list"
codec => "json"
type => "logstash-redis-demo"
tags => ["logstashdemo"]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
}
Test
# redis-cli
127.0.0.1:6379> RPUSH logstash:demo "{\"time\": \"2012-01-01T10:20:00\", \"message\": \"logstash demo message\"}"
(integer) 1
127.0.0.1:6379> exit
If it succeeds, the log looks like this:
# cat /var/log/logstash/logstash-plain.log
[2017-03-22T15:54:36,491][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2017-03-22T15:54:36,496][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2017-03-22T15:54:36,600][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x20dae6aa URL:http://127.0.0.1:9200/>}
[2017-03-22T15:54:36,601][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-03-22T15:54:36,686][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-03-22T15:54:36,693][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-03-22T15:54:36,780][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x2f9efc89 URL://127.0.0.1>]}
[2017-03-22T15:54:36,787][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-03-22T15:54:36,792][INFO ][logstash.inputs.redis ] Registering Redis {:identity=>"redis://@127.0.0.1:6379/0 list:logstash:demo"}
[2017-03-22T15:54:36,793][INFO ][logstash.pipeline ] Pipeline main started
[2017-03-22T15:54:36,838][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-03-22T15:55:10,018][WARN ][logstash.runner ] SIGTERM received. Shutting down the agent.
[2017-03-22T15:55:10,024][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
input {
file {
path => [ "/var/log/nginx/access.log" ]
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%{NGINXACCESS}" }
add_field => { "type" => "access" }
}
date {
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
}
geoip {
source => "clientip"
}
}
output {
redis {
host => "127.0.0.1"
port => 6379
data_type => "list"
key => "logstash:demo"
}
}
Example 70.2. Spring Boot logback
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf
input {
redis {
codec => json
host => "localhost"
port => 6379
key => "logstash:redis"
data_type => "list"
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-api"
}
}
src/main/resources/logback.xml
neo@MacBook-Pro ~/deployment % cat api.netkiller.cn/src/main/resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
<property name="type.name" value="test" />
<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
<source>mySource</source>
<sourcePath>mySourcePath</sourcePath>
<type>myApplication</type>
<tags>production</tags>
<host>localhost</host>
<port>6379</port>
<database>0</database>
<key>logstash:api</key>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
<appender-ref ref="LOGSTASH" />
</root>
</configuration>
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf
input {
tcp {
port => 4567
codec => json_lines
}
}
filter {
#ruby {
# code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('@timestamp').time.localtime + 8*60*60))"
#}
ruby {
code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
}
}
output {
file {
path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
codec => line { format => "[%{datetime}] %{level} %{message}"}
#codec => json_lines
gzip => true
}
}
One file per tag
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf
input {
  tcp {
    port => 4567
    codec => json_lines
  }
}
filter {
  ruby {
    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  if "finance" in [tags] {
    file {
      path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
    }
  } else if "market" in [tags] {
    file {
      path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
    }
  } else {
    file {
      path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
    }
  }
}
Example 70.3. Elasticsearch index rotation example
root@netkiller /opt/api.netkiller.cn % cat /etc/logstash/conf.d/spring-boot-redis.conf
input {
redis {
codec => json
host => "localhost"
port => 6379
db => 10
key => "logstash:redis"
data_type => "list"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
}
}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
<property name="logstash.type" value="api" />
<property name="logstash.tags" value="springboot" />
<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
<source>application.properties</source>
<type>${logstash.type}</type>
<tags>${logstash.tags}</tags>
<host>localhost</host>
<database>10</database>
<key>logstash:redis</key>
<mdc>true</mdc>
<location>true</location>
<callerStackIndex>0</callerStackIndex>
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="LOGSTASH" />
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE" />
<appender-ref ref="LOGSTASH" />
</root>
</configuration>
input {
  file {
    path => ["/home/test/data.csv"]
    start_position => "beginning" # where to start reading; "beginning" imports pre-existing data
    sincedb_path => "/test/111"
    type => "csv"
    tags => ["optical", "gather"]
  }
}
filter {
  if [type] == "csv" { # distinguishes events when several config files share one pipeline
    csv {
      columns => ["name","device_id"]
      separator => "^"
      quote_char => "‰"
      remove_field => ["device_id","branch_id","area_type"]
    }
  }
}
output {
}
root@logging ~# find /srv/logstash/ -type f
/srv/logstash/pipeline/config.conf
/srv/logstash/bin/logstash
/srv/logstash/config/logstash.yml
root@logging ~# cat /srv/logstash/bin/logstash
#!/usr/bin/python3
# -*- coding: utf-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-01-11
##############################################
import os
import sys

try:
    module = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.insert(0, module)
    from netkiller.docker import *
except ImportError as err:
    print("%s" % (err))
    sys.exit(1)

project = 'logstash'
# extra_hosts = [
#     'mongo.netkiller.cn:172.17.195.17', 'eos.netkiller.cn:172.17.15.17',
#     'cfca.netkiller.cn:172.17.15.17'
# ]

dockerfile = Dockerfile()
dockerfile.image('docker.elastic.co/logstash/logstash:8.6.0').run(
    ['apk add -U tzdata', 'rm -f /usr/share/logstash/pipeline/logstash.conf']
).copy('pipeline/', '/usr/share/logstash/pipeline/').copy('config/', '/usr/share/logstash/config/').workdir('/usr/share/logstash')

logstash = Services(project)
# logstash.image('logstash/logstash:alpine')
# logstash.build(dockerfile)
logstash.image('docker.elastic.co/logstash/logstash:8.6.0')
logstash.container_name(project)
logstash.restart('always')
# logstash.hostname('www.netkiller.cn')
# logstash.extra_hosts(extra_hosts)
logstash.extra_hosts(['elasticsearch:127.0.0.1'])
logstash.environment(['TZ=Asia/Shanghai', 'XPACK_MONITORING_ENABLED=false', 'LOG_LEVEL=info'])
logstash.ports(['12201:12201/udp', '12201:12201/tcp'])
# logstash.ports(['12201:12201','4567:4567'])
# logstash.depends_on('test')
logstash.working_dir('/usr/share/logstash')
logstash.user('root')
logstash.volumes(
    [
        '/srv/logstash/pipeline/:/usr/share/logstash/pipeline/',
        # '/srv/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:rw',
        '/srv/logstash/logs/:/usr/share/logstash/logs/',
        '/opt/log/:/opt/log/',
        '/proc:/proc', '/sys:/sys'
    ]
).privileged()

development = Composes('development')
development.workdir('/var/tmp/development')
development.version('3.9')
development.services(logstash)

if __name__ == '__main__':
    try:
        docker = Docker(
            # {'DOCKER_HOST': 'ssh://root@192.168.30.11'}
        )
        # docker.sysctl({'neo': '1'})
        docker.environment(development)
        docker.main()
    except KeyboardInterrupt:
        print("Ctrl+C Pressed. Shutting down.")
root@logging ~# cat /srv/logstash/pipeline/config.conf
input {
  tcp {
    port => 4567
    codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    use_tcp => true
  }
}
filter {
  ruby {
    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  if [marker] {
    file {
      path => "/opt/log/%{environment}/%{service}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
  } else {
    file {
      path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"}
    }
  }
  file {
    path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.json.gz"
    codec => json_lines
    gzip => true
  }
  if [environment] =~ /(prod|grey)/ {
    if "ERROR" in [level] {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a3f084b0160ec06ae40f95b0b052e69c699400eaa5db316612de90f8"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
    if "WARN" in [level] {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=d6602c6fb6b47250f38d31f791968a12201a6980f3a1175829a57e6afca7678b"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
  }
  if [environment] =~ /(stage|test|dev)/ {
    if ("ERROR" in [level] or "WARN" in [level]) {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d983188517fcbd204c89bf5f47b9dfdac2a788bda85bd353d8e266fb5f"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
  }
}
Send a DingTalk alert for the log and create a ZenTao task at the same time, so the fault can be followed up:
input {
  tcp {
    port => 4567
    codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    use_tcp => true
  }
}
filter {
  ruby {
    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  if [marker] {
    file {
      path => "/opt/log/%{environment}/%{service}/%{+MM}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
  } else {
    file {
      path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
      codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"}
    }
  }
  file {
    path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.json.gz"
    codec => json_lines
    gzip => true
  }
  if [environment] =~ /(prod|grey)/ {
    if "ERROR" in [level] {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a0ec06ae40f316613f084b095b0b052e69c699400eaa5db162de90f8"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
    if "WARN" in [level] {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=d66029a57e68d31f791968a12201a6980f3ac6fb6b47250f3117582afca7678b"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
  }
  if "compute" in [marker] and "prod" in [environment] {
    http {
      url => "https://oapi.dingtalk.com/robot/send?access_token=324ab12a36bcb2bb788720c974486218f2517de5a8f5fa009b52297934310c7f"
      http_method => "post"
      content_type => "application/json; charset=utf-8"
      format => "message"
      message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
    }
    http {
      url => "http://zentao.netkiller.cn/zentao/gitlab.php?type=task&func=create&name=服务%{service}环境%{environment}"
      http_method => "post"
      format => "form"
      mapping => {"message" => "时间:%{datetime}</br>主机:%{host}[%{source_host}]</br>环境:%{environment}</br>服务:%{service}</br>消息:%{message}"}
    }
  }
  if [environment] =~ /(pre|test|dev|office)/ {
    if ("ERROR" in [level] or "WARN" in [level]) {
      http {
        url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d9b9dfda204c89bf5f47788bda85bc2a83188517fcbdd353d8e266fb5f"
        http_method => "post"
        content_type => "application/json; charset=utf-8"
        format => "message"
        message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
      }
    }
  }
}
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/beats/beats.sh | bash
filebeat.inputs:
- type: log
  paths:
    - /data/logs/*
  fields:
    project: ${PROJECT}
    group: ${GROUP}
    stage: ${STAGE}
    format: ${FORMAT}

processors:
  - add_cloud_metadata:
  - add_host_metadata:

output.file:
  path: "/tmp"
  filename: filebeat
[docker@netkiller ~]$ cat filebeat.tcp.yml
filebeat.inputs:
- type: tcp
  max_message_size: 10MiB
  host: "localhost:9000"

output.file:
  path: "/tmp"
  filename: filebeat.log
[docker@netkiller ~]$ sudo chmod go-w /home/docker/filebeat.tcp.yml
[docker@netkiller ~]$ ss -lnt | grep 9000
LISTEN 0 1024 127.0.0.1:9000 0.0.0.0:*
[docker@netkiller ~]$ echo "Hello world!!!" | nc localhost 9000
echo "Hello worldss -lnt | grep 9000!" | nc localhost 9000
[docker@netkiller ~]$ cat /etc/filesystems | nc localhost 9000
[docker@netkiller ~]$ sudo cat /tmp/filebeat.log-20220728.ndjson |jq | grep message
"message": "Hello worldss -lnt | grep 9000!"
"message": "ext4",
"message": "ext3",
"message": "ext2",
"message": "nodev proc",
"message": "nodev devpts",
"message": "iso9660"
"message": "vfat",
"message": "hfs",
"message": "hfsplus",
"message": "*",
filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /tmp/*
  fields:
    project: www
    group: netkiller.cn
    stage: dev
    format: json
  multiline:
    pattern: '^\[[^stacktrace]'
    negate: true
    match: after

processors:
  - add_cloud_metadata:
  - add_host_metadata:

output.logstash:
  hosts: ["172.18.200.10:5044"]
Logstash configuration
input {
  beats {
    port => 5044
  }
}
output {
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "%{message}"}
  }
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz"
    codec => json_lines
    gzip => true
  }
  redis {
    host => ["redis.netkiller.cn"]
    password => "passw0rd"
    key => "filebeat"
    codec => json_lines
    data_type => "channel"
  }
}
filebeat collects logs from files and sends them to Logstash, which receives them as plain lines of text with no level field. To split INFO, WARN and ERROR into separate files, match substrings of the message.
input {
  beats {
    port => 5044
  }
}
filter {
  if "MonthShardingAlgorithm" in [message] {
    drop {}
  }
}
output {
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "%{message}"}
  }
  if "netkiller-service" == [fields][service] and "ERROR" in [message] {
    file {
      path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/netkiller.beats.log"
      codec => line { format => "%{message}"}
    }
  }
  file {
    path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
    codec => json_lines
    gzip => true
  }
}
input {
  beats {
    port => 5044
  }
}
filter {
  if "MonthShardingAlgorithm" in [message] {
    drop {}
  }
  grok {
    match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"]
  }
}
output {
  file {
    path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
    codec => line { format => "%{message}"}
  }
  if "compute-service" == [fields][service] and "ERROR" == [level] {
    file {
      path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/compute.error.log"
      codec => line { format => "%{message}"}
    }
  }
}
If top shows Logstash CPU usage staying at 80% or even 100% for long periods, edit the jvm.options configuration file and raise -Xmx1g to -Xmx4g; a value of 4 to 8 GB is recommended for -Xmx.
## JVM configuration
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms1g
-Xmx4g
################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################
## GC configuration
11-13:-XX:+UseConcMarkSweepGC
11-13:-XX:CMSInitiatingOccupancyFraction=75
11-13:-XX:+UseCMSInitiatingOccupancyOnly
## Locale
# Set the locale language
#-Duser.language=en
# Set the locale country
#-Duser.country=US
# Set the locale variant, if any
#-Duser.variant=
## basic
# set the I/O temp directory
#-Djava.io.tmpdir=$HOME
# set to headless, just in case
-Djava.awt.headless=true
# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8
# use our provided JNA always versus the system one
#-Djna.nosys=true
# Turn on JRuby invokedynamic
-Djruby.compile.invokedynamic=true
## heap dumps
# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError
# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof
## GC logging
#-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m
# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${LS_GC_LOG_FILE}
# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom
# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
# curl 'http://localhost:9200/_search?pretty'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : ".kibana",
        "_type" : "config",
        "_id" : "5.2.2",
        "_score" : 1.0,
        "_source" : {
          "buildNum" : 14723
        }
      }
    ]
  }
}
The elasticsearch output must not omit port 9200; otherwise Logstash cannot connect to Elasticsearch.
elasticsearch {
hosts => ["127.0.0.1:9200"]
}
#cd /etc/logstash/conf.d
#vim logstash_server.conf
input {
redis {
port => "6379"
host => "127.0.0.1"
data_type => "list"
key => "logstash-redis"
type => "redis-input"
}
}
output {
stdout {
codec => rubydebug
}
}
In 5.x, if a field's type is date, the system stores it in ISO8601 format by default; 6.x fixed this. "ctime": "2017-12-18 11:21:57"
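On 5.x you can normalize such a field yourself, reusing the ruby filter technique shown earlier (a sketch; assumes ctime is a timestamp field):

filter {
  ruby {
    init => "require 'time'"
    code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}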
UDP debugging
[root@netkiller log]# cat test.json
{"facility":"logstash-gelf","source_host":"172.18.0.186","@version":"1","method":"init","message":"Test","class":"Application","host":"macbook-pro-m2.local","@timestamp":"2023-01-07T03:32:28.368Z","timestamp":"2023-01-07 11:32:28.368","marker":"spring","datetime":"2023-01-07 11:32:28","logger":"cn.netkiller.Application","level":"WARN","line":21,"version":"1.1"}
[root@netkiller log]# cat test.json | nc -u 127.0.0.1 12202
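On the receiving side, a minimal pipeline with a gelf input prints the test event (the port matches the nc command above):

input {
  gelf {
    port => 12202
    use_udp => true
  }
}
output {
  stdout { codec => rubydebug }
}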
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
Environment:
OS: CentOS 7
Java 1.8
Redis
Elasticsearch + Logstash + Kibana, all at version 5.2
Everything below is installed with the Netkiller OSCM one-click scripts.
Paste the following commands into a Linux console to install:
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elasticsearch/elasticsearch-5.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/kibana-5.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/logstash-5.x.sh | bash