使用redis,kafka做為緩存來收集日志

filebeat使用redis作為緩存

1.前提條件

  • filebeat不支持傳輸給redis哨兵或集群
  • logstash也不支持從redis哨兵或集群里讀取數(shù)據(jù)

2.安裝配置redis

yum install redis -y
sed -i 's#^bind 127.0.0.1#bind 127.0.0.1 10.0.0.51#' /etc/redis.conf
systemctl start redis

3.安裝配置nginx
配置官方源

yum install nginx -y

放在nginx.conf最后一行的}后面,不要放在conf.d里面

stream {
  upstream redis {
      server 10.0.0.51:6379 max_fails=2 fail_timeout=10s;
      server 10.0.0.52:6379 max_fails=2 fail_timeout=10s backup;
  }
  
  server {
          listen 6380;
          proxy_connect_timeout 1s;
          proxy_timeout 3s;
          proxy_pass redis;
  }
}
nginx -t
systemctl start nginx 

4.安裝配置keepalived

yum install keepalived -y
db01的配置
cat>/etc/keepalived/keepalived.conf<<EOF
global_defs {
    router_id db01
}
vrrp_instance VI_1 {
    state MASTER
        interface eth0
        virtual_router_id 50
        priority 150
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            10.0.0.100
        }
}
EOF

db02的配置:

cat>/etc/keepalived/keepalived.conf<<EOF
global_defs {
    router_id db02
}
vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 50
    priority 100
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    virtual_ipaddress {
        10.0.0.100
    }
}
EOF
systemctl start keepalived 
ip a

5.測(cè)試訪問能否代理到redis

redis-cli -h 10.0.0.100 -p 6380
把db01的redis停掉,測(cè)試還能不能連接redis

6.配置filebeat

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true 
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true 
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.redis:
  hosts: ["10.0.0.100:6380"]
  keys:
    - key: "nginx_access"
      when.contains:
        tags: "access"
    - key: "nginx_error"
      when.contains:
        tags: "error"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true
EOF

7.測(cè)試訪問filebeat能否傳輸?shù)絩edis
curl 10.0.0.51/haha
redis-cli -h 10.0.0.51 #應(yīng)該有數(shù)據(jù)
redis-cli -h 10.0.0.52 #應(yīng)該沒數(shù)據(jù)
redis-cli -h 10.0.0.100 -p 6380 #應(yīng)該有數(shù)據(jù)

8.配置logstash

cat >/etc/logstash/conf.d/redis.conf<<EOF 
input {
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_access"
    data_type => "list"
  }
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_error"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
   stdout {}
   if "access" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_access-%{+yyyy.MM}"
      }
    }
    if "error" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_error-%{+yyyy.MM}"
      }
    }
}
EOF

9.啟動(dòng)測(cè)試

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf

10.關(guān)閉前臺(tái)運(yùn)行,使用后臺(tái)運(yùn)行

ctr+c #結(jié)束前臺(tái)運(yùn)行,使用后臺(tái)運(yùn)行
systemctl start logstash

11.最終測(cè)試
ab -n 10000 -c 100 10.0.0.100/
檢查es-head上索引條目是否為10000條
關(guān)閉db01的redis,在訪問,測(cè)試logstash正不正常
恢復(fù)db01的redis,再測(cè)試

filbeat引入redis優(yōu)化方案

1.新增加一個(gè)日志路徑需要修改4個(gè)地方:

  • filebat 2個(gè)位置
  • logstash 2個(gè)位置

2.優(yōu)化之后需要修改的地方2個(gè)地方

  • filebat 1個(gè)位置
  • logstash 1個(gè)位置

3.filebeat配置文件

vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true 
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true 
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]


output.redis:
  hosts: ["10.0.0.100:6380"]
  key: "nginx_log"

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true

4.優(yōu)化后的logstash

vim /etc/logstash/conf.d/redis.conf
input {
  redis {
    host => "10.0.0.100"
    port => "6380"
    db => "0"
    key => "nginx_log"
    data_type => "list"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
   stdout {}
   if "access" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_access-%{+yyyy.MM}"
      }
    }
    if "error" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_error-%{+yyyy.MM}"
      }
    }
}

使用kafka作為緩存

1.配置hosts

10.0.0.51 kafka51
10.0.0.52 kafka52
10.0.0.53 kafka53

2.安裝配置zookeeper

cd /data/soft/
tar zxf zookeeper-3.4.11.tar.gz -C /opt/
ln -s /opt/zookeeper-3.4.11/ /opt/zookeeper                   
mkdir -p /data/zookeeper
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
cat >/opt/zookeeper/conf/zoo.cfg<<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.51:2888:3888
server.2=10.0.0.52:2888:3888
server.3=10.0.0.53:2888:3888 
EOF
#每臺(tái)機(jī)器不一樣
echo "1" > /data/zookeeper/myid
cat /data/zookeeper/myid

3.啟動(dòng)zookeeper
所有節(jié)點(diǎn)都啟動(dòng)

/opt/zookeeper/bin/zkServer.sh start

4.每個(gè)節(jié)點(diǎn)都檢查

/opt/zookeeper/bin/zkServer.sh status

5.測(cè)試zookeeper

在一個(gè)節(jié)點(diǎn)上執(zhí)行,創(chuàng)建一個(gè)頻道

/opt/zookeeper/bin/zkCli.sh -server 10.0.0.51:2181
create /test "hello"

在其他節(jié)點(diǎn)上看能否接收到

/opt/zookeeper/bin/zkCli.sh -server 10.0.0.52:2181
get /test

6.安裝部署kafka
kafka51操作:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=1
listeners=PLAINTEXT://10.0.0.51:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF 

kafka52操作:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=2
listeners=PLAINTEXT://10.0.0.52:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF

kafka53操作:

cd /data/soft/
tar zxf kafka_2.11-1.0.0.tgz -C /opt/
ln -s /opt/kafka_2.11-1.0.0/ /opt/kafka
mkdir /opt/kafka/logs
cat >/opt/kafka/config/server.properties<<EOF
broker.id=3
listeners=PLAINTEXT://10.0.0.53:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
EOF

7.前臺(tái)啟動(dòng)測(cè)試

#每個(gè)節(jié)點(diǎn)測(cè)試
[root@kafka51 ~]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

8.驗(yàn)證進(jìn)程

#每個(gè)節(jié)點(diǎn)查看
[root@kafka51 ~]# jps
17000 Kafka
15273 QuorumPeerMain
17084 Jps

9.測(cè)試創(chuàng)建topic

/opt/kafka/bin/kafka-topics.sh --create  --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic kafkatest

10.測(cè)試獲取toppid

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest

11.測(cè)試刪除topic

/opt/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic kafkatest

12.kafka測(cè)試命令發(fā)送消息

#創(chuàng)建命令
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --partitions 3 --replication-factor 3 --topic  messagetest
#測(cè)試發(fā)送消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list  10.0.0.51:9092,10.0.0.52:9092,10.0.0.53:9092 --topic  messagetest
#其他節(jié)點(diǎn)測(cè)試接收
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181 --topic messagetest --from-beginning
#測(cè)試獲取所有的頻道
/opt/kafka/bin/kafka-topics.sh  --list --zookeeper 10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181

13.測(cè)試成功之后,可以放在后臺(tái)啟動(dòng)

ctr+c  #結(jié)束前臺(tái)運(yùn)行
/opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties

14.修改filebeat配置文件

cat >/etc/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  enabled: true 
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true 
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.kafka:
  hosts: ["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
  topic: 'filebeat'

setup.template.name: "nginx"
setup.template.pattern: "nginx_*"
setup.template.enabled: false
setup.template.overwrite: true

EOF

15.修改logstash配置文件

cat >/etc/logstash/conf.d/kafka.conf <<EOF
input {
  kafka{
    bootstrap_servers=>["10.0.0.51:9092", "10.0.0.52:9092", "10.0.0.53:9092"]
    topics=>["filebeat"]
    group_id=>"logstash"
    codec => "json"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
   stdout {}
   if "access" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_access-%{+yyyy.MM}"
      }
    }
    if "error" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.51:9200"
        manage_template => false
        index => "nginx_error-%{+yyyy.MM}"
      }
    }
}
EOF

16.啟動(dòng)logstash并測(cè)試
1.前臺(tái)啟動(dòng)

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf

2.后臺(tái)啟動(dòng)

#ctr +c 結(jié)束之前的前臺(tái)啟動(dòng)
systemctl start logstash

17.集群考可用測(cè)試
結(jié)果:任意一臺(tái)或兩臺(tái)服務(wù)器壞了zookpeer或者kafka或者全壞了或者隨機(jī)壞了,只要剩一個(gè)zookper和kafka都不影正常收集日志。
18.總結(jié)kafka實(shí)驗(yàn)
1.前提條件

  • kafka和zook都是基于java的,所以需要java環(huán)境
  • 這倆比較吃資源,內(nèi)存得夠

2.安裝zook注意

  • 每臺(tái)機(jī)器的myid要不一樣,而且要和配置文件里的id對(duì)應(yīng)上
  • 啟動(dòng)測(cè)試,角色為leader和follower
  • 測(cè)試發(fā)送和接受消息

3.安裝kafka注意

  • kafka依賴于zook,所以如果zook不正常,kafka不能工作
  • kafka配置文件里要配上zook的所有IP的列表
  • kafka配置文件里要注意,寫自己的IP地址
  • kafka配置文件里要注意,自己的ID是zook里配置的myid
  • kafka啟動(dòng)要看日志出現(xiàn)started才算是成功

4.測(cè)試zook和kafka

  • 一端發(fā)送消息
  • 兩端能實(shí)時(shí)接收消息

5.配置filebeat

  • output要配上kafka的所有的IP列表

6.配置logstash

  • input要寫上所有的kafka的IP列表,別忘了[]
  • 前臺(tái)啟動(dòng)測(cè)試成功后再后臺(tái)啟動(dòng)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡(jiǎn)信或評(píng)論聯(lián)系作者。

友情鏈接更多精彩內(nèi)容