# Logstash indexer pipeline.
#
# Reads JSON-encoded events from Kafka, parses them according to the event's
# [type] field, and writes each type to its own monthly Elasticsearch index.

input {
  kafka {
    codec => json {}
    bootstrap_servers => "kafka:9092"
    topics => ["random", "apache", "random-forwarder", "apache-forwarder"]
    client_id => "logstash_indexer_1"
  }
}

filter {
  # --- nginx access logs ----------------------------------------------------
  if [type] == "nginx-access" {
    grok {
      # NOTE(review): the "+" directly after %{COMBINEDAPACHELOG} is kept as in
      # the original; it looks like it may have been meant as a separator
      # before the trailing fields -- confirm against real nginx log lines.
      match => [ "message", "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}" ]
      overwrite => [ "message" ]
    }
    mutate {
      convert => {
        "response"     => "integer"
        "bytes"        => "integer"
        "responsetime" => "float"
      }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      add_tag => [ "nginx-geoip" ]
    }
    date {
      match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
    useragent {
      source => "agent"
    }
  }

  # --- "random" application logs (direct and forwarder variants) ------------
  # Both types were parsed by identical filter chains, so they share one branch.
  if [type] in ["random", "random-forwarder"] {
    grok {
      # The leading group must be a *named* capture: the date filter below
      # reads the "timestamp" field, and a bare "(?%{YEAR}...)" is not a valid
      # regular expression group.
      match => [ "message", "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}" ]
      overwrite => [ "message" ]
    }
    date {
      match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
      remove_field => [ "timestamp" ]
    }
  }

  # --- Apache access logs (direct and forwarder variants) -------------------
  # Both types were parsed by identical filter chains, so they share one branch.
  if [type] in ["apache", "apache-forwarder"] {
    grok {
      match => [ "message", "%{COMBINEDAPACHELOG}" ]
      overwrite => [ "message" ]
    }
    mutate {
      convert => {
        "response"     => "integer"
        "bytes"        => "integer"
        "responsetime" => "float"
      }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      add_tag => [ "apache-geoip" ]
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      remove_field => [ "timestamp" ]
    }
  }
}

output {
  # One monthly index per event type; index names are part of the external
  # contract (dashboards, curator jobs) and are kept exactly as before.
  if [type] == "nginx-access" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "nginx-%{+YYYYMM}"
    }
  }

  if [type] == "random" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "random-%{+YYYYMM}"
    }
  }

  if [type] == "apache" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apache-%{+YYYYMM}"
    }
  }

  # The forwarder types are additionally echoed to stdout in rubydebug form.
  if [type] == "random-forwarder" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "randomforwarder-%{+YYYYMM}"
    }
    stdout {
      codec => rubydebug
    }
  }

  if [type] == "apache-forwarder" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "apacheforwarder-%{+YYYYMM}"
    }
    stdout {
      codec => rubydebug
    }
  }
}