kafka_elasticsearch.conf 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. input {
  2. kafka {
  3. codec => json{}
  4. bootstrap_servers => "kafka:9092"
  5. topics => ["random"]
  6. client_id => "logstash_indexer_1"
  7. }
  8. kafka {
  9. codec => json{}
  10. bootstrap_servers => "kafka:9092"
  11. topics => ["apache"]
  12. client_id => "logstash_indexer_1"
  13. }
  14. kafka {
  15. codec => json{}
  16. bootstrap_servers => "kafka:9092"
  17. topics => ["random-forwarder"]
  18. client_id => "logstash_indexer_1"
  19. }
  20. kafka {
  21. codec => json{}
  22. bootstrap_servers => "kafka:9092"
  23. topics => ["apache-forwarder"]
  24. client_id => "logstash_indexer_1"
  25. }
  26. }
  27. filter {
  28. if [type] == "nginx-access" {
  29. grok {
  30. match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
  31. overwrite => [ "message" ]
  32. }
  33. mutate {
  34. convert => ["response", "integer"]
  35. convert => ["bytes", "integer"]
  36. convert => ["responsetime", "float"]
  37. }
  38. geoip {
  39. source => "clientip"
  40. target => "geoip"
  41. add_tag => [ "nginx-geoip" ]
  42. }
  43. date {
  44. match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  45. remove_field => [ "timestamp" ]
  46. }
  47. useragent {
  48. source => "agent"
  49. }
  50. }
  51. if [type] == "random" {
  52. grok {
  53. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  54. overwrite => [ "message" ]
  55. }
  56. date {
  57. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  58. remove_field => [ "timestamp" ]
  59. }
  60. }
  61. if [type] == "apache" {
  62. grok {
  63. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  64. overwrite => [ "message" ]
  65. }
  66. mutate {
  67. convert => ["response", "integer"]
  68. convert => ["bytes", "integer"]
  69. convert => ["responsetime", "float"]
  70. }
  71. geoip {
  72. source => "clientip"
  73. target => "geoip"
  74. add_tag => [ "apache-geoip" ]
  75. }
  76. date {
  77. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  78. remove_field => [ "timestamp" ]
  79. }
  80. }
  81. if [type] == "random-forwarder" {
  82. grok {
  83. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  84. overwrite => [ "message" ]
  85. }
  86. date {
  87. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  88. remove_field => [ "timestamp" ]
  89. }
  90. }
  91. if [type] == "apache-forwarder" {
  92. grok {
  93. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  94. overwrite => [ "message" ]
  95. }
  96. mutate {
  97. convert => ["response", "integer"]
  98. convert => ["bytes", "integer"]
  99. convert => ["responsetime", "float"]
  100. }
  101. geoip {
  102. source => "clientip"
  103. target => "geoip"
  104. add_tag => [ "apache-geoip" ]
  105. }
  106. date {
  107. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  108. remove_field => [ "timestamp" ]
  109. }
  110. }
  111. }
  112. output {
  113. if [type] == "nginx-access" {
  114. elasticsearch {
  115. hosts => ["elasticsearch:9200"]
  116. index => "nginx-%{+YYYYMM}"
  117. }
  118. }
  119. if [type] == "random" {
  120. elasticsearch {
  121. hosts => ["elasticsearch:9200"]
  122. index => "random-%{+YYYYMM}"
  123. }
  124. }
  125. if [type] == "apache" {
  126. elasticsearch {
  127. hosts => ["elasticsearch:9200"]
  128. index => "apache-%{+YYYYMM}"
  129. }
  130. }
  131. if [type] == "random-forwarder" {
  132. elasticsearch {
  133. hosts => ["elasticsearch:9200"]
  134. index => "randomforwarder-%{+YYYYMM}"
  135. }
  136. stdout {
  137. codec => rubydebug
  138. }
  139. }
  140. if [type] == "apache-forwarder" {
  141. elasticsearch {
  142. hosts => ["elasticsearch:9200"]
  143. index => "apacheforwarder-%{+YYYYMM}"
  144. }
  145. stdout {
  146. codec => rubydebug
  147. }
  148. }
  149. }