Make osq runner responsive to registration updates #2007

Open
wants to merge 23 commits into main from zack/runner_registration_ids

Changes from 22 commits

Commits
23 commits
0c31928
rework interfaces for runner change detection
zackattack01 Dec 17, 2024
81e4209
gofmt
zackattack01 Dec 17, 2024
e028a65
restarts seem to be working this way
zackattack01 Dec 18, 2024
b9e1918
shift to using buffered shutdown channel, rework
zackattack01 Dec 19, 2024
2d74749
cleanup and add comment
zackattack01 Dec 19, 2024
db0b434
don't parallel test for runtime
zackattack01 Dec 20, 2024
81d84d9
PR feedback: add registrationID lock, and fix error handling flow whe…
zackattack01 Dec 20, 2024
a4168fb
fix merge conflict
zackattack01 Dec 20, 2024
c20b160
put tests back to parallel
zackattack01 Dec 20, 2024
b5451ff
Merge branch 'main' of https://github.com/kolide/launcher into zack/r…
zackattack01 Dec 23, 2024
c20d34a
update var names from rebase
zackattack01 Dec 23, 2024
6e9172a
Merge branch 'main' of github.com:kolide/launcher into zack/runner_re…
zackattack01 Jan 14, 2025
b6029da
update tests for new mocking and cleanup patterns
zackattack01 Jan 15, 2025
25edcce
pull in main and fix conflicts
zackattack01 Jan 24, 2025
bccda00
fix data races, add comments
zackattack01 Jan 24, 2025
506e14d
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 24, 2025
49654cb
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 24, 2025
4060380
fix up tests with new startupsettingswriter mocks
zackattack01 Jan 27, 2025
2c70a29
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 27, 2025
c6592cd
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 28, 2025
f551f6b
don't acquire locks for runner's String method
zackattack01 Jan 28, 2025
5180768
Merge branch 'main' into zack/runner_registration_ids
zackattack01 Jan 28, 2025
658307c
Update pkg/osquery/runtime/runner.go
zackattack01 Jan 28, 2025
2 changes: 1 addition & 1 deletion cmd/launcher/launcher.go
@@ -382,7 +382,7 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl
osqueryruntime.WithAugeasLensFunction(augeas.InstallLenses),
)
runGroup.Add("osqueryRunner", osqueryRunner.Run, osqueryRunner.Interrupt)
k.SetInstanceQuerier(osqueryRunner)
k.SetInstanceRunner(osqueryRunner)

versionInfo := version.Version()
k.SystemSlogger().Log(ctx, slog.LevelInfo,
20 changes: 14 additions & 6 deletions ee/agent/knapsack/knapsack.go
@@ -39,7 +39,7 @@ type knapsack struct {

slogger, systemSlogger *multislogger.MultiSlogger

querier types.InstanceQuerier
osqRunner types.OsqRunner

// This struct is a work in progress, and will be iteratively added to as needs arise.
}
@@ -87,23 +87,31 @@ func (k *knapsack) AddSlogHandler(handler ...slog.Handler) {
k.systemSlogger.AddHandler(handler...)
}

// Osquery instance querier
func (k *knapsack) SetInstanceQuerier(q types.InstanceQuerier) {
k.querier = q
// Osquery instance runner
func (k *knapsack) SetInstanceRunner(r types.OsqRunner) {
k.osqRunner = r
}

// RegistrationTracker interface methods
func (k *knapsack) RegistrationIDs() []string {
return []string{types.DefaultRegistrationID}
}

func (k *knapsack) SetRegistrationIDs(registrationIDs []string) error {
if k.osqRunner == nil {
return nil
}

return k.osqRunner.UpdateRegistrationIDs(registrationIDs)
}

// InstanceStatuses returns the current status of each osquery instance.
// It performs a healthcheck against each existing instance.
func (k *knapsack) InstanceStatuses() map[string]types.InstanceStatus {
if k.querier == nil {
if k.osqRunner == nil {
return nil
}
return k.querier.InstanceStatuses()
return k.osqRunner.InstanceStatuses()
}

// BboltDB interface methods
2 changes: 1 addition & 1 deletion ee/agent/types/knapsack.go
@@ -11,7 +11,7 @@ type Knapsack interface {
Slogger
RegistrationTracker
InstanceQuerier
SetInstanceQuerier(q InstanceQuerier)
SetInstanceRunner(r OsqRunner)
// LatestOsquerydPath finds the path to the latest osqueryd binary, after accounting for updates.
LatestOsquerydPath(ctx context.Context) string
// ReadEnrollSecret returns the enroll secret value, checking in various locations.
26 changes: 22 additions & 4 deletions ee/agent/types/mocks/knapsack.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions ee/agent/types/registration.go
@@ -9,4 +9,5 @@ const (
// data may be provided by e.g. a control server subsystem.
type RegistrationTracker interface {
RegistrationIDs() []string
SetRegistrationIDs(registrationIDs []string) error
}
13 changes: 13 additions & 0 deletions ee/agent/types/runner.go
@@ -0,0 +1,13 @@
package types

type (
// RegistrationChangeHandler is implemented by pkg/osquery/runtime/runner.go
RegistrationChangeHandler interface {
UpdateRegistrationIDs(registrationIDs []string) error
}

OsqRunner interface {
RegistrationChangeHandler
InstanceQuerier
}
)
130 changes: 117 additions & 13 deletions pkg/osquery/runtime/runner.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"sync"
"sync/atomic"
"time"
@@ -27,14 +28,16 @@ type settingsStoreWriter interface {

type Runner struct {
registrationIds []string // we expect to run one instance per registration ID
regIDLock sync.Mutex // locks access to registrationIds
Contributor:

I discovered that if we are locking for a single value, there is an atomic.Value or atomic.Pointer that can be used, like:

	var registrationIds atomic.Value
	registrationIds.Store([]string{"a", "b", "c"})
	theIds := registrationIds.Load().([]string)

or

	var registrationIds atomic.Pointer[[]string]
	registrationIds.Store(&[]string{"a", "b", "c"})
	theIds := registrationIds.Load()

the internets generally say that atomics are faster ... but I doubt that it makes any noticeable difference for us; also, a pointer to a slice feels weird and I'm not sure of the implications

feel free to not act on this, I don't know if it's any better
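
For reference, a minimal, self-contained sketch of the atomic.Pointer approach described above (type, method, and ID names here are illustrative only; the PR as written keeps the regIDLock mutex):

package main

import (
	"fmt"
	"sync/atomic"
)

// sketch only: not the PR's implementation
type atomicIDHolder struct {
	registrationIds atomic.Pointer[[]string] // holds a pointer to a slice that is replaced wholesale
}

func (h *atomicIDHolder) set(ids []string) {
	h.registrationIds.Store(&ids) // swap in the new slice atomically
}

func (h *atomicIDHolder) get() []string {
	if p := h.registrationIds.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	var h atomicIDHolder
	h.set([]string{"default", "secondary"}) // placeholder registration IDs
	fmt.Println(h.get())                    // [default secondary]
}

The trade-off is that readers only ever see a snapshot: every update must replace the whole slice rather than mutate it in place.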

instances map[string]*OsqueryInstance // maps registration ID to currently-running instance
instanceLock sync.Mutex // locks access to `instances` to avoid e.g. restarting an instance that isn't running yet
slogger *slog.Logger
knapsack types.Knapsack
serviceClient service.KolideService // shared service client for communication between osquery instance and Kolide SaaS
settingsWriter settingsStoreWriter // writes to startup settings store
opts []OsqueryInstanceOption // global options applying to all osquery instances
shutdown chan struct{}
shutdown chan struct{} // buffered shutdown channel to enable shutting down, either to restart or to exit
zackattack01 marked this conversation as resolved.
rerunRequired atomic.Bool
Contributor:

Maybe add a quick comment here? // if set, the runner will restart all instances after Shutdown is called, instead of exiting or similar

interrupted atomic.Bool
}

@@ -46,8 +49,9 @@ func New(k types.Knapsack, serviceClient service.KolideService, settingsWriter s
knapsack: k,
serviceClient: serviceClient,
settingsWriter: settingsWriter,
shutdown: make(chan struct{}),
opts: opts,
// the buffer length is arbitrarily set at 100; it just needs to be higher than the total possible number of instances
shutdown: make(chan struct{}, 100),
opts: opts,
}

k.RegisterChangeObserver(runner,
@@ -57,12 +61,56 @@ func New(k types.Knapsack, serviceClient service.KolideService, settingsWriter s
return runner
}

// The String method is only added to Runner because the runner is often used in our runtime tests as an
// argument passed to mocked knapsack calls. When we AssertExpectations, the runner struct is traversed by
// the Diff logic inside testify, which causes data races to be incorrectly reported for structs containing
// mutexes: the second read comes from testify.
// See (one of) the related issues for additional context: https://github.com/stretchr/testify/issues/1597
// If we really needed to expose more here, we could acquire all locks and return fmt.Sprintf("%#v", r). But given
// that we do not, it seems safer to avoid introducing any additional lock contention in case our Stringer
// is invoked someday in a production flow.
func (r *Runner) String() string {
return "runtime.Runner{}"
}
RebeccaMahany marked this conversation as resolved.

func (r *Runner) Run() error {
for {
err := r.runRegisteredInstances()
if err != nil {
// log any errors but continue, in case we intend to reload
r.slogger.Log(context.TODO(), slog.LevelWarn,
"runRegisteredInstances terminated with error",
"err", err,
)
}

// if we're in a state that required re-running all registered instances,
// reset the field and do that
if r.rerunRequired.Load() {
r.rerunRequired.Store(false)
continue
}

return err
}
}

func (r *Runner) runRegisteredInstances() error {
// clear the internal instances so they can be added back fresh as we runInstance;
// this prevents old instances from sticking around if a registrationID is ever removed
r.instanceLock.Lock()
r.instances = make(map[string]*OsqueryInstance)
r.instanceLock.Unlock()

// Create a group to track the workers running each instance
wg, ctx := errgroup.WithContext(context.TODO())

// Start each worker for each instance
for _, registrationId := range r.registrationIds {
r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()

for _, registrationId := range regIDs {
id := registrationId
wg.Go(func() error {
if err := r.runInstance(id); err != nil {
@@ -205,6 +253,13 @@ func (r *Runner) Query(query string) ([]map[string]string, error) {
}

func (r *Runner) Interrupt(_ error) {
if r.interrupted.Load() {
Contributor:

Do you think it'd be useful to also do r.rerunRequired.Store(false) here? I'm thinking about the case where UpdateRegistrationIDs is called and then Interrupt is called while the shutdown/restart is ongoing. Could maybe add a test case for this as well?

Contributor Author:

ohh yeah that seems wise. will do both!
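
A sketch of how the first suggestion might land in Interrupt (based on the fields shown in this diff; not the final committed change — the rerunRequired reset is the hypothetical part):

func (r *Runner) Interrupt(_ error) {
	if r.interrupted.Load() {
		// Already shut down, nothing else to do
		return
	}

	r.interrupted.Store(true)
	// hypothetical addition: clear any pending rerun so a restart queued by
	// UpdateRegistrationIDs cannot resurrect instances after an interrupt
	r.rerunRequired.Store(false)

	if err := r.Shutdown(); err != nil {
		r.slogger.Log(context.TODO(), slog.LevelWarn,
			"could not shut down runner on interrupt",
			"err", err,
		)
	}
}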

// Already shut down, nothing else to do
return
}

r.interrupted.Store(true)

if err := r.Shutdown(); err != nil {
r.slogger.Log(context.TODO(), slog.LevelWarn,
"could not shut down runner on interrupt",
@@ -218,14 +273,12 @@ func (r *Runner) Interrupt(_ error) {
func (r *Runner) Shutdown() error {
ctx, span := traces.StartSpan(context.TODO())
defer span.End()

if r.interrupted.Load() {
// Already shut down, nothing else to do
return nil
// ensure one shutdown is sent for each instance to read
r.instanceLock.Lock()
for range r.instances {
r.shutdown <- struct{}{}
}
Comment on lines +278 to 280
Contributor:

what about giving each instance its own shutdown channel to avoid the buffered channel?

Suggested change
for range r.instances {
r.shutdown <- struct{}{}
}
for _, instance := range r.instances {
close(instance.shutdown)
}

Contributor Author:

oh yeah, I'm gonna give this a try, ty! I'm actually seeing some interesting behavior while trying to write a test for Interrupt during restart per Becca's comment here, and I wonder if this wouldn't clear things up a little. The behavior I'm seeing in the test is that the interrupt can end up ignored if timed correctly, and I think it's related to the buffered channel use so far.
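
For context, a minimal, self-contained sketch of the per-instance shutdown-channel pattern being discussed (the shutdown field on the instance type and all names here are hypothetical; the PR as written keeps the single buffered runner channel):

package main

import "sync"

// sketch only: the shutdown field on the instance is the hypothetical part
type instanceSketch struct {
	shutdown chan struct{} // closed exactly once to tell this instance's worker to exit
}

type perInstanceRunner struct {
	instanceLock sync.Mutex
	instances    map[string]*instanceSketch // maps registration ID to instance
}

func (r *perInstanceRunner) shutdownAll() {
	r.instanceLock.Lock()
	defer r.instanceLock.Unlock()
	for _, instance := range r.instances {
		close(instance.shutdown) // closing broadcasts to every reader of this channel
	}
}

func main() {
	r := &perInstanceRunner{instances: map[string]*instanceSketch{
		"default": {shutdown: make(chan struct{})},
	}}
	r.shutdownAll()
}

Closing a channel unblocks every pending receive on it and can be observed any number of times afterward, which avoids having to size a shared buffer to match the instance count.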


r.interrupted.Store(true)
close(r.shutdown)
r.instanceLock.Unlock()

if err := r.triggerShutdownForInstances(ctx); err != nil {
return fmt.Errorf("triggering shutdown for instances during runner shutdown: %w", err)
@@ -326,7 +379,11 @@ func (r *Runner) Healthy() error {
defer r.instanceLock.Unlock()

healthcheckErrs := make([]error, 0)
for _, registrationId := range r.registrationIds {
r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()

for _, registrationId := range regIDs {
instance, ok := r.instances[registrationId]
if !ok {
healthcheckErrs = append(healthcheckErrs, fmt.Errorf("running instance does not exist for %s", registrationId))
@@ -349,8 +406,11 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {
r.instanceLock.Lock()
defer r.instanceLock.Unlock()

r.regIDLock.Lock()
regIDs := r.registrationIds
r.regIDLock.Unlock()
instanceStatuses := make(map[string]types.InstanceStatus)
for _, registrationId := range r.registrationIds {
for _, registrationId := range regIDs {
instance, ok := r.instances[registrationId]
if !ok {
instanceStatuses[registrationId] = types.InstanceStatusNotStarted
@@ -367,3 +427,47 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {

return instanceStatuses
}

// UpdateRegistrationIDs detects any changes between the new and stored registration IDs,
// and resets the runner instances for the new registrationIDs if required
func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
slices.Sort(newRegistrationIDs)

r.regIDLock.Lock()
existingRegistrationIDs := r.registrationIds
r.regIDLock.Unlock()
slices.Sort(existingRegistrationIDs)

if slices.Equal(newRegistrationIDs, existingRegistrationIDs) {
r.slogger.Log(context.TODO(), slog.LevelDebug,
"skipping runner restarts for updated registration IDs, no changes detected",
)

return nil
}

r.slogger.Log(context.TODO(), slog.LevelDebug,
Contributor:

Nit -- I think it's reasonable to log this at the info level (so that we ship this log to the cloud)

Contributor Author:

oh good call, will do!

"detected changes to registrationIDs, will restart runner instances",
"previous_registration_ids", existingRegistrationIDs,
"new_registration_ids", newRegistrationIDs,
)

// we know there are changes, safe to update the internal registrationIDs now
r.regIDLock.Lock()
r.registrationIds = newRegistrationIDs
RebeccaMahany marked this conversation as resolved.
r.regIDLock.Unlock()
// mark rerun as required so that we can safely shutdown all workers and have the changes
// picked back up from within the main Run function
r.rerunRequired.Store(true)

if err := r.Shutdown(); err != nil {
r.slogger.Log(context.TODO(), slog.LevelWarn,
"could not shut down runner instances for restart after registration changes",
"err", err,
)

return err
}

return nil
}