curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-8.x.sh | bash
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch cat >> /etc/yum.repos.d/logstash.repo <<EOF [logstash-8.x] name=Elastic repository for 8.x packages baseurl=https://artifacts.elastic.co/packages/8.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md EOF dnf install -y logstash cp /etc/logstash/logstash.yml{,.original} chown logstash:logstash -R /etc/logstash systemctl daemon-reload systemctl enable logstash.service systemctl start logstash.service
[root@netkiller ~]# vim /usr/lib/systemd/system/logstash.service User=logstash Group=logstash 修改 User=root Group=root
apiVersion: v1 data: filebeat.yml: |- filebeat.inputs: - type: log paths: - /tmp/* fields: project: test group: test stage: test format: json multiline: pattern: '^\[[^stacktrace]' negate: true match: after processors: - add_cloud_metadata: - add_host_metadata: output.logstash: hosts: [""] kind: ConfigMap metadata: name: filebeat namespace: default
apiVersion: apps/v1 kind: Deployment metadata: labels: app: bottleneck name: bottleneck namespace: default spec: progressDeadlineSeconds: 600 replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: app: bottleneck strategy: rollingUpdate: maxSurge: 1 maxUnavailable: 0 type: RollingUpdate template: metadata: creationTimestamp: null labels: app: bottleneck spec: affinity: {} containers: - env: - name: TZ value: Asia/Shanghai - name: JAVA_OPTS value: -Xms2048m -Xmx4096m - name: SPRING_OPTS value: --spring.profiles.active=dev --server.undertow.worker-threads=5000 image: nginx:latest imagePullPolicy: IfNotPresent name: nginx ports: - containerPort: 80 name: http protocol: TCP resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /tmp name: tmp - args: - -c - /usr/share/filebeat/filebeat.yml - -e env: - name: TZ value: Asia/Shanghai - name: JAVA_OPTS - name: SPRING_OPTS image: docker.elastic.co/beats/filebeat:8.6.1 imagePullPolicy: IfNotPresent name: filebeat resources: {} terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: - mountPath: /usr/share/filebeat/filebeat.yml name: config readOnly: true subPath: filebeat.yml - mountPath: /tmp name: tmp dnsPolicy: ClusterFirst restartPolicy: Always schedulerName: default-scheduler securityContext: {} terminationGracePeriodSeconds: 30 volumes: - configMap: defaultMode: 420 name: filebeat name: config - emptyDir: {} name: tmp
logstash -e "input {stdin{}} output {stdout{}}"
/usr/share/logstash/bin/logstash -e 'input{file {path => "/etc/centos-release" start_position => "beginning"}} output { stdout {}}'
/usr/share/logstash/bin/logstash -f stdin.conf /usr/share/logstash/bin/logstash -f jdbc.conf --path.settings /etc/logstash --path.data /tmp
root@netkiller ~/logstash % /usr/share/logstash/bin/logstash -t -f test.conf WARNING: Default JAVA_OPTS will be overridden by the JAVA_OPTS defined in the environment. Environment JAVA_OPTS are -server -Xms2048m -Xmx4096m WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console Configuration OK
## JVM configuration # Xms represents the initial size of total heap space # Xmx represents the maximum size of total heap space -Xms1g -Xmx4g ################################################################ ## Expert settings ################################################################ ## ## All settings below this section are considered ## expert settings. Don't tamper with them unless ## you understand what you are doing ## ################################################################ ## GC configuration 11-13:-XX:+UseConcMarkSweepGC 11-13:-XX:CMSInitiatingOccupancyFraction=75 11-13:-XX:+UseCMSInitiatingOccupancyOnly ## Locale # Set the locale language -Duser.language=zh # Set the locale country -Duser.country=CN # Set the locale variant, if any #-Duser.variant= ## basic # set the I/O temp directory #-Djava.io.tmpdir=$HOME # set to headless, just in case -Djava.awt.headless=true # ensure UTF-8 encoding by default (e.g. filenames) -Dfile.encoding=UTF-8 # use our provided JNA always versus the system one #-Djna.nosys=true # Turn on JRuby invokedynamic -Djruby.compile.invokedynamic=true ## heap dumps # generate a heap dump when an allocation from the Java heap fails # heap dumps are created in the working directory of the JVM -XX:+HeapDumpOnOutOfMemoryError # specify an alternative path for heap dumps # ensure the directory exists and has sufficient space #-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof ## GC logging #-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m # log GC status to a file with time stamps # ensure the directory exists #-Xloggc:${LS_GC_LOG_FILE} # Entropy source for randomness -Djava.security.egd=file:/dev/urandom # Copy the logging context from parent threads to children -Dlog4j2.isThreadContextMapInheritable=true
[root@netkiller ~]# cat /etc/logstash/pipelines.yml # This file is where you define your pipelines. You can define multiple. # For more information on multiple pipelines, see the documentation: # https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html - pipeline.id: main path.config: "/etc/logstash/conf.d/*.conf"
配置 pipelines.yml 文件
- pipeline.id: main path.config: "/etc/logstash/conf.d/*.conf" - pipeline.id: finance path.config: "/etc/logstash/conf.finance/*.conf" - pipeline.id: market path.config: "/etc/logstash/conf.market/*.conf" - pipeline.id: customer path.config: "/etc/logstash/conf.customer/*.conf"
root@netkiller ~ % /usr/share/logstash/bin/logstash -e "input {stdin{}} output {stdout{}}" Helloworld ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console 18:03:38.340 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000} 18:03:38.356 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started The stdin plugin is now waiting for input: 2017-08-03T10:03:38.375Z localhost Helloworld 18:03:38.384 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
root@netkiller ~ % /usr/share/logstash/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}' My name is neo ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console 18:05:02.734 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000} 18:05:02.747 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started The stdin plugin is now waiting for input: { "@timestamp" => 2017-08-03T10:05:02.764Z, "@version" => "1", "host" => "localhost", "message" => "My name is neo" } 18:05:02.782 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9601}
input { file { type => "syslog" path => [ "/var/log/maillog", "/var/log/messages", "/var/log/secure" ] start_position => "beginning" } } output { stdout { codec => rubydebug } elasticsearch { hosts => [""] } }
start_position => "beginning" 从头开始读,如果没有这个选项,只会读取最后更新的数据。
input { file { path =>"/var/log/messages" type =>"syslog"} file { path =>"/var/log/apache/access.log" type =>"apache"} }
input { file { type => "syslog" path => [ "/var/log/secure", "/var/log/messages", "/var/log/syslog" ] } tcp { port => "5145" type => "syslog-network" } udp { port => "5145" type => "syslog-network" } } output { elasticsearch { hosts => [""] } }
input { redis { host => "" port => "6379" key => "logstash:demo" data_type => "list" codec => "json" type => "logstash-redis-demo" tags => ["logstashdemo"] } } output { elasticsearch { hosts => [""] } }
指定 Database 10
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf input { redis { codec => json host => "localhost" port => 6379 db => 10 key => "logstash:redis" data_type => "list" } } output { stdout { codec => rubydebug } elasticsearch { hosts => [""] index => "logstash-api" } }
input { kafka { zk_connect => "kafka:2181" group_id => "logstash" topic_id => "apache_logs" consumer_threads => 16 } }
root@netkiller /etc/logstash/conf.d % cat jdbc.conf input { jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "123456" schedule => "* * * * *" statement => "select * from article where id > :sql_last_value" use_column_value => true tracking_column => "id" tracking_column_type => "numeric" record_last_run => true last_run_metadata_path => "/var/tmp/article.last" } jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "123456" schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次 statement => "select * from article where ctime > :sql_last_value" use_column_value => true tracking_column => "ctime" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/tmp/article-ctime.last" } } output { elasticsearch { hosts => "localhost:9200" index => "information" document_type => "article" document_id => "%{id}" action => "update" doc_as_upsert => true } }
系统默认是 ISO8601 如果需要转换为 yyyy-MM-dd-HH:mm:ss 参考:
filter { date { match => [ "ctime", "yyyy-MM-dd HH:mm:ss" ] locale => "cn" } date { match => [ "mtime", "yyyy-MM-dd HH:mm:ss" ] locale => "cn" } }
date { locale => "zh-CN" #match => ["@timestamp", "yyyy-MM-dd HH:mm:ss"] match => ["@timestamp", "ISO8601"] timezone => "Asia/Shanghai" target => ["@timestamp"] }
创建匹配文件 /usr/share/logstash/patterns
mkdir /usr/share/logstash/patterns vim /usr/share/logstash/patterns NGUSERNAME [a-zA-Z\.\@\-\+_%]+ NGUSER %{NGUSERNAME} NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}
filter { if [type] == "nginx-access" { grok { match => { "message" => "%{NGINXACCESS}" } } } }
input { file { type => "syslog" path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ] sincedb_path => "/opt/logstash/sincedb-access" } syslog { type => "syslog" port => "5544" } } filter { grok { type => "syslog" match => [ "message", "%{SYSLOGBASE2}" ] add_tag => [ "syslog", "grokked" ] } } output { elasticsearch { host => "elk.netkiller.cn" } }
input { file { type => "SSRCode" path => "/SD/2015*/01*/*.csv" start_position => "beginning" } } filter { csv { columns => ["Code","Source"] separator => "," } kv { source => "uri" field_split => "&?" value_split => "=" } } # output logs to console and to elasticsearch output { stdout {} elasticsearch { hosts => [""] } }
input { stdin {} } filter { ruby { init => " begin @@csv_file = 'output.csv' @@csv_headers = ['A','B','C'] if File.zero?(@@csv_file) || !File.exist?(@@csv_file) CSV.open(@@csv_file, 'w') do |csv| csv << @@csv_headers end end end " code => " begin event['@metadata']['csv_file'] = @@csv_file event['@metadata']['csv_headers'] = @@csv_headers end " } csv { columns => ["a", "b", "c"] } } output { csv { fields => ["a", "b", "c"] path => "%{[@metadata][csv_file]}" } stdout { codec => rubydebug { metadata => true } } }
echo "1,2,3\n4,5,6\n7,8,9" | ./bin/logstash -f csv-headers.conf
A,B,C 1,2,3 4,5,6 7,8,9
日期格式化, 将ISO 8601日期格式转换为 %Y-%m-%d %H:%M:%S
input { stdin{} } filter { ruby { init => "require 'time'" code => "event.set('ctime', event.get('ctime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } ruby { init => "require 'time'" code => "event.set('mtime', event.get('mtime').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { stdout { codec => rubydebug } }
/usr/share/logstash/bin/logstash -f date.conf
丢弃日志种包含 MonthShardingAlgorithm 字符串的日志
root@logging /o/l/p/e/03# cat /srv/logstash/pipeline/filebeat.conf input { beats { port => 5044 } } filter{ if "MonthShardingAlgorithm" in [message] { drop{} } } output { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "%{message}"} } #file { # path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz" # codec => json_lines # gzip => true #} redis { host => ["r-bp1d17217fa77e14756.redis.rds.aliyuncs.com"] password => "Ejy2016redis" key => "filebeat2" codec => json_lines data_type => "list" } }
grok-patternsfilebeat 发送过来日志是文本可是,我们需要使用 grok 匹配后将对应的值放入指定变量
input { beats { port => 5044 } } filter{ if "MonthShardingAlgorithm" in [message] { drop{} } grok{ match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"] #target => "result" } } output { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "%{message}"} } file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/%{+dd}/spring.%{level}.log" codec => line { format => "%{message}"} } if "ERROR" == [level] { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/beats.log" codec => line { format => "%{message}"} } } file { path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz" codec => json_lines gzip => true } }
output { file { path => "/path/to/%{host}/%{+yyyy}/%{+MM}/%{+dd}.log.gz" message_format => "%{message}" gzip => true } }
每个 tags 标签生成一个日志文件
input { tcp { port => 4567 codec => json_lines } } filter { ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { if "finance" in [tags] { file { path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } } else if "market" in [tags] { file { path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } } else { file { path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } } file { path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz" codec => json_lines gzip => true } }
output { stdout { codec => rubydebug } elasticsearch { hosts => [""] index => "logging" } }
配置实现每日切割一个 index
index => "logstash-%{+YYYY.MM.dd}" "_index" : "logstash-2017.03.22"
index 自定义 logstash-%{type}-%{+YYYY.MM.dd}
input { redis { data_type => "list" key => "logstash:redis" host => "" port => 6379 threads => 5 codec => "json" } } filter { } output { elasticsearch { hosts => [""] index => "logstash-%{type}-%{+YYYY.MM.dd}" document_type => "%{type}" workers => 1 flush_size => 20 idle_flush_time => 1 template_overwrite => true } stdout{} }
[root@netkiller log]# cat /etc/logstash/conf.d/file.conf input { tcp { port => 4567 codec => json_lines } gelf { port => 12201 use_udp => true #use_tcp => true } } filter { ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { file { path => "/opt/log/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } file { path => "/opt/log/origin.%{+yyyy}-%{+MM}-%{+dd}.log.gz" codec => json_lines gzip => true } if "ERROR" in [level] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=56c27cb761c4a16473db02d9d28734a56cf549f6977ecc281d008f9a239ba3e0" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"Monitor: %{message}"}}' } } }
input { redis { host => "" port => "6379" key => "logstash:demo" data_type => "list" codec => "json" type => "logstash-redis-demo" tags => ["logstashdemo"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => [""] } }
# redis-cli> RPUSH logstash:demo "{\"time\": \"2012-01-01T10:20:00\", \"message\": \"logstash demo message\"}" (integer) 1> exit
# cat /var/log/logstash/logstash-plain.log [2017-03-22T15:54:36,491][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[]}} [2017-03-22T15:54:36,496][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>, :path=>"/"} [2017-03-22T15:54:36,600][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x20dae6aa URL:>} [2017-03-22T15:54:36,601][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil} [2017-03-22T15:54:36,686][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword"}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}} [2017-03-22T15:54:36,693][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash [2017-03-22T15:54:36,780][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x2f9efc89 URL://>]} [2017-03-22T15:54:36,787][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000} [2017-03-22T15:54:36,792][INFO ][logstash.inputs.redis ] Registering Redis {:identity=>"redis://@ list:logstash:demo"} [2017-03-22T15:54:36,793][INFO ][logstash.pipeline ] Pipeline main started [2017-03-22T15:54:36,838][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} [2017-03-22T15:55:10,018][WARN ][logstash.runner ] SIGTERM received. Shutting down the agent. [2017-03-22T15:55:10,024][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
input { file { path => [ "/var/log/nginx/access.log" ] start_position => "beginning" } } filter { grok { match => { "message" => "%{NGINXACCESS}" } add_field => { "type" => "access" } } date { match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] } geoip { source => "clientip" } } output { redis { host => "" port => 6379 data_type => "list" key => "logstash:demo" } }
例 70.2. spring boot logback
root@netkiller /etc/logstash/conf.d % cat spring-boot-redis.conf input { redis { codec => json host => "localhost" port => 6379 key => "logstash:redis" data_type => "list" } } output { elasticsearch { hosts => [""] index => "logstash-api" } }
neo@MacBook-Pro ~/deployment % cat api.netkiller.cn/src/main/resources/logback.xml <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml" /> <include resource="org/springframework/boot/logging/logback/file-appender.xml" /> <property name="type.name" value="test" /> <appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender"> <source>mySource</source> <sourcePath>mySourcePath</sourcePath> <type>myApplication</type> <tags>production</tags> <host>localhost</host> <port>6379</port> <database>0</database> <key>logstash:api</key> </appender> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE" /> <appender-ref ref="LOGSTASH" /> </root> </configuration>
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf input { tcp { port => 4567 codec => json_lines } } filter { #ruby { # code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('@timestamp').time.localtime + 8*60*60))" #} ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { file { path => "/opt/log/%{app}.%{+yyyy}-%{+MM}-%{+dd}.log.gz" codec => line { format => "[%{datetime}] %{level} %{message}"} #codec => json_lines gzip => true } }
每个 tags 一个文件
[root@netkiller ~]# cat /etc/logstash/conf.d/file.conf input { tcp { port => 4567 codec => json_lines } } filter { ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { if "finance" in [tags] { file { path => "/opt/log/%{app}.finance.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"} } } else if "market" in [tags] { file { path => "/opt/log/%{app}.market.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"} } } else { file { path => "/opt/log/%{app}.unknow.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message} %{tags}"} } } }
例 70.3. Elasticsearch 索引切割示例
root@netkiller /opt/api.netkiller.cn % cat /etc/logstash/conf.d/spring-boot-redis.conf input { redis { codec => json host => "localhost" port => 6379 db => 10 key => "logstash:redis" data_type => "list" } } output { stdout { codec => rubydebug } elasticsearch { hosts => [""] index => "logstash-%{type}-%{+YYYY.MM.dd}" } }
<?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml" /> <include resource="org/springframework/boot/logging/logback/file-appender.xml" /> <property name="logstash.type" value="api" /> <property name="logstash.tags" value="springboot" /> <appender name="LOGSTASH" class="com.cwbase.logback.RedisAppender"> <source>application.properties</source> <type>${logstash.type}</type> <tags>${logstash.tags}</tags> <host>localhost</host> <database>10</database> <key>logstash:redis</key> <mdc>true</mdc> <location>true</location> <callerStackIndex>0</callerStackIndex> </appender> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <appender-ref ref="LOGSTASH" /> </appender> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%date{yyyy-MM-dd HH:mm:ss} %-4relative [%thread] %-5level %logger{35} : %msg %n</pattern> </encoder> </appender> <root level="INFO"> <appender-ref ref="STDOUT" /> <appender-ref ref="FILE" /> <appender-ref ref="LOGSTASH" /> </root> </configuration>
input { file { path => ["/home/test/data.csv"] start_position => "beginning" #从什么位置读取,beginnig时导入原有数据 sincedb_path => "/test/111" type => "csv" tags => ["optical", "gather"] } } filter { if [type] == "csv" { #多个配置文件同时执行的区分 csv { columns =>["name","device_id"] separator => "^" quote_char => "‰" remove_field => ["device_id","branch_id","area_type"] } } output{ }
root@logging ~# find /srv/logstash/ -type f /srv/logstash/pipeline/config.conf /srv/logstash/bin/logstash /srv/logstash/config/logstash.yml
root@logging ~# cat /srv/logstash/bin/logstash #!/usr/bin/python3 # -*- coding: utf-8 -*- ############################################## # Home : http://netkiller.github.io # Author: Neo <netkiller@msn.com> # Upgrade: 2023-01-11 ############################################## import os import sys try: module = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, module) from netkiller.docker import * except ImportError as err: print("%s" % (err)) project = 'logstash' # extra_hosts = [ # 'mongo.netkiller.cn:', 'eos.netkiller.cn:', # 'cfca.netkiller.cn:' # ] dockerfile = Dockerfile() dockerfile.image('docker.elastic.co/logstash/logstash:8.6.0').run( ['apk add -U tzdata', 'rm -f /usr/share/logstash/pipeline/logstash.conf'] ).copy('pipeline/', '/usr/share/logstash/pipeline/').copy('config/', '/usr/share/logstash/config/').workdir('/usr/share/logstash') logstash = Services(project) # logstash.image('logstash/logstash:alpine') # logstash.build(dockerfile) logstash.image('docker.elastic.co/logstash/logstash:8.6.0') logstash.container_name(project) logstash.restart('always') # logstash.hostname('www.netkiller.cn') # openrelogstashsty.extra_hosts(extra_hosts) logstash.extra_hosts(['elasticsearch:']) logstash.environment(['TZ=Asia/Shanghai','XPACK_MONITORING_ENABLED=false','LOG_LEVEL=info']) logstash.ports(['12201:12201/udp', '12201:12201/tcp']) #logstash.ports(['12201:12201','4567:4567']) # logstash.depends_on('test') logstash.working_dir('/usr/share/logstash') logstash.user('root') logstash.volumes( [ '/srv/logstash/pipeline/:/usr/share/logstash/pipeline/', #'/srv/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:rw', '/srv/logstash/logs/:/usr/share/logstash/logs/', '/opt/log/:/opt/log/', '/proc:/proc','/sys:/sys' ] ).privileged() development = Composes('development') development.workdir('/var/tmp/development') development.version('3.9') development.services(logstash) if __name__ == '__main__': try: docker = Docker( # {'DOCKER_HOST': 'ssh://root@'} ) # docker.sysctl({'neo': '1'}) docker.environment(development) docker.main() except KeyboardInterrupt: print("Crtl+C Pressed. Shutting down.")
root@logging ~# cat /srv/logstash/pipeline/config.conf input { tcp { port => 4567 codec => json_lines } gelf { port => 12201 use_udp => true use_tcp => true } } filter { ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { if [marker] { file { path => "/opt/log/%{environment}/%{service}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } } else { file { path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"} } } file { path => "/opt/log/%{environment}/%{service}/spring.%{+yyyy}-%{+MM}-%{+dd}.json.gz" codec => json_lines gzip => true } if [environment] =~ /(prod|grey)/ { if "ERROR" in [level] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a3f084b0160ec06ae40f95b0b052e69c699400eaa5db316612de90f8" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } if "WARN" in [level] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=d6602c6fb6b47250f38d31f791968a12201a6980f3a1175829a57e6afca7678b" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } } if [environment] =~ /(stage|test|dev)/ { if ("ERROR" in [level] or "WARN" in [level]) { http { url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d983188517fcbd204c89bf5f47b9dfdac2a788bda85bd353d8e266fb5f" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } } }
input { tcp { port => 4567 codec => json_lines } gelf { port => 12201 use_udp => true use_tcp => true } } filter { ruby { code => "event.set('datetime', event.get('@timestamp').time.localtime.strftime('%Y-%m-%d %H:%M:%S'))" } } output { if [marker] { file { path => "/opt/log/%{environment}/%{service}/%{+MM}/%{marker}.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] %{level} %{message}"} } } else { file { path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "[%{datetime}] [%{host}:%{source_host}] [%{level}] (%{class}.%{method}:%{line}) - %{message}"} } } file { path => "/opt/log/%{environment}/%{service}/%{+MM}/unknow.%{+yyyy}-%{+MM}-%{+dd}.json.gz" codec => json_lines gzip => true } if [environment] =~ /(prod|grey)/ { if "ERROR" in [level] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=f9257740a0ec06ae40f316613f084b095b0b052e69c699400eaa5db162de90f8" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } if "WARN" in [level] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=d66029a57e68d31f791968a12201a6980f3ac6fb6b47250f3117582afca7678b" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } } if "compute" in [marker] and "prod" in [environment] { http { url => "https://oapi.dingtalk.com/robot/send?access_token=324ab12a36bcb2bb788720c974486218f2517de5a8f5fa009b52297934310c7f" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } http { url => "http://zentao.netkiller.cn/zentao/gitlab.php?type=task&func=create&name=服务%{service}环境%{environment}" http_method => "post" format => "form" mapping => {"message" => "时间:%{datetime}</br>主机:%{host}[%{source_host}]</br>环境:%{environment}</br>服务:%{service}</br>消息:%{message}"} } } if [environment] =~ /(pre|test|dev|office)/ { if ("ERROR" in [level] or "WARN" in [level]) { http { url => "https://oapi.dingtalk.com/robot/send?access_token=9501f8d9b9dfda204c89bf5f47788bda85bc2a83188517fcbdd353d8e266fb5f" http_method => "post" content_type => "application/json; charset=utf-8" format => "message" message => '{"msgtype":"text","text":{"content":"时间:%{datetime}\n主机:%{host}[%{source_host}]\n环境:%{environment}\n服务:%{service}\n消息:%{message}"}}' } } } }
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/beats/beats.sh | bash
filebeat.inputs: - type: log paths: - /data/logs/* fields: project: ${PROJECT} group: ${GROUP} stage: ${STAGE} format: ${FORMAT} processors: - add_cloud_metadata: - add_host_metadata: output.file: path: "/tmp" filename: filebeat
[docker@netkiller ~]$ cat filebeat.tcp.yml filebeat.inputs: - type: tcp max_message_size: 10MiB host: "localhost:9000" output.file: path: "/tmp" filename: filebeat.log
[docker@netkiller ~]$ sudo chmod go-w /home/docker/filebeat.tcp.yml
[docker@netkiller ~]$ ss -lnt | grep 9000 LISTEN 0 1024*
[docker@netkiller ~]$ echo "Hello world!!!" | nc localhost 9000 echo "Hello worldss -lnt | grep 9000!" | nc localhost 9000 [docker@netkiller ~]$ cat /etc/filesystems | nc localhost 9000 [docker@netkiller ~]$ sudo cat /tmp/filebeat.log-20220728.ndjson |jq | grep message "message": "Hello worldss -lnt | grep 9000!" "message": "ext4", "message": "ext3", "message": "ext2", "message": "nodev proc", "message": "nodev devpts", "message": "iso9660" "message": "vfat", "message": "hfs", "message": "hfsplus", "message": "*",
filebeat.inputs: - type: log paths: - /tmp/* fields: project: www group: netkiller.cn stage: dev format: json multiline: pattern: '^\[[^stacktrace]' negate: true match: after processors: - add_cloud_metadata: - add_host_metadata: output.logstash: hosts: [""]
logstash 配置
input { beats { port => 5044 } } output { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "%{message}"} } file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/spring.%{+yyyy}-%{+MM}-%{+dd}.log.json.gz" codec => json_lines gzip => true } redis { host => ["redis.netkiller.cn"] password => "passw0rd" key => "filebeat" codec => json_lines data_type => "channel" } }
filebeat 从 file 采集日志,发送到 logstash,logstash 接收的是一行一行的文本数据,没有 level 变量。实现 INFO,WARN,ERROR 切割,可以通过字符串匹配方式实现。
input { beats { port => 5044 } } filter{ if "MonthShardingAlgorithm" in [message] { drop{} } } output { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "%{message}"} } if "netkiller-service" == [fields][service] and "ERROR" in [message] { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/netkiller.beats.log" codec => line { format => "%{message}"} } } file { path => "/opt/log/beats.%{+yyyy}-%{+MM}-%{+dd}.log.gz" codec => json_lines gzip => true } }
input { beats { port => 5044 } } filter{ if "MonthShardingAlgorithm" in [message] { drop{} } grok{ match => ["message", "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{NOTSPACE:hostname}\] \[%{LOGLEVEL:level}\] \[%{NOTSPACE:thread-id}\] %{NOTSPACE:class} - %{JAVALOGMESSAGE:msg}"] } } output { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/%{+MM}/spring.%{+yyyy}-%{+MM}-%{+dd}.log" codec => line { format => "%{message}"} } if "compute-service" == [fields][service] and "ERROR" == [level] { file { path => "/opt/log/%{[fields][environment]}/%{[fields][service]}/compute.error.log" codec => line { format => "%{message}"} } } }
使用 top 查看 cpu 占用率长期80%,甚至 100%,解决方案是修改 jvm.options 配置文件,将 -Xmx1g 改为 -Xmx4g,-Xmx 配置建议是4~8g
## JVM configuration # Xms represents the initial size of total heap space # Xmx represents the maximum size of total heap space -Xms1g -Xmx4g ################################################################ ## Expert settings ################################################################ ## ## All settings below this section are considered ## expert settings. Don't tamper with them unless ## you understand what you are doing ## ################################################################ ## GC configuration 11-13:-XX:+UseConcMarkSweepGC 11-13:-XX:CMSInitiatingOccupancyFraction=75 11-13:-XX:+UseCMSInitiatingOccupancyOnly ## Locale # Set the locale language #-Duser.language=en # Set the locale country #-Duser.country=US # Set the locale variant, if any #-Duser.variant= ## basic # set the I/O temp directory #-Djava.io.tmpdir=$HOME # set to headless, just in case -Djava.awt.headless=true # ensure UTF-8 encoding by default (e.g. filenames) -Dfile.encoding=UTF-8 # use our provided JNA always versus the system one #-Djna.nosys=true # Turn on JRuby invokedynamic -Djruby.compile.invokedynamic=true ## heap dumps # generate a heap dump when an allocation from the Java heap fails # heap dumps are created in the working directory of the JVM -XX:+HeapDumpOnOutOfMemoryError # specify an alternative path for heap dumps # ensure the directory exists and has sufficient space #-XX:HeapDumpPath=${LOGSTASH_HOME}/heapdump.hprof ## GC logging #-Xlog:gc*,gc+age=trace,safepoint:file=@loggc@:utctime,pid,tags:filecount=32,filesize=64m # log GC status to a file with time stamps # ensure the directory exists #-Xloggc:${LS_GC_LOG_FILE} # Entropy source for randomness -Djava.security.egd=file:/dev/urandom # Copy the logging context from parent threads to children -Dlog4j2.isThreadContextMapInheritable=true
# curl 'http://localhost:9200/_search?pretty' { "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 1.0, "hits" : [ { "_index" : ".kibana", "_type" : "config", "_id" : "5.2.2", "_score" : 1.0, "_source" : { "buildNum" : 14723 } } ] } }
elasticsearch 的配置不能省略 9200 端口,否则将无法链接elasticsearch
elasticsearch { hosts => [""] }
#cd /etc/logstash/conf.d #vim logstash_server.conf input { redis { port => "6379" host => "" data_type => "list" key => "logstash-redis" type => "redis-input" } } output { stdout { codec => rubydebug } }
5.x type类型如果是date,那么系统默认使用 ISO8601 格式。 6.x 修复了这个问题。"ctime": "2017-12-18 11:21:57"
UDP 调试方法
[root@netkiller log]# cat test.json {"facility":"logstash-gelf","source_host":"","@version":"1","method":"init","message":"Test","class":"Application","host":"macbook-pro-m2.local","@timestamp":"2023-01-07T03:32:28.368Z","timestamp":"2023-01-07 11:32:28.368","marker":"spring","datetime":"2023-01-07 11:32:28","logger":"cn.netkiller.Application","level":"WARN","line":21,"version":"1.1"} [root@netkiller log]# cat test.json | nc -u 12202
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elastic/elastic-6.x.sh | bash
操作系统: CentOS 7
Java 1.8
ElasticSearch + Logstash + Kibana 均使用 5.2 版本
以下安装均使用 Netkiller OSCM 脚本一键安装
curl -s https://raw.githubusercontent.com/netkiller/shell/master/search/elasticsearch/elasticsearch-5.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/kibana-5.x.sh | bash
curl -s https://raw.githubusercontent.com/netkiller/shell/master/log/kibana/logstash-5.x.sh | bash