FileReader.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package info.fetter.logstashforwarder;
  2. /*
  3. * Copyright 2015 Didier Fetter
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. import info.fetter.logstashforwarder.util.AdapterException;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.RandomAccessFile;
  22. import java.util.Arrays;
  23. import java.util.Collection;
  24. import java.util.HashMap;
  25. import java.util.Map;
  26. import org.apache.log4j.Logger;
  27. public class FileReader extends Reader {
  28. private static Logger logger = Logger.getLogger(FileReader.class);
  29. private static final byte[] ZIP_MAGIC = new byte[] {(byte) 0x50, (byte) 0x4b, (byte) 0x03, (byte) 0x04};
  30. private static final byte[] LZW_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x9d};
  31. private static final byte[] LZH_MAGIC = new byte[] {(byte) 0x1f, (byte) 0xa0};
  32. private static final byte[] GZ_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x8b, (byte) 0x08};
  33. private static final byte[][] MAGICS = new byte[][] {ZIP_MAGIC, LZW_MAGIC, LZH_MAGIC, GZ_MAGIC};
  34. private Map<File,Long> pointerMap;
  35. public FileReader(int spoolSize) {
  36. super(spoolSize);
  37. }
  38. public int readFiles(Collection<FileState> fileList) throws AdapterException {
  39. int eventCount = 0;
  40. stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
  41. if(logger.isTraceEnabled()) {
  42. logger.trace("Reading " + fileList.size() + " file(s)");
  43. }
  44. pointerMap = new HashMap<File,Long>(fileList.size(),1);
  45. for(FileState state : fileList) {
  46. eventCount += readFile(state, spoolSize - eventCount);
  47. }
  48. if(eventCount > 0) {
  49. try {
  50. adapter.sendEvents(eventList);
  51. } catch(AdapterException e) {
  52. eventList.clear(); // Be sure no events will be sent twice after reconnect
  53. throw e;
  54. }
  55. }
  56. for(FileState state : fileList) {
  57. state.setPointer(pointerMap.get(state.getFile()));
  58. }
  59. eventList.clear();
  60. return eventCount; // Return number of events sent to adapter
  61. }
  62. private int readFile(FileState state, int spaceLeftInSpool) {
  63. File file = state.getFile();
  64. long pointer = state.getPointer();
  65. if(state.isDeleted() || state.getRandomAccessFile() == null) { // Don't try to read this file
  66. pointerMap.put(file, pointer);
  67. return 0;
  68. } else {
  69. int eventListSizeBefore = eventList.size();
  70. if(logger.isTraceEnabled()) {
  71. logger.trace("File : " + file + " pointer : " + pointer);
  72. logger.trace("Space left in spool : " + spaceLeftInSpool);
  73. }
  74. if(isCompressedFile(state)) {
  75. pointer = file.length();
  76. } else {
  77. pointer = readLines(state, spaceLeftInSpool);
  78. }
  79. pointerMap.put(file, pointer);
  80. return eventList.size() - eventListSizeBefore; // Return number of events read
  81. }
  82. }
  83. private boolean isCompressedFile(FileState state) {
  84. RandomAccessFile reader = state.getRandomAccessFile();
  85. try {
  86. for(byte[] magic : MAGICS) {
  87. byte[] fileBytes = new byte[magic.length];
  88. reader.seek(0);
  89. int read = reader.read(fileBytes);
  90. if (read != magic.length) {
  91. continue;
  92. }
  93. if(Arrays.equals(magic, fileBytes)) {
  94. logger.debug("Compressed file detected : " + state.getFile());
  95. return true;
  96. }
  97. }
  98. } catch(IOException e) {
  99. logger.warn("Exception raised while reading file : " + state.getFile());
  100. e.printStackTrace();
  101. }
  102. return false;
  103. }
  104. private long readLines(FileState state, int spaceLeftInSpool) {
  105. RandomAccessFile reader = state.getRandomAccessFile();
  106. long pos = state.getPointer();
  107. try {
  108. reader.seek(pos);
  109. String line = readLine(reader);
  110. while (line != null && spaceLeftInSpool > 0) {
  111. if(logger.isTraceEnabled()) {
  112. logger.trace("-- Read line : " + line);
  113. logger.trace("-- Space left in spool : " + spaceLeftInSpool);
  114. }
  115. pos = reader.getFilePointer();
  116. addEvent(state, pos, line);
  117. line = readLine(reader);
  118. spaceLeftInSpool--;
  119. }
  120. reader.seek(pos); // Ensure we can re-read if necessary
  121. } catch(IOException e) {
  122. logger.warn("Exception raised while reading file : " + state.getFile());
  123. e.printStackTrace();
  124. }
  125. return pos;
  126. }
  127. private String readLine(RandomAccessFile reader) throws IOException {
  128. stringBuilder.setLength(0);
  129. int ch;
  130. boolean seenCR = false;
  131. while((ch=reader.read()) != -1) {
  132. switch(ch) {
  133. case '\n':
  134. return stringBuilder.toString();
  135. case '\r':
  136. seenCR = true;
  137. break;
  138. default:
  139. if (seenCR) {
  140. stringBuilder.append('\r');
  141. seenCR = false;
  142. }
  143. stringBuilder.append((char)ch); // add character, not its ascii value
  144. }
  145. }
  146. return null;
  147. }
  148. }