Forwarder.java 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package info.fetter.logstashforwarder;
  2. /*
  3. * Copyright 2015 Didier Fetter
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. import static org.apache.log4j.Level.*;
  19. import java.io.IOException;
  20. import java.util.List;
  21. import java.util.Random;
  22. import info.fetter.logstashforwarder.config.ConfigurationManager;
  23. import info.fetter.logstashforwarder.config.FilesSection;
  24. import info.fetter.logstashforwarder.protocol.LumberjackClient;
  25. import info.fetter.logstashforwarder.util.AdapterException;
  26. import org.apache.commons.cli.CommandLine;
  27. import org.apache.commons.cli.CommandLineParser;
  28. import org.apache.commons.cli.GnuParser;
  29. import org.apache.commons.cli.HelpFormatter;
  30. import org.apache.commons.cli.Option;
  31. import org.apache.commons.cli.OptionBuilder;
  32. import org.apache.commons.cli.Options;
  33. import org.apache.commons.cli.ParseException;
  34. import org.apache.log4j.Appender;
  35. import org.apache.log4j.BasicConfigurator;
  36. import org.apache.log4j.ConsoleAppender;
  37. import org.apache.log4j.Layout;
  38. import org.apache.log4j.Level;
  39. import org.apache.log4j.Logger;
  40. import org.apache.log4j.PatternLayout;
  41. import org.apache.log4j.RollingFileAppender;
  42. import org.apache.log4j.spi.RootLogger;
  43. public class Forwarder {
  44. private static final String SINCEDB = ".logstash-forwarder-java";
  45. private static Logger logger = Logger.getLogger(Forwarder.class);
  46. private static int spoolSize = 1024;
  47. private static int idleTimeout = 5000;
  48. private static int networkTimeout = 15000;
  49. private static String config;
  50. private static ConfigurationManager configManager;
  51. private static FileWatcher watcher;
  52. private static FileReader fileReader;
  53. private static InputReader inputReader;
  54. private static Level logLevel = INFO;
  55. private static boolean debugWatcherSelected = false;
  56. private static ProtocolAdapter adapter;
  57. private static Random random = new Random();
  58. private static int signatureLength = 4096;
  59. private static boolean tailSelected = false;
  60. private static String logfile = null;
  61. private static String logfileSize = "10MB";
  62. private static int logfileNumber = 5;
  63. private static String sincedbFile = SINCEDB;
  64. public static void main(String[] args) {
  65. try {
  66. parseOptions(args);
  67. setupLogging();
  68. watcher = new FileWatcher();
  69. watcher.setMaxSignatureLength(signatureLength);
  70. watcher.setTail(tailSelected);
  71. watcher.setSincedb(sincedbFile);
  72. configManager = new ConfigurationManager(config);
  73. configManager.readConfiguration();
  74. for(FilesSection files : configManager.getConfig().getFiles()) {
  75. for(String path : files.getPaths()) {
  76. watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000, files.getMultiline());
  77. }
  78. }
  79. watcher.initialize();
  80. fileReader = new FileReader(spoolSize);
  81. inputReader = new InputReader(spoolSize, System.in);
  82. connectToServer();
  83. infiniteLoop();
  84. } catch(Exception e) {
  85. e.printStackTrace();
  86. System.exit(3);
  87. }
  88. }
  89. private static void infiniteLoop() throws IOException, InterruptedException {
  90. while(true) {
  91. try {
  92. watcher.checkFiles();
  93. while(watcher.readFiles(fileReader) == spoolSize);
  94. while(watcher.readStdin(inputReader) == spoolSize);
  95. Thread.sleep(idleTimeout);
  96. } catch(AdapterException e) {
  97. logger.error("Lost server connection");
  98. Thread.sleep(networkTimeout);
  99. connectToServer();
  100. }
  101. }
  102. }
  103. private static void connectToServer() {
  104. int randomServerIndex = 0;
  105. List<String> serverList = configManager.getConfig().getNetwork().getServers();
  106. networkTimeout = configManager.getConfig().getNetwork().getTimeout() * 1000;
  107. if(adapter != null) {
  108. try {
  109. adapter.close();
  110. } catch(AdapterException e) {
  111. logger.error("Error while closing connection to " + adapter.getServer() + ":" + adapter.getPort());
  112. } finally {
  113. adapter = null;
  114. }
  115. }
  116. while(adapter == null) {
  117. try {
  118. randomServerIndex = random.nextInt(serverList.size());
  119. String[] serverAndPort = serverList.get(randomServerIndex).split(":");
  120. logger.info("Trying to connect to " + serverList.get(randomServerIndex));
  121. adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout);
  122. fileReader.setAdapter(adapter);
  123. inputReader.setAdapter(adapter);
  124. } catch(Exception ex) {
  125. if(logger.isDebugEnabled()) {
  126. logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : ", ex);
  127. } else {
  128. logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : " + ex.getMessage());
  129. }
  130. try {
  131. Thread.sleep(networkTimeout);
  132. } catch (InterruptedException e) {
  133. logger.error(e.getMessage());
  134. }
  135. }
  136. }
  137. }
  138. @SuppressWarnings("static-access")
  139. static void parseOptions(String[] args) {
  140. Options options = new Options();
  141. Option helpOption = new Option("help", "print this message");
  142. Option quietOption = new Option("quiet", "operate in quiet mode - only emit errors to log");
  143. Option debugOption = new Option("debug", "operate in debug mode");
  144. Option debugWatcherOption = new Option("debugwatcher", "operate watcher in debug mode");
  145. Option traceOption = new Option("trace", "operate in trace mode");
  146. Option tailOption = new Option("tail", "read new files from the end");
  147. Option spoolSizeOption = OptionBuilder.withArgName("number of events")
  148. .hasArg()
  149. .withDescription("event count spool threshold - forces network flush")
  150. .create("spoolsize");
  151. Option idleTimeoutOption = OptionBuilder.withArgName("")
  152. .hasArg()
  153. .withDescription("time between file reads in seconds")
  154. .create("idletimeout");
  155. Option configOption = OptionBuilder.withArgName("config file")
  156. .hasArg()
  157. .isRequired()
  158. .withDescription("path to logstash-forwarder configuration file")
  159. .create("config");
  160. Option signatureLengthOption = OptionBuilder.withArgName("signature length")
  161. .hasArg()
  162. .withDescription("Maximum length of file signature")
  163. .create("signaturelength");
  164. Option logfileOption = OptionBuilder.withArgName("logfile name")
  165. .hasArg()
  166. .withDescription("Logfile name")
  167. .create("logfile");
  168. Option logfileSizeOption = OptionBuilder.withArgName("logfile size")
  169. .hasArg()
  170. .withDescription("Logfile size (default 10M)")
  171. .create("logfilesize");
  172. Option logfileNumberOption = OptionBuilder.withArgName("number of logfiles")
  173. .hasArg()
  174. .withDescription("Number of logfiles (default 5)")
  175. .create("logfilenumber");
  176. Option sincedbOption = OptionBuilder.withArgName("sincedb file")
  177. .hasArg()
  178. .withDescription("Sincedb file name")
  179. .create("sincedb");
  180. options.addOption(helpOption)
  181. .addOption(idleTimeoutOption)
  182. .addOption(spoolSizeOption)
  183. .addOption(quietOption)
  184. .addOption(debugOption)
  185. .addOption(debugWatcherOption)
  186. .addOption(traceOption)
  187. .addOption(tailOption)
  188. .addOption(signatureLengthOption)
  189. .addOption(configOption)
  190. .addOption(logfileOption)
  191. .addOption(logfileNumberOption)
  192. .addOption(logfileSizeOption)
  193. .addOption(sincedbOption);
  194. CommandLineParser parser = new GnuParser();
  195. try {
  196. CommandLine line = parser.parse(options, args);
  197. if(line.hasOption("spoolsize")) {
  198. spoolSize = Integer.parseInt(line.getOptionValue("spoolsize"));
  199. }
  200. if(line.hasOption("idletimeout")) {
  201. idleTimeout = Integer.parseInt(line.getOptionValue("idletimeout"));
  202. }
  203. if(line.hasOption("config")) {
  204. config = line.getOptionValue("config");
  205. }
  206. if(line.hasOption("signaturelength")) {
  207. signatureLength = Integer.parseInt(line.getOptionValue("signaturelength"));
  208. }
  209. if(line.hasOption("quiet")) {
  210. logLevel = ERROR;
  211. }
  212. if(line.hasOption("debug")) {
  213. logLevel = DEBUG;
  214. }
  215. if(line.hasOption("trace")) {
  216. logLevel = TRACE;
  217. }
  218. if(line.hasOption("debugwatcher")) {
  219. debugWatcherSelected = true;
  220. }
  221. if(line.hasOption("tail")) {
  222. tailSelected = true;
  223. }
  224. if(line.hasOption("logfile")) {
  225. logfile = line.getOptionValue("logfile");
  226. }
  227. if(line.hasOption("logfilesize")) {
  228. logfileSize = line.getOptionValue("logfilesize");
  229. }
  230. if(line.hasOption("logfilenumber")) {
  231. logfileNumber = Integer.parseInt(line.getOptionValue("logfilenumber"));
  232. }
  233. if(line.hasOption("sincedb")) {
  234. sincedbFile = line.getOptionValue("sincedb");
  235. }
  236. } catch(ParseException e) {
  237. printHelp(options);
  238. System.exit(1);;
  239. } catch(NumberFormatException e) {
  240. System.err.println("Value must be an integer");
  241. printHelp(options);
  242. System.exit(2);;
  243. }
  244. }
  245. private static void printHelp(Options options) {
  246. HelpFormatter formatter = new HelpFormatter();
  247. formatter.printHelp("logstash-forwarder", options);
  248. }
  249. private static void setupLogging() throws IOException {
  250. Appender appender;
  251. Layout layout = new PatternLayout("%d %p %c{1} - %m%n");
  252. if(logfile == null) {
  253. appender = new ConsoleAppender(layout);
  254. } else {
  255. RollingFileAppender rolling = new RollingFileAppender(layout, logfile, true);
  256. rolling.setMaxFileSize(logfileSize);
  257. rolling.setMaxBackupIndex(logfileNumber);
  258. appender = rolling;
  259. }
  260. BasicConfigurator.configure(appender);
  261. RootLogger.getRootLogger().setLevel(logLevel);
  262. if(debugWatcherSelected) {
  263. Logger.getLogger(FileWatcher.class).addAppender(appender);
  264. Logger.getLogger(FileWatcher.class).setLevel(DEBUG);
  265. Logger.getLogger(FileWatcher.class).setAdditivity(false);
  266. }
  267. // Logger.getLogger(FileReader.class).addAppender((Appender)RootLogger.getRootLogger().getAllAppenders().nextElement());
  268. // Logger.getLogger(FileReader.class).setLevel(TRACE);
  269. // Logger.getLogger(FileReader.class).setAdditivity(false);
  270. }
  271. }