查询数据

package api

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/gin-gonic/gin"
	"github.com/olivere/elastic/v7"
)

type EsInfo struct {
	Host       string `json:"host,omitempty"`
	User       string `json:"user,omitempty"`
	Pwd        string `json:"password,omitempty"`
	Timeout    string `json:"timeout,omitempty"`
}


type EsCli struct {
	Es                *elastic.Client
	TimeOut           string
}

type HourCountEsResponse struct {
	Aggregations struct {
		Hourcount struct {
			Buckets []struct {
				Key          string `json:"key"`
				SumHourcount struct {
					Value float64 `json:"value"`
				} `json:"sum_hour_count"`
			} `json:"buckets"`
		} `json:"hour_count"`
	} `json:"aggregations"`
}

var	queryStr = `
	{
		"aggs": {
			  "hour_count": {
				  "aggs": {
					  "sum_hour_count": {
						  "sum": {
							  "field": "hour_count"
						  }
					  }
				  },
				  "terms": {
					  "field": "project"
				  }
			  }
		  },
		  "size":0
	  }
	`

func apiEsQuery(c *gin.Context) {
	esInfo := &EsInfo{
		Host:           "**",
		User:           "**",
		Pwd:            "**",
		Timeout:        "10",
	}
	var (
		cli *EsCli
	    err error
    )
    //初始化es
	if cli, err = InitEs(esInfo); err != nil {
		fmt.Printf("[%s] err: %s", tid, err.Error())
		return
	}
    //查询
	var esRawResp *elastic.Response
	if esRawResp, err = QueryES(cli,"GET", "/indexName/_search", "application/json;charset=UTF-8", queryStr); err != nil {
		fmt.Printf("[%s] err: %s", tid, err.Error())
		return
	}
	if esRawResp.StatusCode != 200 {
		fmt.Printf("[%s] err: es response code %d", tid, esRawResp.StatusCode)
		return
	}
	//aggregations 解析结果
	esResp := &HourCountEsResponse{}
	if err = json.Unmarshal(esRawResp.Body, esResp); err != nil {
		fmt.Printf("[%s] err: %s", tid, err.Error())
		return
	}

	fmt.Printf("esResp:%+#v", esResp.Aggregations.Hourcount.Buckets)
	res := make(map[string]interface{})
	for _, v := range esResp.Aggregations.Hourcount.Buckets {
		res[v.Key] = v.SumHourcount.Value
	}
	fmt.Printf("esResp:%+#v", res)
}

func  QueryES(cli *EsCli, method, path, contentType string, body interface{}) (*elastic.Response, error) {
	reqopts := elastic.PerformRequestOptions{
		Method:      method,
		Path:        path, // build url
		Body:        body,
		ContentType: contentType,
	}
	timeout := "20"

	to, err := time.ParseDuration(timeout)
	if err != nil {
		fmt.Printf("time duration: %s", err.Error())
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), to)
	defer cancel()

	return cli.Es.PerformRequest(ctx, reqopts)
}

func InitEs(esInfo *models.EsInfo) (*EsCli, error) {
	to, err := time.ParseDuration(esInfo.Timeout)
	if err != nil {
		fmt.Printf("time duration: %s", err.Error())
		return nil, err
	}

	httpCli := &http.Client{
		Timeout: to,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
			TLSHandshakeTimeout: 10 * time.Second,
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 100,
		},
	}

	client, err := elastic.NewClient(elastic.SetHttpClient(httpCli),
		elastic.SetURL(esInfo.Host),
		elastic.SetSniff(false), // disable sniffing
		elastic.SetBasicAuth(esInfo.User, esInfo.Pwd),
	)
	if err != nil {
		fmt.Printf("NewClient %s", err.Error())
		return nil, err
	}

	ctx := context.Background()
	info, code, err := client.Ping(esInfo.Host).Do(ctx)
	if err != nil {
		fmt.Printf("Ping %s", err.Error())
		return nil, err
	}

	return &EsCli{
		Es:                client,
		TimeOut:           esInfo.Timeout
	}, nil
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐