Flume Interceptors: The HDFS Sink Timestamp Problem

The problem with HDFS Sink

Note: for the HDFS Sink to roll directories by time (the %Y-%m-%d escape sequences in hdfs.path), the Event headers must contain a "timestamp" key for the sink to read.

If hdfs.useLocalTimeStamp is set to true, the sink resolves those escape sequences with the local system time instead of the header. We normally leave it at false, because our KafkaSource already adds a "timestamp" header, set to the current system time, when it creates each Event.

The problem: directories are created per calendar day from that timestamp. Suppose the Flume agent dies a little after 11 PM and is only restarted after midnight. The events produced before midnight but consumed after the restart get stamped with the new day's system time, so yesterday's late data is counted as the next day's.

Solution

We need the HDFS directory date to be driven by the time carried inside the event itself. The idea is a custom interceptor that overwrites the timestamp KafkaSource added automatically, replacing it with the event's own time taken from the log body. For example, an event whose body contains "ts":1593475200000 (2020-06-30) should land under .../2020-06-30 even if it is only consumed on 2020-07-01.

1) Create a Maven project named flume-interceptor.

2) Create the package com.ysss.interceptor.

3) Add the following configuration to pom.xml:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

4) Create the TimeStampInterceptor class under the com.ysss.interceptor package:

```java
package com.ysss.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    // Reused buffer for the batch intercept() call
    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        // The body is expected to be a JSON log line containing a "ts" field (epoch millis)
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        // Overwrite the "timestamp" header (set by KafkaSource to the system time)
        // with the event's own time, so HDFS Sink buckets by event time
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
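
To sanity-check the interceptor before deploying it, a minimal local test can be run from the IDE, where the provided Flume dependency is on the classpath. This is only a sketch: the TimeStampInterceptorDemo class and the sample JSON body are hypothetical, and the only field the interceptor actually reads is "ts".

```java
package com.ysss.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class TimeStampInterceptorDemo {
    public static void main(String[] args) {
        Interceptor interceptor = new TimeStampInterceptor.Builder().build();
        interceptor.initialize();

        // Hypothetical log line; the interceptor only reads the "ts" field (epoch millis)
        String log = "{\"ts\":1593475200000,\"common\":{\"mid\":\"mid_1\"}}";
        Event event = EventBuilder.withBody(log, StandardCharsets.UTF_8);

        Event out = interceptor.intercept(event);
        // Expected: timestamp = 1593475200000 (2020-06-30), which HDFS Sink will use
        // to resolve %Y-%m-%d in hdfs.path
        System.out.println("timestamp = " + out.getHeaders().get("timestamp"));

        interceptor.close();
    }
}
```
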

5) Package the project; the build produces flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar.
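
One way to build it (a sketch; run from the flume-interceptor project root):

```shell
mvn clean package
```

Because the assembly plugin is bound to the package phase, the jar-with-dependencies artifact appears under target/.
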

6) First copy the built jar into the /opt/module/flume/lib directory on hadoop102:

```shell
[ysss@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
```

7) Distribute Flume to hadoop103 and hadoop104:

```shell
[ysss@hadoop102 module]$ xsync flume/
```

Adjusting the consumer Flume configuration

```shell
[ysss@hadoop104 conf]$ pwd
/opt/module/flume/conf
```

Modify the configuration file kafka-flume-hdfs.conf:

```properties
# Agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.interceptors = i1
# Note: the full class name of the custom interceptor's Builder
a1.sources.r1.interceptors.i1.type = com.ysss.interceptor.TimeStampInterceptor$Builder

# Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Output LZO-compressed files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
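
With the interceptor jar in place, the consumer agent can be started against this file. The paths and the logging option below are just one plausible invocation, not taken from the original post:

```shell
[ysss@hadoop104 flume]$ bin/flume-ng agent \
  --conf conf/ \
  --name a1 \
  --conf-file conf/kafka-flume-hdfs.conf \
  -Dflume.root.logger=INFO,console
```

Once events flow through, HDFS should show one directory per day under /origin_data/gmall/log/topic_log/, named after the ts value inside the events rather than the time they were consumed (check with hadoop fs -ls /origin_data/gmall/log/topic_log/).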