diff --git a/cmds/ocm/app/app.go b/cmds/ocm/app/app.go index 43513dd939..c32383889e 100644 --- a/cmds/ocm/app/app.go +++ b/cmds/ocm/app/app.go @@ -24,6 +24,7 @@ import ( "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/componentarchive" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/components" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/plugins" + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/references" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/resources" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/routingslips" @@ -44,6 +45,7 @@ import ( "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/hash" "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/install" "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/list" + "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/set" "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/show" "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/sign" "github.com/open-component-model/ocm/cmds/ocm/commands/verbs/transfer" @@ -57,6 +59,7 @@ import ( topicocmaccessmethods "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/accessmethods" topicocmdownloaders "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/downloadhandlers" topicocmlabels "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/labels" + topicocmpubsub "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/pubsub" topicocmrefs "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/refs" topicocmuploaders "github.com/open-component-model/ocm/cmds/ocm/topics/ocm/uploadhandlers" topicbootstrap "github.com/open-component-model/ocm/cmds/ocm/topics/toi/bootstrapping" @@ -222,6 +225,7 @@ func newCliCommand(opts *CLIOptions, mod ...func(clictx.Context, *cobra.Command) cmd.AddCommand(check.NewCommand(opts.Context)) cmd.AddCommand(get.NewCommand(opts.Context)) + cmd.AddCommand(set.NewCommand(opts.Context)) cmd.AddCommand(list.NewCommand(opts.Context)) cmd.AddCommand(create.NewCommand(opts.Context)) cmd.AddCommand(add.NewCommand(opts.Context)) @@ -246,6 +250,7 @@ func newCliCommand(opts *CLIOptions, mod ...func(clictx.Context, *cobra.Command) cmd.AddCommand(cmdutils.HideCommand(plugins.NewCommand(opts.Context))) cmd.AddCommand(cmdutils.HideCommand(action.NewCommand(opts.Context))) cmd.AddCommand(cmdutils.HideCommand(routingslips.NewCommand(opts.Context))) + cmd.AddCommand(cmdutils.HideCommand(pubsub.NewCommand(opts.Context))) cmd.AddCommand(cmdutils.OverviewCommand(cachecmds.NewCommand(opts.Context))) cmd.AddCommand(cmdutils.OverviewCommand(ocicmds.NewCommand(opts.Context))) @@ -268,6 +273,7 @@ func newCliCommand(opts *CLIOptions, mod ...func(clictx.Context, *cobra.Command) cmd.AddCommand(topicocmuploaders.New(ctx)) cmd.AddCommand(topicocmdownloaders.New(ctx)) cmd.AddCommand(topicocmlabels.New(ctx)) + cmd.AddCommand(topicocmpubsub.New(ctx)) cmd.AddCommand(attributes.New(ctx)) cmd.AddCommand(topicbootstrap.New(ctx, "toi-bootstrapping")) @@ -280,6 +286,7 @@ func newCliCommand(opts *CLIOptions, mod ...func(clictx.Context, *cobra.Command) help.AddCommand(topicocmuploaders.New(ctx)) help.AddCommand(topicocmdownloaders.New(ctx)) help.AddCommand(topicocmlabels.New(ctx)) + help.AddCommand(topicocmpubsub.New(ctx)) help.AddCommand(attributes.New(ctx)) help.AddCommand(topicbootstrap.New(ctx, "toi-bootstrapping")) diff --git a/cmds/ocm/commands/ocmcmds/cmd.go b/cmds/ocm/commands/ocmcmds/cmd.go index 25d5145b40..813c2184e0 100644 --- a/cmds/ocm/commands/ocmcmds/cmd.go +++ b/cmds/ocm/commands/ocmcmds/cmd.go @@ -7,6 +7,7 @@ import ( "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/components" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/ctf" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/plugins" + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/references" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/resourceconfig" "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/resources" @@ -38,6 +39,7 @@ func NewCommand(ctx clictx.Context) *cobra.Command { cmd.AddCommand(versions.NewCommand(ctx)) cmd.AddCommand(plugins.NewCommand(ctx)) cmd.AddCommand(routingslips.NewCommand(ctx)) + cmd.AddCommand(pubsub.NewCommand(ctx)) cmd.AddCommand(topicocmrefs.New(ctx)) cmd.AddCommand(topicocmaccessmethods.New(ctx)) diff --git a/cmds/ocm/commands/ocmcmds/names/names.go b/cmds/ocm/commands/ocmcmds/names/names.go index bf3b44b534..7080e38dcf 100644 --- a/cmds/ocm/commands/ocmcmds/names/names.go +++ b/cmds/ocm/commands/ocmcmds/names/names.go @@ -15,4 +15,5 @@ var ( Plugins = []string{"plugins", "plugin", "p"} Action = []string{"action"} RoutingSlips = []string{"routingslips", "routingslip", "rs"} + PubSub = []string{"pubsub", "ps"} ) diff --git a/cmds/ocm/commands/ocmcmds/pubsub/cmd.go b/cmds/ocm/commands/ocmcmds/pubsub/cmd.go new file mode 100644 index 0000000000..478e40da39 --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/cmd.go @@ -0,0 +1,23 @@ +package pubsub + +import ( + "github.com/spf13/cobra" + + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/names" + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub/get" + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub/set" + "github.com/open-component-model/ocm/cmds/ocm/pkg/utils" + "github.com/open-component-model/ocm/pkg/contexts/clictx" +) + +var Names = names.PubSub + +// NewCommand creates a new pubsub command. +func NewCommand(ctx clictx.Context) *cobra.Command { + cmd := utils.MassageCommand(&cobra.Command{ + Short: "Commands acting on sub/sub specifications", + }, Names...) + cmd.AddCommand(get.NewCommand(ctx, get.Verb)) + cmd.AddCommand(set.NewCommand(ctx, set.Verb)) + return cmd +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/get/cmd.go b/cmds/ocm/commands/ocmcmds/pubsub/get/cmd.go new file mode 100644 index 0000000000..4f6d2e17b7 --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/get/cmd.go @@ -0,0 +1,132 @@ +package get + +import ( + "strings" + + "github.com/mandelsoft/goutils/sliceutils" + "github.com/spf13/cobra" + + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/names" + "github.com/open-component-model/ocm/cmds/ocm/commands/verbs" + "github.com/open-component-model/ocm/cmds/ocm/pkg/output" + "github.com/open-component-model/ocm/cmds/ocm/pkg/processing" + "github.com/open-component-model/ocm/cmds/ocm/pkg/utils" + "github.com/open-component-model/ocm/pkg/contexts/clictx" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" +) + +var ( + Names = names.PubSub + Verb = verbs.Get +) + +type Command struct { + utils.BaseCommand + + RepoSpecs []string +} + +var _ utils.OCMCommand = (*Command)(nil) + +// NewCommand creates a new pubsub command. +func NewCommand(ctx clictx.Context, names ...string) *cobra.Command { + return utils.SetupCommand(&Command{BaseCommand: utils.NewBaseCommand(ctx, output.OutputOptions(outputs))}, utils.Names(Names, names...)...) +} + +func (o *Command) ForName(name string) *cobra.Command { + return &cobra.Command{ + Use: "{}", + Short: "Get the pubsub spec for an ocm repository", + Long: ` +A repository may be able to store a publish/subscribe specification +to propagate the creation or update of component version. +If such an implementation is available and a specification is +assigned to the repository, it is shown. The specification +can be set with the ocm set pubsub. +`, + } +} + +func (o *Command) Complete(args []string) error { + o.RepoSpecs = args + return nil +} + +func (o *Command) Run() error { + return utils.HandleOutputsFor("repository spec", output.From(o), o.transform, o.RepoSpecs...) +} + +func (o *Command) transform(in string) *Repo { + var spec cpi.RepositorySpec + rs := &Repo{RepoSpec: in} + u, err := ocm.ParseRepo(in) + if err == nil { + spec, err = o.OCMContext().MapUniformRepositorySpec(&u) + } + if err == nil { + rs.Repo, err = o.OCMContext().RepositoryForSpec(spec) + } + if err == nil { + rs.Spec, err = pubsub.SpecForRepo(rs.Repo) + } + if err != nil { + rs.Error = err.Error() + } + return rs +} + +type Repo struct { + RepoSpec string `json:"repository"` + Repo cpi.Repository `json:"-"` + Spec pubsub.PubSubSpec `json:"pubsub,omitempty"` + Error string `json:"error,omitempty"` +} + +var _ output.Manifest = (*Repo)(nil) + +func (r *Repo) AsManifest() interface{} { + return r +} + +var outputs = output.NewOutputs(getRegular).AddManifestOutputs() + +func TableOutput(opts *output.Options, mapping processing.MappingFunction) *output.TableOutput { + return &output.TableOutput{ + Headers: output.Fields("REPOSITORY", "PUBSUBTYPE", "ERROR"), + Options: opts, + Mapping: mapping, + } +} + +func getRegular(opts *output.Options) output.Output { + return TableOutput(opts, mapGetRegularOutput).New() +} + +func mapGetRegularOutput(e interface{}) interface{} { + r := e.(*Repo) + if r.Error != "" { + return output.Fields(r.RepoSpec, "", r.Error) + } + if r.Spec == nil { + return output.Fields(r.RepoSpec, "-", "") + } + list := sliceutils.Slice[string]{} + Add(r.Repo.GetContext(), r.Spec, &list) + strings.Join(list, ", ") + + return output.Fields(r.RepoSpec, strings.Join(list, ", "), "") +} + +func Add(ctx cpi.Context, s pubsub.PubSubSpec, slice *sliceutils.Slice[string]) { + if s == nil { + return + } + slice.Add(s.GetKind()) + if u, ok := s.(pubsub.Unwrapable); ok { + for _, n := range u.Unwrap(ctx) { + Add(ctx, n, slice) + } + } +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/get/cmd_test.go b/cmds/ocm/commands/ocmcmds/pubsub/get/cmd_test.go new file mode 100644 index 0000000000..f9f6dea72f --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/get/cmd_test.go @@ -0,0 +1,106 @@ +package get_test + +import ( + "bytes" + "fmt" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "github.com/open-component-model/ocm/cmds/ocm/testhelper" + + "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers/ocireg" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ctf" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const ARCH = "ctf" +const ARCH2 = "ctf2" + +var _ = Describe("Test Environment", func() { + var env *TestEnv + + BeforeEach(func() { + env = NewTestEnv() + env.OCMCommonTransport(ARCH, accessio.FormatDirectory) + env.OCMCommonTransport(ARCH2, accessio.FormatDirectory) + attr := pubsub.For(env) + attr.ProviderRegistry.Register(ctf.Type, &ocireg.Provider{}) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](Type)) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](TypeV1)) + + repo := Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + defer repo.Close() + MustBeSuccessful(pubsub.SetForRepo(repo, NewSpec("testtarget"))) + }) + + AfterEach(func() { + env.Cleanup() + }) + + It("get pubsub", func() { + var buf bytes.Buffer + + MustBeSuccessful(env.CatchOutput(&buf).Execute("get", "pubsub", ARCH)) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +REPOSITORY PUBSUBTYPE ERROR +ctf test +`)) + }) + + It("get pubsub list", func() { + var buf bytes.Buffer + + MustBeSuccessful(env.CatchOutput(&buf).Execute("get", "pubsub", ARCH, ARCH2, "ARCH2")) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +REPOSITORY PUBSUBTYPE ERROR +ctf test +ctf2 - +ARCH2 repository "ARCH2" is unknown +`)) + }) + + It("get pubsub yaml", func() { + var buf bytes.Buffer + + MustBeSuccessful(env.CatchOutput(&buf).Execute("get", "pubsub", ARCH, "-o", "yaml")) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +--- +pubsub: + target: testtarget + type: test +repository: ctf +`)) + }) +}) + +//////////////////////////////////////////////////////////////////////////////// + +const ( + Type = "test" + TypeV1 = Type + runtime.VersionSeparator + "v1" +) + +type Spec struct { + runtime.ObjectVersionedType + Target string `json:"target"` +} + +var ( + _ pubsub.PubSubSpec = (*Spec)(nil) +) + +func NewSpec(target string) *Spec { + return &Spec{runtime.NewVersionedObjectType(Type), target} +} + +func (s *Spec) PubSubMethod(repo ocm.Repository) (pubsub.PubSubMethod, error) { + return nil, nil +} + +func (s *Spec) Describe(_ ocm.Context) string { + return fmt.Sprintf("test pubsub") +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/get/suite_test.go b/cmds/ocm/commands/ocmcmds/pubsub/get/suite_test.go new file mode 100644 index 0000000000..e4083958ea --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/get/suite_test.go @@ -0,0 +1,13 @@ +package get_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "get pubsub Test Suite") +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/set/cmd.go b/cmds/ocm/commands/ocmcmds/pubsub/set/cmd.go new file mode 100644 index 0000000000..fbc815a15e --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/set/cmd.go @@ -0,0 +1,123 @@ +package set + +import ( + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/names" + "github.com/open-component-model/ocm/cmds/ocm/commands/verbs" + "github.com/open-component-model/ocm/cmds/ocm/pkg/utils" + "github.com/open-component-model/ocm/pkg/contexts/clictx" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/out" + utils2 "github.com/open-component-model/ocm/pkg/utils" +) + +var ( + Names = names.PubSub + Verb = verbs.Set +) + +type Command struct { + utils.BaseCommand + + Delete bool + + RepoSpec string + Spec []byte +} + +var _ utils.OCMCommand = (*Command)(nil) + +// NewCommand creates a new pubsub command. +func NewCommand(ctx clictx.Context, names ...string) *cobra.Command { + return utils.SetupCommand(&Command{BaseCommand: utils.NewBaseCommand(ctx)}, utils.Names(Names, names...)...) +} + +func (o *Command) ForName(name string) *cobra.Command { + return &cobra.Command{ + Use: "{} []", + Short: "Set the pubsub spec for an ocm repository", + Long: ` +A repository may be able to store a publish/subscribe specification +to propagate the creation or update of component version. +If such an implementation is available this command can be used +to set the pub/sub specification for a repository. +If no specification is given an existing specification +will be removed for the given repository. +The specification +can be queried with the ocm get pubsub. +Types and specification formats are shown for the topic +ocm ocm-pubsub. +`, + Args: cobra.RangeArgs(1, 2), + } +} + +func (o *Command) AddFlags(set *pflag.FlagSet) { + set.BoolVarP(&o.Delete, "delete", "d", false, "delete pub/sub configuration") +} + +func (o *Command) Complete(args []string) error { + var err error + + o.RepoSpec = args[0] + if len(args) > 1 { + if o.Delete { + return fmt.Errorf("delete does not require a specification argument") + } + o.Spec, err = utils2.ResolveData(args[1], o.FileSystem()) + if err != nil { + return err + } + } else { + if !o.Delete { + return fmt.Errorf("pub/sub specification argument required") + } + } + return nil +} + +func (o *Command) Run() error { + var spec cpi.RepositorySpec + var repo cpi.Repository + var ps pubsub.PubSubSpec + + u, err := ocm.ParseRepo(o.RepoSpec) + if err == nil && o.Spec != nil { + ps, err = pubsub.SpecForData(o, o.Spec) + } + if err == nil { + spec, err = o.OCMContext().MapUniformRepositorySpec(&u) + } + if err == nil { + repo, err = o.OCMContext().RepositoryForSpec(spec) + } + if err == nil { + defer repo.Close() + if o.Spec == nil { + ps, err = pubsub.SpecForRepo(repo) + if err == nil { + err = pubsub.SetForRepo(repo, nil) + } + } else { + err = pubsub.SetForRepo(repo, ps) + } + } + if err == nil { + if o.Spec == nil { + if ps == nil { + out.Outf(o, "no pubsub spec configured for repository %q\n", o.RepoSpec) + } else { + out.Outf(o, "removed pubsub spec %q for repository %q\n", ps.GetKind(), o.RepoSpec) + } + } else { + out.Outf(o, "set pubsub spec %q for repository %q\n", ps.GetKind(), o.RepoSpec) + } + } + return err +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/set/cmd_test.go b/cmds/ocm/commands/ocmcmds/pubsub/set/cmd_test.go new file mode 100644 index 0000000000..099cb4bc87 --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/set/cmd_test.go @@ -0,0 +1,110 @@ +package set_test + +import ( + "bytes" + "encoding/json" + "fmt" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "github.com/open-component-model/ocm/cmds/ocm/testhelper" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers/ocireg" + + "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ctf" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const ARCH = "ctf" + +var _ = Describe("Test Environment", func() { + var env *TestEnv + + BeforeEach(func() { + env = NewTestEnv() + env.OCMCommonTransport(ARCH, accessio.FormatDirectory) + + attr := pubsub.For(env) + attr.ProviderRegistry.Register(ctf.Type, &ocireg.Provider{}) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](Type)) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](TypeV1)) + }) + + AfterEach(func() { + env.Cleanup() + }) + + It("sets pubsub", func() { + var buf bytes.Buffer + + spec := Must(json.Marshal(NewSpec("testtarget"))) + + MustBeSuccessful(env.CatchOutput(&buf).Execute("set", "pubsub", ARCH, string(spec))) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +set pubsub spec "test" for repository "ctf" +`)) + + repo := Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + defer Close(repo) + raw := Must(pubsub.SpecForRepo(repo)) + Expect(raw).To(YAMLEqual(spec)) + }) + + It("removes pubsub for non-existing", func() { + var buf bytes.Buffer + + MustBeSuccessful(env.CatchOutput(&buf).Execute("set", "pubsub", "-d", ARCH)) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +no pubsub spec configured for repository "ctf" +`)) + }) + + It("removes pubsub", func() { + var buf bytes.Buffer + + repo := Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + err := pubsub.SetForRepo(repo, NewSpec("testtarget")) + MustBeSuccessful(repo.Close()) + MustBeSuccessful(err) + + MustBeSuccessful(env.CatchOutput(&buf).Execute("set", "pubsub", "-d", ARCH)) + Expect(buf.String()).To(StringEqualTrimmedWithContext(` +removed pubsub spec "test" for repository "ctf" +`)) + + repo = Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + defer Close(repo) + Expect(Must(pubsub.SpecForRepo(repo))).To(BeNil()) + }) +}) + +//////////////////////////////////////////////////////////////////////////////// + +const ( + Type = "test" + TypeV1 = Type + runtime.VersionSeparator + "v1" +) + +type Spec struct { + runtime.ObjectVersionedType + Target string `json:"target"` +} + +var ( + _ pubsub.PubSubSpec = (*Spec)(nil) +) + +func NewSpec(target string) *Spec { + return &Spec{runtime.NewVersionedObjectType(Type), target} +} + +func (s *Spec) PubSubMethod(repo ocm.Repository) (pubsub.PubSubMethod, error) { + return nil, nil +} + +func (s *Spec) Describe(_ ocm.Context) string { + return fmt.Sprintf("test pubsub") +} diff --git a/cmds/ocm/commands/ocmcmds/pubsub/set/suite_test.go b/cmds/ocm/commands/ocmcmds/pubsub/set/suite_test.go new file mode 100644 index 0000000000..742bcc22cc --- /dev/null +++ b/cmds/ocm/commands/ocmcmds/pubsub/set/suite_test.go @@ -0,0 +1,13 @@ +package set_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "set pubsub Test Suite") +} diff --git a/cmds/ocm/commands/verbs/get/cmd.go b/cmds/ocm/commands/verbs/get/cmd.go index ec96f4eb7e..085dcf266d 100644 --- a/cmds/ocm/commands/verbs/get/cmd.go +++ b/cmds/ocm/commands/verbs/get/cmd.go @@ -8,6 +8,7 @@ import ( artifacts "github.com/open-component-model/ocm/cmds/ocm/commands/ocicmds/artifacts/get" components "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/components/get" plugins "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/plugins/get" + pubsub "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub/get" references "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/references/get" resources "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/resources/get" routingslips "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/routingslips/get" @@ -31,5 +32,6 @@ func NewCommand(ctx clictx.Context) *cobra.Command { cmd.AddCommand(plugins.NewCommand(ctx)) cmd.AddCommand(routingslips.NewCommand(ctx)) cmd.AddCommand(config.NewCommand(ctx)) + cmd.AddCommand(pubsub.NewCommand(ctx)) return cmd } diff --git a/cmds/ocm/commands/verbs/set/cmd.go b/cmds/ocm/commands/verbs/set/cmd.go new file mode 100644 index 0000000000..9349e3571c --- /dev/null +++ b/cmds/ocm/commands/verbs/set/cmd.go @@ -0,0 +1,19 @@ +package set + +import ( + "github.com/spf13/cobra" + + pubsub "github.com/open-component-model/ocm/cmds/ocm/commands/ocmcmds/pubsub/set" + "github.com/open-component-model/ocm/cmds/ocm/commands/verbs" + "github.com/open-component-model/ocm/cmds/ocm/pkg/utils" + "github.com/open-component-model/ocm/pkg/contexts/clictx" +) + +// NewCommand creates a new set command. +func NewCommand(ctx clictx.Context) *cobra.Command { + cmd := utils.MassageCommand(&cobra.Command{ + Short: "Set information about OCM repositories", + }, verbs.Set) + cmd.AddCommand(pubsub.NewCommand(ctx)) + return cmd +} diff --git a/cmds/ocm/commands/verbs/verbs.go b/cmds/ocm/commands/verbs/verbs.go index 578e73d313..8d77b71b20 100644 --- a/cmds/ocm/commands/verbs/verbs.go +++ b/cmds/ocm/commands/verbs/verbs.go @@ -2,6 +2,7 @@ package verbs const ( Get = "get" + Set = "set" List = "list" Check = "check" Describe = "describe" diff --git a/cmds/ocm/pkg/utils/handling.go b/cmds/ocm/pkg/utils/handling.go index ba2d2193a1..90b554d3b9 100644 --- a/cmds/ocm/pkg/utils/handling.go +++ b/cmds/ocm/pkg/utils/handling.go @@ -6,6 +6,7 @@ import ( "reflect" "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/transformer" "github.com/open-component-model/ocm/cmds/ocm/pkg/output" ) @@ -98,3 +99,19 @@ func HandleOutput(output output.Output, handler TypeHandler, specs ...ElemSpec) } return output.Out() } + +func HandleOutputsFor[I, O any](name string, opts *output.Options, t transformer.Transformer[I, O], specs ...I) error { + if len(specs) == 0 { + return fmt.Errorf("%s required", name) + } + output := opts.Output + + for _, s := range specs { + output.Add(t(s)) + } + err := output.Close() + if err != nil { + return err + } + return output.Out() +} diff --git a/cmds/ocm/topics/ocm/pubsub/topic.go b/cmds/ocm/topics/ocm/pubsub/topic.go new file mode 100644 index 0000000000..cd7fc6db3b --- /dev/null +++ b/cmds/ocm/topics/ocm/pubsub/topic.go @@ -0,0 +1,26 @@ +package topicocmpubsub + +import ( + "github.com/spf13/cobra" + + "github.com/open-component-model/ocm/pkg/contexts/clictx" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" +) + +func New(ctx clictx.Context) *cobra.Command { + attr := pubsub.For(ctx) + return &cobra.Command{ + Use: "ocm-pubsub", + Short: "List of all supported publish/subscribe implementations", + + Long: ` +An OCM repository can be configured to propagate change events via a +publish/subscribe system, if there is a persistence provider for the dedicated +repository type. If available any known publish/subscribe system can +be configured with ocm set pubsub and shown with +ocm get pubsub. Hereby, the pub/sub system +is described by a typed specification. + +` + pubsub.PubSubUsage(attr.TypeScheme, attr.ProviderRegistry, true), + } +} diff --git a/docs/reference/ocm.md b/docs/reference/ocm.md index 70560cc8ce..bf1e08c570 100644 --- a/docs/reference/ocm.md +++ b/docs/reference/ocm.md @@ -373,6 +373,7 @@ by a certificate delivered with the signature. * [ocm hash](ocm_hash.md) — Hash and normalization operations * [ocm install](ocm_install.md) — Install elements. * [ocm list](ocm_list.md) — List information about components +* [ocm set](ocm_set.md) — Set information about OCM repositories * [ocm show](ocm_show.md) — Show tags or versions * [ocm sign](ocm_sign.md) — Sign components or hashes * [ocm transfer](ocm_transfer.md) — Transfer artifacts or components @@ -400,6 +401,7 @@ by a certificate delivered with the signature. * [ocm ocm-accessmethods](ocm_ocm-accessmethods.md) — List of all supported access methods * [ocm ocm-downloadhandlers](ocm_ocm-downloadhandlers.md) — List of all available download handlers * [ocm ocm-labels](ocm_ocm-labels.md) — Labels and Label Merging +* [ocm ocm-pubsub](ocm_ocm-pubsub.md) — List of all supported publish/subscribe implementations * [ocm ocm-references](ocm_ocm-references.md) — notation for OCM references * [ocm ocm-uploadhandlers](ocm_ocm-uploadhandlers.md) — List of all available upload handlers * [ocm toi-bootstrapping](ocm_toi-bootstrapping.md) — Tiny OCM Installer based on component versions diff --git a/docs/reference/ocm_credential-handling.md b/docs/reference/ocm_credential-handling.md index 32dc2e39f8..2159e71cb4 100644 --- a/docs/reference/ocm_credential-handling.md +++ b/docs/reference/ocm_credential-handling.md @@ -111,13 +111,18 @@ The following credential consumer types are used/supported: - key: secret key use to access the credential server - - Github: GitHub credential matcher + - Github: Redis PubSub credential matcher + + This matcher is a hostpath matcher with additional attributes: + + - *channel* (required if set in pattern): the channel name + - *database* the database number - This matcher is a hostpath matcher. Credential consumers of the consumer type Github evaluate the following credential properties: - - token: GitHub personal access token + - username: Redis username + - password: Redis password - HashiCorpVault: HashiCorp Vault credential matcher diff --git a/docs/reference/ocm_get.md b/docs/reference/ocm_get.md index 5c5149a7d9..a881bcf603 100644 --- a/docs/reference/ocm_get.md +++ b/docs/reference/ocm_get.md @@ -26,6 +26,7 @@ ocm get [] ... * [ocm get config](ocm_get_config.md) — Get evaluated config for actual command call * [ocm get credentials](ocm_get_credentials.md) — Get credentials for a dedicated consumer spec * [ocm get plugins](ocm_get_plugins.md) — get plugins +* [ocm get pubsub](ocm_get_pubsub.md) — Get the pubsub spec for an ocm repository * [ocm get references](ocm_get_references.md) — get references of a component version * [ocm get resources](ocm_get_resources.md) — get resources of a component version * [ocm get routingslips](ocm_get_routingslips.md) — get routings slips for a component version diff --git a/docs/reference/ocm_get_credentials.md b/docs/reference/ocm_get_credentials.md index 1a7205d394..d187386cd2 100644 --- a/docs/reference/ocm_get_credentials.md +++ b/docs/reference/ocm_get_credentials.md @@ -37,13 +37,18 @@ Matchers exist for the following usage contexts or consumer types: - key: secret key use to access the credential server - - Github: GitHub credential matcher + - Github: Redis PubSub credential matcher + + This matcher is a hostpath matcher with additional attributes: + + - *channel* (required if set in pattern): the channel name + - *database* the database number - This matcher is a hostpath matcher. Credential consumers of the consumer type Github evaluate the following credential properties: - - token: GitHub personal access token + - username: Redis username + - password: Redis password - HashiCorpVault: HashiCorp Vault credential matcher diff --git a/docs/reference/ocm_get_pubsub.md b/docs/reference/ocm_get_pubsub.md new file mode 100644 index 0000000000..8df9aee94a --- /dev/null +++ b/docs/reference/ocm_get_pubsub.md @@ -0,0 +1,53 @@ +## ocm get pubsub — Get The Pubsub Spec For An Ocm Repository + +### Synopsis + +``` +ocm get pubsub {} +``` + +##### Aliases + +``` +pubsub, ps +``` + +### Options + +``` + -h, --help help for pubsub + -o, --output string output mode (JSON, json, yaml) + -s, --sort stringArray sort fields +``` + +### Description + + +A repository may be able to store a publish/subscribe specification +to propagate the creation or update of component version. +If such an implementation is available and a specification is +assigned to the repository, it is shown. The specification +can be set with the [ocm set pubsub](ocm_set_pubsub.md). + + +With the option --output the output mode can be selected. +The following modes are supported: + - (default) + - JSON + - json + - yaml + + +### SEE ALSO + +##### Parents + +* [ocm get](ocm_get.md) — Get information about artifacts and components +* [ocm](ocm.md) — Open Component Model command line client + + + +##### Additional Links + +* [ocm set pubsub](ocm_set_pubsub.md) — Set the pubsub spec for an ocm repository + diff --git a/docs/reference/ocm_ocm-pubsub.md b/docs/reference/ocm_ocm-pubsub.md new file mode 100644 index 0000000000..b8349b4fb0 --- /dev/null +++ b/docs/reference/ocm_ocm-pubsub.md @@ -0,0 +1,66 @@ +## ocm ocm-pubsub — List Of All Supported Publish/Subscribe Implementations + +### Description + + +An OCM repository can be configured to propagate change events via a +publish/subscribe system, if there is a persistence provider for the dedicated +repository type. If available any known publish/subscribe system can +be configured with [ocm set pubsub](ocm_set_pubsub.md) and shown with +[ocm get pubsub](ocm_get_pubsub.md). Hereby, the pub/sub system +is described by a typed specification. + + +The following list describes the supported publish/subscribe system types, their +specification versions, and formats: + +- PubSub type compound + + A pub/sub system forwarding events to described sub-level systems. + + The following versions are supported: + - Version v1 + + It is described by the following field: + + - **specifications** *list of pubsub specs* + + A list of nested sub-level specifications the events should be + forwarded to. + + +- PubSub type redis + + a redis pubsub sytsem. + + The following versions are supported: + - Version v1 + + It is describe by the following field: + + - **serverAddr** *Address of redis server* + - **channel** *pubsub channel* + - **database** *database number* + + Publishing using the redis pubsub API. For every change a string message + with the format : is published. If multiple repositories + should be used, each repository should be configured with a different + channel. + +There are persistence providers for the following repository types: + - OCIRegistry + + +### SEE ALSO + +##### Parents + +* [ocm](ocm.md) — Open Component Model command line client + + + +##### Additional Links + +* [ocm set pubsub](ocm_set_pubsub.md) — Set the pubsub spec for an ocm repository +* [ocm get pubsub](ocm_get_pubsub.md) — Get the pubsub spec for an ocm repository + diff --git a/docs/reference/ocm_ocm.md b/docs/reference/ocm_ocm.md index bd25bbb805..5587501be2 100644 --- a/docs/reference/ocm_ocm.md +++ b/docs/reference/ocm_ocm.md @@ -25,6 +25,7 @@ ocm ocm [] ... * ocm ocm componentarchive — Commands acting on component archives * ocm ocm componentversions — Commands acting on components * ocm ocm plugins — Commands related to OCM plugins +* ocm ocm pubsub — Commands acting on sub/sub specifications * ocm ocm references — Commands related to component references in component versions * ocm ocm resource-configuration — Commands acting on component resource specifications * ocm ocm resources — Commands acting on component resources diff --git a/docs/reference/ocm_set.md b/docs/reference/ocm_set.md new file mode 100644 index 0000000000..c7123228bf --- /dev/null +++ b/docs/reference/ocm_set.md @@ -0,0 +1,25 @@ +## ocm set — Set Information About OCM Repositories + +### Synopsis + +``` +ocm set [] ... +``` + +### Options + +``` + -h, --help help for set +``` + +### SEE ALSO + +##### Parents + +* [ocm](ocm.md) — Open Component Model command line client + + +##### Sub Commands + +* [ocm set pubsub](ocm_set_pubsub.md) — Set the pubsub spec for an ocm repository + diff --git a/docs/reference/ocm_set_pubsub.md b/docs/reference/ocm_set_pubsub.md new file mode 100644 index 0000000000..1244c5ce64 --- /dev/null +++ b/docs/reference/ocm_set_pubsub.md @@ -0,0 +1,50 @@ +## ocm set pubsub — Set The Pubsub Spec For An Ocm Repository + +### Synopsis + +``` +ocm set pubsub {} [] +``` + +##### Aliases + +``` +pubsub, ps +``` + +### Options + +``` + -d, --delete delete pub/sub configuration + -h, --help help for pubsub +``` + +### Description + + +A repository may be able to store a publish/subscribe specification +to propagate the creation or update of component versions. +If such an implementation is available this command can be used +to set the pub/sub specification for a repository. +If no specification is given an existing specification +will be removed for the given repository. +The specification +can be queried with the [ocm get pubsub](ocm_get_pubsub.md). +Types and specification formats are shown for the topic +[ocm ocm-pubsub](ocm_ocm-pubsub.md). + + +### SEE ALSO + +##### Parents + +* [ocm set](ocm_set.md) — Set information about OCM repositories +* [ocm](ocm.md) — Open Component Model command line client + + + +##### Additional Links + +* [ocm get pubsub](ocm_get_pubsub.md) — Get the pubsub spec for an ocm repository +* [ocm ocm-pubsub](ocm_ocm-pubsub.md) — List of all supported publish/subscribe implementations + diff --git a/go.mod b/go.mod index a30f78647b..77a6eed5a6 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0 github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.5.1 github.com/sigstore/cosign/v2 v2.2.4 github.com/sigstore/rekor v1.3.6 github.com/sigstore/sigstore v1.8.4 @@ -158,6 +159,7 @@ require ( github.com/coreos/go-oidc/v3 v3.10.0 // indirect github.com/cyphar/filepath-securejoin v0.2.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/digitorus/pkcs7 v0.0.0-20230818184609-3a137a874352 // indirect github.com/digitorus/timestamp v0.0.0-20231217203849-220c5c2851b7 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect diff --git a/go.sum b/go.sum index efa5124d8b..d00fabeabc 100644 --- a/go.sum +++ b/go.sum @@ -218,6 +218,10 @@ github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2y github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bshuster-repo/logrus-logstash-hook v1.0.0 h1:e+C0SB5R1pu//O4MQ3f9cFuPGoOVeF2fE4Og9otCc70= github.com/bshuster-repo/logrus-logstash-hook v1.0.0/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bugsnag/bugsnag-go v1.0.5-0.20150529004307-13fd6b8acda0 h1:s7+5BfS4WFJoVF9pnB8kBk03S7pZXRdKamnV0FOl5Sc= github.com/bugsnag/bugsnag-go v1.0.5-0.20150529004307-13fd6b8acda0/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b h1:otBG+dV+YK+Soembjv71DPz3uX/V/6MMlSyD9JBQ6kQ= @@ -305,6 +309,8 @@ github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xb github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936 h1:foGzavPWwtoyBvjWyKJYDYsyzy+23iBV7NKTwdk+LRY= github.com/depcheck-test/depcheck-test v0.0.0-20220607135614-199033aaa936/go.mod h1:ttKPnOepYt4LLzD+loXQ1rT6EmpyIYHro7TAJuIIlHo= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitorus/pkcs7 v0.0.0-20230713084857-e76b763bdc49/go.mod h1:SKVExuS+vpu2l9IoOc0RwqE7NYnb0JlcFHFnEJkVDzc= github.com/digitorus/pkcs7 v0.0.0-20230818184609-3a137a874352 h1:ge14PCmCvPjpMQMIAH7uKg0lrtNSOdpYsRXlwk3QbaE= github.com/digitorus/pkcs7 v0.0.0-20230818184609-3a137a874352/go.mod h1:SKVExuS+vpu2l9IoOc0RwqE7NYnb0JlcFHFnEJkVDzc= @@ -849,6 +855,8 @@ github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf h1:014O62 github.com/protocolbuffers/txtpbfmt v0.0.0-20231025115547-084445ff1adf/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/pkg/contexts/credentials/identity/hostpath/identity.go b/pkg/contexts/credentials/identity/hostpath/identity.go index 652e2b9c03..7cd128d78f 100644 --- a/pkg/contexts/credentials/identity/hostpath/identity.go +++ b/pkg/contexts/credentials/identity/hostpath/identity.go @@ -41,73 +41,78 @@ This matcher works on the following properties: var Matcher = IdentityMatcher("") -func IdentityMatcher(identityType string) cpi.IdentityMatcher { - return func(request, cur, id cpi.ConsumerIdentity) bool { - if request[ID_TYPE] != "" && request[ID_TYPE] != id[ID_TYPE] { - return false - } +func Match(identityType string, request, cur, id cpi.ConsumerIdentity) (match bool, better bool) { + if request[ID_TYPE] != "" && request[ID_TYPE] != id[ID_TYPE] { + return false, false + } - if identityType != "" && request[ID_TYPE] != "" && identityType != request[ID_TYPE] { - return false - } + if identityType != "" && request[ID_TYPE] != "" && identityType != request[ID_TYPE] { + return false, false + } - if request[ID_HOSTNAME] != "" && id[ID_HOSTNAME] != "" && request[ID_HOSTNAME] != id[ID_HOSTNAME] { - return false - } + if request[ID_HOSTNAME] != "" && id[ID_HOSTNAME] != "" && request[ID_HOSTNAME] != id[ID_HOSTNAME] { + return false, false + } - if request[ID_PORT] != "" { - if id[ID_PORT] != "" && id[ID_PORT] != request[ID_PORT] { - return false - } + if request[ID_PORT] != "" { + if id[ID_PORT] != "" && id[ID_PORT] != request[ID_PORT] { + return false, false } + } - if request[ID_SCHEME] != "" { - if id[ID_SCHEME] != "" && id[ID_SCHEME] != request[ID_SCHEME] { - return false - } + if request[ID_SCHEME] != "" { + if id[ID_SCHEME] != "" && id[ID_SCHEME] != request[ID_SCHEME] { + return false, false } + } - if request[ID_PATHPREFIX] != "" { - if id[ID_PATHPREFIX] != "" { - if len(id[ID_PATHPREFIX]) > len(request[ID_PATHPREFIX]) { - return false - } - pcomps := strings.Split(request[ID_PATHPREFIX], "/") - icomps := strings.Split(id[ID_PATHPREFIX], "/") - if len(icomps) > len(pcomps) { - return false - } - for i := range icomps { - if pcomps[i] != icomps[i] { - return false - } - } + if request[ID_PATHPREFIX] != "" { + if id[ID_PATHPREFIX] != "" { + if len(id[ID_PATHPREFIX]) > len(request[ID_PATHPREFIX]) { + return false, false } - } else { - if id[ID_PATHPREFIX] != "" { - return false + pcomps := strings.Split(request[ID_PATHPREFIX], "/") + icomps := strings.Split(id[ID_PATHPREFIX], "/") + if len(icomps) > len(pcomps) { + return false, false + } + for i := range icomps { + if pcomps[i] != icomps[i] { + return false, false + } } } - - // ok now it basically matches, check against current match - if len(cur) == 0 { - return true + } else { + if id[ID_PATHPREFIX] != "" { + return false, false } + } - if cur[ID_HOSTNAME] == "" && id[ID_HOSTNAME] != "" { - return true - } - if cur[ID_PORT] == "" && (id[ID_PORT] != "" && request[ID_PORT] != "") { - return true - } - if cur[ID_SCHEME] == "" && (id[ID_SCHEME] != "" && request[ID_SCHEME] != "") { - return true - } + // ok now it basically matches, check against current match + if len(cur) == 0 { + return true, true + } - if len(cur[ID_PATHPREFIX]) < len(id[ID_PATHPREFIX]) { - return true - } - return false + if cur[ID_HOSTNAME] == "" && id[ID_HOSTNAME] != "" { + return true, true + } + if cur[ID_PORT] == "" && (id[ID_PORT] != "" && request[ID_PORT] != "") { + return true, true + } + if cur[ID_SCHEME] == "" && (id[ID_SCHEME] != "" && request[ID_SCHEME] != "") { + return true, true + } + + if len(cur[ID_PATHPREFIX]) < len(id[ID_PATHPREFIX]) { + return true, true + } + return true, false +} + +func IdentityMatcher(identityType string) cpi.IdentityMatcher { + return func(request, cur, id cpi.ConsumerIdentity) bool { + _, better := Match(identityType, request, cur, id) + return better } } diff --git a/pkg/contexts/ocm/cpi/interface.go b/pkg/contexts/ocm/cpi/interface.go index 8b605836ea..efec373b53 100644 --- a/pkg/contexts/ocm/cpi/interface.go +++ b/pkg/contexts/ocm/cpi/interface.go @@ -185,6 +185,7 @@ func NewRawAccessSpecRef(data []byte, unmarshaler runtime.Unmarshaler) (*AccessS } const ( + KIND_REPOSITORY = internal.KIND_REPOSITORY KIND_COMPONENTVERSION = internal.KIND_COMPONENTVERSION KIND_RESOURCE = internal.KIND_RESOURCE KIND_SOURCE = internal.KIND_SOURCE diff --git a/pkg/contexts/ocm/cpi/repocpi/bridge_cv.go b/pkg/contexts/ocm/cpi/repocpi/bridge_cv.go index edc79f6c86..469bba0490 100644 --- a/pkg/contexts/ocm/cpi/repocpi/bridge_cv.go +++ b/pkg/contexts/ocm/cpi/repocpi/bridge_cv.go @@ -20,6 +20,7 @@ import ( "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/accspeccpi" "github.com/open-component-model/ocm/pkg/contexts/ocm/internal" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" "github.com/open-component-model/ocm/pkg/refmgmt" "github.com/open-component-model/ocm/pkg/refmgmt/resource" "github.com/open-component-model/ocm/pkg/runtimefinalizer" @@ -262,12 +263,17 @@ func (b *componentVersionAccessBridge) update(final bool) error { return nil } + pub, err := pubsub.PubSubForRepo(b.Repository()) + if err != nil { + return err + } + d := b.getDescriptor() opts := &cpi.BlobUploadOptions{ UseNoDefaultIfNotSet: optionutils.PointerTo(true), } - err := b.setupLocalBlobs("resource", b.composeAccess, d.Resources, true, opts) + err = b.setupLocalBlobs("resource", b.composeAccess, d.Resources, true, opts) if err == nil { err = b.setupLocalBlobs("source", b.composeAccess, d.Sources, true, opts) } @@ -280,6 +286,12 @@ func (b *componentVersionAccessBridge) update(final bool) error { return err } err = b.blobcache.Clear() + if pub != nil { + err := pub.NotifyComponentVersion(common.VersionedElementKey(b)) + if err != nil { + return err + } + } return err } diff --git a/pkg/contexts/ocm/cpi/repocpi/interface.go b/pkg/contexts/ocm/cpi/repocpi/interface.go new file mode 100644 index 0000000000..40ef3195d8 --- /dev/null +++ b/pkg/contexts/ocm/cpi/repocpi/interface.go @@ -0,0 +1,7 @@ +package repocpi + +import ( + "github.com/open-component-model/ocm/pkg/contexts/ocm/internal" +) + +type Repository = internal.Repository diff --git a/pkg/contexts/ocm/init.go b/pkg/contexts/ocm/init.go index 8686caeab5..2b08aef166 100644 --- a/pkg/contexts/ocm/init.go +++ b/pkg/contexts/ocm/init.go @@ -12,6 +12,8 @@ import ( _ "github.com/open-component-model/ocm/pkg/contexts/ocm/digester/digesters" _ "github.com/open-component-model/ocm/pkg/contexts/ocm/download/config" _ "github.com/open-component-model/ocm/pkg/contexts/ocm/download/handlers" + _ "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers" + _ "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types" _ "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories" _ "github.com/open-component-model/ocm/pkg/contexts/ocm/valuemergehandler/handlers" ) diff --git a/pkg/contexts/ocm/internal/errors.go b/pkg/contexts/ocm/internal/errors.go index ed08739d2a..e4d5a135fe 100644 --- a/pkg/contexts/ocm/internal/errors.go +++ b/pkg/contexts/ocm/internal/errors.go @@ -10,6 +10,7 @@ import ( ) const ( + KIND_REPOSITORY = "ocm repository" KIND_COMPONENT = errkind.KIND_COMPONENT KIND_COMPONENTVERSION = "component version" KIND_RESOURCE = "component resource" diff --git a/pkg/contexts/ocm/pubsub/attr.go b/pkg/contexts/ocm/pubsub/attr.go new file mode 100644 index 0000000000..34b170dd23 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/attr.go @@ -0,0 +1,40 @@ +package pubsub + +import ( + "github.com/open-component-model/ocm/pkg/contexts/datacontext" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" +) + +const ATTR_PUBSUB_TYPES = "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + +type Attribute struct { + ProviderRegistry + TypeScheme +} + +func For(ctx cpi.ContextProvider) *Attribute { + if ctx == nil { + return &Attribute{ + ProviderRegistry: DefaultRegistry, + TypeScheme: DefaultTypeScheme, + } + } + return ctx.OCMContext().GetAttributes().GetOrCreateAttribute(ATTR_PUBSUB_TYPES, create).(*Attribute) +} + +func create(datacontext.Context) interface{} { + return &Attribute{ + ProviderRegistry: NewProviderRegistry(DefaultRegistry), + TypeScheme: NewTypeScheme(DefaultTypeScheme), + } +} + +func SetSchemeFor(ctx cpi.ContextProvider, registry TypeScheme) { + attr := For(ctx) + attr.TypeScheme = registry +} + +func SetProvidersFor(ctx cpi.ContextProvider, registry ProviderRegistry) { + attr := For(ctx) + attr.ProviderRegistry = registry +} diff --git a/pkg/contexts/ocm/pubsub/doc.go b/pkg/contexts/ocm/pubsub/doc.go new file mode 100644 index 0000000000..447188e109 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/doc.go @@ -0,0 +1,30 @@ +// Package pubsub contains the +// handling required to connect OCM repositories to publish/subscribe +// infrastructures. +// A pubsub system is described by a dedicated specification (de-)serializable +// by a PubSubType object. The deserialized specification must implement the +// PubSubSpec interface. It has to be able to provide a pubsub adapter by +// providing an object implementing the PubSubMethod interface instantiated +// for a dedicated repository object. This object is then used by the +// OCM library (if provided) to generate appropriate events when adding/updating +// a component version for a repository. +// +// The known pubsub types can be registered for an OCM context. This registration +// mechanism uses a dedicated context attribute. +// The default type registry can be filled by init functions using the function +// RegisterType. +// +// The library allows to configure a pub/sub specification at repository level. +// Therefore, dedicated providers (interface Provider) are used, which are able +// to extract/provide a pubsub specification from/for a dedicated repository. +// The task of the provider is to handle the persistence of the serialized data +// of the specification at repository level. The provider just provides the +// specification data, it does not know anything about the types and implementations. +// +// Providers are registered at an OCM context for a dedicated type of repository. +// (the spec kind of the repository spec used to describe the repository). +// The default provider registry can be filled by init functions using the function +// RegisterProvider. +// +// To configure dedicated contexts, the contained registry objects can be modified on the attribute provided by For(ctx). +package pubsub diff --git a/pkg/contexts/ocm/pubsub/interface.go b/pkg/contexts/ocm/pubsub/interface.go new file mode 100644 index 0000000000..5ea017b9b7 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/interface.go @@ -0,0 +1,225 @@ +package pubsub + +import ( + "encoding/json" + "fmt" + "slices" + "sync" + + "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/general" + "github.com/mandelsoft/goutils/generics" + "github.com/mandelsoft/goutils/optionutils" + "github.com/modern-go/reflect2" + + "github.com/open-component-model/ocm/pkg/common" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/errkind" + "github.com/open-component-model/ocm/pkg/runtime" + "github.com/open-component-model/ocm/pkg/runtime/descriptivetype" +) + +const KIND_PUBSUBTYPE = "pub/sub" + +type Option = descriptivetype.Option + +func WithFormatSpec(fmt string) Option { + return descriptivetype.WithFormatSpec(fmt) +} + +func WithDesciption(desc string) Option { + return descriptivetype.WithDescription(desc) +} + +//////////////////////////////////////////////////////////////////////////////// + +type PubSubType descriptivetype.TypedObjectType[PubSubSpec] + +// PubSubSpec is the interface publish/subscribe specifications +// must fulfill. The main task is to map the specification +// to a concrete implementation of the pub/sub adapter +// which forwards events to the described system. +type PubSubSpec interface { + runtime.VersionedTypedObject + + PubSubMethod(repo cpi.Repository) (PubSubMethod, error) + Describe(ctx cpi.Context) string +} + +type ( + PubSubSpecDecoder = runtime.TypedObjectDecoder[PubSubSpec] + PubSubTypeProvider = runtime.KnownTypesProvider[PubSubSpec, PubSubType] +) + +// PubSubMethod is the handler able to publish +// an OCM component version event. +type PubSubMethod interface { + NotifyComponentVersion(version common.NameVersion) error +} + +// TypeScheme is the registry for specification types for +// PubSub types. A PubSub type is finally able to +// provide an implementation for notifying a dedicated +// PubSub instance. +type TypeScheme descriptivetype.TypeScheme[PubSubSpec, PubSubType] + +func NewTypeScheme(base ...TypeScheme) TypeScheme { + return descriptivetype.NewTypeScheme[PubSubSpec, PubSubType, TypeScheme]("PubSub type", nil, &UnknownPubSubSpec{}, false, base...) +} + +func NewStrictTypeScheme(base ...TypeScheme) runtime.VersionedTypeRegistry[PubSubSpec, PubSubType] { + return descriptivetype.NewTypeScheme[PubSubSpec, PubSubType, TypeScheme]("PubSub type", nil, &UnknownPubSubSpec{}, false, base...) +} + +// DefaultTypeScheme contains all globally known PubSub serializers. +var DefaultTypeScheme = NewTypeScheme() + +func RegisterType(atype PubSubType) { + DefaultTypeScheme.Register(atype) +} + +func CreatePubSubSpec(t runtime.TypedObject) (PubSubSpec, error) { + return DefaultTypeScheme.Convert(t) +} + +func NewPubSubType[I PubSubSpec](name string, opts ...Option) PubSubType { + t := descriptivetype.NewTypedObjectTypeObject[PubSubSpec](runtime.NewVersionedTypedObjectType[PubSubSpec, I](name)) + ta := descriptivetype.NewTypeObjectTarget[PubSubSpec](t) + optionutils.ApplyOptions[descriptivetype.OptionTarget](ta, opts...) + return t +} + +//////////////////////////////////////////////////////////////////////////////// + +type UnknownPubSubSpec struct { + runtime.UnstructuredVersionedTypedObject `json:",inline"` +} + +var ( + _ runtime.TypedObject = &UnknownPubSubSpec{} + _ runtime.Unknown = &UnknownPubSubSpec{} +) + +func (_ *UnknownPubSubSpec) IsUnknown() bool { + return true +} + +func (s *UnknownPubSubSpec) PubSubMethod(repository cpi.Repository) (PubSubMethod, error) { + return nil, errors.ErrUnknown(KIND_PUBSUBTYPE, s.GetType()) +} + +func (s *UnknownPubSubSpec) Describe(ctx cpi.Context) string { + return fmt.Sprintf("unknown PubSub specification type %q", s.GetType()) +} + +var _ PubSubSpec = &UnknownPubSubSpec{} + +//////////////////////////////////////////////////////////////////////////////// + +type Unwrapable interface { + Unwrap(ctx cpi.Context) []PubSubSpec +} + +type Evaluatable interface { + Evaluate(ctx cpi.Context) (PubSubSpec, error) +} + +//////////////////////////////////////////////////////////////////////////////// + +type GenericPubSubSpec struct { + runtime.UnstructuredVersionedTypedObject `json:",inline"` + + lock sync.Mutex + cached PubSubSpec + cachedData []byte +} + +var ( + _ PubSubSpec = &GenericPubSubSpec{} + _ Unwrapable = &GenericPubSubSpec{} + _ Evaluatable = &GenericPubSubSpec{} +) + +func ToGenericPubSubSpec(spec PubSubSpec) (*GenericPubSubSpec, error) { + if reflect2.IsNil(spec) { + return nil, nil + } + if g, ok := spec.(*GenericPubSubSpec); ok { + return g, nil + } + data, err := json.Marshal(spec) + if err != nil { + return nil, err + } + return newGenericPubSubSpec(data, runtime.DefaultJSONEncoding) +} + +func NewGenericPubSubSpec(data []byte, unmarshaler ...runtime.Unmarshaler) (PubSubSpec, error) { + return generics.CastPointerR[PubSubSpec](newGenericPubSubSpec(data, general.Optional(unmarshaler...))) +} + +func newGenericPubSubSpec(data []byte, unmarshaler runtime.Unmarshaler) (*GenericPubSubSpec, error) { + unstr := &runtime.UnstructuredVersionedTypedObject{} + if unmarshaler == nil { + unmarshaler = runtime.DefaultYAMLEncoding + } + err := unmarshaler.Unmarshal(data, unstr) + if err != nil { + return nil, err + } + return &GenericPubSubSpec{UnstructuredVersionedTypedObject: *unstr}, nil +} + +func (s *GenericPubSubSpec) Unwrap(ctx cpi.Context) []PubSubSpec { + eff, err := s.Evaluate(ctx) + if err != nil { + return nil + } + if u, ok := eff.(Unwrapable); ok { + return u.Unwrap(ctx) + } + return nil +} + +func (s *GenericPubSubSpec) Describe(ctx cpi.Context) string { + eff, err := s.Evaluate(ctx) + if err != nil { + return fmt.Sprintf("invalid access specification: %s", err.Error()) + } + return eff.Describe(ctx) +} + +func (s *GenericPubSubSpec) Evaluate(ctx cpi.Context) (PubSubSpec, error) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.cached != nil && s.cachedData != nil { + if d, err := s.GetRaw(); err == nil { + if slices.Equal(d, s.cachedData) { + return s.cached, nil + } + } + s.cached = nil + s.cachedData = nil + } + raw, err := s.GetRaw() + if err != nil { + return nil, err + } + s.cached, err = For(ctx).TypeScheme.Decode(raw, runtime.DefaultJSONEncoding) + if err == nil { + s.cachedData = raw + } + return s.cached, err +} + +func (s *GenericPubSubSpec) PubSubMethod(repository cpi.Repository) (PubSubMethod, error) { + spec, err := s.Evaluate(repository.GetContext()) + if err != nil { + return nil, err + } + if _, ok := spec.(*GenericPubSubSpec); ok { + return nil, errors.ErrUnknown(errkind.KIND_ACCESSMETHOD, s.GetType()) + } + return spec.PubSubMethod(repository) +} diff --git a/pkg/contexts/ocm/pubsub/provider.go b/pkg/contexts/ocm/pubsub/provider.go new file mode 100644 index 0000000000..f6b18aedc8 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/provider.go @@ -0,0 +1,99 @@ +package pubsub + +import ( + "sync" + + "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/general" + "golang.org/x/exp/maps" + + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" +) + +// ProviderRegistry holds handlers able to extract +// a PubSub specification for an OCM repository of a dedicated kind. +type ProviderRegistry interface { + Register(repoKind string, prov Provider) + KnownProviders() map[string]Provider + AddKnownProviders(registry ProviderRegistry) + + For(repo string) Provider +} + +var DefaultRegistry = NewProviderRegistry() + +func RegisterProvider(repokind string, prov Provider) { + DefaultRegistry.Register(repokind, prov) +} + +// A Provider is able to extract a pub sub configuration for +// an ocm repository (typically registered for a dedicated type of repository). +// It does not handle the pub sub system, but just the persistence of +// a pub sub specification configured for a dedicated type of repository. +type Provider interface { + GetPubSubSpec(repo cpi.Repository) (PubSubSpec, error) + SetPubSubSpec(repo cpi.Repository, spec PubSubSpec) error +} + +type NopProvider struct{} + +func (p NopProvider) GetPubSubSpec(repo cpi.Repository) (PubSubSpec, error) { + return nil, nil +} + +func (p NopProvider) SetPubSubSpec(repo cpi.Repository, spec PubSubSpec) error { + return errors.ErrNotSupported("pub/sub configuration") +} + +func NewProviderRegistry(base ...ProviderRegistry) ProviderRegistry { + return &providers{ + base: general.Optional(base...), + providers: map[string]Provider{}, + } +} + +type providers struct { + lock sync.Mutex + + base ProviderRegistry + providers map[string]Provider +} + +func (p *providers) Register(repoKind string, prov Provider) { + p.lock.Lock() + defer p.lock.Unlock() + + p.providers[repoKind] = prov +} + +func (p *providers) For(repo string) Provider { + p.lock.Lock() + defer p.lock.Unlock() + prov := p.providers[repo] + if prov != nil { + return prov + } + if p.base != nil { + return p.base.For(repo) + } + return nil +} + +func (p *providers) KnownProviders() map[string]Provider { + if p.base != nil { + m := p.base.KnownProviders() + for n, e := range p.providers { + if m[n] == nil { + m[n] = e + } + } + return m + } + return maps.Clone(p.providers) +} + +func (p *providers) AddKnownProviders(base ProviderRegistry) { + for n, e := range base.KnownProviders() { + p.providers[n] = e + } +} diff --git a/pkg/contexts/ocm/pubsub/providers/init.go b/pkg/contexts/ocm/pubsub/providers/init.go new file mode 100644 index 0000000000..8a5668e246 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/providers/init.go @@ -0,0 +1,5 @@ +package providers + +import ( + _ "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers/ocireg" +) diff --git a/pkg/contexts/ocm/pubsub/providers/ocireg/provider.go b/pkg/contexts/ocm/pubsub/providers/ocireg/provider.go new file mode 100644 index 0000000000..aab9f6a8e4 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/providers/ocireg/provider.go @@ -0,0 +1,180 @@ +package ocireg + +import ( + "encoding/json" + "fmt" + "path" + + "github.com/mandelsoft/goutils/errors" + + "github.com/open-component-model/ocm/pkg/blobaccess/blobaccess" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi/repocpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + compound2 "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/compound" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/genericocireg" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/genericocireg/componentmapping" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ocireg" +) + +const ( + ConfigMimeType = "application/vnd.ocm.software.repository.config.v1+json" + PubSubLayerMimeTye = "application/vnd.ocm.software.repository.config.pubsub.v1+json" +) + +const META = "meta" + +func init() { + pubsub.RegisterProvider(ocireg.Type, &Provider{}) +} + +type Provider struct{} + +var _ pubsub.Provider = (*Provider)(nil) + +func (p *Provider) GetPubSubSpec(repo repocpi.Repository) (pubsub.PubSubSpec, error) { + impl, err := repocpi.GetRepositoryImplementation(repo) + if err != nil { + return nil, err + } + gen, ok := impl.(*genericocireg.RepositoryImpl) + if !ok { + return nil, errors.ErrNotSupported("non-oci based ocm repository") + } + + ocirepo := path.Join(gen.Meta().SubPath, componentmapping.ComponentDescriptorNamespace) + acc, err := gen.OCIRepository().LookupArtifact(ocirepo, META) + if errors.IsErrNotFound(err) || errors.IsErrUnknown(err) { + return nil, nil + } + if err != nil { + return nil, errors.Wrapf(err, "cannot access meta data manifest version") + } + defer acc.Close() + m := acc.ManifestAccess() + if m == nil { + return nil, fmt.Errorf("meta data artifact is no manifest artifact") + } + if m.GetDescriptor().Config.MediaType != ConfigMimeType { + return nil, fmt.Errorf("meta data artifact has unexpected mime type %q", m.GetDescriptor().Config.MediaType) + } + compound, _ := compound2.New() + for _, l := range m.GetDescriptor().Layers { + if l.MediaType == PubSubLayerMimeTye { + var ps pubsub.GenericPubSubSpec + + blob, err := m.GetBlob(l.Digest) + if err != nil { + return nil, err + } + data, err := blob.Get() + blob.Close() + if err != nil { + return nil, err + } + err = json.Unmarshal(data, &ps) + if err != nil { + return nil, err + } + compound.Specifications = append(compound.Specifications, &ps) + } + } + return compound.Effective(), nil +} + +func (p *Provider) SetPubSubSpec(repo cpi.Repository, spec pubsub.PubSubSpec) error { + impl, err := repocpi.GetRepositoryImplementation(repo) + if err != nil { + return err + } + gen, ok := impl.(*genericocireg.RepositoryImpl) + if !ok { + return errors.ErrNotSupported("non-oci based ocm repository") + } + + var data []byte + if spec != nil { + data, err = json.Marshal(spec) + if err != nil { + return err + } + } + + ocirepo := path.Join(gen.Meta().SubPath, componentmapping.ComponentDescriptorNamespace) + ns, err := gen.OCIRepository().LookupNamespace(ocirepo) + if err != nil { + return err + } + defer ns.Close() + + acc, err := ns.GetArtifact(META) + if err != nil { + if errors.IsErrNotFound(err) || errors.IsErrUnknown(err) { + if spec == nil { + return nil + } + } else { + return err + } + } + if acc == nil { + acc, err = ns.NewArtifact() + if err != nil { + return err + } + m, err := acc.Manifest() + if err != nil { + return err + } + config := blobaccess.ForString(ConfigMimeType, "{}") + m.Config.MediaType = config.MimeType() + m.Config.Digest = config.Digest() + err = acc.AddBlob(config) + if err != nil { + return err + } + } + defer acc.Close() + + m := acc.ManifestAccess() + if m == nil { + return fmt.Errorf("meta data artifact is no manifest artifact") + } + if m.GetDescriptor().Config.MediaType != ConfigMimeType { + return fmt.Errorf("meta data artifact has unexpected mime type %q", m.GetDescriptor().Config.MediaType) + } + + blob := blobaccess.ForData(PubSubLayerMimeTye, data) + defer blob.Close() + + layers := m.GetDescriptor().Layers + for i := 0; i < len(layers); i++ { + l := layers[i] + if l.MediaType == PubSubLayerMimeTye { + if data != nil { + m.AddBlob(blob) + l.Digest = blob.Digest() + b, err := ns.AddArtifact(m, META) + if b != nil { + b.Close() + } + return err + } else { + layers = append(layers[:i], layers[i+1:]...) + i-- + } + } + } + m.GetDescriptor().Layers = layers + if data != nil { + _, err = m.AddLayer(blob, nil) + if err != nil { + return err + } + } + b, err := ns.AddArtifact(m, META) + if b != nil { + b.Close() + } + return err +} diff --git a/pkg/contexts/ocm/pubsub/providers/ocireg/provider_test.go b/pkg/contexts/ocm/pubsub/providers/ocireg/provider_test.go new file mode 100644 index 0000000000..9071854e86 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/providers/ocireg/provider_test.go @@ -0,0 +1,99 @@ +package ocireg_test + +import ( + "fmt" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + . "github.com/open-component-model/ocm/pkg/env/builder" + + "github.com/open-component-model/ocm/pkg/common/accessio" + ocictf "github.com/open-component-model/ocm/pkg/contexts/oci/repositories/ctf" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers/ocireg" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ctf" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/genericocireg/componentmapping" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const ARCH = "ctf" + +var _ = Describe("Test Environment", func() { + var env *Builder + var repo ocm.Repository + + BeforeEach(func() { + env = NewBuilder() + env.OCMCommonTransport(ARCH, accessio.FormatDirectory) + attr := pubsub.For(env) + attr.ProviderRegistry.Register(ctf.Type, &ocireg.Provider{}) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](Type)) + attr.TypeScheme.Register(pubsub.NewPubSubType[*Spec](TypeV1)) + + repo = Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + }) + + AfterEach(func() { + if repo != nil { + MustBeSuccessful(repo.Close()) + } + env.Cleanup() + }) + + It("set provider", func() { + MustBeSuccessful(pubsub.SetForRepo(repo, NewSpec("testtarget"))) + + repo.Close() + repo = nil + + ocirepo := Must(ocictf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + defer Close(ocirepo, "ocirepo") + acc := Must(ocirepo.LookupArtifact(componentmapping.ComponentDescriptorNamespace, ocireg.META)) + defer Close(acc, "ociacc") + + m := acc.ManifestAccess() + Expect(len(m.GetDescriptor().Layers)).To(Equal(1)) + + b := Must(m.GetBlob(m.GetDescriptor().Layers[0].Digest)) + defer Close(b, "blob") + data := Must(b.Get()) + Expect(string(data)).To(Equal(`{"type":"test","target":"testtarget"}`)) + }) + + It("set/get provider", func() { + MustBeSuccessful(pubsub.SetForRepo(repo, NewSpec("testtarget"))) + + spec := Must(pubsub.SpecForRepo(repo)) + Expect(spec).To(YAMLEqual(`{"type":"test","target":"testtarget"}`)) + }) +}) + +//////////////////////////////////////////////////////////////////////////////// + +const ( + Type = "test" + TypeV1 = Type + runtime.VersionSeparator + "v1" +) + +type Spec struct { + runtime.ObjectVersionedType + Target string `json:"target"` +} + +var ( + _ pubsub.PubSubSpec = (*Spec)(nil) +) + +func NewSpec(target string) *Spec { + return &Spec{runtime.NewVersionedObjectType(Type), target} +} + +func (s *Spec) PubSubMethod(repo ocm.Repository) (pubsub.PubSubMethod, error) { + return nil, nil +} + +func (s *Spec) Describe(_ ocm.Context) string { + return fmt.Sprintf("test pubsub") +} diff --git a/pkg/contexts/ocm/pubsub/providers/ocireg/suite_test.go b/pkg/contexts/ocm/pubsub/providers/ocireg/suite_test.go new file mode 100644 index 0000000000..b21e9cfadc --- /dev/null +++ b/pkg/contexts/ocm/pubsub/providers/ocireg/suite_test.go @@ -0,0 +1,13 @@ +package ocireg_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PubSub OCI Test Suite") +} diff --git a/pkg/contexts/ocm/pubsub/pubsub_test.go b/pkg/contexts/ocm/pubsub/pubsub_test.go new file mode 100644 index 0000000000..ff4e5bb15e --- /dev/null +++ b/pkg/contexts/ocm/pubsub/pubsub_test.go @@ -0,0 +1,167 @@ +package pubsub_test + +import ( + "encoding/json" + "fmt" + "sync" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mandelsoft/goutils/sliceutils" + + "github.com/open-component-model/ocm/pkg/common" + "github.com/open-component-model/ocm/pkg/contexts/datacontext" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/compound" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/composition" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const COMP = "acme.org/component" +const VERS = "v1" + +// Provider always provides out test pub sub specification +type Provider struct { + lock sync.Mutex + settings map[string]pubsub.PubSubSpec + published sliceutils.Slice[common.NameVersion] +} + +var _ pubsub.Provider = (*Provider)(nil) + +func NewProvider() *Provider { + return &Provider{settings: map[string]pubsub.PubSubSpec{}} +} + +func (p *Provider) GetPubSubSpec(repo ocm.Repository) (pubsub.PubSubSpec, error) { + p.lock.Lock() + defer p.lock.Unlock() + + s := p.settings[key(repo)] + if s != nil { + p.set(repo.GetContext(), s) + } + return s, nil +} + +func (p *Provider) SetPubSubSpec(repo cpi.Repository, spec pubsub.PubSubSpec) error { + p.lock.Lock() + defer p.lock.Unlock() + + p.settings[key(repo)] = spec + return nil +} + +func (p *Provider) set(ctx cpi.Context, s pubsub.PubSubSpec) { + if e, ok := s.(pubsub.Evaluatable); ok { + eff, err := e.Evaluate(ctx) + if err == nil { + s = eff + } + } + + if m, ok := s.(*Spec); ok { + m.provider = p + } + if u, ok := s.(pubsub.Unwrapable); ok { + for _, n := range u.Unwrap(ctx) { + p.set(ctx, n) + } + } +} + +func key(repo cpi.Repository) string { + s := repo.GetSpecification() + d, err := json.Marshal(s) + if err != nil { + return "" + } + return string(d) +} + +const TYPE = "test" + +// Spec provides a pub sub adapter registering events at its provider. +type Spec struct { + runtime.ObjectVersionedType + provider *Provider +} + +var _ pubsub.PubSubSpec = (*Spec)(nil) + +func NewSpec() pubsub.PubSubSpec { + return &Spec{runtime.NewVersionedObjectType(TYPE), nil} +} + +func (s *Spec) PubSubMethod(repo ocm.Repository) (pubsub.PubSubMethod, error) { + return &Method{s.provider}, nil +} + +func (s *Spec) Describe(ctx ocm.Context) string { + return fmt.Sprintf("pub/sub spec of kind %q", s.GetKind()) +} + +// Method finally registers events at its provider. +type Method struct { + provider *Provider +} + +var _ pubsub.PubSubMethod = (*Method)(nil) + +func (m *Method) NotifyComponentVersion(version common.NameVersion) error { + m.provider.lock.Lock() + defer m.provider.lock.Unlock() + + m.provider.published.Add(version) + return nil +} + +var _ = Describe("Pub SubTest Environment", func() { + var ctx ocm.Context + var prov *Provider + + BeforeEach(func() { + ctx = ocm.New(datacontext.MODE_CONFIGURED) + prov = NewProvider() + // we register our test provider for repository type composition + pubsub.For(ctx).ProviderRegistry.Register(composition.Type, prov) + // now, we register ouŕ test pub sub type. + pubsub.For(ctx).TypeScheme.Register(pubsub.NewPubSubType[*Spec](TYPE)) + }) + + It("direct setting", func() { + repo := composition.NewRepository(ctx, "testrepo") + defer Close(repo) + + pubsub.SetForRepo(repo, NewSpec()) + + cv := composition.NewComponentVersion(ctx, COMP, VERS) + defer Close(cv) + + Expect(repo.GetSpecification().GetKind()).To(Equal(composition.Type)) + + Expect(prov.published).To(BeNil()) + MustBeSuccessful(repo.AddComponentVersion(cv)) + Expect(prov.published).To(ConsistOf(common.VersionedElementKey(cv))) + }) + + It("indirect setting", func() { + repo := composition.NewRepository(ctx, "testrepo") + defer Close(repo) + + pubsub.SetForRepo(repo, Must(compound.New(NewSpec()))) + + cv := composition.NewComponentVersion(ctx, COMP, VERS) + defer Close(cv) + + Expect(repo.GetSpecification().GetKind()).To(Equal(composition.Type)) + + Expect(prov.published).To(BeNil()) + MustBeSuccessful(repo.AddComponentVersion(cv)) + Expect(prov.published).To(ConsistOf(common.VersionedElementKey(cv))) + }) +}) diff --git a/pkg/contexts/ocm/pubsub/setup.go b/pkg/contexts/ocm/pubsub/setup.go new file mode 100644 index 0000000000..db59213355 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/setup.go @@ -0,0 +1,34 @@ +package pubsub + +import ( + "github.com/open-component-model/ocm/pkg/contexts/datacontext" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" +) + +func init() { + datacontext.RegisterSetupHandler(datacontext.SetupHandlerFunction(setupContext)) +} + +func setupContext(mode datacontext.BuilderMode, ctx datacontext.Context) { + if octx, ok := ctx.(cpi.Context); ok { + switch mode { + case datacontext.MODE_SHARED: + fallthrough + case datacontext.MODE_DEFAULTED: + // do nothing, fallback to the default attribute lookup + case datacontext.MODE_EXTENDED: + SetSchemeFor(octx, NewTypeScheme(DefaultTypeScheme)) + SetProvidersFor(octx, NewProviderRegistry(DefaultRegistry)) + case datacontext.MODE_CONFIGURED: + s := NewTypeScheme(nil) + s.AddKnownTypes(DefaultTypeScheme) + SetSchemeFor(octx, s) + r := NewProviderRegistry(nil) + r.AddKnownProviders(DefaultRegistry) + SetProvidersFor(octx, r) + case datacontext.MODE_INITIAL: + SetSchemeFor(octx, NewTypeScheme()) + SetProvidersFor(octx, NewProviderRegistry()) + } + } +} diff --git a/pkg/contexts/ocm/pubsub/suite_test.go b/pkg/contexts/ocm/pubsub/suite_test.go new file mode 100644 index 0000000000..9a66dc494b --- /dev/null +++ b/pkg/contexts/ocm/pubsub/suite_test.go @@ -0,0 +1,13 @@ +package pubsub_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "OCM Core Test Suite") +} diff --git a/pkg/contexts/ocm/pubsub/types/compound/type.go b/pkg/contexts/ocm/pubsub/types/compound/type.go new file mode 100644 index 0000000000..31df9279f4 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/compound/type.go @@ -0,0 +1,102 @@ +package compound + +import ( + "fmt" + + "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/sliceutils" + + "github.com/open-component-model/ocm/pkg/common" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const ( + Type = "compound" + TypeV1 = Type + runtime.VersionSeparator + "v1" +) + +func init() { + pubsub.RegisterType(pubsub.NewPubSubType[*Spec](Type, + pubsub.WithDesciption("a pub/sub system forwarding events to described sub-level systems."))) + pubsub.RegisterType(pubsub.NewPubSubType[*Spec](TypeV1, + pubsub.WithFormatSpec(`It is describe by the following field: + +- **specifications** *list of pubsub specs* + + A list of nested sub-level specifications the events should be + forwarded to. +`))) +} + +// Spec provides a pub sub adapter registering events at its provider. +type Spec struct { + runtime.ObjectVersionedType + Specifications []*pubsub.GenericPubSubSpec `json:"specifications,omitempty"` +} + +var ( + _ pubsub.PubSubSpec = (*Spec)(nil) + _ pubsub.Unwrapable = (*Spec)(nil) +) + +func New(specs ...pubsub.PubSubSpec) (*Spec, error) { + var gen []*pubsub.GenericPubSubSpec + + for _, s := range specs { + g, err := pubsub.ToGenericPubSubSpec(s) + if err != nil { + return nil, err + } + gen = append(gen, g) + } + return &Spec{runtime.NewVersionedObjectType(Type), gen}, nil +} + +func (s *Spec) PubSubMethod(repo cpi.Repository) (pubsub.PubSubMethod, error) { + var meths []pubsub.PubSubMethod + + for _, e := range s.Specifications { + m, err := e.PubSubMethod(repo) + if err != nil { + return nil, err + } + meths = append(meths, m) + } + return &Method{meths}, nil +} + +func (s *Spec) Unwrap(ctx cpi.Context) []pubsub.PubSubSpec { + return sliceutils.Convert[pubsub.PubSubSpec](s.Specifications) +} + +func (s *Spec) Describe(_ cpi.Context) string { + return fmt.Sprintf("compound pub/sub specification with %d entries", len(s.Specifications)) +} + +func (s *Spec) Effective() pubsub.PubSubSpec { + switch len(s.Specifications) { + case 0: + return nil + case 1: + return s.Specifications[0] + default: + return s + } +} + +// Method finally registers events at contained methods. +type Method struct { + meths []pubsub.PubSubMethod +} + +var _ pubsub.PubSubMethod = (*Method)(nil) + +func (m *Method) NotifyComponentVersion(version common.NameVersion) error { + list := errors.ErrList() + for _, m := range m.meths { + list.Add(m.NotifyComponentVersion(version)) + } + return list.Result() +} diff --git a/pkg/contexts/ocm/pubsub/types/init.go b/pkg/contexts/ocm/pubsub/types/init.go new file mode 100644 index 0000000000..f1720a9d5b --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/init.go @@ -0,0 +1,6 @@ +package types + +import ( + _ "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/compound" + _ "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/redis" +) diff --git a/pkg/contexts/ocm/pubsub/types/redis/identity/identity.go b/pkg/contexts/ocm/pubsub/types/redis/identity/identity.go new file mode 100644 index 0000000000..82041233c9 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/redis/identity/identity.go @@ -0,0 +1,120 @@ +package identity + +import ( + "fmt" + "strconv" + "strings" + + "github.com/mandelsoft/goutils/errors" + + "github.com/open-component-model/ocm/pkg/contexts/credentials/cpi" + "github.com/open-component-model/ocm/pkg/contexts/credentials/identity/hostpath" + "github.com/open-component-model/ocm/pkg/listformat" +) + +const CONSUMER_TYPE = "Github" + +// identity properties. +const ( + ID_HOSTNAME = hostpath.ID_HOSTNAME + ID_PORT = hostpath.ID_PORT + ID_PATHPREFIX = hostpath.ID_PATHPREFIX + ID_CHANNEL = "channel" + ID_DATABASE = "database" +) + +// credential properties. +const ( + ATTR_USERNAME = cpi.ATTR_USERNAME + ATTR_PASSWORD = cpi.ATTR_PASSWORD +) + +func IdentityMatcher(request, cur, id cpi.ConsumerIdentity) bool { + match, better := hostpath.Match(CONSUMER_TYPE, request, cur, id) + if !match { + return false + } + + if request[ID_CHANNEL] != "" { + if id[ID_CHANNEL] != "" && id[ID_CHANNEL] != request[ID_CHANNEL] { + return false + } + } + if request[ID_DATABASE] != "" { + if id[ID_DATABASE] != "" && id[ID_DATABASE] != request[ID_DATABASE] { + return false + } + } + + // ok now it basically matches, check against current match + + if cur[ID_CHANNEL] == "" && request[ID_CHANNEL] != "" { + return true + } + if cur[ID_DATABASE] == "" && request[ID_DATABASE] != "" { + return true + } + return better +} + +func init() { + attrs := listformat.FormatListElements("", listformat.StringElementDescriptionList{ + ATTR_USERNAME, "Redis username", + ATTR_PASSWORD, "Redis password", + }) + cpi.RegisterStandardIdentity(CONSUMER_TYPE, IdentityMatcher, + `Redis PubSub credential matcher + +This matcher is a hostpath matcher with additional attributes: + +- *`+ID_CHANNEL+`* (required if set in pattern): the channel name +- *`+ID_DATABASE+`* the database number +`, + attrs) +} + +func PATCredentials(user, pass string) cpi.Credentials { + return cpi.DirectCredentials{ + ATTR_USERNAME: user, + ATTR_PASSWORD: pass, + } +} + +func GetConsumerId(serveraddr string, channel string, db int) cpi.ConsumerIdentity { + p := "" + host, port, err := ParseAddress(serveraddr) + if err == nil { + host = serveraddr + } + + id := cpi.ConsumerIdentity{ + cpi.ID_TYPE: CONSUMER_TYPE, + ID_HOSTNAME: host, + ID_CHANNEL: channel, + ID_DATABASE: fmt.Sprintf("%d", db), + } + if port != 0 { + id[ID_PORT] = fmt.Sprintf("%d", port) + } + if p != "" { + id[ID_PATHPREFIX] = p + } + return id +} + +func GetCredentials(ctx cpi.ContextProvider, serverurl string, channel string, db int) (cpi.Credentials, error) { + id := GetConsumerId(serverurl, channel, db) + return cpi.CredentialsForConsumer(ctx.CredentialsContext(), id, IdentityMatcher) +} + +func ParseAddress(addr string) (string, int, error) { + idx := strings.Index(addr, ":") + if idx < 0 { + return addr, 6379, nil + } + p, err := strconv.ParseInt(addr[idx+1:], 10, 32) + if err != nil { + return "", 0, errors.Wrapf(err, "invalid port in redis address") + } + return addr[:idx], int(p), nil +} diff --git a/pkg/contexts/ocm/pubsub/types/redis/redis_test.go b/pkg/contexts/ocm/pubsub/types/redis/redis_test.go new file mode 100644 index 0000000000..45b81597c8 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/redis/redis_test.go @@ -0,0 +1,64 @@ +//go:build redis_test + +package redis_test + +import ( + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/open-component-model/ocm/pkg/common" + "github.com/open-component-model/ocm/pkg/contexts/credentials" + . "github.com/open-component-model/ocm/pkg/env/builder" + + "github.com/open-component-model/ocm/pkg/common/accessio" + "github.com/open-component-model/ocm/pkg/contexts/ocm" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/providers/ocireg" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/redis" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/redis/identity" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/composition" + "github.com/open-component-model/ocm/pkg/contexts/ocm/repositories/ctf" +) + +const ARCH = "ctf" +const COMP = "acme.org/component" +const VERS = "v1" + +var _ = Describe("Test Environment", func() { + var env *Builder + var repo ocm.Repository + + BeforeEach(func() { + + env = NewBuilder() + env.OCMCommonTransport(ARCH, accessio.FormatDirectory) + attr := pubsub.For(env) + attr.ProviderRegistry.Register(ctf.Type, &ocireg.Provider{}) + + env.CredentialsContext().SetCredentialsForConsumer( + identity.GetConsumerId("localhost:6379", "ocm", 0), + credentials.NewCredentials(common.Properties{identity.ATTR_PASSWORD: "redis-test-0815"}), + ) + + repo = Must(ctf.Open(env, ctf.ACC_WRITABLE, ARCH, 0o600, env)) + }) + + AfterEach(func() { + if repo != nil { + MustBeSuccessful(repo.Close()) + } + env.Cleanup() + }) + + Context("local redis server", func() { + It("tests local server", func() { + MustBeSuccessful(pubsub.SetForRepo(repo, Must(redis.New("localhost:6379", "ocm", 0)))) + + cv := composition.NewComponentVersion(env, COMP, VERS) + defer Close(cv) + + Expect(repo.GetSpecification().GetKind()).To(Equal(ctf.Type)) + MustBeSuccessful(repo.AddComponentVersion(cv)) + }) + }) +}) diff --git a/pkg/contexts/ocm/pubsub/types/redis/suite_test.go b/pkg/contexts/ocm/pubsub/types/redis/suite_test.go new file mode 100644 index 0000000000..f2f9f7d8d5 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/redis/suite_test.go @@ -0,0 +1,13 @@ +package redis_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Redis PubSubTest Suite") +} diff --git a/pkg/contexts/ocm/pubsub/types/redis/type.go b/pkg/contexts/ocm/pubsub/types/redis/type.go new file mode 100644 index 0000000000..9809ce077d --- /dev/null +++ b/pkg/contexts/ocm/pubsub/types/redis/type.go @@ -0,0 +1,95 @@ +package redis + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" + + "github.com/open-component-model/ocm/pkg/common" + credcpi "github.com/open-component-model/ocm/pkg/contexts/credentials/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub" + "github.com/open-component-model/ocm/pkg/contexts/ocm/pubsub/types/redis/identity" + "github.com/open-component-model/ocm/pkg/runtime" +) + +const ( + Type = "redis" + TypeV1 = Type + runtime.VersionSeparator + "v1" +) + +func init() { + pubsub.RegisterType(pubsub.NewPubSubType[*Spec](Type, + pubsub.WithDesciption("a redis pubsub sytsem."))) + pubsub.RegisterType(pubsub.NewPubSubType[*Spec](TypeV1, + pubsub.WithFormatSpec(`It is describe by the following field: + +- **serverAddr** *Address of redis server* +- **channel** *pubsub channel* +- **database** *database number* + + Publishing using the redis pubsub API. For every change a string message + with the format : is published. If multiple repositories + should be used, each repository should be configured with a different + channel. +`))) +} + +// Spec provides a pub sub adapter registering events at its provider. +type Spec struct { + runtime.ObjectVersionedType + ServerAddr string `json:"serverAddr"` + Channel string `json:"channel"` + Database int `json:"database"` +} + +var _ pubsub.PubSubSpec = (*Spec)(nil) + +func New(serverurl, channel string, db int) (*Spec, error) { + return &Spec{ + runtime.NewVersionedObjectType(Type), + serverurl, channel, db, + }, nil +} + +func (s *Spec) PubSubMethod(repo cpi.Repository) (pubsub.PubSubMethod, error) { + _, _, err := identity.ParseAddress(s.ServerAddr) + if err != nil { + return nil, err + } + + creds, err := identity.GetCredentials(repo.GetContext(), s.ServerAddr, s.Channel, s.Database) + if err != nil { + return nil, err + } + return &Method{s, creds}, nil +} + +func (s *Spec) Describe(_ cpi.Context) string { + return fmt.Sprintf("redis pubsub system %s channel %s, database %d", s.ServerAddr, s.Channel, s.Database) +} + +// Method finally publishes events. +type Method struct { + spec *Spec + creds credcpi.Credentials +} + +var _ pubsub.PubSubMethod = (*Method)(nil) + +func (m *Method) NotifyComponentVersion(version common.NameVersion) error { + // TODO: ipdate to credential provider interface + opts := &redis.Options{ + Addr: m.spec.ServerAddr, + DB: m.spec.Database, + } + if m.creds != nil { + opts.Username = m.creds.GetProperty(identity.ATTR_USERNAME) + opts.Password = m.creds.GetProperty(identity.ATTR_PASSWORD) + } + + rdb := redis.NewClient(opts) + defer rdb.Close() + return rdb.Publish(context.Background(), m.spec.Channel, version.String()).Err() +} diff --git a/pkg/contexts/ocm/pubsub/utils.go b/pkg/contexts/ocm/pubsub/utils.go new file mode 100644 index 0000000000..671b8c9068 --- /dev/null +++ b/pkg/contexts/ocm/pubsub/utils.go @@ -0,0 +1,68 @@ +package pubsub + +import ( + "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/maputils" + + "github.com/open-component-model/ocm/pkg/common" + "github.com/open-component-model/ocm/pkg/contexts/ocm/cpi" + "github.com/open-component-model/ocm/pkg/listformat" + "github.com/open-component-model/ocm/pkg/runtime" +) + +func SetForRepo(repo cpi.Repository, spec PubSubSpec) error { + prov := For(repo.GetContext()).For(repo.GetSpecification().GetKind()) + if prov != nil { + return prov.SetPubSubSpec(repo, spec) + } + return errors.ErrNotSupported("pub/sub config") +} + +func SpecForRepo(repo cpi.Repository) (PubSubSpec, error) { + prov := For(repo.GetContext()).For(repo.GetSpecification().GetKind()) + if prov != nil { + return prov.GetPubSubSpec(repo) + } + return nil, nil +} + +func SpecForData(ctx cpi.ContextProvider, data []byte) (PubSubSpec, error) { + return For(ctx).TypeScheme.Decode(data, runtime.DefaultYAMLEncoding) +} + +func PubSubForRepo(repo cpi.Repository) (PubSubMethod, error) { + spec, err := SpecForRepo(repo) + if spec == nil || err != nil { + return nil, err + } + return spec.PubSubMethod(repo) +} + +func Notify(repo cpi.Repository, nv common.NameVersion) error { + m, err := PubSubForRepo(repo) + if m == nil || err != nil { + return err + } + return m.NotifyComponentVersion(nv) +} + +func PubSubUsage(scheme TypeScheme, providers ProviderRegistry, cli bool) string { + s := ` +The following list describes the supported publish/subscribe system types, their +specification versions, and formats: +` + if len(scheme.KnownTypes()) == 0 { + s += "There are currently no known pub/sub types!" + } else { + s += scheme.Describe() + } + + list := maputils.OrderedKeys(providers.KnownProviders()) + if len(list) == 0 { + s += "There are currently no persistence providers!" + } else { + s += "There are persistence providers for the following repository types:\n" + s += listformat.FormatList("", list...) + } + return s +} diff --git a/pkg/contexts/ocm/repositories/composition/repository.go b/pkg/contexts/ocm/repositories/composition/repository.go index fa1aa08c15..f26756a33c 100644 --- a/pkg/contexts/ocm/repositories/composition/repository.go +++ b/pkg/contexts/ocm/repositories/composition/repository.go @@ -32,7 +32,7 @@ func NewRepository(ctxp cpi.ContextProvider, names ...string) cpi.Repository { return repo } } - repo := virtual.NewRepository(ctx, NewAccess()) + repo := virtual.NewRepository(ctx, NewAccess(name)) if repositories != nil { repositories.SetRepository(name, repo) repo, _ = repo.Dup() @@ -44,6 +44,7 @@ type Index = virtual.Index[common.NameVersion] type Access struct { lock sync.Mutex + name string index *Index blobs map[string]blobaccess.BlobAccess readonly bool @@ -51,13 +52,18 @@ type Access struct { var _ virtual.Access = (*Access)(nil) -func NewAccess() *Access { +func NewAccess(name string) *Access { return &Access{ + name: name, index: virtual.NewIndex[common.NameVersion](), blobs: map[string]blobaccess.BlobAccess{}, } } +func (a *Access) GetSpecification() cpi.RepositorySpec { + return NewRepositorySpec(a.name) +} + func (a *Access) IsReadOnly() bool { return a.readonly } diff --git a/pkg/contexts/ocm/repositories/composition/repository_test.go b/pkg/contexts/ocm/repositories/composition/repository_test.go index 52a5d2ac6d..9c2a6c638a 100644 --- a/pkg/contexts/ocm/repositories/composition/repository_test.go +++ b/pkg/contexts/ocm/repositories/composition/repository_test.go @@ -40,6 +40,8 @@ var _ = Describe("repository", func() { repo := me.NewRepository(ctx) finalize.Close(repo, "source repo") + Expect(repo.GetSpecification().GetKind()).To(Equal(me.Type)) + c := Must(repo.LookupComponent(COMPONENT)) finalize.Close(c, "src comp") diff --git a/pkg/contexts/ocm/repositories/composition/type.go b/pkg/contexts/ocm/repositories/composition/type.go index 8e511a871c..a8abdc3f56 100644 --- a/pkg/contexts/ocm/repositories/composition/type.go +++ b/pkg/contexts/ocm/repositories/composition/type.go @@ -22,6 +22,8 @@ type RepositorySpec struct { Name string `json:"name"` } +var _ cpi.RepositorySpec = (*RepositorySpec)(nil) + func NewRepositorySpec(name string) *RepositorySpec { return &RepositorySpec{ ObjectVersionedTypedObject: runtime.NewVersionedTypedObject(Type), diff --git a/pkg/contexts/ocm/repositories/genericocireg/repository.go b/pkg/contexts/ocm/repositories/genericocireg/repository.go index 574442ff80..e616521efe 100644 --- a/pkg/contexts/ocm/repositories/genericocireg/repository.go +++ b/pkg/contexts/ocm/repositories/genericocireg/repository.go @@ -105,6 +105,10 @@ func (r *RepositoryImpl) OCIRepository() ocicpi.Repository { return r.ocirepo } +func (r *RepositoryImpl) Meta() ComponentRepositoryMeta { + return r.meta +} + func (r *RepositoryImpl) GetSpecification() cpi.RepositorySpec { return &RepositorySpec{ RepositorySpec: r.ocirepo.GetSpecification(),