Sfoglia il codice sorgente

Created FileReader and ProtocolAdapter

didfet 10 anni fa
parent
commit
340076f850

+ 56 - 0
src/main/java/info/fetter/logstashforwarder/FileReader.java

@@ -0,0 +1,56 @@
+package info.fetter.logstashforwarder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Copyright 2015 Didier Fetter
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+public class FileReader {
+	private static final int DEFAULT_SPOOL_SIZE = 1024;
+	private ProtocolAdapter adapter;
+	private int spoolSize = DEFAULT_SPOOL_SIZE;
+	private List<Event> eventList;
+	
+	public FileReader(int spoolSize) {
+		this.spoolSize = spoolSize;
+		eventList = new ArrayList<Event>(spoolSize);
+	}
+	
+	public int readFiles(List<FileState> fileList) {
+		// TODO: Read files and send events until there's nothing left to read or spool size reached
+		int eventCounter = 0;
+		for(FileState state : fileList) {
+			eventCounter += readFile(state, spoolSize - eventCounter);
+		}
+		return 0; // Return number of events sent to adapter
+	}
+
+	private int readFile(FileState state, int spaceLeftInSpool) {
+		// TODO Read file
+		return 0; // Return number of events read
+	}
+
+	public ProtocolAdapter getAdapter() {
+		return adapter;
+	}
+
+	public void setAdapter(ProtocolAdapter adapter) {
+		this.adapter = adapter;
+	}
+
+}

+ 6 - 25
src/main/java/info/fetter/logstashforwarder/FileWatcher.java

@@ -55,11 +55,11 @@ public class FileWatcher implements FileAlterationListener {
 	public void addFilesToWatch(String fileToWatch) {
 		try {
 			if(fileToWatch.equals("-")) {
-				watchStdIn();
+				addStdIn();
 			} else if(fileToWatch.contains("*")) {
-				watchWildCardFiles(fileToWatch);
+				addWildCardFiles(fileToWatch);
 			} else {
-				watchFile(fileToWatch);
+				addSingleFile(fileToWatch);
 			}
 		} catch(Exception e) {
 			throw new RuntimeException(e);
@@ -69,12 +69,10 @@ public class FileWatcher implements FileAlterationListener {
 	public void checkFiles() throws IOException {
 		logger.debug("Checking files");
 		logger.trace("==============");
-		
 		for(FileAlterationObserver observer : observerList) {
 			observer.checkAndNotify();
 		}
 		processModifications();
-
 		printWatchMap();
 	}
 	
@@ -95,39 +93,31 @@ public class FileWatcher implements FileAlterationListener {
 				logger.trace("-- Filename : " + state.getFileName());
 			}
 
-			// Determine if file is still the same
 			logger.trace("Determine if file has just been written to");
 			FileState oldState = watchMap.get(file);
 			if(oldState != null) {
 				if(oldState.getSize() > state.getSize()) {
-					// File is shorter, can't be the same
 					logger.trace("File shorter : file can't be the same");
 				} else {
 					if(oldState.getSignatureLength() == state.getSignatureLength() && oldState.getSignature() == state.getSignature()) {
-						// File is the same
 						state.setOldFileState(oldState);
 						logger.trace("Same signature size and value : file is the same");
 						continue;
 					} else if(oldState.getSignatureLength() < state.getSignatureLength()){
-						// Compute the signature on the new file
 						long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength());
 						if(signature == oldState.getSignature()) {
-							// File is the same
 							state.setOldFileState(oldState);
 							logger.trace("Same signature : file is the same");
 							continue;
 						} else {
-							// File can't be the same
 							logger.trace("Signature different : file can't be the same");
 						}
 					} else if(oldState.getSignatureLength() > state.getSignatureLength()){
-						// File can't be the same
 						logger.trace("Signature shorter : file can't be the same");
 					}
 				}
 			}
 
-			// Determine if file has been renamed and/or written to
 			if(state.getOldFileState() == null) {
 				logger.trace("Determine if file has been renamed and/or written to");
 				for(File otherFile : watchMap.keySet()) {
@@ -136,26 +126,20 @@ public class FileWatcher implements FileAlterationListener {
 						if(logger.isTraceEnabled()) {
 							logger.trace("Comparing to : " + otherFile.getCanonicalPath());
 						}
-						// File in the same directory which was smaller
 						if(otherState.getSignatureLength() == state.getSignatureLength() && otherState.getSignature() == state.getSignature()) {
-							// File is the same
 							state.setOldFileState(otherState);
 							logger.trace("Same signature size and value : file is the same");
 							break;
 						} else if(otherState.getSignatureLength() < state.getSignatureLength()){
-							// Compute the signature on the new file
 							long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength());
 							if(signature == otherState.getSignature()) {
-								// File is the same
 								state.setOldFileState(otherState);
 								logger.trace("Same signature : file is the same");
 								break;
 							} else {
-								// File can't be the same
 								logger.trace("Signature different : file can't be the same");
 							}
 						} else if(otherState.getSignatureLength() > state.getSignatureLength()){
-							// File can't be the same
 							logger.trace("Signature shorter : file can't be the same");
 						}
 					}
@@ -163,7 +147,6 @@ public class FileWatcher implements FileAlterationListener {
 			}
 		}
 
-		// Refresh file state
 		logger.trace("Refreshing file state");
 		for(File file : changedWatchMap.keySet()) {
 			if(logger.isTraceEnabled()) {
@@ -180,10 +163,8 @@ public class FileWatcher implements FileAlterationListener {
 			}
 		}
 
-		// Replacing old state
 		logger.trace("Replacing old state");
 		for(File file : changedWatchMap.keySet()) {
-			//logger.trace("Replacing file : " + file.getCanonicalPath());
 			FileState state = changedWatchMap.get(file);
 			watchMap.put(file, state);
 		}
@@ -194,7 +175,7 @@ public class FileWatcher implements FileAlterationListener {
 		removeMarkedFilesFromWatchMap();
 	}
 
-	private void watchFile(String fileToWatch) throws Exception {
+	private void addSingleFile(String fileToWatch) throws Exception {
 		logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
 		String directory = FilenameUtils.getFullPath(fileToWatch);
 		String fileName = FilenameUtils.getName(fileToWatch); 
@@ -205,7 +186,7 @@ public class FileWatcher implements FileAlterationListener {
 		initializeWatchMap(new File(directory), fileFilter);
 	}
 
-	private void watchWildCardFiles(String filesToWatch) throws Exception {
+	private void addWildCardFiles(String filesToWatch) throws Exception {
 		logger.info("Watching wildcard files : " + filesToWatch);
 		String directory = FilenameUtils.getFullPath(filesToWatch);
 		String wildcard = FilenameUtils.getName(filesToWatch);
@@ -217,7 +198,7 @@ public class FileWatcher implements FileAlterationListener {
 		initializeWatchMap(new File(directory), fileFilter);
 	}
 
-	private void watchStdIn() {
+	private void addStdIn() {
 		logger.info("Watching stdin : not implemented yet");
 	}
 

+ 26 - 0
src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java

@@ -0,0 +1,26 @@
+package info.fetter.logstashforwarder;
+
+/*
+ * Copyright 2015 Didier Fetter
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+public interface ProtocolAdapter {
+	public int sendEvents(List<Event> eventList) throws IOException;
+	public void close() throws IOException;
+}

+ 2 - 1
src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java

@@ -18,6 +18,7 @@ package info.fetter.logstashforwarder.protocol;
  */
 
 import info.fetter.logstashforwarder.Event;
+import info.fetter.logstashforwarder.ProtocolAdapter;
 
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -40,7 +41,7 @@ import javax.net.ssl.TrustManagerFactory;
 import org.apache.commons.io.HexDump;
 import org.apache.log4j.Logger;
 
-public class LumberjackClient {
+public class LumberjackClient implements ProtocolAdapter {
 	private final static Logger logger = Logger.getLogger(LumberjackClient.class);
 	private final static byte PROTOCOL_VERSION = 0x31;
 	private final static byte FRAME_ACK = 0x41;