Forwarder.java 6.9 KB


  1. package info.fetter.logstashforwarder;
  2. /*
  3. * Copyright 2015 Didier Fetter
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. import static org.apache.log4j.Level.*;
  19. import java.io.IOException;
  20. import java.util.List;
  21. import java.util.Random;
  22. import info.fetter.logstashforwarder.config.ConfigurationManager;
  23. import info.fetter.logstashforwarder.config.FilesSection;
  24. import info.fetter.logstashforwarder.protocol.LumberjackClient;
  25. import info.fetter.logstashforwarder.util.AdapterException;
  26. import org.apache.commons.cli.CommandLine;
  27. import org.apache.commons.cli.CommandLineParser;
  28. import org.apache.commons.cli.GnuParser;
  29. import org.apache.commons.cli.HelpFormatter;
  30. import org.apache.commons.cli.Option;
  31. import org.apache.commons.cli.OptionBuilder;
  32. import org.apache.commons.cli.Options;
  33. import org.apache.commons.cli.ParseException;
  34. import org.apache.log4j.BasicConfigurator;
  35. import org.apache.log4j.Level;
  36. import org.apache.log4j.Logger;
  37. import org.apache.log4j.spi.RootLogger;
  38. public class Forwarder {
  39. private static Logger logger = Logger.getLogger(Forwarder.class);
  40. private static int spoolSize = 1024;
  41. private static int idleTimeout = 5000;
  42. private static int networkTimeout = 15000;
  43. private static String config;
  44. private static ConfigurationManager configManager;
  45. private static FileWatcher watcher;
  46. private static FileReader reader;
  47. private static Level logLevel = INFO;
  48. private static ProtocolAdapter adapter;
  49. private static Random random = new Random();
  50. private static int signatureLength = 4096;
  51. private static boolean tailSelected = false;
  52. public static void main(String[] args) {
  53. try {
  54. parseOptions(args);
  55. BasicConfigurator.configure();
  56. RootLogger.getRootLogger().setLevel(logLevel);
  57. // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
  58. // Logger.getLogger(FileReader.class).setLevel(TRACE);
  59. // Logger.getLogger(FileReader.class).setAdditivity(false);
  60. watcher = new FileWatcher();
  61. watcher.setMaxSignatureLength(signatureLength);
  62. watcher.setTail(tailSelected);
  63. configManager = new ConfigurationManager(config);
  64. configManager.readConfiguration();
  65. for(FilesSection files : configManager.getConfig().getFiles()) {
  66. for(String path : files.getPaths()) {
  67. watcher.addFilesToWatch(path, new Event(files.getFields()), FileWatcher.ONE_DAY);
  68. }
  69. }
  70. watcher.initialize();
  71. reader = new FileReader(spoolSize);
  72. connectToServer();
  73. infiniteLoop();
  74. } catch(Exception e) {
  75. e.printStackTrace();
  76. System.exit(3);
  77. }
  78. }
  79. private static void infiniteLoop() throws IOException, InterruptedException {
  80. while(true) {
  81. try {
  82. watcher.checkFiles();
  83. while(watcher.readFiles(reader) == spoolSize);
  84. Thread.sleep(idleTimeout);
  85. } catch(AdapterException e) {
  86. logger.error("Lost server connection");
  87. Thread.sleep(networkTimeout);
  88. connectToServer();
  89. }
  90. }
  91. }
  92. private static void connectToServer() {
  93. int randomServerIndex = 0;
  94. List<String> serverList = configManager.getConfig().getNetwork().getServers();
  95. networkTimeout = configManager.getConfig().getNetwork().getTimeout() * 1000;
  96. if(adapter != null) {
  97. try {
  98. adapter.close();
  99. } catch(AdapterException e) {
  100. logger.error("Error while closing connection to " + adapter.getServer() + ":" + adapter.getPort());
  101. } finally {
  102. adapter = null;
  103. }
  104. }
  105. while(adapter == null) {
  106. try {
  107. randomServerIndex = random.nextInt(serverList.size());
  108. String[] serverAndPort = serverList.get(randomServerIndex).split(":");
  109. logger.info("Trying to connect to " + serverList.get(randomServerIndex));
  110. adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout);
  111. reader.setAdapter(adapter);
  112. } catch(Exception ex) {
  113. logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : " + ex.getMessage());
  114. }
  115. }
  116. }
  117. @SuppressWarnings("static-access")
  118. static void parseOptions(String[] args) {
  119. Options options = new Options();
  120. Option helpOption = new Option("help", "print this message");
  121. Option quiet = new Option("quiet", "operate in quiet mode - only emit errors to log");
  122. Option debug = new Option("debug", "operate in debug mode");
  123. Option trace = new Option("trace", "operate in trace mode");
  124. Option tail = new Option("tail", "read new files from the end");
  125. Option spoolSizeOption = OptionBuilder.withArgName("number of events")
  126. .hasArg()
  127. .withDescription("event count spool threshold - forces network flush")
  128. .create("spoolsize");
  129. Option idleTimeoutOption = OptionBuilder.withArgName("")
  130. .hasArg()
  131. .withDescription("time between file reads in seconds")
  132. .create("idletimeout");
  133. Option configOption = OptionBuilder.withArgName("config file")
  134. .hasArg()
  135. .isRequired()
  136. .withDescription("path to logstash-forwarder configuration file")
  137. .create("config");
  138. Option signatureLengthOption = OptionBuilder.withArgName("signature length")
  139. .hasArg()
  140. .withDescription("Maximum length of file signature")
  141. .create("signaturelength");
  142. options.addOption(helpOption)
  143. .addOption(idleTimeoutOption)
  144. .addOption(spoolSizeOption)
  145. .addOption(quiet)
  146. .addOption(debug)
  147. .addOption(trace)
  148. .addOption(tail)
  149. .addOption(signatureLengthOption)
  150. .addOption(configOption);
  151. CommandLineParser parser = new GnuParser();
  152. try {
  153. CommandLine line = parser.parse(options, args);
  154. if(line.hasOption("spoolsize")) {
  155. spoolSize = Integer.parseInt(line.getOptionValue("spoolsize"));
  156. }
  157. if(line.hasOption("idletimeout")) {
  158. idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout"));
  159. }
  160. if(line.hasOption("config")) {
  161. config = line.getOptionValue("config");
  162. }
  163. if(line.hasOption("signaturelength")) {
  164. signatureLength = Integer.parseInt(line.getOptionValue("signaturelength"));
  165. }
  166. if(line.hasOption("quiet")) {
  167. logLevel = ERROR;
  168. }
  169. if(line.hasOption("debug")) {
  170. logLevel = DEBUG;
  171. }
  172. if(line.hasOption("trace")) {
  173. logLevel = TRACE;
  174. }
  175. if(line.hasOption("tail")) {
  176. tailSelected = true;
  177. }
  178. } catch(ParseException e) {
  179. printHelp(options);
  180. System.exit(1);;
  181. } catch(NumberFormatException e) {
  182. System.err.println("Value must be an integer");
  183. printHelp(options);
  184. System.exit(2);;
  185. }
  186. }
  187. private static void printHelp(Options options) {
  188. HelpFormatter formatter = new HelpFormatter();
  189. formatter.printHelp("logstash-forwarder", options);
  190. }
  191. }