Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the memory size of falcon-agent and the number of metric pushed #912

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions modules/agent/cfg.example.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{
"agent_mem_limit": 200,
"agent_mem_ctrl": false,
"debug": true,
"hostname": "",
"ip": "",
"batch": 2000,
"plugin": {
"enabled": false,
"dir": "./plugin",
Expand Down
75 changes: 75 additions & 0 deletions modules/agent/funcs/agentmonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package funcs

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/open-falcon/falcon-plus/g"
"github.com/toolkits/file"
)

var (
memLimit int = 200 // agent最大可以使用的内存,单位MB
cgroupRoot string = "/sys/fs/cgroup/memory/falcon-agent"
)

const (
procsFile = "cgroup.procs"
memStat = "memory.stat"
mb = 1024 * 1024
)

// InitCgroup init falcon-agent
func InitCgroup() {
pid := g.Pid("agent")
_ = os.RemoveAll(cgroupRoot)
// create falcon-agent cgroup dir
err := os.Mkdir(cgroupRoot, 751)
if err != nil {
fmt.Println("falcon-agent cgroup init failed", err)
return
}
// set memory limit
pPath := filepath.Join(cgroupRoot, procsFile)
err = ioutil.WriteFile(pPath, []byte(fmt.Sprintf("%s", pid)), 644)
if err != nil {
fmt.Println("falcon-agent cgroup write cgroup.procs failed", err)
return
}
}

// GetAgentMem get agent memory info
func GetAgentMem() (int64, error) {
filePath := filepath.Join(cgroupRoot, memStat)
contents, err := ioutil.ReadFile(filePath)
if err != nil {
fmt.Printf("error: %v", err)
return 0, err
}

reader := bufio.NewReader(bytes.NewBuffer(contents))
var agentRSS int64
for {
info, err := file.ReadLine(reader)
if err != nil {
return agentRSS, err
}
fields := strings.Fields(string(info))
if len(fields) < 2 || fields[0] != "rss" {
continue
}
val, numErr := strconv.ParseInt(fields[1], 10, 64)
if numErr != nil {
continue
}
agentRSS = val / mb
break
}
return agentRSS, nil
}
25 changes: 25 additions & 0 deletions modules/agent/funcs/agentmonitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package funcs

import (
"os"
"testing"
)

func TestGetAgentMem(t *testing.T) {
_ = os.RemoveAll(cgroupRoot)
// create falcon-agent cgroup dir
err := os.Mkdir(cgroupRoot, 751)
if err != nil {
t.Error(err)
_ = os.RemoveAll(cgroupRoot)
return
}
_, err = GetAgentMem()
if err != nil {
t.Error(err)
_ = os.RemoveAll(cgroupRoot)
return
}
_ = os.RemoveAll(cgroupRoot)
return
}
39 changes: 39 additions & 0 deletions modules/agent/g/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"encoding/json"
"log"
"os"
"strconv"
"strings"
"sync"

"github.com/toolkits/file"
Expand Down Expand Up @@ -56,9 +58,12 @@ type CollectorConfig struct {
}

type GlobalConfig struct {
AgentMemLimit uint64 `json:"agent_mem_limit"`
AgentMemCtrl bool `json:"agent_mem_ctrl"`
Debug bool `json:"debug"`
Hostname string `json:"hostname"`
IP string `json:"ip"`
Batch int `json:"batch,omitempty"`
Plugin *PluginConfig `json:"plugin"`
Heartbeat *HeartbeatConfig `json:"heartbeat"`
Transfer *TransferConfig `json:"transfer"`
Expand Down Expand Up @@ -134,6 +139,40 @@ func ParseConfig(cfg string) {
log.Fatalln("parse config file:", cfg, "fail:", err)
}

memCtrl := os.Getenv("AGENT_MEM_CTRL")
if memCtrl != "" {
if strings.ToLower(memCtrl) == "true" {
c.AgentMemCtrl = true
} else {
c.AgentMemCtrl = false
}
log.Println("set AgentMemCtrl:", c.AgentMemCtrl, "from env")
}

transferAddr := os.Getenv("TRANSFER_URL")
if transferAddr != "" {
c.Transfer.Addrs = strings.Split(transferAddr, ",")
log.Println("set transfer url: " + transferAddr + " from env")
}

heartbeatURL := os.Getenv("HEARTBEAT_URL")
if len(heartbeatURL) != 0 {
c.Heartbeat.Addr = heartbeatURL
log.Println("set heartbeat URL: " + heartbeatURL + " from env")
}

limitBatch, err := strconv.Atoi(os.Getenv("LIMIT_BATCH"))
if err != nil {
log.Println("invalid limit Batch: ", limitBatch)
} else {
c.Batch = limitBatch
log.Println("set limit Batch: ", limitBatch, "from env")
}
if c.Batch <= 0 {
c.Batch = 2000
log.Println("set batch default size: ", c.Batch)
}

lock.Lock()
defer lock.Unlock()

Expand Down
39 changes: 35 additions & 4 deletions modules/agent/http/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,58 @@ package http

import (
"encoding/json"
"log"
"net/http"
"strconv"
"sync"

"github.com/open-falcon/falcon-plus/common/model"
"github.com/open-falcon/falcon-plus/modules/agent/funcs"
"github.com/open-falcon/falcon-plus/modules/agent/g"
"net/http"
)

var once sync.Once

func configPushRoutes() {
http.HandleFunc("/v1/push", func(w http.ResponseWriter, req *http.Request) {
if req.ContentLength == 0 {
http.Error(w, "body is blank", http.StatusBadRequest)
return
}

if g.Config().AgentMemCtrl == true {
ok := isHandReq(w)
if !ok {
return
}
}
decoder := json.NewDecoder(req.Body)
var metrics []*model.MetricValue
err := decoder.Decode(&metrics)
if err != nil {
http.Error(w, "connot decode body", http.StatusBadRequest)
http.Error(w, "cannot decode body", http.StatusBadRequest)
return
}
if len(metrics) > g.Config().Batch {
apriltommy0525 marked this conversation as resolved.
Show resolved Hide resolved
g.SendToTransfer(metrics[:g.Config().Batch])
http.Error(w, "post Metric too Big !!! have sent max Batch: "+strconv.Itoa(g.Config().Batch), http.StatusBadRequest)
return
}

g.SendToTransfer(metrics)
w.Write([]byte("success"))
})
}

func isHandReq(w http.ResponseWriter) bool {
once.Do(funcs.InitCgroup)
memUsed, err := funcs.GetAgentMem()
if err != nil {
RenderMsgJson(w, err.Error())
return false
}
if uint64(memUsed) > g.Config().AgentMemLimit {
log.Printf("memory consumption has exceeded the threshold")
http.Error(w, "memory consumption has exceeded the threshold", http.StatusBadRequest)
return false
}
return true
}