Bladeren bron

Added debug information.

didfet 10 jaren geleden
bovenliggende
commit
de012bb84a
1 gewijzigde bestanden met toevoegingen van 28 en 5 verwijderingen
  1. 28 5
      src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java

+ 28 - 5
src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java

@@ -37,7 +37,11 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManagerFactory;
 
+import org.apache.commons.io.HexDump;
+import org.apache.log4j.Logger;
+
 public class LumberjackClient {
+	private final static Logger logger = Logger.getLogger(LumberjackClient.class);
 	private final static byte PROTOCOL_VERSION = 0x31;
 	private final static byte FRAME_ACK = 0x41;
 	private final static byte FRAME_WINDOW_SIZE = 0x57;
@@ -73,6 +77,8 @@ public class LumberjackClient {
 
 			output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
 			input = new DataInputStream(socket.getInputStream());
+			
+			logger.info("Connected to " + server + ":" + port);
 		} catch(IOException e) {
 			throw e;
 		} catch(Exception e) {
@@ -85,6 +91,7 @@ public class LumberjackClient {
 		output.writeByte(FRAME_WINDOW_SIZE);
 		output.writeInt(size);
 		output.flush();
+		logger.debug("Sending window size frame : " + size + " frames");
 		return 6;
 	}
 
@@ -121,11 +128,17 @@ public class LumberjackClient {
 		ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream();
 		DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes);
 		for(Map<String,byte[]> keyValues : keyValuesList) {
+			logger.debug("Adding data frame");
 			sendDataFrame(uncompressedOutput, keyValues);
 		}
 		uncompressedOutput.close();
 		Deflater compressor = new Deflater();
-		compressor.setInput(uncompressedBytes.toByteArray());
+		byte[] uncompressedData = uncompressedBytes.toByteArray();
+		logger.debug("Deflating data : " + uncompressedData.length + " bytes");
+		if(logger.isTraceEnabled()) {
+			HexDump.dump(uncompressedData, 0, System.out, 0);
+		}
+		compressor.setInput(uncompressedData);
 		compressor.finish();
 
 		ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
@@ -135,10 +148,17 @@ public class LumberjackClient {
 			compressedBytes.write(buffer, 0, count);
 		}
 		compressedBytes.close();
-
+		byte[] compressedData = compressedBytes.toByteArray();
+		logger.debug("Deflated data : " + compressor.getTotalOut() + " bytes");
+		if(logger.isTraceEnabled()) {
+			HexDump.dump(compressedData, 0, System.out, 0);
+		}
+		
 		output.writeInt(compressor.getTotalOut());
-		output.write(compressedBytes.toByteArray());
-
+		output.write(compressedData);
+		output.flush();
+		
+		logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames");
 		return 6 + compressor.getTotalOut();
 	}
 
@@ -152,23 +172,26 @@ public class LumberjackClient {
 			throw new ProtocolException("Frame type should be Ack, received " + frameType);
 		}
 		int sequenceNumber = input.readInt();
+		logger.debug("Received ack sequence : " + sequenceNumber);
 		return sequenceNumber;
 	}
 
 	public int sendEvents(List<Event> eventList) throws IOException {
 		int beginSequence = sequence;
 		int numberOfEvents = eventList.size();
+		logger.info("Sending " + numberOfEvents + " events");
 		sendWindowSizeFrame(numberOfEvents);
 		List<Map<String,byte[]>> keyValuesList = new ArrayList<Map<String,byte[]>>(numberOfEvents);
 		for(Event event : eventList) {
 			keyValuesList.add(event.getKeyValues());
 		}
 		sendCompressedFrame(keyValuesList);
-		while(readAckFrame() < sequence) {}
+		while(readAckFrame() < (sequence - 1) ) {}
 		return sequence - beginSequence;
 	}
 
 	public void close() throws IOException {
 		socket.close();
+		logger.info("Connection to " + server + ":" + port + " closed");
 	}
 }