Skip to content

Commit

Permalink
feat: Enable StateClients within Docker to connect to destinations (#386
Browse files Browse the repository at this point in the history
)

StateClients open a connection to destinations directly in order to read & persist state. This doesn't work if the source plugin is in a docker container, because docker containers cannot mount sockets (they can mount the files, but the communication doesn't work).

This PR makes two simple changes:
- Adds the `WithUseTCP` option to the client constructor. If it ends up using `startLocal` (as most plugins do), it will send an `--address` with a free TCP address, rather than the default of a Unix socket. Some code is updated to be consistent with this change.
- For Docker registries, it always enables the special extra host `host.docker.internal`, so that the container can address the host's destination connection. There's no good way of asking if the source plugin has a StateClient, so this minimal change will apply to all docker syncs.

By itself, this PR shouldn't change anything. I'll open a minimal PR on the cli to leverage this.


I've tested a Docker sync (typeform) and a non-Docker sync (aws) with these changes and cli changes and they work fine
<img width="1354" alt="Screenshot 2024-08-08 at 12 45 17" src="https://github.com/user-attachments/assets/5b5319c8-b9f6-487d-9713-b007e9486377">
<img width="1353" alt="Screenshot 2024-08-08 at 12 45 49" src="https://github.com/user-attachments/assets/8319b334-e4df-4ea0-b6ec-16fefb85eca7">
  • Loading branch information
marianogappa authored Aug 8, 2024
1 parent 790f62e commit da98954
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 10 deletions.
12 changes: 12 additions & 0 deletions managedplugin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,15 @@ func WithCloudQueryDockerHost(dockerHost string) Option {
c.cqDockerHost = dockerHost
}
}

func WithUseTCP() Option {
return func(c *Client) {
c.useTCP = true
}
}

func WithDockerExtraHosts(extraHosts []string) Option {
return func(c *Client) {
c.dockerExtraHosts = extraHosts
}
}
88 changes: 78 additions & 10 deletions managedplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type Client struct {
teamName string
licenseFile string
dockerAuth string
useTCP bool
tcpAddr string
dockerExtraHosts []string
}

// typ will be deprecated soon but now required for a transition period
Expand Down Expand Up @@ -141,13 +144,14 @@ func (c Clients) Terminate() error {
// If registrySpec is Docker then client downloads the docker image, runs it and creates a gRPC connection.
func NewClient(ctx context.Context, typ PluginType, config Config, opts ...Option) (*Client, error) {
c := &Client{
directory: defaultDownloadDir,
wg: &sync.WaitGroup{},
config: config,
metrics: &Metrics{},
registry: config.Registry,
cqDockerHost: DefaultCloudQueryDockerHost,
dockerAuth: config.DockerAuth,
directory: defaultDownloadDir,
wg: &sync.WaitGroup{},
config: config,
metrics: &Metrics{},
registry: config.Registry,
cqDockerHost: DefaultCloudQueryDockerHost,
dockerAuth: config.DockerAuth,
dockerExtraHosts: []string{},
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -262,6 +266,9 @@ func (c *Client) ConnectionString() string {
case RegistryLocal,
RegistryGithub,
RegistryCloudQuery:
if c.useTCP {
return tgt
}
return "unix://" + tgt
case RegistryDocker:
return tgt
Expand Down Expand Up @@ -294,6 +301,7 @@ func (c *Client) startDockerPlugin(ctx context.Context, configPath string) error
Env: c.config.Environment,
}
hostConfig := &container.HostConfig{
ExtraHosts: c.dockerExtraHosts,
PortBindings: map[nat.Port][]nat.PortBinding{
"7777/tcp": {
{
Expand All @@ -303,6 +311,7 @@ func (c *Client) startDockerPlugin(ctx context.Context, configPath string) error
},
},
}

networkingConfig := &network.NetworkingConfig{}
platform := &containerSpecs.Platform{}
containerName := c.config.Name + "-" + uuid.New().String()
Expand Down Expand Up @@ -399,7 +408,60 @@ func waitForContainerRunning(ctx context.Context, cli *dockerClient.Client, cont
return err
}

func getFreeTCPAddr() (string, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return "", err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return "", err
}
defer l.Close()

return l.Addr().String(), nil
}

func (c *Client) startLocal(ctx context.Context, path string) error {
if c.useTCP {
tcpAddr, err := getFreeTCPAddr()
if err != nil {
return fmt.Errorf("failed to get free port: %w", err)
}
c.tcpAddr = tcpAddr
return c.startLocalTCP(ctx, path)
}
return c.startLocalUnixSocket(ctx, path)
}

func (c *Client) startLocalTCP(ctx context.Context, path string) error {
// spawn the plugin first and then connect
args := c.getPluginArgs()
cmd := exec.CommandContext(ctx, path, args...)
reader, err := cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("failed to get stdout pipe: %w", err)
}
cmd.Stderr = os.Stderr
if c.config.Environment != nil {
cmd.Env = c.config.Environment
}
cmd.SysProcAttr = getSysProcAttr()
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start plugin %s: %w", path, err)
}

c.cmd = cmd

c.logReader = reader
c.wg.Add(1)
go c.readLogLines(reader)

return c.connectUsingTCP(ctx, c.tcpAddr)
}

func (c *Client) startLocalUnixSocket(ctx context.Context, path string) error {
c.grpcSocketName = GenerateRandomUnixSocketName()
// spawn the plugin first and then connect
args := c.getPluginArgs()
Expand All @@ -424,10 +486,13 @@ func (c *Client) startLocal(ctx context.Context, path string) error {
go c.readLogLines(reader)

err = c.connectToUnixSocket(ctx)
// N.B. we exit early if connecting succeeds here!
if err == nil {
return err
return nil
}

// Error scenarios:

if killErr := cmd.Process.Kill(); killErr != nil {
c.logger.Error().Err(killErr).Msg("failed to kill plugin process")
}
Expand All @@ -441,9 +506,12 @@ func (c *Client) startLocal(ctx context.Context, path string) error {

func (c *Client) getPluginArgs() []string {
args := []string{"serve", "--log-level", c.logger.GetLevel().String(), "--log-format", "json"}
if c.grpcSocketName != "" {
switch {
case c.grpcSocketName != "":
args = append(args, "--network", "unix", "--address", c.grpcSocketName)
} else {
case c.useTCP:
args = append(args, "--network", "tcp", "--address", c.tcpAddr)
default:
args = append(args, "--network", "tcp", "--address", "0.0.0.0:7777")
}
if c.noSentry {
Expand Down

0 comments on commit da98954

Please sign in to comment.