-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFollower.java
120 lines (100 loc) · 3.53 KB
/
Follower.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.ArrayList;
public class Follower implements NodeState {
private Node node;
private Watch watch;
public Follower(Node node) {
this.node = node;
}
@Override
public void work() {
this.watch = new Watch(150,300,this);
this.watch.start();
}
@Override
public void reactVoteRequest(VoteRequest message) {
this.watch.reset();
Message vr=null;
//term superior or equal vote true
if(this.node.getTerm() <= message.getTerm() && this.node.getCandidateAddress() == null ){
vr = MessageConstructor.constructVoteReply(this.node,message.getFrom(),true);
this.node.setCandidateAddress(message.getFrom());
this.node.setTerm(message.getTerm());
}
if(this.node.getCandidateAddress()!=null){
vr = MessageConstructor.constructVoteReply(this.node,message.getFrom(),false);
}
this.node.sendUDPMessage(vr);
this.watch.reset();
}
@Override
public void reactVoteReply(VoteReply message) {
//followers don't care about vote reply's
}
@Override
public void reactAppendRequest(AppendRequest message) {
this.watch.reset();
//synchronise whit leader
if(this.node.getTerm() < message.getTerm() ||
(this.node.getTerm()== message.getTerm() && this.node.getLeaderAddress() == null)){
this.node.setTerm(message.getTerm());
this.node.setLeaderAddress(message.getFrom());
this.node.setCurrentCommitIndex(message.getCommitIndex());
this.node.setCandidateAddress(null);
retrieveChangesFromLeader();
this.watch.reset();
return;
}
if(this.node.getLeaderAddress().equals(message.getFrom())){
if(this.node.getCurrentCommitIndex()+1 == message.getCommitIndex()) {
Change c = message.getChange();
if (c != null) {
switch (c.getMethod()) {
case "ADD":
this.node.addPlace(c.getPlace());
break;
case "REMOVE":
this.node.removePlace(c.getPlace());
break;
}
this.node.setCurrentCommitIndex(message.getCommitIndex());
}
}
answerAppendRequest();
}
this.watch.reset();
}
@Override
public void reactAppendReply(AppendReply message) {
//follower don't care about append reply's
}
@Override
public void call() {
//time is up start candidacy
this.node.setState("Candidate");
}
@Override
public String getState() {
return "Follower";
}
private void retrieveChangesFromLeader(){
//contact leader
PlacesListInterface pli;
ArrayList places =null;
try {
pli=(PlacesListInterface) Naming.lookup("rmi://"+this.node.getLeaderAddress()+"/placesmanager");
places = pli.allPlaces();
} catch (NotBoundException | RemoteException | MalformedURLException e) {
e.printStackTrace();
}
this.node.setAllPlaces(places);
}
private void answerAppendRequest(){
Message ar;
ar= MessageConstructor.contructAppendReply(this.node);
this.node.sendUDPMessage(ar);
}
}