Selaa lähdekoodia

Added sendEvents method.

didfet 10 vuotta sitten
vanhempi
commit
b54e3d9f25

+ 20 - 0
src/main/java/info/fetter/logstashforwarder/Event.java

@@ -0,0 +1,20 @@
+package info.fetter.logstashforwarder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Event {
+	private Map<String,byte[]> keyValues = new HashMap<String,byte[]>(10);
+	
+	public void addField(String key, byte[] value) {
+		keyValues.put(key, value);
+	}
+	
+	public void addField(String key, String value) {
+		keyValues.put(key, value.getBytes());
+	}
+	
+	public Map<String,byte[]> getKeyValues() {
+		return keyValues;
+	}
+}

+ 55 - 38
src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java

@@ -1,17 +1,16 @@
 package info.fetter.logstashforwarder.protocol;
 
+import info.fetter.logstashforwarder.Event;
+
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.security.KeyManagementException;
+import java.net.ProtocolException;
 import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.zip.Deflater;
@@ -27,7 +26,7 @@ public class LumberjackClient {
 	private final static byte FRAME_WINDOW_SIZE = 0x57;
 	private final static byte FRAME_DATA = 0x44;
 	private final static byte FRAME_COMPRESSED = 0x43;
-	
+
 	private SSLSocket socket;
 	private KeyStore keyStore;
 	private String server;
@@ -35,29 +34,35 @@ public class LumberjackClient {
 	private DataOutputStream output;
 	private DataInputStream input;
 	private int sequence = 1;
-	
-	public LumberjackClient(String keyStorePath, String server, int port) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, KeyManagementException {
+
+	public LumberjackClient(String keyStorePath, String server, int port) throws IOException {
 		this.server = server;
 		this.port = port;
-		
-		keyStore = KeyStore.getInstance("JKS");
-		keyStore.load(new FileInputStream(keyStorePath), null);
-		
-		TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
-		tmf.init(keyStore);
-
-		SSLContext context = SSLContext.getInstance("TLS");
-		context.init(null, tmf.getTrustManagers(), null);
-		
-		SocketFactory socketFactory = context.getSocketFactory();
-		socket = (SSLSocket)socketFactory.createSocket(this.server, this.port);
-		socket.setUseClientMode(true);
-		socket.startHandshake();
-		
-		output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
-		input = new DataInputStream(socket.getInputStream());
+
+		try {
+			keyStore = KeyStore.getInstance("JKS");
+			keyStore.load(new FileInputStream(keyStorePath), null);
+
+			TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+			tmf.init(keyStore);
+
+			SSLContext context = SSLContext.getInstance("TLS");
+			context.init(null, tmf.getTrustManagers(), null);
+
+			SocketFactory socketFactory = context.getSocketFactory();
+			socket = (SSLSocket)socketFactory.createSocket(this.server, this.port);
+			socket.setUseClientMode(true);
+			socket.startHandshake();
+
+			output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+			input = new DataInputStream(socket.getInputStream());
+		} catch(IOException e) {
+			throw e;
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
 	}
-	
+
 	public int sendWindowSizeFrame(int size) throws IOException {
 		output.writeByte(PROTOCOL_VERSION);
 		output.writeByte(FRAME_WINDOW_SIZE);
@@ -65,7 +70,7 @@ public class LumberjackClient {
 		output.flush();
 		return 6;
 	}
-	
+
 	private int sendDataFrame(DataOutputStream output, Map<String,byte[]> keyValues) throws IOException {
 		output.writeByte(PROTOCOL_VERSION);
 		output.writeByte(FRAME_DATA);
@@ -87,15 +92,15 @@ public class LumberjackClient {
 		output.flush();
 		return bytesSent;
 	}
-	
+
 	public int sendDataFrameInSocket(Map<String,byte[]> keyValues) throws IOException {
 		return sendDataFrame(output, keyValues);
 	}
-	
+
 	public int sendCompressedFrame(List<Map<String,byte[]>> keyValuesList) throws IOException {
 		output.writeByte(PROTOCOL_VERSION);
 		output.writeByte(FRAME_COMPRESSED);
-		
+
 		ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream();
 		DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes);
 		for(Map<String,byte[]> keyValues : keyValuesList) {
@@ -105,7 +110,7 @@ public class LumberjackClient {
 		Deflater compressor = new Deflater();
 		compressor.setInput(uncompressedBytes.toByteArray());
 		compressor.finish();
-		
+
 		ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
 		byte[] buffer = new byte[1024];
 		while(!compressor.finished()) {
@@ -113,26 +118,38 @@ public class LumberjackClient {
 			compressedBytes.write(buffer, 0, count);
 		}
 		compressedBytes.close();
-		
+
 		output.writeInt(compressor.getTotalOut());
 		output.write(compressedBytes.toByteArray());
-		
+
 		return 6 + compressor.getTotalOut();
 	}
-	
-	public int readAckFrame() throws IOException {
+
+	public int readAckFrame() throws ProtocolException, IOException {
 		byte protocolVersion = input.readByte();
 		if(protocolVersion != PROTOCOL_VERSION) {
-			throw new IOException("Protocol version should be 1, received " + protocolVersion);
+			throw new ProtocolException("Protocol version should be 1, received " + protocolVersion);
 		}
 		byte frameType = input.readByte();
 		if(frameType != FRAME_ACK) {
-			throw new IOException("Frame type should be Ack, received " + frameType);
+			throw new ProtocolException("Frame type should be Ack, received " + frameType);
 		}
 		int sequenceNumber = input.readInt();
 		return sequenceNumber;
 	}
-	
+
+	public int sendEvents(List<Event> eventList) throws IOException {
+		int beginSequence = sequence;
+		int numberOfEvents = eventList.size();
+		List<Map<String,byte[]>> keyValuesList = new ArrayList<Map<String,byte[]>>(numberOfEvents);
+		for(Event event : eventList) {
+			keyValuesList.add(event.getKeyValues());
+		}
+		sendCompressedFrame(keyValuesList);
+		while(readAckFrame() < sequence) {}
+		return sequence - beginSequence;
+	}
+
 	public void close() throws IOException {
 		socket.close();
 	}