FileReader.java 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package info.fetter.logstashforwarder;
  2. /*
  3. * Copyright 2015 Didier Fetter
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. import info.fetter.logstashforwarder.util.AdapterException;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.RandomAccessFile;
  22. import java.util.Arrays;
  23. import java.util.Collection;
  24. import java.util.HashMap;
  25. import java.util.Map;
  26. import org.apache.log4j.Logger;
  27. public class FileReader extends Reader {
  28. private static Logger logger = Logger.getLogger(FileReader.class);
  29. private static final byte[] ZIP_MAGIC = new byte[] {(byte) 0x50, (byte) 0x4b, (byte) 0x03, (byte) 0x04};
  30. private static final byte[] LZW_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x9d};
  31. private static final byte[] LZH_MAGIC = new byte[] {(byte) 0x1f, (byte) 0xa0};
  32. private static final byte[] GZ_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x8b, (byte) 0x08};
  33. private static final byte[][] MAGICS = new byte[][] {ZIP_MAGIC, LZW_MAGIC, LZH_MAGIC, GZ_MAGIC};
  34. private Map<File,Long> pointerMap;
  35. public FileReader(int spoolSize) {
  36. super(spoolSize);
  37. }
  38. public int readFiles(Collection<FileState> fileList) throws AdapterException {
  39. int eventCount = 0;
  40. if(logger.isTraceEnabled()) {
  41. logger.trace("Reading " + fileList.size() + " file(s)");
  42. }
  43. pointerMap = new HashMap<File,Long>(fileList.size(),1);
  44. for(FileState state : fileList) {
  45. eventCount += readFile(state, spoolSize - eventCount);
  46. }
  47. if(eventCount > 0) {
  48. try {
  49. adapter.sendEvents(eventList);
  50. } catch(AdapterException e) {
  51. eventList.clear(); // Be sure no events will be sent twice after reconnect
  52. throw e;
  53. }
  54. }
  55. for(FileState state : fileList) {
  56. state.setPointer(pointerMap.get(state.getFile()));
  57. }
  58. eventList.clear();
  59. return eventCount; // Return number of events sent to adapter
  60. }
  61. private int readFile(FileState state, int spaceLeftInSpool) {
  62. File file = state.getFile();
  63. long pointer = state.getPointer();
  64. int numberOfEvents = 0;
  65. try {
  66. if(state.isDeleted() || state.getRandomAccessFile() == null) { // Don't try to read this file
  67. if(logger.isTraceEnabled()) {
  68. logger.trace("File : " + file + " has been deleted");
  69. }
  70. } else if(state.getRandomAccessFile().length() == 0) {
  71. if(logger.isTraceEnabled()) {
  72. logger.trace("File : " + file + " is empty");
  73. }
  74. } else {
  75. int eventListSizeBefore = eventList.size();
  76. if(logger.isTraceEnabled()) {
  77. logger.trace("File : " + file + " pointer : " + pointer);
  78. logger.trace("Space left in spool : " + spaceLeftInSpool);
  79. }
  80. if(isCompressedFile(state)) {
  81. pointer = file.length();
  82. } else {
  83. pointer = readLines(state, spaceLeftInSpool);
  84. }
  85. numberOfEvents = eventList.size() - eventListSizeBefore;
  86. }
  87. } catch(IOException e) {
  88. logger.warn("Exception raised while reading file : " + state.getFile(), e);
  89. }
  90. pointerMap.put(file, pointer);
  91. return numberOfEvents; // Return number of events read
  92. }
  93. private boolean isCompressedFile(FileState state) {
  94. RandomAccessFile reader = state.getRandomAccessFile();
  95. try {
  96. for(byte[] magic : MAGICS) {
  97. byte[] fileBytes = new byte[magic.length];
  98. reader.seek(0);
  99. int read = reader.read(fileBytes);
  100. if (read != magic.length) {
  101. continue;
  102. }
  103. if(Arrays.equals(magic, fileBytes)) {
  104. logger.debug("Compressed file detected : " + state.getFile());
  105. return true;
  106. }
  107. }
  108. } catch(IOException e) {
  109. logger.warn("Exception raised while reading file : " + state.getFile(), e);
  110. }
  111. return false;
  112. }
  113. private long readLines(FileState state, int spaceLeftInSpool) {
  114. RandomAccessFile reader = state.getRandomAccessFile();
  115. long pos = state.getPointer();
  116. try {
  117. reader.seek(pos);
  118. byte[] line = readLine(reader);
  119. while (line != null && spaceLeftInSpool > 0) {
  120. if(logger.isTraceEnabled()) {
  121. logger.trace("-- Read line : " + new String(line));
  122. logger.trace("-- Space left in spool : " + spaceLeftInSpool);
  123. }
  124. pos = reader.getFilePointer();
  125. addEvent(state, pos, line);
  126. line = readLine(reader);
  127. spaceLeftInSpool--;
  128. }
  129. reader.seek(pos); // Ensure we can re-read if necessary
  130. } catch(IOException e) {
  131. logger.warn("Exception raised while reading file : " + state.getFile(), e);
  132. }
  133. return pos;
  134. }
  135. private byte[] readLine(RandomAccessFile reader) throws IOException {
  136. byteBuffer.clear();
  137. int ch;
  138. boolean seenCR = false;
  139. while((ch=reader.read()) != -1) {
  140. switch(ch) {
  141. case '\n':
  142. byte[] line = new byte[byteBuffer.position()];
  143. byteBuffer.rewind();
  144. byteBuffer.get(line);
  145. return line;
  146. case '\r':
  147. seenCR = true;
  148. break;
  149. default:
  150. if (seenCR) {
  151. byteBuffer.put((byte) '\r');
  152. seenCR = false;
  153. }
  154. byteBuffer.put((byte)ch);
  155. }
  156. }
  157. return null;
  158. }
  159. }