Class NonBlockingCoordinator

java.lang.Object
org.apache.catalina.tribes.group.ChannelInterceptorBase
org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator
All Implemented Interfaces:
ChannelInterceptor, Heartbeat, MembershipListener

public class NonBlockingCoordinator extends ChannelInterceptorBase

Title: Auto-merging leader election algorithm

Description: Implementation of a simple coordinator algorithm that not only selects a coordinator but also merges groups automatically when members are discovered that weren't part of the current group.

This algorithm is non-blocking, meaning it allows transactions to proceed while the coordination phase is in progress.

This implementation is based on a home-brewed algorithm that uses the AbsoluteOrder of a membership to pass a token around a ring formed by the current membership.
This is not the same as just using AbsoluteOrder! Consider the following scenario:
Nodes A, B, C, D, E are on a network, in that priority order. AbsoluteOrder only works if every node receives pings from every other node, meaning that node{i} receives pings from node{all}-node{i}.
However, if a multicast problem occurs, the following could happen:
A has members {B,C,D}
B has members {A,C}
C has members {D,E}
D has members {A,B,C,E}
E has members {A,C,D}
The default Tribes membership implementation relies on multicast packets arriving correctly at all nodes, but nothing guarantees that they will.

To best explain how this algorithm works, let's take the above example. For simplicity, we assume that a send operation is O(1) for all nodes, although the algorithm also works when messages overlap, since they all depend on absolute order.
Scenario 1: A, B, C, D, E all come online at the same time.
Eval phase: A thinks of itself as leader, B thinks of A as leader, C thinks of itself as leader, and D and E think of A as leader.
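A short sketch can reproduce the eval phase. It is a hypothetical illustration, not Tribes code. Plain strings stand in for Member objects, and alphabetical order stands in for AbsoluteOrder, so "A" has the highest priority. Each node picks the highest-priority member of its own partial view.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: node names stand in for Tribes Member objects, and
// alphabetical order stands in for AbsoluteOrder (so "A" has the highest priority).
class PerceivedLeader {

    // A node's view is the members it can see plus itself; the leader it
    // perceives is the highest-priority member of that view.
    static String leaderOf(String self, List<String> seen) {
        TreeSet<String> view = new TreeSet<>(seen);
        view.add(self);
        return view.first();
    }

    public static void main(String[] args) {
        // Partial memberships from the multicast-problem example above.
        Map<String, List<String>> seen = new TreeMap<>();
        seen.put("A", List.of("B", "C", "D"));
        seen.put("B", List.of("A", "C"));
        seen.put("C", List.of("D", "E"));
        seen.put("D", List.of("A", "B", "C", "E"));
        seen.put("E", List.of("A", "C", "D"));
        for (Map.Entry<String, List<String>> e : seen.entrySet()) {
            System.out.println(e.getKey() + " thinks the leader is " + leaderOf(e.getKey(), e.getValue()));
        }
    }
}
```

Running it reports A as the leader for A, B, D and E, and C as the leader for C, which matches the eval phase above.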
Token phase:
(1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)
(1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)
(2) B receives X{A-ldr, A-src, mbrs-A,B,C,D} and sends X{A-ldr, A-src, mbrs-A,B,C,D} to C
(2) D receives Y{C-ldr, C-src, mbrs-C,D,E}; D is aware of A,B, so it sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E
(3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, merges in E, and sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D
(3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} and sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A
(4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} and sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A
(4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, and adds E to its list of members
(5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E}
At this point, the state looks like this:
A - {A-ldr, mbrs-A,B,C,D,E, id=X}
B - {A-ldr, mbrs-A,B,C,D, id=X}
C - {A-ldr, mbrs-A,B,C,D,E, id=X}
D - {A-ldr, mbrs-A,B,C,D,E, id=X}
E - {A-ldr, mbrs-A,B,C,D,E, id=Y}
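A single hop of the token phase can be sketched as three steps: merge, re-elect, forward. The Token class, merge and nextInRing below are hypothetical names. Strings stand in for members, and alphabetical order stands in for AbsoluteOrder. The real interceptor's message handling (for example mergeOnArrive and handleOtherToken) may differ in detail.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of the per-hop token handling in the walkthrough above;
// it is not the Tribes implementation. Strings stand in for members, and
// alphabetical order stands in for AbsoluteOrder.
class TokenHop {

    // A token: view id, perceived leader, original sender and member list.
    static final class Token {
        final String id;
        final String leader;
        final String source;
        final TreeSet<String> members;

        Token(String id, String leader, String source, TreeSet<String> members) {
            this.id = id;
            this.leader = leader;
            this.source = source;
            this.members = members;
        }

        @Override
        public String toString() {
            return id + "{" + leader + "-ldr, " + source + "-src, mbrs-" + String.join(",", members) + "}";
        }
    }

    // Merge the token's members with the local view and recompute the leader;
    // the id and the original source travel with the token unchanged.
    static Token merge(Token in, TreeSet<String> localView) {
        TreeSet<String> merged = new TreeSet<>(in.members);
        merged.addAll(localView);
        return new Token(in.id, merged.first(), in.source, merged);
    }

    // The next hop is the member ranked just below 'self', wrapping around
    // to the highest-priority member to close the ring.
    static String nextInRing(String self, TreeSet<String> members) {
        String next = members.higher(self);
        return next != null ? next : members.first();
    }

    public static void main(String[] args) {
        // Step (2): D receives Y{C-ldr, C-src, mbrs-C,D,E} but can see A, B, C and E.
        Token y = new Token("Y", "C", "C", new TreeSet<>(List.of("C", "D", "E")));
        Token out = merge(y, new TreeSet<>(List.of("A", "B", "C", "D", "E")));
        System.out.println(out + " -> " + nextInRing("D", out.members));
    }
}
```

The main method replays step (2) and prints `Y{A-ldr, C-src, mbrs-A,B,C,D,E} -> E`.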

A message doesn't stop until it reaches its original sender, unless it is dropped by a higher-priority leader. As you can see, E still thinks the viewId is Y, which is not correct. But at this point we have arrived at the same membership, and all nodes are informed of each other.
To synchronize the rest, we simply perform the following check at A when A receives X:
Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}
Since the condition is false, A resends the token: it sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B. When A receives X again, the token is complete.
Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E, which then install and accept the view.
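At the originating leader, the completion check reduces to one comparison: the membership it sent against the membership that came back. This is a hypothetical sketch with string members, and the Action enum and onOwnTokenReturned are illustrative names only.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the check the originating leader performs when its own
// token returns: if the membership grew on the way round, the token is sent
// around again with the larger list; otherwise the round is complete.
class TokenCompletion {

    enum Action { RESEND_TOKEN, COMPLETE }

    static Action onOwnTokenReturned(Set<String> sentMembers, Set<String> arrivedMembers) {
        // Set equality ignores ordering, so only a change in membership triggers a resend.
        return sentMembers.equals(arrivedMembers) ? Action.COMPLETE : Action.RESEND_TOKEN;
    }

    public static void main(String[] args) {
        Set<String> original = new TreeSet<>(List.of("A", "B", "C", "D"));
        Set<String> arrived = new TreeSet<>(List.of("A", "B", "C", "D", "E"));
        // First round: E was discovered, so A resends X with A..E.
        System.out.println(onOwnTokenReturned(original, arrived)); // RESEND_TOKEN
        // Second round: the list came back unchanged, so the token is complete.
        System.out.println(onOwnTokenReturned(arrived, arrived));  // COMPLETE
    }
}
```

After COMPLETE, the leader would be the one to send the optional confirmed message.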

Let's assume that C1 arrives; C1 has lower priority than C but higher priority than D.
Let's also assume that C1 sees the following view: {B,D,E}.
C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.
In the scenario where C1 sees {D,E} and A,B,C cannot see C1, no token will ever arrive.
In this case, C1 sends Z{C1-ldr, C1-src, mbrs-C1,D,E} to D.
D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E.
E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A.
A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B, and the chain continues until A receives the token again. At that point, A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E.
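C1 waits for a token and starts its own only if none arrives. That behaviour can be sketched with a CountDownLatch. WaitForToken, onToken and awaitOrInitiate are hypothetical names, and the timeout values are arbitrary assumptions rather than the values the interceptor uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a newly arrived node waits up to a timeout for someone
// else's coordination token and starts its own token only if none arrives.
class WaitForToken {

    private final CountDownLatch tokenArrived = new CountDownLatch(1);

    // Called from the receive path when any coordination token shows up.
    void onToken() {
        tokenArrived.countDown();
    }

    // Returns true if this node had to initiate its own token.
    boolean awaitOrInitiate(long timeoutMillis, Runnable initiate) throws InterruptedException {
        if (tokenArrived.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // another node's token reached us, so join that round
        }
        initiate.run(); // e.g. C1 sends Z{C1-ldr, C1-src, mbrs-C1,D,E} to D
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // C1 cannot be seen by A, B or C, so no token ever reaches it.
        WaitForToken isolated = new WaitForToken();
        System.out.println("initiated=" + isolated.awaitOrInitiate(100, () -> System.out.println("C1 sends Z to D")));

        // A node that is reached by a token simply joins the existing round.
        WaitForToken reached = new WaitForToken();
        reached.onToken();
        System.out.println("initiated=" + reached.awaitOrInitiate(100, () -> System.out.println("not printed")));
    }
}
```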

To ensure that the view gets installed at all nodes at the same time, A sends out a VIEW_CONF message; this is the 'confirmed' message that is optional above.

Ideally, the interceptor below this one would be the TcpFailureDetector, to ensure correct memberships.

The example above can, of course, be simplified with a finite state machine.
But I suck at writing state machines; my head gets all confused. One day I will document this algorithm, though.
Maybe I'll do a state diagram :)

State Diagrams

[Diagram: Initiate an election]

[Diagram: Receive an election message]

  • Field Details

    • sm

      protected static final StringManager sm
      String manager for internationalization.
    • COORD_HEADER

      protected static final byte[] COORD_HEADER
      header for a coordination message
    • COORD_REQUEST

      protected static final byte[] COORD_REQUEST
      Coordination request
    • COORD_CONF

      protected static final byte[] COORD_CONF
      Coordination confirmation, for blocking installations
    • COORD_ALIVE

      protected static final byte[] COORD_ALIVE
      Alive message
    • waitForCoordMsgTimeout

      protected final long waitForCoordMsgTimeout
      Time to wait for a coordination message before timing out.
    • view

      protected volatile Membership view
      Our current view
    • viewId

      protected UniqueId viewId
      Our current viewId
    • membership

      protected Membership membership
      Our nonblocking membership
    • suggestedviewId

      protected UniqueId suggestedviewId
      Set while we are running an election; identifies the election we are running.
    • suggestedView

      protected volatile Membership suggestedView
      The suggested view during an election.
    • started

      protected volatile boolean started
      Whether this interceptor has been started.
    • startsvc

      protected final int startsvc
      Start service command value.
    • electionMutex

      protected final Object electionMutex
      Mutex for election operations.
    • coordMsgReceived

      protected final AtomicBoolean coordMsgReceived
      Flag indicating whether a coordination message has been received.
  • Constructor Details

    • NonBlockingCoordinator

      public NonBlockingCoordinator()
      Constructs a new NonBlockingCoordinator.
  • Method Details

    • startElection

      public void startElection(boolean force) throws ChannelException
      Starts an election to determine the coordinator.
      Parameters:
      force - Whether to force a new election even if one is in progress
      Throws:
      ChannelException - if an error occurs during the election
    • sendElectionMsg

      protected void sendElectionMsg(Member local, Member next, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
      Sends an election message to the next member in the ring.
      Parameters:
      local - The local member
      next - The next member to send the message to
      msg - The coordination message to send
      Throws:
      ChannelException - if sending fails
    • sendElectionMsgToNextInline

      protected void sendElectionMsgToNextInline(Member local, NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
      Sends an election message to the next member in line; if that send fails, it tries the following member in the ring.
      Parameters:
      local - The local member
      msg - The coordination message to send
      Throws:
      ChannelException - if sending fails to all members
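The Throws clause implies that the call fails only if no member accepts the message. That fallback can be sketched as a single walk around the ring. Sender, SendFailedException and the string members are hypothetical stand-ins for the Tribes types. This is not the method's actual body.

```java
import java.util.List;

// Hypothetical sketch: Sender, SendFailedException and String members are
// stand-ins for the Tribes types, not the method's actual implementation.
class NextInline {

    static class SendFailedException extends Exception {
        SendFailedException(String message) {
            super(message);
        }
    }

    interface Sender {
        void send(String to) throws SendFailedException;
    }

    // 'ring' is in priority order and must contain 'local'. Returns the member
    // that accepted the message; throws only if every other member failed.
    static String sendToNextInline(String local, List<String> ring, Sender sender)
            throws SendFailedException {
        int start = ring.indexOf(local);
        if (start < 0) {
            throw new IllegalArgumentException("local member must be part of the ring");
        }
        SendFailedException lastFailure = null;
        // Walk the ring once, starting just after 'local' and never sending to it.
        for (int step = 1; step < ring.size(); step++) {
            String candidate = ring.get((start + step) % ring.size());
            try {
                sender.send(candidate);
                return candidate;
            } catch (SendFailedException e) {
                lastFailure = e; // remember the failure and move on to the next member
            }
        }
        throw lastFailure != null ? lastFailure : new SendFailedException("no other members in the ring");
    }

    public static void main(String[] args) throws SendFailedException {
        List<String> ring = List.of("A", "B", "C", "D", "E");
        Sender flaky = to -> {
            if (to.equals("C")) {
                throw new SendFailedException("C is unreachable");
            }
            System.out.println("sent to " + to);
        };
        // B's next member is C; C fails, so the message goes to D instead.
        System.out.println("accepted by " + sendToNextInline("B", ring, flaky));
    }
}
```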
    • createData

      Creates a ChannelData object from a coordination message.
      Parameters:
      msg - The coordination message
      local - The local member
      Returns:
      The channel data containing the message
    • alive

      protected boolean alive(Member mbr)
      Checks if a member is alive using the default timeout.
      Parameters:
      mbr - The member to check
      Returns:
      true if the member is alive
    • memberAlive

      protected boolean memberAlive(Member mbr, long conTimeout)
      Checks if a member is alive by attempting a socket connection.
      Parameters:
      mbr - The member to check
      conTimeout - The connection timeout in milliseconds
      Returns:
      true if the member is alive
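A liveness probe of this kind can be sketched with `java.net.Socket.connect` and a connect timeout. The sketch assumes the member's host and port are known and passed in directly. It does not show how memberAlive derives them from a Member or which port it targets.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch of a TCP liveness probe: a successful connect within the
// timeout counts as "alive". Host and port are passed directly instead of
// being read from a Tribes Member.
class AliveProbe {

    static boolean memberAlive(String host, int port, long conTimeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() takes an int timeout in milliseconds, so clamp the long value.
            socket.connect(new InetSocketAddress(host, port), (int) Math.min(conTimeoutMillis, Integer.MAX_VALUE));
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable or timed out
        }
    }

    public static void main(String[] args) throws IOException {
        // A listening loopback socket stands in for a live member.
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            String host = server.getInetAddress().getHostAddress();
            System.out.println("listening port alive: " + memberAlive(host, server.getLocalPort(), 1000));
        }
    }
}
```

A connect that succeeds proves only that something accepts TCP connections on that port. This is one reason a failure detector such as TcpFailureDetector is recommended below this interceptor.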
    • mergeOnArrive

      Merges the incoming coordination message members with the current membership.
      Parameters:
      msg - The coordination message
      Returns:
      The merged membership
    • processCoordMessage

      protected void processCoordMessage(NonBlockingCoordinator.CoordinationMessage msg) throws ChannelException
      Processes an incoming coordination message.
      Parameters:
      msg - The coordination message to process
      Throws:
      ChannelException - if processing fails
    • handleToken

      protected void handleToken(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
      Handles a coordination token, routing to the appropriate handler based on source.
      Parameters:
      msg - The coordination message
      merged - The merged membership
      Throws:
      ChannelException - if handling fails
    • handleMyToken

      protected void handleMyToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
      Handles a coordination token that originated from the local member.
      Parameters:
      local - The local member
      msg - The coordination message
      merged - The merged membership
      Throws:
      ChannelException - if handling fails
    • handleOtherToken

      protected void handleOtherToken(Member local, NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
      Handles a coordination token from another member.
      Parameters:
      local - The local member
      msg - The coordination message
      merged - The merged membership
      Throws:
      ChannelException - if handling fails
    • handleViewConf

      protected void handleViewConf(NonBlockingCoordinator.CoordinationMessage msg, Membership merged) throws ChannelException
      Handles a view confirmation message.
      Parameters:
      msg - The coordination message
      merged - The merged membership
      Throws:
      ChannelException - if handling fails
    • isViewConf

      protected boolean isViewConf(NonBlockingCoordinator.CoordinationMessage msg)
      Checks if a coordination message is a view confirmation.
      Parameters:
      msg - The coordination message
      Returns:
      true if the message is a view confirmation
    • hasHigherPriority

      protected boolean hasHigherPriority(Member[] complete, Member[] local)
      Checks if the complete membership has a higher priority leader than the local membership.
      Parameters:
      complete - The complete membership
      local - The local membership
      Returns:
      true if the complete membership has higher priority
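A Comparator can express the comparison between the leaders of two memberships. Natural string order is an assumed stand-in for AbsoluteOrder. The handling of null or empty arrays below is an assumption of this sketch, not documented behaviour.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch: the smaller element ranks higher, with natural String
// order standing in for AbsoluteOrder. A membership's leader is its top member.
class LeaderPriority {

    static final Comparator<String> PRIORITY = Comparator.naturalOrder();

    static String leader(String[] members) {
        return Arrays.stream(members).min(PRIORITY)
                .orElseThrow(() -> new IllegalArgumentException("empty membership"));
    }

    // True when the leader of 'complete' outranks the leader of 'local'.
    static boolean hasHigherPriority(String[] complete, String[] local) {
        boolean completeEmpty = complete == null || complete.length == 0;
        boolean localEmpty = local == null || local.length == 0;
        if (completeEmpty) {
            return false; // assumption: no leader to offer
        }
        if (localEmpty) {
            return true; // assumption: any leader beats none
        }
        return PRIORITY.compare(leader(complete), leader(local)) < 0;
    }

    public static void main(String[] args) {
        String[] complete = {"A", "B", "C", "D", "E"};
        // C believed it was leader of {C,D,E}; the complete view has A.
        System.out.println(hasHigherPriority(complete, new String[] {"C", "D", "E"})); // true
        System.out.println(hasHigherPriority(complete, new String[] {"A", "B"}));      // false
    }
}
```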
    • getCoordinator

      public Member getCoordinator()
      Returns the coordinator, if one is available.
      Returns:
      Member
    • getView

      public Member[] getView()
      Returns the current view of members.
      Returns:
      The array of members in the current view
    • getViewId

      public UniqueId getViewId()
      Returns the current view ID.
      Returns:
      The view ID
    • halt

      protected void halt()
      Blocks incoming and outgoing messages while an election is in progress.
    • release

      protected void release()
      Releases the lock on incoming and outgoing messages once the election is completed.
    • waitForRelease

      protected void waitForRelease()
      Waits for an election to end.
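Together, halt(), release() and waitForRelease() describe a gate that callers wait on during an election. A minimal sketch with Java's built-in monitor methods (wait/notifyAll) and a simple boolean flag might look like this. ElectionGate and isHalted are hypothetical names, and the real methods may be implemented differently.

```java
// Hypothetical sketch of a halt/release gate: while halted, callers of
// waitForRelease() block until release() is called.
class ElectionGate {

    private boolean halted = false;

    synchronized void halt() {
        halted = true;
    }

    synchronized void release() {
        halted = false;
        notifyAll(); // wake every thread blocked in waitForRelease()
    }

    synchronized void waitForRelease() throws InterruptedException {
        while (halted) { // loop guards against spurious wakeups
            wait();
        }
    }

    synchronized boolean isHalted() {
        return halted;
    }

    public static void main(String[] args) throws InterruptedException {
        ElectionGate gate = new ElectionGate();
        gate.halt(); // election starts
        Thread sender = new Thread(() -> {
            try {
                gate.waitForRelease();
                System.out.println("message sent after election");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        Thread.sleep(50); // give the sender time to block
        gate.release();   // election done
        sender.join();
    }
}
```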
    • start

      public void start(int svc) throws ChannelException
      Description copied from interface: ChannelInterceptor
      Starts up the channel. This can be called multiple times for individual services to start. The svc parameter can be the logical OR of any of the following constants:
      Specified by:
      start in interface ChannelInterceptor
      Overrides:
      start in class ChannelInterceptorBase
      Parameters:
      svc - one of:
      • Channel.DEFAULT - will start all services
      • Channel.MBR_RX_SEQ - starts the membership receiver
      • Channel.MBR_TX_SEQ - starts the membership broadcaster
      • Channel.SND_TX_SEQ - starts the replication transmitter
      • Channel.SND_RX_SEQ - starts the replication receiver
      Throws:
      ChannelException - if a startup error occurs or the service is already started.
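Because svc is a bitmask, services are combined with `|` and tested with `&`. The constant names below mirror the Channel constants listed above, but the values are illustrative distinct bits chosen for this sketch. Real code should use the Channel.* constants directly.

```java
// Hypothetical sketch of composing and testing an svc bitmask. The values are
// illustrative; they are not taken from the Channel interface.
class SvcMask {
    static final int MBR_RX_SEQ = 1;
    static final int MBR_TX_SEQ = 2;
    static final int SND_RX_SEQ = 4;
    static final int SND_TX_SEQ = 8;
    // "All services" is simply every bit set.
    static final int DEFAULT = MBR_RX_SEQ | MBR_TX_SEQ | SND_RX_SEQ | SND_TX_SEQ;

    // True if every bit of 'service' is present in 'svc'.
    static boolean includes(int svc, int service) {
        return (svc & service) == service;
    }

    public static void main(String[] args) {
        int svc = MBR_RX_SEQ | MBR_TX_SEQ; // start only the membership services
        System.out.println(includes(svc, MBR_TX_SEQ));     // true
        System.out.println(includes(svc, SND_TX_SEQ));     // false
        System.out.println(includes(DEFAULT, SND_RX_SEQ)); // true
    }
}
```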
    • stop

      public void stop(int svc) throws ChannelException
      Description copied from interface: ChannelInterceptor
      Shuts down the channel. This can be called multiple times for individual services to shut down. The svc parameter can be the logical OR of any of the following constants:
      Specified by:
      stop in interface ChannelInterceptor
      Overrides:
      stop in class ChannelInterceptorBase
      Parameters:
      svc - one of:
      • Channel.DEFAULT - will shut down all services
      • Channel.MBR_RX_SEQ - stops the membership receiver
      • Channel.MBR_TX_SEQ - stops the membership broadcaster
      • Channel.SND_TX_SEQ - stops the replication transmitter
      • Channel.SND_RX_SEQ - stops the replication receiver
      Throws:
      ChannelException - if a shutdown error occurs.
    • sendMessage

      public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException
      Description copied from interface: ChannelInterceptor
      The sendMessage method is called when a message is being sent to one or more destinations. The interceptor can modify any of the parameters and then pass the message down the stack by invoking getNext().sendMessage(destination,msg,payload).

      Alternatively the interceptor can stop the message from being sent by not invoking getNext().sendMessage(destination,msg,payload).

      If the message is to be sent asynchronously, the application can be notified of completion and errors by passing in an error handler attached to a payload object.

      ChannelMessage.getAddress() contains Channel.getLocalMember() and can be overwritten to simulate a message sent from another node.

      Specified by:
      sendMessage in interface ChannelInterceptor
      Overrides:
      sendMessage in class ChannelInterceptorBase
      Parameters:
      destination - Member[] - the destination for this message
      msg - ChannelMessage - the message to be sent
      payload - InterceptorPayload - the payload, carrying an error handler and future useful data, can be null
      Throws:
      ChannelException - if a serialization error happens.
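A tiny self-contained chain can model the pass-down-or-stop contract. MiniInterceptor, GateInterceptor and RecordingTransport are hypothetical types with simplified signatures (no ChannelMessage or InterceptorPayload). The sketch shows only that an interceptor forwards a message by calling the next element and drops it by not calling it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an interceptor stack; these are not Tribes types.
abstract class MiniInterceptor {
    private MiniInterceptor next;

    MiniInterceptor setNext(MiniInterceptor next) {
        this.next = next;
        return next;
    }

    MiniInterceptor getNext() {
        return next;
    }

    // Default behaviour: pass the message unchanged to the next interceptor.
    void sendMessage(String[] destination, String msg) {
        if (next != null) {
            next.sendMessage(destination, msg);
        }
    }
}

// Drops messages while 'blocked' is set, simply by not calling the next element.
class GateInterceptor extends MiniInterceptor {
    volatile boolean blocked;

    @Override
    void sendMessage(String[] destination, String msg) {
        if (blocked) {
            return;
        }
        super.sendMessage(destination, msg);
    }
}

// Bottom of the stack: records what would go out on the wire.
class RecordingTransport extends MiniInterceptor {
    final List<String> sent = new ArrayList<>();

    @Override
    void sendMessage(String[] destination, String msg) {
        sent.add(msg + " -> " + String.join(",", destination));
    }
}

class InterceptorDemo {
    public static void main(String[] args) {
        GateInterceptor gate = new GateInterceptor();
        RecordingTransport wire = new RecordingTransport();
        gate.setNext(wire);
        gate.sendMessage(new String[] {"B", "C"}, "hello");
        gate.blocked = true;
        gate.sendMessage(new String[] {"B"}, "dropped");
        System.out.println(wire.sent); // [hello -> B,C]
    }
}
```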
    • messageReceived

      public void messageReceived(ChannelMessage msg)
      Description copied from interface: ChannelInterceptor
      The messageReceived is invoked when a message is received. ChannelMessage.getAddress() is the sender, or the reply-to address if it has been overwritten.
      Specified by:
      messageReceived in interface ChannelInterceptor
      Overrides:
      messageReceived in class ChannelInterceptorBase
      Parameters:
      msg - ChannelMessage
    • memberAdded

      public void memberAdded(Member member)
      Description copied from interface: MembershipListener
      A member was added to the group
      Specified by:
      memberAdded in interface MembershipListener
      Overrides:
      memberAdded in class ChannelInterceptorBase
      Parameters:
      member - Member - the member that was added
    • memberAdded

      public void memberAdded(Member member, boolean elect)
      Handles the addition of a new member to the group.
      Parameters:
      member - The member that was added
      elect - Whether to trigger an election
    • memberDisappeared

      public void memberDisappeared(Member member)
      Description copied from interface: MembershipListener
      A member was removed from the group
      If the member left voluntarily, Member.getCommand() will contain the Member.SHUTDOWN_PAYLOAD data.
      Specified by:
      memberDisappeared in interface MembershipListener
      Overrides:
      memberDisappeared in class ChannelInterceptorBase
      Parameters:
      member - Member
    • isHighest

      public boolean isHighest()
      Checks if this member has the highest priority in the membership.
      Returns:
      true if this member has the highest priority
    • isCoordinator

      public boolean isCoordinator()
      Checks if this member is the current coordinator.
      Returns:
      true if this member is the coordinator
    • heartbeat

      public void heartbeat()
      Description copied from interface: ChannelInterceptor
      The heartbeat() method gets invoked periodically to allow interceptors to clean up resources, time out objects, and perform actions that are unrelated to sending/receiving data.
      Specified by:
      heartbeat in interface ChannelInterceptor
      Specified by:
      heartbeat in interface Heartbeat
      Overrides:
      heartbeat in class ChannelInterceptorBase
    • hasMembers

      public boolean hasMembers()
      Description copied from interface: ChannelInterceptor
      Intercepts the Channel.hasMembers() method
      Specified by:
      hasMembers in interface ChannelInterceptor
      Overrides:
      hasMembers in class ChannelInterceptorBase
      Returns:
      boolean - true if the channel has members in its membership group
    • getMembers

      public Member[] getMembers()
      Description copied from interface: ChannelInterceptor
      Intercepts the Channel.getMembers() method
      Specified by:
      getMembers in interface ChannelInterceptor
      Overrides:
      getMembers in class ChannelInterceptorBase
      Returns:
      the members
    • getMember

      public Member getMember(Member mbr)
      Description copied from interface: ChannelInterceptor
      Intercepts the Channel.getMember(Member) method
      Specified by:
      getMember in interface ChannelInterceptor
      Overrides:
      getMember in class ChannelInterceptorBase
      Parameters:
      mbr - Member
      Returns:
      Member - the actual member information, including stay alive
    • getLocalMember

      public Member getLocalMember(boolean incAlive)
      Description copied from interface: ChannelInterceptor
      Intercepts the Channel.getLocalMember(boolean) method
      Specified by:
      getLocalMember in interface ChannelInterceptor
      Overrides:
      getLocalMember in class ChannelInterceptorBase
      Parameters:
      incAlive - boolean
      Returns:
      the member that represents this node
    • setupMembership

      protected void setupMembership()
      Initializes the membership if not already set up.
    • fireInterceptorEvent

      public void fireInterceptorEvent(ChannelInterceptor.InterceptorEvent event)
      Description copied from interface: ChannelInterceptor
      Fire an event.
      Specified by:
      fireInterceptorEvent in interface ChannelInterceptor
      Overrides:
      fireInterceptorEvent in class ChannelInterceptorBase
      Parameters:
      event - the event