Skip to content

Commit

Permalink
Replace closer with resource package (#2864)
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Nov 9, 2020
1 parent 9ba35ad commit c6a256d
Show file tree
Hide file tree
Showing 48 changed files with 223 additions and 204 deletions.
4 changes: 2 additions & 2 deletions src/aggregator/aggregator/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -108,7 +108,7 @@ type metricMap struct {
firstInsertAt time.Time
rateLimiter *rate.Limiter
runtimeOpts runtime.Options
runtimeOptsCloser close.SimpleCloser
runtimeOptsCloser xresource.SimpleCloser
sleepFn sleepFn
metrics metricMapMetrics
}
Expand Down
6 changes: 3 additions & 3 deletions src/aggregator/runtime/options_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package runtime

import (
"github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"
"github.com/m3db/m3/src/x/watch"
)

Expand All @@ -36,7 +36,7 @@ type OptionsManager interface {
// RegisterWatcher registers a watcher that watches updates to runtime options.
// When an update arrives, the manager will deliver the update to all registered
// watchers.
RegisterWatcher(l OptionsWatcher) close.SimpleCloser
RegisterWatcher(l OptionsWatcher) xresource.SimpleCloser

// Close closes the watcher and all descendent watches
Close()
Expand Down Expand Up @@ -72,7 +72,7 @@ func (w *optionsManager) RuntimeOptions() Options {

func (w *optionsManager) RegisterWatcher(
watcher OptionsWatcher,
) close.SimpleCloser {
) xresource.SimpleCloser {
_, watch, _ := w.watchable.Watch()

// The watchable is always initialized so it's okay to do a blocking read.
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/client/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/topology"
xclose "github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"
murmur3 "github.com/m3db/stackmurmur3/v2"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -68,14 +68,14 @@ type connPool struct {
}

type conn struct {
channel xclose.SimpleCloser
channel xresource.SimpleCloser
client rpc.TChanNode
}

// NewConnectionFn is a function that creates a connection.
type NewConnectionFn func(
channelName string, addr string, opts Options,
) (xclose.SimpleCloser, rpc.TChanNode, error)
) (xresource.SimpleCloser, rpc.TChanNode, error)

type healthCheckFn func(client rpc.TChanNode, opts Options) error

Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/client/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/topology"
xclock "github.com/m3db/m3/src/x/clock"
xclose "github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"
"github.com/stretchr/testify/require"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestConnectionPoolConnectsAndRetriesConnects(t *testing.T) {

fn := func(
ch string, addr string, opts Options,
) (xclose.SimpleCloser, rpc.TChanNode, error) {
) (xresource.SimpleCloser, rpc.TChanNode, error) {
attempt := int(atomic.AddInt32(&attempts, 1))
if attempt == 1 {
return nil, nil, fmt.Errorf("a connect error")
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestConnectionPoolHealthChecks(t *testing.T) {

fn := func(
ch string, addr string, opts Options,
) (xclose.SimpleCloser, rpc.TChanNode, error) {
) (xresource.SimpleCloser, rpc.TChanNode, error) {
attempt := atomic.AddInt32(&newConnAttempt, 1)
if attempt == 1 {
return channelNone, client1, nil
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/clock"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xresource "github.com/m3db/m3/src/x/resource"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
Expand Down Expand Up @@ -319,7 +319,7 @@ func NewOptionsForAsyncClusters(opts Options, topoInits []topology.Initializer,

func defaultNewConnectionFn(
channelName string, address string, opts Options,
) (xclose.SimpleCloser, rpc.TChanNode, error) {
) (xresource.SimpleCloser, rpc.TChanNode, error) {
channel, err := tchannel.NewChannel(channelName, opts.ChannelOptions())
if err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ import (
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/clock"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xresource "github.com/m3db/m3/src/x/resource"
xretry "github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
Expand Down Expand Up @@ -136,7 +136,7 @@ type sessionState struct {
type session struct {
state sessionState
opts Options
runtimeOptsListenerCloser xclose.Closer
runtimeOptsListenerCloser xresource.SimpleCloser
scope tally.Scope
nowFn clock.NowFn
log *zap.Logger
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/session_fetch_high_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/ident"
xresource "github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestSessionFetchIDsHighConcurrency(t *testing.T) {
// to be able to mock the entire end to end pipeline
newConnFn := func(
_ string, addr string, _ Options,
) (xclose.SimpleCloser, rpc.TChanNode, error) {
) (xresource.SimpleCloser, rpc.TChanNode, error) {
mockClient := rpc.NewMockTChanNode(ctrl)
mockClient.EXPECT().Health(gomock.Any()).
Return(healthCheckResult, nil).
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/digest/fd_digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"hash/adler32"
"os"

xclose "github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"
)

// FdWithDigest is a container for a file descriptor and the digest for the file contents.
type FdWithDigest interface {
xclose.Closer
xresource.Closer

// Fd returns the file descriptor.
Fd() *os.File
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/namespace/namespace_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/dbnode/namespace/namespace_runtime_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package namespace
import (
"sync"

xclose "github.com/m3db/m3/src/x/close"
xresource "github.com/m3db/m3/src/x/resource"
"github.com/m3db/m3/src/x/watch"
)

Expand Down Expand Up @@ -88,7 +88,7 @@ type RuntimeOptionsManager interface {
// RegisterListener registers a listener for updates to runtime options,
// it will synchronously call back the listener when this method is called
// to deliver the current set of runtime options.
RegisterListener(l RuntimeOptionsListener) xclose.SimpleCloser
RegisterListener(l RuntimeOptionsListener) xresource.SimpleCloser

// Close closes the watcher and all descendent watches.
Close()
Expand Down Expand Up @@ -228,7 +228,7 @@ func (w *runtimeOptionsManager) Get() RuntimeOptions {

func (w *runtimeOptionsManager) RegisterListener(
listener RuntimeOptionsListener,
) xclose.SimpleCloser {
) xresource.SimpleCloser {
_, watch, _ := w.watchable.Watch()

// We always initialize the watchable so always read
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/namespace/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"fmt"
"sync"

xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/ident"
xresource "github.com/m3db/m3/src/x/resource"
xwatch "github.com/m3db/m3/src/x/watch"

"go.uber.org/zap"
Expand Down Expand Up @@ -145,7 +145,7 @@ func (sr *schemaRegistry) getSchemaHistory(nsIDStr string) (SchemaHistory, error
func (sr *schemaRegistry) RegisterListener(
nsID ident.ID,
listener SchemaListener,
) (xclose.SimpleCloser, error) {
) (xresource.SimpleCloser, error) {
if !sr.protoEnabled {
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/namespace/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/dbnode/retention"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/gogo/protobuf/proto"
)
Expand Down Expand Up @@ -205,7 +205,7 @@ type SchemaRegistry interface {

// RegisterListener registers a schema listener for the namespace.
// If proto is not enabled, nil, nil is returned
RegisterListener(id ident.ID, listener SchemaListener) (xclose.SimpleCloser, error)
RegisterListener(id ident.ID, listener SchemaListener) (xresource.SimpleCloser, error)

// Close closes all the listeners.
Close()
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/network/server/httpjson/cluster/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
ns "github.com/m3db/m3/src/dbnode/network/server"
"github.com/m3db/m3/src/dbnode/network/server/httpjson"
ttcluster "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/cluster"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/context"
xresource "github.com/m3db/m3/src/x/resource"
)

type server struct {
Expand Down Expand Up @@ -83,6 +83,6 @@ func (s *server) ListenAndServe() (ns.Close, error) {

return func() {
listener.Close()
xclose.TryClose(service)
xresource.TryClose(service) // nolint: errcheck
}, nil
}
4 changes: 2 additions & 2 deletions src/dbnode/network/server/tchannelthrift/cluster/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
ns "github.com/m3db/m3/src/dbnode/network/server"
"github.com/m3db/m3/src/dbnode/network/server/tchannelthrift"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/context"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/uber/tchannel-go"
)
Expand Down Expand Up @@ -76,6 +76,6 @@ func (s *server) ListenAndServe() (ns.Close, error) {

return func() {
channel.Close()
xclose.TryClose(service)
xresource.TryClose(service) // nolint: errcheck
}, nil
}
4 changes: 2 additions & 2 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
xopentracing "github.com/m3db/m3/src/x/opentracing"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/resource"
xresource "github.com/m3db/m3/src/x/resource"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -2302,7 +2302,7 @@ func (s *service) readEncodedResult(
segments := s.pools.segmentsArray.Get()
segments = segmentsArr(segments).grow(len(encoded))
segments = segments[:0]
ctx.RegisterFinalizer(resource.FinalizerFn(func() {
ctx.RegisterFinalizer(xresource.FinalizerFn(func() {
s.pools.segmentsArray.Put(segments)
}))

Expand Down
12 changes: 0 additions & 12 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
xclose "github.com/m3db/m3/src/x/close"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
Expand Down Expand Up @@ -360,17 +359,6 @@ func openFiles(opener fileOpener, fds map[string]**os.File) error {
return firstErr
}

// TODO(xichen): move closeAll to m3x/close.
func closeAll(closers ...xclose.Closer) error {
multiErr := xerrors.NewMultiError()
for _, closer := range closers {
if err := closer.Close(); err != nil {
multiErr = multiErr.Add(err)
}
}
return multiErr.FinalError()
}

// DeleteFiles delete a set of files, returning all the errors encountered during
// the deletion process.
func DeleteFiles(filePaths []string) error {
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -88,7 +89,7 @@ func TestCloseAllFails(t *testing.T) {
defer os.Remove(file.Name())

assert.NoError(t, file.Close())
assert.Error(t, closeAll(file))
assert.Error(t, xresource.CloseAll(file))
}

func TestDeleteFiles(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/persist/fs/persist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/clock"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/instrument"
xresource "github.com/m3db/m3/src/x/resource"

"github.com/pborman/uuid"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -93,7 +93,7 @@ type persistManager struct {

metrics persistManagerMetrics

runtimeOptsListener xclose.SimpleCloser
runtimeOptsListener xresource.SimpleCloser
}

type dataPersistManager struct {
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
xresource "github.com/m3db/m3/src/x/resource"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -390,7 +391,7 @@ func (w *writer) closeWOIndex() error {
return err
}

return closeAll(
return xresource.CloseAll(
w.infoFdWithDigest,
w.indexFdWithDigest,
w.summariesFdWithDigest,
Expand Down
Loading

0 comments on commit c6a256d

Please sign in to comment.