forked from royaltm/node-zmq-raft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstate_machine_base.js
107 lines (86 loc) · 3.16 KB
/
state_machine_base.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Copyright (c) 2016 Rafał Michalski <[email protected]>
*/
"use strict";
const assert = require('assert');
const ReadyEmitter = require('../common/readyemitter');
/*
Implementations may implement nothing.
There are 2 special events emitted on the state machine from zmq-raft:
On raft state change:
"raft-state": (state, currentTerm)
- `state{symbol}`: peer's current state (see common.constants.FSM_*)
- `currentTerm{number}`: peer's current term
on client request, when its type is unknown to raft:
"client-request": (reply, msgType, msgArgs)
- `reply{Function}`: function which replies messages to the client: reply([arg1, arg2, ...args]),
for more about `reply` function see `protocol.FramesProtocol.prototype.createRouterMessageListener`
- `msgType{Buffer}`: a message type (any length >= 1)
- `msgArgs{Array<Buffer>}`: the rest of the message frames.
Client RPC requests sent to the raft's ZMQ_ROUTER socket follow Dispatch RPC. The msgType is the 1st frame,
while the msgArgs are the frames beginning with the 3rd frame.
The 2nd frame is authorization frame and is discarded from event args.
Original ZMQ_ROUTER ident frame and requestId frame are accessible on `reply` function as:
- `reply.ident{Buffer}`
- `reply.requestId{Buffer}`
They make it possible to detect and bounce duplicate messages.
*/
class StateMachineBase extends ReadyEmitter {
  /**
   * Builds a new state machine whose `lastApplied` index starts at 0.
   *
   * Subclasses may restore their state from persistent storage and set
   * `lastApplied` to the value read from it.
   *
   * Subclasses must call this[Symbol.for('setReady')]() once they have
   * finished initializing.
   **/
  constructor() {
    super();
    /**
     * Raft reads this property on startup to initialize its own
     * `lastApplied` and `commitIndex` values.
     *
     * @property lastApplied{Number}
     **/
    this.lastApplied = 0;
  }

  /**
   * Closes this instance.
   *
   * The base implementation holds no resources and returns an already
   * resolved promise. Subclasses should release their resources
   * asynchronously and resolve the returned promise when done.
   *
   * @return {Promise}
   **/
  close() {
    return Promise.resolve();
  }

  /**
   * Applies log entries to the state machine.
   *
   * The base implementation only validates `nextIndex` and advances
   * `lastApplied`; it does not inspect the entries themselves.
   *
   * The returned promise resolves to the `lastApplied` value reached after
   * the provided entries have been applied.
   *
   * `entries`   - buffers, one buffer per log entry
   * `nextIndex` - raft log index of the first (potential) entry in `entries`
   * `snapshot`  - optional snapshot to reset the state machine from
   *
   * NOTE: `nextIndex` must never be less than or equal to `lastApplied`.
   *       `nextIndex` must be exactly `snapshot.logIndex + 1` when a snapshot
   *       is given and exactly `lastApplied + 1` otherwise.
   *       Violations throw an AssertionError synchronously.
   *
   * Asynchronous updates must be applied in FIFO order:
   * first in (applied) - first resolved.
   *
   * @param {Array} entries
   * @param {number} nextIndex
   * @param {number} currentTerm
   * @param {SnapshotBase} [snapshot]
   * @return {Promise}
   **/
  applyEntries(entries, nextIndex, currentTerm, snapshot) {
    const previouslyApplied = this.lastApplied;
    // Entries that were already applied must never be replayed.
    assert(nextIndex > previouslyApplied);

    // The batch has to continue right after the snapshot when one is given,
    // otherwise right after the last applied entry.
    const expectedIndex = snapshot
      ? snapshot.logIndex + 1
      : previouslyApplied + 1;
    assert(nextIndex === expectedIndex);

    // An empty batch leaves the index at `nextIndex - 1`.
    this.lastApplied = nextIndex + entries.length - 1;
    return Promise.resolve(this.lastApplied);
  }
}
// Expose the class itself as the module's export; the local `exports` binding is rebound to the same value.
module.exports = exports = StateMachineBase;