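TailDir Source
Advantages
1) Resumable reads: after a restart, reading continues from the last recorded position
2) Monitors multiple directories at the same time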
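Resuming is possible because the source remembers how far each file has been read. The following is a minimal Java sketch of that idea only, not Flume's implementation: the class `ResumeSketch`, the `Position` type, and the saved offset of 6 bytes are hypothetical names and values chosen for illustration.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ResumeSketch {
  /** Hypothetical stand-in for one saved position entry: a file and a byte offset. */
  static final class Position {
    final String file;
    final long pos;

    Position(String file, long pos) {
      this.file = file;
      this.pos = pos;
    }
  }

  /** Opens the file, seeks to the saved offset, and returns the next line. */
  static String readLineAt(Position p) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(p.file, "r")) {
      raf.seek(p.pos);
      return raf.readLine();
    }
  }

  /** Simulates a restart: "hello\n" (6 bytes) was consumed before stopping. */
  static String demo() {
    try {
      Path log = Files.createTempFile("flume", ".log");
      Files.write(log, "hello\nysss\n".getBytes(StandardCharsets.UTF_8));
      Position saved = new Position(log.toString(), 6L);
      String next = readLineAt(saved);
      Files.delete(log);
      return next;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the offset is stored outside the reading process, a restarted reader does not have to start again from byte 0.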
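Known problem
1) Description: when the monitored file names are matched with a regular expression, renaming a file causes its data to be read again.
2) Example:
Configuration file test.conf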
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/data/flume.*
    a1.sources.r1.positionFile = /opt/module/flume/taildir/taildir_flume.json

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
3) Start the agent
    [ysss@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf -f conf/test.conf -Dflume.root.logger=INFO,console
4) Test
(1) Create flume.log in the /opt/module/data directory
    [ysss@hadoop102 data]$ pwd
    /opt/module/data
    [ysss@hadoop102 data]$ touch flume.log
(2) Append data to flume.log
    [ysss@hadoop102 data]$ echo hello >> flume.log
    [ysss@hadoop102 data]$ echo ysss >> flume.log
(3) Check the Flume console
(4) Rename flume.log to flume.2020-06-09.log
(5) Check the Flume console again: the new name still matches flume.*, so the same data is read a second time
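Solutions
1) Option 1
Coordinate with the backend team:
ask them to use a logging framework that does not rename its log files, such as logback, rather than one that does, such as log4j. If coordination is not possible, or the project manager or team lead sides with the Java team, Option 2 is the only choice left.
2) Option 2
Modify the TailDirSource source code:
1. flume-taildir-source\src\main\java\org\apache\flume\source\taildir\TailFile.java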
    public boolean updatePos(String path, long inode, long pos) throws IOException {
      // Modified: compare by inode only, so a renamed file keeps its saved position.
      if (this.inode == inode) {
        setPos(pos);
        updateFilePos(pos);
        logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
        return true;
      }
      return false;
    }
2. \src\main\java\org\apache\flume\source\taildir\ReliableTaildirEventReader.java
    /**
     * Update tailFiles mapping if a new file is created or appends are detected
     * to the existing file.
     */
    public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
      updateTime = System.currentTimeMillis();
      List<Long> updatedInodes = Lists.newArrayList();

      for (TaildirMatcher taildir : taildirCache) {
        Map<String, String> headers = headerTable.row(taildir.getFileGroup());

        for (File f : taildir.getMatchingFiles()) {
          long inode;
          try {
            inode = getInode(f);
          } catch (NoSuchFileException e) {
            logger.info("File has been deleted in the meantime: " + e.getMessage());
            continue;
          }
          TailFile tf = tailFiles.get(inode);
          // Modified: a file is new only if its inode is unknown; the path is not compared.
          if (tf == null) {
            long startPos = skipToEnd ? f.length() : 0;
            tf = openFile(f, headers, inode, startPos);
          } else {
            boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
            if (updated) {
              if (tf.getRaf() == null) {
                tf = openFile(f, headers, inode, tf.getPos());
              }
              if (f.length() < tf.getPos()) {
                logger.info("Pos " + tf.getPos() + " is larger than file size! "
                    + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
                tf.updatePos(tf.getPath(), inode, 0);
              }
            }
            tf.setNeedTail(updated);
          }
          tailFiles.put(inode, tf);
          updatedInodes.add(inode);
        }
      }
      return updatedInodes;
    }
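The effect of the change can be seen in a small standalone model. This is a sketch under assumptions, not Flume code: `MatchRuleSketch`, `Tracked`, and `startOffset` are hypothetical names, the inode value 42 is made up, and the "stock" rule is modeled on the assumption that the unmodified reader treats a file as new when either the inode is unknown or the path differs.

```java
import java.util.HashMap;
import java.util.Map;

public class MatchRuleSketch {
  /** Simplified stand-in for a tracked file: its last known path and read offset. */
  static final class Tracked {
    final String path;
    final long pos;

    Tracked(String path, long pos) {
      this.path = path;
      this.pos = pos;
    }
  }

  /**
   * Returns the offset reading would start from for a file seen at (inode, path).
   * requirePathMatch = true models the assumed stock rule (inode and path must match);
   * requirePathMatch = false models the patched rule (inode only).
   */
  static long startOffset(Map<Long, Tracked> tailFiles, long inode, String path,
                          boolean requirePathMatch) {
    Tracked tf = tailFiles.get(inode);
    boolean isNew = tf == null || (requirePathMatch && !tf.path.equals(path));
    if (isNew) {
      // Treated as a new file: tracking restarts at offset 0, so old data is read again.
      tailFiles.put(inode, new Tracked(path, 0L));
      return 0L;
    }
    return tf.pos;
  }

  /** Builds a table in which flume.log (inode 42) has been read up to byte 11. */
  static Map<Long, Tracked> afterReadingTwoLines() {
    Map<Long, Tracked> tailFiles = new HashMap<>();
    tailFiles.put(42L, new Tracked("/opt/module/data/flume.log", 11L));
    return tailFiles;
  }

  public static void main(String[] args) {
    String renamed = "/opt/module/data/flume.2020-06-09.log";
    System.out.println("stock rule starts at:   "
        + startOffset(afterReadingTwoLines(), 42L, renamed, true));
    System.out.println("patched rule starts at: "
        + startOffset(afterReadingTwoLines(), 42L, renamed, false));
  }
}
```

In this model the renamed file restarts at offset 0 under the path-sensitive rule and continues at offset 11 under the inode-only rule, which is the behavior the patch is after.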
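Summary
Why pair taildir with logback rather than log4j?
logback log files: ysss.2020-05-18.log, ysss.2020-05-19.log
log4j log files: ysss.log -> ysss.2020-05-18.log; the file is written as ysss.log and renamed to the dated name once the day has passed
Linux identifies a file by
(1) its full path
(2) its inode (the file's unique identifier on Linux; renaming the file does not change the inode)
However, TailDirSource treats a file as new when either its name or its inode changes. With log4j the log file is renamed, so TailDirSource sees a new file and reads its data again.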
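That a rename keeps the inode can be checked directly. A minimal Java sketch, assuming a POSIX file system where the JDK exposes the `unix:ino` attribute (it is not available on Windows); the class `InodeRenameSketch` and the temporary directory are hypothetical, and the file names are borrowed from the log4j example above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class InodeRenameSketch {
  /** Reads the inode number through the "unix" attribute view. */
  static long inodeOf(Path p) throws IOException {
    return ((Number) Files.getAttribute(p, "unix:ino")).longValue();
  }

  /** Creates a file, renames it within the same directory, and compares inodes. */
  static boolean inodeSurvivesRename() {
    try {
      Path dir = Files.createTempDirectory("taildir-demo");
      Path before = Files.createFile(dir.resolve("ysss.log"));
      long inodeBefore = inodeOf(before);

      // Same rename a daily rollover would perform: ysss.log -> ysss.2020-05-18.log
      Path after = Files.move(before, dir.resolve("ysss.2020-05-18.log"));
      long inodeAfter = inodeOf(after);

      Files.delete(after);
      Files.delete(dir);
      return inodeBefore == inodeAfter;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("inode unchanged after rename: " + inodeSurvivesRename());
  }
}
```

The path changes while the inode stays the same, so a tracker keyed on the inode alone can recognize the renamed file as one it has already read.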
What if log4j has to be used anyway?
Modify the Flume source so that a file is recognized as new only when its inode changes.