Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHARD-1235: add more helper function to serviceQueue module and export it as an object for clarity #364

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/config/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ const SERVER_CONFIG: StrictServerConfiguration = {
networkTransactionsToProcessPerCycle: 20,
getTxTimestampTimeoutOffset: 0,
dropNGTByGossipEnabled: false,
removedNodeIDCacheSize: 1000
},
ip: {
externalIp: '0.0.0.0',
Expand Down
25 changes: 24 additions & 1 deletion src/p2p/NodeList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import { P2P } from '@shardeum-foundation/lib-types'
import { Logger } from 'log4js'
import { isDebugModeMiddleware, isDebugModeMiddlewareLow } from '../network/debugMiddleware'
import { ShardusEvent } from '../shardus/shardus-types'
import { binarySearch, getTime, insertSorted, linearInsertSorted, propComparator, propComparator2 } from '../utils'
import {
binarySearch,
FIFOCache,
insertSorted,
linearInsertSorted,
propComparator,
propComparator2
} from '../utils'
import * as Comms from './Comms'
import { config, crypto, logger, network } from './Context'
import * as CycleChain from './CycleChain'
Expand Down Expand Up @@ -41,6 +48,10 @@ export let readyByTimeAndIdOrder: P2P.NodeListTypes.Node[]
export let activeOthersByIdOrder: P2P.NodeListTypes.Node[]
export let potentiallyRemoved: Set<P2P.NodeListTypes.Node['id']>
export let selectedById: Map<P2P.NodeListTypes.Node['id'], number>
export let removedNodeIDCache: FIFOCache<
P2P.NodeListTypes.Node['id'],
P2P.NodeListTypes.Node['publicKey']
>

const VERBOSE = false // Use to dump complete NodeList and CycleChain data

Expand Down Expand Up @@ -179,6 +190,10 @@ export function addNodes(newNodes: P2P.NodeListTypes.Node[], caller: string) {
}
}

export function getRemovedNodePubKeyFromCache(nodeId: P2P.NodeListTypes.Node['id']) {
return removedNodeIDCache.get(nodeId);
}

export function removeSelectedNode(id: string) {
selectedById.delete(id)
const idx = binarySearch(selectedByIdOrder, { id }, propComparator('id'))
Expand Down Expand Up @@ -264,6 +279,14 @@ export function removeNode(
selectedById.delete(id)
//readyByTimeAndIdOrder = readyByTimeAndIdOrder.filter((node) => node.id !== id)

// add to removed node cache
if (!removedNodeIDCache) {
removedNodeIDCache = new FIFOCache<P2P.NodeListTypes.Node['id'], P2P.NodeListTypes.Node['publicKey']>(
config.p2p.removedNodeIDCacheSize
)
}
removedNodeIDCache.set(id, node.publicKey)

Comms.evictCachedSockets([node])

if (raiseEvents) {
Expand Down
9 changes: 9 additions & 0 deletions src/p2p/ServiceQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@
// but we dont want to alter the txList, so we make a copy
const txListCopy = structuredClone(txList)

function applyTxAdd(txAddList: typeof record.txadd) {

Check failure on line 345 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Missing return type on function
for (const txadd of txAddList) {
const { sign, ...txDataWithoutSign } = txadd.txData
sortedInsert(txListCopy, {
Expand Down Expand Up @@ -370,9 +370,9 @@
}
}

function processShutdownHandlers(cycleRecord: P2P.CycleCreatorTypes.CycleRecord) {

Check failure on line 373 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Missing return type on function
Nodelist.activeByIdOrder.forEach((node) => {
for (let [key, handler] of shutdownHandlers) {

Check failure on line 375 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'key' is never reassigned. Use 'const' instead

Check failure on line 375 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

'handler' is never reassigned. Use 'const' instead
try {
const txAddData = handler(node, cycleRecord)
if (txAddData?.txData == null) continue
Expand Down Expand Up @@ -533,6 +533,15 @@

/** Module Functions */

export function containsTx(txHash: string): boolean {
return txList.some((entry) => entry.hash === txHash)
}

export function containsTxData(txData: OpaqueTransaction): boolean {
const hash = crypto.hash(txData)
return containsTx(hash)
}

export function registerShutdownHandler(
type: string,
handler: (
Expand Down Expand Up @@ -561,7 +570,7 @@
type: string,
tx: OpaqueTransaction,
subQueueKey?: string,
priority: number = 0

Check failure on line 573 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Type number trivially inferred from a number literal, remove type annotation
): Promise<void> {
const hash = crypto.hash(tx)
const networkTx = {
Expand Down Expand Up @@ -929,7 +938,7 @@
p2pLogger.error(entry)
}

function omitKey(obj: any, keyToOmit: string) {

Check failure on line 941 in src/p2p/ServiceQueue.ts

View workflow job for this annotation

GitHub Actions / ci / QA merge checks

Missing return type on function
// deep emit key from object
const newObj = { ...obj }
for (const key in newObj) {
Expand Down
4 changes: 4 additions & 0 deletions src/p2p/Wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ class State extends EventEmitter {
return NodeList.nodes.get(id)
}

getRemovedNodePubKeyFromCache(id: string): P2PTypings.NodeListTypes.Node['publicKey'] | undefined {
return NodeList.getRemovedNodePubKeyFromCache(id)
}

getNodes() {
return NodeList.nodes
}
Expand Down
36 changes: 23 additions & 13 deletions src/shardus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ interface Shardus {
registerExternalPut: RouteHandlerRegister
registerExternalDelete: RouteHandlerRegister
registerExternalPatch: RouteHandlerRegister
registerBeforeAddVerifier: (type: string, verifier: (tx: OpaqueTransaction) => Promise<boolean>) => void
registerApplyVerifier: (type: string, verifier: (tx: OpaqueTransaction) => Promise<boolean>) => void
registerShutdownHandler: (
type: string,
handler: (
activeNode: P2P.NodeListTypes.Node,
record: P2P.CycleCreatorTypes.CycleRecord
) => Omit<P2P.ServiceQueueTypes.AddNetworkTx, 'cycle' | 'hash'> | null | undefined
) => void
serviceQueue: {
registerBeforeAddVerifier: typeof ServiceQueue.registerBeforeAddVerifier
registerApplyVerifier: typeof ServiceQueue.registerApplyVerifier
registerShutdownHandler: typeof ServiceQueue.registerShutdownHandler
containsTxData: typeof ServiceQueue.containsTxData
containsTx: typeof ServiceQueue.containsTx
addNetworkTx: typeof ServiceQueue.addNetworkTx
getLatestNetworkTxEntryForSubqueueKey: typeof ServiceQueue.getLatestNetworkTxEntryForSubqueueKey
}
_listeners: any
appliedConfigChanges: Set<string>

Expand Down Expand Up @@ -265,10 +265,16 @@ class Shardus extends EventEmitter {
this.registerExternalPatch = (route, authHandler, handler) =>
this.network.registerExternalPatch(route, authHandler, handler)

this.registerBeforeAddVerifier = ServiceQueue.registerBeforeAddVerifier
this.registerApplyVerifier = ServiceQueue.registerApplyVerifier
this.registerApplyVerifier = ServiceQueue.registerApplyVerifier
this.registerShutdownHandler = ServiceQueue.registerShutdownHandler
// serviceQueue module
this.serviceQueue = {
PudgyPug marked this conversation as resolved.
Show resolved Hide resolved
registerBeforeAddVerifier: ServiceQueue.registerBeforeAddVerifier,
registerApplyVerifier: ServiceQueue.registerApplyVerifier,
registerShutdownHandler: ServiceQueue.registerShutdownHandler,
containsTxData: ServiceQueue.containsTxData,
containsTx: ServiceQueue.containsTx,
addNetworkTx: ServiceQueue.addNetworkTx,
getLatestNetworkTxEntryForSubqueueKey: ServiceQueue.getLatestNetworkTxEntryForSubqueueKey
}

this.exitHandler.addSigListeners()
this.exitHandler.registerSync('reporter', () => {
Expand Down Expand Up @@ -1808,6 +1814,10 @@ class Shardus extends EventEmitter {
return this.p2p.state.getNode(id)
}

getRemovedNodePubKeyFromCache(id: string): ShardusTypes.Node['publicKey'] | undefined {
return this.p2p.state.getRemovedNodePubKeyFromCache(id)
}

getNodeByPubKey(id: string): ShardusTypes.Node {
return this.p2p.state.getNodeByPubKey(id)
}
Expand Down
4 changes: 3 additions & 1 deletion src/shardus/shardus-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ export interface ServerConfiguration {
enableProblematicNodeRemoval?: boolean
/** when true, we will remove problematic nodes even when calculateToAcceptV2 says we should not remove any nodes. This is useful in development when testing this feature. */
enableDangerousProblematicNodeRemoval?: boolean
/** enable problematic node removal on a specific cycle. This is to allow the network to stabilize before removing problematic nodes.
/** enable problematic node removal on a specific cycle. This is to allow the network to stabilize before removing problematic nodes.
* enableProblematicNodeRemoval must be true for this to take effect*/
enableProblematicNodeRemovalOnCycle?: number
/** The problematicNodeRemovalCycleFrequency parameter is an Integer specifying the number of cycles between problematic node removals. */
Expand Down Expand Up @@ -986,6 +986,8 @@ export interface ServerConfiguration {
getTxTimestampTimeoutOffset?: number // default timeout is 5 seconds so this can be used to add or subtract time from that
/** allow dropping NGTs by hitting a single node's endpoint and the drop mesage being sent to other nodes by gossip */
dropNGTByGossipEnabled: boolean
// cache size for mapping of nodeId to pubKey for the `n` last removed nodes
removedNodeIDCacheSize: number
}
/** Server IP configuration */
ip?: {
Expand Down
Loading