Class NonBlockingCoordinator
- All Implemented Interfaces:
ChannelInterceptor, Heartbeat, MembershipListener
Title: Auto merging leader election algorithm
Description: Implementation of a simple coordinator algorithm that not only selects a coordinator, it also merges groups automatically when members are discovered that weren't part of the
This algorithm is non-blocking, meaning it allows transactions to proceed while the coordination phase is going on.
This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership to pass a token
around a ring of the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A,B,C,D,E are on a network, in that priority order. AbsoluteOrder will only work if every node receives pings
from all the other nodes, meaning that node{i} receives pings from node{all}-node{i}.
But the following could happen if a multicast problem occurs:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
The default Tribes membership implementation relies on the multicast packets arriving at all nodes correctly, and
nothing guarantees that they will.
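The disagreement caused by these partial views can be reproduced with a small stand-alone sketch. It is not part of Tribes: the class name `LocalViewLeaders` and its methods are hypothetical, and priority is modeled as plain alphabetical order on node names, which is only an assumption standing in for AbsoluteOrder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not a Tribes class.
final class LocalViewLeaders {

    /**
     * Leader a node would pick from its own view alone: the highest-priority
     * name (assumed here to be the alphabetically first) among itself and
     * the members it can see.
     */
    static String leaderFor(String self, List<String> seen) {
        List<String> view = new ArrayList<>(seen);
        view.add(self);
        return Collections.min(view);
    }

    /** The partial views from the multicast-problem scenario. */
    static Map<String, List<String>> scenario() {
        Map<String, List<String>> views = new LinkedHashMap<>();
        views.put("A", List.of("B", "C", "D"));
        views.put("B", List.of("A", "C"));
        views.put("C", List.of("D", "E"));
        views.put("D", List.of("A", "B", "C", "E"));
        views.put("E", List.of("A", "C", "D"));
        return views;
    }

    public static void main(String[] args) {
        // Each node decides using only what it can see.
        scenario().forEach((node, seen) ->
                System.out.println(node + " elects " + leaderFor(node, seen)));
    }
}
```

Under these assumptions A, B, D and E pick A, while C picks itself because it never sees A or B. That split is what the token algorithm has to resolve.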
To best explain how this algorithm works, let's take the above example. For simplicity we assume that a send operation
is O(1) for all nodes, although this algorithm will also work where messages overlap, as they all depend on absolute
order.
Scenario 1: A,B,C,D,E all come online at the same time.
Eval phase: A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, D,E think of A as
leader.
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, adds E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}
A message doesn't stop until it reaches its original sender, unless it is dropped by a higher leader. As you can see, E
still thinks the viewId=Y, which is not correct. But at this point we have arrived at the same membership and all
nodes are informed of each other.
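The per-node step in the token phase (merge the locally known members into the token, re-evaluate the leader, keep the id and source) can be sketched as follows. This is a simplified model, not the Tribes implementation: `TokenSketch` and `mergedAt` are hypothetical names, and priority is again assumed to be alphabetical order.

```java
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of an election token; not a Tribes class.
final class TokenSketch {
    final String id;      // id of the message (and of the view), e.g. "X"
    final String leader;  // highest-priority member known to the token
    final String source;  // node that originally sent the token
    final SortedSet<String> members;

    TokenSketch(String id, String leader, String source, SortedSet<String> members) {
        this.id = id;
        this.leader = leader;
        this.source = source;
        this.members = new TreeSet<>(members);
    }

    /**
     * What a node does before forwarding: add itself and every member it can
     * see, then take the first member in priority order as leader. The id and
     * the source are left untouched.
     */
    TokenSketch mergedAt(String self, List<String> seen) {
        SortedSet<String> merged = new TreeSet<>(members);
        merged.add(self);
        merged.addAll(seen);
        return new TokenSketch(id, merged.first(), source, merged);
    }

    @Override
    public String toString() {
        return id + "{" + leader + "-ldr, " + source + "-src, mbrs-"
                + String.join(",", members) + "}";
    }

    public static void main(String[] args) {
        // Step (2) for D: it receives Y from C and knows about A and B.
        TokenSketch y = new TokenSketch("Y", "C", "C", new TreeSet<>(List.of("C", "D", "E")));
        System.out.println(y.mergedAt("D", List.of("A", "B", "C", "E")));
        // prints Y{A-ldr, C-src, mbrs-A,B,C,D,E}
    }
}
```

The sketch leaves out dropping a token in favor of a higher leader and holding a message, both of which the walkthrough mentions.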
To synchronize the rest, we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A will resend the token: A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B. When A
receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E, who then install and accept
the view.
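The completion check can be illustrated with a small simulation: the source keeps resending until the token comes back with exactly the membership that was sent. This is a sketch under simplifying assumptions (alphabetical priority, no message loss, no competing tokens). `TokenCompletion`, `circulate` and `sendsUntilComplete` are hypothetical names, not Tribes methods.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical simulation; not a Tribes class.
final class TokenCompletion {

    /**
     * One trip around the ring. Starting after the source, each node in
     * priority order merges the members it can see into the token, until the
     * token is back at the source.
     */
    static TreeSet<String> circulate(String source, Set<String> sent,
                                     Map<String, List<String>> views) {
        TreeSet<String> token = new TreeSet<>(sent);
        String current = source;
        while (true) {
            String next = token.higher(current);
            if (next == null) {
                next = token.first(); // wrap around the ring
            }
            if (next.equals(source)) {
                return token;
            }
            token.addAll(views.getOrDefault(next, List.of()));
            current = next;
        }
    }

    /** Number of times the source has to send the token before original == arrived. */
    static int sendsUntilComplete(String source, Set<String> original,
                                  Map<String, List<String>> views) {
        Set<String> sent = new TreeSet<>(original);
        int sends = 0;
        while (true) {
            sends++;
            TreeSet<String> arrived = circulate(source, sent, views);
            if (arrived.equals(sent)) {
                return sends; // the token is complete
            }
            sent = arrived;   // membership changed on the way: resend
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> views = Map.of(
                "A", List.of("B", "C", "D"),
                "B", List.of("A", "C"),
                "C", List.of("D", "E"),
                "D", List.of("A", "B", "C", "E"),
                "E", List.of("A", "C", "D"));
        System.out.println(sendsUntilComplete("A", Set.of("A", "B", "C", "D"), views)); // prints 2
    }
}
```

For the example views, the first trip returns with E added, so A resends once. The second trip comes back unchanged.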
Let's assume that C1 arrives; C1 has lower priority than C, but higher priority than D.
Let's also assume that C1 sees the following view {B,D,E}
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C cannot see C1, no token will ever arrive.
In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again. At that
time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
To ensure that the view gets implemented at all nodes at the same time, A will send out a VIEW_CONF message; this is the optional 'confirmed' message described above.
Ideally, the interceptor below this one would be the TcpFailureDetector, to ensure correct memberships.
The example above can, of course, be simplified with a finite state machine:
But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.
Maybe I'll do a state diagram :)
State Diagrams
Initiate an election
Receive an election message
-
Nested Class Summary
Nested Classes
Modifier and Type, Class - Description
static class NonBlockingCoordinator.CoordinationEvent - Represents a coordination event in the election protocol.
static class NonBlockingCoordinator.CoordinationMessage - Represents a coordination message used in the election protocol.
Nested classes/interfaces inherited from interface ChannelInterceptor
ChannelInterceptor.InterceptorEvent -
Field Summary
Fields
Modifier and Type, Field - Description
protected static final byte[] COORD_ALIVE - Alive message
protected static final byte[] COORD_CONF - Coordination confirmation, for blocking installations
protected static final byte[] COORD_HEADER - Header for a coordination message
protected static final byte[] COORD_REQUEST - Coordination request
protected final AtomicBoolean coordMsgReceived - Flag indicating whether a coordination message has been received.
protected final Object electionMutex - Mutex for election operations.
protected Membership membership - Our nonblocking membership
protected static final StringManager sm - String manager for internationalization.
protected boolean started - Whether this interceptor has been started.
protected final int startsvc - Start service command value.
protected Membership suggestedView - The suggested view during an election.
protected UniqueId suggestedviewId - Indicates that we are running an election, and this is the one we are running
protected Membership view - Our current view
protected UniqueId viewId - Our current viewId
protected final long waitForCoordMsgTimeout - Time to wait for coordination timeout
Fields inherited from class ChannelInterceptorBase
optionFlag -
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method - Description
protected boolean alive(Member mbr) - Checks if a member is alive using the default timeout.
ChannelData createData(NonBlockingCoordinator.CoordinationMessage msg, Member local) - Creates a ChannelData object from a coordination message.
void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event) - Fire an event.
Member getCoordinator() - Returns coordinator if one is available
Member getLocalMember(boolean incAlive) - Intercepts the Channel.getLocalMember(boolean) method
Member getMember(Member mbr) - Intercepts the Channel.getMember(Member) method
Member[] getMembers() - Intercepts the Channel.getMembers() method
Member[] getView() - Returns the current view of members.
UniqueId getViewId() - Returns the current view ID.
protected void halt() - Block in/out messages while an election is going on
protected void handleMyToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) - Handles a coordination token that originated from the local member.
protected void handleOtherToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) - Handles a coordination token from another member.
protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) - Handles a coordination token, routing to the appropriate handler based on source.
protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) - Handles a view confirmation message.
protected boolean hasHigherPriority(Member[] complete, Member[] local) - Checks if the complete membership has a higher priority leader than the local membership.
boolean hasMembers() - Intercepts the Channel.hasMembers() method
void heartbeat() - The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out objects and perform actions that are unrelated to sending/receiving data.
boolean isCoordinator() - Checks if this member is the current coordinator.
boolean isHighest() - Checks if this member has the highest priority in the membership.
protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg) - Checks if a coordination message is a view confirmation.
void memberAdded(Member member) - A member was added to the group
void memberAdded(Member member, boolean elect) - Handles the addition of a new member to the group.
protected boolean memberAlive(Member mbr, long conTimeout) - Checks if a member is alive by attempting a socket connection.
void memberDisappeared(Member member) - A member was removed from the group. If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data
protected Membership mergeOnArrive(NonBlockingCoordinator.CoordinationMessage msg) - Merges the incoming coordination message members with the current membership.
void messageReceived(ChannelMessage msg) - The messageReceived is invoked when a message is received.
protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg) - Processes an incoming coordination message.
protected void release() - Release the lock for in/out messages once the election is completed
protected void sendElectionMsg(Member local, Member next, NonBlockingCoordinator.CoordinationMessage msg) - Sends an election message to the next member in the ring.
protected void sendElectionMsgToNextInline(Member local, NonBlockingCoordinator.CoordinationMessage msg) - Sends an election message to the next member inline, retrying on failure.
void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) - The sendMessage method is called when a message is being sent to one or more destinations.
protected void setupMembership() - Initializes the membership if not already set up.
void start(int svc) - Starts up the channel.
void startElection(boolean force) - Starts an election to determine the coordinator.
void stop(int svc) - Shuts down the channel.
protected void waitForRelease() - Wait for an election to end
Methods inherited from class ChannelInterceptorBase
getChannel, getNext, getOptionFlag, getPrevious, okToProcess, setChannel, setNext, setOptionFlag, setPrevious
-
Field Details
-
sm
String manager for internationalization. -
COORD_HEADER
protected static final byte[] COORD_HEADER
Header for a coordination message
-
COORD_REQUEST
protected static final byte[] COORD_REQUEST
Coordination request
-
COORD_CONF
protected static final byte[] COORD_CONF
Coordination confirmation, for blocking installations
-
COORD_ALIVE
protected static final byte[] COORD_ALIVE
Alive message
-
waitForCoordMsgTimeout
protected final long waitForCoordMsgTimeout
Time to wait for coordination timeout
-
view
Our current view
-
viewId
Our current viewId
-
membership
Our nonblocking membership
-
suggestedviewId
Indicates that we are running an election, and this is the one we are running
-
suggestedView
The suggested view during an election.
-
started
protected volatile boolean started
Whether this interceptor has been started.
-
startsvc
protected final int startsvc
Start service command value.
-
electionMutex
Mutex for election operations. -
coordMsgReceived
Flag indicating whether a coordination message has been received.
-
-
Constructor Details
-
NonBlockingCoordinator
public NonBlockingCoordinator()
Constructs a new NonBlockingCoordinator.
-
-
Method Details
-
startElection
Starts an election to determine the coordinator.
Parameters:
force - Whether to force a new election even if one is in progress
Throws:
ChannelException - if an error occurs during the election
-
sendElectionMsg
protected void sendElectionMsg(Member local, Member next, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
Sends an election message to the next member in the ring.
Parameters:
local - The local member
next - The next member to send the message to
msg - The coordination message to send
Throws:
ChannelException - if sending fails
-
sendElectionMsgToNextInline
protected void sendElectionMsgToNextInline(Member local, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
Sends an election message to the next member inline, retrying on failure.
Parameters:
local - The local member
msg - The coordination message to send
Throws:
ChannelException - if sending fails to all members
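The "next inline, retrying on failure" behavior can be pictured as walking the ring past unreachable members until one accepts the message. The sketch below is an illustration only: `NextInlineSketch` and `sendToNextInline` are hypothetical, members are plain strings, and a `Predicate` that returns false stands in for a send that fails.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration; not a Tribes class.
final class NextInlineSketch {

    /**
     * Walks the ring starting just after the local member and returns the
     * first member that accepted the message. Fails only if every other
     * member was unreachable.
     */
    static String sendToNextInline(List<String> ring, String local, Predicate<String> trySend) {
        int start = ring.indexOf(local);
        if (start < 0) {
            throw new IllegalArgumentException("local member is not part of the ring");
        }
        for (int step = 1; step < ring.size(); step++) {
            String candidate = ring.get((start + step) % ring.size());
            if (trySend.test(candidate)) {
                return candidate; // delivered, stop retrying
            }
            // send failed: fall through and try the member after this one
        }
        throw new IllegalStateException("sending failed to all members");
    }

    public static void main(String[] args) {
        List<String> ring = List.of("A", "B", "C", "D");
        Predicate<String> trySend = destination -> !destination.equals("C"); // pretend C is down
        System.out.println(sendToNextInline(ring, "B", trySend)); // prints D
    }
}
```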
-
createData
Creates a ChannelData object from a coordination message.
Parameters:
msg - The coordination message
local - The local member
Returns:
The channel data containing the message
-
alive
Checks if a member is alive using the default timeout.
Parameters:
mbr - The member to check
Returns:
true if the member is alive
-
memberAlive
Checks if a member is alive by attempting a socket connection.
Parameters:
mbr - The member to check
conTimeout - The connection timeout in milliseconds
Returns:
true if the member is alive
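A liveness probe based on a socket connection with a timeout might look like the sketch below. It is a generic illustration using only `java.net`: the class and method names are hypothetical, and the host/port arguments stand in for whatever address a Member carries.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical illustration; not a Tribes class.
final class AliveCheckSketch {

    /**
     * Returns true if a TCP connection to host:port can be established within
     * the timeout. Any I/O failure (refused, unreachable, timed out) counts as
     * "not alive".
     */
    static boolean isReachable(String host, int port, int connectTimeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Probe a listener opened on an ephemeral local port.
        try (ServerSocket server = new ServerSocket(0)) {
            System.out.println(isReachable("127.0.0.1", server.getLocalPort(), 1000));
        }
    }
}
```

A successful connect only shows that something is listening on the port. It does not show that the peer is a healthy member of the group.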
-
mergeOnArrive
Merges the incoming coordination message members with the current membership.
Parameters:
msg - The coordination message
Returns:
The merged membership
-
processCoordMessage
protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
Processes an incoming coordination message.
Parameters:
msg - The coordination message to process
Throws:
ChannelException - if processing fails
-
handleToken
protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
Handles a coordination token, routing to the appropriate handler based on source.
Parameters:
msg - The coordination message
merged - The merged membership
Throws:
ChannelException - if handling fails
-
handleMyToken
protected void handleMyToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
Handles a coordination token that originated from the local member.
Parameters:
local - The local member
msg - The coordination message
merged - The merged membership
Throws:
ChannelException - if handling fails
-
handleOtherToken
protected void handleOtherToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
Handles a coordination token from another member.
Parameters:
local - The local member
msg - The coordination message
merged - The merged membership
Throws:
ChannelException - if handling fails
-
handleViewConf
protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
Handles a view confirmation message.
Parameters:
msg - The coordination message
merged - The merged membership
Throws:
ChannelException - if handling fails
-
isViewConf
Checks if a coordination message is a view confirmation.
Parameters:
msg - The coordination message
Returns:
true if the message is a view confirmation
-
hasHigherPriority
-
getCoordinator
-
getView
Returns the current view of members.
Returns:
The array of members in the current view
-
getViewId
-
halt
protected void halt()
Block in/out messages while an election is going on
-
release
protected void release()
Release the lock for in/out messages once the election is completed
-
waitForRelease
protected void waitForRelease()
Wait for an election to end
start
Description copied from interface: ChannelInterceptor
Starts up the channel. This can be called multiple times for individual services to start. The svc parameter can be the logical OR of any of the constants.
Specified by:
start in interface ChannelInterceptor
Overrides:
start in class ChannelInterceptorBase
Parameters:
svc - one of:
- Channel.DEFAULT - will start all services
- Channel.MBR_RX_SEQ - starts the membership receiver
- Channel.MBR_TX_SEQ - starts the membership broadcaster
- Channel.SND_TX_SEQ - starts the replication transmitter
- Channel.SND_RX_SEQ - starts the replication receiver
Throws:
ChannelException - if a startup error occurs or the service is already started.
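Combining services with a logical OR works like any bit-flag scheme. The sketch below only illustrates the idea: the numeric values are assumptions chosen for the example, and real code should use the constants defined on Channel rather than these.

```java
// Hypothetical illustration of OR-combined service flags; not a Tribes class.
final class ServiceFlagsSketch {
    // Illustrative bit values only; the actual Channel constants may differ.
    static final int SND_RX_SEQ = 1;
    static final int SND_TX_SEQ = 2;
    static final int MBR_RX_SEQ = 4;
    static final int MBR_TX_SEQ = 8;
    // "All services" is simply every flag combined.
    static final int DEFAULT = SND_RX_SEQ | SND_TX_SEQ | MBR_RX_SEQ | MBR_TX_SEQ;

    /** True if every bit of the given service is set in svc. */
    static boolean includes(int svc, int service) {
        return (svc & service) == service;
    }

    public static void main(String[] args) {
        int membershipOnly = MBR_RX_SEQ | MBR_TX_SEQ;
        System.out.println(includes(membershipOnly, MBR_RX_SEQ)); // prints true
        System.out.println(includes(membershipOnly, SND_TX_SEQ)); // prints false
    }
}
```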
-
stop
Description copied from interface: ChannelInterceptor
Shuts down the channel. This can be called multiple times for individual services to shut down. The svc parameter can be the logical OR of any of the constants.
Specified by:
stop in interface ChannelInterceptor
Overrides:
stop in class ChannelInterceptorBase
Parameters:
svc - one of:
- Channel.DEFAULT - will shut down all services
- Channel.MBR_RX_SEQ - stops the membership receiver
- Channel.MBR_TX_SEQ - stops the membership broadcaster
- Channel.SND_TX_SEQ - stops the replication transmitter
- Channel.SND_RX_SEQ - stops the replication receiver
Throws:
ChannelException - if a startup error occurs or the service is already started.
-
sendMessage
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException
Description copied from interface: ChannelInterceptor
The sendMessage method is called when a message is being sent to one or more destinations. The interceptor can modify any of the parameters and then pass the message down the stack by invoking getNext().sendMessage(destination,msg,payload).
Alternatively, the interceptor can stop the message from being sent by not invoking getNext().sendMessage(destination,msg,payload).
If the message is to be sent asynchronously, the application can be notified of completion and errors by passing in an error handler attached to a payload object.
The ChannelMessage.getAddress contains Channel.getLocalMember, and can be overwritten to simulate a message sent from another node.
Specified by:
sendMessage in interface ChannelInterceptor
Overrides:
sendMessage in class ChannelInterceptorBase
Parameters:
destination - Member[] - the destination for this message
msg - ChannelMessage - the message to be sent
payload - InterceptorPayload - the payload, carrying an error handler and future useful data, can be null
Throws:
ChannelException - if a serialization error happens.
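The pass-down-or-stop contract can be modeled with a stripped-down chain. The types below are simplified stand-ins invented for this sketch (`Link`, `Wire`, `Tagger`, `Blocker`). They are not the Tribes interfaces and take plain strings where the real method takes members, a message and a payload.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified interceptor chain; not the Tribes API.
final class InterceptorChainSketch {

    /** Base link: by default it just hands the message to the next link. */
    abstract static class Link {
        Link next;

        void sendMessage(String destination, String msg) {
            if (next != null) {
                next.sendMessage(destination, msg);
            }
        }
    }

    /** Bottom of the stack: records what would actually be sent. */
    static final class Wire extends Link {
        final List<String> sent = new ArrayList<>();

        @Override
        void sendMessage(String destination, String msg) {
            sent.add(destination + ":" + msg);
        }
    }

    /** Modifies the message, then passes it down the stack. */
    static final class Tagger extends Link {
        @Override
        void sendMessage(String destination, String msg) {
            super.sendMessage(destination, "[tagged]" + msg);
        }
    }

    /** Stops messages to one destination by not invoking the next link. */
    static final class Blocker extends Link {
        final String blocked;

        Blocker(String blocked) {
            this.blocked = blocked;
        }

        @Override
        void sendMessage(String destination, String msg) {
            if (!blocked.equals(destination)) {
                super.sendMessage(destination, msg);
            }
        }
    }

    public static void main(String[] args) {
        Wire wire = new Wire();
        Blocker blocker = new Blocker("C");
        Tagger tagger = new Tagger();
        tagger.next = blocker;
        blocker.next = wire;

        tagger.sendMessage("B", "hi"); // reaches the wire, modified
        tagger.sendMessage("C", "hi"); // swallowed by the blocker
        System.out.println(wire.sent); // prints [B:[tagged]hi]
    }
}
```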
-
messageReceived
Description copied from interface: ChannelInterceptor
The messageReceived is invoked when a message is received. ChannelMessage.getAddress() is the sender, or the reply-to address if it has been overwritten.
Specified by:
messageReceived in interface ChannelInterceptor
Overrides:
messageReceived in class ChannelInterceptorBase
Parameters:
msg - ChannelMessage
-
memberAdded
Description copied from interface: MembershipListener
A member was added to the group
Specified by:
memberAdded in interface MembershipListener
Overrides:
memberAdded in class ChannelInterceptorBase
Parameters:
member - Member - the member that was added
-
memberAdded
Handles the addition of a new member to the group.
Parameters:
member - The member that was added
elect - Whether to trigger an election
-
memberDisappeared
Description copied from interface: MembershipListener
A member was removed from the group.
If the member left voluntarily, the Member.getCommand will contain the Member.SHUTDOWN_PAYLOAD data
Specified by:
memberDisappeared in interface MembershipListener
Overrides:
memberDisappeared in class ChannelInterceptorBase
Parameters:
member - Member
-
isHighest
public boolean isHighest()
Checks if this member has the highest priority in the membership.
Returns:
true if this member has the highest priority
-
isCoordinator
public boolean isCoordinator()
Checks if this member is the current coordinator.
Returns:
true if this member is the coordinator
-
heartbeat
public void heartbeat()
Description copied from interface: ChannelInterceptor
The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out objects and perform actions that are unrelated to sending/receiving data.
Specified by:
heartbeat in interface ChannelInterceptor
Specified by:
heartbeat in interface Heartbeat
Overrides:
heartbeat in class ChannelInterceptorBase
-
hasMembers
public boolean hasMembers()
Description copied from interface: ChannelInterceptor
Intercepts the Channel.hasMembers() method
Specified by:
hasMembers in interface ChannelInterceptor
Overrides:
hasMembers in class ChannelInterceptorBase
Returns:
boolean - if the channel has members in its membership group
-
getMembers
Description copied from interface: ChannelInterceptor
Intercepts the Channel.getMembers() method
Specified by:
getMembers in interface ChannelInterceptor
Overrides:
getMembers in class ChannelInterceptorBase
Returns:
the members
-
getMember
Description copied from interface: ChannelInterceptor
Intercepts the Channel.getMember(Member) method
Specified by:
getMember in interface ChannelInterceptor
Overrides:
getMember in class ChannelInterceptorBase
Parameters:
mbr - Member
Returns:
Member - the actual member information, including stay alive
-
getLocalMember
Description copied from interface: ChannelInterceptor
Intercepts the Channel.getLocalMember(boolean) method
Specified by:
getLocalMember in interface ChannelInterceptor
Overrides:
getLocalMember in class ChannelInterceptorBase
Parameters:
incAlive - boolean
Returns:
the member that represents this node
-
setupMembership
protected void setupMembership()
Initializes the membership if not already set up.
-
fireInterceptorEvent
Description copied from interface: ChannelInterceptor
Fire an event.
Specified by:
fireInterceptorEvent in interface ChannelInterceptor
Overrides:
fireInterceptorEvent in class ChannelInterceptorBase
Parameters:
event - the event
-