Flume入门--万字详解

Flume基础架构、安装部署、入门案例、断点续传源码修改

概述

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。

基础架构

Flume 运行的核心是 Agent。Flume 是以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。它是 一个完整的数据收集工具,含有三个核心组件,分别是 source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:

Agent

  • Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
  • Agent 主要有 3 个部分组成,Source、Channel、Sink。同一台服务器可以运行多个 Agent,每个 Agent 可以有多个 source、sink、channel。Agent 的名字可以相同但是不能同时启动任务,否则会出现冲突。

Source

  • Source 是负责接收数据到 Flume Agent 并传给 Channel 的组件。
  • Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、 sequence generator、syslog、http、legacy 这些不同的数据源。

Sink

  • Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储系统或索引系统、或者被发送到另一个 Flume Agent。
  • Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

  • Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。
  • Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。
  • Flume 自带两种 Channel:Memory Channel 和 File Channel。
  • Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适 用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕 机或者重启都会导致数据丢失。
  • File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数 据。

selector

选择器,作用于 source 端,然后决定数据发往哪个目标。

interceptor

拦截器,flume 允许使用拦截器拦截数据。允许使用拦截器链,作用于 source 和 sink 阶段。

Event

  • 传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
  • Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构, Body 用来存放该条数据,形式为字节数组。

安装部署

解压

tar -zxvf /export/server/apache-flume-1.9.0-bin.tar.gz /export/server/


为了让 flume1.9 兼容 hadoop3.x,要删除 flume lib 包下的 guava-11.0.2.jar

rm guava-11.0.2.jar

Netcat

安装

sudo yum install -y nc

简单案例


Flume 入门案例

netcat 本机端口监控

在 flume 文件夹下创建工作目录 job

mkdir job

在 job 目录下建立任务配置文件,文件名任取,建议见名知意,net 表示数据源是端口,logger 表示数据是日志文件

vim net-flume-logger.conf

配置文件内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    # Name the components on this agent
    a1.sources = r1 #a1是该agent名,不可重复
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 4444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 #最多接收1000个event
    a1.channels.c1.transactionCapacity = 100 #100个事务,一次最多发送100个event,事务失败会回滚。capacity应该 < transactionCapacity
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 #一个source可以绑定多个channel
    a1.sinks.k1.channel = c1 #一个sink只能绑定一个channel

启动两个终端,一个终端启动监听任务:

1
2
3
4
5
6
7
8
在 flume 目录下运行:
flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -D flume.root.logger=INFO,console

参数说明:
--conf/-c:表示配置文件存储在 conf/目录
--name/-n:表示给 agent 起名为 a1
--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件.
-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。

另一个终端使用 netcat 向监听的端口发送内容:

nc localhost 4444 检查启动任务的端口是否收到。

监控 hive 日志上传 hdfs

在 job 目录下新建任务的配置文件 flume-file-hdfs.conf,内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    # Name the components on this agent
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2

    # Describe/configure the source
    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /export/server/hive/logs/metastore.log
    #这里我监控的是hive的元数据日志

    # Describe the sink
    a2.sinks.k2.type = hdfs
    a2.sinks.k2.hdfs.path = hdfs://linux01:8020/flume/%Y%m%d/%H
    #8020端口不要搞错,具体查看hadoop的core-site.xml

    #上传文件的前缀
    a2.sinks.k2.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a2.sinks.k2.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a2.sinks.k2.hdfs.roundValue = 1
    #重新定义时间单位
    a2.sinks.k2.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a2.sinks.k2.hdfs.useLocalTimeStamp = true
    #注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的
    key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自
    动添加 timestamp)。

    #积攒多少个 Event 才 flush 到 HDFS 一次
    a2.sinks.k2.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a2.sinks.k2.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a2.sinks.k2.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a2.sinks.k2.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a2.sinks.k2.hdfs.rollCount = 0

    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100

    # Bind the source and sink to the channel
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2
1
2
3
4
5
6
7
8
先在 flume 文件夹下启动 flume 的监听任务:
bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf -D flume.root.logger=INFO,console
启动 hdfs 和 hive 的元数据服务
start-dfs.sh
start-hivemetastore.sh(自己写的脚本)
启动 hive 开始操作
hive 会产生元数据记录在 metastore.log 中,然后就会被 flume 监听到,flume 就会把监听到的日志写到 hdfs 的 flume 文件夹中。浏览器打开 linux01:9870 查看 hdfs 的文件目录,发现新建了 flume 文件夹,表示操作成功。
![](image-6.png)

注意!监听的 metastore.log 一定要是有效的,如果无效那么 hive 的日志就不会写到里面,flume 就检测不到,具体去看 hive 的日志配置教程。另外启动的 agent 的任务名字和配置文件不要搞错了,是 a2 和 flume-file-hdfs.conf。

实时读取目录文件到 hdfs

job 目录下编写 flume-dir-hdfs.conf 配置文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3

    # Describe/configure the source
    # source类型是目录
    a3.sources.r3.type = spooldir
    #定义监控目录
    a3.sources.r3.spoolDir = /export/server/flume/upload
    #定义文件上传完后缀
    a3.sources.r3.fileSuffix = .COMPLETED
    #是否有文件头
    a3.sources.r3.fileHeader = true
    #忽略所有以.tmp 结尾的文件,不上传
    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload/%Y%m%d/%H
    #hdfs的upload文件夹要提前手动创建好,flume不会自己创建,否则会报错。

    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-

    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true

    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1

    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour

    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true

    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100

    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream

    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60

    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700

    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0

    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100

    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

flume 目录下启动 agent 任务:

bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

注意不要有多余的空格或者不可见字符,启动失败就去 logs 文件夹看日志

任务启动后就往监控目录/flume/upload 文件夹里面放文件 ,放了 3 个不同的文件,其中 tmp 后缀的文件没有上传到 hdfs,因为在 conf 配置文件中把 tmp 后缀的排除了,其他两个上传完毕,并且文件后缀改成 COMPLETED: 进入 linux01:9870 查看 hdfs 文件目录, 确实上传成功了。

注意! 配置文件的 a3.sinks.k3.hdfs.path 指定了 linux01:8020,那么 flume 任务就得在 linux01 上启动,在 linux02 上启动不会生效。我的 linux01 是主机,linux02 和 03 是从机,就算在 linux02 上启动 flume 任务,把 a3.sinks.k3.hdfs.path 改成 linux02:8020 也不行,必须在 linux01 上启动。 注意!向/flume/upload 文件夹放的文件不能是以上传完成的后缀结尾,比如文件上传成功后缀是 COMPLETED,那么向里面放的文件后缀就不能是 COMPLETED。另外不能向 upload 里放文件名相同的文件,文件名相同的文件只有第一个会上传到 hdfs,之后的不会,因为 linux 同一目录不允许同名文件产生。

实时监控目录下的多个追加文件

案例 2 的 exec source 适用于监控一个实时追加的文件,不能断点续传,案例 3 的 spooldir source 适用于同步新文件,但不适用于实时监听同步追加日志的文件,而该案例的 Taildir Source 就适合于监听多个实时追加的文件,并能实现断点续传。 job 目录下新建 flume-dir-hdfs.conf 配置文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3

    # Describe/configure the source
    #定义source类型
    a3.sources.r3.type = TAILDIR
    a3.sources.r3.positionFile = /export/server/apache-flume-1.9.0-bin/tail_dir.json
    #注意!!这里我把软链接flume换成了本来的真实目录apache-flume-1.9.0-bin,原因后面讲

    #文件组
    a3.sources.r3.filegroups = f1 f2
    a3.sources.r3.filegroups.f1 = /export/server/flume/files/.*file.*
    a3.sources.r3.filegroups.f2 = /export/server/flume/files2/.*log.*

    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload2/%Y%m%d/%H
    #上传文件的前缀
    a3.sinks.k3.hdfs.filePrefix = upload-
    #是否按照时间滚动文件夹
    a3.sinks.k3.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a3.sinks.k3.hdfs.roundValue = 1
    #重新定义时间单位
    a3.sinks.k3.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    #积攒多少个 Event 才 flush 到 HDFS 一次
    a3.sinks.k3.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a3.sinks.k3.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a3.sinks.k3.hdfs.rollInterval = 60
    #设置每个文件的滚动大小大概是 128M
    a3.sinks.k3.hdfs.rollSize = 134217700
    #文件的滚动与 Event 数量无关
    a3.sinks.k3.hdfs.rollCount = 0
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

hdfs 文件中提前创建好 upload2 文件夹:

hdfs dfs -mkdir /flume/upload2 flume 文件夹中创建 files 和 files2 文件夹,分别在里面写 file1.txt 和 log1.log 用于追加内容让 flume 任务监控。

flume 文件下启动监控任务:

bin/flume-ng agent -c conf/ -n a3 -f job/flume-taildir-hdfs.conf

用 echo 命令向 file1.txt 和 log1.log 追加内容,追加的内容就会被 flume 检测到,filume 就会把追加的新内容上传到 hdfs 的 upload2 文件夹。 追加的内容被检测到,上传到 hdfs,案例成功! 注意!

配置文件中,之前是 a3.sources.r3.positionFile = /export/server/flume/tail_dir.json,此时启动 flume 任务能成功,但是追加的内容不会上传到 hdfs,也就是该案例没有成功。去 logs 文件中查看 flume.log 日志,发现有一段报错如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    21 四月 2024 22:52:16,844 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:355)  - Source r3 has been removed due to an error during configuration
org.apache.flume.FlumeException: Error creating positionFile parent directories
	at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flume
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
	at java.nio.file.Files.createDirectory(Files.java:674)
	at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
	at java.nio.file.Files.createDirectories(Files.java:727)
	at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)
	... 11 more

给 chatgpt 看看:

大概意思是 positionfile 文件创建失败,原因是出现命名冲突。因为我的 flume 是个软链接,类似于快捷方式,但是写到配置文件里面,flume 程序就会把配置文件的 flume 当成真实目录,进而就会尝试创建名为 flume 的目录并且去进到创建的 flume 目录创建 r3,然而我已经存在了名为 flume 的软链接,程序就会创建 flume 目录失败,进而无法创建 r3。所以把配置文件的 flume 换成真实的 apache-flume-1.9.0-bin 目录就可以了,这样就可以生成 r3,也就是 positionfile = tail_dir.json 文件。当然另一种解决方法就是把 positionfile 的位置放到 flume 软链接外面。

tail_dir.json 文件内容如下:

1
2
3
4
5
{"inode":83899573,"pos":44,"file":"/export/server/flume/files/file1.txt"}
inode是文件的唯一标识,即使文件重命名也不会变,除非文件删除
pos表示读到哪里
file:监控文件的绝对路径
json文件靠inode和file两个值表示pos位置信息

注意! log4j 日志框架每天凌晨会自动把前一天的 hive.log 的文件改名,后缀加上日期,这点对我们监控空间极不友好,假如我们监控的是 hive.log,然而 hive.log 会自动更名 hive.log.2024-xx-xx,监测的文件名发生改变,而 inode 不变,然而 json 文件中记录的绝对路径仍然是 hive.log,此时的 hive.log 是新的文件,inode 变化,就无法实现断点续传。

解决方案:1)不使用 log4j 2)修改 flume 源码包 修改源码包的 TailFile 和 ReliableTaildirEventReader: 修改后重新打包生成 flume-taildir-source-1.9.0.jar,进入 flume/lib 目录下,把原来的 jar 包替换掉: 把原来的后缀改成 bak。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计