Flume入门-程序员宅基地

技术标签: flume  Flume  大数据  

概述

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构,灵活简单。

基础架构

Flume运行的核心是 Agent。Flume是以agent为最小的独立运行单位。一个agent就是一个JVM。它是 一个完整的数据收集工具,含有三个核心组件,分别是source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方。如下图所示:

Agent

  • Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
  • Agent 主要有 3 个部分组成,Source、Channel、Sink。同一台服务器可以运行多个Agent,每个Agent可以有多个source、sink、channel。Agent的名字可以相同但是不能同时启动任务,否则会出现冲突。

Source

  • Source 是负责接收数据到 Flume Agent 并传给Channel的组件。
  • Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、 sequence generator、syslog、http、legacy这些不同的数据源。

Sink

  • Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储系统或索引系统、或者被发送到另一个 Flume Agent。
  • Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

  • Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。
  • Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。
  • Flume 自带两种 Channel:Memory Channel 和 File Channel。
  • Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适 用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕 机或者重启都会导致数据丢失。
  • File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数 据。

selector 

选择器,作用于source端,然后决定数据发往哪个目标。

interceptor

拦截器,flume允许使用拦截器拦截数据。允许使用拦截器链,作用于source和sink阶段。

Event

  • 传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。
  • Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构, Body 用来存放该条数据,形式为字节数组。

安装部署

解压

tar -zxvf /export/server/apache-flume-1.9.0-bin.tar.gz   /export/server/

为了让flume1.9兼容hadoop3.x,要删除flume lib包下的guava-11.0.2.jar

rm guava-11.0.2.jar

Netcat

安装

sudo yum install -y nc

简单案例 

Flume入门案例

1)netcat本机端口监控

在flume文件夹下创建工作目录job

mkdir job

在job目录下建立任务配置文件,文件名任取,建议见名知意,net表示数据源是端口,logger表示数据是日志文件

vim net-flume-logger.conf

配置文件内容如下:

# Name the components on this agent
a1.sources = r1 #a1是该agent名,不可重复
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 #最多接收1000个event
a1.channels.c1.transactionCapacity = 100 #100个事务,一次最多发送100个event,事务失败会回滚。capacity应该<transactionCapacity

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #一个source可以绑定多个channel
a1.sinks.k1.channel = c1 #一个sink只能绑定一个channel

启动两个终端,一个终端启动监听任务:

在flume目录下运行:

flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -D flume.root.logger=INFO,console

参数说明:

--conf/-c:表示配置文件存储在 conf/目录

--name/-n:表示给 agent 起名为 a1

--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。

-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。

 

另一个终端使用netcat向监听的端口发送内容:

nc localhost 4444

检查启动任务的端口是否收到。

2)监控hive日志上传hdfs

 在job目录下新建任务的配置文件flume-file-hdfs.conf,内容如下:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /export/server/hive/logs/metastore.log
#这里我监控的是hive的元数据日志

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://linux01:8020/flume/%Y%m%d/%H
#8020端口不要搞错,具体查看hadoop的core-site.xml

#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的
key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自
动添加 timestamp)。

#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

先在flume文件夹下启动flume的监听任务:

bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf -D flume.root.logger=INFO,console

启动hdfs和hive的元数据服务

start-dfs.sh 

start-hivemetastore.sh(自己写的脚本)

启动hive开始操作

hive

会产生元数据记录在metastore.log中,然后就会被flume监听到,flume就会把监听到的日志写到hdfs的flume文件夹中。

浏览器打开linux01:9870查看hdfs的文件目录,发现新建了flume文件夹,表示操作成功。 

 

注意! 监听的metastore.log一定要是有效的,如果无效那么hive的日志就不会写到里面,flume就检测不到,具体去看hive的日志配置教程。另外启动的agent的任务名字和配置文件不要搞错了,是a2和flume-file-hdfs.conf。

3)实时读取目录文件到hdfs

job目录下编写flume-dir-hdfs.conf配置文件:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
# source类型是目录
a3.sources.r3.type = spooldir
#定义监控目录
a3.sources.r3.spoolDir = /export/server/flume/upload
#定义文件上传完后缀
a3.sources.r3.fileSuffix = .COMPLETED
#是否有文件头
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload/%Y%m%d/%H
#hdfs的upload文件夹要提前手动创建好,flume不会自己创建,否则会报错。

#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

flume目录下启动agent任务:

bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
注意不要有多余的空格或者不可见字符,启动失败就去logs文件夹看日志

任务启动后就往监控目录/flume/upload文件夹里面放文件 ,放了3个不同的文件,其中tmp后缀的文件没有上传到hdfs,因为在conf配置文件中把tmp后缀的排除了,其他两个上传完毕,并且文件后缀改成COMPLETED:

进入linux01:9870查看hdfs文件目录, 确实上传成功了。

注意! 配置文件的a3.sinks.k3.hdfs.path 指定了linux01:8020,那么flume任务就得在linux01上启动,在linux02上启动不会生效。我的linux01是主机,linux02和03是从机,就算在linux02上启动flume任务,把a3.sinks.k3.hdfs.path 改成linux02:8020也不行,必须在linux01上启动。

注意!向/flume/upload文件夹放的文件不能是以上传完成的后缀结尾,比如文件上传成功后缀是COMPLETED,那么向里面放的文件后缀就不能是COMPLETED。另外不能向upload里放文件名相同的文件,文件名相同的文件只有第一个会上传到hdfs,之后的不会,因为linux同一目录不允许同名文件产生。

4)实时监控目录下的多个追加文件

案例2 的 exec source适用于监控一个实时追加的文件,不能断点续传,案例3的spooldir source适用于同步新文件,但不适用于实时监听同步追加日志的文件,而该案例的Taildir Source就适合于监听多个实时追加的文件,并能实现断点续传。

job目录下新建flume-dir-hdfs.conf配置文件:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
#定义source类型
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /export/server/apache-flume-1.9.0-bin/tail_dir.json
#注意!!这里我把软链接flume换成了本来的真实目录apache-flume-1.9.0-bin,原因后面讲

#文件组
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /export/server/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /export/server/flume/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

hdfs文件中提前创建好upload2文件夹:

hdfs dfs -mkdir /flume/upload2

flume文件夹中创建files和files2文件夹,分别在里面写file1.txt和log1.log用于追加内容让flume任务监控。

flume文件下启动监控任务:

bin/flume-ng agent -c conf/ -n a3 -f job/flume-taildir-hdfs.conf

用echo命令向file1.txt和log1.log追加内容,追加的内容就会被flume检测到,filume就会把追加的新内容上传到hdfs的upload2文件夹。

追加的内容被检测到,上传到hdfs,案例成功! 

注意! 

配置文件中,之前是a3.sources.r3.positionFile = /export/server/flume/tail_dir.json,此时启动flume任务能成功,但是追加的内容不会上传到hdfs,也就是该案例没有成功。去logs文件中查看flume.log日志,发现有一段报错如下:

21 四月 2024 22:52:16,844 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:355)  - Source r3 has been removed due to an error during configuration
org.apache.flume.FlumeException: Error creating positionFile parent directories
	at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flume
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
	at java.nio.file.Files.createDirectory(Files.java:674)
	at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
	at java.nio.file.Files.createDirectories(Files.java:727)
	at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)
	... 11 more

给chatgpt看看:

大概意思是positionfile文件创建失败,原因是出现命名冲突。因为我的flume是个软链接,类似于快捷方式,但是写到配置文件里面,flume程序就会把配置文件的flume当成真实目录,进而就会尝试创建名为flume的目录并且去进到创建的flume目录创建r3,然而我已经存在了名为flume的软链接,程序就会创建flume目录失败,进而无法创建r3。所以把配置文件的flume换成真实的apache-flume-1.9.0-bin目录就可以了,这样就可以生成r3,也就是positionfile = tail_dir.json文件。当然另一种解决方法就是把positionfile的位置放到flume软链接外面。

tail_dir.json文件内容如下:

{"inode":83899573,"pos":44,"file":"/export/server/flume/files/file1.txt"}

inode是文件的唯一标识,即使文件重命名也不会变,除非文件删除

pos表示读到哪里

file:监控文件的绝对路径

json文件靠inode和file两个值表示pos位置信息。

注意! log4j日志框架每天凌晨会自动把前一天的hive.log的文件改名,后缀加上日期,这点对我们监控空间极不友好,假如我们监控的是hive.log,然而hive.log会自动更名hive.log.2024-xx-xx,监测的文件名发生改变,而inode不变,然而json文件中记录的绝对路径仍然是hive.log,此时的hive.log是新的文件,inode变化,就无法实现断点续传。

解决方案:1)不使用log4j         2)修改flume源码包

修改源码包的TailFile和ReliableTaildirEventReader:

 

 修改后重新打包生成flume-taildir-source-1.9.0.jar,进入flume/lib目录下,把原来的jar包替换掉:

把原来的后缀改成bak。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_73181349/article/details/138003460

智能推荐

【笔记】strftime的使用方法-程序员宅基地

文章浏览阅读5.1k次。strftimestrftime是C语言标准库中用来格式化输出时间的的函数。下面是strftime的用法各参数意义代码使用示例#include<stdio.h>#include<time.h>#define print(s1, s2,s3) \ printf("%-20s%-30s%s\n",s1, s2,s3);int main(){ time_t rawtime; struct tm* timeinfo; char timE[80]; /

2018.09.12 poj3621Sightseeing Cows(01分数规划+spfa判环)-程序员宅基地

文章浏览阅读147次。传送门 01分数规划板题啊。 发现就是一个最优比率环。 这个直接二分+spfa判负环就行了。 代码:#include&lt;iostream&gt;#include&lt;cstdio&gt;#include&lt;cstring&gt;#include&lt;algorithm&gt;#include&lt;cmath&gt;#define N 1005#define...

hive sql的常用日期处理函数总结_hive sql 日期函数-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏14次。1)date_format函数(根据格式整理日期)  作用:把一个字符串日期格式化为指定的格式。select date_format('2017-01-01','yyyy-MM-dd HH:mm:ss'); --日期字符串必须满足yyyy-MM-dd格式   结果:2017-01-01 00:00:002)date_add、date_sub函数(加减日期)  作用:把一个字符串日期格式加一天、减一天。select date_add('2019-01-01',1); ..._hive sql 日期函数

Android Studio使用百度语音合成是TTS时报错: ****.so文件找不到的有关问题_旧版的百度语言合成报错-程序员宅基地

文章浏览阅读2.1k次。使用百度语音合成过程时,一直error : notfint libgnustl_shared.so在项目工程gradle文件中添加如下代码段:sourceSets { main { jniLibs.srcDirs = ['libs'] } }..._旧版的百度语言合成报错

BZOJ1202: [HNOI2005]狡猾的商人_狡猾的商人[hnoi2005]-程序员宅基地

文章浏览阅读425次。Description 刁姹接到一个任务,为税务部门调查一位商人的账本,看看账本是不是伪造的。账本上记录了n个月以来的收入情况,其中第i个月的收入额为Ai(i=1,2,3…n-1,n), 。当 Ai大于0时表示这个月盈利Ai 元,当 Ai小于0时表示这个月亏损Ai元。所谓一段时间内的总收入,就是这段时间内每个月的收入额的总和。 刁姹的任务是秘密进行的,为了调查商人的账本,她只好跑到商人那_狡猾的商人[hnoi2005]

HTML5 Web SQL 数据库_方式准则的定义-程序员宅基地

文章浏览阅读1k次。1、HTML5 Web SQL 数据库 Web SQL 数据库 API 并不是 HTML5 规范的一部分,但是它是一个独立的规范,引入了一组使用 SQL 操作客户端数据库的 APIs。如果你是一个 Web 后端程序员,应该很容易理解 SQL 的操作。Web SQL 数据库可以在最新版的 Safari, Chrome 和 Opera 浏览器中工作。2、核心方法 以下是规范中定义的三个_方式准则的定义

随便推点

更改vscode Java项目的.class文件输出路径_vscode怎么class文件-程序员宅基地

文章浏览阅读6.7k次,点赞17次,收藏21次。1.在vscode里面按下快捷键ctrl+shift+p2.输入Classpath3.点击Output下的Browse选择.class文件的输出路径4.如图,选择完以后,.class文件的输出层级目录会自动建立_vscode怎么class文件

Python缩进规则-程序员宅基地

文章浏览阅读1.2w次,点赞4次,收藏24次。python的缩进规则:对于类定义、函数定义、流程控制语句、异常处理语句等,行尾的冒号和下一行的缩进,表示下一个代码块的开始,而缩进的结束则表示此代码块的结束。通常情况下都是采用4个空格长度作为一个缩进量(一个Tab键就表示4个空格)。一,Python缩进长度及缩进字符。 看到网上一些Python缩进的错误示范,“tab符和空格不能混用”,“缩进一定是4个空格”下列演示。​def change(a): print(id(a)) # 指向的是同一个对象(tab缩进) a=10_python缩进规则

微信小程序api视频课程-定时器-setTimeout的使用_微信小程序 settimeout 向上层传值-程序员宅基地

文章浏览阅读1.1k次。JS代码 /** * 生命周期函数--监听页面加载 */ onLoad: function (options) { setTimeout( function(){ wx.showToast({ title: '黄菊华老师', }) },2000 ) },说明该代码只执行一次..._微信小程序 settimeout 向上层传值

uploadify2.1.4如何能使按钮显示中文-程序员宅基地

文章浏览阅读48次。uploadify2.1.4如何能使按钮显示中文博客分类:uploadify网上关于这段话的搜索恐怕是太多了。方法多也试过了不知怎么,反正不行。最终自己想办法给解决了。当然首先还是要有fla源码。直接去管网就可以下载。[url]http://www.uploadify.com/wp-content/uploads/uploadify-v2.1.4...

戴尔服务器安装VMware ESXI6.7.0教程(U盘安装)_vmware-vcsa-all-6.7.0-8169922.iso-程序员宅基地

文章浏览阅读9.6k次,点赞5次,收藏36次。戴尔服务器安装VMware ESXI6.7.0教程(U盘安装)一、前期准备1、下载镜像下载esxi6.7镜像:VMware-VMvisor-Installer-6.7.0-8169922.x86_64.iso这里推荐到戴尔官网下载,Baidu搜索“戴尔驱动下载”,选择进入官网,根据提示输入服务器型号搜索适用于该型号服务器的所有驱动下一步选择具体类型的驱动选择一项下载即可待下载完成后打开软碟通(UItraISO),在“文件”选项中打开刚才下载好的镜像文件然后选择启动_vmware-vcsa-all-6.7.0-8169922.iso

百度语音技术永久免费的语音自动转字幕介绍 -程序员宅基地

文章浏览阅读2k次。百度语音技术永久免费的语音自动转字幕介绍基于百度语音技术,识别率97%无时长限制,无文件大小限制永久免费,简单,易用,速度快支持中文,英文,粤语永久免费的语音转字幕网站: http://thinktothings.com视频介绍 https://www.bilibili.com/video/av42750807 ...

推荐文章

热门文章

相关标签