kafka_elasticsearch.conf 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. input {
  2. kafka {
  3. codec => json{}
  4. bootstrap_servers => "kafka:9092"
  5. topics => ["random"]
  6. client_id => "logstash_indexer_1"
  7. }
  8. kafka {
  9. codec => json{}
  10. bootstrap_servers => "kafka:9092"
  11. topics => ["apache"]
  12. client_id => "logstash_indexer_1"
  13. }
  14. kafka {
  15. codec => json{}
  16. bootstrap_servers => "kafka:9092"
  17. topics => ["random-forwarder"]
  18. client_id => "logstash_indexer_1"
  19. }
  20. kafka {
  21. codec => json{}
  22. bootstrap_servers => "kafka:9092"
  23. topics => ["apache-forwarder"]
  24. client_id => "logstash_indexer_1"
  25. }
  26. kafka {
  27. codec => json{}
  28. bootstrap_servers => "kafka:9092"
  29. topics => ["javalog"]
  30. client_id => "logstash_indexer_1"
  31. }
  32. }
  33. filter {
  34. if [type] == "nginx-access" {
  35. grok {
  36. match => [ "message" , "%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
  37. overwrite => [ "message" ]
  38. }
  39. mutate {
  40. convert => ["response", "integer"]
  41. convert => ["bytes", "integer"]
  42. convert => ["responsetime", "float"]
  43. }
  44. geoip {
  45. source => "clientip"
  46. target => "geoip"
  47. add_tag => [ "nginx-geoip" ]
  48. }
  49. date {
  50. match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  51. remove_field => [ "timestamp" ]
  52. }
  53. useragent {
  54. source => "agent"
  55. }
  56. }
  57. if [type] == "random" {
  58. grok {
  59. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  60. overwrite => [ "message" ]
  61. }
  62. date {
  63. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  64. remove_field => [ "timestamp" ]
  65. }
  66. }
  67. if [type] == "apache" {
  68. grok {
  69. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  70. overwrite => [ "message" ]
  71. }
  72. mutate {
  73. convert => ["response", "integer"]
  74. convert => ["bytes", "integer"]
  75. convert => ["responsetime", "float"]
  76. }
  77. geoip {
  78. source => "clientip"
  79. target => "geoip"
  80. add_tag => [ "apache-geoip" ]
  81. }
  82. date {
  83. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  84. remove_field => [ "timestamp" ]
  85. }
  86. }
  87. if [type] == "random-forwarder" {
  88. grok {
  89. match => [ "message" , "(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) %{NUMBER:pid} %{GREEDYDATA:filename} %{NUMBER:line} %{GREEDYDATA:logger} %{LOGLEVEL:severity} %{GREEDYDATA:quote}"]
  90. overwrite => [ "message" ]
  91. }
  92. date {
  93. match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS"]
  94. remove_field => [ "timestamp" ]
  95. }
  96. }
  97. if [type] == "apache-forwarder" {
  98. grok {
  99. match => [ "message" , "%{COMBINEDAPACHELOG}"]
  100. overwrite => [ "message" ]
  101. }
  102. mutate {
  103. convert => ["response", "integer"]
  104. convert => ["bytes", "integer"]
  105. convert => ["responsetime", "float"]
  106. }
  107. geoip {
  108. source => "clientip"
  109. target => "geoip"
  110. add_tag => [ "apache-geoip" ]
  111. }
  112. date {
  113. match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  114. remove_field => [ "timestamp" ]
  115. }
  116. }
  117. }
  118. output {
  119. if [type] == "nginx-access" {
  120. elasticsearch {
  121. hosts => ["elasticsearch:9200"]
  122. index => "nginx-%{+YYYYMM}"
  123. }
  124. }
  125. if [type] == "random" {
  126. elasticsearch {
  127. hosts => ["elasticsearch:9200"]
  128. index => "random-%{+YYYYMM}"
  129. }
  130. }
  131. if [type] == "apache" {
  132. elasticsearch {
  133. hosts => ["elasticsearch:9200"]
  134. index => "apache-%{+YYYYMM}"
  135. }
  136. }
  137. if [type] == "javalog" {
  138. elasticsearch {
  139. hosts => ["elasticsearch:9200"]
  140. index => "javalog-%{+YYYYMM}"
  141. }
  142. }
  143. if [type] == "random-forwarder" {
  144. elasticsearch {
  145. hosts => ["elasticsearch:9200"]
  146. index => "randomforwarder-%{+YYYYMM}"
  147. }
  148. stdout {
  149. codec => rubydebug
  150. }
  151. }
  152. if [type] == "apache-forwarder" {
  153. elasticsearch {
  154. hosts => ["elasticsearch:9200"]
  155. index => "apacheforwarder-%{+YYYYMM}"
  156. }
  157. stdout {
  158. codec => rubydebug
  159. }
  160. }
  161. }