Ver código fonte

Implemented stdin input (issue #5).

didfet 10 anos atrás
pai
commit
69d181dfd0

+ 16 - 1
src/main/java/info/fetter/logstashforwarder/FileWatcher.java

@@ -44,6 +44,8 @@ public class FileWatcher {
 	private FileState[] savedStates;
 	private int maxSignatureLength;
 	private boolean tail = false;
+	private Event stdinFields;
+	private boolean stdinConfigured = false;
 
 	public FileWatcher() {
 		try {
@@ -103,6 +105,17 @@ public class FileWatcher {
 		return numberOfLinesRead;
 	}
 
+	public int readStdin(InputReader reader) throws AdapterException, IOException {
+		if(stdinConfigured) {
+			logger.debug("Reading stdin");
+			reader.setFields(stdinFields);
+			int numberOfLinesRead = reader.readInput();
+			return numberOfLinesRead;
+		} else {
+			return 0;
+		}
+	}
+
 	private void processModifications() throws IOException {
 
 		for(File file : newWatchMap.keySet()) {
@@ -224,7 +237,9 @@ public class FileWatcher {
 	}
 
 	private void addStdIn(Event fields) {
-		logger.error("Watching stdin : not implemented yet");
+		logger.error("Watching stdin");
+		stdinFields = fields;
+		stdinConfigured = true;
 	}
 
 	private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception {

+ 7 - 4
src/main/java/info/fetter/logstashforwarder/Forwarder.java

@@ -53,7 +53,8 @@ public class Forwarder {
 	private static String config;
 	private static ConfigurationManager configManager;
 	private static FileWatcher watcher;
-	private static FileReader reader;
+	private static FileReader fileReader;
+	private static InputReader inputReader;
 	private static Level logLevel = INFO;
 	private static ProtocolAdapter adapter;
 	private static Random random = new Random();
@@ -75,7 +76,8 @@ public class Forwarder {
 				}
 			}
 			watcher.initialize();
-			reader = new FileReader(spoolSize);
+			fileReader = new FileReader(spoolSize);
+			inputReader = new InputReader(spoolSize, System.in);
 			connectToServer();
 			infiniteLoop();
 		} catch(Exception e) {
@@ -88,7 +90,8 @@ public class Forwarder {
 		while(true) {
 			try {
 				watcher.checkFiles();
-				while(watcher.readFiles(reader) == spoolSize);
+				while(watcher.readFiles(fileReader) == spoolSize);
+				while(watcher.readStdin(inputReader) == spoolSize);
 				Thread.sleep(idleTimeout);
 			} catch(AdapterException e) {
 				logger.error("Lost server connection");
@@ -117,7 +120,7 @@ public class Forwarder {
 				String[] serverAndPort = serverList.get(randomServerIndex).split(":");
 				logger.info("Trying to connect to " + serverList.get(randomServerIndex));
 				adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout);
-				reader.setAdapter(adapter);
+				fileReader.setAdapter(adapter);
 			} catch(Exception ex) {
 				logger.error("Failed to connect to server " + serverList.get(randomServerIndex) + " : " + ex.getMessage());
 			}

+ 5 - 2
src/main/java/info/fetter/logstashforwarder/InputReader.java

@@ -32,10 +32,9 @@ public class InputReader extends Reader {
 	private long position = 0;
 	private Event fields;
 
-	public InputReader(int spoolSize, InputStream in, Event fields) {
+	public InputReader(int spoolSize, InputStream in) {
 		super(spoolSize);
 		reader = new BufferedReader(new InputStreamReader(in));
-		this.fields = fields;
 		stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
 	}
 	
@@ -88,5 +87,9 @@ public class InputReader extends Reader {
 		}
 		return null;
 	}
+	
+	public void setFields(Event fields) {
+		this.fields = fields;
+	}
 
 }

+ 2 - 2
src/test/java/info/fetter/logstashforwarder/InputReaderTest.java

@@ -52,7 +52,7 @@ public class InputReaderTest {
 		PipedInputStream in = new PipedInputStream();
 		PipedOutputStream out = new PipedOutputStream(in);
 		PrintWriter writer = new PrintWriter(out);
-		InputReader reader = new InputReader(2, in, null);
+		InputReader reader = new InputReader(2, in);
 		MockProtocolAdapter adapter = new MockProtocolAdapter();
 		reader.setAdapter(adapter);
 
@@ -103,7 +103,7 @@ public class InputReaderTest {
 		PipedInputStream in = new PipedInputStream();
 		PipedOutputStream out = new PipedOutputStream(in);
 		PrintWriter writer = new PrintWriter(out);
-		InputReader reader = new InputReader(2, in, null);
+		InputReader reader = new InputReader(2, in);
 		MockProtocolAdapter adapter = new MockProtocolAdapter();
 		reader.setAdapter(adapter);