Skip to content

Commit

Permalink
Only cache metadata if successfully gathered
Browse files Browse the repository at this point in the history
  • Loading branch information
brancz committed Sep 6, 2024
1 parent 69ac34b commit aef364d
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 31 deletions.
3 changes: 2 additions & 1 deletion reporter/metadata/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func NewAgentMetadataProvider(revision string) MetadataProvider {
return &agentMetadataProvider{revision: revision}
}

// AddMetadata sets the "__meta_agent_revision" label to the provider's
// revision. The PID argument is ignored. It always returns true: the value
// comes from a field populated when the provider is constructed, so there is
// no lookup here that could fail.
//
// NOTE(review): this is a rendered diff; the signature without the bool
// result appears to be the pre-change line.
func (p *agentMetadataProvider) AddMetadata(_ libpf.PID, lb *labels.Builder) {
func (p *agentMetadataProvider) AddMetadata(_ libpf.PID, lb *labels.Builder) bool {
lb.Set("__meta_agent_revision", p.revision)
return true
}
27 changes: 15 additions & 12 deletions reporter/metadata/containermetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ var (
// MetadataProvider implementations support adding metadata to a labels.Builder.
type MetadataProvider interface {
// AddMetadata adds metadata to the provided labels.Builder for the given PID.
AddMetadata(pid libpf.PID, lb *labels.Builder)
// It returns whether the metadata can be safely cached. The implementations
// shown here return false when one of their lookups failed, which lets the
// caller avoid caching a label set that is incomplete.
AddMetadata(pid libpf.PID, lb *labels.Builder) bool
}

// containerMetadataProvider does the retrieval of container metadata for a particular pid.
Expand Down Expand Up @@ -495,26 +496,26 @@ func matchContainerID(containerIDStr string) (string, error) {
}

// AddMetadata adds metadata to the provided labels.Builder for the given PID.
func (p *containerMetadataProvider) AddMetadata(pid libpf.PID, lb *labels.Builder) {
func (p *containerMetadataProvider) AddMetadata(pid libpf.PID, lb *labels.Builder) bool {
// Resolve the container ID for this PID first. For Kubernetes pods, the shared
// informer may already have populated the container-ID-to-metadata cache, so
// the cached-metadata fast path below is keyed by that ID.
pidContainerID, env, err := p.lookupContainerID(pid)
if err != nil {
log.Debugf("Failed to get container id for pid %d: %v", pid, err)
return
return false
}
if envUndefined == env {
// We were not able to identify a container technology for the given PID.
return
return true
}

// Fast path, check if the containerID metadata has been cached
if metadata, ok := p.containerMetadataCache.Get(pidContainerID); ok {
for k, v := range metadata {
lb.Set(string(k), string(v))
}
return
return true
}

// For kubernetes pods this route should happen rarely, this means that we are processing a
Expand All @@ -527,41 +528,43 @@ func (p *containerMetadataProvider) AddMetadata(pid libpf.PID, lb *labels.Builde
if err != nil {
log.Debugf("Failed to get kubernetes pod metadata for container id %v: %v",
pidContainerID, err)
return
return false
}
for k, v := range metadata {
lb.Set(string(k), string(v))
}
return
return true
case isContainerEnvironment(env, envDocker) && p.dockerClient != nil:
metadata, err := p.getDockerContainerMetadata(pidContainerID)
if err != nil {
log.Debugf("Failed to get docker container metadata for container id %v: %v",
pidContainerID, err)
return
return false
}
for k, v := range metadata {
lb.Set(string(k), string(v))
}
return
return true
case isContainerEnvironment(env, envContainerd) && p.containerdClient != nil:
metadata, err := p.getContainerdContainerMetadata(pidContainerID)
if err != nil {
log.Debugf("Failed to get containerd container metadata for container id %v: %v",
pidContainerID, err)
return
return false
}
for k, v := range metadata {
lb.Set(string(k), string(v))
}
return
return true
case isContainerEnvironment(env, envDockerBuildkit):
lb.Set("__meta_docker_build_kit_container_id", pidContainerID)
return
return true
case isContainerEnvironment(env, envLxc):
lb.Set("__meta_lxc_container_id", pidContainerID)
return true
default:
log.Debugf("Failed to handle unknown container technology %d", env)
return true
}
}

Expand Down
38 changes: 25 additions & 13 deletions reporter/metadata/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,29 @@ func NewMainExecutableMetadataProvider(
// AddMetadata adds the "__meta_process_executable_*" labels (file ID, name,
// build ID, compiler, static, stripped) for the main executable of pid.
//
// The returned flag reports whether the result may be cached: it is false if
// the file ID could not be read or if the executable cache has no entry for
// that file ID.
//
// NOTE(review): this is a rendered diff; the `) {` signature line and the bare
// `return` in the error branch appear to be the pre-change lines.
func (p *mainExecutableMetadataProvider) AddMetadata(
pid libpf.PID,
lb *labels.Builder,
) {
) bool {
cacheable := true

fileID, err := process(pid).readMainExecutableFileID()
if err != nil {
log.Debugf("Failed to get fileID for PID %d: %v", pid, err)
return
cacheable = false
}
// NOTE(review): assuming the bare `return` above is the removed line, an
// error no longer stops execution, so this label and the cache lookup below
// use whatever fileID value came back alongside the error — confirm that is
// intended (processMetadataProvider guards each label with an else branch).
lb.Set("__meta_process_executable_file_id", fileID.StringNoQuotes())

mainExecInfo, exists := p.executableCache.Get(fileID)
if !exists {
// Deliberately not returning: the labels below are still written from
// the mainExecInfo value returned on a miss, but the result is not cached.
log.Debugf("Failed to get main executable metadata for PID %d, continuing but metadata might be incomplete", pid)
cacheable = false
}

lb.Set("__meta_process_executable_name", mainExecInfo.FileName)
lb.Set("__meta_process_executable_build_id", mainExecInfo.BuildID)
lb.Set("__meta_process_executable_compiler", mainExecInfo.Compiler)
lb.Set("__meta_process_executable_static", strconv.FormatBool(mainExecInfo.Static))
lb.Set("__meta_process_executable_stripped", strconv.FormatBool(mainExecInfo.Stripped))

return cacheable
}

// processMetadataProvider adds per-process labels (PID, cmdline, comm, cgroup,
// parent PID) via its AddMetadata method. It carries no state of its own.
type processMetadataProvider struct{}
Expand All @@ -197,38 +202,45 @@ func NewProcessMetadataProvider() MetadataProvider {
}

// AddMetadata adds metadata labels for a process to the given labels.Builder.
//
// It always sets "__meta_process_pid", then tries to read the command line,
// comm, cgroup and stat of the process and sets "__meta_process_cmdline",
// "comm", "__meta_process_cgroup" and "__meta_process_ppid" respectively.
//
// The returned flag reports whether the labels may be cached; it is false as
// soon as any one of those four reads fails.
//
// NOTE(review): this is a rendered diff. The first signature line, the early
// `p := process(pid)`, the bare `return` statements and the lb.Set calls that
// follow each if/else appear to be the pre-change lines; the if/else form
// keeps going after a failure and only skips the label that could not be read.
func (pmp *processMetadataProvider) AddMetadata(pid libpf.PID, lb *labels.Builder) {
p := process(pid)

func (pmp *processMetadataProvider) AddMetadata(pid libpf.PID, lb *labels.Builder) bool {
cache := true
lb.Set("__meta_process_pid", strconv.Itoa(int(pid)))

p := process(pid)

cmdline, err := p.cmdline()
if err != nil {
log.Debugf("Failed to get cmdline for PID %d: %v", pid, err)
return
cache = false
} else {
// The argument list is flattened into one space-separated string.
lb.Set("__meta_process_cmdline", strings.Join(cmdline, " "))
}
lb.Set("__meta_process_cmdline", strings.Join(cmdline, " "))

comm, err := p.comm()
if err != nil {
log.Debugf("Failed to get comm for PID %d: %v", pid, err)
return
cache = false
} else {
// Unlike the other labels, this one has no "__meta_" prefix.
lb.Set("comm", comm)
}
lb.Set("comm", comm)

cgroup, err := p.cgroup()
if err != nil {
log.Debugf("Failed to get cgroups for PID %d: %v", pid, err)
return
cache = false
} else {
lb.Set("__meta_process_cgroup", cgroup.path)
}
lb.Set("__meta_process_cgroup", cgroup.path)

stat, err := p.stat()
if err != nil {
log.Debugf("Failed to get stat for PID %d: %v", pid, err)
return
cache = false
} else {
lb.Set("__meta_process_ppid", strconv.Itoa(stat.PPID))
}
lb.Set("__meta_process_ppid", strconv.Itoa(stat.PPID))

return cache
}

// cgroup reads from /proc/<pid>/cgroup and returns a []*cgroup struct locating this PID in each process
Expand Down
3 changes: 2 additions & 1 deletion reporter/metadata/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func NewSystemMetadataProvider() (MetadataProvider, error) {
}, nil
}

// AddMetadata sets the "__meta_system_kernel_machine" and
// "__meta_system_kernel_release" labels from values already held by the
// provider. The PID argument is ignored. It always returns true, since no
// lookup is performed here that could fail.
//
// NOTE(review): this is a rendered diff; the signature without the bool
// result appears to be the pre-change line.
func (p *systemMetadataProvider) AddMetadata(_ libpf.PID, lb *labels.Builder) {
func (p *systemMetadataProvider) AddMetadata(_ libpf.PID, lb *labels.Builder) bool {
lb.Set("__meta_system_kernel_machine", p.kernelMachine)
lb.Set("__meta_system_kernel_release", p.kernelRelease)
return true
}
16 changes: 12 additions & 4 deletions reporter/parca_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,15 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
r.sampleWriter.Timestamp.Append(int64(meta.Timestamp))
}

// addMetadataForPID runs every configured metadata provider for pid against
// lb and returns true only if all of them reported their result as cacheable.
//
// NOTE(review): this is a rendered diff; the signature without the bool
// result and the bare `p.AddMetadata(pid, lb)` call appear to be the
// pre-change lines.
func (r *ParcaReporter) addMetadataForPID(pid libpf.PID, lb *labels.Builder) {
func (r *ParcaReporter) addMetadataForPID(pid libpf.PID, lb *labels.Builder) bool {
cache := true

for _, p := range r.metadataProviders {
p.AddMetadata(pid, lb)
// The provider is called before the result is folded in, so every
// provider still runs even after an earlier one returned false.
cacheable := p.AddMetadata(pid, lb)
cache = cache && cacheable
}

return cache
}

func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm string) labelRetrievalResult {
Expand All @@ -192,7 +197,7 @@ func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm string) labelRetri
lb.Set("node", r.nodeName)
lb.Set("__meta_thread_comm", comm)
lb.Set("__meta_thread_id", fmt.Sprint(tid))
r.addMetadataForPID(pid, lb)
cacheable := r.addMetadataForPID(pid, lb)

keep := relabel.ProcessBuilder(lb, r.relabelConfigs...)

Expand All @@ -208,7 +213,10 @@ func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm string) labelRetri
labels: lb.Labels(),
keep: keep,
}
r.labels.Add(tid, res)

if cacheable {
r.labels.Add(tid, res)
}
return res
}

Expand Down

0 comments on commit aef364d

Please sign in to comment.