
Commit 1f7e204

Merge pull request #967 from systream/rack_awareness
Add Rack awareness support
2 parents e817ee7 + c196b6e

9 files changed: 588 additions & 26 deletions

docs/rack-awareness.md

Lines changed: 82 additions & 0 deletions
# Rack Awareness / Availability Zones / Location Support

The aim is to increase data safety and to make the cluster more resilient
against the loss of a location/site/availability zone/rack.

To achieve this, a location parameter has been introduced.
It can be set at runtime for each Riak node.
When claiming a new ring, the list of nodes is ordered with the locations of the
individual nodes taken into account, so that adjacent nodes are preferably from
different locations.

In essence, this only changes the order of the nodes fed into the claiming algorithm.

The default location is `undefined`. This means every node with no location
parameter set is treated as being in the same location.
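To make the ordering idea concrete, here is a minimal, self-contained sketch of striping a node list by location. It is an illustration only, not riak_core's implementation (the actual ordering is done by `riak_core_location:stripe_nodes_by_location/2`); the module and function names below are hypothetical:

```erlang
-module(location_stripe_sketch).
-export([stripe/1]).

%% Group nodes by location, then take one node from each group per pass,
%% so adjacent nodes in the result preferably come from different locations.
-spec stripe([{node(), term()}]) -> [node()].
stripe(NodeLocs) ->
    GroupMap = lists:foldl(
                 fun({N, L}, Acc) ->
                         maps:update_with(L, fun(Ns) -> Ns ++ [N] end, [N], Acc)
                 end, #{}, NodeLocs),
    round_robin(maps:values(GroupMap)).

%% Take the head of every non-empty group, then recurse on the remaining tails.
round_robin([]) -> [];
round_robin(Groups) ->
    Heads = [H || [H | _] <- Groups],
    Tails = [T || [_ | T] <- Groups, T =/= []],
    Heads ++ round_robin(Tails).
```

For example, `stripe([{n1, rack_a}, {n2, rack_a}, {n3, rack_b}, {n4, rack_c}])` returns an order in which the two `rack_a` nodes are not adjacent (the group order produced by `maps:values/1` is unspecified, but the striping property holds).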
## Ring visualization

![RIAK Ring Location](ring-location.png)
## Setting a node's location parameter

Setting a node's location parameter is a staged operation, like
other ring manipulations (join, leave, resize-ring, etc.).

### Via riak admin

Change the current node's location parameter:
```bash
riak admin cluster location rack_a
```
or specify a node:
```bash
riak admin cluster location site_b [email protected]
```
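As with other staged operations, the location change only takes effect once the cluster plan has been reviewed and committed:

```bash
riak admin cluster plan
riak admin cluster commit
```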
### By Erlang function call

```erlang
riak_core_claimant:set_node_location(node(), "location_a"),
riak_core_claimant:plan(),
riak_core_claimant:commit().
```
## Pitfalls

There are circumstances in which the preferred node location assignment cannot be guaranteed.

If at least one location parameter is set in the cluster when a cluster change is planned, a warning
message is displayed whenever not all nodes in a preflist are assigned to distinct locations.

For example, if the default `n_val = 3` is used and only two distinct locations are set in the cluster,
the message `WARNING: Not all replicas will be on distinct locations` will be shown.
### Not enough distinct locations

Occurs when the ring size is not divisible by the number of distinct locations.

### Tail violations

Occurs when the ring size is not divisible by the number of nodes.
[claim-fixes](claim-fixes.md) covers this, but an unsuitable distinct location count can still result in an undesirable location distribution within the ring.

For example, with 8 nodes spread over 3 distinct locations, `n_val` must be at least 4 to
ensure that every site/location holds a piece of the data: since 8 is not divisible by 3,
no placement can make every window of 3 consecutive ring positions cover all 3 locations,
as the sketch below illustrates.
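The following standalone snippet enumerates the wrap-around windows over one possible placement (plain Erlang shell code, not a riak_core API; the placement is hypothetical):

```erlang
%% 8 nodes over locations a/b/c (3 x a, 3 x b, 2 x c), shown by location only.
Order = [c, a, b, a, c, b, a, b],
NVal = 3,
Len = length(Order),
%% Every NVal-sized window of consecutive ring positions, wrapping around.
Windows = [[lists:nth(((I + J) rem Len) + 1, Order)
            || J <- lists:seq(0, NVal - 1)]
           || I <- lists:seq(0, Len - 1)],
%% Windows that cover fewer than all 3 locations:
[W || W <- Windows, length(lists:usort(W)) < 3].
%% -> [[a,b,a],[b,a,b]]; rerun with NVal = 4 and the result is [].
```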
This can be checked as follows.

Staged changes:
```erlang
PlannedRing = element(1, lists:last(element(3, riak_core_claimant:plan()))).
riak_core_location:check_ring(PlannedRing, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

Actual ring:
```erlang
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_location:check_ring(Ring, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

If `riak_core_location:check_ring/3` returns an empty list (`[]`), there is no location violation.
### Won't optimize transfers between old and new ring

When a location parameter change triggers a ring ownership change, the resulting transfers are currently not optimized.

docs/ring-location.png

89.7 KB

src/riak_core_claim.erl

Lines changed: 31 additions & 7 deletions
```diff
@@ -187,7 +187,12 @@ wants_claim_v2(Ring, Node) ->
     Count = proplists:get_value(Node, Counts, 0),
     case Count < Avg of
         false ->
-            no;
+            case riak_core_ring:has_location_changed(Ring) of
+                true ->
+                    {yes, 1};
+                false ->
+                    no
+            end;
         true ->
             {yes, Avg - Count}
     end.
@@ -289,7 +294,8 @@ choose_claim_v2(Ring, Node) ->
     Params = default_choose_params(),
     choose_claim_v2(Ring, Node, Params).
 
-choose_claim_v2(Ring, Node, Params0) ->
+choose_claim_v2(RingOrig, Node, Params0) ->
+    Ring = riak_core_ring:clear_location_changed(RingOrig),
     Params = default_choose_params(Params0),
     %% Active::[node()]
     Active = riak_core_ring:claiming_members(Ring),
@@ -326,7 +332,8 @@ choose_claim_v2(Ring, Node, Params0) ->
     %% number of indices desired is less than the computed set.
     Padding = lists:duplicate(TargetN, undefined),
     Expanded = lists:sublist(Active ++ Padding, TargetN),
-    PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
+    ExpandedLocation = get_nodes_by_location(Expanded, Ring),
+    PreferredClaim = riak_core_claim:diagonal_stripe(Ring, ExpandedLocation),
     PreferredNth = [begin
                         {Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
                         Nth
@@ -343,8 +350,10 @@ choose_claim_v2(Ring, Node, Params0) ->
     Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
                                     TargetN, RingSize),
     %% Claim indices from the remaining candidate set
-    Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
-    Claim2 = lists:sublist(Claim, Want),
+    Claim2 = case select_indices(Owners, Deltas, Indices2, TargetN, RingSize) of
+                 [] -> [];
+                 Claim -> lists:sublist(Claim, Want)
+             end,
     NewRing = lists:foldl(fun(Idx, Ring0) ->
                               riak_core_ring:transfer_node(Idx, Node, Ring0)
                           end, Ring, Claim2),
@@ -622,7 +631,8 @@ claim_diagonal(Wants, Owners, Params) ->
     riak_core_ring:riak_core_ring().
 sequential_claim(Ring0, Node, TargetN) ->
     Ring = riak_core_ring:upgrade(Ring0),
-    Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    Nodes = get_nodes_by_location(OrigNodes, Ring),
     NodeCount = length(Nodes),
     RingSize = riak_core_ring:num_partitions(Ring),
 
@@ -709,7 +719,8 @@ backfill_ring(RingSize, Nodes, Remaining, Acc) ->
 
 claim_rebalance_n(Ring0, Node) ->
     Ring = riak_core_ring:upgrade(Ring0),
-    Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    Nodes = get_nodes_by_location(OrigNodes, Ring),
     Zipped = diagonal_stripe(Ring, Nodes),
 
     lists:foldl(fun({P, N}, Acc) ->
@@ -1270,6 +1281,19 @@ indices_within_n([This | Indices], TN, Last, Q, Acc) ->
 circular_distance(I1, I2, Q) ->
     min((Q + I1 - I2) rem Q, (Q + I2 - I1) rem Q).
 
+%% @private
+%% Get active nodes ordered taking location parameters into account
+-spec get_nodes_by_location([node()|undefined], riak_core_ring:riak_core_ring()) ->
+    [node()|undefined].
+get_nodes_by_location(Nodes, Ring) ->
+    NodesLocations = riak_core_ring:get_nodes_locations(Ring),
+    case riak_core_location:has_location_set_in_cluster(NodesLocations) of
+        false ->
+            Nodes;
+        true ->
+            riak_core_location:stripe_nodes_by_location(Nodes, NodesLocations)
+    end.
+
 %% ===================================================================
 %% Unit tests
 %% ===================================================================
```

src/riak_core_claimant.erl

Lines changed: 32 additions & 6 deletions
```diff
@@ -42,7 +42,8 @@
          activate_bucket_type/1,
          get_bucket_type/2,
          get_bucket_type/3,
-         bucket_type_iterator/0]).
+         bucket_type_iterator/0,
+         set_node_location/2]).
 -export([reassign_indices/1]). % helpers for claim sim
 
 %% gen_server callbacks
@@ -52,7 +53,8 @@
 -type action() :: leave
                 | remove
                 | {replace, node()}
-                | {force_replace, node()}.
+                | {force_replace, node()}
+                | {set_location, string()}.
 
 -type riak_core_ring() :: riak_core_ring:riak_core_ring().
 
@@ -164,6 +166,11 @@ abort_resize() ->
 pending_close(Ring, RingID) ->
     gen_server:call(?MODULE, {pending_close, Ring, RingID}).
 
+%% @doc Stage a request to set a new location for the given node.
+-spec set_node_location(node(), string()) -> ok | {error, atom()}.
+set_node_location(Node, Location) ->
+    stage(Node, {set_location, Location}).
+
 %% @doc Clear the current set of staged transfers
 clear() ->
     gen_server:call(claimant(), clear, infinity).
@@ -446,8 +453,9 @@ maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) ->
         {_, _, false} ->
             {ignore, plan_changed};
         _ ->
-            NewRing = riak_core_ring:increment_vclock(Claimant, NextRing),
-            {new_ring, NewRing}
+            NewRing0 = riak_core_ring:clear_location_changed(NextRing),
+            NewRing1 = riak_core_ring:increment_vclock(Claimant, NewRing0),
+            {new_ring, NewRing1}
     end.
 
 %% @private
@@ -502,7 +510,9 @@ valid_request(Node, Action, Changes, Ring) ->
         {resize, NewRingSize} ->
             valid_resize_request(NewRingSize, Changes, Ring);
         abort_resize ->
-            valid_resize_abort_request(Ring)
+            valid_resize_abort_request(Ring);
+        {set_location, Location} ->
+            valid_set_location_request(Location, Node, Ring)
     end.
 
 %% @private
@@ -615,6 +625,20 @@ valid_resize_abort_request(Ring) ->
         false -> {error, not_resizing}
     end.
 
+%% @private
+%% Validating node member status
+valid_set_location_request(_Location, Node, Ring) ->
+    case riak_core_ring:member_status(Ring, Node) of
+        valid ->
+            true;
+        joining ->
+            true;
+        invalid ->
+            {error, not_member};
+        _ ->
+            true
+    end.
+
 %% @private
 %% @doc Filter out any staged changes that are no longer valid. Changes
 %% can become invalid based on other staged changes, or by cluster
@@ -1094,7 +1118,9 @@ change({{force_replace, NewNode}, Node}, Ring) ->
 change({{resize, NewRingSize}, _Node}, Ring) ->
     riak_core_ring:resize(Ring, NewRingSize);
 change({abort_resize, _Node}, Ring) ->
-    riak_core_ring:set_pending_resize_abort(Ring).
+    riak_core_ring:set_pending_resize_abort(Ring);
+change({{set_location, Location}, Node}, Ring) ->
+    riak_core_ring:set_node_location(Node, Location, Ring).
 
 internal_ring_changed(Node, CState) ->
     {Changed, CState5} = do_claimant(Node, CState, fun log/2),
```

src/riak_core_cluster_cli.erl

Lines changed: 58 additions & 8 deletions
```diff
@@ -46,12 +46,14 @@ register_all_usage() ->
     clique:register_usage(["riak-admin", "cluster", "status"], status_usage()),
     clique:register_usage(["riak-admin", "cluster", "partition"], partition_usage()),
     clique:register_usage(["riak-admin", "cluster", "partitions"], partitions_usage()),
-    clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()).
+    clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
+    clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
+    clique:register_usage(["riak-admin", "cluster", "location"], location_usage()).
 
 register_all_commands() ->
     lists:foreach(fun(Args) -> apply(clique, register_command, Args) end,
                   [status_register(), partition_count_register(),
-                   partitions_register(), partition_register()]).
+                   partitions_register(), partition_register(), location_register()]).
 
 %%%
 %% Cluster status
@@ -72,6 +74,7 @@ cluster_usage() ->
      " partition        Map partition IDs to indexes\n",
      " partitions       Display partitions on a node\n",
      " partition-count  Display ring size or node partition count\n\n",
+     " location         Set node location\n\n",
      "  Use --help after a sub-command for more details.\n"
     ].
 
@@ -111,12 +114,20 @@ status(_CmdBase, [], []) ->
     [T0,T1,Table,T2].
 
 format_status(Node, Status, Ring, RingStatus) ->
-    {Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
-    [{node, is_claimant(Node, Claimant)},
-     {status, Status},
-     {avail, node_availability(Node, Down, MarkedDown)},
-     {ring, claim_percent(Ring, Node)},
-     {pending, future_claim_percentage(Changes, Ring, Node)}].
+    NodesLocations = riak_core_ring:get_nodes_locations(Ring),
+    HasLocationInCluster = riak_core_location:has_location_set_in_cluster(NodesLocations),
+    format_status(Node, Status, Ring, RingStatus, HasLocationInCluster, NodesLocations).
+
+format_status(Node, Status, Ring, RingStatus, false, _) ->
+    {Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
+    [{node, is_claimant(Node, Claimant)},
+     {status, Status},
+     {avail, node_availability(Node, Down, MarkedDown)},
+     {ring, claim_percent(Ring, Node)},
+     {pending, future_claim_percentage(Changes, Ring, Node)}];
+format_status(Node, Status, Ring, RingStatus, true, NodesLocations) ->
+    Row = format_status(Node, Status, Ring, RingStatus, false, NodesLocations),
+    Row ++ [{location, riak_core_location:get_node_location(Node, NodesLocations)}].
 
 is_claimant(Node, Node) ->
     " (C) " ++ atom_to_list(Node) ++ " ";
@@ -263,6 +274,45 @@ id_out1(id, Id, Ring, RingSize) when Id < RingSize ->
 id_out1(id, Id, _Ring, _RingSize) ->
     make_alert(["ERROR: Id ", integer_to_list(Id), " is invalid."]).
 
+
+%%%
+%% Location
+%%%
+location_usage() ->
+    ["riak-admin cluster location <new_location> [--node node]\n\n",
+     "  Set the node location parameter\n\n",
+     "Options\n",
+     "  -n <node>, --node <node>\n",
+     "      Set node location for the specified node.\n"
+    ].
+
+location_register() ->
+    [["riak-admin", "cluster", "location", '*'],            % Cmd
+     [],                                                    % KeySpecs
+     [{node, [{shortname, "n"}, {longname, "node"},
+              {typecast, fun clique_typecast:to_node/1}]}], % FlagSpecs
+     fun stage_set_location/3].                             % Implementation callback
+
+stage_set_location([_, _, _, Location], _, Flags) ->
+    Node = proplists:get_value(node, Flags, node()),
+    try
+        case riak_core_claimant:set_node_location(Node, Location) of
+            ok ->
+                [clique_status:text(
+                    io_lib:format("Success: staged changing location of node ~p to ~s~n",
+                                  [Node, Location]))];
+            {error, not_member} ->
+                make_alert(
+                    io_lib:format("Failed: ~p is not a member of the cluster.~n", [Node])
+                )
+        end
+    catch
+        Exception:Reason ->
+            lager:error("Setting node location failed ~p:~p", [Exception, Reason]),
+            make_alert("Setting node location failed, see log for details~n")
+    end.
+
+
 %%%
 %% Internal
 %%%
```
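Based on `stage_set_location/3` above, a successful run of the new command should look roughly as follows (output reconstructed from the `io_lib:format/2` call; `~p` renders the node name as a quoted atom):

```bash
$ riak-admin cluster location site_b --node [email protected]
Success: staged changing location of node '[email protected]' to site_b
```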
