Skip to content

Commit e9241f4

Browse files
feat: create acls (#165)
* bump kafka-go to include acl apis * add acl interfaces and aclinfo type stub * pull latest kafka-go and use kafka-go aclresource type * wip * fix test * fix typos * get acls working * getacls working * upgrade cobra to latest * finish separating get into separate subcommands * remove unneeded variables * wip * pr feedback * Revert "upgrade cobra to latest" This reverts commit 7b8ee42. * use getCliRunnerAndCtx in get acls * more consistent variable names * custom cobra type * bring in new kafka-go * support resource pattern type * add support for acloperationtype and remove options for unknown * improve descriptions * support permissiontype and host filters * add resource name filter and fix permission type formatting * support principal filtering * improve docs * add examples * remove comment * remove TODOs that are complete * remove TODOs that are complete * update README * fix test * wip * fix error handling * error handling for zk * more consistent error msg * clean up createacl * add TestBrokerClientCreateACLReadOnly * improve zk tests * run acl tests in ci * enable acls for kafka 2.4.1 in ci * fix zk tests * skip TestBrokerClientCreateACLReadOnly on old versions of kafka * try to debug * handle nested errors from createacls * operations -> operation * operations -> operation * remove setting log level in test * clean up allowed types in help command * fix merge conflict * fix test * add json annotations * bump kafka-go to version on main * wip * basic tests * start on getusers cmd * add json annotations * get users working * wip * add todos and fix type annotaitons * improve test * use CanTestBrokerAdminSecurity to feature flag test * update README * remove duplicate test from merge conflicts * fix more merge conflicts * create user working * add uncommitted files * start adding validation * meta validation for users * wip * support dry run and skip confirm * wip * wip * add more files * resourcemta * consistency checking for acls * remove emacs backups * remove user stuff * remove diff from cluster.yaml file * remove diff from topic file * remove debug log * smaller diff * remove completed todos * remove unused error helper * add missing meta file * skip ACL tests when ACLs cannot be used due to kafka version limitations * fix loadacls test * add more todos * add validation and set defaults * don't use ioutil * move confirm to util package * move confirm to util package * add create to README * use validation and setdefaults * add example acl * fix formatting in readme * use released version of kafka-go * fix spelling * make invalid field more obvious * fix dryrun and skip confirm * fix grammar
1 parent 4e44ea4 commit e9241f4

32 files changed

+1546
-108
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ vendor/
1717
build/
1818

1919
.vscode
20+
21+
# Emacs backups
22+
*~

README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,15 @@ The `check` command validates that each topic config has the correct fields set
155155
consistent with the associated cluster config. Unless `--validate-only` is set, it then
156156
checks the topic config against the state of the topic in the corresponding cluster.
157157

158+
#### create
159+
```
160+
topicctl create [flags] [command]
161+
```
162+
163+
The `create` command creates resources in the cluster from a configuration file.
164+
Currently, only ACLs are supported. The create command is separate from the apply
165+
command as it is intended for usage with immutable resources managed by topicctl.
166+
158167
#### get
159168

160169
```
@@ -419,6 +428,47 @@ This subcommand will not rebalance a topic if:
419428
1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config
420429
1. a topic does not exist in the kafka cluster
421430

431+
### ACLs
432+
433+
Sets of ACLs can be configured in a YAML file. The following is an
434+
annotated example:
435+
436+
```yaml
437+
meta:
438+
name: acls-test # Name of the group of ACLs
439+
cluster: my-cluster # Name of the cluster
440+
environment: stage # Environment of the cluster
441+
region: us-west-2 # Region of the cluster
442+
description: | # Free-text description of the topic (optional)
443+
Test topic in my-cluster.
444+
labels: # Custom key-value pairs purposed for ACL bookkeeping (optional)
445+
key1: value1
446+
key2: value2
447+
448+
spec:
449+
acls:
450+
- resource:
451+
type: topic # Type of resource (topic, group, cluster, etc.)
452+
name: test-topic # Name of the resource to apply an ACL to
453+
patternType: literal # Type of pattern (literal, prefixed, etc.)
454+
principal: User:my-user # Principal to apply the ACL to
455+
host: * # Host to apply the ACL to
456+
permission: allow # Permission to apply (allow, deny)
457+
operations: # List of operations to use for the ACLs
458+
- read
459+
- describe
460+
```
461+
462+
The `cluster`, `environment`, and `region` fields are used for matching
463+
against a cluster config and double-checking that the cluster we're applying
464+
in is correct; they don't appear in any API calls.
465+
466+
See the [Kafka documentation](https://kafka.apache.org/documentation/#security_authz_primitives)
467+
for more details on the parameters that can be set in the `acls` field.
468+
469+
Multiple groups of ACLs can be included in the same file, separated by `---` lines, provided
470+
that they reference the same cluster.
471+
422472
## Tool safety
423473

424474
The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make
@@ -441,6 +491,9 @@ The `apply` subcommand can make changes, but under the following conditions:
441491

442492
The `reset-offsets` command can also make changes in the cluster and should be used carefully.
443493

494+
The `create` command can be used to create new resources in the cluster. It cannot be used with
495+
mutable resources.
496+
444497
### Idempotency
445498

446499
Apply runs are designed to be idemponent- the effects should be the same no matter how many

cmd/topicctl/subcmd/create.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package subcmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"path/filepath"
9+
"syscall"
10+
11+
"github.com/segmentio/topicctl/pkg/admin"
12+
"github.com/segmentio/topicctl/pkg/cli"
13+
"github.com/segmentio/topicctl/pkg/config"
14+
"github.com/segmentio/topicctl/pkg/create"
15+
log "github.com/sirupsen/logrus"
16+
"github.com/spf13/cobra"
17+
)
18+
19+
var createCmd = &cobra.Command{
20+
Use: "create [resource type]",
21+
Short: "creates one or more resources",
22+
PersistentPreRunE: createPreRun,
23+
}
24+
25+
type createCmdConfig struct {
26+
dryRun bool
27+
pathPrefix string
28+
skipConfirm bool
29+
30+
shared sharedOptions
31+
}
32+
33+
var createConfig createCmdConfig
34+
35+
func init() {
36+
createCmd.PersistentFlags().BoolVar(
37+
&createConfig.dryRun,
38+
"dry-run",
39+
false,
40+
"Do a dry-run",
41+
)
42+
createCmd.PersistentFlags().StringVar(
43+
&createConfig.pathPrefix,
44+
"path-prefix",
45+
os.Getenv("TOPICCTL_ACL_PATH_PREFIX"),
46+
"Prefix for ACL config paths",
47+
)
48+
createCmd.PersistentFlags().BoolVar(
49+
&createConfig.skipConfirm,
50+
"skip-confirm",
51+
false,
52+
"Skip confirmation prompts during creation process",
53+
)
54+
55+
addSharedFlags(createCmd, &createConfig.shared)
56+
createCmd.AddCommand(
57+
createACLsCmd(),
58+
)
59+
RootCmd.AddCommand(createCmd)
60+
}
61+
62+
func createPreRun(cmd *cobra.Command, args []string) error {
63+
if err := RootCmd.PersistentPreRunE(cmd, args); err != nil {
64+
return err
65+
}
66+
return createConfig.shared.validate()
67+
}
68+
69+
func createACLsCmd() *cobra.Command {
70+
cmd := &cobra.Command{
71+
Use: "acls [acl configs]",
72+
Short: "creates ACLs from configuration files",
73+
Args: cobra.MinimumNArgs(1),
74+
RunE: createACLRun,
75+
PreRunE: createPreRun,
76+
}
77+
78+
return cmd
79+
}
80+
81+
func createACLRun(cmd *cobra.Command, args []string) error {
82+
ctx, cancel := context.WithCancel(context.Background())
83+
defer cancel()
84+
85+
sigChan := make(chan os.Signal, 1)
86+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
87+
go func() {
88+
<-sigChan
89+
cancel()
90+
}()
91+
92+
// Keep a cache of the admin clients with the cluster config path as the key
93+
adminClients := map[string]admin.Client{}
94+
95+
defer func() {
96+
for _, adminClient := range adminClients {
97+
adminClient.Close()
98+
}
99+
}()
100+
101+
matchCount := 0
102+
103+
for _, arg := range args {
104+
if createConfig.pathPrefix != "" && !filepath.IsAbs(arg) {
105+
arg = filepath.Join(createConfig.pathPrefix, arg)
106+
}
107+
108+
matches, err := filepath.Glob(arg)
109+
if err != nil {
110+
return err
111+
}
112+
113+
for _, match := range matches {
114+
matchCount++
115+
if err := createACL(ctx, match, adminClients); err != nil {
116+
return err
117+
}
118+
}
119+
}
120+
121+
if matchCount == 0 {
122+
return fmt.Errorf("No ACL configs match the provided args (%+v)", args)
123+
}
124+
125+
return nil
126+
}
127+
128+
func createACL(
129+
ctx context.Context,
130+
aclConfigPath string,
131+
adminClients map[string]admin.Client,
132+
) error {
133+
clusterConfigPath, err := clusterConfigForACLCreate(aclConfigPath)
134+
if err != nil {
135+
return err
136+
}
137+
138+
aclConfigs, err := config.LoadACLsFile(aclConfigPath)
139+
if err != nil {
140+
return err
141+
}
142+
143+
clusterConfig, err := config.LoadClusterFile(clusterConfigPath, createConfig.shared.expandEnv)
144+
if err != nil {
145+
return err
146+
}
147+
148+
adminClient, ok := adminClients[clusterConfigPath]
149+
if !ok {
150+
adminClient, err = clusterConfig.NewAdminClient(
151+
ctx,
152+
nil,
153+
createConfig.dryRun,
154+
createConfig.shared.saslUsername,
155+
createConfig.shared.saslPassword,
156+
)
157+
if err != nil {
158+
return err
159+
}
160+
adminClients[clusterConfigPath] = adminClient
161+
}
162+
163+
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)
164+
165+
for _, aclConfig := range aclConfigs {
166+
aclConfig.SetDefaults()
167+
log.Infof(
168+
"Processing ACL %s in config %s with cluster config %s",
169+
aclConfig.Meta.Name,
170+
aclConfigPath,
171+
clusterConfigPath,
172+
)
173+
174+
creatorConfig := create.ACLCreatorConfig{
175+
DryRun: createConfig.dryRun,
176+
SkipConfirm: createConfig.skipConfirm,
177+
ACLConfig: aclConfig,
178+
ClusterConfig: clusterConfig,
179+
}
180+
181+
if err := cliRunner.CreateACL(ctx, creatorConfig); err != nil {
182+
return err
183+
}
184+
}
185+
186+
return nil
187+
}
188+
189+
func clusterConfigForACLCreate(aclConfigPath string) (string, error) {
190+
if createConfig.shared.clusterConfig != "" {
191+
return createConfig.shared.clusterConfig, nil
192+
}
193+
194+
return filepath.Abs(
195+
filepath.Join(
196+
filepath.Dir(aclConfigPath),
197+
"..",
198+
"cluster.yaml",
199+
),
200+
)
201+
}

cmd/topicctl/subcmd/rebalance.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package subcmd
33
import (
44
"context"
55
"fmt"
6-
"github.com/spf13/cobra"
76
"os"
87
"os/signal"
98
"path/filepath"
109
"strconv"
1110
"syscall"
1211
"time"
1312

13+
"github.com/spf13/cobra"
14+
1415
"github.com/segmentio/topicctl/pkg/admin"
1516
"github.com/segmentio/topicctl/pkg/apply"
1617
"github.com/segmentio/topicctl/pkg/cli"
@@ -159,7 +160,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
159160

160161
for _, topicConfig := range topicConfigs {
161162
// topic config should be consistent with the cluster config
162-
if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil {
163+
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
163164
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
164165
continue
165166
}

cmd/topicctl/subcmd/reset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66
"fmt"
77
"strconv"
88

9-
"github.com/segmentio/topicctl/pkg/apply"
109
"github.com/segmentio/topicctl/pkg/cli"
1110
"github.com/segmentio/topicctl/pkg/groups"
11+
"github.com/segmentio/topicctl/pkg/util"
1212
log "github.com/sirupsen/logrus"
1313
"github.com/spf13/cobra"
1414
)
@@ -167,7 +167,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error {
167167
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",
168168
)
169169

170-
ok, _ := apply.Confirm("OK to continue?", false)
170+
ok, _ := util.Confirm("OK to continue?", false)
171171
if !ok {
172172
return errors.New("Stopping because of user response")
173173
}

cmd/topicctl/subcmd/tester.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/segmentio/kafka-go"
13-
"github.com/segmentio/topicctl/pkg/apply"
13+
"github.com/segmentio/topicctl/pkg/util"
1414
log "github.com/sirupsen/logrus"
1515
"github.com/spf13/cobra"
1616
)
@@ -104,7 +104,7 @@ func runTestReader(ctx context.Context) error {
104104
testerConfig.readConsumer,
105105
)
106106

107-
ok, _ := apply.Confirm("OK to continue?", false)
107+
ok, _ := util.Confirm("OK to continue?", false)
108108
if !ok {
109109
return errors.New("Stopping because of user response")
110110
}
@@ -153,7 +153,7 @@ func runTestWriter(ctx context.Context) error {
153153
testerConfig.writeRate,
154154
)
155155

156-
ok, _ := apply.Confirm("OK to continue?", false)
156+
ok, _ := util.Confirm("OK to continue?", false)
157157
if !ok {
158158
return errors.New("Stopping because of user response")
159159
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
meta:
2+
name: acl-default
3+
cluster: local-cluster-auth
4+
environment: local-env
5+
region: local-region
6+
description: |
7+
This is a default ACL for the local cluster.
8+
It grants read and describe access to the topic `my-topic` and read access to the group `my-group`
9+
to the user `default`.
10+
11+
spec:
12+
acls:
13+
- resource:
14+
type: topic
15+
name: my-topic
16+
patternType: literal
17+
principal: 'User:default'
18+
host: '*'
19+
permission: allow
20+
operations:
21+
- Read
22+
- Describe
23+
- resource:
24+
type: group
25+
name: my-group
26+
patternType: prefixed
27+
principal: 'User:default'
28+
host: '*'
29+
permission: allow
30+
operations:
31+
- Read

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/hashicorp/go-multierror v1.1.1
1212
github.com/olekukonko/tablewriter v0.0.5
1313
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
14-
github.com/segmentio/kafka-go v0.4.44
14+
github.com/segmentio/kafka-go v0.4.45
1515
github.com/segmentio/kafka-go/sasl/aws_msk_iam v0.0.0-20220211180808-78889264d070
1616
github.com/sirupsen/logrus v1.9.0
1717
github.com/spf13/cobra v1.5.0

0 commit comments

Comments
 (0)