Fluentd日志采集使用教程

fluentd是何方神圣

fluentd是一個(gè)實(shí)時(shí)的數(shù)據(jù)收集系統(tǒng),不僅可以收集日志,還可以收集定期執(zhí)行的命令輸出和HTTP請(qǐng)求內(nèi)容。數(shù)據(jù)被收集后按照用戶配置的解析規(guī)則,形成一系列event。每一個(gè)event包含如下內(nèi)容:

tag = xxx
time = xxx
record = {
    "key1": "value1",
    "key2": "value2"
}

其中:

  • tag:為數(shù)據(jù)流的標(biāo)記。fluentd中可以具有多個(gè)數(shù)據(jù)源,解析器,過濾器和數(shù)據(jù)輸出。他們之前使用tag來對(duì)應(yīng)。類似于數(shù)據(jù)流按照tag分組。數(shù)據(jù)流向下游的時(shí)候只會(huì)進(jìn)入tag相匹配的處理器。
  • time:event產(chǎn)生的時(shí)間,該字段通常由日志內(nèi)的時(shí)間字段解析出來。
  • record:日志的內(nèi)容,為JSON格式。

fluentd支持多種數(shù)據(jù)的解析過濾和輸出操作。其中常用的有:

  • tail輸入:增量讀取日志文件作為數(shù)據(jù)源,支持日志滾動(dòng)。
  • exec輸入:定時(shí)執(zhí)行命令,獲取輸出解析后作為數(shù)據(jù)源。
  • syslog輸出:解析標(biāo)準(zhǔn)的syslog日志作為輸入。
  • forward輸入:接收其他fluentd轉(zhuǎn)發(fā)來的數(shù)據(jù)作為數(shù)據(jù)源。
  • dummy:虛擬數(shù)據(jù)源,可以定時(shí)產(chǎn)生假數(shù)據(jù),用于測(cè)試。
  • regexp解析器:使用正則表達(dá)式命名分組的方式提取出日志內(nèi)容為JSON字段。
  • record_transformer過濾器:人為修改record內(nèi)的字段。
  • file輸出:用于將event落地為日志文件。
  • stdout:將event輸出到stdout。如果fluentd以daemon方式運(yùn)行,輸出到fluentd的運(yùn)行日志中。
  • forward:轉(zhuǎn)發(fā)event到其他fluentd節(jié)點(diǎn)。
  • copy:多路輸出,復(fù)制event到多個(gè)輸出端。
  • kafka:輸出event到Kafka。
  • webhdfs:輸出event到HDFS。
  • elasticsearch:輸出event到HDFS。

接下來以官網(wǎng)介紹為基礎(chǔ),穿插自己的理解,介紹下fluentd的使用方法。

安裝啟動(dòng)方法

官網(wǎng)安裝步驟鏈接:https://docs.fluentd.org/installation/install-by-rpm

下面是精簡(jiǎn)的在CentOS下的安裝步驟。打開shell,執(zhí)行如下命令:

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh

systemctl start td-agent

可以安裝并啟動(dòng)fluentd。

配置文件位置

編輯fluentd配置文件的方法:

vim /etc/td-agent/td-agent.conf

修改運(yùn)行用戶和組

默認(rèn)來說fluentd使用td-agent用戶啟動(dòng)。如果需要修改fluentd的用戶,需要執(zhí)行:

vim /usr/lib/systemd/system/td-agent.service

文件內(nèi)容如下所示:

[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target

[Service]
User=td-agent
Group=td-agent
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.log
Environment=TD_AGENT_OPTIONS=
EnvironmentFile=-/etc/sysconfig/td-agent
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120

[Install]
WantedBy=multi-user.target

修改Service部分UserGroup配置項(xiàng)可以更改fluentd進(jìn)程的用戶和組。

檢測(cè)配置文件是否正確的方法

在shell中運(yùn)行:

/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf

觀察輸出,如果有錯(cuò)誤會(huì)給出對(duì)應(yīng)提示。

數(shù)據(jù)流邏輯

fluentd以tag值為基準(zhǔn),決定數(shù)據(jù)的流經(jīng)哪些處理器。

數(shù)據(jù)的流向?yàn)椋簊ource -> parser -> filter -> output

input配置

tail

增量讀取日志文件。需要提供一個(gè)用于標(biāo)記已經(jīng)讀取到位置的文件(position file)所在的路徑。

tail針對(duì)日志滾動(dòng)的支持:
tail方式采用跟蹤文件inode的方式進(jìn)行。比如日志名為app.log,如果日志發(fā)生滾動(dòng),被重命名為app.log.1。文件重命名的時(shí)候inode是不會(huì)改變的。因此發(fā)生滾動(dòng)時(shí)寫入到舊文件末尾的日志也可以被收集到。tail會(huì)跟蹤舊文件的inode一段時(shí)間(rotate_wait配置),這段時(shí)間過去之后,tail不再監(jiān)聽app.log.1,開始監(jiān)聽新的app.log文件。

tail方式的示例配置:

<source>
  @type tail
  path /var/log/httpd-access.log
  pos_file /var/log/td-agent/httpd-access.log.pos
  tag apache.access
  <parse>
    @type apache2
  </parse>
</source>

注意:如果文件發(fā)生修改會(huì)輸出全量文件內(nèi)容。

配置項(xiàng)解釋

tag:數(shù)據(jù)源的tag值。*號(hào)可以擴(kuò)展為path(/替換為.)。例如

path /path/to/file
tag foo.*

tag會(huì)被擴(kuò)展為foo.path.to.file

path:配置讀取的路徑。可以使用*或者是strftime。例如:

path /path/to/%Y/%m/%d/*

如果今天是2020年1月2日,fluentd會(huì)讀取/path/to/2020/01/02目錄下的內(nèi)容。
也可以配置多個(gè)路徑,使用逗號(hào)分隔:

path /path/to/a/*,/path/to/b/c.log

exclude_path:排除部分目錄或文件,使用數(shù)組格式配置。

path /path/to/*
exclude_path ["/path/to/*.gz", "/path/to/*.zip"]

refresh_interval:多長(zhǎng)時(shí)間刷新一次文件監(jiān)聽列表,配合*使用才有意義。

pos_file:位置文件地址。這個(gè)文件保存了監(jiān)聽的日志文件已經(jīng)讀取到第幾行。該項(xiàng)一定要配置。
注意,不要再多個(gè)source之間共用pos file,否則會(huì)出現(xiàn)問題。
pos_file_compaction_interval:pos file文件壓縮時(shí)間間隔。用于壓縮pos file中不再監(jiān)聽的記錄,不可解析的記錄以及重復(fù)的記錄。

parse標(biāo)簽:用于指定log的解析器(必須的配置項(xiàng))。
例如:

# json
<parse>
  @type json
</parse>

# regexp
<parse>
  @type regexp
  expression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$
</parse>

path_key:如果配置此項(xiàng),監(jiān)控文件的path會(huì)在event中,此項(xiàng)的key為path_key
例如:

path /path/to/access.log
path_key tailed_path

生成的數(shù)據(jù)如下所示:

{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}

rotate_wait:日志發(fā)生滾動(dòng)的時(shí)候,可能會(huì)有部分日志仍然輸出在舊的日志文件,此時(shí)需要保持監(jiān)聽舊日志文件一段時(shí)間,這個(gè)時(shí)間配置就是rotate_wait。

exec

周期性執(zhí)行命令,抽取命令輸出為event。

示例配置:

<source>
  @type exec
  command cmd arg arg
  <parse>
    keys k1,k2,k3
  </parse>
  <extract>
    tag_key k1
    time_key k2
    time_format %Y-%m-%d %H:%M:%S
  </extract>
  run_interval 10s
</source>

以上命令的含義為每10秒鐘執(zhí)行cmd arg arg命令,提取命令執(zhí)行結(jié)果,以空白字符分隔三個(gè)字段的值為k1,k2,k3。其中k1的值作為tag,k2作為時(shí)間字段,使用%Y-%m-%d %H:%M:%S格式。

一個(gè)例子,周期獲取系統(tǒng)的平均負(fù)載。配置方法如下:

<source>
  @type exec
  tag system.loadavg
  command cat /proc/loadavg | cut -d ' ' -f 1,2,3
  run_interval 1m
  <parse>
    @type tsv
    keys avg1,avg5,avg15
    delimiter " "
  </parse>
</source>

輸出的日志格式為:

2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}

syslog

連接rsyslog??梢宰鳛閞syslog的接收端。

一個(gè)配置的例子:

<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag system
</source>

fluentd打開5140端口監(jiān)聽rsyslog發(fā)來的log。

rsyslog配置文件/etc/rsyslog.conf設(shè)置為:

# Send log messages to Fluentd
*.* @127.0.0.1:5140

fluentd解析到的event格式如下:

tag = "#{@tag}.#{facility}.#{priority}"
time = 1353436518,
record = {
  "host": "host",
  "ident": "ident",
  "pid": "12345",
  "message": "text"
}

dummy

專用于測(cè)試的數(shù)據(jù)源。周期產(chǎn)生假數(shù)據(jù)。

配置舉例:

<source>
  @type dummy
  dummy {"hello":"world"}
</source>

dummy常用參數(shù):

  • tag: 標(biāo)記值
  • size:每次發(fā)送的event數(shù)量
  • rate:每秒產(chǎn)生多少個(gè)event
  • auto_increment_key:自增鍵名。如果配置了此項(xiàng),會(huì)有一個(gè)key為該配置項(xiàng)值的自增鍵
  • suspend:重啟后自增值是否重新開始
  • dummy:測(cè)試數(shù)據(jù)內(nèi)容

forward

用于接收其他fluentd forward過來的event。

示例配置:

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

output配置

file

輸出event為文件。默認(rèn)每天輸出一個(gè)日志文件。

示例配置:

<match pattern>
  @type file
  path /var/log/fluent/myapp
  compress gzip
  <buffer>
    timekey 1d
    timekey_use_utc true
    timekey_wait 10m
  </buffer>
</match>

包含的參數(shù)類型:

  • path:path支持placeholder,可以在日志路徑中嵌入時(shí)間,tag和record中的字段值。例如:
path /path/to/${tag}/${key1}/file.%Y%m%d
<buffer tag,time,key1>
  # buffer parameters
</buffer>

注意:buffer標(biāo)簽后面的內(nèi)容為buffer chunk key。Buffer根據(jù)這些key分段。

  • append:flush的chuck是否追加到已存在的文件后。默認(rèn)為false,便于文件的并行處理。
  • format標(biāo)簽,用來規(guī)定文件內(nèi)容的格式,默認(rèn)值為out_file。
  • inject標(biāo)簽,用來為event增加time和tag等字段。
  • add_path_suffix:是否增加path后綴
  • path_suffix:path后綴內(nèi)容,默認(rèn)為.log。
  • compress:采用什么壓縮格式,默認(rèn)不壓縮。
  • recompress:是否在buffer chunk已經(jīng)壓縮的情況再次壓縮,默認(rèn)為false。

forward

將event轉(zhuǎn)發(fā)到其他的fluentd節(jié)點(diǎn)。如果配置了多個(gè)fluentd節(jié)點(diǎn),會(huì)使用負(fù)載均衡和支持容錯(cuò)的方式發(fā)送。如果需要發(fā)送多份數(shù)據(jù),需要使用copy。

配置示例:

<match pattern>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s

  <server>
    name myserver1
    host 192.168.1.3
    port 24224
    weight 60
  </server>
  <server>
    name myserver2
    host 192.168.1.4
    port 24224
    weight 60
  </server>
  ...

  <secondary>
    @type file
    path /var/log/fluent/forward-failed
  </secondary>
</match>

server標(biāo)簽內(nèi)可以配置如下字段:

  • host
  • name
  • port
  • shared_key
  • username
  • password
  • standby 標(biāo)記server為備用,只有其他node不可用的時(shí)候才會(huì)啟用standby的node
  • weight 負(fù)載均衡的權(quán)重配置

copy

多路輸出(復(fù)制event到多個(gè)輸出端)

示例配置

<match pattern>
  @type copy
  <store>
    @type file
    path /var/log/fluent/myapp1
    ...
  </store>
  <store>
    ...
  </store>
  <store>
    ...
  </store>
</match>

其中每一個(gè)store是一路輸出。

重要參數(shù):

  • copy_mode:復(fù)制模式??蛇x值有
    • no_copy:每路輸出共享event。
    • shallow:淺拷貝,如果不修改嵌套字段可以使用。
    • deep:深拷貝,使用msgpack-ruby方式。
    • marshal:深拷貝,使用marshal方式。
  • store標(biāo)簽的ignore_error參數(shù):如果被標(biāo)記ignore_error的store出現(xiàn)錯(cuò)誤,不會(huì)影響其他的store。官網(wǎng)的例子為:
<match app.**>
  @type copy
  <store>
    @type plugin1
  </store>
  <store>
    @type plugin2
  </store>
</match>

假如plugin1出現(xiàn)錯(cuò)誤,plugin2也不會(huì)執(zhí)行。如果在plugin1的store添加上ignore_error參數(shù),如下所示:

<match app.**>
  @type copy
  <store ignore_error>
    @type plugin1
  </store>
  <store>
    @type plugin2
  </store>
</match>

上述情況plugin2的運(yùn)行不受影響。通常為不重要的store添加ignore_error參數(shù)。

http

通過http請(qǐng)求的方式發(fā)送event。
payload的格式由format標(biāo)簽決定。

示例配置:

<match pattern>
  @type http

  endpoint http://logserver.com:9000/api
  open_timeout 2

  <format>
    @type json
  </format>
  <buffer>
    flush_interval 10s
  </buffer>
</match>

該例子使用http方式將event發(fā)送到http://logserver.com:9000/api,使用post方式,連接超時(shí)時(shí)間為2秒。輸出格式為json,每10秒鐘輸出一次。

注意:

如果使用JSON的方式發(fā)送,HTTP請(qǐng)求的content-type為application/x-ndjson (newline-delimited JSONs)。如果用spring mvc接收會(huì)提示不支持。可以使用HTTPServletRequest接收request body。

stdout

標(biāo)準(zhǔn)輸出的模式,如果使用后臺(tái)模式運(yùn)行fluentd,輸出到fluentd的日志。多用于debug的時(shí)候。

配置方法:

<match pattern>
  @type stdout
</match>

elasticsearch

輸出event到elasticsearch。

示例配置:

<match my.logs>
  @type elasticsearch
  host localhost
  port 9200
  logstash_format true
</match>

可選參數(shù):

  • host:?jiǎn)蝹€(gè)elasticsearch節(jié)點(diǎn)地址
  • port:?jiǎn)蝹€(gè)elasticsearch節(jié)點(diǎn)的端口號(hào)
  • hosts:elasticsearch集群地址。格式為ip1:port1,ip2:port2...
  • user和password:elasticsearch的認(rèn)證信息
  • scheme:使用https還是http。默認(rèn)為http模式
  • path:REST接口路徑,默認(rèn)為空
  • index_name:index名稱
  • logstash_format:index是否使用logstash命名方式(logstash-%Y.%m.%d),默認(rèn)不啟用
  • logstash_prefix:logstash_format啟用的時(shí)候,index命名前綴是什么。默認(rèn)為logstash

kafka

把event輸出到kafka。

示例配置如下:

<match pattern>
  @type kafka2

  # list of seed brokers
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>
  use_event_time true

  # buffer settings
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/td
    flush_interval 3s
  </buffer>

  # data type settings
  <format>
    @type json
  </format>

  # topic settings
  topic_key topic
  default_topic messages

  # producer settings
  required_acks -1
  compression_codec gzip
</match>

重要的參數(shù)為:

  • brokers:Kafka brokers的地址和端口號(hào)
  • topic_key:record中哪個(gè)key對(duì)應(yīng)的值用作Kafka消息的key
  • default_topic:如果沒有配置topic_key,默認(rèn)使用的topic名字
  • format標(biāo)簽:確定發(fā)送的數(shù)據(jù)格式
  • use_event_time:是否使用fluentd event的時(shí)間作為Kafka消息的時(shí)間。默認(rèn)為false。意思為使用當(dāng)前時(shí)間作為發(fā)送消息的時(shí)間
  • required_acks:producer acks的值
  • compression_codec:壓縮編碼方式

webhdfs

event通過REST方式寫入到HDFS。

HADOOP啟用webhdfs的方法

core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://10.180.210.172:9000</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
      <name>dfs.http.address</name>
      <value>0.0.0.0:50070</value>
    </property>
    
    <property>
      <name>dfs.webhdfs.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.support.append</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.support.broken.append</name>
      <value>true</value>
    </property>
</configuration>

最后執(zhí)行$HADOOP_HOME/sbin/httpfs.sh start命令啟動(dòng)webhdfs支持。

注意:此時(shí)webhdfs的端口號(hào)為50070。

示例配置和參數(shù)

示例配置:

<match access.**>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/path/on/hdfs/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
  <buffer>
    flush_interval 10s
  </buffer>
</match>

注意:需要保證HDFS的目標(biāo)目錄具有寫入權(quán)限。debug過程發(fā)現(xiàn)fluentd請(qǐng)求webhdfs沒有使用user proxy,HDFS認(rèn)為操作的用戶為dr.who,無法創(chuàng)建文件。為了解決這個(gè)問題,設(shè)置HDFS目標(biāo)目錄的權(quán)限為777。

重要參數(shù):

  • host:namenode的地址
  • port:namenode的端口號(hào)
  • path:寫入文件路徑??梢允褂谜嘉环蛘遰uby表達(dá)式??梢允褂萌缦路绞奖硎緯r(shí)間:
\%Y: year including the century (at least 4 digits)
\%m: month of the year (01..12)
\%d: Day of the month (01..31)
\%H: Hour of the day, 24-hour clock (00..23)
\%M: Minute of the hour (00..59)
\%S: Second of the minute (00..60)

輸出參數(shù):

  • timekey:多久輸出一次文件到HDFS。如果path中沒有配置占位符,默認(rèn)為86400(1天)。如果指定了和時(shí)間相關(guān)的占位符,則文件輸出周期自動(dòng)和最小的時(shí)間占位符單位一致
  • timekey_wait:允許等待來遲日志的最長(zhǎng)時(shí)間
  • flush_interval:flush間隔時(shí)間,默認(rèn)為不設(shè)置
  • flush_at_shutdown:關(guān)閉的時(shí)候是否flush。如果使用內(nèi)存類型的buffer,需要配置為true

parser配置

regexp

使用正則表達(dá)式命名分組的方式從日志(一行或多行)中提取信息??梢酝ㄟ^time_key指定event的time字段的名字。名字為time字段名的分組內(nèi)容會(huì)被抽取為event時(shí)間。

一個(gè)在線測(cè)試正則表達(dá)式的工具:http://fluentular.herokuapp.com/

基本配置格式:

<parse>
  @type regexp
  expression /.../
</parse>

正則表達(dá)式可以添加額外的參數(shù):
忽略大小寫:/.../i
多行匹配:/.../m。注意,此時(shí).匹配新行
同時(shí)使用忽略大小寫和多行匹配:/.../im

一個(gè)例子,示例配置如下:

<parse>
  @type regexp
  expression /^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$/
  time_key logtime
  time_format %Y-%m-%d %H:%M:%S %z
  types id:integer
</parse>

如下的數(shù)據(jù):

[2013-02-28 12:00:00 +0900] alice engineer 1

會(huì)被解析為:

time:
1362020400 (2013-02-28 12:00:00 +0900)

record:
{
  "name" : "alice",
  "title": "engineer",
  "id"   : 1
}

filter配置

record_transformer

record_transformer用來修改event的結(jié)構(gòu),增加或修改字段。

一個(gè)record_transformer的例子:

<filter foo.bar>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    tag ${tag}
  </record>
</filter>

這個(gè)filter匹配tag為foo.bar的source。event增加了兩個(gè)新的字段:hostname和tag。

其中hostname這里使用了ruby表達(dá)式。tag使用了字符串插值。

如果數(shù)據(jù)為:

{"message":"hello world!"}

會(huì)被轉(zhuǎn)換為:

{"message":"hello world!", "hostname":"db001.internal.example.com", "tag":"foo.bar"}

可以通過添加enable_ruby配置,在${}中使用ruby表達(dá)式。

例如:

<filter foo.bar>
  @type record_transformer
  enable_ruby
  <record>
    avg ${record["total"] / record["count"]}
  </record>
</filter>

如下輸入:

{"total":100, "count":10}

會(huì)被轉(zhuǎn)換為:

{"total":100, "count":10, "avg":"10"}

注意,可以啟用auto_typecast true配置實(shí)現(xiàn)自動(dòng)類型轉(zhuǎn)換。

修改字段的例子:

<filter foo.bar>
  @type record_transformer
  <record>
    message yay, ${record["message"]}
  </record>
</filter>

如下輸入:

{"message":"hello world!"}

會(huì)被修改為:

{"message":"yay, hello world!"}

可以在表達(dá)式中配置tag_parts變量,引用tag的第n部分。如下所示:

<filter web.*>
  @type record_transformer
  <record>
    service_name ${tag_parts[1]}
  </record>
</filter>

如果遇到tag為web.auth的數(shù)據(jù):

{"user_id":1, "status":"ok"}

會(huì)被轉(zhuǎn)換為:

{"user_id":1, "status":"ok", "service_name":"auth"}

record標(biāo)簽

record標(biāo)簽的語(yǔ)法為:

<record>
  NEW_FIELD NEW_VALUE
</record>

表達(dá)式中可以配置如下變量:

  • record:獲取record中某些字段的內(nèi)容。例如record["count"]
  • tag:獲取tag的內(nèi)容
  • time:獲取日志的時(shí)間戳
  • hostname:獲取主機(jī)名字,和#{Socket.gethostname}作用一樣
  • tag_parts[N]:tag以.分隔,獲取tag的第N部分
  • tag_prefix[N]:獲取tag的0-N部分
  • tag_suffix[N]:獲取tag的N-結(jié)尾部分

例如tag為debug.my.app,tag_parts[1]返回my。tag_prefixtag_suffix的結(jié)果如下:

tag_prefix[0] = debug          tag_suffix[0] = debug.my.app
tag_prefix[1] = debug.my       tag_suffix[1] = my.app
tag_prefix[2] = debug.my.app   tag_suffix[2] = app

配置文件使用通配符和擴(kuò)展

<match><filter>標(biāo)簽可以使用通配符和擴(kuò)展。

tag以.為分隔符,分隔為多個(gè)部分。

fluentd支持的通配符和擴(kuò)展有:
*:只匹配一個(gè)部分。比如a.*匹配a.b,但是不匹配aa.b.c。
**:匹配0個(gè)或多個(gè)部分。比如a.**匹配a,a.ba.b.c。
{X,Y,Z}:匹配X或Y或Z。
#{expression}:使用嵌入的ruby表達(dá)式。有一些快捷變量可以直接使用,例如#{hostname}#{worker_id}。
${..}:使用變量值,tag,record
可以使用如下的方式指定默認(rèn)值。例如:#{ENV["FOOBAR"] || use_default}。如果FOOBAR環(huán)境變量不存在,則使用use_default這個(gè)值。

注意:match標(biāo)簽的匹配過程是有順序的。比如說下面的例子:

<match **>
  @type blackhole_plugin
</match>

<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

因?yàn)樯厦娴膍atch總是能被匹配到,下面的match永遠(yuǎn)沒有機(jī)會(huì)執(zhí)行。

Buffer

buffer為fluentd很關(guān)鍵的配置,意為緩沖區(qū)。可以決定收集的數(shù)據(jù)存入什么介質(zhì),多長(zhǎng)時(shí)間輸出一次等。

buffer標(biāo)簽必須配置在match標(biāo)簽內(nèi)(即在輸出端配置)。

buffer具有一個(gè)@type屬性,用來配置buffer的儲(chǔ)存介質(zhì):

<buffer>
  @type file
</buffer>

@type有兩個(gè)值:

  • file:存入文件
  • memory:存入內(nèi)存,這個(gè)是默認(rèn)值

buffer標(biāo)簽后面可以跟隨chunk keys,用來決定buffer以record的什么字段來分段存放。例如:

<buffer ARGUMENT_CHUNK_KEYS>
  # ...
</buffer>

注意:

  1. 可以指定多個(gè)buffer chunk keys,使用逗號(hào)分隔。
  2. 如果沒有配置chunk key,所有的event都會(huì)寫入同一個(gè)chunk file,直到buffer滾動(dòng)。

buffer如果使用time作為chunk key,可以按照時(shí)間對(duì)buffer進(jìn)行分段。其中:

  • timekey:時(shí)間的跨度
  • timekey_wait:flush延遲時(shí)間,用于等待遲到的數(shù)據(jù)

官網(wǎng)的例子如下:

<match tag.**>
  # ...
  <buffer time>
    timekey      1h # chunks per hours ("3600" also available)
    timekey_wait 5m # 5mins delay for flush ("300" also available)
  </buffer>
</match>

# Time chunk key: events will be separated for hours (by timekey 3600)

11:59:30 web.access {"key1":"yay","key2":100}  ------> CHUNK_A

12:00:01 web.access {"key1":"foo","key2":200}  --|
                                                 |---> CHUNK_B
12:00:25 ssh.login  {"key1":"yay","key2":100}  --|

部分經(jīng)常用到的配置參數(shù):

  • timekey_use_utc:使用國(guó)際標(biāo)準(zhǔn)時(shí)間還是當(dāng)?shù)貢r(shí)間,默認(rèn)是使用當(dāng)?shù)貢r(shí)間。
  • timekey_zone:指定時(shí)區(qū)。
  • chunk_limit_size:chunk大小限制,默認(rèn)8MB。
  • chunk_limit_records:chunk event條數(shù)限制。
  • total_limit_size:總buffer大小限制。
  • chunk_full_threshold:chunk大小超過chunk_limit_size * chunk_full_threshold時(shí)會(huì)自動(dòng)flush。
  • queued_chunks_limit_size:限制隊(duì)列中的chunk數(shù)目,防止頻繁flush產(chǎn)生過多的chunk。
  • compress:壓縮格式,可使用text或gzip。默認(rèn)為text。
  • flush_at_shutdown:關(guān)閉時(shí)候是否flush。對(duì)于非持久化buffer默認(rèn)值為true,持久化buffer默認(rèn)值為false。
  • flush_interval:多長(zhǎng)時(shí)間flush一次。
  • retry_timeout:重試flush的超時(shí)時(shí)間。在這個(gè)時(shí)間后不再會(huì)retry。
  • retry_forever:是否永遠(yuǎn)嘗試flush。如果設(shè)置為true會(huì)忽略retry_timeout的配置。
  • retry_max_times:重試最大次數(shù)。
  • retry_type:有兩個(gè)配置值:retry時(shí)間間隔,指數(shù)級(jí)增長(zhǎng)或者是固定周期重試。
  • retry_wait:每次重試等待時(shí)間。
  • retry_exponential_backoff_base:retry時(shí)間指數(shù)擴(kuò)大倍數(shù)。
  • retry_max_interval:最長(zhǎng)retry時(shí)間間隔。
  • retry_randomize:是否隨機(jī)retry時(shí)間間隔。

配置文件重用

可以通過@include 配置文件路徑方式,引用其他配置文件片段到fluentd主配置文件中。

配置文件路徑可以使用絕對(duì)路徑或相對(duì)路徑。相對(duì)路徑的基準(zhǔn)路徑為fluentd主配置文件所在的路徑。

@include可以出現(xiàn)在主配置文件的任何位置。

Docker日志輸出到fluentd

通過配置fluentd logging driver的方式實(shí)現(xiàn)。
該driver發(fā)送的log信息包含:

字段 描述
container_id 64字符的container id
container_name container名字
source stdout或stderr
log container的log

全局配置方式

修改/etc/docker/daemon.json,增加如下內(nèi)容:

{
  "log-driver": "fluentd",
  "log-opts": {
    "fluentd-address": "fluentdhost:24224"
  }
}

然后重啟docker daemon使配置生效。

也可以通過添加--log-driver--log-opt參數(shù)的方式指定某個(gè)container使用fluentd logging driver。如下所示:

docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224

可以通過在--log-opt后指定tag的方式,確定source的tag。

Docker官網(wǎng)參考鏈接:https://docs.docker.com/config/containers/logging/fluentd/

配置實(shí)例

實(shí)例1

采集/root/my.txt文件(內(nèi)容格式為key value),并發(fā)送到http://localhost:9090/

fluentd的配置文件如下:

<source>
  @type tail
  path /root/my.txt
  pos_file /root/my.txt.pos
  tag my
  <parse>
    @type regexp
    expression /(?<key>\w+)\s(?<value>\w+)/
  </parse>
</source>

<match my>
  @type http

  endpoint http://localhost:9090/
  open_timeout 2
  http_method post

  <format>
    @type json
  </format>
  <buffer>
    flush_interval 3s
  </buffer>
</match>

實(shí)例2

提取用戶操作記錄,打印到fluentd日志。

<source>
  @type tail
  # 這里使用HISTFILE環(huán)境變量,如果沒有設(shè)置,使用默認(rèn)值/root/.bash_history
  path "#{ENV["HISTFILE"] || /root/.bash_history}"
  pos_file /root/.bash_history.pos
  tag history
  <parse>
    @type none
  </parse>
</source>

<filter history>
  @type record_transformer
  <record>
    hostname ${hostname}
  </record>
</filter>

<match history>
  @type stdout
</match>

實(shí)例3

收集用戶操作記錄轉(zhuǎn)發(fā)到另一個(gè)fluentd節(jié)點(diǎn),同時(shí)將數(shù)據(jù)發(fā)送到Kafka和存入HDFS。

數(shù)據(jù)流為:fluentd采集端 -> fluentd收集端 -> kafka和HDFS

示例用戶操作記錄數(shù)據(jù)為:

root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130  tail -f /var/log/command.his.log

采集節(jié)點(diǎn)的配置:

<source>
  @type tail
  path /var/log/command.his.log
  pos_file /var/log/command.his.log.pos
  tag history
  <parse>
    @type regexp
    # 使用正則解析日志文件
    expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/
    time_key time
  </parse>
</source>
<filter history>
  @type record_transformer
  <record>
    # event內(nèi)容增加hostname這一行
    hostname ${hostname}
  </record>
</filter>

<match history>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  <buffer>
    # 1秒鐘向另一個(gè)fluentd節(jié)點(diǎn)轉(zhuǎn)發(fā)一次
    flush_interval 1s
  </buffer>
  <server>
    name myserver1
    host 10.180.210.172
    port 24225
    weight 60
  </server>
</match>

fluentd收集節(jié)點(diǎn)的配置:

<source>
  @type forward
  port 24225
  bind 0.0.0.0
  tag remote
</source>

<match remote>
  # 使用copy方式,分兩路輸出
  @type copy
  <store>
    @type kafka2

    brokers 10.180.210.172:9092
    use_event_time true

    <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
    </buffer>

    <format>
        @type json
    </format>

    default_topic history

    required_acks -1
  </store>
  <store>
    @type webhdfs
    host 10.180.210.172
    port 50070
    path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
    <buffer>
        flush_interval 60s
    </buffer>
  </store>
</match>

本文為原創(chuàng)內(nèi)容,歡迎大家討論、批評(píng)指正與轉(zhuǎn)載。轉(zhuǎn)載時(shí)請(qǐng)注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容