FileReader.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package info.fetter.logstashforwarder;
  2. /*
  3. * Copyright 2015 Didier Fetter
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. import info.fetter.logstashforwarder.util.AdapterException;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.RandomAccessFile;
  22. import java.util.Arrays;
  23. import java.util.Collection;
  24. import java.util.HashMap;
  25. import java.util.List;
  26. import java.util.Map;
  27. import org.apache.log4j.Logger;
  28. public class FileReader extends Reader {
  29. private static Logger logger = Logger.getLogger(FileReader.class);
  30. private static final byte[] ZIP_MAGIC = new byte[] {(byte) 0x50, (byte) 0x4b, (byte) 0x03, (byte) 0x04};
  31. private static final byte[] LZW_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x9d};
  32. private static final byte[] LZH_MAGIC = new byte[] {(byte) 0x1f, (byte) 0xa0};
  33. private static final byte[] GZ_MAGIC = new byte[] {(byte) 0x1f, (byte) 0x8b, (byte) 0x08};
  34. private static final byte[][] MAGICS = new byte[][] {ZIP_MAGIC, LZW_MAGIC, LZH_MAGIC, GZ_MAGIC};
  35. private Map<File,Long> pointerMap;
  36. public FileReader(int spoolSize) {
  37. super(spoolSize);
  38. }
  39. public int readFiles(Collection<FileState> fileList) throws AdapterException {
  40. int eventCount = 0;
  41. stringBuilder = new StringBuilder(STRINGBUILDER_INITIAL_CAPACITY);
  42. if(logger.isTraceEnabled()) {
  43. logger.trace("Reading " + fileList.size() + " file(s)");
  44. }
  45. pointerMap = new HashMap<File,Long>(fileList.size(),1);
  46. for(FileState state : fileList) {
  47. eventCount += readFile(state, spoolSize - eventCount);
  48. }
  49. if(eventCount > 0) {
  50. adapter.sendEvents(eventList);
  51. }
  52. for(FileState state : fileList) {
  53. state.setPointer(pointerMap.get(state.getFile()));
  54. }
  55. eventList.clear();
  56. return eventCount; // Return number of events sent to adapter
  57. }
  58. private int readFile(FileState state, int spaceLeftInSpool) {
  59. int eventListSizeBefore = eventList.size();
  60. File file = state.getFile();
  61. long pointer = state.getPointer();
  62. if(logger.isTraceEnabled()) {
  63. logger.trace("File : " + file + " pointer : " + pointer);
  64. logger.trace("Space left in spool : " + spaceLeftInSpool);
  65. }
  66. if(isCompressedFile(state)) {
  67. pointer = file.length();
  68. } else {
  69. pointer = readLines(state, spaceLeftInSpool);
  70. }
  71. pointerMap.put(file, pointer);
  72. return eventList.size() - eventListSizeBefore; // Return number of events read
  73. }
  74. private boolean isCompressedFile(FileState state) {
  75. RandomAccessFile reader = state.getRandomAccessFile();
  76. try {
  77. for(byte[] magic : MAGICS) {
  78. byte[] fileBytes = new byte[magic.length];
  79. reader.seek(0);
  80. int read = reader.read(fileBytes);
  81. if (read != magic.length) {
  82. continue;
  83. }
  84. if(Arrays.equals(magic, fileBytes)) {
  85. logger.debug("Compressed file detected : " + state.getFile());
  86. return true;
  87. }
  88. }
  89. } catch(IOException e) {
  90. logger.warn("Exception raised while reading file : " + state.getFile());
  91. e.printStackTrace();
  92. }
  93. return false;
  94. }
  95. private long readLines(FileState state, int spaceLeftInSpool) {
  96. RandomAccessFile reader = state.getRandomAccessFile();
  97. long pos = state.getPointer();
  98. try {
  99. reader.seek(pos);
  100. String line = readLine(reader);
  101. while (line != null && spaceLeftInSpool > 0) {
  102. if(logger.isTraceEnabled()) {
  103. logger.trace("-- Read line : " + line);
  104. logger.trace("-- Space left in spool : " + spaceLeftInSpool);
  105. }
  106. pos = reader.getFilePointer();
  107. addEvent(state, pos, line);
  108. line = readLine(reader);
  109. spaceLeftInSpool--;
  110. }
  111. reader.seek(pos); // Ensure we can re-read if necessary
  112. } catch(IOException e) {
  113. logger.warn("Exception raised while reading file : " + state.getFile());
  114. e.printStackTrace();
  115. }
  116. return pos;
  117. }
  118. private String readLine(RandomAccessFile reader) throws IOException {
  119. stringBuilder.setLength(0);
  120. int ch;
  121. boolean seenCR = false;
  122. while((ch=reader.read()) != -1) {
  123. switch(ch) {
  124. case '\n':
  125. return stringBuilder.toString();
  126. case '\r':
  127. seenCR = true;
  128. break;
  129. default:
  130. if (seenCR) {
  131. stringBuilder.append('\r');
  132. seenCR = false;
  133. }
  134. stringBuilder.append((char)ch); // add character, not its ascii value
  135. }
  136. }
  137. return null;
  138. }
  139. }