Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return error properly #743

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions platform/fabric/core/generic/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,10 @@ func (c *Committer) CommitConfig(blockNumber uint64, raw []byte, env *common.Env
}
switch vc {
case driver.Valid:
c.logger.Infof("config block [%s] already committed, skip it.", txID)
c.logger.Debugf("config block [%s] already committed, skip it.", txID)
return nil
case driver.Unknown:
c.logger.Infof("config block [%s] not committed, commit it.", txID)
c.logger.Debugf("config block [%s] not committed, commit it.", txID)
// this is okay
default:
return errors.Errorf("invalid configtx's [%s] status [%d]", txID, vc)
Expand Down Expand Up @@ -494,19 +494,19 @@ func (c *Committer) ReloadConfigTransactions() error {
}
defer qe.Done()

c.logger.Infof("looking up the latest config block available")
c.logger.Debugf("looking up the latest config block available")
var sequence uint64 = 0
for {
txID := ConfigTXPrefix + strconv.FormatUint(sequence, 10)
vc, _, err := c.Vault.Status(txID)
if err != nil {
return errors.WithMessagef(err, "failed getting tx's status [%s]", txID)
}
c.logger.Infof("check config block at txID [%s], status [%v]...", txID, vc)
c.logger.Debugf("check config block at txID [%s], status [%v]...", txID, vc)
done := false
switch vc {
case driver.Valid:
c.logger.Infof("config block available, txID [%s], loading...", txID)
c.logger.Debugf("config block available, txID [%s], loading...", txID)

key, err := rwset.CreateCompositeKey(channelConfigKey, []string{strconv.FormatUint(sequence, 10)})
if err != nil {
Expand Down Expand Up @@ -567,22 +567,22 @@ func (c *Committer) ReloadConfigTransactions() error {
continue
}

c.logger.Infof("config block at txID [%s] unavailable, stop loading", txID)
c.logger.Debugf("config block at txID [%s] unavailable, stop loading", txID)
done = true
default:
return errors.Errorf("invalid configtx's [%s] status [%d]", txID, vc)
}
if done {
c.logger.Infof("loading config block done")
c.logger.Debugf("loading config block done")
break
}
}
if sequence == 1 {
c.logger.Infof("no config block available, must start from genesis")
c.logger.Debugf("no config block available, must start from genesis")
// no configuration block found
return nil
}
c.logger.Infof("latest config block available at sequence [%d]", sequence-1)
c.logger.Debugf("latest config block available at sequence [%d]", sequence-1)

return nil
}
Expand Down Expand Up @@ -695,7 +695,7 @@ func (c *Committer) listenTo(ctx context.Context, txID string, timeout time.Dura
}

func (c *Committer) commitConfig(txID string, blockNumber uint64, seq uint64, envelope []byte) error {
c.logger.Infof("[Channel: %s] commit config transaction number [bn:%d][seq:%d]", c.ChannelConfig.ID(), blockNumber, seq)
c.logger.Debugf("[Channel: %s] commit config transaction number [bn:%d][seq:%d]", c.ChannelConfig.ID(), blockNumber, seq)

rws, err := c.Vault.NewRWSet(txID)
if err != nil {
Expand Down Expand Up @@ -844,7 +844,7 @@ func (c *Committer) applyBundle(bundle *channelconfig.Bundle) error {
// update the list of orderers
ordererConfig, exists := c.MembershipService.ChannelResources.OrdererConfig()
if !exists {
c.logger.Infof("no orderer configuration found in Channel config")
c.logger.Debugf("no orderer configuration found in Channel config")
return nil
}
c.logger.Debugf("[Channel: %s] Orderer config has changed, updating the list of orderers", c.ChannelConfig.ID())
Expand Down Expand Up @@ -885,7 +885,7 @@ func (c *Committer) applyBundle(bundle *channelconfig.Bundle) error {
// https://hyperledger-fabric.readthedocs.io/en/latest/upgrade_to_newest_version.html#define-ordering-node-endpoint-per-org
addr := bundle.ChannelConfig().OrdererAddresses()
if len(newOrderers) == 0 && len(orgs) == 1 && len(addr) > 0 {
c.logger.Infof("falling back to OrdererAddresses field in channel config (deprecated, please refer to Fabric docs)")
c.logger.Debugf("falling back to OrdererAddresses field in channel config (deprecated, please refer to Fabric docs)")
for _, endpoint := range addr {
if len(endpoint) == 0 {
c.logger.Debugf("[Channel: %s] empty orderer address, skipping", c.ChannelConfig.ID())
Expand All @@ -906,7 +906,7 @@ func (c *Committer) applyBundle(bundle *channelconfig.Bundle) error {
c.logger.Debugf("[Channel: %s] Updating the list of orderers: (%d) found", c.ChannelConfig.ID(), len(newOrderers))
return c.OrderingService.Configure(ordererConfig.ConsensusType(), newOrderers)
}
c.logger.Infof("[Channel: %s] No orderers found in Channel config", c.ChannelConfig.ID())
c.logger.Debugf("[Channel: %s] No orderers found in Channel config", c.ChannelConfig.ID())

return nil
}
Expand Down
19 changes: 11 additions & 8 deletions platform/fabric/core/generic/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (d *Delivery) Start(ctx context.Context) {
}

func (d *Delivery) Stop(err error) {
logger.Debugf("stop delivery with error [%s]", err)
d.stop <- err
close(d.stop)
}
Expand All @@ -158,15 +159,17 @@ func (d *Delivery) readBlocks(ch <-chan blockResponse) {
logger.Debugf("Invoking callback for block [%d]", b.block.Header)
stop, err := d.callback(b.ctx, b.block)
if err != nil {
logger.Errorf("callback errored for block %d: %v", b.block.Header.Number, err)
logger.Errorf("callback errored for block [%d], stop delivery: [%v]", b.block.Header.Number, err)
d.Stop(err)
return
}
if stop {
logger.Infof("Stopping delivery at block [%d]", b.block.Header.Number)
logger.Debugf("stopping delivery at block [%d]", b.block.Header.Number)
d.Stop(nil)
return
}
case <-d.stop:
logger.Infof("Stopping delivery service")
case err := <-d.stop:
logger.Debugf("stopping delivery service with err [%s]", err)
return
}
}
Expand All @@ -183,15 +186,15 @@ func (d *Delivery) runReceiver(ctx context.Context, ch chan<- blockResponse) {
for {
select {
case <-d.stop:
logger.Infof("Stopped receiver")
logger.Debugf("Stopped receiver")
return
default:
select {
case <-d.stop:
logger.Infof("Stopped receiver")
logger.Debugf("Stopped receiver")
return
case <-ctx.Done():
logger.Infof("Context done")
logger.Debugf("Context done")
// Time to cancel
d.Stop(errors.New("context done"))
default:
Expand Down Expand Up @@ -273,7 +276,7 @@ func (d *Delivery) runReceiver(ctx context.Context, ch chan<- blockResponse) {

func (d *Delivery) untilStop() error {
for err := range d.stop {
logger.Infof("Stopping delivery service")
logger.Debugf("stopping delivery service with error [%s]", err)
return err
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions platform/fabric/core/generic/finality/deliveryflm.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func (m *txInfoMapper) MapTxData(ctx context.Context, tx []byte, block *common.B
}

txInfos := make(map[driver2.Namespace]txInfo, len(rwSet.WriteSet.Writes))
logger.Infof("TX [%s] has %d namespaces", chdr.TxId, len(rwSet.WriteSet.Writes))
logger.Debugf("TX [%s] has %d namespaces", chdr.TxId, len(rwSet.WriteSet.Writes))
for ns, write := range rwSet.WriteSet.Writes {
logger.Infof("TX [%s:%s] has %d writes", chdr.TxId, ns, len(write))
logger.Debugf("TX [%s:%s] has %d writes", chdr.TxId, ns, len(write))
txInfos[ns] = txInfo{
txID: chdr.TxId,
status: finalityEvent.ValidationCode,
Expand Down