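Preface
Today we continue with ELK. Earlier posts covered basic ELK operations and log searching. This post looks at how to configure email notifications for sensitive log events. No developer can watch the ELK log dashboard around the clock (log visualization will be covered in a later post), so error logs in ELK need to be noticed promptly to avoid unnecessary impact. The examples below send an email notification for 503 errors every 10 minutes.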
1. Using the Kibana management tool: Elasticsearch Watcher
1.1. Edit /etc/elasticsearch/elasticsearch.yml and append the settings for the email sender account.
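xpack.notification.email.account:
  outlook_account:
    profile: outlook
    smtp:
      auth: true
      starttls.enable: true
      host: smtp.office365.com
      port: 587
      user: xxx@outlook.com
      # On Elasticsearch 6.x and later, store the password in the
      # Elasticsearch keystore (smtp.secure_password) instead of this file.
      password: xxx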
1.2. Create a custom watch in Kibana (or add the watch directly with a curl command).
Kibana => Management => Elasticsearch => Watcher => Create new watch => Advanced Watch:
{
  "trigger" : {
    "schedule" : { "cron" : "0 0/10 * * * ?" }
  },
  "input" : {
    "search" : {
      "request" : {
        "indices" : [
          "test-qa-access*"
        ],
        "body" : {
          "query" : {
            "bool" : {
              "must" : {
                "match": {
                  "response": 503
                }
              },
              "filter" : {
                "range": {
                  "@timestamp": {
                    "gte": "{{ctx.trigger.scheduled_time}}||-10m",
                    "lte": "{{ctx.trigger.triggered_time}}"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}
  },
  "actions" : {
    "email_admin" : {
      "email" : {
        "from": "xxx@outlook.com",
        "to" : "xxx@outlook.com",
        "subject" : "TEST-QA-ACCESS-LOG - Encountered 503 errors - {{ctx.payload.hits.total}} times",
        "body": "Body test"
      }
    }
  }
}
Test the watch. Set the action mode to "execute"; if the condition is met, the email is sent to your real mailbox.
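The watch can also be registered through the REST API rather than through the Kibana form, which is what the curl alternative mentioned in step 1.2 amounts to. Below is a minimal sketch using only the Python standard library. The watch id `log_503_watch`, the base URL, and the helper names are assumptions for illustration. The `_xpack/watcher` prefix matches the 5.x/6.x documentation linked in the references; on 7.x and later the prefix is `_watcher`.

```python
import base64
import json
import urllib.request


def build_put_watch_request(base_url, watch_id, watch_body,
                            user=None, password=None,
                            api_prefix="_xpack/watcher"):
    """Build (but do not send) the HTTP PUT request that registers a watch.

    watch_id and api_prefix are illustrative defaults; pass
    api_prefix="_watcher" for Elasticsearch 7.x and later.
    """
    url = "%s/%s/watch/%s" % (base_url.rstrip("/"), api_prefix, watch_id)
    data = json.dumps(watch_body).encode("utf-8")
    request = urllib.request.Request(url, data=data, method="PUT")
    request.add_header("Content-Type", "application/json")
    if user is not None:
        # HTTP basic auth, only needed when security is enabled on the cluster.
        token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8"))
        request.add_header("Authorization", "Basic " + token.decode("ascii"))
    return request


def put_watch(request, timeout=10):
    """Send the prepared request and return the decoded JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

To use it, load the watch JSON shown above into a dict, call `build_put_watch_request("http://localhost:9200", "log_503_watch", watch)`, and pass the result to `put_watch`. Keeping the request construction separate from the network call makes the URL and headers easy to inspect before anything is sent.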

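2. Querying Elasticsearch from a cron job
2.1. Create a script, alert.py, that checks whether any 503 errors occurred in the last 10 minutes. If so, it sends an alert email whose body includes some of the 503 error details.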
import json
import smtplib
from datetime import date
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from elasticsearch import Elasticsearch

# With no arguments, older elasticsearch-py clients connect to localhost:9200.
es = Elasticsearch()

# Indices are created per day, so search today's index only.
datestr = date.today().strftime("%Y.%m.%d")
searchidx = "test-qa-access-logs-cw-" + datestr
print(searchidx)

# Match 503 responses logged in the last 10 minutes.
# doc_type only applies to Elasticsearch versions that still have mapping types;
# drop the argument on 7.x and later.
res = es.search(index=searchidx, doc_type="doc, teste-type", body={"query": {"bool": {"must": [
    {"match": {"response": 503}},
    {"range": {"@timestamp": {"gte": "now-10m", "lt": "now"}}}]}}})
# On Elasticsearch 7.x and later, hits.total is an object; read ['total']['value'] there.
hitstotal = res['hits']['total']
print("%d documents found" % hitstotal)

if hitstotal > 0:
    fromaddr = "xxx@outlook.com"
    toaddr = "xxx@outlook.com"
    msg = MIMEMultipart()
    msg['From'] = fromaddr
    msg['To'] = toaddr
    msg['Subject'] = "503 ALERT Test"
    # Include the matching documents returned by the search in the email body.
    body = json.dumps(res['hits']['hits'])
    msg.attach(MIMEText(body, 'plain'))
    server = smtplib.SMTP('smtp.office365.com', 587)
    server.starttls()
    server.login(fromaddr, "xxxxxx")
    text = msg.as_string()
    server.sendmail(fromaddr, toaddr, text)
    server.quit()
else:
    print("no hit")
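The script reads `res['hits']['total']` as a plain integer, which holds for the older Elasticsearch versions this post targets. From Elasticsearch 7.0 on, the search response reports the total as an object with `value` and `relation` fields by default. The helper below is a small sketch that accepts both shapes; the name `total_hits` is an assumption for illustration and is not part of elasticsearch-py.

```python
def total_hits(search_response):
    """Return the hit count from a search response as an int.

    Handles the pre-7.0 shape (hits.total is an integer) and the
    7.0+ shape (hits.total is {"value": N, "relation": "eq" | "gte"}).
    """
    total = search_response["hits"]["total"]
    if isinstance(total, dict):
        return int(total["value"])
    return int(total)
```

In alert.py this would replace the direct lookup, for example `hitstotal = total_hits(res)`. Note that with the newer shape a `relation` of `gte` means the value is a lower bound, which is still sufficient for a "greater than zero" check.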
2.2. Set up the cron job
*/10 * * * * python /app/errorlogs/alert.py
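3. Using AWS CloudWatch
3.1. Enable remote access to Elasticsearch
Open the configuration with vim /etc/elasticsearch/elasticsearch.yml and change the network.host field to network.host: 0.0.0.0. Then restart the elasticsearch service for the change to take effect.
3.2. Install the elasticsearch-py package on the server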
pip install elasticsearch
Create a script that collects the number of occurrences of each HTTP error code and publishes the data to AWS CloudWatch. Save it as logs-httpcode-metrics.py:
import datetime
from datetime import date

import boto3
from elasticsearch import Elasticsearch


def getHitTotal(responseCode, searchIndicesPrefix):
    """Count documents with the given response code logged in the last 10 minutes."""
    datestr = date.today().strftime("%Y.%m.%d")
    searchidx = searchIndicesPrefix + "-" + datestr
    #searchidx = searchIndicesPrefix + "-" + "2021.03.25"
    searchtype = "doc, teste-type"
    es = Elasticsearch([{'host': '192.168.0.100', 'port': 9200}])
    # ignore=404 returns the error body instead of raising when the index is missing.
    countresult = es.count(index=searchidx, doc_type=searchtype,
                           body={"query": {"bool": {"must": [
                               {"match": {"response": responseCode}},
                               {"range": {"@timestamp": {"gte": "now-10m", "lt": "now"}}}]}}},
                           ignore=404)
    print(searchIndicesPrefix + ":")
    if 'count' in countresult:
        hitstotal = countresult['count']
        print("    %d - %d documents found" % (responseCode, hitstotal))
    else:
        hitstotal = 0
        print(countresult)
    return hitstotal


def put_metric(responseCode, searchIndicesPrefix):
    """Publish the count for one response code as a custom CloudWatch metric."""
    cloudwatch = boto3.client('cloudwatch',
                              # Hard coded strings as credentials, not recommended.
                              aws_access_key_id='xxx', aws_secret_access_key='xxx',
                              region_name='ap-northeast-1')
    metricName = 'Logs_HTTPCode_' + str(responseCode) + '_Count'
    hittotal = getHitTotal(responseCode, searchIndicesPrefix)
    if hittotal > 0:
        cloudwatch.put_metric_data(
            MetricData=[{
                'MetricName': metricName,
                'Dimensions': [{
                    'Name': 'Elasticsearch Log Indices',
                    'Value': searchIndicesPrefix
                }],
                # Pass a timezone-aware UTC datetime (Python 3) so the data point
                # is not shifted by the server's local time zone.
                'Timestamp': datetime.datetime.now(datetime.timezone.utc),
                'Unit': 'Count',
                'Value': hittotal
            }],
            Namespace='ELK/HTTPErrorCode'
        )


def runTask():
    """Publish metrics for every monitored response code and index prefix."""
    listCodes = [499, 502, 503, 401, 403, 429]
    listPrefixs = ['test-qa-access-logs-cw', 'test1-qa-access-logs-cw']
    for currPrefix in listPrefixs:
        for code in listCodes:
            put_metric(code, currPrefix)


runTask()
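The script above creates a new client and issues one `put_metric_data` call per response code, and it skips codes with zero hits. One possible variation, sketched below, builds all data points for an index prefix first and sends them in a single call, zero counts included, so that the metric has a data point in every period. The function names and the `counts` dictionary are assumptions for illustration; the metric name, dimension, and namespace follow the script above.

```python
import datetime


def build_metric_data(counts, index_prefix, timestamp=None):
    """Turn {response_code: count} into a MetricData list for put_metric_data."""
    if timestamp is None:
        timestamp = datetime.datetime.now(datetime.timezone.utc)
    return [
        {
            "MetricName": "Logs_HTTPCode_%d_Count" % code,
            "Dimensions": [
                {"Name": "Elasticsearch Log Indices", "Value": index_prefix}
            ],
            "Timestamp": timestamp,
            "Unit": "Count",
            "Value": count,
        }
        for code, count in sorted(counts.items())
    ]


def publish_counts(cloudwatch_client, counts, index_prefix):
    """Send all counts for one index prefix in a single API call.

    cloudwatch_client is expected to be a boto3 CloudWatch client,
    for example boto3.client("cloudwatch", region_name="ap-northeast-1").
    """
    cloudwatch_client.put_metric_data(
        Namespace="ELK/HTTPErrorCode",
        MetricData=build_metric_data(counts, index_prefix),
    )
```

A caller would collect the counts with `getHitTotal` for each code, for example `counts = {code: getHitTotal(code, prefix) for code in listCodes}`, and then call `publish_counts` once per prefix. Whether to publish zeros is a design choice: it costs a few extra data points but lets an alarm distinguish "no errors" from "the script did not run".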
3.3. Set up the cron job
*/10 * * * * python /home/ubuntu/workarea/tools/logs-httpcode-metrics.py
3.4. Create an AWS CloudWatch alarm based on the custom metrics from step 1
References
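The alarm can be created in the AWS console, or scripted. The sketch below shows one way to do it with the boto3 `put_metric_alarm` call, reusing the namespace, metric name, and dimension from the metrics script. The alarm name pattern, the threshold, the 600-second period (chosen to match the 10-minute cron interval), and the optional SNS topic ARN are all assumptions for illustration, not values from the original setup.

```python
def build_alarm_params(response_code, index_prefix, threshold=0, sns_topic_arn=None):
    """Build keyword arguments for CloudWatch put_metric_alarm."""
    params = {
        # Hypothetical naming scheme; any unique alarm name works.
        "AlarmName": "ELK-%s-HTTP-%d" % (index_prefix, response_code),
        "Namespace": "ELK/HTTPErrorCode",
        "MetricName": "Logs_HTTPCode_%d_Count" % response_code,
        "Dimensions": [
            {"Name": "Elasticsearch Log Indices", "Value": index_prefix}
        ],
        "Statistic": "Sum",
        "Period": 600,  # seconds, aligned with the 10-minute cron job
        "EvaluationPeriods": 1,
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        # The metrics script publishes nothing when there are no hits,
        # so treat missing data as "no errors".
        "TreatMissingData": "notBreaching",
    }
    if sns_topic_arn:
        # Notify this SNS topic when the alarm fires.
        params["AlarmActions"] = [sns_topic_arn]
    return params


def create_alarm(cloudwatch_client, response_code, index_prefix, **options):
    """Create or update the alarm using a boto3 CloudWatch client."""
    cloudwatch_client.put_metric_alarm(
        **build_alarm_params(response_code, index_prefix, **options)
    )
```

For example, `create_alarm(boto3.client("cloudwatch", region_name="ap-northeast-1"), 503, "test-qa-access-logs-cw")` would raise an alarm whenever at least one 503 is recorded in a 10-minute window for that index prefix.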
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-email.html
https://tryolabs.com/blog/2015/02/17/python-elasticsearch-first-steps/
https://elasticsearch-py.readthedocs.io/en/master/
https://www.elastic.co/guide/en/x-pack/5.6/how-watcher-works.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-range-query.html