FileReader.java 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package info.fetter.logstashforwarder;
  2. import java.io.File;
  3. import java.io.IOException;
  4. import java.io.RandomAccessFile;
  5. import java.net.InetAddress;
  6. import java.net.UnknownHostException;
  7. import java.util.ArrayList;
  8. import java.util.HashMap;
  9. import java.util.List;
  10. import java.util.Map;
  11. import org.apache.log4j.Logger;
  12. /*
  13. * Copyright 2015 Didier Fetter
  14. *
  15. * Licensed under the Apache License, Version 2.0 (the "License");
  16. * you may not use this file except in compliance with the License.
  17. * You may obtain a copy of the License at
  18. *
  19. * http://www.apache.org/licenses/LICENSE-2.0
  20. *
  21. * Unless required by applicable law or agreed to in writing, software
  22. * distributed under the License is distributed on an "AS IS" BASIS,
  23. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  24. * See the License for the specific language governing permissions and
  25. * limitations under the License.
  26. *
  27. */
  28. public class FileReader {
  29. private static Logger logger = Logger.getLogger(FileReader.class);
  30. private ProtocolAdapter adapter;
  31. private int spoolSize = 0;
  32. private List<Event> eventList;
  33. private Map<File,Long> pointerMap;
  34. private String hostname;
  35. {
  36. try {
  37. hostname = InetAddress.getLocalHost().getHostName();
  38. } catch (UnknownHostException e) {
  39. throw new RuntimeException(e);
  40. }
  41. }
  42. public FileReader(int spoolSize) {
  43. this.spoolSize = spoolSize;
  44. eventList = new ArrayList<Event>(spoolSize);
  45. }
  46. public int readFiles(List<FileState> fileList) throws IOException {
  47. int eventCount = 0;
  48. if(logger.isTraceEnabled()) {
  49. logger.trace("Reading " + fileList.size() + " file(s)");
  50. }
  51. pointerMap = new HashMap<File,Long>(fileList.size(),1);
  52. for(FileState state : fileList) {
  53. eventCount += readFile(state, spoolSize - eventCount);
  54. }
  55. adapter.sendEvents(eventList);
  56. for(FileState state : fileList) {
  57. state.setPointer(pointerMap.get(state.getFile()));
  58. }
  59. eventList.clear();
  60. return eventCount; // Return number of events sent to adapter
  61. }
  62. private int readFile(FileState state, int spaceLeftInSpool) throws IOException {
  63. int eventListSizeBefore = eventList.size();
  64. File file = state.getFile();
  65. long pointer = state.getPointer();
  66. if(logger.isTraceEnabled()) {
  67. logger.trace("File : " + file.getCanonicalPath() + " pointer : " + pointer);
  68. logger.trace("Space left in spool : " + spaceLeftInSpool);
  69. }
  70. pointer = readLines(state, spaceLeftInSpool);
  71. pointerMap.put(file, pointer);
  72. return eventList.size() - eventListSizeBefore; // Return number of events read
  73. }
  74. private long readLines(FileState state, int spaceLeftInSpool) throws IOException {
  75. RandomAccessFile reader = state.getRandomAccessFile();
  76. long pos = reader.getFilePointer();
  77. reader.seek(pos);
  78. String line = readLine(reader);
  79. while (line != null && spaceLeftInSpool > 0) {
  80. if(logger.isTraceEnabled()) {
  81. logger.trace("-- Read line : " + line);
  82. logger.trace("-- Space left in spool : " + spaceLeftInSpool);
  83. }
  84. pos = reader.getFilePointer();
  85. addEvent(state, pos, line);
  86. line = readLine(reader);
  87. spaceLeftInSpool--;
  88. }
  89. reader.seek(pos); // Ensure we can re-read if necessary
  90. return pos;
  91. }
  92. private String readLine(RandomAccessFile reader) throws IOException {
  93. StringBuffer sb = new StringBuffer();
  94. int ch;
  95. boolean seenCR = false;
  96. while((ch=reader.read()) != -1) {
  97. switch(ch) {
  98. case '\n':
  99. return sb.toString();
  100. case '\r':
  101. seenCR = true;
  102. break;
  103. default:
  104. if (seenCR) {
  105. sb.append('\r');
  106. seenCR = false;
  107. }
  108. sb.append((char)ch); // add character, not its ascii value
  109. }
  110. }
  111. return null;
  112. }
  113. private void addEvent(FileState state, long pos, String line) throws IOException {
  114. Event event = new Event(state.getFields());
  115. event.addField("file", state.getFile().getCanonicalPath())
  116. .addField("offset", pos)
  117. .addField("line", line)
  118. .addField("host", hostname);
  119. eventList.add(event);
  120. }
  121. public ProtocolAdapter getAdapter() {
  122. return adapter;
  123. }
  124. public void setAdapter(ProtocolAdapter adapter) {
  125. this.adapter = adapter;
  126. }
  127. }