ElasticSearch 簡(jiǎn)要技術(shù)總結(jié)
1. 總覽
ES是高度可伸縮的開(kāi)源全文搜索和分析引擎。它可以實(shí)時(shí)地存儲(chǔ)、搜索和分析大容量的數(shù)據(jù)。通常用作底層引擎/技術(shù)力量有復(fù)雜的搜索功能和需求的應(yīng)用程序。
這是一些典型的應(yīng)用場(chǎng)景:
- 在線網(wǎng)上商店允許客戶搜索銷(xiāo)售的產(chǎn)品。在這種情況下,可以使用ElasticSearch存儲(chǔ)整個(gè)產(chǎn)品目錄和庫(kù)存,并為它們提供搜索和自動(dòng)填充建議。
- 希望收集日志或交易數(shù)據(jù),并且希望分析和挖掘此數(shù)據(jù)以查找趨勢(shì),統(tǒng)計(jì)信息,摘要或異常。在這種情況下,可以使用Logstash(Elasticsearch / Logstash / Kibana堆棧的一部分)來(lái)收集,聚合和解析數(shù)據(jù),然后讓Logstash將此數(shù)據(jù)提供給Elasticsearch。一旦數(shù)據(jù)在ElasticSearch中,就可以運(yùn)行搜索和聚合來(lái)挖掘您感興趣的任何信息。
- 運(yùn)行價(jià)格警報(bào)平臺(tái),允許精通價(jià)格的客戶指定一條規(guī)則,例如“我有興趣購(gòu)買(mǎi)特定的電子產(chǎn)品,如果小工具的價(jià)格在下個(gè)月內(nèi)從任何供應(yīng)商降至X美元以下,我希望收到通知” 。在這種情況下,可以刮取供應(yīng)商價(jià)格,將其推入ElasticSearch并使用其反向搜索(Percolator)功能來(lái)匹配價(jià)格變動(dòng)與客戶查詢,并最終在發(fā)現(xiàn)匹配后將警報(bào)推送給客戶。
- 有分析/業(yè)務(wù)智能需求,并希望快速調(diào)查,分析,可視化并詢問(wèn)有關(guān)大量數(shù)據(jù)的特定問(wèn)題(數(shù)百萬(wàn)或數(shù)十億條記錄)。在這種情況下,可以使用ElasticSearch存儲(chǔ)數(shù)據(jù),然后使用Kibana(Elasticsearch / Logstash / Kibana堆棧的一部分)構(gòu)建自定義儀表板,以便可視化重要的數(shù)據(jù)。此外,可以使用ElasticSearch聚合功能對(duì)數(shù)據(jù)執(zhí)行復(fù)雜的商業(yè)智能查詢。
2. 基本概念
2.1 Node 與 Cluster
Elastic 本質(zhì)上是一個(gè)分布式數(shù)據(jù)庫(kù),允許多臺(tái)服務(wù)器協(xié)同工作,每臺(tái)服務(wù)器可以運(yùn)行多個(gè) Elastic 實(shí)例。
單個(gè) Elastic 實(shí)例稱(chēng)為一個(gè)節(jié)點(diǎn)(node)。一組節(jié)點(diǎn)構(gòu)成一個(gè)集群(cluster)。
2.2 Index
Elastic 會(huì)索引所有字段,經(jīng)過(guò)處理后寫(xiě)入一個(gè)反向索引(Inverted Index)。查找數(shù)據(jù)的時(shí)候,直接查找該索引。
所以,Elastic 數(shù)據(jù)管理的頂層單位就叫做 Index(索引)。它是單個(gè)數(shù)據(jù)庫(kù)的同義詞。每個(gè) Index (即數(shù)據(jù)庫(kù))的名字必須是小寫(xiě)。
下面的命令可以查看當(dāng)前節(jié)點(diǎn)的所有 Index。
$ curl -X GET 'http://localhost:9200/_cat/indices?v'
事實(shí)上,我們的數(shù)據(jù)被存儲(chǔ)和索引在分片(shards)中,索引只是一個(gè)把一個(gè)或多個(gè)分片分組在一起的邏輯空間。然而,這只是一些內(nèi)部細(xì)節(jié)——我們的程序完全不用關(guān)心分片。對(duì)于我們的程序而言,文檔存儲(chǔ)在索引(index)中。剩下的細(xì)節(jié)由Elasticsearch關(guān)心既可。
2.3 Document
Index 里面單條的記錄稱(chēng)為 Document(文檔)。許多條 Document 構(gòu)成了一個(gè) Index。
Document 使用 JSON 格式表示,下面是一個(gè)例子。
{
"user": "張三",
"title": "工程師",
"desc": "數(shù)據(jù)庫(kù)管理"
}
同一個(gè) Index 里面的 Document,不要求有相同的結(jié)構(gòu)(scheme),但是最好保持相同,這樣有利于提高搜索效率。
通常,我們可以認(rèn)為對(duì)象(object)和文檔(document)是等價(jià)相通的。不過(guò),他們還是有所差別:對(duì)象(Object)是一個(gè)JSON結(jié)構(gòu)體——類(lèi)似于哈希、hashmap、字典或者關(guān)聯(lián)數(shù)組;對(duì)象(Object)中還可能包含其他對(duì)象(Object)。 在Elasticsearch中,文檔(document)這個(gè)術(shù)語(yǔ)有著特殊含義。它特指最頂層結(jié)構(gòu)或者根對(duì)象(root object)序列化成的JSON數(shù)據(jù)(以唯一ID標(biāo)識(shí)并存儲(chǔ)于Elasticsearch中)。
2.4 Type(將在ES6.0移除)
Document 可以分組,比如weather這個(gè) Index 里面,可以按城市分組(北京和上海),也可以按氣候分組(晴天和雨天)。這種分組就叫做 Type,它是虛擬的邏輯分組,用來(lái)過(guò)濾 Document。
不同的 Type 應(yīng)該有相似的結(jié)構(gòu)(schema),舉例來(lái)說(shuō),id字段不能在這個(gè)組是字符串,在另一個(gè)組是數(shù)值。這是與關(guān)系型數(shù)據(jù)庫(kù)的表的一個(gè)區(qū)別。性質(zhì)完全不同的數(shù)據(jù)(比如products和logs)應(yīng)該存成兩個(gè) Index,而不是一個(gè) Index 里面的兩個(gè) Type(雖然可以做到)。
下面的命令可以列出每個(gè) Index 所包含的 Type。
$ curl 'localhost:9200/_mapping?pretty=true'
在應(yīng)用中,我們使用對(duì)象表示一些“事物”,例如一個(gè)用戶、一篇博客、一個(gè)評(píng)論,或者一封郵件。每個(gè)對(duì)象都屬于一個(gè)類(lèi)(class),這個(gè)類(lèi)定義了屬性或與對(duì)象關(guān)聯(lián)的數(shù)據(jù)。
user類(lèi)的對(duì)象可能包含姓名、性別、年齡和Email地址。在關(guān)系型數(shù)據(jù)庫(kù)中,我們經(jīng)常將相同類(lèi)的對(duì)象存儲(chǔ)在一個(gè)表里,因?yàn)樗鼈冇兄嗤慕Y(jié)構(gòu)。同理,在Elasticsearch中,我們使用相同類(lèi)型(type)的文檔表示相同的“事物”,因?yàn)樗麄兊臄?shù)據(jù)結(jié)構(gòu)也是相同的。
每個(gè)類(lèi)型(type)都有自己的映射(mapping)或者結(jié)構(gòu)定義,就像傳統(tǒng)數(shù)據(jù)庫(kù)表中的列一樣。所有類(lèi)型下的文檔被存儲(chǔ)在同一個(gè)索引下,但是類(lèi)型的映射(mapping)會(huì)告訴Elasticsearch不同的文檔如何被索引。
3. 數(shù)據(jù)操作
3.1 新增記錄
向指定的 /Index/Type 發(fā)送 PUT 請(qǐng)求,就可以在 Index 里面新增一條記錄。比如,向/accounts/person發(fā)送請(qǐng)求,就可以新增一條人員記錄。
$ curl -X PUT 'localhost:9200/accounts/person/1' -d '
{
"user": "張三",
"title": "工程師",
"desc": "數(shù)據(jù)庫(kù)管理"
}'
服務(wù)器返回的 JSON 對(duì)象,會(huì)給出 Index、Type、Id、Version 等信息。
{
"_index":"accounts",
"_type":"person",
"_id":"1",
"_version":1,
"result":"created",
"_shards":{"total":2,"successful":1,"failed":0},
"created":true
}
如果你仔細(xì)看,會(huì)發(fā)現(xiàn)請(qǐng)求路徑是/accounts/person/1,最后的1是該條記錄的 Id。它不一定是數(shù)字,任意字符串(比如abc)都可以。
新增記錄的時(shí)候,也可以不指定 Id,這時(shí)要改成 POST 請(qǐng)求。
$ curl -X POST 'localhost:9200/accounts/person' -d '
{
"user": "李四",
"title": "工程師",
"desc": "系統(tǒng)管理"
}'
上面代碼中,向/accounts/person發(fā)出一個(gè) POST 請(qǐng)求,添加一個(gè)記錄。這時(shí),服務(wù)器返回的 JSON 對(duì)象里面,_id字段就是一個(gè)隨機(jī)字符串。
{
"_index":"accounts",
"_type":"person",
"_id":"AV3qGfrC6jMbsbXb6k1p",
"_version":1,
"result":"created",
"_shards":{"total":2,"successful":1,"failed":0},
"created":true
}
如果沒(méi)有先創(chuàng)建 Index(這個(gè)例子是accounts),直接執(zhí)行上面的命令,Elastic 也不會(huì)報(bào)錯(cuò),而是直接生成指定的 Index
3.2 查看記錄
向/Index/Type/Id發(fā)出 GET 請(qǐng)求,就可以查看這條記錄。
$ curl 'localhost:9200/accounts/person/1?pretty=true'
上面代碼請(qǐng)求查看/accounts/person/1這條記錄,URL 的參數(shù)pretty=true表示以易讀的格式返回。
返回的數(shù)據(jù)中,found字段表示查詢成功,_source字段返回原始記錄。
{
"_index" : "accounts",
"_type" : "person",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"user" : "張三",
"title" : "工程師",
"desc" : "數(shù)據(jù)庫(kù)管理"
}
}
如果 Id 不正確,就查不到數(shù)據(jù),found字段就是false。
$ curl 'localhost:9200/weather/beijing/abc?pretty=true'
{
"_index" : "accounts",
"_type" : "person",
"_id" : "abc",
"found" : false
}
3.3 刪除記錄
刪除記錄就是發(fā)出 DELETE 請(qǐng)求
$ curl -X DELETE 'localhost:9200/accounts/person/1'
3.4 更新記錄
更新記錄就是使用 PUT 請(qǐng)求,重新發(fā)送一次數(shù)據(jù)
$ curl -X PUT 'localhost:9200/accounts/person/1' -d '
{
"user" : "張三",
"title" : "工程師",
"desc" : "數(shù)據(jù)庫(kù)管理,軟件開(kāi)發(fā)"
}'
{
"_index":"accounts",
"_type":"person",
"_id":"1",
"_version":2,
"result":"updated",
"_shards":{"total":2,"successful":1,"failed":0},
"created":false
}
上面代碼中,我們將原始數(shù)據(jù)從"數(shù)據(jù)庫(kù)管理"改成"數(shù)據(jù)庫(kù)管理,軟件開(kāi)發(fā)"。 返回結(jié)果里面,有幾個(gè)字段發(fā)生了變化
可以看到,記錄的 Id 沒(méi)變,但是版本(version)從1變成2,操作類(lèi)型(result)從created變成updated,created字段變成false,因?yàn)檫@次不是新建記錄
Elasticsearch是一個(gè)分布式系統(tǒng)。當(dāng)documents被創(chuàng)建、更新或者刪除,其新版本會(huì)被復(fù)制到集群的其它節(jié)點(diǎn)。Elasticsearch既是異步的(asynchronous )也是同步的(concurrent),其含義是復(fù)制請(qǐng)求都是并行發(fā)送的,但是到達(dá)目的地的順序是無(wú)序的。Elasticsearch系統(tǒng)需要一種方法使得老版本的文檔永遠(yuǎn)都無(wú)法覆蓋新的版本。
每當(dāng)文檔被改變的時(shí)候,文檔中的
_version將會(huì)被增加(+1)。Elasticsearch使用_version確保所有的修改都會(huì)按照正確的順序執(zhí)行。如果文檔舊的版本在新的版本之后到達(dá),它會(huì)被簡(jiǎn)單的忽略。
4. 數(shù)據(jù)查詢
4.1 返回所有記錄
使用 GET 方法,直接請(qǐng)求/Index/Type/_search,就會(huì)返回所有記錄。
$ curl 'localhost:9200/accounts/person/_search'
{
"took":2,
"timed_out":false,
"_shards":{"total":5,"successful":5,"failed":0},
"hits":{
"total":2,
"max_score":1.0,
"hits":[
{
"_index":"accounts",
"_type":"person",
"_id":"AV3qGfrC6jMbsbXb6k1p",
"_score":1.0,
"_source": {
"user": "李四",
"title": "工程師",
"desc": "系統(tǒng)管理"
}
},
{
"_index":"accounts",
"_type":"person",
"_id":"1",
"_score":1.0,
"_source": {
"user" : "張三",
"title" : "工程師",
"desc" : "數(shù)據(jù)庫(kù)管理,軟件開(kāi)發(fā)"
}
}
]
}
}
上面代碼中,返回結(jié)果的 took字段表示該操作的耗時(shí)(單位為毫秒),timed_out字段表示是否超時(shí),hits字段表示命中的記錄,里面子字段的含義如下。
-
total:返回記錄數(shù),本例是2條。 -
max_score:最高的匹配程度,本例是1.0。 -
hits:返回的記錄組成的數(shù)組。
返回的記錄中,每條記錄都有一個(gè)_score字段,表示匹配的程序,默認(rèn)是按照這個(gè)字段降序排列。
4.2 全文搜索
Elastic 的查詢非常特別,使用自己的查詢語(yǔ)法,要求 GET 請(qǐng)求帶有數(shù)據(jù)體
$ curl 'localhost:9200/accounts/person/_search' -d '
{
"query" : { "match" : { "desc" : "軟件" }}
}'
上面代碼使用 Match 查詢,指定的匹配條件是desc字段里面包含"軟件"這個(gè)詞,返回結(jié)果如下
{
"took":3,
"timed_out":false,
"_shards":{"total":5,"successful":5,"failed":0},
"hits":{
"total":1,
"max_score":0.28582606,
"hits":[
{
"_index":"accounts",
"_type":"person",
"_id":"1",
"_score":0.28582606,
"_source": {
"user" : "張三",
"title" : "工程師",
"desc" : "數(shù)據(jù)庫(kù)管理,軟件開(kāi)發(fā)"
}
}
]
}
}
Elastic 默認(rèn)一次返回10條結(jié)果,可以通過(guò)size字段改變這個(gè)設(shè)置。
$ curl 'localhost:9200/accounts/person/_search' -d '
{
"query" : { "match" : { "desc" : "管理" }},
"size": 1
}'
上面代碼指定,每次只返回一條結(jié)果。
還可以通過(guò)from字段,指定位移。
$ curl 'localhost:9200/accounts/person/_search' -d '
{
"query" : { "match" : { "desc" : "管理" }},
"from": 1,
"size": 1
}'
上面代碼指定,從位置1開(kāi)始(默認(rèn)是從位置0開(kāi)始),只返回一條結(jié)果。
4.3 邏輯運(yùn)算
如果有多個(gè)搜索關(guān)鍵字, Elastic 認(rèn)為它們是or關(guān)系
$ curl 'localhost:9200/accounts/person/_search' -d '
{
"query" : { "match" : { "desc" : "軟件 系統(tǒng)" }}
}'
上面代碼搜索的是軟件 or 系統(tǒng)。
如果要執(zhí)行多個(gè)關(guān)鍵詞的and搜索,必須使用布爾查詢。
$ curl 'localhost:9200/accounts/person/_search' -d '
{
"query": {
"bool": {
"must": [
{ "match": { "desc": "軟件" } },
{ "match": { "desc": "系統(tǒng)" } }
]
}
}
}'
5. ES與Spark整合
5.1 Maven配置
引入對(duì)應(yīng)依賴
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-13_2.10</artifactId>
<version>5.0.1</version>
</dependency>
5.2 寫(xiě)入Map對(duì)象
// 1
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
// 2
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
...
// 3
SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);
// 4
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
// 5
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
// 6
JavaEsSpark.saveToEs(javaRDD, "spark/docs");
下面是代碼解釋?zhuān)?/p>
| 步驟 | 具體含義 |
|---|---|
![]() image
|
Spark imports |
![]() image
|
elasticsearch-hadoop imports |
![]() image
|
運(yùn)行Spark |
![]() image
|
使用了GuavaImmutable* 方法簡(jiǎn)化 Map, List 的創(chuàng)建 |
![]() image
|
創(chuàng)建 RDD |
![]() image
|
保存到ES中,Index為spark/docs
|
5.3 寫(xiě)入JSON對(duì)象
我們可以直接將Json字符串寫(xiě)入到ElasticSearch中,如下:
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
JavaSparkContext jsc = ...
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");
5.4 Spark Streaming 寫(xiě)入數(shù)據(jù)
Java有一個(gè)專(zhuān)用的類(lèi),它提供與EsSparkStreaming類(lèi)似的功能,即包org.elasticsearch.spark.streaming.api.java中的JavaEsSparkStreaming(類(lèi)似于Spark的Java API的包):
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
...
SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1));
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
microbatches.add(javaRDD);
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);
JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs");
jssc.start()
5.5 讀取數(shù)據(jù)
Java有一個(gè)專(zhuān)用的JavaPairRDD,返回的Tuple2值(或第二個(gè)元素)將文檔作為java.util集合返回。
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
...
SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaPairRDD<String, Map<String, Object>> esRDD =
JavaEsSpark.esRDD(jsc, "radio/artists");
5.6 其他操作
我們還可以將JavaBean 或者Spark SQL中的DataFrame存入到ES中,具體可以參考https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#CO47-3
Ref
[1] https://www.elastic.co/guide/en/elasticsearch/reference/6.0/_basic_concepts.html (官方文檔)
[2] http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html (阮一峰寫(xiě)的簡(jiǎn)要教程)
[3] https://www.elastic.co/guide/cn/elasticsearch/guide/cn/index.html (中文文檔,較舊)
[4]https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html (ES與Spark整合 官方文檔)
[5]https://www.iteblog.com/archives/1728.html (Spark+ES 實(shí)踐博客)
[6]http://wiki.jikexueyuan.com/project/elasticsearch-definitive-guide-cn/ (極客學(xué)院中文教程)





