Skip to content

Commit

Permalink
Merge pull request crossplane-contrib#2 from turkenh/minor-fixes
Browse files Browse the repository at this point in the history
ACL fixes and readme updates
  • Loading branch information
marshmallory authored Jan 25, 2022
2 parents d728f59 + 31c2acc commit cb9f959
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 25 deletions.
35 changes: 17 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ parameters [here](https://github.com/bitnami/charts/tree/master/bitnami/kafka/#i
```
helm repo add bitnami https://charts.bitnami.com/bitnami
kubectl create ns kafka-cluster
helm upgrade --install kafka-dev -n kafka-cluster bitnami/kafka --set auth.clientProtocol=sasl --set deleteTopicEnable=true --wait
helm upgrade --install kafka-dev -n kafka-cluster bitnami/kafka \
--set auth.clientProtocol=sasl \
--set deleteTopicEnable=true \
--set authorizerClassName="kafka.security.auth.SimpleAclAuthorizer" \
--wait
```
Username is "user", obtain password using the following
Expand Down Expand Up @@ -84,34 +88,29 @@ parameters [here](https://github.com/bitnami/charts/tree/master/bitnami/kafka/#i
sudo kubefwd svc -n kafka-cluster
```

5. (optional) Install [kafka cli](https://github.com/birdayz/kaf).
5. (optional) Install the [kafka cli](https://github.com/twmb/kcl).


6. (optional) Configure the kafka cli to talk against local Kafka installation:

1. Create a config file for the client with the following content at `~/.kaf/config`:
1. Create a config file for the client with the following content at `~/.kcl/config.toml`:

```
current-cluster: local
clusteroverride: ""
clusters:
- name: local
version: ""
brokers:
- kafka-dev-0.kafka-dev-headless:9092
SASL:
mechanism: PLAIN
username: user
password: <password-you-obtained-in-step-2>
TLS: null
security-protocol: ""
schema-registry-url: ""
seed_brokers = ["kafka-dev-0.kafka-dev-headless:9092"]
timeout_ms = 10000

[sasl]
method = "plain"
user = "user"
pass = "<password-you-obtained-in-step-2>"
```

1. Verify that cli could talk to the Kafka cluster:

```
kaf nodes
export KCL_CONFIG_DIR=~/.kcl

kcl metadata --all
```

### Building and Running the provider locally
Expand Down
2 changes: 1 addition & 1 deletion examples/provider/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ spec:
credentials:
source: Secret
secretRef:
namespace: crossplane-system
namespace: default
name: kafka-creds
key: credentials
5 changes: 4 additions & 1 deletion internal/clients/kafka/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package acl
import (
"context"
"fmt"

"github.com/twmb/franz-go/pkg/kadm"
)

Expand Down Expand Up @@ -37,14 +38,16 @@ func List() {
func Create(ctx context.Context, cl *kadm.Client, name string) (*kadm.CreateACLsResults, error) {

b := kadm.ACLBuilder{}
ab := b.Topics("sample_topic").Allow("Jon").AllowHosts("localhost").Operations(2).ResourcePatternType(3)
ab := b.Topics("sample_topic").Allow("User:Jon").AllowHosts("*").Operations(kadm.OpWrite).ResourcePatternType(kadm.ACLPatternLiteral)

c, err := cl.CreateACLs(ctx, ab)

fmt.Println("*** CREATING ACL ***", c)

fmt.Println("ERROR: ", err)

fmt.Println("Underlying ERROR: ", c[0].Err)

return &c, err
}

Expand Down
14 changes: 9 additions & 5 deletions internal/controller/acl/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package acl
import (
"context"
"fmt"

"github.com/crossplane-contrib/provider-kafka/internal/clients/kafka"

"github.com/crossplane-contrib/provider-kafka/internal/clients/kafka/acl"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -69,7 +72,7 @@ func Setup(mgr ctrl.Manager, l logging.Logger, rl workqueue.RateLimiter) error {
managed.WithExternalConnecter(&connector{
kube: mgr.GetClient(),
usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}),
newServiceFn: newNoOpService}),
newServiceFn: kafka.NewAdminClient}),
managed.WithLogger(l.WithValues("controller", name)),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))))

Expand All @@ -85,7 +88,8 @@ func Setup(mgr ctrl.Manager, l logging.Logger, rl workqueue.RateLimiter) error {
type connector struct {
kube client.Client
usage resource.Tracker
newServiceFn func(creds []byte) (interface{}, error)
log logging.Logger
newServiceFn func(creds []byte) (*kadm.Client, error)
}

// Connect typically produces an ExternalClient by:
Expand Down Expand Up @@ -119,14 +123,14 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
return nil, errors.Wrap(err, errNewClient)
}

return &external{service: svc}, nil
return &external{kafkaClient: svc, log: c.log}, nil
}

// An ExternalClient observes, then either creates, updates, or deletes an
// external resource to ensure it reflects the managed resource's desired state.
type external struct {
kafkaClient *kadm.Client
service interface{}
log logging.Logger
}

func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) {
Expand All @@ -136,7 +140,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}

// These fmt statements should be removed in the real implementation.
fmt.Printf("Observing: %+v", cr)
fmt.Printf("Observing ACL resource: %s", cr.Name)

return managed.ExternalObservation{
ResourceExists: false,
Expand Down

0 comments on commit cb9f959

Please sign in to comment.