百科狗-知识改变命运!
--

flume 的source 、channel和sink 多种组合

梵高1年前 (2023-12-20)阅读数 7#综合百科
文章标签数据文件

flume 有三大组件source 、channel和sink,各个组件之间都可以相互组合使用,各组件间耦合度低。使用灵活,方便。

1.多sink

channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

2.多 channel 多sink ,每个sink 输出内容一致

(memory channel 用于kafka操作,实时性高,file channel 用于 sink file 数据安全性高)?

(多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#多个channel 的数据相同a1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

3. 多source 单 channel 单 sink

多个source 可以读取多种信息放在一个channel 然后输出到同一个地方?

配置文件如下:

a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1? in memorya1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy

flume 像乐高积木一样可以自己随心所欲将不同的组件进行搭配使用,耦合度低。

Source

rpc远程过程调用协议,客户机与服务机的调用模式需要对数据进行序列化。

1:客户机将参数序列化并以二进制形式通过网络传输到服务器。

2:服务器接收到后进行反序列化再调用方法获取返回值。

3:服务器将返回值序列化后再通过网络传输给客户机。

4:客户机接收到结果后再进行反序列化获取结果。

Avro source:

Avro就是一种序列化形式,avrosource监听一个端口只接收avro序列化后的数据,其他类型的不接收。

type:avrosource的类型,必须是avro。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

Thrif source:

和avro一样是一种数据序列化形式,Thrifsource只采集thrift数据序列化后的数据

Exec source:

采集linux命令的返回结果传输给channel

type:source的类型:必须是exec。

command:要执行命令。

tail? –f? 若文件被删除即使重新创建同名文件也不会监听

tail? -F? 只要文件同名就可以继续监听

以上可以用在日志文件切割时的监听

JMS Source:

Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;

Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;

Kafka Source:从kafka服务中采集数据。

NetCat Source:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入

type:source的类型,必须是netcat。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

HTTP Source:监听HTTP POST和 GET产生的数据的采集

Chanel

是一个数据存储池,中间通道,从source中接收数据再向sink目的地传输,如果sink写入失败会自动重写因此不会造成数据丢失。

Memory:用内存存储,但服务器宕机会丢失数据。

?Typechannel的类型:必须为memory

capacity:channel中的最大event数目

transactionCapacity:channel中允许事务的最大event数目

File:使用文件存储数据不会丢失数据但会耗费io。

?Typechannel的类型:必须为 file

checkpointDir :检查点的数据存储目录

dataDirs :数据的存储目录

transactionCapacity:channel中允许事务的最大event数目

flume 的source 、channel和sink 多种组合

SpillableMemory Channel:内存文件综合使用,先存入内存达到阀值后flush到文件中。

Typechannel的类型:必须为SPILLABLEMEMORY

memoryCapacity:内存的容量event数

overflowCapacity:数据存到文件的event阀值数

checkpointDir:检查点的数据存储目录

dataDirs:数据的存储目录

Jdbc:使用jdbc数据源来存储数据。

Kafka:使用kafka服务来存储数据。

Sink

各种类型的目的地,接收channel写入的数据并以指定的形式表现出来。Sink有很多种类型。

type:sink的类型 必须是hdfs。

hdfs.path:hdfs的上传路径。

hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData

hdfs.rollInterval:间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。

hdfs.rollSize:文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。

hdfs.rollCount:event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。

hdfs.batchSize:每次往hdfs里提交多少个event,默认为100

hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要设置压缩方式。

hdfs.codeC:压缩方式:gzip,bzip2, lzo, lzop, snappy

注:%{host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。

Logger Sink将数据作为日志处理(根据flume中的设置的日志方式来显示)

要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console。

type:sink的类型:必须是logger。

maxBytesToLog:打印body的最长的字节数 默认为16

Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。

?type:sink的类型:必须是 avro。

hostname:指定发送数据的主机名或者ip

port:指定发送数据的端口

实例

1:监听一个文件的增加变化,采集数据并在控制台打印。

在这个例子中我使用exec source,memory chanel,logger sink。可以看我的agent结构图

以下是我创建的exec_source.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令:

bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &

然后更改/usr/local/success.log文件中的内容后可以看到flume采集到了文件的变化并在控制台上打印出来。文件初始内容hello和how are you,剩下的i am fine和ok为新增加内容。

2:监控一个文件变化并将其发送到另一个服务器上然后打印

这个例子可以建立在上一个例子之上,但是需要对flume的结构做一些修改,我使用avro序列化数据再发送到指定的服务器上。详情看结构图。

实际上flume可以进行多个节点关联,本例中我只使用131向139发送数据

131,139上都必须启动agent

服务器131配置

以下是我创建的exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=192.168.79.139

a1.sinks.k1.port=42424

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令启动agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

139服务器配置

执行命令拷贝flume到139

scp -r apache-flume-1.7.0-bin/root@192.168.79.139:/usr/local/

修改exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令启动agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

结果可以在139控制台上看到131中修改success.log的变化信息

3:avro-client实例

执行bin/flume-ng会提示有命令如下

help?display this help text

agent run aFlume agent

avro-client run anavro Flume client

version show Flume version info

avro-clinet是avro客户端,可以把本地文件以avro序列化方式序列化后发送到指定的服务器端口。本例就是将131的一个文件一次性的发送到139中并打印。

Agent结构图如下

131启动的是一个avro-client,它会建立连接,发送数据,断开连接,它只是一个客户端。

启动一个avro客户端

bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log

--headerFile是用来区分是哪个服务器发送的数据,kv.log中的内容会被发送到139,可以作为标识来使用。

139的avro_client.conf如下

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动agent

bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &

139控制台显示如下

可以看到headers的内容headers:{hostname=192.168.79.131}

注意:

1:Flume服务没有stop命令需要通过kill来杀掉进行,可以使用jps? -m来确认是那个agent的number

[root@shb01 conf]# jps -m

3610 Jps -m

3512 Application --conf-fileconf/exec_source.conf --name a1

2:修改flume的配置文件后如avro_client.conf,flume会自动重启

3:logger sink默认只显示16个字节

4:flume是以event为单位进行数据传输的,其中headers是一个map容器map

Event: { headers:{hostname=192.168.79.131}body: 31 61 1a }

5:flume支持多节点关联但是sink和source的类型要一致,比如avro-client发送数据那么接收方的source也必须是avro否则会警告。

1.大数据工程师工作中会做什么?

集群运维:安装、测试、运维各种大数据组件

数据开发:细分一点的话会有ETL工程师、数据仓库工程师等

数据系统开发:偏重Web系统开发,比如报表系统、推荐系统等

这里面有很多内容其实是十分重合的,下面大致聊一下每一块内容大致需要学什么,以及侧重点。

2.集群运维

数据工程师,基本上是离不开集群搭建,比如hadoop、Spark、Kafka,不要指望有专门的运维帮你搞定,新组件的引入一般都要自己来动手的。

因此这就要求数据工程师了解各种大数据的组件。

由于要自己的安装各种开源的组件,就要求数据工程师要具备的能力:Linux。要对Linux比较熟悉,能各种自己折腾着玩。

由于现在的大数据生态系统基本上是JVM系的,因此在语言上,就不要犹豫了,JVM系的Java和Scala基本上跑不掉,Java基本上要学的很深,Scala就看情况了。

3.ETL

ETL在大数据领域主要体现在各种数据流的处理。这一块一方面体现在对一些组件的了解上,比如Sqoop、Flume、Kafka、Spark、MapRece;另一方面就是编程语言的需要,Java、Shell和Sql是基本功。

4.系统开发

我们大部分的价值最后都会由系统来体现,比如报表系统和推荐系统。因此就要求有一定的系统开发能力,最常用的就是JavaWeb这一套了,当然Python也是挺方便的。

需要注意的是,一般数据开发跑不掉的就是各种提数据的需求,很多是临时和定制的需求,这种情况下,Sql就跑不掉了,老老实实学一下Sql很必要。

如何入门?

前面提到了一些数据工程师会用到的技能树,下面给一个入门的建议,完全个人意见。

1.了解行业情况

刚开始一定要了解清楚自己和行业的情况,很多人根本就分不清招聘信息中的大数据和数据挖掘的区别就说自己要转行,其实是很不负责的。不要总是赶热点,反正我就是经常被鄙视做什么大数据开发太Low,做数据就要做数据挖掘,不然永远都是水货。

2.选择学习途径

如果真是清楚自己明确地想转数据开发了,要考虑一下自己的时间和精力,能拿出来多少时间,而且在学习的时候最好有人能多指点下,不然太容易走弯路了。

在选择具体的学习途径时,要慎重一点,有几个选择:

自学

报班

找人指点

别的不说了,报班是可以考虑的,不要全指望报个辅导班就能带你上天,但是可以靠他帮你梳理思路。如果有专业从事这一行的人多帮帮的话,是最好的。不一定是技术好,主要是可沟通性强。

3.学习路线

学习路线,下面是一个大致的建议:

第一阶段

先具备一定的Linux和Java的基础,不一定要特别深,先能玩起来,Linux的话能自己执行各种操作,Java能写点小程序。这些事为搭建Hadoop环境做准备。

学习Hadoop,学会搭建单机版的Hadoop,然后是分布式的Hadoop,写一些MR的程序。

接着学学Hadoop生态系统的其它大数据组件,比如Spark、Hive、Hbase,尝试去搭建然后跑一些官网的Demo。

Linux、Java、各种组件都有一些基础后,要有一些项目方面的实践,这时候找一些成功案例,比如搜搜各种视频教程中如何搞一个推荐系统,把自己学到的用起来。

第二阶段

到这里是一个基本的阶段了,大致对数据开发有一些了解了。接着要有一些有意思内容可以选学。

数据仓库体系:如何搞数据分层,数据仓库体系该如何建设,可以有一些大致的了解。

用户画像和特征工程:这一部分越早了解越好。

一些系统的实现思路:比如调度系统、元数据系统、推荐系统这些系统如何实现。

第三阶段

下面要有一些细分的领域需要深入进行,看工作和兴趣来选择一些来深入进行

分布式理论:比如Gossip、DHT、Paxo这些构成了各种分布式系统的底层协议和算法,还是要学一下的。

数据挖掘算法:算法是要学的,但是不一定纯理论,在分布式环境中实现算法,本身就是一个大的挑战。

各种系统的源码学习:比如Hadoop、Spark、Kafka的源码,想深入搞大数据,源码跑不掉。

鹏仔微信 15129739599 鹏仔QQ344225443 鹏仔前端 pjxi.com 共享博客 sharedbk.com

免责声明:我们致力于保护作者版权,注重分享,当前被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理!邮箱:344225443@qq.com)

图片声明:本站部分配图来自网络。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

内容声明:本文中引用的各种信息及资料(包括但不限于文字、数据、图表及超链接等)均来源于该信息及资料的相关主体(包括但不限于公司、媒体、协会等机构)的官方网站或公开发表的信息。部分内容参考包括:(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供参考使用,不准确地方联系删除处理!本站为非盈利性质站点,本着为中国教育事业出一份力,发布内容不收取任何费用也不接任何广告!)