Skip to content

Commit cd8f92b

Browse files
committed
group requests to reduce agent calls
1 parent eb2f485 commit cd8f92b

File tree

7 files changed

+159
-98
lines changed

7 files changed

+159
-98
lines changed

agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Agent is type for handling agent operations
1515
type Agent struct {
1616
Config *config.AgentConfig
17-
conns []gopsutilNet.ConnectionStat
17+
conns []gopsutilNet.ConnectionStat //todo: store only what we need.
1818
Hostname string
1919
mu sync.RWMutex // protects conns
2020
}

agent/http.go

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,112 +2,127 @@ package agent
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"net/http"
67
"strconv"
8+
"strings"
79

810
"github.com/cenkalti/log"
911
gopsutilNet "github.com/shirou/gopsutil/v4/net"
1012
gopsutilProcess "github.com/shirou/gopsutil/v4/process"
1113
)
1214

13-
// Response contains basic process information for API responses.
14-
type Response struct {
15+
// Process contains basic process information for API responses.
16+
type Process struct {
1517
Status string `json:"status"`
1618
Pid int32 `json:"pid"`
19+
Port int32 `json:"port"`
1720
Name string `json:"name"`
1821
CmdLine string `json:"cmdline"`
1922
}
2023

21-
// NetworkProcess represents process with its network connection.
22-
type NetworkProcess struct {
23-
process *gopsutilProcess.Process
24-
conn gopsutilNet.ConnectionStat
24+
// Response contains basic process information for API responses.
25+
type Response struct {
26+
Processes []*Process `json:"processes"`
2527
}
2628

27-
// parsePortParam parses and returns port number from the request.
28-
func parsePortParam(w http.ResponseWriter, req *http.Request) (uint32, error) {
29-
portParam, ok := req.URL.Query()["port"]
30-
log.Debugf("Looking for process of port: %s\n", portParam)
29+
// parsePortsParam parses and returns port numbers from the request.
30+
func parsePortsParam(w http.ResponseWriter, req *http.Request) ([]uint32, error) {
31+
portsParam := req.URL.Query().Get("ports")
32+
log.Debugf("Looking for process(es) for ports: %s\n", portsParam)
3133

32-
if !ok || len(portParam) < 1 {
33-
log.Errorln("port param is not provided.")
34-
return 0, nil
34+
if portsParam == "" {
35+
log.Errorln("ports param is not provided.")
36+
return nil, fmt.Errorf("ports param is required") //todo: check again.
3537
}
3638

37-
p, err := strconv.ParseInt(portParam[0], 10, 32)
38-
if err != nil {
39-
log.Errorln("error during string to int32: %s\n", err)
40-
return 0, err
39+
ports := strings.Split(portsParam, ",")
40+
41+
var portNumbers []uint32
42+
for _, port := range ports {
43+
p, err := strconv.Atoi(port)
44+
if err != nil {
45+
return nil, fmt.Errorf("invalid port number: %s", port)
46+
}
47+
portNumbers = append(portNumbers, uint32(p))
4148
}
42-
return uint32(p), nil
49+
50+
return portNumbers, nil
4351
}
4452

45-
// findProcess finds process from connections by given port.
46-
func findProcess(port uint32, conns []gopsutilNet.ConnectionStat) *NetworkProcess {
53+
// findProcesses finds process(es) those have connections with given ports.
54+
func findProcesses(ports []uint32, conns []gopsutilNet.ConnectionStat) []*Process {
55+
ps := make([]*Process, 0)
56+
4757
for _, conn := range conns {
48-
if conn.Laddr.Port != port {
58+
if !portExists(conn.Laddr.Port, ports) {
4959
continue
5060
}
5161

5262
process, err := gopsutilProcess.NewProcess(conn.Pid)
5363
if err != nil {
5464
log.Debugf("Error occured while finding the process %s\n", err.Error())
55-
return nil
65+
continue
5666
}
5767

5868
if process == nil {
59-
log.Debugf("Process could not found for %d\n", conn.Pid)
60-
return nil
69+
log.Debugf("Process not found for %d\n", conn.Pid)
70+
continue
71+
}
72+
73+
name, err := process.Name()
74+
if err != nil {
75+
log.Debugf("Name not found for %d\n", process.Pid)
6176
}
6277

63-
return &NetworkProcess{
64-
process: process,
65-
conn: conn,
78+
cmdline, err := process.Cmdline()
79+
if err != nil {
80+
log.Debugf("Cmdline not found for %d\n", process.Pid)
6681
}
82+
83+
p := &Process{
84+
Status: conn.Status,
85+
Pid: conn.Pid,
86+
Port: int32(conn.Laddr.Port),
87+
Name: name,
88+
CmdLine: cmdline,
89+
}
90+
91+
ps = append(ps, p)
6792
}
68-
return nil
93+
return ps
6994
}
7095

71-
// createResponse creates Response from given NetworkProcess parameter.
72-
func createResponse(np *NetworkProcess) *Response {
73-
if np == nil {
74-
return nil
75-
}
76-
name, err := np.process.Name()
77-
if err != nil {
78-
name = ""
79-
}
80-
cmdline, err := np.process.Cmdline()
81-
if err != nil {
82-
log.Debugf("Cmdline could not found for %d\n", np.process.Pid)
83-
}
84-
return &Response{
85-
Status: np.conn.Status,
86-
Pid: np.conn.Pid,
87-
Name: name,
88-
CmdLine: cmdline,
96+
func portExists(port uint32, ports []uint32) bool {
97+
for _, p := range ports {
98+
if p == port {
99+
return true
100+
}
89101
}
102+
return false
90103
}
91104

92105
// Process is handler for serving process info
93106
func (a *Agent) Process(w http.ResponseWriter, req *http.Request) {
94107
w.Header().Set("X-Kimo-Hostname", a.Hostname)
95108

96109
// todo: cache result for a short period (10s? 30s?)
97-
port, err := parsePortParam(w, req)
110+
ports, err := parsePortsParam(w, req)
98111
if err != nil {
99112
http.Error(w, "port param is required", http.StatusBadRequest)
100113
return
101114
}
102-
p := findProcess(port, a.GetConns())
103-
if p == nil {
104-
http.Error(w, "Connection not found", http.StatusNotFound)
115+
ps := findProcesses(ports, a.GetConns())
116+
if ps == nil {
117+
http.Error(w, "Connection(s) not found", http.StatusNotFound)
105118
return
106119
}
107120

108121
w.Header().Set("Content-Type", "application/json")
109-
ap := createResponse(p)
110-
err = json.NewEncoder(w).Encode(&ap)
122+
response := &Response{
123+
Processes: ps,
124+
}
125+
err = json.NewEncoder(w).Encode(&response)
111126
if err != nil {
112127
http.Error(w, "Can not encode agent process", http.StatusInternalServerError)
113128
}

kimo-agent-entrypoint.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ if [ "$i" = 0 ]; then
1919
exit 1
2020
fi
2121

22-
echo "mysql sleep query..."
23-
mysql -u"$user" -p"$password" -h"$host" -e "SELECT SLEEP(100000)" &
22+
echo "mysql sleep queries..."
23+
mysql -u"$user" -p"$password" -h"$host" -e "SELECT SLEEP(5000)" &
24+
mysql -u"$user" -p"$password" -h"$host" -e "SELECT SLEEP(6000)" &
25+
mysql -u"$user" -p"$password" -h"$host" -e "SELECT SLEEP(7000)" &
2426

2527
echo "running kimo agent..."
2628
kimo --debug agent

server/agent.go

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ import (
55
"encoding/json"
66
"fmt"
77
"net/http"
8+
"strings"
89

910
"github.com/cenkalti/log"
1011
)
1112

1213
// AgentProcess represents process info from a kimo-agent enhanced with response detail.
1314
type AgentProcess struct {
14-
Pid int
15+
Pid uint32
1516
Name string
1617
Cmdline string
1718
ConnectionStatus string
@@ -31,6 +32,7 @@ func NewAgentProcesss(ar *AgentResponse, address IPPort, err error) *AgentProces
3132
ap.Name = ar.Name
3233
ap.Cmdline = ar.CmdLine
3334
ap.hostname = ar.Hostname
35+
ap.ConnectionStatus = ar.Status
3436
}
3537
return ap
3638
}
@@ -50,8 +52,7 @@ func (ap *AgentProcess) Host() string {
5052

5153
// AgentClient represents an agent client to fetch get process from a kimo-agent
5254
type AgentClient struct {
53-
Host string
54-
Port uint32
55+
Address IPPort
5556
}
5657

5758
// AgentError represents an HTTP error that is retured from kimo-agent.
@@ -62,11 +63,12 @@ type AgentError struct {
6263

6364
// AgentResponse represents a success response from kimo-agent.
6465
type AgentResponse struct {
65-
Pid int
66-
Name string
67-
CmdLine string
68-
Hostname string
69-
ConnectionStatus string
66+
Hostname string
67+
Status string
68+
Pid uint32
69+
Name string
70+
CmdLine string
71+
Port uint32
7072
}
7173

7274
func (ae *AgentError) Error() string {
@@ -75,15 +77,13 @@ func (ae *AgentError) Error() string {
7577

7678
// NewAgentClient creates and returns a new AgentClient.
7779
func NewAgentClient(address IPPort) *AgentClient {
78-
ac := new(AgentClient)
79-
ac.Host = address.IP
80-
ac.Port = address.Port
81-
return ac
80+
// kimo-agent listens this address
81+
return &AgentClient{Address: address}
8282
}
8383

8484
// Get gets process info from kimo agent.
85-
func (ac *AgentClient) Get(ctx context.Context, port uint32) (*AgentResponse, error) {
86-
url := fmt.Sprintf("http://%s:%d/proc?port=%d", ac.Host, ac.Port, port)
85+
func (ac *AgentClient) Get(ctx context.Context, ports []uint32) ([]*AgentResponse, error) {
86+
url := fmt.Sprintf("http://%s:%d/proc?ports=%s", ac.Address.IP, ac.Address.Port, createPortsParam(ports))
8787
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
8888
if err != nil {
8989
return nil, err
@@ -101,27 +101,48 @@ func (ac *AgentClient) Get(ctx context.Context, port uint32) (*AgentResponse, er
101101
return nil, &AgentError{Hostname: hostname, Status: response.Status}
102102
}
103103

104-
type result struct {
104+
type process struct {
105105
Status string `json:"status"`
106-
Pid int32 `json:"pid"`
106+
Pid uint32 `json:"pid"`
107+
Port uint32 `json:"port"`
107108
Name string `json:"name"`
108109
CmdLine string `json:"cmdline"`
109110
}
110-
r := result{}
111+
112+
type Response struct {
113+
Processes []*process `json:"processes"`
114+
}
115+
116+
var r Response
111117
err = json.NewDecoder(response.Body).Decode(&r)
112118

113119
if err != nil {
114120
log.Errorln(err.Error())
115121
return nil, err
116122
}
117123

118-
return &AgentResponse{
119-
ConnectionStatus: r.Status,
120-
Pid: int(r.Pid),
121-
Name: r.Name,
122-
CmdLine: r.CmdLine,
123-
Hostname: hostname},
124-
nil
124+
ars := make([]*AgentResponse, 0)
125+
for _, p := range r.Processes {
126+
ar := &AgentResponse{
127+
Status: p.Status,
128+
Pid: p.Pid,
129+
Port: p.Port,
130+
Name: p.Name,
131+
CmdLine: p.CmdLine,
132+
Hostname: hostname,
133+
}
134+
ars = append(ars, ar)
135+
}
136+
137+
return ars, nil
138+
139+
}
140+
func createPortsParam(ports []uint32) string {
141+
numbers := make([]string, len(ports))
142+
for i, port := range ports {
143+
numbers[i] = fmt.Sprint(port)
144+
}
145+
return strings.Join(numbers, ",")
125146
}
126147

127148
func findAgentProcess(addr IPPort, aps []*AgentProcess) *AgentProcess {

0 commit comments

Comments
 (0)