Bladeren bron

Implemented FileWatcher.

didfet 10 jaren geleden
bovenliggende
commit
9b7c7b0760

+ 67 - 3
src/main/java/info/fetter/logstashforwarder/FileState.java

@@ -19,19 +19,27 @@ package info.fetter.logstashforwarder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
 public class FileState {
 	private File file;
 	private String filePath;
 	private long lastModified;
 	private long size;
+	private boolean deleted = false;
+	private long signature;
+	private int signatureLength;
+	private boolean changed = false;
+	private RandomAccessFile randomAccessFile;
+	private long pointer = 0;
+	private FileState oldFileState;
+	private String fileName;
 	
 	public FileState(File file) throws IOException {
 		this.file = file;
 		filePath = file.getCanonicalPath();
-	}
-	
-	public void refresh() {
+		fileName = file.getName();
+		randomAccessFile = new RandomAccessFile(file, "r");
 		lastModified = file.lastModified();
 		size = file.length();
 	}
@@ -52,4 +60,60 @@ public class FileState {
 		return filePath;
 	}
 	
+	public boolean isDeleted() {
+		return deleted;
+	}
+	
+	public void setDeleted() {
+		deleted = true;
+	}
+	
+	public boolean hasChanged() {
+		return changed;
+	}
+	
+	public void setChanged(boolean changed) {
+		this.changed = changed;
+	}
+
+	public long getSignature() {
+		return signature;
+	}
+
+	public void setSignature(long signature) {
+		this.signature = signature;
+	}
+
+	public RandomAccessFile getRandomAccessFile() {
+		return randomAccessFile;
+	}
+
+	public long getPointer() {
+		return pointer;
+	}
+	
+	public void setPointer(long pointer) {
+		this.pointer = pointer;
+	}
+
+	public int getSignatureLength() {
+		return signatureLength;
+	}
+
+	public void setSignatureLength(int signatureLength) {
+		this.signatureLength = signatureLength;
+	}
+
+	public FileState getOldFileState() {
+		return oldFileState;
+	}
+
+	public void setOldFileState(FileState oldFileState) {
+		this.oldFileState = oldFileState;
+	}
+
+	public String getFileName() {
+		return fileName;
+	}
+	
 }

+ 192 - 31
src/main/java/info/fetter/logstashforwarder/FileWatcher.java

@@ -41,6 +41,8 @@ public class FileWatcher implements FileAlterationListener {
 	private static final int ONE_DAY = 24 * 3600 * 1000;
 	private long deadTime;
 	private Map<File,FileState> watchMap = new HashMap<File,FileState>();
+	private Map<File,FileState> changedWatchMap = new HashMap<File,FileState>();
+	private static int MAX_SIGNATURE_LENGTH = 1024;
 
 	public FileWatcher(long deadTime) {
 		this.deadTime = deadTime;
@@ -64,6 +66,129 @@ public class FileWatcher implements FileAlterationListener {
 		}
 	}
 
+	public void checkFiles() throws IOException {
+		logger.debug("Checking files");
+		printWatchMap();
+		for(FileAlterationObserver observer : observerList) {
+			observer.checkAndNotify();
+		}
+		processModifications();
+
+		printWatchMap();
+	}
+
+	private void processModifications() throws IOException {
+		removeMarkedFilesFromWatchMap();
+
+		for(File file : changedWatchMap.keySet()) {
+			FileState state = changedWatchMap.get(file);
+			logger.trace("Checking file : " + file.getCanonicalPath());
+
+			// Determine if file is still the same
+			logger.trace("Determine if file name has not changed");
+			FileState oldState = watchMap.get(file);
+			if(oldState != null) {
+				if(oldState.getSize() > state.getSize()) {
+					// File is shorter, can't be the same
+					logger.trace("File shorter : file can't be the same");
+				} else {
+					if(oldState.getSignatureLength() == state.getSignatureLength() && oldState.getSignature() == state.getSignature()) {
+						// File is the same
+						state.setOldFileState(oldState);
+						logger.trace("Same signature size and value : file is the same");
+					} else if(oldState.getSignatureLength() < state.getSignatureLength()){
+						// Compute the signature on the new file
+						long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength());
+						if(signature == oldState.getSignature()) {
+							// File is the same
+							state.setOldFileState(oldState);
+							logger.trace("Same signature : file is the same");
+						} else {
+							// File can't be the same
+							logger.trace("Signature different : file can't be the same");
+						}
+					} else if(oldState.getSignatureLength() > state.getSignatureLength()){
+						// File can't be the same
+						logger.trace("Signature shorter : file can't be the same");
+					}
+				}
+			}
+
+			// Determine if there was a file with the same size and last modification date in the same directory and with a different name
+			logger.trace("Determine if file has just been renamed");
+			if(state.getOldFileState() == null) {
+				for(File otherFile : changedWatchMap.keySet()) {
+					FileState otherState = watchMap.get(otherFile);
+					if(otherState != null
+							&& state.getLastModified() == otherState.getLastModified()
+							&& state.getSize() == otherState.getSize() 
+							&& state.getFilePath().equals(otherState.getFilePath())
+							&& ! state.getFileName().equals(otherState.getFileName())) {
+						logger.trace("Comparing to : " + otherFile.getCanonicalPath());
+						// Assume file has been renamed
+						state.setOldFileState(otherState);
+						logger.trace("Same directory, same size and last modification date : file has been renamed to : " + otherFile.getCanonicalPath());
+					}
+				}
+			}
+
+			// Determine if file has been renamed and appended
+			logger.trace("Determine if file has been renamed and appended");
+			if(state.getOldFileState() == null) {
+				for(File otherFile : changedWatchMap.keySet()) {
+					FileState otherState = watchMap.get(otherFile);
+					if(otherState != null && state.getSize() > otherState.getSize() && state.getFilePath().equals(otherState.getFilePath())) {
+						logger.trace("Comparing to : " + otherFile.getCanonicalPath());
+						// File in the same directory which was smaller
+						if(otherState.getSignatureLength() == state.getSignatureLength() && otherState.getSignature() == state.getSignature()) {
+							// File is the same
+							state.setOldFileState(otherState);
+							logger.trace("Same signature size and value : file is the same");
+						} else if(otherState.getSignatureLength() < state.getSignatureLength()){
+							// Compute the signature on the new file
+							long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength());
+							if(signature == otherState.getSignature()) {
+								// File is the same
+								state.setOldFileState(otherState);
+								logger.trace("Same signature : file is the same");
+							} else {
+								// File can't be the same
+								logger.trace("Signature different : file can't be the same");
+							}
+						} else if(otherState.getSignatureLength() > state.getSignatureLength()){
+							// File can't be the same
+							logger.trace("Signature shorter : file can't be the same");
+						}
+					}
+				}
+			}
+		}
+
+		// Refresh file state
+		logger.trace("refreshing file state");
+		for(File file : changedWatchMap.keySet()) {
+			logger.trace("Refreshing file : " + file.getCanonicalPath());
+			FileState state = changedWatchMap.get(file);
+			FileState oldState = state.getOldFileState();
+			if(oldState == null) {
+				logger.trace("File has been truncated or created, not retrieving pointer");
+			} else {
+				logger.trace("File has not been truncated or created, retrieving pointer");
+				state.setPointer(oldState.getPointer());
+				oldState.getRandomAccessFile().close();
+			}
+		}
+
+		// Replacing old state
+		logger.trace("Replacing old state");
+		for(File file : changedWatchMap.keySet()) {
+			logger.trace("Replacing file : " + file.getCanonicalPath());
+			FileState state = changedWatchMap.get(file);
+			watchMap.put(file, state);
+		}
+
+	}
+
 	private void watchFile(String fileToWatch) throws Exception {
 		logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
 		String directory = FilenameUtils.getFullPath(fileToWatch);
@@ -72,7 +197,7 @@ public class FileWatcher implements FileAlterationListener {
 				FileFilterUtils.fileFileFilter(),
 				FileFilterUtils.nameFileFilter(fileName),
 				new LastModifiedFileFilter(deadTime));
-		updateWatchMap(new File(directory), fileFilter);
+		initializeWatchMap(new File(directory), fileFilter);
 	}
 
 	private void watchWildCardFiles(String filesToWatch) throws Exception {
@@ -84,61 +209,105 @@ public class FileWatcher implements FileAlterationListener {
 				FileFilterUtils.fileFileFilter(),
 				new WildcardFileFilter(wildcard),
 				new LastModifiedFileFilter(deadTime));
-		updateWatchMap(new File(directory), fileFilter);
+		initializeWatchMap(new File(directory), fileFilter);
 	}
 
 	private void watchStdIn() {
 		logger.info("Watching stdin : not implemented yet");
 	}
 
-	private void updateWatchMap(File directory, IOFileFilter fileFilter) throws Exception {
+	private void initializeWatchMap(File directory, IOFileFilter fileFilter) throws Exception {
 		FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter);
 		observer.addListener(this);
 		observerList.add(observer);
 		observer.initialize();
 		for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
-			addFileToWatchMap(file);
+			addFileToWatchMap(watchMap, file);
 		}
 	}
 
-	private void addFileToWatchMap(File file) {
+	private void addFileToWatchMap(Map<File,FileState> map, File file) {
 		try {
 			FileState state = new FileState(file);
-			state.refresh();
-			watchMap.put(file, state);
+			int signatureLength = (int) (state.getSize() > MAX_SIGNATURE_LENGTH ? MAX_SIGNATURE_LENGTH : state.getSize());
+			state.setSignatureLength(signatureLength);
+			long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
+			state.setSignature(signature);
+			logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
+			map.put(file, state);
 		} catch(IOException e) {
 			logger.error("Caught IOException : " + e.getMessage());
 		}
 	}
 
-	private void removeFileFromWatchMap(File file) {
-		watchMap.remove(file);
+	public void onFileChange(File file) {
+		try {
+			logger.debug("Change detected on file : " + file.getCanonicalPath());
+			addFileToWatchMap(changedWatchMap, file);
+		} catch (IOException e) {
+			logger.error("Caught IOException : " + e.getMessage());
+		}	
 	}
 
-	public void onDirectoryChange(File directory) {
-		// Do nothing
+	public void onFileCreate(File file) {
+		try {
+			logger.debug("Create detected on file : " + file.getCanonicalPath());
+			addFileToWatchMap(changedWatchMap, file);
+		} catch (IOException e) {
+			logger.error("Caught IOException : " + e.getMessage());
+		}
 	}
 
-	public void onDirectoryCreate(File directory) {
-		// Do nothing
+	public void onFileDelete(File file) {
+		try {
+			logger.debug("Delete detected on file : " + file.getCanonicalPath());
+			watchMap.get(file).setDeleted();
+		} catch (IOException e) {
+			logger.error("Caught IOException : " + e.getMessage());
+		}
 	}
 
-	public void onDirectoryDelete(File directory) {
-		// Do nothing
+	private void printWatchMap() {
+		if(logger.isTraceEnabled()) {
+			logger.trace("WatchMap contents : ");
+			for(File file : watchMap.keySet()) {
+				FileState state = watchMap.get(file);
+				logger.trace("\tFile : " + state.getFilePath() + " marked for deletion : " + state.isDeleted());
+			}
+		}
 	}
 
-	public void onFileChange(File file) {
-		logger.debug("Change detected on file : " + file.getAbsolutePath());	
+	private void removeMarkedFilesFromWatchMap() throws IOException {
+		logger.trace("Removing deleted files from watchMap");
+		List<File> markedList = null;
+		for(File file : watchMap.keySet()) {
+			FileState state = watchMap.get(file);
+			if(state.isDeleted()) {
+				if(markedList == null) {
+					markedList = new ArrayList<File>();
+				}
+				markedList.add(file);	
+			}
+		}
+		if(markedList != null) {
+			for(File file : markedList) {
+				FileState state = watchMap.remove(file);
+				state.getRandomAccessFile().close();
+				logger.trace("\tFile : " + state.getFilePath() + " removed");
+			}
+		}
 	}
 
-	public void onFileCreate(File file) {
-		logger.debug("Create detected on file : " + file.getAbsolutePath());
-		addFileToWatchMap(file);
+	public void onDirectoryChange(File directory) {
+		// Do nothing
 	}
 
-	public void onFileDelete(File file) {
-		logger.debug("Delete detected on file : " + file.getAbsolutePath());
-		removeFileFromWatchMap(file);
+	public void onDirectoryCreate(File directory) {
+		// Do nothing
+	}
+
+	public void onDirectoryDelete(File directory) {
+		// Do nothing
 	}
 
 	public void onStart(FileAlterationObserver observer) {
@@ -148,12 +317,4 @@ public class FileWatcher implements FileAlterationListener {
 	public void onStop(FileAlterationObserver observer) {
 		// TODO Auto-generated method stub
 	}
-
-	public void checkFiles() {
-		logger.debug("Checking files");
-		for(FileAlterationObserver observer : observerList) {
-			observer.checkAndNotify();
-		}
-	}
-
 }

+ 71 - 0
src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java

@@ -0,0 +1,71 @@
+package info.fetter.logstashforwarder;
+
+import static org.apache.log4j.Level.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.RootLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FileWatcherTest {
+	Logger logger = Logger.getLogger(FileWatcherTest.class);
+
+	@BeforeClass
+	public static void setUpBeforeClass() throws Exception {
+		BasicConfigurator.configure();
+		RootLogger.getRootLogger().setLevel(TRACE);
+	}
+
+	@AfterClass
+	public static void tearDownAfterClass() throws Exception {
+		BasicConfigurator.resetConfiguration();
+	}
+
+	//@Test
+	public void testFileWatch() throws InterruptedException, IOException {
+		FileWatcher watcher = new FileWatcher();
+		watcher.addFilesToWatch("./test.txt");
+		for(int i = 0; i < 100; i++) {
+			Thread.sleep(1000);
+			watcher.checkFiles();
+		}
+	}
+
+	@Test
+	public void testWildcardWatch() throws InterruptedException, IOException {
+		FileWatcher watcher = new FileWatcher();
+		watcher.addFilesToWatch("./test*.txt");
+
+		File file1 = new File("test1.txt");
+		File file2 = new File("test2.txt");
+		File file3 = new File("test3.txt");
+		File file4 = new File("test4.txt");
+		
+		watcher.checkFiles();
+		Thread.sleep(500);
+		FileUtils.write(file1, "line 1\n", true);
+		Thread.sleep(500);
+		watcher.checkFiles();
+		FileUtils.forceDeleteOnExit(file1);
+//		FileUtils.touch(file2);
+//		Thread.sleep(500);
+//		watcher.checkFiles();
+//		FileUtils.touch(file3);
+//		FileUtils.forceDelete(file1);
+//		FileUtils.forceDelete(file2);
+//		Thread.sleep(500);
+//		watcher.checkFiles();
+//		FileUtils.moveFile(file3, file4);
+//		FileUtils.touch(file3);
+//		Thread.sleep(500);
+//		watcher.checkFiles();
+//		FileUtils.forceDelete(file3);
+//		FileUtils.forceDelete(file4);
+	}
+}