Forwarder.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package info.fetter.logstashforwarder;
  2. import static org.apache.log4j.Level.*;
  3. import java.io.IOException;
  4. import info.fetter.logstashforwarder.config.ConfigurationManager;
  5. import info.fetter.logstashforwarder.config.FilesSection;
  6. import info.fetter.logstashforwarder.protocol.LumberjackClient;
  7. import info.fetter.logstashforwarder.util.AdapterException;
  8. import org.apache.commons.cli.CommandLine;
  9. import org.apache.commons.cli.CommandLineParser;
  10. import org.apache.commons.cli.GnuParser;
  11. import org.apache.commons.cli.HelpFormatter;
  12. import org.apache.commons.cli.Option;
  13. import org.apache.commons.cli.OptionBuilder;
  14. import org.apache.commons.cli.Options;
  15. import org.apache.commons.cli.ParseException;
  16. import org.apache.log4j.BasicConfigurator;
  17. import org.apache.log4j.Level;
  18. import org.apache.log4j.Logger;
  19. import org.apache.log4j.spi.RootLogger;
  20. /*
  21. * Copyright 2015 Didier Fetter
  22. *
  23. * Licensed under the Apache License, Version 2.0 (the "License");
  24. * you may not use this file except in compliance with the License.
  25. * You may obtain a copy of the License at
  26. *
  27. * http://www.apache.org/licenses/LICENSE-2.0
  28. *
  29. * Unless required by applicable law or agreed to in writing, software
  30. * distributed under the License is distributed on an "AS IS" BASIS,
  31. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  32. * See the License for the specific language governing permissions and
  33. * limitations under the License.
  34. *
  35. */
  36. public class Forwarder {
  37. private static Logger logger = Logger.getLogger(Forwarder.class);
  38. private static int spoolSize = 1024;
  39. private static int idleTimeout = 5000;
  40. private static String config;
  41. private static ConfigurationManager configManager;
  42. private static FileWatcher watcher;
  43. private static FileReader reader;
  44. private static Level logLevel = INFO;
  45. private static ProtocolAdapter adapter;
  46. public static void main(String[] args) {
  47. try {
  48. parseOptions(args);
  49. BasicConfigurator.configure();
  50. RootLogger.getRootLogger().setLevel(logLevel);
  51. // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
  52. // Logger.getLogger(FileReader.class).setLevel(TRACE);
  53. // Logger.getLogger(FileReader.class).setAdditivity(false);
  54. watcher = new FileWatcher();
  55. configManager = new ConfigurationManager(config);
  56. configManager.readConfiguration();
  57. for(FilesSection files : configManager.getConfig().getFiles()) {
  58. for(String path : files.getPaths()) {
  59. watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY);
  60. }
  61. }
  62. reader = new FileReader(spoolSize);
  63. connectToServer();
  64. infiniteLoop();
  65. } catch(Exception e) {
  66. e.printStackTrace();
  67. System.exit(3);
  68. }
  69. }
  70. private static void infiniteLoop() throws IOException, InterruptedException {
  71. while(true) {
  72. try {
  73. watcher.checkFiles();
  74. while(watcher.readFiles(reader) == spoolSize);
  75. Thread.sleep(idleTimeout);
  76. } catch(AdapterException e) {
  77. try {
  78. logger.error("Lost server connection");
  79. Thread.sleep(configManager.getConfig().getNetwork().getTimeout() * 1000);
  80. connectToServer();
  81. } catch(Exception ex) {
  82. logger.error("Failed to reconnect to server : " + ex.getMessage());
  83. }
  84. }
  85. }
  86. }
  87. private static void connectToServer() throws NumberFormatException, IOException {
  88. String[] serverAndPort = configManager.getConfig().getNetwork().getServers().get(0).split(":");
  89. adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]));
  90. reader.setAdapter(adapter);
  91. }
  92. @SuppressWarnings("static-access")
  93. static void parseOptions(String[] args) {
  94. Options options = new Options();
  95. Option helpOption = new Option("help", "print this message");
  96. Option quiet = new Option("quiet", "operate in quiet mode - only emit errors to log");
  97. Option debug = new Option("debug", "operate in debug mode");
  98. Option trace = new Option("trace", "operate in trace mode");
  99. Option spoolSizeOption = OptionBuilder.withArgName("number of events")
  100. .hasArg()
  101. .withDescription("event count spool threshold - forces network flush")
  102. .create("spoolsize");
  103. Option idleTimeoutOption = OptionBuilder.withArgName("")
  104. .hasArg()
  105. .withDescription("time between file reads in seconds")
  106. .create("idletimeout");
  107. Option configOption = OptionBuilder.withArgName("config file")
  108. .hasArg()
  109. .isRequired()
  110. .withDescription("path to logstash-forwarder configuration file")
  111. .create("config");
  112. options.addOption(helpOption)
  113. .addOption(idleTimeoutOption)
  114. .addOption(spoolSizeOption)
  115. .addOption(quiet)
  116. .addOption(debug)
  117. .addOption(trace)
  118. .addOption(configOption);
  119. CommandLineParser parser = new GnuParser();
  120. try {
  121. CommandLine line = parser.parse(options, args);
  122. if(line.hasOption("spoolsize")) {
  123. spoolSize = Integer.parseInt(line.getOptionValue("spoolsize"));
  124. }
  125. if(line.hasOption("idletimeout")) {
  126. idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout"));
  127. }
  128. if(line.hasOption("config")) {
  129. config = line.getOptionValue("config");
  130. }
  131. if(line.hasOption("quiet")) {
  132. logLevel = ERROR;
  133. }
  134. if(line.hasOption("debug")) {
  135. logLevel = DEBUG;
  136. }
  137. if(line.hasOption("trace")) {
  138. logLevel = TRACE;
  139. }
  140. } catch(ParseException e) {
  141. printHelp(options);
  142. System.exit(1);;
  143. } catch(NumberFormatException e) {
  144. System.err.println("Value must be an integer");
  145. printHelp(options);
  146. System.exit(2);;
  147. }
  148. }
  149. private static void printHelp(Options options) {
  150. HelpFormatter formatter = new HelpFormatter();
  151. formatter.printHelp("logstash-forwarder", options);
  152. }
  153. }