-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rpc): Add connection manager #49
Conversation
WalkthroughThe Ethereum client in the Go codebase has been enhanced for better context handling and connection health checks. The Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 6
Configuration used: CodeRabbit UI
Files selected for processing (10)
- client/eth/client.go (5 hunks)
- client/eth/client_provider.go (2 hunks)
- client/eth/config.go (1 hunks)
- client/eth/connection_pool.go (1 hunks)
- client/eth/health_client.go (1 hunks)
- cmd/start.go (1 hunks)
- config/config.go (1 hunks)
- core/transactor/sender/sender.go (2 hunks)
- core/transactor/tracker/noncer.go (1 hunks)
- core/transactor/transactor.go (1 hunks)
Additional comments: 25
client/eth/config.go (2)
5-6: The renaming of
EthHTTPURL
andEthWSURL
toEthHTTPURLs
andEthWSURLs
suggests that the system now supports multiple Ethereum endpoints. Ensure that the rest of the codebase is updated to handle multiple URLs where these configurations are used.11-12: The default configuration has been updated to match the new field names. Confirm that these defaults are appropriate for the intended use cases and that they are correctly handled throughout the application.
config/config.go (1)
- 18-18: The
Eth
configuration field has been replaced withConnectionPool
. This change should be cross-checked with the rest of the application to ensure that all references to the oldEth
field are updated to use the newConnectionPool
configuration.client/eth/health_client.go (3)
29-42: The
DialContext
method inHealthCheckedClient
dials a new connection and starts a health check in a goroutine. Ensure that the goroutine is properly managed and does not lead to a resource leak if the client is closed or encounters an error.45-49: The
Healthy
method uses a mutex to protect the read of thehealthy
field. This is good practice for concurrent access. Ensure that all other accesses to this field are similarly protected.58-71: The
StartHealthCheck
method contains a loop that continuously checks the health of the client. Ensure that there is a mechanism to stop this loop when the client is closed to prevent a goroutine leak.core/transactor/tracker/noncer.go (1)
- 37-40: The
InitializeExistingTxs
method has been added to theNoncer
type. Ensure that this method is called at the appropriate time during application startup to initialize the transaction pool.client/eth/connection_pool.go (6)
14-18: The
ConnectionPool
interface has been updated with new methodsDial
andDialContext
, and theClose
method. Ensure that all implementations of this interface are updated accordingly.22-25: The
ConnectionPoolImpl
struct now includes alogger
field. Verify that the logger is being used consistently throughout the implementation.29-31: The
ConnectionPoolConfig
struct has been updated with new fields to support multiple URLs and a default timeout. Ensure that these new configurations are used appropriately in the connection pool's methods.41-48: The
NewConnectionPoolImpl
function initializes a newConnectionPoolImpl
with a cache eviction function. Ensure that the eviction function correctly handles closing of clients and that the sleep duration is appropriate for the use case.60-68: The
Close
method inConnectionPoolImpl
iterates over all clients and closes them. Ensure that this method is called during application shutdown to cleanly close all connections.75-90: The
DialContext
method inConnectionPoolImpl
dials multiple clients and adds them to the cache. Ensure that error handling is robust and that partially successful dials are managed correctly.cmd/start.go (4)
78-81: The initialization of
ConnectionPoolImpl
withcfg.ConnectionPool
andlogger
should be verified to ensure that the connection pool is correctly configured and that the logger is used consistently.83-86: The creation of
ChainProviderImpl
with the connection pool should be checked to ensure that theChainProviderImpl
is correctly utilizing the connection pool for Ethereum client interactions.88-89: The call to
cpi.DialContext
should be verified to ensure that it is correctly establishing connections as part of the application startup sequence.92-92: The registration of the Ethereum client with the application builder should be verified to ensure that the application is correctly using the new client provider implementation.
core/transactor/sender/sender.go (1)
- 45-49: The updated error handling and retry logic in the
SendTransaction
function should be verified to ensure that it correctly captures the gas price and applies the replacement policy as intended.client/eth/client.go (4)
21-21: The addition of the
DialContext
method to theClient
interface should be verified to ensure that all implementations of this interface are updated to include this new method.51-53: The addition of the
TxPoolContent
method to theReader
interface should be verified to ensure that all implementations of this interface are updated to include this new method and that it is used correctly throughout the application.63-63: The renaming of the
client
struct toExtendedEthClient
should be verified to ensure that all references to the old struct name are updated throughout the application.112-120: The
TxPoolContent
method inExtendedEthClient
should be verified to ensure that it correctly retrieves the transaction pool content using thetxpool_content
RPC call.core/transactor/transactor.go (1)
- 119-121: The call to
noncer.InitializeExistingTxs
within themainLoop
function should be verified to ensure that it is correctly initializing transactions from the pool at the appropriate time during the transactor's operation.client/eth/client_provider.go (2)
32-32: The change in the return type of the
NewChainProviderImpl
function fromChainProvider
toClient
should be verified to ensure that it aligns with the intended usage of the function and that all callers are updated accordingly.220-227: The addition of the
TxPoolContent
method to theChainProviderImpl
type should be verified to ensure that it is correctly implemented and that the method is used appropriately within the application.
type HealthCheckedClient struct { | ||
*ExtendedEthClient | ||
dialurl string | ||
logger log.Logger | ||
healthy bool | ||
healthCheckInterval time.Duration | ||
mu sync.Mutex | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The HealthCheckedClient
struct has been introduced with a logger
and healthCheckInterval
. Ensure that the health check interval is configurable and not hardcoded, as indicated by the // todo parameterize
comment.
func NewHealthCheckedClient(logger log.Logger) *HealthCheckedClient { | ||
return &HealthCheckedClient{ | ||
logger: logger, | ||
healthCheckInterval: 5 * time.Second, //nolint:gomnd // todo paramaterize. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NewHealthCheckedClient
function initializes a HealthCheckedClient
with a hardcoded health check interval. It would be better to allow this value to be passed as a parameter or read from a configuration to provide more flexibility.
- func NewHealthCheckedClient(logger log.Logger) *HealthCheckedClient {
+ func NewHealthCheckedClient(logger log.Logger, interval time.Duration) *HealthCheckedClient {
- healthCheckInterval: 5 * time.Second, //nolint:gomnd // todo paramaterize.
+ healthCheckInterval: interval,
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func NewHealthCheckedClient(logger log.Logger) *HealthCheckedClient { | |
return &HealthCheckedClient{ | |
logger: logger, | |
healthCheckInterval: 5 * time.Second, //nolint:gomnd // todo paramaterize. | |
} | |
func NewHealthCheckedClient(logger log.Logger, interval time.Duration) *HealthCheckedClient { | |
return &HealthCheckedClient{ | |
logger: logger, | |
healthCheckInterval: interval, | |
} |
client/eth/health_client.go
Outdated
func (c *HealthCheckedClient) SetHealthy(healthy bool) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.logger.With() | ||
c.healthy = healthy | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SetHealthy
method sets the healthy
field under a mutex lock, which is good for thread safety. However, the logger.With()
call seems to be incomplete and should either be removed or completed with additional context.
- c.logger.With()
+ // Either complete the logger context or remove this line.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *HealthCheckedClient) SetHealthy(healthy bool) { | |
c.mu.Lock() | |
defer c.mu.Unlock() | |
c.logger.With() | |
c.healthy = healthy | |
} | |
func (c *HealthCheckedClient) SetHealthy(healthy bool) { | |
c.mu.Lock() | |
defer c.mu.Unlock() | |
// Either complete the logger context or remove this line. | |
c.healthy = healthy | |
} |
func (n *Noncer) InitializeExistingTxs(ctx context.Context) error { | ||
_, err := n.ethClient.TxPoolContent(ctx) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The InitializeExistingTxs
method does not seem to use the result from n.ethClient.TxPoolContent(ctx)
. If the intention is to initialize some state with the transaction pool content, the implementation might be incomplete.
client/eth/connection_pool.go
Outdated
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
// If replacing, be sure to close old client first. | ||
// The LRU cache's eviction policy is not triggered on value updates/replacements. | ||
if c.cache.Contains(cfg.EthHTTPURL) { | ||
err := c.removeClient(cfg.EthHTTPURL) | ||
if err != nil { | ||
return false | ||
} | ||
retry: | ||
_, client, ok := c.cache.GetOldest() | ||
if client.Healthy() { | ||
goto retry | ||
} | ||
client := NewClient(&cfg) | ||
return c.cache.Add(cfg.EthHTTPURL, client) | ||
return client, ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GetAnyChainClient
method in ConnectionPoolImpl
retrieves a client from the cache. The retry logic with the label retry
seems incorrect as it would create an infinite loop if the first client is healthy. This needs to be fixed.
- if client.Healthy() {
+ if !client.Healthy() {
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
// If replacing, be sure to close old client first. | |
// The LRU cache's eviction policy is not triggered on value updates/replacements. | |
if c.cache.Contains(cfg.EthHTTPURL) { | |
err := c.removeClient(cfg.EthHTTPURL) | |
if err != nil { | |
return false | |
} | |
retry: | |
_, client, ok := c.cache.GetOldest() | |
if client.Healthy() { | |
goto retry | |
} | |
client := NewClient(&cfg) | |
return c.cache.Add(cfg.EthHTTPURL, client) | |
return client, ok | |
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
retry: | |
_, client, ok := c.cache.GetOldest() | |
if !client.Healthy() { | |
goto retry | |
} | |
return client, ok |
func (c *ExtendedEthClient) Close() error { | ||
if c == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Close
method in ExtendedEthClient
calls c.Close()
on itself, which will result in a recursive call and a stack overflow. This should be corrected to call the close method on the embedded ethclient.Client
.
- c.Close()
+ c.Client.Close()
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *ExtendedEthClient) Close() error { | |
if c == nil { | |
func (c *ExtendedEthClient) Close() error { | |
if c == nil { | |
c.Client.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 6
Configuration used: CodeRabbit UI
Files ignored due to path filters (2)
config.toml
is excluded by:!**/*.toml
examples/listener/config.toml
is excluded by:!**/*.toml
Files selected for processing (1)
- client/eth/connection_pool.go (1 hunks)
Additional comments: 3
client/eth/connection_pool.go (3)
- 14-18: The
ConnectionPool
interface has been updated with new methodsDial
andDialContext
, and a newClose
method. Ensure that all implementations of this interface are updated to include these new methods.Verification successful
The verification process has successfully identified the implementation of the
GetAnyChainClient
,Close
, andDial
methods within theConnectionPoolImpl
structure inclient/eth/connection_pool.go
. Additionally, theClose
method is also implemented in theExtendedEthClient
structure inclient/eth/client.go
. TheDialContext
method is referenced in theDial
method implementation, indicating its existence, although the full implementation details are not shown in the output.Based on the provided output, it appears that the
ConnectionPoolImpl
structure has been updated to include the new methods as required by the updatedConnectionPool
interface. However, the output for theRemoveChainClient
method and the full implementation ofDialContext
method are not shown, which suggests that further verification may be needed for these specific methods.Given the information at hand, it seems appropriate to conclude that the
ConnectionPool
interface's new methods have been implemented in theConnectionPoolImpl
structure, with the caveat that theRemoveChainClient
method and full details ofDialContext
need further confirmation.* 22-25: The `ConnectionPoolImpl` struct now includes a `logger` field. This is a good addition for improved logging and debugging capabilities.Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for all implementations of ConnectionPool interface to ensure they implement new methods. ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) GetAnyChainClient() (*HealthCheckedClient, bool) { $$$ } func ($_ $_) RemoveChainClient(string) error { $$$ } func ($_ $_) Close() error { $$$ } func ($_ $_) Dial(string) error { $$$ } func ($_ $_) DialContext(context.Context, string) error { $$$ }'Length of output: 1048
Script:
#!/bin/bash # Search for all implementations of ConnectionPool interface to ensure they implement the new methods. # We will check for each method individually to avoid the multiple AST nodes error. # Check for implementations of GetAnyChainClient method ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) GetAnyChainClient() (*HealthCheckedClient, bool) { $$$ }' # Check for implementations of RemoveChainClient method ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) RemoveChainClient(string) error { $$$ }' # Check for implementations of Close method ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) Close() error { $$$ }' # Check for implementations of Dial method ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) Dial(string) error { $$$ }' # Check for implementations of DialContext method ast-grep --lang go --pattern $'type $_ struct { $$$ } func ($_ $_) DialContext(context.Context, string) error { $$$ }'Length of output: 3068
Script:
#!/bin/bash # Simplify the ast-grep pattern to only match the function definitions. # Check for implementations of GetAnyChainClient method ast-grep --lang go --pattern $'func ($_ $_) GetAnyChainClient() (*HealthCheckedClient, bool) { $$$ }' # Check for implementations of RemoveChainClient method ast-grep --lang go --pattern $'func ($_ $_) RemoveChainClient(string) error { $$$ }' # Check for implementations of Close method ast-grep --lang go --pattern $'func ($_ $_) Close() error { $$$ }' # Check for implementations of Dial method ast-grep --lang go --pattern $'func ($_ $_) Dial(string) error { $$$ }' # Check for implementations of DialContext method ast-grep --lang go --pattern $'func ($_ $_) DialContext(context.Context, string) error { $$$ }'Length of output: 2068
- 34-38: The
DefaultConnectPoolConfig
function provides default values for theConnectionPoolConfig
. It's crucial to ensure that these defaults are sensible and that they are overridden appropriately in different environments.
EthHTTPURLs []string | ||
EthWSURLs []string | ||
DefaultTimeout time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The definition of ConnectionPoolConfig
in connection_pool.go
confirms the presence of the DefaultTimeout
field, but the output does not indicate that a default value is set within the struct definition itself. Given that the instantiation of ConnectionPoolConfig
found in the codebase does not include the DefaultTimeout
field, this could potentially be an oversight that needs to be addressed.
Based on the information gathered, it is recommended to ensure that all parts of the codebase that instantiate ConnectionPoolConfig
are updated to initialize the DefaultTimeout
field, unless there is logic elsewhere in the code that sets a default value for this field when it is not explicitly initialized.
Analysis chain
The ConnectionPoolConfig
struct has been updated with new fields. It's important to ensure that these new fields are properly documented and that all parts of the codebase that instantiate ConnectionPoolConfig
are updated to initialize these new fields.
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for instantiations of ConnectionPoolConfig to ensure new fields are initialized.
ast-grep --lang go --pattern $'ConnectionPoolConfig{$$$}'
Length of output: 330
Script:
#!/bin/bash
# Search for all instantiations of ConnectionPoolConfig across the codebase to check if the DefaultTimeout field is being initialized.
ast-grep --lang go --pattern $'ConnectionPoolConfig{$$$}'
Length of output: 330
Script:
#!/bin/bash
# Check the definition of ConnectionPoolConfig to see if a default value for DefaultTimeout is specified.
ast-grep --lang go --pattern $'type ConnectionPoolConfig struct {$$$}'
Length of output: 363
client/eth/connection_pool.go
Outdated
func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (ConnectionPool, error) { | ||
cache, err := lru.NewWithEvict[string, *HealthCheckedClient]( | ||
len(cfg.EthHTTPURLs)+len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { | ||
defer v.Close() | ||
// The timeout is added so that any in progress | ||
// requests have a chance to complete before we close. | ||
time.Sleep(cfg.DefaultTimeout) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NewConnectionPoolImpl
function now takes a log.Logger
as a parameter, which is a good practice for dependency injection. However, the eviction function in lru.NewWithEvict
sleeps for cfg.DefaultTimeout
which could block the eviction process. Consider using a non-blocking approach to handle in-progress requests.
- time.Sleep(cfg.DefaultTimeout)
+ // Implement a non-blocking approach to handle in-progress requests before closing the client.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (ConnectionPool, error) { | |
cache, err := lru.NewWithEvict[string, *HealthCheckedClient]( | |
len(cfg.EthHTTPURLs)+len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { | |
defer v.Close() | |
// The timeout is added so that any in progress | |
// requests have a chance to complete before we close. | |
time.Sleep(cfg.DefaultTimeout) | |
}) | |
func NewConnectionPoolImpl(cfg ConnectionPoolConfig, logger log.Logger) (ConnectionPool, error) { | |
cache, err := lru.NewWithEvict[string, *HealthCheckedClient]( | |
len(cfg.EthHTTPURLs)+len(cfg.EthWSURLs), func(_ string, v *HealthCheckedClient) { | |
defer v.Close() | |
// Implement a non-blocking approach to handle in-progress requests before closing the client. | |
}) |
func (c *ConnectionPoolImpl) Dial(string) error { | ||
return c.DialContext(context.Background(), "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Dial
method is a wrapper for DialContext
with a background context and an empty string. This seems like an incomplete implementation as it does not use the provided string parameter. Ensure that this is the intended behavior or that the method is fully implemented.
client/eth/connection_pool.go
Outdated
func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { | ||
for _, url := range c.config.EthHTTPURLs { | ||
client := NewHealthCheckedClient(c.logger) | ||
if err := client.DialContext(ctx, url); err != nil { | ||
return err | ||
} | ||
c.cache.Add(url, client) | ||
} | ||
for _, url := range c.config.EthWSURLs { | ||
client := NewHealthCheckedClient(c.logger) | ||
if err := client.DialContext(ctx, url); err != nil { | ||
return err | ||
} | ||
c.cache.Add(url, client) | ||
} | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DialContext
method attempts to dial all configured URLs and adds them to the cache. Ensure that there is error handling for the case where some URLs fail to connect, potentially leaving the pool in a partially initialized state.
+ // Consider adding error handling for partial failures.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { | |
for _, url := range c.config.EthHTTPURLs { | |
client := NewHealthCheckedClient(c.logger) | |
if err := client.DialContext(ctx, url); err != nil { | |
return err | |
} | |
c.cache.Add(url, client) | |
} | |
for _, url := range c.config.EthWSURLs { | |
client := NewHealthCheckedClient(c.logger) | |
if err := client.DialContext(ctx, url); err != nil { | |
return err | |
} | |
c.cache.Add(url, client) | |
} | |
return nil | |
func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { | |
// Consider adding error handling for partial failures. | |
for _, url := range c.config.EthHTTPURLs { | |
client := NewHealthCheckedClient(c.logger) | |
if err := client.DialContext(ctx, url); err != nil { | |
return err | |
} | |
c.cache.Add(url, client) | |
} | |
for _, url := range c.config.EthWSURLs { | |
client := NewHealthCheckedClient(c.logger) | |
if err := client.DialContext(ctx, url); err != nil { | |
return err | |
} | |
c.cache.Add(url, client) | |
} | |
return nil | |
} |
client/eth/connection_pool.go
Outdated
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
// If replacing, be sure to close old client first. | ||
// The LRU cache's eviction policy is not triggered on value updates/replacements. | ||
if c.cache.Contains(cfg.EthHTTPURL) { | ||
err := c.removeClient(cfg.EthHTTPURL) | ||
if err != nil { | ||
return false | ||
} | ||
retry: | ||
_, client, ok := c.cache.GetOldest() | ||
if !client.Healthy() { | ||
goto retry | ||
} | ||
client := NewClient(&cfg) | ||
return c.cache.Add(cfg.EthHTTPURL, client) | ||
return client, ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GetAnyChainClient
method uses a retry loop without a break condition, which could lead to an infinite loop if all clients are unhealthy. Implement a maximum retry count or a timeout to prevent this.
retry:
- _, client, ok := c.cache.GetOldest()
- if !client.Healthy() {
- goto retry
- }
+ // Implement a maximum retry count or a timeout to prevent potential infinite loops.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
// If replacing, be sure to close old client first. | |
// The LRU cache's eviction policy is not triggered on value updates/replacements. | |
if c.cache.Contains(cfg.EthHTTPURL) { | |
err := c.removeClient(cfg.EthHTTPURL) | |
if err != nil { | |
return false | |
} | |
retry: | |
_, client, ok := c.cache.GetOldest() | |
if !client.Healthy() { | |
goto retry | |
} | |
client := NewClient(&cfg) | |
return c.cache.Add(cfg.EthHTTPURL, client) | |
return client, ok | |
func (c *ConnectionPoolImpl) GetAnyChainClient() (*HealthCheckedClient, bool) { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
// Implement a maximum retry count or a timeout to prevent potential infinite loops. | |
return client, ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 6
Configuration used: CodeRabbit UI
Files selected for processing (4)
- client/eth/client_provider.go (16 hunks)
- client/eth/connection_pool.go (1 hunks)
- client/eth/health_client.go (1 hunks)
- types/queue/sqs/sqs.go (3 hunks)
Files skipped from review as they are similar to previous changes (2)
- client/eth/client_provider.go
- client/eth/health_client.go
Additional comments: 8
client/eth/connection_pool.go (4)
14-19: The
ConnectionPool
interface has been updated with new methodsDial
andDialContext
, and a newClose
method has been added. Ensure that all implementations of this interface are updated to include these new methods.23-27: The
ConnectionPoolImpl
struct now includes alogger
field. Ensure that the logger is being used appropriately throughout the implementation and that it is properly initialized whenConnectionPoolImpl
is instantiated.31-33: The
ConnectionPoolConfig
struct has been updated with new fields. Ensure that these new fields are properly documented and that all parts of the codebase that instantiateConnectionPoolConfig
are updated to initialize these new fields.43-53: The
NewConnectionPoolImpl
function now takes alog.Logger
as a parameter. Verify that all calls to this function have been updated to pass the logger.Verification successful
The output from the
rg
command indicates that there are two occurrences ofNewConnectionPoolImpl
in the codebase:
- The definition of the function itself in
client/eth/connection_pool.go
.- A call to this function in
cmd/start.go
, where it is being passed two parameters:cfg.ConnectionPool
andlogger
.This confirms that the call to
NewConnectionPoolImpl
incmd/start.go
has been updated to include thelogger
as a parameter, as required by the new function signature.Based on this information, the review comment has been addressed correctly in the codebase.
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for calls to NewConnectionPoolImpl to ensure the logger is being passed. ast-grep --lang go --pattern $'NewConnectionPoolImpl($_, $_)'Length of output: 62
Script:
#!/bin/bash # Search for calls to NewConnectionPoolImpl to ensure the logger is being passed. # Using ripgrep to search for the function call pattern in .go files rg 'NewConnectionPoolImpl\(' --type goLength of output: 249
types/queue/sqs/sqs.go (4)
37-37: The
Queue
struct now includes afifoQueueID
field. Ensure that this field is properly documented and that it is being used correctly in all relevant methods.41-47: The
NewQueueFromConfig
function has been updated to initialize the newfifoQueueID
field. Verify that all calls to this function have been updated to provide the new parameter.33-55: > Note: This review was outside the patches, so it was mapped to the patch with the greatest overlap. Original lines [52-66]
The
NewQueue
function has been updated to accept afifoQueueID
parameter and pass it toNewQueueFromConfig
. Ensure that all calls toNewQueue
are updated accordingly.
- 80-82: The
Push
method has been modified to include theMessageGroupId
when sending a message to the SQS queue. Ensure that thefifoQueueID
is being set correctly and that it is appropriate for all use cases wherePush
is called.
func DefaultConnectPoolConfig() *ConnectionPoolConfig { | ||
return &ConnectionPoolConfig{ | ||
EthHTTPURLs: []string{"http://localhost:8545"}, | ||
EthWSURLs: []string{"ws://localhost:8546"}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DefaultConnectPoolConfig
function provides default values for EthHTTPURLs
and EthWSURLs
. Ensure that the DefaultTimeout
field is also given a sensible default value if it is not being set elsewhere.
func (c *ConnectionPoolImpl) Close() error { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
return c.cache.Get(clientAddr) | ||
for _, client := range c.cache.Keys() { | ||
if err := c.removeClient(client); err != nil { | ||
return err | ||
} | ||
} | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Close
method in ConnectionPoolImpl
locks the entire pool to close all clients. This could be a performance bottleneck if the pool is large. Consider closing clients concurrently.
- for _, client := range c.cache.Keys() {
- if err := c.removeClient(client); err != nil {
- return err
- }
- }
+ // Implement concurrent client closing logic to improve performance.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
func (c *ConnectionPoolImpl) Close() error { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
return c.cache.Get(clientAddr) | |
for _, client := range c.cache.Keys() { | |
if err := c.removeClient(client); err != nil { | |
return err | |
} | |
} | |
return nil | |
func (c *ConnectionPoolImpl) Close() error { | |
c.mutex.Lock() | |
defer c.mutex.Unlock() | |
// Implement concurrent client closing logic to improve performance. | |
return nil |
func (c *ConnectionPoolImpl) Dial(string) error { | ||
return c.DialContext(context.Background(), "") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Dial
method is a wrapper for DialContext
with a background context and an empty string. This seems like an incomplete implementation as it does not use the provided string parameter. Ensure that this is the intended behavior or that the method is fully implemented.
func (c *ConnectionPoolImpl) DialContext(ctx context.Context, _ string) error { | ||
for _, url := range c.config.EthHTTPURLs { | ||
client := NewHealthCheckedClient(c.logger) | ||
if err := client.DialContext(ctx, url); err != nil { | ||
return err | ||
} | ||
c.cache.Add(url, client) | ||
} | ||
for _, url := range c.config.EthWSURLs { | ||
client := NewHealthCheckedClient(c.logger) | ||
if err := client.DialContext(ctx, url); err != nil { | ||
return err | ||
} | ||
c.wsCache.Add(url, client) | ||
} | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DialContext
method attempts to dial all HTTP and WS URLs and add them to their respective caches. Ensure that there is error handling for the case where some URLs fail to dial, potentially leaving the pool in a partially initialized state.
func (c *ConnectionPoolImpl) GetHTTP() (*HealthCheckedClient, bool) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
retry: | ||
_, client, ok := c.cache.GetOldest() | ||
if !client.Healthy() { | ||
goto retry | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GetHTTP
method uses a retry loop with a label to get the oldest client and check its health. Ensure that there is a maximum number of retries or a timeout to prevent an infinite loop if all clients are unhealthy.
func (c *ConnectionPoolImpl) GetWS() (*HealthCheckedClient, bool) { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
// If replacing, be sure to close old client first. | ||
// The LRU cache's eviction policy is not triggered on value updates/replacements. | ||
if c.cache.Contains(cfg.EthHTTPURL) { | ||
err := c.removeClient(cfg.EthHTTPURL) | ||
if err != nil { | ||
return false | ||
} | ||
retry: | ||
_, client, ok := c.wsCache.GetOldest() | ||
if !client.Healthy() { | ||
goto retry | ||
} | ||
client := NewClient(&cfg) | ||
return c.cache.Add(cfg.EthHTTPURL, client) | ||
return client, ok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to GetHTTP
, the GetWS
method also uses a retry loop. Apply the same considerations here regarding a maximum number of retries or a timeout.
Summary by CodeRabbit
New Features
Enhancements
Configuration Changes
Refactor
Bug Fixes