Modifying the Flume Source Code: Making TailDirSource Compatible with log4j

tailDir Source

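Advantages

1) Resumable reads: after a restart, reading continues from the last recorded position

2) Several directories can be monitored at the same time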
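
Resumable reading can be pictured with a minimal sketch. This is not Flume's implementation; the class `ResumableReader`, its `Result` type, and the method `readFrom` are hypothetical names used only for illustration. The idea is that a reader remembers the byte offset it stopped at and seeks back to that offset on the next pass.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class ResumableReader {
    /** Lines read in one pass plus the offset to resume from next time. */
    public static final class Result {
        public final List<String> lines;
        public final long nextPos;

        Result(List<String> lines, long nextPos) {
            this.lines = lines;
            this.nextPos = nextPos;
        }
    }

    /** Reads every complete line from startPos to the current end of the file. */
    public static Result readFrom(Path file, long startPos) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(startPos); // jump to the saved position instead of offset 0
            String line;
            while ((line = raf.readLine()) != null) {
                lines.add(line);
            }
            return new Result(lines, raf.getFilePointer());
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("flume", ".log");
        Files.write(log, "hello\n".getBytes(StandardCharsets.UTF_8));
        Result first = readFrom(log, 0);

        // More data arrives; the second pass starts where the first one stopped.
        Files.write(log, "ysss\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        Result second = readFrom(log, first.nextPos);

        System.out.println(first.lines + " then " + second.lines);
        Files.delete(log);
    }
}
```

Running `main` prints `[hello] then [ysss]`: the second pass does not return `hello` again because it starts from the saved offset.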

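Known problem

1) Description: when the monitored file names are matched with a regular expression, renaming a file causes its data to be read again.

2) Example:

Configuration file test.conf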

Code
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/flume.*
a1.sources.r1.positionFile = /opt/module/flume/taildir/taildir_flume.json

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3) Start the agent

shell
[ysss@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf -f conf/test.conf -Dflume.root.logger=INFO,console

4) Test

(1) Create flume.log in the /opt/module/data directory

shell
[ysss@hadoop102 data]$ pwd
/opt/module/data
[ysss@hadoop102 data]$ touch flume.log

(2) Append data to flume.log

shell
[ysss@hadoop102 data]$ echo hello >> flume.log 
[ysss@hadoop102 data]$ echo ysss >> flume.log

(3) Check the Flume console

(4) Rename flume.log to flume.2020-06-09.log

(5) Check the Flume console again; the data already read from the file is read a second time
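
One reason the renamed file is still picked up is that both names match the pattern in the filegroup. The sketch below assumes the file-name part of `/opt/module/data/flume.*` is applied as a full-match regular expression; `FilegroupMatch` and `isMonitored` are hypothetical names, not Flume classes.

```java
import java.util.regex.Pattern;

public class FilegroupMatch {
    // File-name part of the filegroup /opt/module/data/flume.* from test.conf
    static final Pattern FILE_NAME = Pattern.compile("flume.*");

    /** True when the whole file name matches the filegroup pattern. */
    public static boolean isMonitored(String fileName) {
        return FILE_NAME.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String[] names = {"flume.log", "flume.2020-06-09.log", "other.log"};
        for (String name : names) {
            System.out.println(name + " -> " + isMonitored(name));
        }
    }
}
```

Both `flume.log` and `flume.2020-06-09.log` come back as monitored, so after the rename the source sees a matching file under a path it has not recorded before.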

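Solutions

1) Option 1

Negotiate with the company's backend developers:

ask them to use a logging framework that does not rename log files, such as logback, instead of one that does, such as log4j. If negotiation is not possible, or the project manager or team lead sides with the Java team, Option 2 is the only choice left.

2) Option 2

Modify the TailDirSource source code:

1. flume-taildir-source\src\main\java\org\apache\flume\source\taildir\TailFile.java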

java
public boolean updatePos(String path, long inode, long pos) throws IOException {
  // Original check, which also required the path to be unchanged:
  // if (this.inode == inode && this.path.equals(path)) {
  // ysss: compare the inode only, so a renamed file keeps its position
  if (this.inode == inode) {
    setPos(pos);
    updateFilePos(pos);
    logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
    return true;
  }
  return false;
}

2. \src\main\java\org\apache\flume\source\taildir\ReliableTaildirEventReader.java

java
/**
 * Update tailFiles mapping if a new file is created or appends are detected
 * to the existing file.
 */
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
  updateTime = System.currentTimeMillis();
  List<Long> updatedInodes = Lists.newArrayList();

  for (TaildirMatcher taildir : taildirCache) {
    Map<String, String> headers = headerTable.row(taildir.getFileGroup());

    for (File f : taildir.getMatchingFiles()) {
      long inode;
      try {
        inode = getInode(f);
      } catch (NoSuchFileException e) {
        logger.info("File has been deleted in the meantime: " + e.getMessage());
        continue;
      }
      TailFile tf = tailFiles.get(inode);
      // Original check, which treated a changed path as a new file:
      // if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
      // ysss: only an unknown inode counts as a new file
      if (tf == null) {
        long startPos = skipToEnd ? f.length() : 0;
        tf = openFile(f, headers, inode, startPos);
      } else {
        boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
        if (updated) {
          if (tf.getRaf() == null) {
            tf = openFile(f, headers, inode, tf.getPos());
          }
          if (f.length() < tf.getPos()) {
            logger.info("Pos " + tf.getPos() + " is larger than file size! "
                + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
            tf.updatePos(tf.getPath(), inode, 0);
          }
        }
        tf.setNeedTail(updated);
      }
      tailFiles.put(inode, tf);
      updatedInodes.add(inode);
    }
  }
  return updatedInodes;
}

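Summary

Why pair taildir with logback rather than log4j?

logback log files: ysss.2020-05-18.log, ysss.2020-05-19.log

Log4j log files: ysss.log -> ysss.2020-05-18.log; once the day is over, the file is renamed to the latter name and archived

Linux identifies a file by

(1) its full path

(2) its inode (the unique identifier of a file on Linux; renaming the file does not change the inode value)

However, tailDirSource treats a file as new whenever its name or its inode changes! In other words, with log4j the log file is renamed, tailDirSource takes it for a new file, and the data is read again.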
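
The statement that a rename leaves the inode untouched can be checked with a small sketch. It assumes a Unix-like system, where the JDK exposes the inode through the `unix:ino` file attribute; `InodeAfterRename` and `inodeOf` are hypothetical names chosen for this example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class InodeAfterRename {
    /** Reads the inode number; "unix:ino" is only available on Unix-like systems. */
    public static long inodeOf(Path file) throws IOException {
        return ((Number) Files.getAttribute(file, "unix:ino")).longValue();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("taildir-demo");
        Path before = Files.createFile(dir.resolve("ysss.log"));
        long inodeBefore = inodeOf(before);

        // Rename within the same directory, as a daily log rollover would.
        Path after = Files.move(before, dir.resolve("ysss.2020-05-18.log"));
        long inodeAfter = inodeOf(after);

        System.out.println("same inode after rename: " + (inodeBefore == inodeAfter));
        Files.delete(after);
        Files.delete(dir);
    }
}
```

The path changes while the inode stays the same, which is why an identity check based on the inode alone can recognise the renamed file as one it has already read.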

What if log4j has to be used anyway?

Modify the Flume source code so that only an inode change marks a file as new!

  • source\taildir\TailFile.java: the updatePos method

  • source\taildir\ReliableTaildirEventReader.java: the updateTailFiles method
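
The effect of the two changes can be sketched with a small simulation. It is a simplified model, not Flume code: `RenameSimulation`, `Tracked`, and `scan` are hypothetical names, and the `comparePath` flag switches between the original check (inode and path) and the modified check (inode only).

```java
import java.util.HashMap;
import java.util.Map;

public class RenameSimulation {
    /** Hypothetical stand-in for TailFile: only the path and the read position. */
    static final class Tracked {
        final String path;
        long pos;

        Tracked(String path, long pos) {
            this.path = path;
            this.pos = pos;
        }
    }

    private final Map<Long, Tracked> tailFiles = new HashMap<>();
    private final boolean comparePath;

    public RenameSimulation(boolean comparePath) {
        this.comparePath = comparePath;
    }

    /** Returns the offset from which reading would start for this file. */
    public long scan(long inode, String path, long length) {
        Tracked tf = tailFiles.get(inode);
        // Original rule: unknown inode OR changed path. Modified rule: unknown inode only.
        boolean isNew = tf == null || (comparePath && !tf.path.equals(path));
        if (isNew) {
            tf = new Tracked(path, 0);
            tailFiles.put(inode, tf);
        }
        long start = tf.pos;
        tf.pos = length; // pretend everything up to the current length was consumed
        return start;
    }

    public static void main(String[] args) {
        for (boolean comparePath : new boolean[] {true, false}) {
            RenameSimulation sim = new RenameSimulation(comparePath);
            sim.scan(42L, "/opt/module/data/flume.log", 11);
            long start = sim.scan(42L, "/opt/module/data/flume.2020-06-09.log", 11);
            System.out.println("comparePath=" + comparePath + " -> resume at " + start);
        }
    }
}
```

With `comparePath=true` the renamed file is read again from offset 0; with `comparePath=false` reading resumes at offset 11, so nothing is duplicated. The inode value 42 and the length 11 are made-up inputs for the demonstration.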

Author: Yang4
Article link: https://masteryang4.github.io/2020/06/30/flume%E6%BA%90%E7%A0%81%E4%BF%AE%E6%94%B9%E4%B9%8BflumeTailDirSource%E5%85%BC%E5%AE%B9log4j/
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit MasterYangBlog when reposting.