Skip to content

Commit 136cf10

Browse files
authored
some fix/improve (#489)
* typo * [feat] impl Describer for RouteTable * [feat] impl Describer for RegionEngine * [feat] impl Describer for StoreEngine * [feat] impl Describer for StoreEngine * [rheakv] avoid sending repeatedly * [rheakv] avoid sending repeatedly * lock free improve * format * upgrade rocksdb(v5.18.4) for aarch64 facebook/rocksdb#6497 * upgrade bolt to 1.6.2 * read-index ut * let append-entry fail-fast * typo * rocksdb:5.18.3 * add weak read for `isLeader()` * improve `checkDeadNodes` * improve `checkDeadNodes` * fail-fast on `addReplicator` * format * add `LongHeldDetectingReadWriteLock` * add node-lock-blocked metric * add UT: LongHeldDetectingReadWriteLockTest * by CR * by CR
1 parent 406d34d commit 136cf10

File tree

29 files changed

+609
-67
lines changed

29 files changed

+609
-67
lines changed

jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
7676
*/
7777
boolean isLeader();
7878

79+
/**
80+
* Returns true when the node is leader.
81+
* @param blocking if true, will be blocked until the node finishes its state change
82+
*/
83+
boolean isLeader(final boolean blocking);
84+
7985
/**
8086
* Shutdown local replica node.
8187
*

jraft-core/src/main/java/com/alipay/sofa/jraft/ReplicatorGroup.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public interface ReplicatorGroup extends Describer {
5353
* @param peer target peer
5454
* @return true on success
5555
*/
56-
boolean addReplicator(final PeerId peer);
56+
default boolean addReplicator(final PeerId peer) {
57+
return addReplicator(peer, ReplicatorType.Follower);
58+
}
5759

5860
/**
5961
* Add a replicator attached with |peer|
@@ -67,7 +69,24 @@ public interface ReplicatorGroup extends Describer {
6769
* @param replicatorType replicator type
6870
* @return true on success
6971
*/
70-
boolean addReplicator(final PeerId peer, ReplicatorType replicatorType);
72+
default boolean addReplicator(final PeerId peer, ReplicatorType replicatorType) {
73+
return addReplicator(peer, replicatorType, true);
74+
}
75+
76+
/**
77+
* Try to add a replicator attached with |peer|
78+
* will be a notification when the replicator catches up according to the
79+
* arguments.
80+
* NOTE: when calling this function, the replicator starts to work
81+
* immediately, and might call Node#stepDown which might have a race with
82+
* the caller, you should deal with this situation.
83+
*
84+
* @param peer target peer
85+
* @param replicatorType replicator type
86+
* @param sync synchronous
87+
* @return true on success
88+
*/
89+
boolean addReplicator(final PeerId peer, ReplicatorType replicatorType, boolean sync);
7190

7291
/**
7392
* Send heartbeat to a peer.

jraft-core/src/main/java/com/alipay/sofa/jraft/RouteTable.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.alipay.sofa.jraft.rpc.CliClientService;
3535
import com.alipay.sofa.jraft.rpc.CliRequests;
3636
import com.alipay.sofa.jraft.rpc.RpcRequests;
37+
import com.alipay.sofa.jraft.util.Describer;
3738
import com.alipay.sofa.jraft.util.Requires;
3839
import com.google.protobuf.Message;
3940

@@ -44,7 +45,7 @@
4445
*
4546
* 2018-Apr-09 10:41:21 AM
4647
*/
47-
public class RouteTable {
48+
public class RouteTable implements Describer {
4849

4950
private static final Logger LOG = LoggerFactory.getLogger(RouteTable.class);
5051

@@ -353,14 +354,31 @@ public boolean removeGroup(final String groupId) {
353354
return this.groupConfTable.remove(groupId) != null;
354355
}
355356

357+
@Override
358+
public String toString() {
359+
return "RouteTable{" + "groupConfTable=" + groupConfTable + '}';
360+
}
361+
356362
private RouteTable() {
357363
}
358364

365+
@Override
366+
public void describe(final Printer out) {
367+
out.println("RouteTable:") //
368+
.print(" ") //
369+
.println(toString());
370+
}
371+
359372
private static class GroupConf {
360373

361374
private final StampedLock stampedLock = new StampedLock();
362375

363376
private Configuration conf;
364377
private PeerId leader;
378+
379+
@Override
380+
public String toString() {
381+
return "GroupConf{" + "conf=" + conf + ", leader=" + leader + '}';
382+
}
365383
}
366384
}

jraft-core/src/main/java/com/alipay/sofa/jraft/closure/ReadIndexClosure.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public abstract class ReadIndexClosure implements Closure {
6363
private long index = INVALID_LOG_INDEX;
6464
private byte[] requestContext;
6565

66-
private volatile int state;
66+
private volatile int state = PENDING;
6767

6868
public ReadIndexClosure() {
6969
this(DEFAULT_TIMEOUT);
@@ -75,7 +75,6 @@ public ReadIndexClosure() {
7575
* @param timeoutMs timeout millis
7676
*/
7777
public ReadIndexClosure(long timeoutMs) {
78-
this.state = PENDING;
7978
if (timeoutMs >= 0) {
8079
// Lazy to init the timer
8180
TimeoutScanner.TIMER.newTimeout(new TimeoutTask(this), timeoutMs, TimeUnit.MILLISECONDS);
@@ -130,7 +129,7 @@ public void run(final Status status) {
130129

131130
try {
132131
run(status, this.index, this.requestContext);
133-
} catch (Throwable t) {
132+
} catch (final Throwable t) {
134133
LOG.error("Fail to run ReadIndexClosure with status: {}.", status, t);
135134
}
136135
}

jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import java.util.concurrent.locks.Lock;
3333
import java.util.concurrent.locks.ReadWriteLock;
34-
import java.util.concurrent.locks.ReentrantReadWriteLock;
3534

3635
import org.apache.commons.lang.StringUtils;
3736
import org.slf4j.Logger;
@@ -114,9 +113,11 @@
114113
import com.alipay.sofa.jraft.util.Requires;
115114
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
116115
import com.alipay.sofa.jraft.util.SignalHelper;
116+
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
117117
import com.alipay.sofa.jraft.util.ThreadHelper;
118118
import com.alipay.sofa.jraft.util.ThreadId;
119119
import com.alipay.sofa.jraft.util.Utils;
120+
import com.alipay.sofa.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
120121
import com.alipay.sofa.jraft.util.timer.RaftTimerFactory;
121122
import com.google.protobuf.Message;
122123
import com.lmax.disruptor.BlockingWaitStrategy;
@@ -164,7 +165,8 @@ public class NodeImpl implements Node, RaftServerService {
164165
0);
165166

166167
/** Internal states */
167-
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
168+
private final ReadWriteLock readWriteLock = new NodeReadWriteLock(
169+
this);
168170
protected final Lock writeLock = this.readWriteLock
169171
.writeLock();
170172
protected final Lock readLock = this.readWriteLock
@@ -210,7 +212,7 @@ public class NodeImpl implements Node, RaftServerService {
210212
private Disruptor<LogEntryAndClosure> applyDisruptor;
211213
private RingBuffer<LogEntryAndClosure> applyQueue;
212214

213-
/** Metrics*/
215+
/** Metrics */
214216
private NodeMetrics metrics;
215217

216218
private NodeId nodeId;
@@ -223,6 +225,33 @@ public class NodeImpl implements Node, RaftServerService {
223225
/** The number of elections time out for current node */
224226
private volatile int electionTimeoutCounter;
225227

228+
private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {
229+
230+
static final long MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong(
231+
"jraft.node.detecting.lock.max_blocking_ms_to_report", -1);
232+
233+
private final Node node;
234+
235+
public NodeReadWriteLock(Node node) {
236+
super(MAX_BLOCKING_MS_TO_REPORT, TimeUnit.MILLISECONDS);
237+
this.node = node;
238+
}
239+
240+
@Override
241+
public void report(final AcquireMode acquireMode, final Thread heldThread,
242+
final Collection<Thread> queuedThreads, final long blockedNanos) {
243+
final long blockedMs = TimeUnit.NANOSECONDS.toMillis(blockedNanos);
244+
LOG.warn(
245+
"Raft-Node-Lock report: currentThread={}, acquireMode={}, heldThread={}, queuedThreads={}, blockedMs={}.",
246+
Thread.currentThread(), acquireMode, heldThread, queuedThreads, blockedMs);
247+
248+
final NodeMetrics metrics = this.node.getNodeMetrics();
249+
if (metrics != null) {
250+
metrics.recordLatency("node-lock-blocked", blockedMs);
251+
}
252+
}
253+
}
254+
226255
/**
227256
* Node service event.
228257
*
@@ -2123,22 +2152,27 @@ private void onCaughtUp(final PeerId peer, final long term, final long version,
21232152
}
21242153
}
21252154

2126-
private void checkDeadNodes(final Configuration conf, final long monotonicNowMs) {
2155+
private boolean checkDeadNodes(final Configuration conf, final long monotonicNowMs,
2156+
final boolean stepDownOnCheckFail) {
21272157
// Check learner replicators at first.
2128-
for (PeerId peer : conf.getLearners()) {
2158+
for (final PeerId peer : conf.getLearners()) {
21292159
checkReplicator(peer);
21302160
}
21312161
// Ensure quorum nodes alive.
21322162
final List<PeerId> peers = conf.listPeers();
21332163
final Configuration deadNodes = new Configuration();
21342164
if (checkDeadNodes0(peers, monotonicNowMs, true, deadNodes)) {
2135-
return;
2165+
return true;
21362166
}
2137-
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.",
2138-
getNodeId(), this.currTerm, deadNodes, conf);
2139-
final Status status = new Status();
2140-
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(), peers.size());
2141-
stepDown(this.currTerm, false, status);
2167+
if (stepDownOnCheckFail) {
2168+
LOG.warn("Node {} steps down when alive nodes don't satisfy quorum, term={}, deadNodes={}, conf={}.",
2169+
getNodeId(), this.currTerm, deadNodes, conf);
2170+
final Status status = new Status();
2171+
status.setError(RaftError.ERAFTTIMEDOUT, "Majority of the group dies: %d/%d", deadNodes.size(),
2172+
peers.size());
2173+
stepDown(this.currTerm, false, status);
2174+
}
2175+
return false;
21422176
}
21432177

21442178
private boolean checkDeadNodes0(final List<PeerId> peers, final long monotonicNowMs, final boolean checkReplicator,
@@ -2189,17 +2223,41 @@ private List<PeerId> getAliveNodes(final Collection<PeerId> peers, final long mo
21892223
return alivePeers;
21902224
}
21912225

2226+
@SuppressWarnings({ "LoopStatementThatDoesntLoop", "ConstantConditions" })
21922227
private void handleStepDownTimeout() {
2228+
do {
2229+
this.readLock.lock();
2230+
try {
2231+
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
2232+
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm,
2233+
this.state);
2234+
return;
2235+
}
2236+
final long monotonicNowMs = Utils.monotonicMs();
2237+
if (!checkDeadNodes(this.conf.getConf(), monotonicNowMs, false)) {
2238+
break;
2239+
}
2240+
if (!this.conf.getOldConf().isEmpty()) {
2241+
if (!checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, false)) {
2242+
break;
2243+
}
2244+
}
2245+
return;
2246+
} finally {
2247+
this.readLock.unlock();
2248+
}
2249+
} while (false);
2250+
21932251
this.writeLock.lock();
21942252
try {
21952253
if (this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
21962254
LOG.debug("Node {} stop step-down timer, term={}, state={}.", getNodeId(), this.currTerm, this.state);
21972255
return;
21982256
}
21992257
final long monotonicNowMs = Utils.monotonicMs();
2200-
checkDeadNodes(this.conf.getConf(), monotonicNowMs);
2258+
checkDeadNodes(this.conf.getConf(), monotonicNowMs, true);
22012259
if (!this.conf.getOldConf().isEmpty()) {
2202-
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs);
2260+
checkDeadNodes(this.conf.getOldConf(), monotonicNowMs, true);
22032261
}
22042262
} finally {
22052263
this.writeLock.unlock();
@@ -2636,6 +2694,14 @@ private void handleVoteTimeout() {
26362694

26372695
@Override
26382696
public boolean isLeader() {
2697+
return isLeader(true);
2698+
}
2699+
2700+
@Override
2701+
public boolean isLeader(final boolean blocking) {
2702+
if (!blocking) {
2703+
return this.state == State.STATE_LEADER;
2704+
}
26392705
this.readLock.lock();
26402706
try {
26412707
return this.state == State.STATE_LEADER;

jraft-core/src/main/java/com/alipay/sofa/jraft/core/ReplicatorGroupImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alipay.sofa.jraft.option.RaftOptions;
3636
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
3737
import com.alipay.sofa.jraft.option.ReplicatorOptions;
38+
import com.alipay.sofa.jraft.rpc.RaftClientService;
3839
import com.alipay.sofa.jraft.rpc.RpcRequests.AppendEntriesResponse;
3940
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
4041
import com.alipay.sofa.jraft.util.OnlyForTest;
@@ -109,21 +110,23 @@ public ThreadId getReplicator(final PeerId peer) {
109110
}
110111

111112
@Override
112-
public boolean addReplicator(final PeerId peer) {
113-
return addReplicator(peer, ReplicatorType.Follower);
114-
}
115-
116-
@Override
117-
public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorType) {
113+
public boolean addReplicator(final PeerId peer, final ReplicatorType replicatorType, final boolean sync) {
118114
Requires.requireTrue(this.commonOptions.getTerm() != 0);
119115
this.failureReplicators.remove(peer);
120116
if (this.replicatorMap.containsKey(peer)) {
121117
return true;
122118
}
123119
final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
124-
125120
opts.setReplicatorType(replicatorType);
126121
opts.setPeerId(peer);
122+
if (!sync) {
123+
final RaftClientService client = opts.getRaftRpcService();
124+
if (client != null && !client.checkConnection(peer.getEndpoint(), true)) {
125+
LOG.error("Fail to check replicator connection to peer={}, replicatorType={}.", peer, replicatorType);
126+
this.failureReplicators.put(peer, replicatorType);
127+
return false;
128+
}
129+
}
127130
final ThreadId rid = Replicator.start(opts, this.raftOptions);
128131
if (rid == null) {
129132
LOG.error("Fail to start replicator to peer={}, replicatorType={}.", peer, replicatorType);
@@ -181,7 +184,7 @@ public void checkReplicator(final PeerId peer, final boolean lockNode) {
181184
try {
182185
if (node.isLeader()) {
183186
final ReplicatorType rType = this.failureReplicators.get(peer);
184-
if (rType != null && addReplicator(peer, rType)) {
187+
if (rType != null && addReplicator(peer, rType, false)) {
185188
this.failureReplicators.remove(peer, rType);
186189
}
187190
}

jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Checksum.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
public interface Checksum {
2626

2727
/**
28-
* Caculate a checksum value for this entity.
28+
* Calculate a checksum value for this entity.
2929
* @return checksum value
3030
*/
3131
long checksum();

jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/ClientService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public interface ClientService extends Lifecycle<RpcOptions> {
4040
*/
4141
boolean connect(final Endpoint endpoint);
4242

43+
/**
44+
* Check the connection for the given address and, if createIfAbsent is true, asynchronously create a new one when there is no connection.
45+
* @param endpoint target address
46+
* @param createIfAbsent create a new one if there is no connection
47+
* @return true if there is a connection and the connection is active and writable.
48+
*/
49+
boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
50+
4351
/**
4452
* Disconnect from endpoint.
4553
*

jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcClient.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,18 @@ public interface RpcClient extends Lifecycle<RpcOptions> {
3232
* Check connection for given address.
3333
*
3434
* @param endpoint target address
35-
* @return true if there is a connection adn the connection is active adn writable.
35+
* @return true if there is a connection and the connection is active and writable.
3636
*/
3737
boolean checkConnection(final Endpoint endpoint);
3838

39+
/**
40+
* Check the connection for the given address and, if createIfAbsent is true, asynchronously create a new one when there is no connection.
41+
* @param endpoint target address
42+
* @param createIfAbsent create a new one if there is no connection
43+
* @return true if there is a connection and the connection is active and writable.
44+
*/
45+
boolean checkConnection(final Endpoint endpoint, final boolean createIfAbsent);
46+
3947
/**
4048
* Close all connections of an address.
4149
*

0 commit comments

Comments
 (0)