浏览代码

Refactored Watcher and Reader.

didfet 9 年之前
父节点
当前提交
adb969518d

+ 14 - 8
src/main/java/info/fetter/logstashforwarder/FileWatcher.java

@@ -45,6 +45,7 @@ public class FileWatcher implements Watcher {
 	private Map<File,FileState> newWatchMap = new HashMap<File,FileState>();
 	private FileState[] savedStates;
 	private Parameters parameters;
+	private FileReader reader;
 
 	public FileWatcher() {
 	}
@@ -92,14 +93,6 @@ public class FileWatcher implements Watcher {
 		printWatchMap();
 	}
 
-	public int readFiles(FileReader reader) throws IOException, AdapterException {
-		logger.trace("Reading files");
-		logger.trace("==============");
-		int numberOfLinesRead = reader.readFiles(oldWatchMap.values());
-		Registrar.writeStateToJson(parameters.getSincedbFile(),oldWatchMap.values());
-		return numberOfLinesRead;
-	}
-
 	private void detectModifications() throws IOException {
 
 		for(File file : newWatchMap.keySet()) {
@@ -356,6 +349,19 @@ public class FileWatcher implements Watcher {
 		} catch(Exception e) {
 			logger.warn("Could not load saved states : " + e.getMessage(), e);
 		}
+		reader = new FileReader(parameters.getSpoolSize());
+	}
+
+	public Reader getReader() {
+		return reader;
+	}
+	
+	public int readFiles() throws IOException, AdapterException {
+		logger.trace("Reading files");
+		logger.trace("==============");
+		int numberOfLinesRead = reader.readFiles(oldWatchMap.values());
+		Registrar.writeStateToJson(parameters.getSincedbFile(),oldWatchMap.values());
+		return numberOfLinesRead;
 	}
 
 }

+ 4 - 8
src/main/java/info/fetter/logstashforwarder/Forwarder.java

@@ -43,9 +43,7 @@ public class Forwarder {
 	private static Logger logger = Logger.getLogger(Forwarder.class);
 	private static ConfigurationManager configManager;
 	private static FileWatcher fileWatcher;
-	private static FileReader fileReader;
 	private static InputWatcher inputWatcher;
-	private static InputReader inputReader;
 	private static ProtocolAdapter adapter;
 	private static Random random = new Random();
 	private static Parameters parameters;
@@ -65,8 +63,6 @@ public class Forwarder {
 				fileWatcher.addFilesToWatch(files);
 			}
 			fileWatcher.initialize();
-			fileReader = new FileReader(parameters.getSpoolSize());
-			inputReader = new InputReader(parameters.getSpoolSize(), System.in);
 			connectToServer();
 			infiniteLoop();
 		} catch(Exception e) {
@@ -79,8 +75,8 @@ public class Forwarder {
 		while(true) {
 			try {
 				fileWatcher.checkFiles();
-				while(fileWatcher.readFiles(fileReader) == parameters.getSpoolSize());
-				while(inputWatcher.readStdin(inputReader) == parameters.getSpoolSize());
+				while(fileWatcher.readFiles() == parameters.getSpoolSize());
+				while(inputWatcher.readFiles() == parameters.getSpoolSize());
 				Thread.sleep(parameters.getIdleTimeout());
 			} catch(AdapterException e) {
 				logger.error("Lost server connection");
@@ -109,8 +105,8 @@ public class Forwarder {
 				String[] serverAndPort = serverList.get(randomServerIndex).split(":");
 				logger.info("Trying to connect to " + serverList.get(randomServerIndex));
 				adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout);
-				fileReader.setAdapter(adapter);
-				inputReader.setAdapter(adapter);
+				fileWatcher.getReader().setAdapter(adapter);
+				inputWatcher.getReader().setAdapter(adapter);
 			} catch(Exception ex) {
 				if(logger.isDebugEnabled()) {
 					logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : ", ex);