Flume进阶--万字详解【老大爷也能学会】

Flume进阶——事务、架构原理、拓扑结构、负载均衡、单源多出口、故障转移、聚合、自定义拦截器、自定义source、自定义sink

事务(Put、Take)

put 事务把数据批处理写入临时缓冲区 putList,,然后 doCommit 去检查 Channel 内存队列是否足够合并,如果不够,就回滚数据,如果够,就把 putList 的数据写入到 Channel,然后由 take 事务从 channel 中拉取,写入到临时缓冲区 takeList,然后把数据从 takeList 发送到 HDFS,发送完毕后清空缓冲区,如果某个数据发送失败,就回滚到 channel。

架构原理

在拦截阶段可以进行数据过滤清洗,洗掉脏数据。

ChannelSelector

因为一个 source 可以对应对各 channel ,ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型, 分别是 Replicating(复制)和 Multiplexing(多路复用)。 ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据自定义的配置,将不同的 Event 发往不同的 Channel,Multiplexing 要结合拦截器使用,Multiplexing 会根据数据的头信息来决定发送到哪个 channel。

SinkProcessor

一个 sink 只能绑定一个 channel,一个 channel 能绑定多个 sink。SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor。

DefaultSinkProcessor 对 应 的 是 单个的 Sink , LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能。

LoadBalancingSinkProcessor 负载均衡:

一个 channel 会发给多个 sink

FailoverSinkProcessor 故障转移:

当一个 sink 故障,任务会转移到其他 sink

拓扑结构

简单串联

复制和多路复用

负载均衡和故障转移

聚合

单源多出口案例

前置条件: linux01 上启动 hive,hdfs,在 linux03 上部署 3 个 flume 任务,启动 hdfs。linux01 和 linux03 配置 ssh 免密登录。

要求: flume1 在 linux03 监听 linux01 的 hive 日志,把 hive 日志的新内容发送给 linux03 上的 flume2 和 flume3,flume2 把内容写到 hdfs,flume3 把内容写到 linux03 的本地文件/export/server/flume/job/group1/datas 文件夹中。

剧透: flume3 成功把 hive 日志的新内容写到 datas 文件夹,说明 linux03 确实监听到了 linux01 的 hive 日志并且成功把日志从 linux01 弄到了 linux03,但是 flume2 却没有把新内容写到 hdfs,猜想的可能是因为在 linux03 上写 flume2 的配置文件**sinks.k1.hdfs.path = hdfs://linux01:9820/flume/group1/%Y%m%d/%H,**linux01 和 linux03 是不同的服务器,跨服务器没写进去,所以建议在同一台服务器搞。

在 flume/job 目录中新建文件夹 group1 来存放本次案例的任务配置文件

1
2
3
4
5
mkdir group1
cd group1
vim flume-file-flume.conf
vim flume-flume-hdfs.conf
vim flume-flume-dir.conf

三个 conf 配置如下:

flume-file-flume.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2

    # 将数据流复制给所有 channel
    a1.sources.r1.selector.type = replicating

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = ssh root@linux01 'tail -F /export/server/hive/logs/hive.log'
    #因为hive在linux01,flume在linux03,为了跨服务器监听,这里用了ssh免密登录
    a1.sources.r1.shell = /bin/bash -c

    # Describe the sink
    # sink 端的 avro 是一个数据发送者
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = linux03
    a1.sinks.k1.port = 4141

    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = linux03
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

flume-flume-hdfs.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    #Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    #Describe/configure the source
    #source 端的 avro 是一个数据接收服务
    a2.sources.r1.type = avro
    a2.sources.r1.bind = linux03
    a2.sources.r1.port = 4141
    #Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://linux01:9820/flume/group1/%Y%m%d/%H #这里在 linux03 把路径配置为 linux01 的 hdfs,可能就是出错原因
    #上传文件的前缀
    a2.sinks.k1.hdfs.filePrefix = group1- #是否按照时间滚动文件夹
    a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹
    a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位
    a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳
    a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩
    a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件
    a2.sinks.k1.hdfs.rollInterval = 30 #设置每个文件的滚动大小大概是 128M
    a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关
    a2.sinks.k1.hdfs.rollCount = 0
    #Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    #Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

flume-flume-dir.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    #Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = linux03
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /export/server/flume/job/group1/datas
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2

3 个配置文件写好后,在 linux03 启动 hdfs,然后在 flume 文件夹下启动三个 flume 任务

flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf

flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf

flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

在 linux01 启动 hdfs ,然后启动 hivemetastore 和 hive,开始操作 hive,就会产生 hive 日志记录在 hive.log。

注意!hive.log 没产生新内容可能是因为 hive 日志配置出错,去 conf 文件夹找 hive-log4j2.properties,找到 hive.log.dir。修改成自己的 logs 路径,默认路径可能要用到 hive 环境变量,如果环境变量没有就直接写绝对路径。 datas 确实产生了新文件,但是有很多空的,不知道咋回事,可能是任务配置问题。

故障转移案例

前置:确保 4142、4141、44444 端口没被占用 在 linux03 的 flume/job 目录建 group2 文件夹,里面有如下 3 个配置文件:

flume-flume-console1.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = linux03
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = logger
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

flume-flume-console2.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    #Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2 # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = linux03
    a3.sources.r1.port = 4142 # Describe the sink
    a3.sinks.k1.type = logger # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2

flume-netcat-flume.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444


    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    #配置优先级,k1=5,优先级更高,因此数据会优先发给k1,当k1故障时,才会转移到k2
    a1.sinkgroups.g1.processor.maxpenalty = 10000


    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = linux03
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = linux03
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

分别启动 3 个任务

1)flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

2)flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

3)flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf 运行 nc localhost 44444 并发送内容: 在 console2 接收到(发送的汉字显示……) 找到 flume 进程,制造故障杀死 console2 任务,此时发生故障 ,任务会转移到 console1: 可以看到 console2 被杀死 继续发送数据,数据被 console1 接收 console1 接收成功 。

负载均衡案例

尚硅谷 P25

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinkgroups = g1
    a1.sinks = k1 k2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sinkgroups.g1.processor.type = load_balance
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = linux03
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = linux03
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

配置文件和故障转移案例一样,只有 flume-netcat-flume.conf 需要改。

Flume 聚合案例

三台服务器:linux01、linux02、linux03,监控 linux03 的 job/group.log 文件和 linux02 的 44444 端口,把监测到的数据传给 linux01,在 linux01 的控制台输出。

linux03 的任务配置文件:flume3-logger-flume.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /export/server/flume/job/group.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = linux01
    a1.sinks.k1.port = 4141

    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

linux02 的任务配置文件:flume2-netcat-flume.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = linux02
    a2.sources.r1.port = 44444
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = linux01
    a2.sinks.k1.port = 4141
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

linux01 的任务配置文件:flume1-flume-logger.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    # Describe/configure the source
    a3.sources.r1.type = avro

    a3.sources.r1.bind = linux01
    a3.sources.r1.port = 4141
    # Describe the sink
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1

自定义拦截器

案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不 同的分析系统。

需求分析

实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。 在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析 系统(Channel)。

自定义拦截器打成 jar 包

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
 * @Author:懒大王Smile
 * @Date: 2024/4/28
 * @Time: 19:54
 * @Description:
 */

public class TypeInterceptor implements Interceptor {


    //声明一个存放事件的集合
    private List<Event> addHeaderEvents;


    @Override
    public void initialize() {
        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }

    //单个事件拦截
    @Override
    public Event intercept(Event event) {
        //1.获取事件中的头信息
        Map<String, String> headers = event.getHeaders();
        //2.获取事件中的 body 信息
        String body = new String(event.getBody());
        //3.根据 body 中是否有"atguigu"来决定添加怎样的头信息
        if (body.contains("sereins")) {
            //4.添加头信息
            headers.put("type", "sereins");
        } else {
            //4.添加头信息
            headers.put("type", "other");
        }
        return event;
    }

    //批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) {
        //1.清空集合
        addHeaderEvents.clear();
        //2.遍历 events
        for (Event event : events) {
            //3.给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        //4.返回结果
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }


    //静态内部类
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }

    }
}

所需依赖:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.smile</groupId>
    <artifactId>interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

</project>

jar 包打好放到 flume 的 lib 目录下,flume 启动时会加载 lib 的所有 jar 包。

注意!!自定义拦截器的 jar 包源代码是定制的,里面的过滤拦截规则需要根据实际业务来编写,并且 jdk 最好是 1.8。

任务配置文件

inux01 服务器的 flume 配置文件 job/group4/interceptor-flume1.conf:

1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink), 并配置相应的 ChannelSelector 和 interceptor。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = TypeInterceptor$Builder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.sereins = c1
    a1.sources.r1.selector.mapping.other = c2

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = linux02
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname = linux03
    a1.sinks.k2.port = 4242
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

linux02 服务器的 flume 配置文件 job/group4/interceptor-flume2.conf:

配置一个 avro source 和一个 logger sink

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = linux02
    a1.sources.r1.port = 4141
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1

linux03 服务器的 flume 配置文件 job/group4/interceptor-flume3.conf:

配置一个 avro source 和一个 logger sink

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = linux03
    a1.sources.r1.port = 4242
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1

以上配置完后,在 linux01 nc localhost 44444,然而

尚硅谷 Flume P33 后面的以后再搞

自定义 Source

自定义 Sink

事务源码

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计