Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): Add connection manager #49

Merged
merged 5 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 23 additions & 60 deletions client/eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

type Client interface {
Dial() error
DialContext(ctx context.Context, rawurl string) error
Close() error
Reader
Writer
Expand Down Expand Up @@ -48,6 +48,9 @@ type Reader interface {
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
TransactionByHash(ctx context.Context, hash common.Hash,
) (tx *ethcoretypes.Transaction, isPending bool, err error)
TxPoolContent(
ctx context.Context) (
map[string]map[string]map[string]*ethcoretypes.Transaction, error)
}

type Writer interface {
Expand All @@ -57,71 +60,20 @@ type Writer interface {
}

// client is the indexer eth client.
type client struct {
type ExtendedEthClient struct {
*ethclient.Client
cfg *Config
wsclient *ethclient.Client
}

// NewClient returns a new client. It has both reader and writer privilege.
func NewClient(cfg *Config) Client {
client := &client{
cfg: cfg,
}
return client
}

// ==================================================================
// Client Lifecycle
// ==================================================================

// Dial dials the client.
func (c *client) Dial() error {
if c.Client != nil || c.wsclient != nil {
return ErrAlreadyDial
}
// TODO: manage context better
ctx := context.Background()
retries := 0
var err error
var httpclient, wsethclient *ethclient.Client
for retries < MaxRetries {
retries++
httpclient, err = ethclient.DialContext(ctx, c.cfg.EthHTTPURL)
if err == nil {
c.Client = httpclient
break
}
time.Sleep(defaultRetryTime)
}
if err != nil {
return err
}

retries = 0
for retries < MaxRetries {
retries++
wsethclient, err = ethclient.DialContext(ctx, c.cfg.EthWSURL)
if err == nil {
c.wsclient = wsethclient
break
}
time.Sleep(defaultRetryTime)
}
if err != nil {
return err
}

return nil
}

// Close closes the client.
func (c *client) Close() error {
if c == nil || c.wsclient == nil {
func (c *ExtendedEthClient) Close() error {
if c == nil {
Comment on lines +72 to +73
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Close method in ExtendedEthClient calls c.Close() on itself, which will result in a recursive call and a stack overflow. This should be corrected to call the close method on the embedded ethclient.Client.

- c.Close()
+ c.Client.Close()

Committable suggestion

IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func (c *ExtendedEthClient) Close() error {
if c == nil {
func (c *ExtendedEthClient) Close() error {
if c == nil {
c.Client.Close()

return ErrClosed
}
c.Close()
c.wsclient.Close()
return nil
}

Expand All @@ -130,7 +82,7 @@ func (c *client) Close() error {
// ==================================================================

// GetReceipts returns the receipts for the given transactions.
func (c *client) GetReceipts(
func (c *ExtendedEthClient) GetReceipts(
ctx context.Context, txs ethcoretypes.Transactions) (ethcoretypes.Receipts, error) {
var receipts ethcoretypes.Receipts
for _, tx := range txs {
Expand All @@ -144,15 +96,26 @@ func (c *client) GetReceipts(
}

// SubscribeNewHead subscribes to new block headers.
func (c *client) SubscribeNewHead(
func (c *ExtendedEthClient) SubscribeNewHead(
ctx context.Context) (chan *ethcoretypes.Header, ethereum.Subscription, error) {
ch := make(chan *ethcoretypes.Header)
sub, err := c.wsclient.SubscribeNewHead(ctx, ch)
sub, err := c.Client.SubscribeNewHead(ctx, ch)
return ch, sub, err
}

func (c *client) SubscribeFilterLogs(
func (c *ExtendedEthClient) SubscribeFilterLogs(
ctx context.Context,
q ethereum.FilterQuery, ch chan<- ethcoretypes.Log) (ethereum.Subscription, error) {
return c.wsclient.SubscribeFilterLogs(ctx, q, ch)
return c.Client.SubscribeFilterLogs(ctx, q, ch)
}

func (c *ExtendedEthClient) TxPoolContent(
ctx context.Context,
) (map[string]map[string]map[string]*ethcoretypes.Transaction, error) {
// var result map[string]map[string]map[string]*ethcoretypes.Transaction
var result map[string]map[string]map[string]*ethcoretypes.Transaction
if err := c.Client.Client().CallContext(ctx, &result, "txpool_content"); err != nil {
return nil, err
}
return result, nil
}
59 changes: 39 additions & 20 deletions client/eth/client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ChainProviderImpl struct {
}

// NewChainProviderImpl creates a new ChainProviderImpl with the given ConnectionPool.
func NewChainProviderImpl(pool ConnectionPool) (ChainProvider, error) {
func NewChainProviderImpl(pool ConnectionPool) (Client, error) {
return &ChainProviderImpl{pool}, nil
}

Expand All @@ -40,7 +40,7 @@ func NewChainProviderImpl(pool ConnectionPool) (ChainProvider, error) {
// BlockByNumber returns the block for the given number.
func (c *ChainProviderImpl) BlockByNumber(
ctx context.Context, num *big.Int) (*types.Block, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.BlockByNumber(ctx, num)
}
return nil, ErrClientNotFound
Expand All @@ -49,7 +49,7 @@ func (c *ChainProviderImpl) BlockByNumber(
// BlockReceipts returns the receipts for the given block number or hash.
func (c *ChainProviderImpl) BlockReceipts(
ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) ([]*types.Receipt, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.BlockReceipts(ctx, blockNrOrHash)
}
return nil, ErrClientNotFound
Expand All @@ -58,7 +58,7 @@ func (c *ChainProviderImpl) BlockReceipts(
// TransactionReceipt returns the receipt for the given transaction hash.
func (c *ChainProviderImpl) TransactionReceipt(
ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.TransactionReceipt(ctx, txHash)
}
return nil, ErrClientNotFound
Expand All @@ -67,23 +67,23 @@ func (c *ChainProviderImpl) TransactionReceipt(
// SubscribeNewHead subscribes to new head events.
func (c *ChainProviderImpl) SubscribeNewHead(
ctx context.Context) (chan *types.Header, ethereum.Subscription, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetWS(); ok {
return client.SubscribeNewHead(ctx)
}
return nil, nil, ErrClientNotFound
}

// BlockNumber returns the current block number.
func (c *ChainProviderImpl) BlockNumber(ctx context.Context) (uint64, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.BlockNumber(ctx)
}
return 0, ErrClientNotFound
}

// ChainID returns the current chain ID.
func (c *ChainProviderImpl) ChainID(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.ChainID(ctx)
}
return nil, ErrClientNotFound
Expand All @@ -92,7 +92,7 @@ func (c *ChainProviderImpl) ChainID(ctx context.Context) (*big.Int, error) {
// BalanceAt returns the balance of the given address at the given block number.
func (c *ChainProviderImpl) BalanceAt(
ctx context.Context, address common.Address, blockNumber *big.Int) (*big.Int, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.BalanceAt(ctx, address, blockNumber)
}
return nil, ErrClientNotFound
Expand All @@ -101,7 +101,7 @@ func (c *ChainProviderImpl) BalanceAt(
// CodeAt returns the code of the given account at the given block number.
func (c *ChainProviderImpl) CodeAt(
ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.CodeAt(ctx, account, blockNumber)
}
return nil, ErrClientNotFound
Expand All @@ -110,7 +110,7 @@ func (c *ChainProviderImpl) CodeAt(
// EstimateGas estimates the gas needed to execute a specific transaction.
func (c *ChainProviderImpl) EstimateGas(
ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.EstimateGas(ctx, msg)
}
return 0, ErrClientNotFound
Expand All @@ -119,7 +119,7 @@ func (c *ChainProviderImpl) EstimateGas(
// FilterLogs returns the logs that satisfy the given filter query.
func (c *ChainProviderImpl) FilterLogs(
ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.FilterLogs(ctx, q)
}
return nil, ErrClientNotFound
Expand All @@ -128,7 +128,7 @@ func (c *ChainProviderImpl) FilterLogs(
// HeaderByNumber returns the header of the block with the given number.
func (c *ChainProviderImpl) HeaderByNumber(
ctx context.Context, number *big.Int) (*types.Header, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.HeaderByNumber(ctx, number)
}
return nil, ErrClientNotFound
Expand All @@ -137,7 +137,7 @@ func (c *ChainProviderImpl) HeaderByNumber(
// PendingCodeAt returns the code of the given account in the pending state.
func (c *ChainProviderImpl) PendingCodeAt(
ctx context.Context, account common.Address) ([]byte, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.PendingCodeAt(ctx, account)
}
return nil, ErrClientNotFound
Expand All @@ -146,7 +146,7 @@ func (c *ChainProviderImpl) PendingCodeAt(
// PendingNonceAt returns the nonce of the given account in the pending state.
func (c *ChainProviderImpl) PendingNonceAt(
ctx context.Context, account common.Address) (uint64, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.PendingNonceAt(ctx, account)
}
return 0, ErrClientNotFound
Expand All @@ -155,7 +155,7 @@ func (c *ChainProviderImpl) PendingNonceAt(
// SendTransaction sends the given transaction.
func (c *ChainProviderImpl) SendTransaction(
ctx context.Context, tx *types.Transaction) error {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.SendTransaction(ctx, tx)
}
return ErrClientNotFound
Expand All @@ -165,15 +165,15 @@ func (c *ChainProviderImpl) SendTransaction(
func (c *ChainProviderImpl) SubscribeFilterLogs(
ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log,
) (ethereum.Subscription, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetWS(); ok {
return client.SubscribeFilterLogs(ctx, q, ch)
}
return nil, ErrClientNotFound
}

// SuggestGasPrice suggests a gas price.
func (c *ChainProviderImpl) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.SuggestGasPrice(ctx)
}
return nil, ErrClientNotFound
Expand All @@ -183,15 +183,15 @@ func (c *ChainProviderImpl) SuggestGasPrice(ctx context.Context) (*big.Int, erro
func (c *ChainProviderImpl) CallContract(
ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int,
) ([]byte, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.CallContract(ctx, msg, blockNumber)
}
return nil, ErrClientNotFound
}

// SuggestGasTipCap suggests a gas tip cap.
func (c *ChainProviderImpl) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.SuggestGasTipCap(ctx)
}
return nil, ErrClientNotFound
Expand All @@ -201,8 +201,27 @@ func (c *ChainProviderImpl) SuggestGasTipCap(ctx context.Context) (*big.Int, err
func (c *ChainProviderImpl) TransactionByHash(
ctx context.Context, hash common.Hash,
) (*types.Transaction, bool, error) {
if client, ok := c.GetAnyChainClient(); ok {
if client, ok := c.GetHTTP(); ok {
return client.TransactionByHash(ctx, hash)
}
return nil, false, ErrClientNotFound
}

// "id": 1,
// "result": {
// "pending": {
// "0xe74aA377Dbc22450349774d1C427337995120DCB": {
// "3698316": {
// "blockHash": null,
// "blockNumber": null,
// "from": "0xe74aa377dbc22450349774d1c427337995120dcb",
// "gas": "0x715b",

func (c *ChainProviderImpl) TxPoolContent(ctx context.Context) (
map[string]map[string]map[string]*types.Transaction, error,
) {
if client, ok := c.GetHTTP(); ok {
return client.TxPoolContent(ctx)
}
return nil, ErrClientNotFound
}
8 changes: 4 additions & 4 deletions client/eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package eth

// Config is the configuration for the eth client.
type Config struct {
EthHTTPURL string
EthWSURL string
EthHTTPURLs string
EthWSURLs string
}

func DefaultConfig() *Config {
return &Config{
EthHTTPURL: "http://localhost:8545",
EthWSURL: "ws://localhost:8546",
EthHTTPURLs: "http://localhost:8545",
EthWSURLs: "ws://localhost:8546",
}
}
Loading
Loading