From 870cfe9310a23e9bae6005ba736ccda276b257c7 Mon Sep 17 00:00:00 2001 From: CloudQuery Bot <102256036+cq-bot@users.noreply.github.com> Date: Tue, 30 Jul 2024 04:15:58 -0400 Subject: [PATCH] fix: Generate Go Code from `plugin-pb` (#373) Co-authored-by: cq-bot --- pb/destination/v0/destination_grpc.pb.go | 147 ++++---------- pb/destination/v1/destination_grpc.pb.go | 86 +++------ pb/discovery/v0/discovery_grpc.pb.go | 25 ++- pb/discovery/v1/discovery_grpc.pb.go | 25 ++- pb/plugin/v3/plugin_grpc.pb.go | 232 ++++++----------------- pb/source/v0/source_grpc.pb.go | 113 ++++------- pb/source/v1/source_grpc.pb.go | 69 +++---- pb/source/v2/source_grpc.pb.go | 69 +++---- 8 files changed, 254 insertions(+), 512 deletions(-) diff --git a/pb/destination/v0/destination_grpc.pb.go b/pb/destination/v0/destination_grpc.pb.go index e631bc1..e2c0cb2 100644 --- a/pb/destination/v0/destination_grpc.pb.go +++ b/pb/destination/v0/destination_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/destination/v0/destination.proto @@ -16,8 +16,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Destination_GetProtocolVersion_FullMethodName = "/proto.Destination/GetProtocolVersion" @@ -49,9 +49,9 @@ type DestinationClient interface { // Migrate tables to the given plugin version Migrate(ctx context.Context, in *Migrate_Request, opts ...grpc.CallOption) (*Migrate_Response, error) // Write resources - Write(ctx context.Context, opts ...grpc.CallOption) (Destination_WriteClient, error) + Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) // Write2 resources - Write2(ctx context.Context, opts ...grpc.CallOption) (Destination_Write2Client, error) + Write2(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write2_Request, Write2_Response], error) // Send signal to flush and close open connections Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) // DeleteStale deletes stale data that was inserted by a given source @@ -119,75 +119,31 @@ func (c *destinationClient) Migrate(ctx context.Context, in *Migrate_Request, op return out, nil } -func (c *destinationClient) Write(ctx context.Context, opts ...grpc.CallOption) (Destination_WriteClient, error) { +func (c *destinationClient) Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Destination_ServiceDesc.Streams[0], Destination_Write_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &destinationWriteClient{ClientStream: stream} + x := &grpc.GenericClientStream[Write_Request, Write_Response]{ClientStream: stream} return x, nil } -type Destination_WriteClient interface { - Send(*Write_Request) error - CloseAndRecv() (*Write_Response, error) - grpc.ClientStream -} - -type destinationWriteClient struct { - grpc.ClientStream -} - -func (x *destinationWriteClient) Send(m *Write_Request) error { - return x.ClientStream.SendMsg(m) -} - -func (x *destinationWriteClient) CloseAndRecv() (*Write_Response, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(Write_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_WriteClient = grpc.ClientStreamingClient[Write_Request, Write_Response] -func (c *destinationClient) Write2(ctx context.Context, opts ...grpc.CallOption) (Destination_Write2Client, error) { +func (c *destinationClient) Write2(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write2_Request, Write2_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Destination_ServiceDesc.Streams[1], Destination_Write2_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &destinationWrite2Client{ClientStream: stream} + x := &grpc.GenericClientStream[Write2_Request, Write2_Response]{ClientStream: stream} return x, nil } -type Destination_Write2Client interface { - Send(*Write2_Request) error - CloseAndRecv() (*Write2_Response, error) - grpc.ClientStream -} - -type destinationWrite2Client struct { - grpc.ClientStream -} - -func (x *destinationWrite2Client) Send(m *Write2_Request) error { - return x.ClientStream.SendMsg(m) -} - -func (x *destinationWrite2Client) CloseAndRecv() (*Write2_Response, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(Write2_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_Write2Client = grpc.ClientStreamingClient[Write2_Request, Write2_Response] func (c *destinationClient) Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -221,7 +177,7 @@ func (c *destinationClient) GetMetrics(ctx context.Context, in *GetDestinationMe // DestinationServer is the server API for Destination service. // All implementations must embed UnimplementedDestinationServer -// for forward compatibility +// for forward compatibility. type DestinationServer interface { // Get the current protocol version of the plugin. This helps // get the right message about upgrade/downgrade of cli and/or plugin. @@ -236,9 +192,9 @@ type DestinationServer interface { // Migrate tables to the given plugin version Migrate(context.Context, *Migrate_Request) (*Migrate_Response, error) // Write resources - Write(Destination_WriteServer) error + Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error // Write2 resources - Write2(Destination_Write2Server) error + Write2(grpc.ClientStreamingServer[Write2_Request, Write2_Response]) error // Send signal to flush and close open connections Close(context.Context, *Close_Request) (*Close_Response, error) // DeleteStale deletes stale data that was inserted by a given source @@ -249,9 +205,12 @@ type DestinationServer interface { mustEmbedUnimplementedDestinationServer() } -// UnimplementedDestinationServer must be embedded to have forward compatible implementations. -type UnimplementedDestinationServer struct { -} +// UnimplementedDestinationServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDestinationServer struct{} func (UnimplementedDestinationServer) GetProtocolVersion(context.Context, *v0.GetProtocolVersion_Request) (*v0.GetProtocolVersion_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") @@ -268,10 +227,10 @@ func (UnimplementedDestinationServer) Configure(context.Context, *v0.Configure_R func (UnimplementedDestinationServer) Migrate(context.Context, *Migrate_Request) (*Migrate_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method Migrate not implemented") } -func (UnimplementedDestinationServer) Write(Destination_WriteServer) error { +func (UnimplementedDestinationServer) Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error { return status.Errorf(codes.Unimplemented, "method Write not implemented") } -func (UnimplementedDestinationServer) Write2(Destination_Write2Server) error { +func (UnimplementedDestinationServer) Write2(grpc.ClientStreamingServer[Write2_Request, Write2_Response]) error { return status.Errorf(codes.Unimplemented, "method Write2 not implemented") } func (UnimplementedDestinationServer) Close(context.Context, *Close_Request) (*Close_Response, error) { @@ -284,6 +243,7 @@ func (UnimplementedDestinationServer) GetMetrics(context.Context, *GetDestinatio return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") } func (UnimplementedDestinationServer) mustEmbedUnimplementedDestinationServer() {} +func (UnimplementedDestinationServer) testEmbeddedByValue() {} // UnsafeDestinationServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to DestinationServer will @@ -293,6 +253,13 @@ type UnsafeDestinationServer interface { } func RegisterDestinationServer(s grpc.ServiceRegistrar, srv DestinationServer) { + // If the following call pancis, it indicates UnimplementedDestinationServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Destination_ServiceDesc, srv) } @@ -387,56 +354,18 @@ func _Destination_Migrate_Handler(srv interface{}, ctx context.Context, dec func } func _Destination_Write_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(DestinationServer).Write(&destinationWriteServer{ServerStream: stream}) + return srv.(DestinationServer).Write(&grpc.GenericServerStream[Write_Request, Write_Response]{ServerStream: stream}) } -type Destination_WriteServer interface { - SendAndClose(*Write_Response) error - Recv() (*Write_Request, error) - grpc.ServerStream -} - -type destinationWriteServer struct { - grpc.ServerStream -} - -func (x *destinationWriteServer) SendAndClose(m *Write_Response) error { - return x.ServerStream.SendMsg(m) -} - -func (x *destinationWriteServer) Recv() (*Write_Request, error) { - m := new(Write_Request) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_WriteServer = grpc.ClientStreamingServer[Write_Request, Write_Response] func _Destination_Write2_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(DestinationServer).Write2(&destinationWrite2Server{ServerStream: stream}) -} - -type Destination_Write2Server interface { - SendAndClose(*Write2_Response) error - Recv() (*Write2_Request, error) - grpc.ServerStream -} - -type destinationWrite2Server struct { - grpc.ServerStream + return srv.(DestinationServer).Write2(&grpc.GenericServerStream[Write2_Request, Write2_Response]{ServerStream: stream}) } -func (x *destinationWrite2Server) SendAndClose(m *Write2_Response) error { - return x.ServerStream.SendMsg(m) -} - -func (x *destinationWrite2Server) Recv() (*Write2_Request, error) { - m := new(Write2_Request) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_Write2Server = grpc.ClientStreamingServer[Write2_Request, Write2_Response] func _Destination_Close_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Close_Request) diff --git a/pb/destination/v1/destination_grpc.pb.go b/pb/destination/v1/destination_grpc.pb.go index ea63a16..8bcdcc7 100644 --- a/pb/destination/v1/destination_grpc.pb.go +++ b/pb/destination/v1/destination_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/destination/v1/destination.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Destination_GetName_FullMethodName = "/cloudquery.destination.v1.Destination/GetName" @@ -42,7 +42,7 @@ type DestinationClient interface { // Migrate tables to the given plugin version Migrate(ctx context.Context, in *Migrate_Request, opts ...grpc.CallOption) (*Migrate_Response, error) // Write resources - Write(ctx context.Context, opts ...grpc.CallOption) (Destination_WriteClient, error) + Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) // Send signal to flush and close open connections Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) // DeleteStale deletes stale data that was inserted by a given source @@ -100,40 +100,18 @@ func (c *destinationClient) Migrate(ctx context.Context, in *Migrate_Request, op return out, nil } -func (c *destinationClient) Write(ctx context.Context, opts ...grpc.CallOption) (Destination_WriteClient, error) { +func (c *destinationClient) Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Destination_ServiceDesc.Streams[0], Destination_Write_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &destinationWriteClient{ClientStream: stream} + x := &grpc.GenericClientStream[Write_Request, Write_Response]{ClientStream: stream} return x, nil } -type Destination_WriteClient interface { - Send(*Write_Request) error - CloseAndRecv() (*Write_Response, error) - grpc.ClientStream -} - -type destinationWriteClient struct { - grpc.ClientStream -} - -func (x *destinationWriteClient) Send(m *Write_Request) error { - return x.ClientStream.SendMsg(m) -} - -func (x *destinationWriteClient) CloseAndRecv() (*Write_Response, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(Write_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_WriteClient = grpc.ClientStreamingClient[Write_Request, Write_Response] func (c *destinationClient) Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -167,7 +145,7 @@ func (c *destinationClient) GetMetrics(ctx context.Context, in *GetDestinationMe // DestinationServer is the server API for Destination service. // All implementations must embed UnimplementedDestinationServer -// for forward compatibility +// for forward compatibility. type DestinationServer interface { // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) @@ -178,7 +156,7 @@ type DestinationServer interface { // Migrate tables to the given plugin version Migrate(context.Context, *Migrate_Request) (*Migrate_Response, error) // Write resources - Write(Destination_WriteServer) error + Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error // Send signal to flush and close open connections Close(context.Context, *Close_Request) (*Close_Response, error) // DeleteStale deletes stale data that was inserted by a given source @@ -189,9 +167,12 @@ type DestinationServer interface { mustEmbedUnimplementedDestinationServer() } -// UnimplementedDestinationServer must be embedded to have forward compatible implementations. -type UnimplementedDestinationServer struct { -} +// UnimplementedDestinationServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDestinationServer struct{} func (UnimplementedDestinationServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") @@ -205,7 +186,7 @@ func (UnimplementedDestinationServer) Configure(context.Context, *Configure_Requ func (UnimplementedDestinationServer) Migrate(context.Context, *Migrate_Request) (*Migrate_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method Migrate not implemented") } -func (UnimplementedDestinationServer) Write(Destination_WriteServer) error { +func (UnimplementedDestinationServer) Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error { return status.Errorf(codes.Unimplemented, "method Write not implemented") } func (UnimplementedDestinationServer) Close(context.Context, *Close_Request) (*Close_Response, error) { @@ -218,6 +199,7 @@ func (UnimplementedDestinationServer) GetMetrics(context.Context, *GetDestinatio return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") } func (UnimplementedDestinationServer) mustEmbedUnimplementedDestinationServer() {} +func (UnimplementedDestinationServer) testEmbeddedByValue() {} // UnsafeDestinationServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to DestinationServer will @@ -227,6 +209,13 @@ type UnsafeDestinationServer interface { } func RegisterDestinationServer(s grpc.ServiceRegistrar, srv DestinationServer) { + // If the following call pancis, it indicates UnimplementedDestinationServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Destination_ServiceDesc, srv) } @@ -303,30 +292,11 @@ func _Destination_Migrate_Handler(srv interface{}, ctx context.Context, dec func } func _Destination_Write_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(DestinationServer).Write(&destinationWriteServer{ServerStream: stream}) -} - -type Destination_WriteServer interface { - SendAndClose(*Write_Response) error - Recv() (*Write_Request, error) - grpc.ServerStream -} - -type destinationWriteServer struct { - grpc.ServerStream + return srv.(DestinationServer).Write(&grpc.GenericServerStream[Write_Request, Write_Response]{ServerStream: stream}) } -func (x *destinationWriteServer) SendAndClose(m *Write_Response) error { - return x.ServerStream.SendMsg(m) -} - -func (x *destinationWriteServer) Recv() (*Write_Request, error) { - m := new(Write_Request) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Destination_WriteServer = grpc.ClientStreamingServer[Write_Request, Write_Response] func _Destination_Close_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Close_Request) diff --git a/pb/discovery/v0/discovery_grpc.pb.go b/pb/discovery/v0/discovery_grpc.pb.go index b1c0c75..cfef0e4 100644 --- a/pb/discovery/v0/discovery_grpc.pb.go +++ b/pb/discovery/v0/discovery_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/discovery/v0/discovery.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Discovery_GetVersions_FullMethodName = "/cloudquery.discovery.v0.Discovery/GetVersions" @@ -50,21 +50,25 @@ func (c *discoveryClient) GetVersions(ctx context.Context, in *GetVersions_Reque // DiscoveryServer is the server API for Discovery service. // All implementations must embed UnimplementedDiscoveryServer -// for forward compatibility +// for forward compatibility. type DiscoveryServer interface { // Get the name of the plugin GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) mustEmbedUnimplementedDiscoveryServer() } -// UnimplementedDiscoveryServer must be embedded to have forward compatible implementations. -type UnimplementedDiscoveryServer struct { -} +// UnimplementedDiscoveryServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDiscoveryServer struct{} func (UnimplementedDiscoveryServer) GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetVersions not implemented") } func (UnimplementedDiscoveryServer) mustEmbedUnimplementedDiscoveryServer() {} +func (UnimplementedDiscoveryServer) testEmbeddedByValue() {} // UnsafeDiscoveryServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to DiscoveryServer will @@ -74,6 +78,13 @@ type UnsafeDiscoveryServer interface { } func RegisterDiscoveryServer(s grpc.ServiceRegistrar, srv DiscoveryServer) { + // If the following call pancis, it indicates UnimplementedDiscoveryServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Discovery_ServiceDesc, srv) } diff --git a/pb/discovery/v1/discovery_grpc.pb.go b/pb/discovery/v1/discovery_grpc.pb.go index a75e632..e403c98 100644 --- a/pb/discovery/v1/discovery_grpc.pb.go +++ b/pb/discovery/v1/discovery_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/discovery/v1/discovery.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Discovery_GetVersions_FullMethodName = "/cloudquery.discovery.v1.Discovery/GetVersions" @@ -50,21 +50,25 @@ func (c *discoveryClient) GetVersions(ctx context.Context, in *GetVersions_Reque // DiscoveryServer is the server API for Discovery service. // All implementations must embed UnimplementedDiscoveryServer -// for forward compatibility +// for forward compatibility. type DiscoveryServer interface { // Get the name of the plugin GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) mustEmbedUnimplementedDiscoveryServer() } -// UnimplementedDiscoveryServer must be embedded to have forward compatible implementations. -type UnimplementedDiscoveryServer struct { -} +// UnimplementedDiscoveryServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDiscoveryServer struct{} func (UnimplementedDiscoveryServer) GetVersions(context.Context, *GetVersions_Request) (*GetVersions_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetVersions not implemented") } func (UnimplementedDiscoveryServer) mustEmbedUnimplementedDiscoveryServer() {} +func (UnimplementedDiscoveryServer) testEmbeddedByValue() {} // UnsafeDiscoveryServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to DiscoveryServer will @@ -74,6 +78,13 @@ type UnsafeDiscoveryServer interface { } func RegisterDiscoveryServer(s grpc.ServiceRegistrar, srv DiscoveryServer) { + // If the following call pancis, it indicates UnimplementedDiscoveryServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Discovery_ServiceDesc, srv) } diff --git a/pb/plugin/v3/plugin_grpc.pb.go b/pb/plugin/v3/plugin_grpc.pb.go index c6950c8..c40431e 100644 --- a/pb/plugin/v3/plugin_grpc.pb.go +++ b/pb/plugin/v3/plugin_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/plugin/v3/plugin.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Plugin_GetName_FullMethodName = "/cloudquery.plugin.v3.Plugin/GetName" @@ -49,14 +49,14 @@ type PluginClient interface { // Get all tables the source plugin supports. Must be called after Init GetTables(ctx context.Context, in *GetTables_Request, opts ...grpc.CallOption) (*GetTables_Response, error) // Start a sync on the source plugin. It streams messages as output. - Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Plugin_SyncClient, error) + Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) // Start a Read on the source plugin for a given table and schema. It streams messages as output. // The plugin assume that that schema was used to also write the data beforehand - Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (Plugin_ReadClient, error) + Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Read_Response], error) // Write resources. Write is the mirror of Sync, expecting a stream of messages as input. - Write(ctx context.Context, opts ...grpc.CallOption) (Plugin_WriteClient, error) + Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) // Transform resources. - Transform(ctx context.Context, opts ...grpc.CallOption) (Plugin_TransformClient, error) + Transform(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Transform_Request, Transform_Response], error) // Send signal to flush and close open connections Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) // Validate and test the connections used by the plugin @@ -121,13 +121,13 @@ func (c *pluginClient) GetTables(ctx context.Context, in *GetTables_Request, opt return out, nil } -func (c *pluginClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Plugin_SyncClient, error) { +func (c *pluginClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Plugin_ServiceDesc.Streams[0], Plugin_Sync_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &pluginSyncClient{ClientStream: stream} + x := &grpc.GenericClientStream[Sync_Request, Sync_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -137,30 +137,16 @@ func (c *pluginClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc. return x, nil } -type Plugin_SyncClient interface { - Recv() (*Sync_Response, error) - grpc.ClientStream -} - -type pluginSyncClient struct { - grpc.ClientStream -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_SyncClient = grpc.ServerStreamingClient[Sync_Response] -func (x *pluginSyncClient) Recv() (*Sync_Response, error) { - m := new(Sync_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *pluginClient) Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (Plugin_ReadClient, error) { +func (c *pluginClient) Read(ctx context.Context, in *Read_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Read_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Plugin_ServiceDesc.Streams[1], Plugin_Read_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &pluginReadClient{ClientStream: stream} + x := &grpc.GenericClientStream[Read_Request, Read_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -170,89 +156,34 @@ func (c *pluginClient) Read(ctx context.Context, in *Read_Request, opts ...grpc. return x, nil } -type Plugin_ReadClient interface { - Recv() (*Read_Response, error) - grpc.ClientStream -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_ReadClient = grpc.ServerStreamingClient[Read_Response] -type pluginReadClient struct { - grpc.ClientStream -} - -func (x *pluginReadClient) Recv() (*Read_Response, error) { - m := new(Read_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *pluginClient) Write(ctx context.Context, opts ...grpc.CallOption) (Plugin_WriteClient, error) { +func (c *pluginClient) Write(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Write_Request, Write_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Plugin_ServiceDesc.Streams[2], Plugin_Write_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &pluginWriteClient{ClientStream: stream} + x := &grpc.GenericClientStream[Write_Request, Write_Response]{ClientStream: stream} return x, nil } -type Plugin_WriteClient interface { - Send(*Write_Request) error - CloseAndRecv() (*Write_Response, error) - grpc.ClientStream -} - -type pluginWriteClient struct { - grpc.ClientStream -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_WriteClient = grpc.ClientStreamingClient[Write_Request, Write_Response] -func (x *pluginWriteClient) Send(m *Write_Request) error { - return x.ClientStream.SendMsg(m) -} - -func (x *pluginWriteClient) CloseAndRecv() (*Write_Response, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(Write_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *pluginClient) Transform(ctx context.Context, opts ...grpc.CallOption) (Plugin_TransformClient, error) { +func (c *pluginClient) Transform(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Transform_Request, Transform_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Plugin_ServiceDesc.Streams[3], Plugin_Transform_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &pluginTransformClient{ClientStream: stream} + x := &grpc.GenericClientStream[Transform_Request, Transform_Response]{ClientStream: stream} return x, nil } -type Plugin_TransformClient interface { - Send(*Transform_Request) error - Recv() (*Transform_Response, error) - grpc.ClientStream -} - -type pluginTransformClient struct { - grpc.ClientStream -} - -func (x *pluginTransformClient) Send(m *Transform_Request) error { - return x.ClientStream.SendMsg(m) -} - -func (x *pluginTransformClient) Recv() (*Transform_Response, error) { - m := new(Transform_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_TransformClient = grpc.BidiStreamingClient[Transform_Request, Transform_Response] func (c *pluginClient) Close(ctx context.Context, in *Close_Request, opts ...grpc.CallOption) (*Close_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -276,7 +207,7 @@ func (c *pluginClient) TestConnection(ctx context.Context, in *TestConnection_Re // PluginServer is the server API for Plugin service. // All implementations must embed UnimplementedPluginServer -// for forward compatibility +// for forward compatibility. type PluginServer interface { // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) @@ -291,14 +222,14 @@ type PluginServer interface { // Get all tables the source plugin supports. Must be called after Init GetTables(context.Context, *GetTables_Request) (*GetTables_Response, error) // Start a sync on the source plugin. It streams messages as output. - Sync(*Sync_Request, Plugin_SyncServer) error + Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error // Start a Read on the source plugin for a given table and schema. It streams messages as output. // The plugin assume that that schema was used to also write the data beforehand - Read(*Read_Request, Plugin_ReadServer) error + Read(*Read_Request, grpc.ServerStreamingServer[Read_Response]) error // Write resources. Write is the mirror of Sync, expecting a stream of messages as input. - Write(Plugin_WriteServer) error + Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error // Transform resources. - Transform(Plugin_TransformServer) error + Transform(grpc.BidiStreamingServer[Transform_Request, Transform_Response]) error // Send signal to flush and close open connections Close(context.Context, *Close_Request) (*Close_Response, error) // Validate and test the connections used by the plugin @@ -306,9 +237,12 @@ type PluginServer interface { mustEmbedUnimplementedPluginServer() } -// UnimplementedPluginServer must be embedded to have forward compatible implementations. -type UnimplementedPluginServer struct { -} +// UnimplementedPluginServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedPluginServer struct{} func (UnimplementedPluginServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") @@ -325,16 +259,16 @@ func (UnimplementedPluginServer) Init(context.Context, *Init_Request) (*Init_Res func (UnimplementedPluginServer) GetTables(context.Context, *GetTables_Request) (*GetTables_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetTables not implemented") } -func (UnimplementedPluginServer) Sync(*Sync_Request, Plugin_SyncServer) error { +func (UnimplementedPluginServer) Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error { return status.Errorf(codes.Unimplemented, "method Sync not implemented") } -func (UnimplementedPluginServer) Read(*Read_Request, Plugin_ReadServer) error { +func (UnimplementedPluginServer) Read(*Read_Request, grpc.ServerStreamingServer[Read_Response]) error { return status.Errorf(codes.Unimplemented, "method Read not implemented") } -func (UnimplementedPluginServer) Write(Plugin_WriteServer) error { +func (UnimplementedPluginServer) Write(grpc.ClientStreamingServer[Write_Request, Write_Response]) error { return status.Errorf(codes.Unimplemented, "method Write not implemented") } -func (UnimplementedPluginServer) Transform(Plugin_TransformServer) error { +func (UnimplementedPluginServer) Transform(grpc.BidiStreamingServer[Transform_Request, Transform_Response]) error { return status.Errorf(codes.Unimplemented, "method Transform not implemented") } func (UnimplementedPluginServer) Close(context.Context, *Close_Request) (*Close_Response, error) { @@ -344,6 +278,7 @@ func (UnimplementedPluginServer) TestConnection(context.Context, *TestConnection return nil, status.Errorf(codes.Unimplemented, "method TestConnection not implemented") } func (UnimplementedPluginServer) mustEmbedUnimplementedPluginServer() {} +func (UnimplementedPluginServer) testEmbeddedByValue() {} // UnsafePluginServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to PluginServer will @@ -353,6 +288,13 @@ type UnsafePluginServer interface { } func RegisterPluginServer(s grpc.ServiceRegistrar, srv PluginServer) { + // If the following call pancis, it indicates UnimplementedPluginServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Plugin_ServiceDesc, srv) } @@ -451,94 +393,36 @@ func _Plugin_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(m); err != nil { return err } - return srv.(PluginServer).Sync(m, &pluginSyncServer{ServerStream: stream}) + return srv.(PluginServer).Sync(m, &grpc.GenericServerStream[Sync_Request, Sync_Response]{ServerStream: stream}) } -type Plugin_SyncServer interface { - Send(*Sync_Response) error - grpc.ServerStream -} - -type pluginSyncServer struct { - grpc.ServerStream -} - -func (x *pluginSyncServer) Send(m *Sync_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_SyncServer = grpc.ServerStreamingServer[Sync_Response] func _Plugin_Read_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(Read_Request) if err := stream.RecvMsg(m); err != nil { return err } - return srv.(PluginServer).Read(m, &pluginReadServer{ServerStream: stream}) -} - -type Plugin_ReadServer interface { - Send(*Read_Response) error - grpc.ServerStream -} - -type pluginReadServer struct { - grpc.ServerStream + return srv.(PluginServer).Read(m, &grpc.GenericServerStream[Read_Request, Read_Response]{ServerStream: stream}) } -func (x *pluginReadServer) Send(m *Read_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_ReadServer = grpc.ServerStreamingServer[Read_Response] func _Plugin_Write_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(PluginServer).Write(&pluginWriteServer{ServerStream: stream}) + return srv.(PluginServer).Write(&grpc.GenericServerStream[Write_Request, Write_Response]{ServerStream: stream}) } -type Plugin_WriteServer interface { - SendAndClose(*Write_Response) error - Recv() (*Write_Request, error) - grpc.ServerStream -} - -type pluginWriteServer struct { - grpc.ServerStream -} - -func (x *pluginWriteServer) SendAndClose(m *Write_Response) error { - return x.ServerStream.SendMsg(m) -} - -func (x *pluginWriteServer) Recv() (*Write_Request, error) { - m := new(Write_Request) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_WriteServer = grpc.ClientStreamingServer[Write_Request, Write_Response] func _Plugin_Transform_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(PluginServer).Transform(&pluginTransformServer{ServerStream: stream}) -} - -type Plugin_TransformServer interface { - Send(*Transform_Response) error - Recv() (*Transform_Request, error) - grpc.ServerStream -} - -type pluginTransformServer struct { - grpc.ServerStream -} - -func (x *pluginTransformServer) Send(m *Transform_Response) error { - return x.ServerStream.SendMsg(m) + return srv.(PluginServer).Transform(&grpc.GenericServerStream[Transform_Request, Transform_Response]{ServerStream: stream}) } -func (x *pluginTransformServer) Recv() (*Transform_Request, error) { - m := new(Transform_Request) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Plugin_TransformServer = grpc.BidiStreamingServer[Transform_Request, Transform_Response] func _Plugin_Close_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(Close_Request) diff --git a/pb/source/v0/source_grpc.pb.go b/pb/source/v0/source_grpc.pb.go index 504fcd6..5343ffc 100644 --- a/pb/source/v0/source_grpc.pb.go +++ b/pb/source/v0/source_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/source/v0/source.proto @@ -16,8 +16,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Source_GetProtocolVersion_FullMethodName = "/proto.Source/GetProtocolVersion" @@ -53,9 +53,9 @@ type SourceClient interface { // every sync request. GetSyncSummary(ctx context.Context, in *GetSyncSummary_Request, opts ...grpc.CallOption) (*GetSyncSummary_Response, error) // Fetch resources - Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) + Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) // Sync2 is a new sync API that supports CQ Types. It is not backward compatible with Sync. - Sync2(ctx context.Context, in *Sync2_Request, opts ...grpc.CallOption) (Source_Sync2Client, error) + Sync2(ctx context.Context, in *Sync2_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync2_Response], error) // Get metrics for the source plugin GetMetrics(ctx context.Context, in *GetSourceMetrics_Request, opts ...grpc.CallOption) (*GetSourceMetrics_Response, error) } @@ -128,13 +128,13 @@ func (c *sourceClient) GetSyncSummary(ctx context.Context, in *GetSyncSummary_Re return out, nil } -func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) { +func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], Source_Sync_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &sourceSyncClient{ClientStream: stream} + x := &grpc.GenericClientStream[Sync_Request, Sync_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -144,30 +144,16 @@ func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc. return x, nil } -type Source_SyncClient interface { - Recv() (*Sync_Response, error) - grpc.ClientStream -} - -type sourceSyncClient struct { - grpc.ClientStream -} - -func (x *sourceSyncClient) Recv() (*Sync_Response, error) { - m := new(Sync_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncClient = grpc.ServerStreamingClient[Sync_Response] -func (c *sourceClient) Sync2(ctx context.Context, in *Sync2_Request, opts ...grpc.CallOption) (Source_Sync2Client, error) { +func (c *sourceClient) Sync2(ctx context.Context, in *Sync2_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync2_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[1], Source_Sync2_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &sourceSync2Client{ClientStream: stream} + x := &grpc.GenericClientStream[Sync2_Request, Sync2_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -177,22 +163,8 @@ func (c *sourceClient) Sync2(ctx context.Context, in *Sync2_Request, opts ...grp return x, nil } -type Source_Sync2Client interface { - Recv() (*Sync2_Response, error) - grpc.ClientStream -} - -type sourceSync2Client struct { - grpc.ClientStream -} - -func (x *sourceSync2Client) Recv() (*Sync2_Response, error) { - m := new(Sync2_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_Sync2Client = grpc.ServerStreamingClient[Sync2_Response] func (c *sourceClient) GetMetrics(ctx context.Context, in *GetSourceMetrics_Request, opts ...grpc.CallOption) (*GetSourceMetrics_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -206,7 +178,7 @@ func (c *sourceClient) GetMetrics(ctx context.Context, in *GetSourceMetrics_Requ // SourceServer is the server API for Source service. // All implementations must embed UnimplementedSourceServer -// for forward compatibility +// for forward compatibility. type SourceServer interface { // Get the current protocol version of the plugin. This helps // get the right message about upgrade/downgrade of cli and/or plugin. @@ -226,17 +198,20 @@ type SourceServer interface { // every sync request. GetSyncSummary(context.Context, *GetSyncSummary_Request) (*GetSyncSummary_Response, error) // Fetch resources - Sync(*Sync_Request, Source_SyncServer) error + Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error // Sync2 is a new sync API that supports CQ Types. It is not backward compatible with Sync. - Sync2(*Sync2_Request, Source_Sync2Server) error + Sync2(*Sync2_Request, grpc.ServerStreamingServer[Sync2_Response]) error // Get metrics for the source plugin GetMetrics(context.Context, *GetSourceMetrics_Request) (*GetSourceMetrics_Response, error) mustEmbedUnimplementedSourceServer() } -// UnimplementedSourceServer must be embedded to have forward compatible implementations. -type UnimplementedSourceServer struct { -} +// UnimplementedSourceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSourceServer struct{} func (UnimplementedSourceServer) GetProtocolVersion(context.Context, *v0.GetProtocolVersion_Request) (*v0.GetProtocolVersion_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetProtocolVersion not implemented") @@ -256,16 +231,17 @@ func (UnimplementedSourceServer) GetTablesForSpec(context.Context, *GetTablesFor func (UnimplementedSourceServer) GetSyncSummary(context.Context, *GetSyncSummary_Request) (*GetSyncSummary_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetSyncSummary not implemented") } -func (UnimplementedSourceServer) Sync(*Sync_Request, Source_SyncServer) error { +func (UnimplementedSourceServer) Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error { return status.Errorf(codes.Unimplemented, "method Sync not implemented") } -func (UnimplementedSourceServer) Sync2(*Sync2_Request, Source_Sync2Server) error { +func (UnimplementedSourceServer) Sync2(*Sync2_Request, grpc.ServerStreamingServer[Sync2_Response]) error { return status.Errorf(codes.Unimplemented, "method Sync2 not implemented") } func (UnimplementedSourceServer) GetMetrics(context.Context, *GetSourceMetrics_Request) (*GetSourceMetrics_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") } func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} +func (UnimplementedSourceServer) testEmbeddedByValue() {} // UnsafeSourceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to SourceServer will @@ -275,6 +251,13 @@ type UnsafeSourceServer interface { } func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { + // If the following call pancis, it indicates UnimplementedSourceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Source_ServiceDesc, srv) } @@ -391,42 +374,22 @@ func _Source_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(m); err != nil { return err } - return srv.(SourceServer).Sync(m, &sourceSyncServer{ServerStream: stream}) + return srv.(SourceServer).Sync(m, &grpc.GenericServerStream[Sync_Request, Sync_Response]{ServerStream: stream}) } -type Source_SyncServer interface { - Send(*Sync_Response) error - grpc.ServerStream -} - -type sourceSyncServer struct { - grpc.ServerStream -} - -func (x *sourceSyncServer) Send(m *Sync_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncServer = grpc.ServerStreamingServer[Sync_Response] func _Source_Sync2_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(Sync2_Request) if err := stream.RecvMsg(m); err != nil { return err } - return srv.(SourceServer).Sync2(m, &sourceSync2Server{ServerStream: stream}) -} - -type Source_Sync2Server interface { - Send(*Sync2_Response) error - grpc.ServerStream + return srv.(SourceServer).Sync2(m, &grpc.GenericServerStream[Sync2_Request, Sync2_Response]{ServerStream: stream}) } -type sourceSync2Server struct { - grpc.ServerStream -} - -func (x *sourceSync2Server) Send(m *Sync2_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_Sync2Server = grpc.ServerStreamingServer[Sync2_Response] func _Source_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetSourceMetrics_Request) diff --git a/pb/source/v1/source_grpc.pb.go b/pb/source/v1/source_grpc.pb.go index 8792d08..046b35c 100644 --- a/pb/source/v1/source_grpc.pb.go +++ b/pb/source/v1/source_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/source/v1/source.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Source_GetName_FullMethodName = "/cloudquery.source.v1.Source/GetName" @@ -46,7 +46,7 @@ type SourceClient interface { // Get all tables the source plugin supports. Must be called after Init GetDynamicTables(ctx context.Context, in *GetDynamicTables_Request, opts ...grpc.CallOption) (*GetDynamicTables_Response, error) // Start the sync the source plugin - Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) + Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) // Generate documentation for the source plugin GenDocs(ctx context.Context, in *GenDocs_Request, opts ...grpc.CallOption) (*GenDocs_Response, error) } @@ -119,13 +119,13 @@ func (c *sourceClient) GetDynamicTables(ctx context.Context, in *GetDynamicTable return out, nil } -func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) { +func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], Source_Sync_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &sourceSyncClient{ClientStream: stream} + x := &grpc.GenericClientStream[Sync_Request, Sync_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -135,22 +135,8 @@ func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc. return x, nil } -type Source_SyncClient interface { - Recv() (*Sync_Response, error) - grpc.ClientStream -} - -type sourceSyncClient struct { - grpc.ClientStream -} - -func (x *sourceSyncClient) Recv() (*Sync_Response, error) { - m := new(Sync_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncClient = grpc.ServerStreamingClient[Sync_Response] func (c *sourceClient) GenDocs(ctx context.Context, in *GenDocs_Request, opts ...grpc.CallOption) (*GenDocs_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -164,7 +150,7 @@ func (c *sourceClient) GenDocs(ctx context.Context, in *GenDocs_Request, opts .. // SourceServer is the server API for Source service. // All implementations must embed UnimplementedSourceServer -// for forward compatibility +// for forward compatibility. type SourceServer interface { // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) @@ -179,15 +165,18 @@ type SourceServer interface { // Get all tables the source plugin supports. Must be called after Init GetDynamicTables(context.Context, *GetDynamicTables_Request) (*GetDynamicTables_Response, error) // Start the sync the source plugin - Sync(*Sync_Request, Source_SyncServer) error + Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error // Generate documentation for the source plugin GenDocs(context.Context, *GenDocs_Request) (*GenDocs_Response, error) mustEmbedUnimplementedSourceServer() } -// UnimplementedSourceServer must be embedded to have forward compatible implementations. -type UnimplementedSourceServer struct { -} +// UnimplementedSourceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSourceServer struct{} func (UnimplementedSourceServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") @@ -207,13 +196,14 @@ func (UnimplementedSourceServer) Init(context.Context, *Init_Request) (*Init_Res func (UnimplementedSourceServer) GetDynamicTables(context.Context, *GetDynamicTables_Request) (*GetDynamicTables_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetDynamicTables not implemented") } -func (UnimplementedSourceServer) Sync(*Sync_Request, Source_SyncServer) error { +func (UnimplementedSourceServer) Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error { return status.Errorf(codes.Unimplemented, "method Sync not implemented") } func (UnimplementedSourceServer) GenDocs(context.Context, *GenDocs_Request) (*GenDocs_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GenDocs not implemented") } func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} +func (UnimplementedSourceServer) testEmbeddedByValue() {} // UnsafeSourceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to SourceServer will @@ -223,6 +213,13 @@ type UnsafeSourceServer interface { } func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { + // If the following call pancis, it indicates UnimplementedSourceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Source_ServiceDesc, srv) } @@ -339,21 +336,11 @@ func _Source_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(m); err != nil { return err } - return srv.(SourceServer).Sync(m, &sourceSyncServer{ServerStream: stream}) -} - -type Source_SyncServer interface { - Send(*Sync_Response) error - grpc.ServerStream + return srv.(SourceServer).Sync(m, &grpc.GenericServerStream[Sync_Request, Sync_Response]{ServerStream: stream}) } -type sourceSyncServer struct { - grpc.ServerStream -} - -func (x *sourceSyncServer) Send(m *Sync_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncServer = grpc.ServerStreamingServer[Sync_Response] func _Source_GenDocs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GenDocs_Request) diff --git a/pb/source/v2/source_grpc.pb.go b/pb/source/v2/source_grpc.pb.go index 1edfdfc..ecb0b8a 100644 --- a/pb/source/v2/source_grpc.pb.go +++ b/pb/source/v2/source_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.5.1 // - protoc v4.23.4 // source: plugin-pb/source/v2/source.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 const ( Source_GetName_FullMethodName = "/cloudquery.source.v2.Source/GetName" @@ -46,7 +46,7 @@ type SourceClient interface { // Get all tables the source plugin supports. Must be called after Init GetDynamicTables(ctx context.Context, in *GetDynamicTables_Request, opts ...grpc.CallOption) (*GetDynamicTables_Response, error) // Start the sync the source plugin - Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) + Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) // Generate documentation for the source plugin GenDocs(ctx context.Context, in *GenDocs_Request, opts ...grpc.CallOption) (*GenDocs_Response, error) } @@ -119,13 +119,13 @@ func (c *sourceClient) GetDynamicTables(ctx context.Context, in *GetDynamicTable return out, nil } -func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (Source_SyncClient, error) { +func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Sync_Response], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], Source_Sync_FullMethodName, cOpts...) if err != nil { return nil, err } - x := &sourceSyncClient{ClientStream: stream} + x := &grpc.GenericClientStream[Sync_Request, Sync_Response]{ClientStream: stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -135,22 +135,8 @@ func (c *sourceClient) Sync(ctx context.Context, in *Sync_Request, opts ...grpc. return x, nil } -type Source_SyncClient interface { - Recv() (*Sync_Response, error) - grpc.ClientStream -} - -type sourceSyncClient struct { - grpc.ClientStream -} - -func (x *sourceSyncClient) Recv() (*Sync_Response, error) { - m := new(Sync_Response) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncClient = grpc.ServerStreamingClient[Sync_Response] func (c *sourceClient) GenDocs(ctx context.Context, in *GenDocs_Request, opts ...grpc.CallOption) (*GenDocs_Response, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) @@ -164,7 +150,7 @@ func (c *sourceClient) GenDocs(ctx context.Context, in *GenDocs_Request, opts .. // SourceServer is the server API for Source service. // All implementations must embed UnimplementedSourceServer -// for forward compatibility +// for forward compatibility. type SourceServer interface { // Get the name of the plugin GetName(context.Context, *GetName_Request) (*GetName_Response, error) @@ -179,15 +165,18 @@ type SourceServer interface { // Get all tables the source plugin supports. Must be called after Init GetDynamicTables(context.Context, *GetDynamicTables_Request) (*GetDynamicTables_Response, error) // Start the sync the source plugin - Sync(*Sync_Request, Source_SyncServer) error + Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error // Generate documentation for the source plugin GenDocs(context.Context, *GenDocs_Request) (*GenDocs_Response, error) mustEmbedUnimplementedSourceServer() } -// UnimplementedSourceServer must be embedded to have forward compatible implementations. -type UnimplementedSourceServer struct { -} +// UnimplementedSourceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSourceServer struct{} func (UnimplementedSourceServer) GetName(context.Context, *GetName_Request) (*GetName_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetName not implemented") @@ -207,13 +196,14 @@ func (UnimplementedSourceServer) Init(context.Context, *Init_Request) (*Init_Res func (UnimplementedSourceServer) GetDynamicTables(context.Context, *GetDynamicTables_Request) (*GetDynamicTables_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetDynamicTables not implemented") } -func (UnimplementedSourceServer) Sync(*Sync_Request, Source_SyncServer) error { +func (UnimplementedSourceServer) Sync(*Sync_Request, grpc.ServerStreamingServer[Sync_Response]) error { return status.Errorf(codes.Unimplemented, "method Sync not implemented") } func (UnimplementedSourceServer) GenDocs(context.Context, *GenDocs_Request) (*GenDocs_Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GenDocs not implemented") } func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} +func (UnimplementedSourceServer) testEmbeddedByValue() {} // UnsafeSourceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to SourceServer will @@ -223,6 +213,13 @@ type UnsafeSourceServer interface { } func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { + // If the following call pancis, it indicates UnimplementedSourceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } s.RegisterService(&Source_ServiceDesc, srv) } @@ -339,21 +336,11 @@ func _Source_Sync_Handler(srv interface{}, stream grpc.ServerStream) error { if err := stream.RecvMsg(m); err != nil { return err } - return srv.(SourceServer).Sync(m, &sourceSyncServer{ServerStream: stream}) -} - -type Source_SyncServer interface { - Send(*Sync_Response) error - grpc.ServerStream + return srv.(SourceServer).Sync(m, &grpc.GenericServerStream[Sync_Request, Sync_Response]{ServerStream: stream}) } -type sourceSyncServer struct { - grpc.ServerStream -} - -func (x *sourceSyncServer) Send(m *Sync_Response) error { - return x.ServerStream.SendMsg(m) -} +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type Source_SyncServer = grpc.ServerStreamingServer[Sync_Response] func _Source_GenDocs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GenDocs_Request)