Skip to content

Commit

Permalink
ignore insecure ssl, support override type name, fix connection leak
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed Apr 18, 2019
1 parent 8eb8ac6 commit 1cda69f
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Created by .ignore support plugin (hsz.mobi)
.DS_Store

### Go template
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
Expand Down
8 changes: 7 additions & 1 deletion bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,21 @@ func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.Wai
}

var tempDestIndexName string
var tempTargetTypeName string
tempDestIndexName = docI["_index"].(string)
tempTargetTypeName = docI["_type"].(string)

if c.Config.TargetIndexName != "" {
tempDestIndexName = c.Config.TargetIndexName
}

if c.Config.OverrideTypeName != "" {
tempTargetTypeName = c.Config.OverrideTypeName
}

doc := Document{
Index: tempDestIndexName,
Type: docI["_type"].(string),
Type: tempTargetTypeName,
source: docI["_source"].(map[string]interface{}),
Id: docI["_id"].(string),
}
Expand Down
3 changes: 2 additions & 1 deletion domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ type Config struct {
ShardsCount int `long:"shards" description:"set a number of shards on newly created indexes"`
SourceIndexNames string `short:"x" long:"src_indexes" description:"indexes name to copy,support regex and comma separated list" default:"_all"`
TargetIndexName string `short:"y" long:"dest_index" description:"indexes name to save, allow only one indexname, original indexname will be used if not specified" default:""`
OverrideTypeName string `short:"u" long:"type_override" description:"override type name" default:""`
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
Expand Down
44 changes: 39 additions & 5 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,26 @@ import (
"errors"
"bytes"
"net/url"
"crypto/tls"
)

func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) {
request := gorequest.New()

tr := &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
request.Transport=tr


if(auth!=nil){
request.SetBasicAuth(auth.User,auth.Pass)
}

request.Header.Set("Content-Type", "application/json")

request.Header["Content-Type"]= "application/json"
//request.Header.Set("Content-Type", "application/json")

if(len(proxy)>0){
request.Proxy(proxy)
}
Expand All @@ -46,19 +56,25 @@ func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) {

func Post(url string,auth *Auth, body string,proxy string)(*http.Response, string, []error) {
request := gorequest.New()
tr := &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
request.Transport=tr

if(auth!=nil){
request.SetBasicAuth(auth.User,auth.Pass)
}

request.Header.Set("Content-Type", "application/json")
request.Header["Content-Type"]="application/json"

if(len(proxy)>0){
request.Proxy(proxy)
}

request.Post(url)

if(len(body)>0){
if(len(body)>0) {
request.Send(body)
}

Expand All @@ -76,6 +92,7 @@ func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request,
if err != nil {
return nil, err
}

req := &http.Request{
Method: method,
URL: u,
Expand All @@ -90,18 +107,30 @@ func newDeleteRequest(client *http.Client,method, urlStr string) (*http.Request,

func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(string,error) {

//TODO use global client
var client *http.Client
client = &http.Client{}
if(len(proxy)>0){
proxyURL, err := url.Parse(proxy)
if(err!=nil){
log.Error(err)
}else{
transport := &http.Transport{Proxy: http.ProxyURL(proxyURL)}
transport := &http.Transport{
Proxy: http.ProxyURL(proxyURL),
DisableKeepAlives: true,
}
client = &http.Client{Transport: transport}
}
}

tr := &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}

client.Transport=tr

var reqest *http.Request
if(body!=nil){
Expand All @@ -122,6 +151,11 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(
return "",errs
}

if resp!=nil&& resp.Body!=nil{
//io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if resp.StatusCode != 200 {
b, _ := ioutil.ReadAll(resp.Body)
return "",errors.New("server error: "+string(b))
Expand Down
9 changes: 8 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
pb "gopkg.in/cheggaaa/pb.v1"
"os"
"io"
"io/ioutil"
)

func main() {
Expand Down Expand Up @@ -442,7 +443,13 @@ func (c *Migrator) recoveryIndexSettings(sourceIndexRefreshSettings map[string]i
func (c *Migrator) ClusterVersion(host string, auth *Auth,proxy string) (*ClusterVersion, []error) {

url := fmt.Sprintf("%s", host)
_, body, errs := Get(url, auth,proxy)
resp, body, errs := Get(url, auth,proxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
log.Error(errs)
return nil, errs
Expand Down
46 changes: 34 additions & 12 deletions v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ type ESAPIV0 struct {
func (s *ESAPIV0) ClusterHealth() *ClusterHealth {

url := fmt.Sprintf("%s/_cluster/health", s.Host)
_, body, errs := Get(url, s.Auth,s.HttpProxy)
r, body, errs := Get(url, s.Auth,s.HttpProxy)

if r!=nil&& r.Body!=nil{
io.Copy(ioutil.Discard, r.Body)
defer r.Body.Close()
}

if errs != nil {
return &ClusterHealth{Name: s.Host, Status: "unreachable"}
Expand Down Expand Up @@ -80,11 +85,15 @@ func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) {

url := fmt.Sprintf("%s/%s/_settings", s.Host, indexNames)
resp, body, errs := Get(url, s.Auth,s.HttpProxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
return nil, errs[0]
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, errors.New(body)
Expand All @@ -104,12 +113,17 @@ func (s *ESAPIV0) GetIndexSettings(indexNames string) (*Indexes, error) {
func (s *ESAPIV0) GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error) {
url := fmt.Sprintf("%s/%s/_mapping", s.Host, indexNames)
resp, body, errs := Get(url, s.Auth,s.HttpProxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
log.Error(errs)
return "", 0, nil, errs[0]
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()


if resp.StatusCode != 200 {
return "", 0, nil, errors.New(body)
Expand Down Expand Up @@ -288,7 +302,11 @@ func (s *ESAPIV0) Refresh(name string) (err error) {

url := fmt.Sprintf("%s/%s/_refresh", s.Host, name)

Post(url,s.Auth,"",s.HttpProxy)
resp,_,_:=Post(url,s.Auth,"",s.HttpProxy)
if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

return nil
}
Expand Down Expand Up @@ -327,14 +345,15 @@ func (s *ESAPIV0) NewScroll(indexNames string, scrollTime string, docBufferCount
}
resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy)


if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if err != nil {
log.Error(errs)
return nil, errs[0]
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()

log.Trace("new scroll,",url, body)

Expand Down Expand Up @@ -362,6 +381,12 @@ func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error
id := bytes.NewBufferString(scrollId)
url := fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
resp, body, errs := Get(url, s.Auth,s.HttpProxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
log.Error(errs)
return nil, errs[0]
Expand All @@ -371,9 +396,6 @@ func (s *ESAPIV0) NextScroll(scrollTime string, scrollId string) (*Scroll, error
return nil, errors.New(body)
}

io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()

log.Trace("next scroll,",url,body)

// decode elasticsearch scroll response
Expand Down
15 changes: 11 additions & 4 deletions v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,15 @@ func (s *ESAPIV5) NewScroll(indexNames string,scrollTime string,docBufferCount i

resp, body, errs := Post(url, s.Auth,jsonBody,s.HttpProxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
log.Error(errs)
return nil,errs[0]
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil,errors.New(body)
Expand Down Expand Up @@ -140,12 +143,16 @@ func (s *ESAPIV5) NextScroll(scrollTime string,scrollId string)(*Scroll,error)

url:=fmt.Sprintf("%s/_search/scroll?scroll=%s&scroll_id=%s", s.Host, scrollTime, id)
resp,body, errs := Get(url,s.Auth,s.HttpProxy)

if resp!=nil&& resp.Body!=nil{
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()
}

if errs != nil {
log.Error(errs)
return nil,errs[0]
}
io.Copy(ioutil.Discard, resp.Body)
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil,errors.New(body)
Expand Down

0 comments on commit 1cda69f

Please sign in to comment.