Problems with the HDFS Sink
Setting hdfs.useLocalTimeStamp to true would also supply a "timestamp" value for the Event, taken from the local system time.
We normally set it to false, because we currently use KafkaSource, which already adds this header based on the current system time.
Note: for the HDFS Sink to roll directories by time, a "timestamp" key must be present in the Event headers for the sink to use. We currently use KafkaSource, which adds this header based on the current system time.
The problem: we create a new directory per calendar day. If the Flume job dies a little after 11 p.m. and is only restarted after midnight, the leftover data from the previous day is consumed with the new day's system time and therefore gets counted as the next day's data.
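To make the drift concrete, here is a minimal illustrative sketch (hypothetical times; `DriftDemo` is not part of the project) showing how the same late-consumed event lands in different day directories depending on which timestamp is used:

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;

// Sketch of the "zero-point drift" problem: the same late-consumed event
// is bucketed into different day directories depending on whether the
// system time at consumption or the event's own timestamp is used.
public class DriftDemo {
    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("GMT+8");
        SimpleDateFormat day = new SimpleDateFormat("yyyy-MM-dd");
        day.setTimeZone(tz);

        // Hypothetical: event generated at 23:58, but the agent is down and
        // only consumes it at 00:05 the next day after a restart.
        Calendar generated = Calendar.getInstance(tz);
        generated.set(2020, Calendar.JUNE, 14, 23, 58, 0);
        Calendar consumed = Calendar.getInstance(tz);
        consumed.set(2020, Calendar.JUNE, 15, 0, 5, 0);

        // "timestamp" taken from the system time at consumption -> 2020-06-15 (wrong day)
        System.out.println("system-time dir: " + day.format(consumed.getTime()));
        // "timestamp" taken from the event's own "ts" field -> 2020-06-14 (correct day)
        System.out.println("event-time dir:  " + day.format(generated.getTime()));
    }
}
```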
Solution
We need the event's own internal timestamp to control which HDFS date directory is created.
The idea is to write a custom interceptor that modifies the timestamp KafkaSource adds automatically:
use the event's internal time to replace the timestamp added by KafkaSource.
1) Create a Maven project named flume-interceptor
2) Create the package com.ysss.flume.interceptor
3) Add the following to the pom.xml file
```xml
<dependencies>
    <!-- flume-ng-core is provided by the Flume installation at runtime,
         so it is marked provided and not bundled into the jar -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <!-- fastjson is used to parse the JSON event body and must be bundled -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- build a jar-with-dependencies so the interceptor and fastjson ship together -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
4) Create the TimeStampInterceptor class under the com.ysss.flume.interceptor package
```java
package com.ysss.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    private ArrayList<Event> events = new ArrayList<>();

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        // Parse the JSON body and overwrite the "timestamp" header
        // (set by KafkaSource from the system time) with the event's own "ts" field
        JSONObject jsonObject = JSONObject.parseObject(log);

        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }

        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
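As a quick sanity check, here is a minimal, hypothetical sketch (the class name TimeStampInterceptorDemo and the sample JSON body are illustrative, not part of the project) that runs a single hand-built event through the interceptor:

```java
package com.ysss.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

// Hypothetical local smoke test: feed one JSON event through the interceptor and
// confirm the "timestamp" header now carries the event's own "ts" value.
public class TimeStampInterceptorDemo {
    public static void main(String[] args) {
        Interceptor interceptor = new TimeStampInterceptor.Builder().build();

        // Sample body in the log format assumed here: a top-level "ts" field in epoch millis
        String body = "{\"common\":{\"mid\":\"mid_1\"},\"ts\":1592179080000}";
        Event event = EventBuilder.withBody(body, StandardCharsets.UTF_8);

        interceptor.intercept(event);

        // Prints 1592179080000 -- the value the HDFS Sink will use to resolve %Y-%m-%d
        System.out.println(event.getHeaders().get("timestamp"));
    }
}
```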
5) Package the project. The build produces flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar.
6) First, place the packaged jar in the /opt/module/flume/lib directory on hadoop102.
```
[ysss@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
```
7) Distribute Flume to hadoop103 and hadoop104
```
[ysss@hadoop102 module]$ xsync flume/
```
Adjust the consumer-side Flume configuration file
```
[ysss@hadoop104 conf]$ pwd
/opt/module/flume/conf
```
Modify the configuration file kafka-flume-hdfs.conf
```properties
# Components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.interceptors = i1
# Note: this must be the fully qualified name of the custom interceptor's Builder
a1.sources.r1.interceptors.i1.type = com.ysss.flume.interceptor.TimeStampInterceptor$Builder

# Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Write output files as an lzop-compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

# Bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```