Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RED-103: use nodelistFromStates function in all cycle tx sendGossip calls #253

Open
wants to merge 11 commits into
base: dev
Choose a base branch
from
Open
6 changes: 3 additions & 3 deletions src/p2p/Active.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { getSortedStandbyJoinRequests } from './Join/v2'
import { selectNodesFromReadyList } from './Join/v2/syncFinished'
import { isDebugModeMiddleware } from '../network/debugMiddleware'
import { Utils } from '@shardus/types'
import { nodeListFromStates } from "./Join";
import { nodelistFromStates } from "./Join"
import { checkGossipPayload } from '../utils/GossipValidation'

let syncTimes = []
Expand Down Expand Up @@ -46,7 +46,7 @@ const gossipActiveRoute: P2P.P2PTypes.GossipHandler<P2P.ActiveTypes.SignedActive
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -269,7 +269,7 @@ export function sendRequests() {
activeTx,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down
29 changes: 26 additions & 3 deletions src/p2p/Apoptosis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import { VectorBufferStream } from '../utils/serialization/VectorBufferStream'
import * as Comms from './Comms'
import { crypto, logger, network } from './Context'
import { currentCycle, currentQuarter } from './CycleCreator'
import { activeByIdOrder, byIdOrder, byPubKey, nodes } from './NodeList'
import { activeByIdOrder, byPubKey, nodes } from './NodeList'
import * as Self from './Self'
import { robustQuery } from './Utils'
import { TypeIdentifierEnum } from '../types/enum/TypeIdentifierEnum'
Expand All @@ -52,6 +52,7 @@ import { BadRequest, serializeResponseError } from '../types/ResponseError'
import { RequestErrorEnum } from '../types/enum/RequestErrorEnum'
import { getStreamWithTypeCheck, requestErrorHandler } from '../types/Helpers'

import { nodelistFromStates } from './Join'

/** STATE */

Expand Down Expand Up @@ -195,7 +196,18 @@ const apoptosisGossipRoute: P2P.P2PTypes.GossipHandler<P2P.ApoptosisTypes.Signed
}
if ([1, 2].includes(currentQuarter)) {
if (addProposal(payload)) {
Comms.sendGossip(gossipRouteName, payload, tracker, Self.id, byIdOrder, false) // use Self.id so we don't gossip to ourself
Comms.sendGossip(
gossipRouteName,
payload,
tracker,
Self.id,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
false
) // use Self.id so we don't gossip to ourself
}
}
} finally {
Expand Down Expand Up @@ -306,7 +318,18 @@ export function sendRequests() {
// make sure node is still in the network, since it might
// have already been removed
if (nodes.get(id)) {
Comms.sendGossip(gossipRouteName, proposals[id], '', null, byIdOrder, true)
Comms.sendGossip(
gossipRouteName,
proposals[id],
'',
null,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
true
)
}
}
}
Expand Down
53 changes: 49 additions & 4 deletions src/p2p/Archivers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { Result, ResultAsync } from 'neverthrow'
import { Utils } from '@shardus/types'
import { arch } from 'os'
import { checkGossipPayload } from '../utils/GossipValidation'
import { nodelistFromStates } from './Join'

const clone = rfdc()

Expand Down Expand Up @@ -319,7 +320,18 @@ export function addArchiverJoinRequest(joinRequest: P2P.ArchiversTypes.Request,
joinRequest
)
if (gossip === true) {
Comms.sendGossip('joinarchiver', joinRequest, tracker, null, NodeList.byIdOrder, true)
Comms.sendGossip(
'joinarchiver',
joinRequest,
tracker,
null,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
true
)
}
return { success: true }
}
Expand Down Expand Up @@ -428,7 +440,18 @@ export function addLeaveRequest(leaveRequest: P2P.ArchiversTypes.Request, tracke
leaveRequests.push(leaveRequest)
if (logFlags.console) console.log('adding leave requests', leaveRequests)
if (gossip === true) {
Comms.sendGossip('leavingarchiver', leaveRequest, tracker, null, NodeList.byIdOrder, true)
Comms.sendGossip(
'leavingarchiver',
leaveRequest,
tracker,
null,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
true
)
}
return { success: true }
}
Expand Down Expand Up @@ -939,7 +962,18 @@ export function registerRoutes() {
}
if (!accepted.success) return warn('Archiver join request not accepted.')
if (logFlags.p2pNonFatal) info('Archiver join request accepted!')
Comms.sendGossip('joinarchiver', payload, tracker, sender, NodeList.byIdOrder, false)
Comms.sendGossip(
'joinarchiver',
payload,
tracker,
sender,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
false
)
} finally {
profilerInstance.scopedProfileSectionEnd('joinarchiver')
}
Expand All @@ -958,7 +992,18 @@ export function registerRoutes() {
const accepted = await addLeaveRequest(payload, tracker, false)
if (!accepted.success) return warn('Archiver leave request not accepted.')
/* prettier-ignore */ if (logFlags.p2pNonFatal) info('Archiver leave request accepted!')
Comms.sendGossip('leavingarchiver', payload, tracker, sender, NodeList.byIdOrder, false)
Comms.sendGossip(
'leavingarchiver',
payload,
tracker,
sender,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]),
false
)
} finally {
profilerInstance.scopedProfileSectionEnd('leavingarchiver')
}
Expand Down
1 change: 0 additions & 1 deletion src/p2p/Comms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { getStreamWithTypeCheck, requestErrorHandler } from '../types/Helpers'
import { TypeIdentifierEnum } from '../types/enum/TypeIdentifierEnum'
import { InternalError, ResponseError, serializeResponseError } from '../types/ResponseError'
import { Utils } from '@shardus/types'
import { nodeListFromStates } from './Join'

/** ROUTES */

Expand Down
25 changes: 23 additions & 2 deletions src/p2p/CycleAutoScale.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { enterRecovery, enterSafety } from './Modes'
import { getOurNodeIndex } from './Utils'
import { shardusGetTime } from '../network'
import { Utils } from '@shardus/types'
import { nodelistFromStates } from './Join'

/** STATE */

Expand Down Expand Up @@ -54,7 +55,17 @@ const gossipScaleRoute: P2P.P2PTypes.GossipHandler<P2P.CycleAutoScaleTypes.Signe

const added = addExtScalingRequest(payload)
if (!added) return
Comms.sendGossip('scaling', payload, tracker, sender, NodeList.byIdOrder, false, 2)
Comms.sendGossip(
'scaling',
payload,
tracker,
sender,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE
]),
false,
2
)
} finally {
profilerInstance.scopedProfileSectionEnd('gossip-scaling')
}
Expand Down Expand Up @@ -131,7 +142,17 @@ function _requestNetworkScaling(upOrDown) {
signedRequest
)}`
)
Comms.sendGossip('scaling', signedRequest, '', null, NodeList.byIdOrder, true, 2)
Comms.sendGossip(
'scaling',
signedRequest,
'',
null,
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE
]),
true,
2
)
scalingRequested = true
requestedScalingType = signedRequest.scale //only set this when our node requests scaling
nestedCountersInstance.countEvent('p2p', 'initiate gossip: scaling: ' + (upOrDown ? 'up' : 'down'))
Expand Down
4 changes: 2 additions & 2 deletions src/p2p/CycleCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import { getStreamWithTypeCheck, requestErrorHandler } from '../types/Helpers'
import { RequestErrorEnum } from '../types/enum/RequestErrorEnum'
import { BadRequest, InternalError, NotFound, serializeResponseError } from '../types/ResponseError'
import { Utils } from '@shardus/types'
import { nodeListFromStates } from './Join'
import { nodelistFromStates } from './Join'
import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum'

/** CONSTANTS */
Expand Down Expand Up @@ -1302,7 +1302,7 @@ async function gossipCycleCert(sender: P2P.NodeListTypes.Node['id'], tracker?: s
signedCertGossip,
tracker,
sender,
Join.nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down
12 changes: 6 additions & 6 deletions src/p2p/Join/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import deepmerge from 'deepmerge'

Check warning on line 1 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'deepmerge' is defined but never used
import { version } from '../../../package.json'
import * as http from '../../http'
import { logFlags } from '../../logger'
Expand All @@ -11,7 +11,7 @@
import * as CycleCreator from '../CycleCreator'
import * as NodeList from '../NodeList'
import * as Self from '../Self'
import { getOurNodeIndex, robustQuery } from '../Utils'

Check warning on line 14 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'getOurNodeIndex' is defined but never used
import { isBogonIP, isInvalidIP, isIPv6 } from '../../utils/functions/checkIP'
import { nestedCountersInstance } from '../../utils/nestedCounters'
import { Logger } from 'log4js'
Expand All @@ -29,8 +29,8 @@
import { drainSelectedPublicKeys, forceSelectSelf } from './v2/select'
import { deleteStandbyNode, drainNewUnjoinRequests, processNewUnjoinRequest } from './v2/unjoin'
import { JoinRequest } from '@shardus/types/build/src/p2p/JoinTypes'
import { updateNodeState } from '../Self'

Check warning on line 32 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'updateNodeState' is defined but never used
import { HTTPError } from 'got'

Check warning on line 33 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'HTTPError' is defined but never used
import {
drainLostAfterSelectionNodes,
drainSyncStarted,
Expand All @@ -47,8 +47,8 @@
/** STATE */

let p2pLogger: Logger
let mainLogger: Logger

Check warning on line 50 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'mainLogger' is assigned a value but never used
const clone = rfdc()

Check warning on line 51 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'clone' is assigned a value but never used

let requests: P2P.JoinTypes.JoinRequest[]
let seen: Set<P2P.P2PTypes.Node['publicKey']>
Expand Down Expand Up @@ -444,7 +444,7 @@

// Select the most recent nodes for removal
for (let i = standbyList.length - 1; i >= 0 && removeCount < nodesToRemoveCount; i--) {
const node = standbyList[i]

Check warning on line 447 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Variable Assigned to Object Injection Sink

// check if the node is not already marked for removal
if (!standbyRemoveSet.has(node.nodeInfo.publicKey)) {
Expand Down Expand Up @@ -660,7 +660,7 @@
syncStartedTx,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -688,7 +688,7 @@
syncFinishedTx,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -720,7 +720,7 @@
standbyRefreshTx,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -770,7 +770,7 @@
signedObjectWithJoinRequest,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -805,7 +805,7 @@
unjoinRequest,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -1628,7 +1628,7 @@
}
}

export function nodeListFromStates(states: P2P.P2PTypes.NodeStatus[]): P2P.NodeListTypes.Node[] {
export function nodelistFromStates(states: P2P.P2PTypes.NodeStatus[]): P2P.NodeListTypes.Node[] {
if (Self.isRestartNetwork) return NodeList.byIdOrder
const { NodeStatus } = P2P.P2PTypes
const stateMappings: { [key in P2P.P2PTypes.NodeStatus]?: P2P.NodeListTypes.Node[] } = {
Expand All @@ -1642,8 +1642,8 @@
let result: P2P.NodeListTypes.Node[] = []

for (const state of states) {
if (stateMappings[state]) {

Check warning on line 1645 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Generic Object Injection Sink
result = result.concat(stateMappings[state])

Check warning on line 1646 in src/p2p/Join/index.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Function Call Object Injection Sink
}
}
const self = NodeList.byJoinOrder.find((node) => node.id === Self.id)
Expand Down
22 changes: 13 additions & 9 deletions src/p2p/Join/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
queueStandbyRefreshRequest,
queueJoinRequest,
queueUnjoinRequest,
verifyJoinRequestTypes,

Check warning on line 22 in src/p2p/Join/routes.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'verifyJoinRequestTypes' is defined but never used
nodeListFromStates
nodelistFromStates,
} from '.'
import { config } from '../Context'
import { isBogonIP } from '../../utils/functions/checkIP'
Expand Down Expand Up @@ -214,7 +214,7 @@
joinRequest,
'',
null,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -269,7 +269,11 @@
if (processResult.success === false) {
return res.status(500).send(processResult.reason)
}
Comms.sendGossip('gossip-sync-started', syncStarted, '', null, NodeList.byIdOrder, true)
Comms.sendGossip('gossip-sync-started', syncStarted, '', null, nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
]), true)
return res.status(200).send()
},
}
Expand Down Expand Up @@ -450,7 +454,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -564,7 +568,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -596,7 +600,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -644,7 +648,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -699,7 +703,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down Expand Up @@ -748,7 +752,7 @@
payload,
tracker,
sender,
nodeListFromStates([
nodelistFromStates([
P2P.P2PTypes.NodeStatus.ACTIVE,
P2P.P2PTypes.NodeStatus.READY,
P2P.P2PTypes.NodeStatus.SYNCING,
Expand Down
Loading
Loading