Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | 51CTO学院 | CSDN程序员研修院 | OSChina 博客 | 腾讯云社区 | 阿里云栖社区 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏多维度架构

3.2. ElasticSearch + Logstash + Kibana

官方网站 https://www.elastic.co

3.2.1. 安装

3.2.1.1. 8.x

dnf 安定
			
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-8.x.sh | bash
			
				

手工安装

			
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

cat >> /etc/yum.repos.d/logstash.repo <<EOF
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

dnf install -y logstash

cp /etc/logstash/logstash.yml{,.original}
chown logstash:logstash -R /etc/logstash

systemctl daemon-reload
systemctl enable logstash.service
systemctl start logstash.service
			
				

修改启动用户,否则启动会失败

			
[root@netkiller ~]# vim /usr/lib/systemd/system/logstash.service
User=logstash
Group=logstash
修改
User=root
Group=root
			
				
Docker 安装
				
docker run --rm -it -v ~/pipeline/:/usr/share/logstash/pipeline/ docker.elastic.co/logstash/logstash:8.5.1				
				
				

3.2.1.2. 6.x

			
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
			
			

3.2.1.3. ElasticSearch + Logstash + Kibana 安装

环境准备:

操作系统: CentOS 7

Java 1.8

Redis

ElasticSearch + Logstash + Kibana 均使用 5.2 版本

以下安装均使用 Netkiller OSCM 脚本一键安装

ElasticSearch 安装

粘贴下面命令到Linux控制台即可一键安装

				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elasticsearch/elasticsearch-5.x.sh | bash
				
				
Kibana 安装
				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/kibana-5.x.sh | bash
				
				
Logstash 安装
					curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/logstash-5.x.sh | bash
				
从 5.x 升级到 6.x

升级仓库

				
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash		
				
				
				
yum update logstash
				
				

3.2.2. logstash 命令简单应用

3.2.2.1. -e 命令行运行

logstash -e "input {stdin{}} output {stdout{}}"

			
/usr/share/logstash/bin/logstash  -e 'input{file {path => "/etc/centos-release" start_position => "beginning"}} output { stdout {}}'			
			
			

3.2.2.2. -f 指定配置文件

			
/usr/share/logstash/bin/logstash -f stdin.conf		

/usr/share/logstash/bin/logstash -f jdbc.conf --path.settings /etc/logstash --path.data /tmp	
			
			

3.2.2.3. -t:测试配置文件是否正确,然后退出。

			
root@netkiller ~/logstash % /usr/share/logstash/bin/logstash -t -f test.conf
WARNING: Default JAVA_OPTS will be overridden by the JAVA_OPTS defined in the environment. Environment JAVA_OPTS are -server -Xms2048m -Xmx4096m
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
Configuration OK
			
			

3.2.2.4. -l:日志输出的地址

默认就是stdout直接在控制台中输出

3.2.2.5. log.level 启动Debug模式

			
% /usr/share/logstash/bin/logstash -f nginx.conf --path.settings /etc/logstash --log.level debug			
			
			

3.2.3. 配置 Broker(Redis)

3.2.3.1. indexer

/etc/logstash/conf.d/indexer.conf

			
input {
  redis {
    host => "127.0.0.1"
    port => "6379" 
    key => "logstash:demo"
    data_type => "list"
    codec  => "json"
    type => "logstash-redis-demo"
    tags => ["logstashdemo"]
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}	
			
			

测试

			
# redis-cli 
127.0.0.1:6379> RPUSH logstash:demo "{\"time\": \"2012-01-01T10:20:00\", \"message\": \"logstash demo message\"}"
(integer) 1
127.0.0.1:6379> exit
			
			

如果执行成功日志如下

			
# cat /var/log/logstash/logstash-plain.log 
[2017-03-22T15:54:36,491][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2017-03-22T15:54:36,496][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://127.0.0.1:9200/, :path=>"/"}
[2017-03-22T15:54:36,600][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x20dae6aa URL:http://127.0.0.1:9200/>}
[2017-03-22T15:54:36,601][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-03-22T15:54:36,686][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-03-22T15:54:36,693][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-03-22T15:54:36,780][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x2f9efc89 URL://127.0.0.1>]}
[2017-03-22T15:54:36,787][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-03-22T15:54:36,792][INFO ][logstash.inputs.redis    ] Registering Redis {:identity=>"redis://@127.0.0.1:6379/0 list:logstash:demo"}
[2017-03-22T15:54:36,793][INFO ][logstash.pipeline        ] Pipeline main started
[2017-03-22T15:54:36,838][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-03-22T15:55:10,018][WARN ][logstash.runner          ] SIGTERM received. Shutting down the agent.
[2017-03-22T15:55:10,024][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}			
			
			

3.2.3.2. shipper

			
input {
  file {
    path => [ "/var/log/nginx/access.log" ]
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{NGINXACCESS}" }
    add_field => { "type" => "access" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}

output {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash:demo"
  }
}
			
			

3.2.4. logstash 配置项

3.2.4.1. 多 pipeline 配置

			
[root@netkiller ~]# cat /etc/logstash/pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"			
			
			

配置 pipelines.yml 文件

			
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
- pipeline.id: finance
  path.config: "/etc/logstash/conf.finance/*.conf"
- pipeline.id: market
  path.config: "/etc/logstash/conf.market/*.conf"
- pipeline.id: customer
  path.config: "/etc/logstash/conf.customer/*.conf"  
			
			

3.2.4.2. input

标准输入输出
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}"
Helloworld
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
2017-08-03T10:03:38.375Z localhost Helloworld
18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}				
				
				
rubydebug

rubydebug提供以json格式输出到屏幕

								
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
My name is neo
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
The stdin plugin is now waiting for input:
{
"@timestamp" => 2017-08-03T10:05:02.764Z,
"@version" => "1",
"host" => "localhost",
"message" => "My name is neo"
}
18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
				
				
本地文件
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ]
    start_position => "beginning"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}		
				
				

start_position => "beginning" 从头开始读,如果没有这个选项,只会读取最后更新的数据。

指定文件类型
					
input { 
 file { path =>"/var/log/messages" type =>"syslog"} 
 file { path =>"/var/log/apache/access.log" type =>"apache"} 
}					 
					
					
Nginx
						
input {
        file {
                type => "nginx_access"
                path => ["/usr/share/nginx/logs/test.access.log"]
        }
}
output {
        redis {
                host => "localhost"
                data_type => "list"
                key => "logstash:redis"
        }
}						
						
						
TCP/UDP
				
input {
  file {
    type => "syslog"
    path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ]
  }
  tcp {
    port => "5145"
    type => "syslog-network"
  }
  udp {
    port => "5145"
    type => "syslog-network"
  }
}
output {
  elasticsearch { 
    hosts => ["127.0.0.1:9200"] 
  }
}
				
				
Redis
				
input {
  redis {
    host => "127.0.0.1"
    port => "6379" 
    key => "logstash:demo"
    data_type => "list"
    codec  => "json"
    type => "logstash-redis-demo"
    tags => ["logstashdemo"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }
}
				
				

指定 Database 10

				
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  db => 10
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-api"
  }
}
				
				
Kafka

				
input {
  kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}		
				
				
jdbc
				
root@netkiller /etc/logstash/conf.d % cat jdbc.conf 
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "select * from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "123456"
    schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次
    statement => "select * from article where ctime > :sql_last_value"
    use_column_value => true
    tracking_column => "ctime"
    tracking_column_type => "timestamp" 
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-ctime.last"
  }

}
output {
    elasticsearch {
    	hosts => "localhost:9200"
        index => "information"
        document_type => "article"
        document_id => "%{id}"
        action => "update"
        doc_as_upsert => true
    }
}				
				
				

3.2.4.3. filter

日期格式化

系统默认是 ISO8601 如果需要转换为 yyyy-MM-dd-HH:mm:ss 参考:

				

filter {
  date {
    match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  }
 date {
    match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ]
    locale => "cn"
  } 
}
				
				
				
date {
    locale => "zh-CN"
    #match => ["@timestamp", "yyyy-MM-dd HH:mm:ss"]
	match => ["@timestamp", "ISO8601"]
    timezone => "Asia/Shanghai"
	target => ["@timestamp"]
}				
				
				
patterns

创建匹配文件 /usr/share/logstash/patterns

				
mkdir /usr/share/logstash/patterns
vim /usr/share/logstash/patterns

NGUSERNAME [a-zA-Z\.\@\-\+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
				
				
				
filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
  }
}
				
				
syslog
				
input {
  file {
    type => "syslog" 
    path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    sincedb_path => "/opt/logstash/sincedb-access"
  }
  syslog {
    type => "syslog"
    port => "5544"
  }
}
 
filter {
  grok {
    type => "syslog"
    match => [ "message", "%{SYSLOGBASE2}" ]
    add_tag => [ "syslog", "grokked" ]
  }
}
 
output {
 elasticsearch { host => "elk.netkiller.cn" }
}				
				
				
csv
				
input {
    file {
        type => "SSRCode"
        path => "/SD/2015*/01*/*.csv"
        start_position => "beginning"
    }
}
 
filter {
	csv {
		columns => ["Code","Source"]
		separator => ","
	}
	kv {
		source => "uri"
		field_split => "&?"
		value_split => "="
	}
 
}
 
# output logs to console and to elasticsearch
output {
    stdout {}
    elasticsearch {
        hosts => ["172.16.1.1:9200"]
    }
}				
				
				
使用ruby 处理 CSV文件
				
input {
    stdin {}
}
filter {
    ruby {
        init => "
            begin
                @@csv_file    = 'output.csv'
                @@csv_headers = ['A','B','C']
                if File.zero?(@@csv_file) || !File.exist?(@@csv_file)
                    CSV.open(@@csv_file, 'w') do |csv|
                        csv << @@csv_headers
                    end
                end
            end
        "
        code => "
            begin
                event['@metadata']['csv_file']    = @@csv_file
                event['@metadata']['csv_headers'] = @@csv_headers
            end
        "
    }
    csv {
        columns => ["a", "b", "c"]
    }
}
output {
    csv {
        fields => ["a", "b", "c"]
        path   => "%{[@metadata][csv_file]}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}				
				
				

测试

				
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf				
				
				

输出结果

				
A,B,C
1,2,3
4,5,6
7,8,9
				
				
执行 ruby 代码

日期格式化, 将ISO 8601日期格式转换为 %Y-%m-%d %H:%M:%S

保存下面内容到配置文件data.conf

				
input {
	stdin{}
}
filter {

	ruby {
		init => "require 'time'"
        code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }

	ruby {
		init => "require 'time'"
        code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}
output {

	stdout {
		codec => rubydebug
	}

}
				
				

/usr/share/logstash/bin/logstash -f date.conf

grok debug 工具

http://grokdebug.herokuapp.com

3.2.4.4. output

stdout
				
output {
	stdout { codec => rubydebug }
}				
				
				
file 写入文件

/etc/logstash/conf.d/file.conf

				
output {
    file {
        path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz"
        message_format => "%{message}"
        gzip => true
    }
}
				
				

每个 tags 标签生成一个日志文件

				
input {
  tcp {
	port => 4567 
	codec => json_lines
  }
}

filter {
	ruby {
	    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {

	if "finance" in [tags] { 
		file {
			path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	 } else if "market" in [tags] {
		file {
			path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	} else {
		file {
			path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message}"}
		}
	
	}
    file {
        path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => json_lines
        gzip => true
    }

}				
				
				
elasticsearch
				
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logging"
  }
}				
				
				
自定义 index

配置实现每日切割一个 index

					
index => "logstash-%{+YYYY.MM.dd}"

"_index" : "logstash-2017.03.22"	
					
					

index 自定义 logstash-%{type}-%{+YYYY.MM.dd}

					
input {

    redis {
        data_type => "list"
        key => "logstash:redis"
        host => "127.0.0.1"
        port => 6379
        threads => 5
        codec => "json"
    }
}
filter {

}
output {

    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        workers => 1
        flush_size => 20
        idle_flush_time => 1
        template_overwrite => true
    }
    stdout{}
}					
					
					
exec 执行脚本
				
output {
    exec {
        command => "sendsms.php \"%{message}\" -t %{user}"
    }
}
				
				
http
				
[root@netkiller log]# cat /etc/logstash/conf.d/file.conf 
input {
  tcp {
	port => 4567 
	codec => json_lines
  }
  gelf {
    port => 12201
    use_udp => true
    #use_tcp => true
  }
}

filter {
	ruby {
	    code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {

    file {
    	path => "/opt/log/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
    	codec => line { format => "[%{datetime}] %{level} %{message}"}
    }
   
    file {
        path => "/opt/log/origin.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => json_lines
        gzip => true
    }

    if "ERROR" in [level] {
        http {
            url => "https://oapi.dingtalk.com/robot/send?access_token=56c27cb761c4a16473db02d9d28734a56cf549f6977ecc281d008f9a239ba3e0"
            http_method => "post"
            content_type => "application/json; charset=utf-8"
            format => "message"
            message => '{"msgtype":"text","text":{"content":"Monitor: %{message}"}}'
        }
    }
}
				
				

3.2.5. Example

https://github.com/kmtong/logback-redis-appender

3.2.5.1. Spring boot logback

例 3.2. spring boot logback

				
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-api"
  }
}
				
				

src/main/resources/logback.xml

				
neo@MacBook-Pro ~/deployment % cat api.netkiller.cn/src/main/resources/logback.xml 
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<include resource="org/springframework/boot/logging/logback/defaults.xml" />
	<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
	<property name="type.name" value="test" />
	<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
		<source>mySource</source>
		<sourcePath>mySourcePath</sourcePath>
		<type>myApplication</type>
		<tags>production</tags>
		<host>localhost</host>
		<port>6379</port>
		<database>0</database>
		<key>logstash:api</key>
	</appender>
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
		</encoder>
	</appender>
	<root level="INFO">
		<appender-ref ref="STDOUT" />
		<appender-ref ref="FILE" />
		<appender-ref ref="LOGSTASH" />
	</root>
</configuration>
				
				

			
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf
input {
  tcp {
    port => 4567 
    codec => json_lines
  }
}

filter {
    #ruby {
    #      code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('@timestamp').time.localtime + 8*60*60))"
    #}
    ruby {
		code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}

output {

    file {
        path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz"
		codec => line { format => "[%{datetime}] %{level} %{message}"}
		#codec => json_lines
        gzip => true
    }

}
			
			

每个 tags 一个文件

			
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf 
input {
  tcp {
    port => 4567 
    codec => json_lines
  }
}

filter {
    ruby {
	code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
    }
}

output {

	if "finance" in [tags] { 
	    file {
	        path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
	 
	 } else if "market" in [tags] {
	
	    file {
			path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
	} else {
	
	    file {
			path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log"
			codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"}
		}
    }

}			
			
			

3.2.5.2. 索引切割实例

例 3.3. Elasticsearch 索引切割示例

				
root@netkiller /opt/api.netkiller.cn % cat /etc/logstash/conf.d/spring-boot-redis.conf 
input {
 redis {
  codec => json
  host => "localhost"
  port => 6379
  db => 10
  key => "logstash:redis"
  data_type => "list"
 }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}

				
				
				
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<include resource="org/springframework/boot/logging/logback/defaults.xml" />
	<include resource="org/springframework/boot/logging/logback/file-appender.xml" />
	<property name="logstash.type" value="api" />
	<property name="logstash.tags" value="springboot" />
	<appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender">
		<source>application.properties</source>
		<type>${logstash.type}</type>
		<tags>${logstash.tags}</tags>

		<host>localhost</host>
		<database>10</database>
		<key>logstash:redis</key>

		<mdc>true</mdc>
		<location>true</location>
		<callerStackIndex>0</callerStackIndex>

	</appender>
	<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
		<appender-ref ref="LOGSTASH" />
	</appender>

	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern>
		</encoder>
	</appender>
	<root level="INFO">
		<appender-ref ref="STDOUT" />
		<appender-ref ref="FILE" />
		<appender-ref ref="LOGSTASH" />
	</root>
</configuration>
				
				

3.2.5.3. csv 文件实例

			
			
input {
    file {
        path => ["/home/test/data.csv"]
        start_position => "beginning" #从什么位置读取,beginnig时导入原有数据
        sincedb_path => "/test/111"
                type => "csv"
                tags => ["optical", "gather"]
    }
}

filter {
        if [type] == "csv" { #多个配置文件同时执行的区分
        csv {
            columns =>["name","device_id"]
            separator => "^"
			quote_char => "‰"
			remove_field => ["device_id","branch_id","area_type"]
       }
   }
output{
}			
			
			
			

3.2.5.4. 区分环境

			 
root@logging ~# find /srv/logstash/ -type f
/srv/logstash/pipeline/config.conf
/srv/logstash/bin/logstash
/srv/logstash/config/logstash.yml
			
			
			 
root@logging ~# cat /srv/logstash/bin/logstash
#!/usr/bin/python3
# -*- coding: utf-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Upgrade: 2023-01-11
##############################################
import os
import sys
try:
	module = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
	sys.path.insert(0, module)
	from netkiller.docker import *
except ImportError as err:
	print("%s" % (err))

project = 'logstash'

# extra_hosts = [
#    'mongo.netkiller.cn:172.17.195.17', 'eos.netkiller.cn:172.17.15.17',
#    'cfca.netkiller.cn:172.17.15.17'
# ]

dockerfile = Dockerfile()
dockerfile.image('docker.elastic.co/logstash/logstash:8.6.0').run(
	['apk add -U tzdata', 'rm -f /usr/share/logstash/pipeline/logstash.conf']
).copy('pipeline/', '/usr/share/logstash/pipeline/').copy('config/', '/usr/share/logstash/config/').workdir('/usr/share/logstash')

logstash = Services(project)
# logstash.image('logstash/logstash:alpine')
# logstash.build(dockerfile)
logstash.image('docker.elastic.co/logstash/logstash:8.6.0')
logstash.container_name(project)
logstash.restart('always')
# logstash.hostname('www.netkiller.cn')
# openrelogstashsty.extra_hosts(extra_hosts)
logstash.extra_hosts(['elasticsearch:127.0.0.1'])
logstash.environment(['TZ=Asia/Shanghai','XPACK_MONITORING_ENABLED=false','LOG_LEVEL=info'])
logstash.ports(['12201:12201/udp', '12201:12201/tcp'])
#logstash.ports(['12201:12201','4567:4567'])
# logstash.depends_on('test')
logstash.working_dir('/usr/share/logstash')
logstash.user('root')
logstash.volumes(
	[
		'/srv/logstash/pipeline/:/usr/share/logstash/pipeline/',
		#'/srv/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:rw',
		'/srv/logstash/logs/:/usr/share/logstash/logs/',
		'/opt/log/:/opt/log/',
		'/proc:/proc','/sys:/sys'
	]
).privileged()

development = Composes('development')
development.workdir('/var/tmp/development')
development.version('3.9')
development.services(logstash)

if __name__ == '__main__':
	try:
		docker = Docker(
			# {'DOCKER_HOST': 'ssh://root@192.168.30.11'}
		)
		# docker.sysctl({'neo': '1'})
		docker.environment(development)
		docker.main()
	except KeyboardInterrupt:
		print("Crtl+C Pressed. Shutting down.")

			
			
			 
root@logging ~# cat /srv/logstash/pipeline/config.conf
input {
	tcp {
	port => 4567
	codec => json_lines
	}
	gelf {
	port => 12201
	use_udp => true
	use_tcp => true
	}
}

filter {
	ruby {
		code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))"
	}
}

output {
	if [marker] {
	file {
		path => "/opt/log/%{environment}/%{service}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log"
		codec => line { format => "[%{datetime}] %{level} %{message}"}
	}
	} else {
		file {
		path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.log"
		codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"}
		}
	}
	file {
		path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.json.gz"
		codec => json_lines
		gzip => true
	}
	if [environment] =~ /(prod|grey)/ {
		if "ERROR" in [level] {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a3f084b0160ec06ae40f95b0b052e69c699400eaa5db316612de90f8"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
		if "WARN" in [level] {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=d6602c6fb6b47250f38d31f791968a12201a6980f3a1175829a57e6afca7678b"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
	}

	if [environment] =~ /(stage|test|dev)/ {
		if ("ERROR" in [level] or "WARN" in [level]) {
			http {
				url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d983188517fcbd204c89bf5f47b9dfdac2a788bda85bd353d8e266fb5f"
				http_method => "post"
				content_type => "application/json; charset=utf-8"
				format => "message"
				message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}'
			}
		}
	}
}
			
			

3.2.6. Beats

Beats 是一个免费且开放的平台,集合了多种单一用途数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。

3.2.6.1. 安装 Beta

Beats 6.x 安装
			
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/beats/beats.sh | bash
			
				
Beats 5.x 安装
					curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/beats/beats-5.x.sh | bash
				

3.2.6.2. Filebeat

模块管理
				
filebeat modules list				
				
				
文件到文件
				
filebeat.inputs:
- type: log
  paths:
    - /data/logs/*
  fields:
    project: ${PROJECT}
    group: ${GROUP}
    stage: ${STAGE}
    format: ${FORMAT}

processors:
  - add_cloud_metadata:
  - add_host_metadata:

output.file:
  path: "/tmp"
  filename: filebeat			
				
				
TCP
				
[docker@netkiller ~]$ cat filebeat.tcp.yml 
filebeat.inputs:
- type: tcp
  max_message_size: 10MiB
  host: "localhost:9000"

output.file:
  path: "/tmp"
  filename: filebeat.log			
				
				

				
[docker@netkiller ~]$ sudo chmod go-w /home/docker/filebeat.tcp.yml				
				
				
				
[docker@netkiller ~]$ ss -lnt | grep 9000
LISTEN 0      1024       127.0.0.1:9000       0.0.0.0:*  				
				
				
				
[docker@netkiller ~]$ echo "Hello world!!!" | nc localhost 9000
echo "Hello worldss -lnt | grep 9000!" | nc localhost 9000

[docker@netkiller ~]$ cat /etc/filesystems | nc localhost 9000

[docker@netkiller ~]$ sudo cat /tmp/filebeat.log-20220728.ndjson |jq | grep message
  "message": "Hello worldss -lnt | grep 9000!"
  "message": "ext4",
  "message": "ext3",
  "message": "ext2",
  "message": "nodev proc",
  "message": "nodev devpts",
  "message": "iso9660"
  "message": "vfat",
  "message": "hfs",
  "message": "hfsplus",
  "message": "*",				
				
				

3.2.7. FAQ

3.2.7.1. 查看 Kibana 数据库

			
# curl 'http://localhost:9200/_search?pretty'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : ".kibana",
        "_type" : "config",
        "_id" : "5.2.2",
        "_score" : 1.0,
        "_source" : {
          "buildNum" : 14723
        }
      }
    ]
  }
}			
			
			

3.2.7.2. logstash 无法写入 elasticsearch

elasticsearch 的配置不能省略 9200 端口,否则将无法链接elasticsearch

			
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
  }			
			
			

3.2.7.3. 标准输出

			
#cd /etc/logstash/conf.d
#vim logstash_server.conf
input {
    redis {
        port => "6379"
        host => "127.0.0.1"
        data_type => "list"
        key => "logstash-redis"
        type => "redis-input"
   }
}
output {
    stdout {
    	codec => rubydebug
    }
}			
			
			

3.2.7.4. 5.x 升级至 6.x 的变化

5.x type类型如果是date,那么系统默认使用 ISO8601 格式。 6.x 修复了这个问题。"ctime": "2017-12-18 11:21:57"

3.2.7.5. 日志的调试

UDP 调试方法

			 
[root@netkiller log]# cat test.json 
{"facility":"logstash-gelf","source_host":"172.18.0.186","@version":"1","method":"init","message":"Test","class":"Application","host":"macbook-pro-m2.local","@timestamp":"2023-01-07T03:32:28.368Z","timestamp":"2023-01-07 11:32:28.368","marker":"spring","datetime":"2023-01-07 11:32:28","logger":"cn.netkiller.Application","level":"WARN","line":21,"version":"1.1"}

[root@netkiller log]# cat test.json | nc -u 127.0.0.1 12202