github.com/olivere/elastic
options := []elastic.ClientOptionFunc{
elastic.SetURL(“http://xxxxxxx:9200”),
elastic.SetSniff(true), //是否开启集群嗅探
elastic.SetHealthcheckInterval(10 * time.Second), //设置两次运行状况检查之间的间隔, 默认60s
elastic.SetGzip(false), //启用或禁用gzip压缩
elastic.SetErrorLog(log.New(os.Stderr, “ELASTIC “, log.LstdFlags)), //ERROR日志输出配置
elastic.SetInfoLog(log.New(os.Stdout, “”, log.LstdFlags)), //INFO级别日志输出配置
}
options = append(options, elastic.SetBasicAuth(
“xxxx”, //账号
“xxxxxxxxxxxxxx”, //密码
))
con, err := elastic.NewClient(options…)
1 创建索引
_, err = conf.ES().CreateIndex(indexName).BodyJson(mapping).Do(context.Background())
2 判断索引是否存在
exists, err := conf.ES().IndexExists(indexName).Do(context.Background())
3 更新索引
_, err = conf.ES().PutMapping().
Index(indexName).
Type(“_doc”).
BodyJson(mapping).
Do(context.Background())
4 删除索引
_, err = conf.ES().DeleteIndex(indexName).Do(context.Background())
5 数据迁移
type mi = map[string]interface{}
_, err = conf.ES().Reindex().Body(mi{
“source”: mi{
“index”: oldIndexName,
},
“dest”: mi{
“index”: newIndexName,
},
}).Do(context.Background())
6 设置别名
_, err = conf.ES().Alias().Action(
elastic.NewAliasAddAction(oldIndexName).Index(newIndexName),
).Do(context.Background())
7 新增或覆盖数据(单条)
_, err = conf.ES().Index().
Index(indexName).
Type(“_doc”).
// id为字符串, 创建一条此id的数据或覆盖已有此id的记录
// data为结构体或map, 当然结构需要跟索引的mapping类型保持一致
Id(id).BodyJson(data).
Do(context.Background())
8 根据id新增或更新数据(单条)
_, err = conf.ES().Update().
Index(t.index()).
Type(“_doc”).
Id(id).
// data为结构体或map, 需注意的是如果使用结构体零值也会去更新原记录
Upsert(data).
// true 无则插入, 有则更新, 设置为false时记录不存在将报错
DocAsUpsert(true).
Do(context.Background())
9,根据id新增或更新数据(批量)
bulkRequest := conf.ES().Bulk()
// data map[int]interface{}, key为id, value为要更新的数据
for id, v := range data {
doc := elastic.NewBulkUpdateRequest().
Index(t.index()).
Type(“_doc”).
Id(strconv.Itoa(id)).
Doc(v).
// true 无则插入, 有则更新, 设置为false时记录不存在将报错
DocAsUpsert(true)
bulkRequest.Add(doc)
}
bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
return
}
// 获取操作失败的记录
bad := bulkResponse.Failed()
if len(bad) > 0 {
s, _ := jsoniter.MarshalToString(bad)
err = errors.New(“部分记录更新失败 “ + s)
}
10.根据条件更新数据
_, err = conf.ES().UpdateByQuery().
Index(indexName).
Type(“_doc”).
//查询条件, 详细配置查询条件请查看章节 5
Query(query).
//要执行的更新操作, 详细配置请查看章节 6及7.1
Script(script).
Do(context.Background())
https://blog.csdn.net/p1049990866/article/details/117254708