kafka_elasticsearch.conf 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. input {
  2. kafka {
  3. codec => json{}
  4. bootstrap_servers => "kafka:9092"
  5. topics => ["random", "apache", "random-forwarder", "apache-forwarder"]
  6. client_id => "logstash_indexer_1"
  7. }
  8. }
  9. filter {
  10. if [type] == "nginx-access" {
  11. grok {
  12. match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
  13. overwrite => [ "message" ]
  14. }
  15. mutate {
  16. convert => ["response", "integer"]
  17. convert => ["bytes", "integer"]
  18. convert => ["responsetime", "float"]
  19. }
  20. geoip {
  21. source => "clientip"
  22. target => "geoip"
  23. add_tag => [ "nginx-geoip" ]
  24. }
  25. date {
  26. match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  27. remove_field => [ "timestamp" ]
  28. }
  29. useragent {
  30. source => "agent"
  31. }
  32. }
  33. if [type] == "random" {
  34. grok {
  35. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  36. overwrite => [ "message" ]
  37. }
  38. date {
  39. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  40. remove_field => [ "timestamp" ]
  41. }
  42. }
  43. if [type] == "apache" {
  44. grok {
  45. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  46. overwrite => [ "message" ]
  47. }
  48. mutate {
  49. convert => ["response", "integer"]
  50. convert => ["bytes", "integer"]
  51. convert => ["responsetime", "float"]
  52. }
  53. geoip {
  54. source => "clientip"
  55. target => "geoip"
  56. add_tag => [ "apache-geoip" ]
  57. }
  58. date {
  59. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  60. remove_field => [ "timestamp" ]
  61. }
  62. }
  63. if [type] == "random-forwarder" {
  64. grok {
  65. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  66. overwrite => [ "message" ]
  67. }
  68. date {
  69. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  70. remove_field => [ "timestamp" ]
  71. }
  72. }
  73. if [type] == "apache-forwarder" {
  74. grok {
  75. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  76. overwrite => [ "message" ]
  77. }
  78. mutate {
  79. convert => ["response", "integer"]
  80. convert => ["bytes", "integer"]
  81. convert => ["responsetime", "float"]
  82. }
  83. geoip {
  84. source => "clientip"
  85. target => "geoip"
  86. add_tag => [ "apache-geoip" ]
  87. }
  88. date {
  89. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  90. remove_field => [ "timestamp" ]
  91. }
  92. }
  93. }
  94. output {
  95. if [type] == "nginx-access" {
  96. elasticsearch {
  97. hosts => ["elasticsearch:9200"]
  98. index => "nginx-%{+YYYYMM}"
  99. }
  100. }
  101. if [type] == "random" {
  102. elasticsearch {
  103. hosts => ["elasticsearch:9200"]
  104. index => "random-%{+YYYYMM}"
  105. }
  106. }
  107. if [type] == "apache" {
  108. elasticsearch {
  109. hosts => ["elasticsearch:9200"]
  110. index => "apache-%{+YYYYMM}"
  111. }
  112. }
  113. if [type] == "random-forwarder" {
  114. elasticsearch {
  115. hosts => ["elasticsearch:9200"]
  116. index => "randomforwarder-%{+YYYYMM}"
  117. }
  118. stdout {
  119. codec => rubydebug
  120. }
  121. }
  122. if [type] == "apache-forwarder" {
  123. elasticsearch {
  124. hosts => ["elasticsearch:9200"]
  125. index => "apacheforwarder-%{+YYYYMM}"
  126. }
  127. stdout {
  128. codec => rubydebug
  129. }
  130. }
  131. }