KafkaOffsetMonitor簡述

KafkaOffsetMonitor簡述

KafkaOffsetMonitor download

KafkaOffsetMonitor(下文簡稱KOM)是有由Kafka開源社區(qū)提供的一款Web管理界面,這個應用程序用來實時監(jiān)控Kafka服務的Consumer以及它們所在的Partition中的Offset,可以瀏覽當前的消費者組,查看每個Topic的所有Partition的當前消費情況,瀏覽查閱Topic的歷史消費信息等


topic的所有partiton消費情況列表


一個topic的歷史消費情況

KafkaOffsetMonitor 數(shù)據(jù)采集展現(xiàn)

數(shù)據(jù)采集源

Kafka源碼中有定義對象ZkUtils(kafka-master\core\src\main\scala\kafka\utils):

ZkUtils

而KOM本質上就是對ZkUtils中的這些屬性的讀取操作

web實現(xiàn)

KOM是使用jetty作為web容器的,通過angular.js來實現(xiàn)類似MVC功能的。

getGroups具體流程分析:

KOM中一些流程主要體現(xiàn)在app.js和controller.js中。

  • 首先需要定義app.js文件,在KOM中的app.js文件為:
var app = angular.module('offsetapp',
                         ["offsetapp.controllers", "offsetapp.directives",  "ngRoute"],
                                                 function($routeProvider) {
                                                         $routeProvider
                                                         .when("/", {
                                                             templateUrl: "views/grouplist.html",
                                                             controller: "GroupListCtrl"
                                                         })
                                                         .when("/group/:group", {
                                                             templateUrl: "views/group.html",
                                                             controller: "GroupCtrl"
                                                         })
                                                         .when("/group/:group/:topic", {
                                                             templateUrl: "views/topic.html",
                                                             controller: "TopicCtrl"
                                                         })
                                                         ......
                                                         ;;
                                                 });

angular.module("offsetapp.services", ["ngResource"])
    .factory("offsetinfo", ["$resource", "$http", function($resource, $http) {
        function groupPartitions(cb) {
            return function(data) {
                var groups = _(data.offsets).groupBy(function(p) {
                    var t = p.timestamp;
                    if(!t) t = 0;
                    return p.group+p.topic+t.toString();
                });
                groups = groups.values().map(function(partitions) {
                    return {
                        group: partitions[0].group,
                        topic: partitions[0].topic,
                        partitions: partitions,
                        logSize: _(partitions).pluck("logSize").reduce(function(sum, num) {
                            return sum + num;
                        }),
                        offset: _(partitions).pluck("offset").reduce(function(sum, num) {
                            return sum + num;
                        }),
                        timestamp: partitions[0].timestamp
                    };
                }).value();
                data.offsets = groups;
                cb(data);
            };
        }

        return {
            getGroup: function(group, cb) {
                return $resource("./group/:group").get({group:group}, groupPartitions(cb));
            },
            ......
        };
    }]);

  • 下面是controller.js文件:
angular.module('offsetapp.controllers',["offsetapp.services"])
    .controller("GroupCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo",
                              function($scope, $interval, $routeParams, offsetinfo) {
                                  offsetinfo.getGroup($routeParams.group, function(d) {
                                      $scope.info = d;
                                      $scope.loading=false;
                                  });
                                  $scope.loading=true;

                                  $scope.group = $routeParams.group;
                              }])
    .controller("GroupListCtrl", ["$scope", "offsetinfo",
                                  function($scope, offsetinfo) {
                                      $scope.loading = true;
                                      offsetinfo.listGroup().success(function(d) {
                                          $scope.loading=false;
                                          $scope.groups = d;
                                      });
                                  }])
......
;


  • index.html部分代碼塊
              <!-- Collect the nav links, forms, and other content for toggling -->
              <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
                  <ul class="nav navbar-nav">
                      <li><a href="#">Consumer Groups</a></li>
                      <li><a href="/#/topics">Topic List</a></li>
                      <li class="dropdown">
                          <a  href="javascript:void(0)"  class="dropdown-toggle" data-toggle="dropdown">Visualizations <b class="caret"></b></a>
                          <ul class="dropdown-menu">
                              <li><a href="/#/activetopicsviz">Active Topic Consumers</a></li>
                              <li><a href="/#/clusterviz">Cluster Overview</a></li>
                          </ul>
                      </li>
                  </ul>
              </div><!-- /.navbar-collapse -->

其中"#"表示訪問項目根目錄:對比app.js文件的

.when("/", {templateUrl: "views/grouplist.html",controller: "GroupListCtrl"})

表示當訪問項目根目錄時使用的模板文件是grouplist.html,

<div class="page-header">
  <h1>Please select the group you would like to monitor</h1>
</div>

<div class="alert alert-info" ng-show="loading">
Loading ...
</div>

<ul class="list-group">
<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>
</ul>

使用的controller是GroupListCtrl,繼續(xù)看controller.js中的GroupListCtrl定義:

.controller("GroupListCtrl", ["$scope", "offsetinfo", function($scope, offsetinfo) {
$scope.loading = true;
offsetinfo.listGroup().success(function(d) {
    $scope.loading=false;
    $scope.groups = d;
});
}])

會調用offsetinfo.listGroup()方法,再到app.js文件中查看listGroup方法定義:

listGroup: function() {return $http.get("./group");}

這個時候會使用http模塊映射到group這個path上,到這里就要看scala的代碼了,進到OffsetGetterWeb.scala中,該類繼承了UnfilteredWebApp類,在UnfilteredWebApp中定義了啟動方法.

繼續(xù)看group這個path的定義:

case GET(Path(Seg("group" :: Nil))) =>
        JsonContent ~> ResponseString(write(getGroups(args)))

調用getGroups方法首先會初始化zkClient和使用zkClient構造OffsetGetter類,接著調用OffsetGetter的getGroups方法:

  def getGroups: Seq[String] = {
    try {
      ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath)
    } catch {
      case NonFatal(t) =>
        error(s"could not get groups because of ${t.getMessage}", t)
        Seq()
    }
  }

也就是說getGroups就是讀取zookeeper中的/consumers目錄的數(shù)據(jù),讀取完成之后通過$scope.groups = d;代碼將結果賦給$scope.groups,這樣grouplist.html中就可以通過遍歷groups來得到每個group了:

<li ng-repeat="g in groups" class="list-group-item"><a href="./#/group/{{g}}">{{g}}</a></li>

<font size=2>注:ng-repeat 指令用于循環(huán)輸出指定次數(shù)的 HTML 元素

得到所有的groups之后,通過./#/group/鏈接可以訪問每個group的具體信息。

數(shù)據(jù)采集周期

kafka監(jiān)控的采集周期,也就是刷新時間refresh,還有保留時間retain,是在啟動時指定的,默認是10s刷新一次,數(shù)據(jù)保留2天

kafka監(jiān)控的采集周期

注:KOM的運行需要通過sbt assembly進行編譯打包

OffsetGetterWeb中定時任務方法:

  def schedule(args: OWArgs) {
    def retryTask[T](fn: => T) {
      try {
        retry(3) {
          fn
        }
      } catch {
        case NonFatal(e) =>
          error("Failed to run scheduled task", e)
      }
    }

    timer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        retryTask(writeToDb(args))
      }
    }, 0, args.refresh.toMillis)
    timer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        retryTask(args.db.emptyOld(System.currentTimeMillis - args.retain.toMillis))
      }
    }, args.retain.toMillis, args.retain.toMillis)
  }

  def writeToDb(args: OWArgs) {
    val groups = getGroups(args)
    groups.foreach {
      g =>
        val inf = getInfo(g, args).offsets.toIndexedSeq
        info(s"inserting ${inf.size}")
        args.db.insertAll(inf)
    }
  }

DB寫操作:每執(zhí)行一次(采集)刷新,就會執(zhí)行一次寫操作(insertAll),一次清除舊數(shù)據(jù)的操作(emptyOld).

DB讀操作:監(jiān)控的讀操作只有在查詢歷史信息(offsetHistory)時才查詢DB,其他的數(shù)據(jù)都是實時的數(shù)據(jù)。

  def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession {
    implicit s =>
      val o = offsets
        .where(off => off.group === group && off.topic === topic)
        .sortBy(_.timestamp)
        .map(_.forHistory)
        .list()
      OffsetHistory(group, topic, o)
  }

數(shù)據(jù)庫存儲

數(shù)據(jù)庫sqlite

KOM的數(shù)據(jù)庫采用sqlite

  val database = Database.forURL(s"jdbc:sqlite:$dbfile.db",
    driver = "org.sqlite.JDBC")

默認數(shù)據(jù)庫文件位置:.../KafkaOffsetMonitor-master/offsetapp.db

數(shù)據(jù)庫字段

表名:OFFSETS
存儲字段:

字段名 字段類型 說明 是否可空
id INTEGER PRIMARY KEY、AUTOINCREMEN NOT NULL
group VARCHAR(254) 分組 NOT NULL
topic VARCHAR(254) 話題 NOT NULL
partition INTEGER 分區(qū)編號 NOT NULL
offset BIGINT 偏移量 NOT NULL
log_size BIGINT 分區(qū)內已接收消息總量 NOT NULL
owner VARCHAR(254) 所屬者 可為null
timestamp BIGINT 時間戳 NOT NULL
creation BIGINT 創(chuàng)建時間 NOT NULL
modified BIGINT 最新更新時間 NOT NULL

數(shù)據(jù)庫索引:

    def idx = index("idx_search", (group, topic))

    def tidx = index("idx_time", (timestamp))

    def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true)

存儲性能及改造分析

因為KOM可以配置sqlite數(shù)據(jù)保留時間,定期清除過期數(shù)據(jù),具體的存儲性能跟存儲時間和存儲量有關,需要根據(jù)需求測試評估。

但基于sqlite本身特性:主打輕便,基于文件
因此對于大規(guī)模存儲(>100W)性能欠佳,會導致頁面加載較慢,且不支持分布式,沒有用戶管理。

Mysql的功能完全能覆蓋Sqlite,若改造則需要將源碼中的OffsetDB.scala文件中對數(shù)據(jù)庫操作的函數(shù)(insert,insertAll,emptyOld,offsetHistory,maybeCreate)進行改寫。

難點分析:要求熟悉scala語言,熟悉scala對mysql的操作。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 姓名:周小蓬 16019110037 轉載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,916評論 13 425
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,715評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,366評論 25 708
  • 發(fā)行說明 - Kafka - 版本1.0.0 以下是Kafka 1.0.0發(fā)行版中解決的JIRA問題的摘要。有關該...
    全能程序猿閱讀 3,018評論 2 7
  • 事件在不同瀏覽器需要兼容性處理,現(xiàn)封裝起來,便于日后使用和學習。
    sdcV閱讀 362評論 0 0

友情鏈接更多精彩內容