浏览代码

Added multiline support for files.

toliman7 9 年之前
父节点
当前提交
bd0c146a89

+ 5 - 3
src/main/java/info/fetter/logstashforwarder/FileModificationListener.java

@@ -25,10 +25,12 @@ import org.apache.commons.io.monitor.FileAlterationObserver;
 public class FileModificationListener implements FileAlterationListener {
 	private Event fields;
 	private FileWatcher watcher;
+        private Multiline multiline;
 	
-	public FileModificationListener(FileWatcher watcher, Event fields) {
+	public FileModificationListener(FileWatcher watcher, Event fields, Multiline multiline) {
 		this.watcher = watcher;
 		this.fields = fields;
+                this.multiline = multiline;
 	}
 
 	public void onDirectoryChange(File file) {
@@ -44,11 +46,11 @@ public class FileModificationListener implements FileAlterationListener {
 	}
 
 	public void onFileChange(File file) {
-		watcher.onFileChange(file, fields);
+		watcher.onFileChange(file, fields, multiline);
 	}
 
 	public void onFileCreate(File file) {
-		watcher.onFileCreate(file, fields);
+		watcher.onFileCreate(file, fields, multiline);
 	}
 
 	public void onFileDelete(File file) {

+ 45 - 1
src/main/java/info/fetter/logstashforwarder/FileReader.java

@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.log4j.Logger;
 
@@ -123,19 +124,62 @@ public class FileReader extends Reader {
 	private long readLines(FileState state, int spaceLeftInSpool) {
 		RandomAccessFile reader = state.getRandomAccessFile();
 		long pos = state.getPointer();
+                Multiline multiline = state.getMultiline();
 		try {
 			reader.seek(pos);
 			byte[] line = readLine(reader);
+                        byte[] bufferedLines = null;
 			while (line != null && spaceLeftInSpool > 0) {
 				if(logger.isTraceEnabled()) {
 					logger.trace("-- Read line : " + new String(line));
 					logger.trace("-- Space left in spool : " + spaceLeftInSpool);
 				}
 				pos = reader.getFilePointer();
-				addEvent(state, pos, line);
+                                if (multiline == null) {
+                                  addEvent(state, pos, line);
+                                }
+                                else {
+                                  if (logger.isTraceEnabled()) {
+                                    logger.trace("-- Multiline : " + multiline);
+                                    logger.trace("-- Multiline : matches " + multiline.isPatternFound(line));
+                                  }
+                                  if (multiline.isPatternFound(line))
+                                  {                                    
+                                    // buffer the line
+                                    if (bufferedLines != null)
+                                    {
+                                      bufferedLines = ArrayUtils.addAll(bufferedLines, line);
+                                    }
+                                    else
+                                    {
+                                      bufferedLines = line;
+                                    }
+                                  }
+                                  else {                                    
+                                    if (multiline.isPrevious()) {
+                                      // did not match, so new event started
+                                      if (bufferedLines != null) {
+                                        addEvent(state, pos, bufferedLines);                                        
+                                      }
+                                      bufferedLines = line;
+                                    }
+                                    else {
+                                      // did not match, add the current line
+                                      if (bufferedLines != null) {
+                                        addEvent(state, pos, ArrayUtils.addAll(bufferedLines, line));                                      
+                                        bufferedLines = null;
+                                      }
+                                      else
+                                        addEvent(state, pos, line);
+                                    }
+                                  }
+                                }
 				line = readLine(reader);
 				spaceLeftInSpool--;
 			}
+                        if (bufferedLines != null) {
+                          addEvent(state, pos, bufferedLines); // send any buffered lines left
+                        }
 			reader.seek(pos); // Ensure we can re-read if necessary
 		} catch(IOException e) {
 			logger.warn("Exception raised while reading file : " + state.getFile(), e);

+ 21 - 0
src/main/java/info/fetter/logstashforwarder/FileState.java

@@ -25,6 +25,7 @@ import java.io.RandomAccessFile;
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.Map;
 
 public class FileState {
 	@JsonIgnore
@@ -48,6 +49,10 @@ public class FileState {
 	private FileState oldFileState;
 	@JsonIgnore
 	private Event fields;
+        @JsonIgnore
+        private Multiline multiline;
+        @JsonIgnore
+        private byte[] bufferedLines = null;
 
 	public FileState() {
 	}
@@ -172,6 +177,22 @@ public class FileState {
 	public void setFields(Event fields) {
 		this.fields = fields;
 	}
+        
+        public Multiline getMultiline() {
+               return multiline;
+        }
+        
+        public void setMultiline(Multiline multiline) {
+               this.multiline = multiline;
+        }
+        
+        public byte[] getBufferedLines() {
+               return bufferedLines;
+        }
+               
+        public void setBufferedLines(byte[] bufferedLines) {
+               this.bufferedLines = bufferedLines;
+        }
 
 	@Override
 	public String toString() {

+ 16 - 15
src/main/java/info/fetter/logstashforwarder/FileWatcher.java

@@ -76,14 +76,14 @@ public class FileWatcher {
 		printWatchMap();
 	}
 
-	public void addFilesToWatch(String fileToWatch, Event fields, int deadTime) {
+	public void addFilesToWatch(String fileToWatch, Event fields, int deadTime, Multiline multiline) {
 		try {
 			if(fileToWatch.equals("-")) {
 				addStdIn(fields);
 			} else if(fileToWatch.contains("*")) {
-				addWildCardFiles(fileToWatch, fields, deadTime);
+				addWildCardFiles(fileToWatch, fields, deadTime, multiline);
 			} else {
-				addSingleFile(fileToWatch, fields, deadTime);
+				addSingleFile(fileToWatch, fields, deadTime, multiline);
 			}
 		} catch(Exception e) {
 			throw new RuntimeException(e);
@@ -219,7 +219,7 @@ public class FileWatcher {
 		removeMarkedFilesFromWatchMap();
 	}
 
-	private void addSingleFile(String fileToWatch, Event fields, int deadTime) throws Exception {
+	private void addSingleFile(String fileToWatch, Event fields, int deadTime, Multiline multiline) throws Exception {
 		logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
 		String directory = FilenameUtils.getFullPath(fileToWatch);
 		String fileName = FilenameUtils.getName(fileToWatch); 
@@ -227,10 +227,10 @@ public class FileWatcher {
 				FileFilterUtils.fileFileFilter(),
 				FileFilterUtils.nameFileFilter(fileName),
 				new LastModifiedFileFilter(deadTime));
-		initializeWatchMap(new File(directory), fileFilter, fields);
+		initializeWatchMap(new File(directory), fileFilter, fields, multiline);
 	}
 
-	private void addWildCardFiles(String filesToWatch, Event fields, int deadTime) throws Exception {
+	private void addWildCardFiles(String filesToWatch, Event fields, int deadTime, Multiline multiline) throws Exception {
 		logger.info("Watching wildcard files : " + filesToWatch);
 		String directory = FilenameUtils.getFullPath(filesToWatch);
 		String wildcard = FilenameUtils.getName(filesToWatch);
@@ -239,7 +239,7 @@ public class FileWatcher {
 				FileFilterUtils.fileFileFilter(),
 				new WildcardFileFilter(wildcard),
 				new LastModifiedFileFilter(deadTime));
-		initializeWatchMap(new File(directory), fileFilter, fields);
+		initializeWatchMap(new File(directory), fileFilter, fields, multiline);
 	}
 
 	private void addStdIn(Event fields) {
@@ -248,22 +248,22 @@ public class FileWatcher {
 		stdinConfigured = true;
 	}
 
-	private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields) throws Exception {
+	private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event fields, Multiline multiline) throws Exception {
 		if(!directory.isDirectory()) {
 			logger.warn("Directory " + directory + " does not exist");
 			return;
 		}
 		FileAlterationObserver observer = new FileAlterationObserver(directory, fileFilter);
-		FileModificationListener listener = new FileModificationListener(this, fields);
+		FileModificationListener listener = new FileModificationListener(this, fields, multiline);
 		observer.addListener(listener);
 		observerList.add(observer);
 		observer.initialize();
 		for(File file : FileUtils.listFiles(directory, fileFilter, null)) {
-			addFileToWatchMap(newWatchMap, file, fields);
+			addFileToWatchMap(newWatchMap, file, fields, multiline);
 		}
 	}
 
-	private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields) {
+	private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields, Multiline multiline) {
 		try {
 			FileState state = new FileState(file);
 			state.setFields(fields);
@@ -272,25 +272,26 @@ public class FileWatcher {
 			long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
 			state.setSignature(signature);
 			logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
+                        state.setMultiline(multiline);
 			map.put(file, state);
 		} catch(IOException e) {
 			logger.error("Caught IOException : " + e.getMessage());
 		}
 	}
 
-	public void onFileChange(File file, Event fields) {
+	public void onFileChange(File file, Event fields, Multiline multiline) {
 		try {
 			logger.debug("Change detected on file : " + file.getCanonicalPath());
-			addFileToWatchMap(newWatchMap, file, fields);
+			addFileToWatchMap(newWatchMap, file, fields, multiline);
 		} catch (IOException e) {
 			logger.error("Caught IOException : " + e.getMessage());
 		}	
 	}
 
-	public void onFileCreate(File file, Event fields) {
+	public void onFileCreate(File file, Event fields, Multiline multiline) {
 		try {
 			logger.debug("Create detected on file : " + file.getCanonicalPath());
-			addFileToWatchMap(newWatchMap, file, fields);
+			addFileToWatchMap(newWatchMap, file, fields, multiline);
 		} catch (IOException e) {
 			logger.error("Caught IOException : " + e.getMessage());
 		}

+ 1 - 1
src/main/java/info/fetter/logstashforwarder/Forwarder.java

@@ -80,7 +80,7 @@ public class Forwarder {
 			configManager.readConfiguration();
 			for(FilesSection files : configManager.getConfig().getFiles()) {
 				for(String path : files.getPaths()) {
-					watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000);
+					watcher.addFilesToWatch(path, new Event(files.getFields()), files.getDeadTimeInSeconds() * 1000, files.getMultiline());
 				}
 			}
 			watcher.initialize();

+ 89 - 0
src/main/java/info/fetter/logstashforwarder/Multiline.java

@@ -0,0 +1,89 @@
+package info.fetter.logstashforwarder;
+
+/*
+ * Copyright 2015 Didier Fetter
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+public class Multiline {
+        public enum WhatType { Previous, Next };
+  
+        private Pattern pattern = null;
+        private boolean negate = false;
+        private WhatType what = WhatType.Previous;
+	
+	public Multiline() {
+	}
+	
+	public Multiline(Multiline event) {
+		if(event != null) {
+			this.negate = event.negate;
+                        this.pattern = event.pattern;
+                        this.what = event.what;
+		}
+	}
+	
+	public Multiline(Map<String,String> fields) throws UnsupportedEncodingException {
+                String strPattern = "";
+		for(String key : fields.keySet()) {
+			if ("pattern".equals(key))
+                            strPattern = fields.get(key);
+                        else if ("negate".equals(key))
+                            negate = Boolean.parseBoolean(fields.get(key));
+                        else if ("what".equals(key))
+                            what = WhatType.valueOf(fields.get(key));
+                        else
+                            throw new UnsupportedEncodingException(key + " not supported");
+		}
+                pattern = Pattern.compile(strPattern);
+                
+	}
+	
+        public Pattern getPattern() {
+          return pattern;
+        }
+
+        public boolean isNegate() {
+          return negate;
+        }
+
+        public WhatType getWhat() {
+          return what;
+        }
+        
+        public boolean isPrevious() {
+          return what == WhatType.Previous;
+        }
+        
+        public boolean isPatternFound (byte[] line) {          
+          boolean result = pattern.matcher(new String(line)).find();
+          if (negate) return !result; 
+          return result;
+        }
+        
+        @Override
+	public String toString() {
+		return new ToStringBuilder(this).
+				append("pattern", pattern).
+				append("negate", negate).
+				append("what", what).
+                        toString();
+        }
+}

+ 13 - 0
src/main/java/info/fetter/logstashforwarder/config/FilesSection.java

@@ -19,16 +19,20 @@ package info.fetter.logstashforwarder.config;
 
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.builder.ToStringBuilder;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import info.fetter.logstashforwarder.Multiline;
+import java.io.UnsupportedEncodingException;
 
 public class FilesSection {
 	private List<String> paths;
 	private Map<String,String> fields;
 	@JsonProperty("dead time")
 	private String deadTime = "24h";
+	private Multiline multiline;
 
 	public List<String> getPaths() {
 		return paths;
@@ -78,13 +82,22 @@ public class FilesSection {
 	public void setDeadTime(String deadTime) {
 		this.deadTime = deadTime;
 	}
+	
+	public Multiline getMultiline() {
+		return multiline;
+	}
 
+	public void setMultiline(Map<String, String> multilineMap) throws UnsupportedEncodingException {
+		this.multiline = new Multiline(multilineMap);
+	}
+	
 	@Override
 	public String toString() {
 		return new ToStringBuilder(this).
 				append("paths", paths).
 				append("fields", fields).
 				append("dead time", deadTime).
+				append("multiline", multiline).
 				toString();
 	}
 }

+ 31 - 0
src/test/java/info/fetter/logstashforwarder/FileReaderTest.java

@@ -23,7 +23,9 @@ import info.fetter.logstashforwarder.util.AdapterException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.BasicConfigurator;
@@ -54,12 +56,41 @@ public class FileReaderTest {
 		List<FileState> fileList = new ArrayList<FileState>(1);
 		File file1 = new File("testFileReader1.txt");
 		FileUtils.write(file1, "testFileReader1 line1\n");
+                FileUtils.write(file1, " nl line12\n", true);
 		FileUtils.write(file1, "testFileReader1 line2\n", true);
 		FileUtils.write(file1, "testFileReader1 line3\n", true);
 		Thread.sleep(500);
 		FileState state = new FileState(file1);
 		fileList.add(state);
 		state.setFields(new Event().addField("testFileReader1", "testFileReader1"));
+                Map<String, String> m = new HashMap<String, String>();
+                m.put("pattern", " nl");
+                m.put("negate", "false");
+                state.setMultiline(new Multiline(m));
+		reader.readFiles(fileList);
+		reader.readFiles(fileList);
+		reader.readFiles(fileList);
+		//FileUtils.forceDelete(file1);
+	}
+        
+        @Test
+	public void testFileReader2() throws IOException, InterruptedException, AdapterException {
+		FileReader reader = new FileReader(2);
+		reader.setAdapter(new MockProtocolAdapter());
+		List<FileState> fileList = new ArrayList<FileState>(1);
+		File file1 = new File("testFileReader1.txt");
+		FileUtils.write(file1, "testFileReader1 line1\n");
+                FileUtils.write(file1, " nl line12\n", true);
+		FileUtils.write(file1, "testFileReader1 line2\n", true);
+		FileUtils.write(file1, "testFileReader1 line3\n", true);
+		Thread.sleep(500);
+		FileState state = new FileState(file1);
+		fileList.add(state);
+		state.setFields(new Event().addField("testFileReader1", "testFileReader1"));
+                Map<String, String> m = new HashMap<String, String>();
+                m.put("pattern", "testFileReader1");
+                m.put("negate", "true");
+                state.setMultiline(new Multiline(m));
 		reader.readFiles(fileList);
 		reader.readFiles(fileList);
 		reader.readFiles(fileList);

+ 68 - 3
src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java

@@ -21,6 +21,8 @@ import static org.apache.log4j.Level.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.BasicConfigurator;
@@ -47,21 +49,32 @@ public class FileWatcherTest {
 	//@Test
 	public void testFileWatch() throws InterruptedException, IOException {
 		FileWatcher watcher = new FileWatcher();
-		watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
+		watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null);
+		for(int i = 0; i < 100; i++) {
+			Thread.sleep(1000);
+			watcher.checkFiles();
+		}
+	}
+        
+        //@Test
+        public void testFileWatchWithMultilines() throws InterruptedException, IOException {
+		FileWatcher watcher = new FileWatcher();
+                Multiline multiline = new Multiline();
+		watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline);
 		for(int i = 0; i < 100; i++) {
 			Thread.sleep(1000);
 			watcher.checkFiles();
 		}
 	}
 	
-	@Test
+	//@Test
 	public void testWildcardWatch() throws InterruptedException, IOException {
 		if(System.getProperty("os.name").toLowerCase().contains("win")) {
 			logger.warn("Not executing this test on windows");
 			return;
 		}
 		FileWatcher watcher = new FileWatcher();
-		watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY);
+		watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null);
 		watcher.initialize();
 
 		File file1 = new File("testFileWatcher1.txt");
@@ -97,6 +110,58 @@ public class FileWatcherTest {
 		
 		
 
+	}
+        
+        @Test
+	public void testWildcardWatchMultiline() throws InterruptedException, IOException {
+		if(System.getProperty("os.name").toLowerCase().contains("win")) {
+			logger.warn("Not executing this test on windows");
+			return;
+		}
+		FileWatcher watcher = new FileWatcher();
+                Map<String, String> m = new HashMap<String, String>();
+                m.put("pattern", " nl");
+                m.put("negate", "false");
+                Multiline multiline = new Multiline(m);
+		watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline);
+		watcher.initialize();
+
+		File file1 = new File("testFileWatcher1.txt");
+		File file2 = new File("testFileWatcher2.txt");
+		//File file3 = new File("test3.txt");
+		//File file4 = new File("test4.txt");
+		
+		//File testDir = new File("testFileWatcher");
+		//FileUtils.forceMkdir(new File("test"));
+		
+		watcher.checkFiles();
+		Thread.sleep(100);
+		FileUtils.write(file1, "file 1 line 1\n nl line 1-2", true);
+		Thread.sleep(100);
+		watcher.checkFiles();
+		FileUtils.write(file1, "file 1 line 2\n", true);
+                Thread.sleep(100);
+		watcher.checkFiles();
+		FileUtils.write(file1, " nl line 3\n", true);
+		//FileUtils.write(file2, "file 2 line 1\n", true);
+		Thread.sleep(1000);
+		watcher.checkFiles();
+//		FileUtils.moveFileToDirectory(file1, testDir, true);
+//		FileUtils.write(file2, "file 2 line 2\n", true);
+		FileUtils.moveFile(file1, file2);
+//		FileUtils.write(file2, "file 3 line 1\n", true);
+//
+		Thread.sleep(1000);
+		watcher.checkFiles();
+//		
+//		
+		watcher.close();
+		FileUtils.deleteQuietly(file1);
+		FileUtils.deleteQuietly(file2);
+//		FileUtils.forceDelete(testDir);
+		
+		
+
 	}
 	
 	@Test

+ 3 - 0
src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java

@@ -57,6 +57,9 @@ public class ConfigurationManagerTest {
 			for(String path : files.getPaths()) {
 				logger.debug(" - Path : " + path);
 			}
+                        logger.debug(" - Multiline : " + files.getMultiline());
+                        //files.getMultiline()
+                                
 			logger.debug(" - Dead time : " + files.getDeadTimeInSeconds());
 			if(files.getDeadTime().equals("24h")) {
 				assertEquals(86400, files.getDeadTimeInSeconds());

+ 1 - 0
src/test/resources/config1.json

@@ -53,6 +53,7 @@
         "/var/log/apache/error-*.log"
       ],
       "fields": { "type": "error" },
+      "multiline": { "pattern": "^[0-9]{4}", "negate": "true" },
       "dead time": "8h32m50s" 
     }
   ]