diff --git a/main.go b/main.go index d2fec2e..306384a 100644 --- a/main.go +++ b/main.go @@ -37,7 +37,10 @@ func main() { utils.GetLogger(ctx).WithError(err).Error("create task manager") return } - taskMgr.Run(ctx) + if err := taskMgr.Run(ctx); err != nil { + utils.GetLogger(ctx).WithError(err).Error("run task manager") + return + } return } diff --git a/pkg/es/es5settings.go b/pkg/es/es5settings.go index 0f995fa..78deff9 100644 --- a/pkg/es/es5settings.go +++ b/pkg/es/es5settings.go @@ -110,7 +110,7 @@ func (v5 *V5Settings) ToESV6Mapping(targetIndex string) map[string]interface{} { } func (v5 *V5Settings) ToESV7Mapping(targetIndex string) map[string]interface{} { - return v5.ToESV5Mapping(targetIndex) + return v5.ToESV8Mapping() } func (v5 *V5Settings) ToESV8Mapping() map[string]interface{} { diff --git a/pkg/es/es7.go b/pkg/es/es7.go index df46c1f..42990a1 100644 --- a/pkg/es/es7.go +++ b/pkg/es/es7.go @@ -45,6 +45,31 @@ func (es *V7) GetClusterVersion() string { return es.ClusterVersion } +type ScrollResultV7 struct { + Took int `json:"took,omitempty"` + ScrollId string `json:"_scroll_id,omitempty"` + TimedOut bool `json:"timed_out,omitempty"` + Hits struct { + MaxScore float32 `json:"max_score,omitempty"` + Total struct { + Value int `json:"value,omitempty"` + Relation string `json:"relation,omitempty"` + } `json:"total,omitempty"` + Docs []interface{} `json:"hits,omitempty"` + } `json:"hits"` + Shards struct { + Successful int `json:"successful,omitempty"` + Skipped int `json:"skipped,omitempty"` + Failed int `json:"failed,omitempty"` + Failures []struct { + Shard int `json:"shard,omitempty"` + Index string `json:"index,omitempty"` + Status int `json:"status,omitempty"` + Reason interface{} `json:"reason,omitempty"` + } `json:"failures,omitempty"` + } `json:"_shards,omitempty"` +} + func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, yield func(*ScrollResultYield)) error { scrollSearchOptions := []func(*esapi.SearchRequest){ @@ -76,7 +101,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string return errors.New(res.String()) } - var scrollResult ScrollResultV5 + var scrollResult ScrollResultV7 if err := json.NewDecoder(res.Body).Decode(&scrollResult); err != nil { return errors.WithStack(err) } @@ -97,7 +122,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string } yield(&ScrollResultYield{ - Total: uint64(scrollResult.Hits.Total), + Total: uint64(scrollResult.Hits.Total.Value), Docs: hitDocs, }) @@ -117,7 +142,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string return errors.New(res.String()) } - var scrollResult ScrollResultV5 + var scrollResult ScrollResultV7 if err := json.NewDecoder(res.Body).Decode(&scrollResult); err != nil { return errors.WithStack(err) } @@ -135,7 +160,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string } yield(&ScrollResultYield{ - Total: uint64(scrollResult.Hits.Total), + Total: uint64(scrollResult.Hits.Total.Value), Docs: hitDocs, }) return nil @@ -218,7 +243,6 @@ func (es *V7) BulkInsert(index string, hitDocs []Doc) error { "index": map[string]interface{}{ "_index": index, "_id": doc.ID, - "_type": doc.Type, }, } metaBytes, _ := json.Marshal(meta) diff --git a/pkg/es/es8.go b/pkg/es/es8.go index 00eea85..3395c6b 100644 --- a/pkg/es/es8.go +++ b/pkg/es/es8.go @@ -244,7 +244,6 @@ func (es *V8) BulkInsert(index string, hitDocs []Doc) error { "index": map[string]interface{}{ "_index": index, "_id": doc.ID, - "_type": doc.Type, }, } metaBytes, _ := json.Marshal(meta)