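# Logstash pipeline: tail WinCC OA log files, re-assemble multi-line entries,
# promote the leading "YYYY.MM.DD HH:mm:ss[.SSS]" timestamp to @timestamp and
# ship each event to Elasticsearch and Kafka.
input {
  file {
    path => "/proj/winccoa/smh/log/*.log"
  }
}

filter {
  # An entry starts with a timestamp; any line that does NOT (negate => true)
  # is a continuation and is appended to the previous event.
  multiline {
    pattern => "%{YEAR}.%{MONTHNUM}.%{MONTHDAY} %{TIME}"
    negate  => true
    what    => "previous"
  }

  # Split the leading timestamp from the rest of the line. The trailing group
  # was written as "(?.*$)" in the original, which is not a valid named capture
  # and never sets log_message, even though replace/remove_field below depend
  # on it; restored as (?<log_message>.*$).
  # NOTE(review): "." does not match a newline here, so continuation lines
  # joined by the multiline filter end up outside log_message and are dropped
  # by the replace below — confirm whether that is intended; a (?m) flag would
  # keep them.
  grok {
    match => { "message" => "%{YEAR:year}.%{MONTHNUM:month}.%{MONTHDAY:day} %{TIME:time}(?<log_message>.*$)" }
  }

  # Derive "filename" from the last path component, without the .log suffix.
  grok {
    match => { "path" => "%{GREEDYDATA}/%{GREEDYDATA:filename}\.log" }
  }

  # Only rebuild the event when the timestamp grok succeeded: on failure the
  # sprintf references stay literal, the date parse would fail, and "message"
  # would be overwritten with the text "%{log_message}", losing the original.
  if "_grokparsefailure" not in [tags] {
    mutate {
      add_field => { "timestamp" => "%{year}.%{month}.%{day} %{time}" }
    }

    # %{TIME} accepts seconds with or without a fractional part, so offer both
    # layouts; patterns are tried in order.
    # NOTE(review): no timezone is set, so the Logstash host's zone is assumed
    # for these local timestamps — confirm against the WinCC OA servers.
    date {
      locale => "en"
      match  => [ "timestamp", "yyyy.MM.dd HH:mm:ss.SSS", "yyyy.MM.dd HH:mm:ss" ]
    }

    mutate {
      replace => { "message" => "%{log_message}" }
    }
  }

  # Scratch fields used only to build @timestamp / message.
  mutate {
    remove_field => [ "timestamp", "year", "month", "day", "time", "log_message" ]
  }
}

output {
  # Uncomment for local debugging (the stdout option is "codec", not "code").
  # stdout { codec => rubydebug }

  # NOTE(review): neither output configures TLS or credentials, so events travel
  # in plaintext and unauthenticated — confirm the cluster and brokers are only
  # reachable on a trusted network, or enable transport security on both.
  elasticsearch {
    hosts => ["elastic1:9200", "elastic2:9200", "elastic3:9200"]
    index => "winccoa-log-%{+YYYY.MM}"
  }

  kafka {
    bootstrap_servers => "hadoop-node2:9092,hadoop-node3:9092,hadoop-node4:9092"
    topic_id          => "logstash-winccoa"
  }
}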