KafkaOffsetMonitor簡述
KafkaOffsetMonitor(下文簡稱KOM)是有由Kafka開源社區(qū)提供的一款Web管理界面,這個應用程序用來實時監(jiān)控Kafka服務的Consumer以及它們所在的Partition中的Offset,可以瀏覽當前的消費者組,查看每個Topic的所有Partition的當前消費情況,瀏覽查閱Topic的歷史消費信息等
KafkaOffsetMonitor 數(shù)據(jù)采集展現(xiàn)
數(shù)據(jù)采集源
Kafka源碼中有定義對象ZkUtils(kafka-master\core\src\main\scala\kafka\utils):
而KOM本質上就是對ZkUtils中的這些屬性的讀取操作。
web實現(xiàn)
KOM是使用jetty作為web容器的,通過angular.js來實現(xiàn)類似MVC功能的。
getGroups具體流程分析:
KOM中一些流程主要體現(xiàn)在app.js和controller.js中。
- 首先需要定義app.js文件,在KOM中的app.js文件為:
var app = angular.module('offsetapp',
["offsetapp.controllers", "offsetapp.directives", "ngRoute"],
function($routeProvider) {
$routeProvider
.when("/", {
templateUrl: "views/grouplist.html",
controller: "GroupListCtrl"
})
.when("/group/:group", {
templateUrl: "views/group.html",
controller: "GroupCtrl"
})
.when("/group/:group/:topic", {
templateUrl: "views/topic.html",
controller: "TopicCtrl"
})
......
;;
});
angular.module("offsetapp.services", ["ngResource"])
.factory("offsetinfo", ["$resource", "$http", function($resource, $http) {
function groupPartitions(cb) {
return function(data) {
var groups = _(data.offsets).groupBy(function(p) {
var t = p.timestamp;
if(!t) t = 0;
return p.group+p.topic+t.toString();
});
groups = groups.values().map(function(partitions) {
return {
group: partitions[0].group,
topic: partitions[0].topic,
partitions: partitions,
logSize: _(partitions).pluck("logSize").reduce(function(sum, num) {
return sum + num;
}),
offset: _(partitions).pluck("offset").reduce(function(sum, num) {
return sum + num;
}),
timestamp: partitions[0].timestamp
};
}).value();
data.offsets = groups;
cb(data);
};
}
return {
getGroup: function(group, cb) {
return $resource("./group/:group").get({group:group}, groupPartitions(cb));
},
......
};
}]);
- 下面是controller.js文件:
angular.module('offsetapp.controllers',["offsetapp.services"])
.controller("GroupCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo",
function($scope, $interval, $routeParams, offsetinfo) {
offsetinfo.getGroup($routeParams.group, function(d) {
$scope.info = d;
$scope.loading=false;
});
$scope.loading=true;
$scope.group = $routeParams.group;
}])
.controller("GroupListCtrl", ["$scope", "offsetinfo",
function($scope, offsetinfo) {
$scope.loading = true;
offsetinfo.listGroup().success(function(d) {
$scope.loading=false;
$scope.groups = d;
});
}])
......
;
- index.html部分代碼塊
<!-- Collect the nav links, forms, and other content for toggling -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav">
<li><a href="#">Consumer Groups</a></li>
<li><a href="/#/topics">Topic List</a></li>
<li class="dropdown">
<a href="javascript:void(0)" class="dropdown-toggle" data-toggle="dropdown">Visualizations <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/#/activetopicsviz">Active Topic Consumers</a></li>
<li><a href="/#/clusterviz">Cluster Overview</a></li>
</ul>
</li>
</ul>
</div><!-- /.navbar-collapse -->
其中"#"表示訪問項目根目錄:對比app.js文件的
.when("/", {templateUrl: "views/grouplist.html",controller: "GroupListCtrl"})
表示當訪問項目根目錄時使用的模板文件是grouplist.html,
<div class="page-header">
<h1>Please select the group you would like to monitor</h1>
</div>
<div class="alert alert-info" ng-show="loading">
Loading ...
</div>
<ul class="list-group">
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>
</ul>
使用的controller是GroupListCtrl,繼續(xù)看controller.js中的GroupListCtrl定義:
.controller("GroupListCtrl", ["$scope", "offsetinfo", function($scope, offsetinfo) {
$scope.loading = true;
offsetinfo.listGroup().success(function(d) {
$scope.loading=false;
$scope.groups = d;
});
}])
會調用offsetinfo.listGroup()方法,再到app.js文件中查看listGroup方法定義:
listGroup: function() {return $http.get("./group");}
這個時候會使用http模塊映射到group這個path上,到這里就要看scala的代碼了,進到OffsetGetterWeb.scala中,該類繼承了UnfilteredWebApp類,在UnfilteredWebApp中定義了啟動方法.
繼續(xù)看group這個path的定義:
case GET(Path(Seg("group" :: Nil))) =>
JsonContent ~> ResponseString(write(getGroups(args)))
調用getGroups方法首先會初始化zkClient和使用zkClient構造OffsetGetter類,接著調用OffsetGetter的getGroups方法:
def getGroups: Seq[String] = {
try {
ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath)
} catch {
case NonFatal(t) =>
error(s"could not get groups because of ${t.getMessage}", t)
Seq()
}
}
也就是說getGroups就是讀取zookeeper中的/consumers目錄的數(shù)據(jù),讀取完成之后通過$scope.groups = d;代碼將結果賦給$scope.groups,這樣grouplist.html中就可以通過遍歷groups來得到每個group了:
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>
<font size=2>注:ng-repeat 指令用于循環(huán)輸出指定次數(shù)的 HTML 元素
得到所有的groups之后,通過./#/group/鏈接可以訪問每個group的具體信息。
數(shù)據(jù)采集周期
kafka監(jiān)控的采集周期,也就是刷新時間refresh,還有保留時間retain,是在啟動時指定的,默認是10s刷新一次,數(shù)據(jù)保留2天
注:KOM的運行需要通過sbt assembly進行編譯打包
OffsetGetterWeb中定時任務方法:
def schedule(args: OWArgs) {
def retryTask[T](fn: => T) {
try {
retry(3) {
fn
}
} catch {
case NonFatal(e) =>
error("Failed to run scheduled task", e)
}
}
timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
retryTask(writeToDb(args))
}
}, 0, args.refresh.toMillis)
timer.scheduleAtFixedRate(new TimerTask() {
override def run() {
retryTask(args.db.emptyOld(System.currentTimeMillis - args.retain.toMillis))
}
}, args.retain.toMillis, args.retain.toMillis)
}
def writeToDb(args: OWArgs) {
val groups = getGroups(args)
groups.foreach {
g =>
val inf = getInfo(g, args).offsets.toIndexedSeq
info(s"inserting ${inf.size}")
args.db.insertAll(inf)
}
}
DB寫操作:每執(zhí)行一次(采集)刷新,就會執(zhí)行一次寫操作(insertAll),一次清除舊數(shù)據(jù)的操作(emptyOld).
DB讀操作:監(jiān)控的讀操作只有在查詢歷史信息(offsetHistory)時才查詢DB,其他的數(shù)據(jù)都是實時的數(shù)據(jù)。
def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession {
implicit s =>
val o = offsets
.where(off => off.group === group && off.topic === topic)
.sortBy(_.timestamp)
.map(_.forHistory)
.list()
OffsetHistory(group, topic, o)
}
數(shù)據(jù)庫存儲
數(shù)據(jù)庫sqlite
KOM的數(shù)據(jù)庫采用sqlite
val database = Database.forURL(s"jdbc:sqlite:$dbfile.db",
driver = "org.sqlite.JDBC")
默認數(shù)據(jù)庫文件位置:.../KafkaOffsetMonitor-master/offsetapp.db
數(shù)據(jù)庫字段
表名:OFFSETS
存儲字段:
| 字段名 | 字段類型 | 說明 | 是否可空 |
|---|---|---|---|
| id | INTEGER | PRIMARY KEY、AUTOINCREMEN | NOT NULL |
| group | VARCHAR(254) | 分組 | NOT NULL |
| topic | VARCHAR(254) | 話題 | NOT NULL |
| partition | INTEGER | 分區(qū)編號 | NOT NULL |
| offset | BIGINT | 偏移量 | NOT NULL |
| log_size | BIGINT | 分區(qū)內已接收消息總量 | NOT NULL |
| owner | VARCHAR(254) | 所屬者 | 可為null |
| timestamp | BIGINT | 時間戳 | NOT NULL |
| creation | BIGINT | 創(chuàng)建時間 | NOT NULL |
| modified | BIGINT | 最新更新時間 | NOT NULL |
數(shù)據(jù)庫索引:
def idx = index("idx_search", (group, topic))
def tidx = index("idx_time", (timestamp))
def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true)
存儲性能及改造分析
因為KOM可以配置sqlite數(shù)據(jù)保留時間,定期清除過期數(shù)據(jù),具體的存儲性能跟存儲時間和存儲量有關,需要根據(jù)需求測試評估。
但基于sqlite本身特性:主打輕便,基于文件
因此對于大規(guī)模存儲(>100W)性能欠佳,會導致頁面加載較慢,且不支持分布式,沒有用戶管理。
Mysql的功能完全能覆蓋Sqlite,若改造則需要將源碼中的OffsetDB.scala文件中對數(shù)據(jù)庫操作的函數(shù)(insert,insertAll,emptyOld,offsetHistory,maybeCreate)進行改寫。
難點分析:要求熟悉scala語言,熟悉scala對mysql的操作。