Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove refute v2 #351

Draft
wants to merge 10 commits into
base: 1.15.4
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/config/deprecated/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"maxJoinedPerCycle": 1,
"maxSyncingPerCycle": 5,
"maxRotatedPerCycle": 1,
"maxProblematicNodeRemovalsPerCycle": 1,
"problematicNodeConsecutiveRefuteThreshold": 6,
"problematicNodeRefutePercentageThreshold": 0.1,
"problematicNodeHistoryLength": 100,
"firstCycleJoin": 10,
"maxPercentOfDelta": 40,
"minScaleReqsNeeded": 5,
Expand Down
6 changes: 6 additions & 0 deletions src/config/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ const SERVER_CONFIG: StrictServerConfiguration = {
maxSyncTimeFloor: 1200,
maxNodeForSyncTime: 9,
maxRotatedPerCycle: 1,
enableProblematicNodeRemoval: true,
enableProblematicNodeRemovalOnCycle: 10,
maxProblematicNodeRemovalsPerCycle: 1,
problematicNodeConsecutiveRefuteThreshold: 6,
problematicNodeRefutePercentageThreshold: 0.1,
problematicNodeHistoryLength: 100,
firstCycleJoin: 10,
maxPercentOfDelta: 40,
minScaleReqsNeeded: 5,
Expand Down
28 changes: 28 additions & 0 deletions src/debug/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import Trie from 'trie-prefix-tree'
import { isDebugModeMiddleware, isDebugModeMiddlewareMedium } from '../network/debugMiddleware'
import { nestedCountersInstance } from '../utils/nestedCounters'
import { logFlags } from '../logger'
import { currentCycle } from '../p2p/CycleCreator'
import * as ProblemNodeHandler from '../p2p/ProblemNodeHandler'

import { nodes, NodeWithRefuteCycles } from '../p2p/NodeList'
const tar = require('tar-fs')
const fs = require('fs')

Expand Down Expand Up @@ -133,6 +137,30 @@ class Debug {
}
return res.json({ success: true })
})
// Debug-only endpoint: reports, for every node with at least one recorded refute,
// its sorted refute cycles plus the statistics ProblemNodeHandler derives from them.
this.network.registerExternalGet('debug_problemNodeTrackerDump', isDebugModeMiddleware, (req, res) => {
  try {
    const nodeHistories: Record<string, any> = {}
    const trackedNodes = nodes as Map<string, NodeWithRefuteCycles>

    trackedNodes.forEach((entry, id) => {
      // Skip nodes without any refute history (missing or empty set)
      if (!(entry.refuteCycles?.size > 0)) return

      const sortedCycles = Array.from(entry.refuteCycles).sort((x, y) => x - y)
      const stats = {
        refutePercentage: ProblemNodeHandler.getRefutePercentage(entry.refuteCycles, currentCycle),
        consecutiveRefutes: ProblemNodeHandler.getConsecutiveRefutes(sortedCycles, currentCycle),
        isProblematic: ProblemNodeHandler.isNodeProblematic(entry, currentCycle)
      }
      nodeHistories[id] = { refuteCycles: sortedCycles, stats }
    })

    return res.json({ success: true, data: { nodeHistories } })
  } catch (e) {
    // Report the failure to the caller instead of letting the handler throw
    return res.json({ success: false, error: e.message })
  }
})
}
}

Expand Down
123 changes: 113 additions & 10 deletions src/p2p/ModeSystemFuncs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as CycleCreator from './CycleCreator'
import * as CycleChain from './CycleChain'
import { logFlags } from '../logger'
import { Utils } from '@shardus/types'
import { getProblematicNodes } from './ProblemNodeHandler'

interface ToAcceptResult {
add: number
Expand Down Expand Up @@ -239,7 +240,16 @@ export function calculateToAcceptV2(prevRecord: P2P.CycleCreatorTypes.CycleRecor
return { add, remove }
}

// need to think about and maybe ask Omar about using prev record for determining mode, could use next record
/**
 * Collects the ids of nodes named in this cycle's apoptosis requests.
 * Requests whose id is not present in `NodeList.nodes` are ignored.
 */
const getApoptosizedNodes = (txs: P2P.RotationTypes.Txs & P2P.ApoptosisTypes.Txs): string[] => {
  const knownIds = []
  txs.apoptosis.forEach((apoptosisTx) => {
    const match = NodeList.nodes.get(apoptosisTx.id)
    if (!match) return
    knownIds.push(match.id)
  })
  return knownIds
}

/** Returns the number of expired nodes and the list of removed nodes using calculateToAcceptV2 */
export function getExpiredRemovedV2(
Expand Down Expand Up @@ -374,12 +384,105 @@ export function getExpiredRemovedV2(
return { expired, removed }
}

/**
 * Computes how many nodes to shrink by, blending the configured
 * `amountToShrink` with that same amount multiplied by the current
 * `CycleCreator.scaleFactor`. The blend weight passed to `lerp` is
 * `config.p2p.scaleInfluenceForShrink`; the result is rounded down.
 */
function getScaledAmountToShrink(): number {
  const baseAmount = config.p2p.amountToShrink
  const fullyScaledAmount = config.p2p.amountToShrink * CycleCreator.scaleFactor
  const blended = lerp(baseAmount, fullyScaledAmount, config.p2p.scaleInfluenceForShrink)
  return Math.floor(blended)
}
/**
 * Chooses the nodes to remove this cycle, combining problematic nodes (from
 * `getProblematicNodes`) with expired nodes, within the removal budget returned
 * by `calculateToAcceptV2`.
 *
 * Budget accounting: apoptosized nodes and the capped number of problematic nodes
 * are subtracted from the budget first; whatever is left (never less than zero)
 * is the number of expired nodes that may be removed. Apoptosized nodes only
 * consume budget here; they are not added to the returned `removed` list.
 *
 * Side effects: clears `NodeList.potentiallyRemoved` and re-populates it with
 * every id placed in `removed`.
 *
 * @param prevRecord previous cycle record; supplies `start`, `desired` and the
 *   counter used for problematic-node detection
 * @param lastLoggedCycle newest cycle for which the scale-down dump was already
 *   logged; the dump is skipped unless the current cycle is newer
 * @param txs cycle transactions; only `txs.apoptosis` is read (via `getApoptosizedNodes`)
 * @param info logging callback that receives the scale-down dump
 * @returns `problematic`: number of problematic, non-apoptosized nodes found (uncapped);
 *   `expired`: number of expired candidates found (uncapped);
 *   `removed`: ids selected for removal, inserted through `insertSorted`
 */
export function getExpiredRemovedV3(
  prevRecord: P2P.CycleCreatorTypes.CycleRecord,
  lastLoggedCycle: number,
  txs: P2P.RotationTypes.Txs & P2P.ApoptosisTypes.Txs,
  info: (...msg: string[]) => void
): { problematic: number; expired: number; removed: string[] } {
  // clear state from last run
  NodeList.potentiallyRemoved.clear()

  // Don't expire/remove any if nodeExpiryAge is negative
  if (config.p2p.nodeExpiryAge < 0) return { problematic: 0, expired: 0, removed: [] }

  const active = NodeList.activeByIdOrder.length
  const start = prevRecord.start

  // calculate the target number of nodes
  const { add, remove } = calculateToAcceptV2(prevRecord)
  nestedCountersInstance.countEvent(
    'p2p',
    `results of getExpiredRemovedV2.calculateToAcceptV2: add: ${add}, remove: ${remove}`
  )

  // get list of nodes that have been requested to be removed
  const apoptosizedNodesList = getApoptosizedNodes(txs)
  const numApoptosizedRemovals = apoptosizedNodesList.length
  // Set gives O(1) membership checks in the filters below
  const apoptosizedIds = new Set<string>(apoptosizedNodesList)

  // problematic nodes, excluding those that are already being apoptosized
  const problematicNodes = getProblematicNodes(prevRecord).filter((id) => !apoptosizedIds.has(id))
  // NOTE: `|| 1` means a configured value of 0 (or undefined) falls back to 1.
  // Clamp at 0 so a negative config value cannot turn into a negative slice end.
  const numProblematicRemovals = Math.max(
    0,
    Math.min(problematicNodes.length, config.p2p.maxProblematicNodeRemovalsPerCycle || 1)
  )

  // expired, non-apoptosized, non-syncing nodes
  const expirationTimeThreshold = Math.max(start - config.p2p.nodeExpiryAge, 0)
  const expiredNodes = NodeList.byJoinOrder
    .filter(
      (node) =>
        node.activeTimestamp <= expirationTimeThreshold &&
        node.status !== 'syncing' &&
        !apoptosizedIds.has(node.id)
    )
    .map((node) => node.id)
  const numExpiredNodes = expiredNodes.length

  // we can remove `remove` nodes, but we *must* remove the number of apoptosized nodes,
  // as well as the number of problematic nodes (determined by config.p2p.maxProblematicNodeRemovalsPerCycle, if any)
  // the remainder is the number of expired nodes we can remove this cycle.
  // Clamp at 0: a negative count passed to slice(0, n) would select all but the
  // last |n| expired nodes instead of none.
  const numExpiredRemovals = Math.max(0, remove - numApoptosizedRemovals - numProblematicRemovals)

  const cycle = CycleChain.newest.counter

  // `lastLoggedCycle` is passed by value, so it is only read here; the caller owns it.
  if (cycle > lastLoggedCycle && remove > 0) {
    info(
      'scale down dump:' +
        Utils.safeStringify({
          cycle,
          scaleFactor: CycleCreator.scaleFactor,
          desired: prevRecord.desired,
          active,
          maxRemove: remove,
          expired: numExpiredNodes,
        })
    )
  }

  nestedCountersInstance.countEvent(
    'p2p',
    `results of getExpiredRemovedV2: numApoptosizedRemovals: ${numApoptosizedRemovals}, numProblematicRemovals: ${numProblematicRemovals}, numExpiredRemovals: ${numExpiredRemovals}, removed: ${remove}`
  )

  // ids selected for removal: capped problematic nodes plus the expired nodes that fit the budget
  const idsToRemove = new Set<string>(
    problematicNodes.slice(0, numProblematicRemovals).concat(expiredNodes.slice(0, numExpiredRemovals))
  )

  // walk byJoinOrder so nodes are processed in join order (also de-duplicates
  // a node that is both problematic and expired)
  const toRemove = NodeList.byJoinOrder.filter((node) => idsToRemove.has(node.id))

  const removed: string[] = []
  // Process nodes for removal
  for (const node of toRemove) {
    nestedCountersInstance.countEvent('p2p', `getExpiredRemovedV2: adding node to removed: ${node.id}`)
    NodeList.potentiallyRemoved.add(node.id)
    insertSorted(removed, node.id)
  }

  return { problematic: problematicNodes.length, expired: numExpiredNodes, removed }
}
22 changes: 21 additions & 1 deletion src/p2p/NodeList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,33 @@ export function removeNodes(
for (const id of ids) removeNode(id, raiseEvents, cycle)
}

/**
 * Shorthand type for a node list entry that also carries `refuteCycles`:
 * a set of cycle numbers associated with the node's refutes.
 */
export type NodeWithRefuteCycles = P2P.NodeListTypes.Node & { refuteCycles: Set<number> }

export function updateNode(
update: P2P.NodeListTypes.Update,
raiseEvents: boolean,
cycle: P2P.CycleCreatorTypes.CycleRecord | null
) {
const node = nodes.get(update.id)
const node = nodes.get(update.id) as NodeWithRefuteCycles
if (node) {
// Initialize refuteCycles if it doesn't exist
if (!node.refuteCycles) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this initialization should move inside the `if` statement below, since adding the field can change the hash of a node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or at least put it inside the `config.p2p.enableProblematicNodeRemoval` check.

node.refuteCycles = new Set();
}

if (config.p2p.enableProblematicNodeRemoval && cycle.counter >= config.p2p.enableProblematicNodeRemovalOnCycle) {
// Track refutes if this update is from a cycle record
if (cycle && cycle.refuted?.includes(node.id)) {
node.refuteCycles.add(cycle.counter);
}

// Clean up old refutes using sliding window
const windowStart = Math.max(1, cycle.counter - config.p2p.problematicNodeHistoryLength);
const oldRefutes = Array.from(node.refuteCycles).filter(c => c < windowStart);
oldRefutes.forEach(c => node.refuteCycles.delete(c));
}

// Update node properties
for (const key of Object.keys(update)) {
node[key] = update[key]
Expand Down
58 changes: 58 additions & 0 deletions src/p2p/ProblemNodeHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { P2P } from '@shardus/types'
import * as NodeList from './NodeList'
import { config } from './Context'
import { NodeWithRefuteCycles } from './NodeList';

/**
 * Decides whether a node counts as problematic at `currentCycle`.
 *
 * A node is problematic when either its consecutive-refute count reaches
 * `config.p2p.problematicNodeConsecutiveRefuteThreshold`, or its refute ratio
 * reaches `config.p2p.problematicNodeRefutePercentageThreshold`.
 * Nodes without a `refuteCycles` set are never problematic.
 */
export function isNodeProblematic(node: NodeWithRefuteCycles, currentCycle: number): boolean {
  const history = node.refuteCycles;
  if (!history) return false;

  // The streak helper expects the cycles in ascending order
  const ascendingCycles = Array.from(history as Set<number>).sort((lhs: number, rhs: number) => lhs - rhs);
  const streakLength = getConsecutiveRefutes(ascendingCycles, currentCycle);
  const streakTooLong = streakLength >= config.p2p.problematicNodeConsecutiveRefuteThreshold;
  if (streakTooLong) return true;

  // Otherwise fall back to the share of refuted cycles in the recent window
  const refuteRatio = getRefutePercentage(history, currentCycle);
  return refuteRatio >= config.p2p.problematicNodeRefutePercentageThreshold;
}

/**
 * Returns the length of the unbroken run of refuted cycles that ends at `currentCycle`.
 *
 * @param refuteCycles refuted cycle numbers, sorted ascending (callers sort before passing)
 * @param currentCycle the cycle the run must end on
 * @returns 0 when the history is empty or its newest entry is not `currentCycle`;
 *   otherwise the number of trailing entries that are consecutive (each exactly one
 *   greater than the previous), e.g. `[1, 2, 3, 10]` at cycle 10 yields 1.
 */
export function getConsecutiveRefutes(refuteCycles: number[], currentCycle: number): number {
  const lastIndex = refuteCycles.length - 1;
  if (lastIndex < 0 || refuteCycles[lastIndex] !== currentCycle) return 0;

  // Walk backwards from the newest entry and stop at the first gap, so that
  // older, unrelated runs in the history do not inflate the count.
  let streak = 1;
  for (let i = lastIndex; i > 0 && refuteCycles[i - 1] === refuteCycles[i] - 1; i--) {
    streak++;
  }
  return streak;
}

/**
 * Returns the fraction (0..1, not 0..100) of cycles in the recent window in which
 * the node was refuted.
 *
 * The window covers the last `config.p2p.problematicNodeHistoryLength` cycles ending
 * at `currentCycle` (inclusive), never starting before cycle 1. Its size is the
 * history length, or `currentCycle` when fewer cycles have elapsed.
 *
 * @param refuteCycles cycle numbers in which the node was refuted
 * @param currentCycle last cycle of the window
 * @returns refuted cycles in the window divided by the window size; 0 when the
 *   window is empty (e.g. `currentCycle` is 0 or the history length is 0), which
 *   avoids a division by zero producing NaN/Infinity.
 */
export function getRefutePercentage(refuteCycles: Set<number>, currentCycle: number): number {
  const historyLength = config.p2p.problematicNodeHistoryLength;
  const windowSize = Math.min(historyLength, currentCycle);
  // Also catches NaN/undefined config: nothing to measure, and NaN would break sort comparators
  if (!(windowSize > 0)) return 0;

  const windowStart = Math.max(1, currentCycle - historyLength + 1);
  let recentRefutes = 0;
  refuteCycles.forEach((cycle) => {
    if (cycle >= windowStart && cycle <= currentCycle) recentRefutes++;
  });

  return recentRefutes / windowSize;
}

/**
 * Returns the ids of active nodes that `isNodeProblematic` flags at
 * `prevRecord.counter`, ordered by refute ratio, highest first.
 *
 * The ratio is computed once per node from the same node object that was
 * classified, so the comparator neither recomputes it on every comparison nor
 * depends on a second lookup in `NodeList.nodes`. Nodes with equal ratios keep
 * their `NodeList.activeByIdOrder` order when the runtime's sort is stable.
 *
 * @param prevRecord previous cycle record; only `counter` is read
 * @returns problematic node ids, each id at most once
 */
export function getProblematicNodes(prevRecord: P2P.CycleCreatorTypes.CycleRecord): string[] {
  const cycle = prevRecord.counter;
  const seen = new Set<string>();
  const ranked: { id: string; refuteRatio: number }[] = [];

  for (const node of NodeList.activeByIdOrder) {
    const tracked = node as NodeWithRefuteCycles;
    if (seen.has(node.id) || !isNodeProblematic(tracked, cycle)) continue;
    seen.add(node.id);
    // isNodeProblematic only returns true when refuteCycles exists
    ranked.push({ id: node.id, refuteRatio: getRefutePercentage(tracked.refuteCycles, cycle) });
  }

  // Sort by refute percentage, worst offenders first
  ranked.sort((a, b) => b.refuteRatio - a.refuteRatio);
  return ranked.map((entry) => entry.id);
}
31 changes: 22 additions & 9 deletions src/p2p/Rotation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as CycleCreator from './CycleCreator'
import * as CycleChain from './CycleChain'
import { nestedCountersInstance } from '../utils/nestedCounters'
import { currentCycle } from './CycleCreator'
import { getExpiredRemovedV2 } from './ModeSystemFuncs'
import { getExpiredRemovedV2, getExpiredRemovedV3 } from './ModeSystemFuncs'
import { logFlags } from '../logger'
import { Utils } from '@shardus/types'

Expand Down Expand Up @@ -81,14 +81,27 @@ export function updateRecord(
nestedCountersInstance.countEvent('p2p', `results of getExpiredRemoved: expired: ${expired} removed: ${removed.length}`, 1)
if (logFlags && logFlags.verbose) console.log(`results of getExpiredRemoved: expired: ${expired} removed: ${removed.length} array: ${removed}`)
}

// Allow the autoscale module to set this value
const { expired, removed } = getExpiredRemovedV2(prev, lastLoggedCycle, txs, info)
nestedCountersInstance.countEvent('p2p', `results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length}`, 1)
if (logFlags && logFlags.verbose) console.log(`results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length} array: ${removed}`)

record.expired = expired
record.removed = removed // already sorted

// we only want to use the problematic node removal logic if we are past the enableProblematicNodeRemovalOnCycle and have a full history of refutes
// note: we may want to wait an additional config.p2p.problematicNodeHistoryLength cycles before we start removing problematic nodes
// this would give us a full history of refutes before we start removing problematic nodes
if (!config.p2p.enableProblematicNodeRemoval || currentCycle < config.p2p.enableProblematicNodeRemovalOnCycle) {
// Allow the autoscale module to set this value
const { expired, removed } = getExpiredRemovedV2(prev, lastLoggedCycle, txs, info)
nestedCountersInstance.countEvent('p2p', `results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length}`, 1)
if (logFlags && logFlags.verbose) console.log(`results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length} array: ${removed}`)

record.expired = expired
record.removed = removed // already sorted
} else {
const { expired, removed, problematic } = getExpiredRemovedV3(prev, lastLoggedCycle, txs, info)
nestedCountersInstance.countEvent('p2p', `results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length} problematic: ${problematic}`, 1)
if (logFlags && logFlags.verbose) console.log(`results of getExpiredRemovedV2: expired: ${expired} removed: ${removed.length} array: ${removed} problematic: ${problematic}`)

// record.problematic = problematic // may want to write this to cycle record for
record.expired = expired
record.removed = removed // already sorted
}
}

export function parseRecord(record: P2P.CycleCreatorTypes.CycleRecord): P2P.CycleParserTypes.Change {
Expand Down
Loading