Ver Fonte

Implemented FileReader.

didfet há 10 anos atrás
pai
commit
d6d4897acb

+ 13 - 0
src/main/java/info/fetter/logstashforwarder/Event.java

@@ -23,6 +23,15 @@ import java.util.Map;
 public class Event {
 	private Map<String,byte[]> keyValues = new HashMap<String,byte[]>(10);
 	
+	public Event() {
+	}
+	
+	public Event(Event event) {
+		if(event != null) {
+			keyValues.putAll(event.keyValues);
+		}
+	}
+	
 	public void addField(String key, byte[] value) {
 		keyValues.put(key, value);
 	}
@@ -31,6 +40,10 @@ public class Event {
 		keyValues.put(key, value.getBytes());
 	}
 	
+	public void addField(String key, long value) {
+		keyValues.put(key, String.valueOf(value).getBytes());
+	}
+	
 	public Map<String,byte[]> getKeyValues() {
 		return keyValues;
 	}

+ 81 - 10
src/main/java/info/fetter/logstashforwarder/FileReader.java

@@ -1,7 +1,14 @@
 package info.fetter.logstashforwarder;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /*
  * Copyright 2015 Didier Fetter
@@ -25,24 +32,88 @@ public class FileReader {
 	private ProtocolAdapter adapter;
 	private int spoolSize = DEFAULT_SPOOL_SIZE;
 	private List<Event> eventList;
-	
+	private Map<File,Long> pointerMap;
+	private String hostname;
+	{
+		try {
+			hostname = InetAddress.getLocalHost().getHostName();
+		} catch (UnknownHostException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
 	public FileReader(int spoolSize) {
 		this.spoolSize = spoolSize;
 		eventList = new ArrayList<Event>(spoolSize);
 	}
-	
-	public int readFiles(List<FileState> fileList) {
-		// TODO: Read files and send events until there's nothing left to read or spool size reached
-		int eventCounter = 0;
+
+	public int readFiles(List<FileState> fileList) throws IOException {
+		int eventCount = 0;
+		pointerMap = new HashMap<File,Long>(fileList.size(),1);
 		for(FileState state : fileList) {
-			eventCounter += readFile(state, spoolSize - eventCounter);
+			eventCount += readFile(state, spoolSize - eventCount);
+		}
+		adapter.sendEvents(eventList);
+		for(FileState state : fileList) {
+			state.setPointer(pointerMap.get(state.getFile()));
+		}
+		eventList.clear();
+		return eventCount; // Return number of events sent to adapter
+	}
+
+	private int readFile(FileState state, int spaceLeftInSpool) throws IOException {
+		int eventListSizeBefore = eventList.size();
+		File file = state.getFile();
+		long pointer = state.getPointer();
+		RandomAccessFile reader = state.getRandomAccessFile();
+		reader.seek(pointer);
+		pointer = readLines(state, spaceLeftInSpool);
+		pointerMap.put(file, pointer);
+		return eventList.size() - eventListSizeBefore; // Return number of events read
+	}
+
+	private long readLines(FileState state, int spaceLeftInSpool) throws IOException {
+		RandomAccessFile reader = state.getRandomAccessFile();
+		long pos = reader.getFilePointer();
+		String line = readLine(reader);
+		while (line != null && spaceLeftInSpool > 0) {
+			pos = reader.getFilePointer();
+			addEvent(state, pos, line);
+			line = readLine(reader);
+		}
+		reader.seek(pos); // Ensure we can re-read if necessary
+		return pos;
+	}
+
+	private String readLine(RandomAccessFile reader) throws IOException {
+		StringBuffer sb  = new StringBuffer();
+		int ch;
+		boolean seenCR = false;
+		while((ch=reader.read()) != -1) {
+			switch(ch) {
+			case '\n':
+				return sb.toString();
+			case '\r':
+				seenCR = true;
+				break;
+			default:
+				if (seenCR) {
+					sb.append('\r');
+					seenCR = false;
+				}
+				sb.append((char)ch); // add character, not its ascii value
+			}
 		}
-		return 0; // Return number of events sent to adapter
+		return null;
 	}
 
-	private int readFile(FileState state, int spaceLeftInSpool) {
-		// TODO Read file
-		return 0; // Return number of events read
+	private void addEvent(FileState state, long pos, String line) throws IOException {
+		Event event = new Event(state.getFields());
+		event.addField("file", state.getFile().getCanonicalPath());
+		event.addField("offset", pos);
+		event.addField("line", line);
+		event.addField("host", hostname);
+		eventList.add(event);
 	}
 
 	public ProtocolAdapter getAdapter() {

+ 9 - 0
src/main/java/info/fetter/logstashforwarder/FileState.java

@@ -34,6 +34,7 @@ public class FileState {
 	private long pointer = 0;
 	private FileState oldFileState;
 	private String fileName;
+	private Event fields;
 	
 	public FileState(File file) throws IOException {
 		this.file = file;
@@ -115,5 +116,13 @@ public class FileState {
 	public String getFileName() {
 		return fileName;
 	}
+
+	public Event getFields() {
+		return fields;
+	}
+
+	public void setFields(Event fields) {
+		this.fields = fields;
+	}
 	
 }