今天我們一起來寫一個(gè)從csv文件將數(shù)據(jù)導(dǎo)入elastic search的小程序
準(zhǔn)備工作
在gopath的src文件夾下創(chuàng)建csv2es文件夾,并創(chuàng)建main.go文件。下載一些csv文件備用
解析命令行參數(shù)
首先我們需要使用flag package解析命令行參數(shù),代碼如下
func main() {
// 解析命令行輸入
host := flag.String("host", "http://localhost:9200", "host, e.g. http://localhost:9200")
file := flag.String("file", "", "file path")
esIndex := flag.String("index", "", "elastic search index")
esType := flag.String("type", "", "elastic search type")
flag.Parse()
if *file == "" {
fmt.Println("please set which csv file you want to import clearly")
return
}
if *esIndex == "" {
fmt.Println("please set elastic search index")
return
}
if *esType == "" {
fmt.Println("please set elastic search type")
return
}
...
}
主要解析的參數(shù)有es的地址,待導(dǎo)入的文件的路徑,導(dǎo)入到的es的index和type。運(yùn)行g(shù)o build,生成可執(zhí)行文件csv2go,執(zhí)行
csv2go -h
-file string
file path (default "")
-host string
host, e.g. http://localhost:9200 (default "http://localhost:9200")
-index string
elastic search index (default "")
-type string
elastic search type (default "")
連接es
elastic這個(gè)開源項(xiàng)目可以幫助我們連接elastic
// 連接es
ctx := context.Background()
client, err := elastic.NewClient(
elastic.SetURL(*host),
elastic.SetSniff(false))
if err != nil {
panic(err)
}
// 檢查index是否存在,如果不存在則創(chuàng)建index
exists, err := client.IndexExists(*esIndex).Do(ctx)
if err != nil {
panic(err)
}
if !exists {
createIndex, err := client.CreateIndex(*esIndex).Do(ctx)
if err != nil {
panic(err)
}
}
解析csv并導(dǎo)入(index)到elastic search
這里需要注意幾點(diǎn)。第一,Mac上會(huì)存在\r結(jié)尾的文件的問題,所以我們使用macreader這個(gè)包對(duì)io.Reader包了一層,有興趣的同學(xué)可以看我之前的文章《mac上的文件有毒》,第二,我們默認(rèn)csv文件的第一行為column name,后面各行都是合法的記錄。
// 解析csv
f, _ := os.Open(*file)
r := csv.NewReader(macreader.New(bufio.NewReader(f)))
keys, err := r.Read()
for { //1
record, err := r.Read()
if err == io.EOF {
break
}
m := make(map[string]string)
for i, key := range keys {
m[key] = record[i]
}
jsonStr, err := json.Marshal(m)
if err != nil {
panic(err)
}
put1, err := client.Index().
Index(*esIndex).
Type(*esType).
BodyString(string(jsonStr)).
Do(ctx)
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
} //2
ok,一個(gè)基本的將csv中的數(shù)據(jù)導(dǎo)入elastic search的程序完成了,我們來測試一下性能吧。在上面的(1)行代碼前面加上
start := time.Now().Unix()
在上面的(2)行代碼后面加上
end := time.Now().Unix()
fmt.Println(end, start)
測試了一下跑了一個(gè)61567條記錄的文件,一共跑了36分鐘。夠我睡一個(gè)午覺了...
提升效率
elastic search有一個(gè)bulk api,可以將一些操作合并起來,同時(shí)傳遞給elastic search處理并返回
// 新建一個(gè)Bulk
bulkRequest := client.Bulk()
for {
...
// 為每一條記錄生成一個(gè)IndexRequest并加入Bulk
req := elastic.NewBulkIndexRequest().Index(*esIndex).Type(*esType).Doc(string(jsonStr))
bulkRequest.Add(req)
}
// 一次性完成請求
bulkResponse, err := bulkRequest.Do(ctx)
if err != nil {
}
indexed := bulkResponse.Indexed()
fmt.Println("向es導(dǎo)入了",len(indexed),"條數(shù)據(jù)")
優(yōu)化過后,插入相同的6萬多條記錄只需要幾秒鐘。cool _
本文的代碼已經(jīng)開源在github,歡迎使用或者提出意見。