Lightweight Causal and Atomic Group Multicast

Kenneth Birman @ Cornell, Andre Schiper @ Switzerland, Pat Stephenson @ Cornell

ACM Trans. on Computer Systems 1991

1. Introduction

Why is multicast important? In a message-passing system--synchronization by messaging--multicast is the most complex form of messaging!

A simple example of how multicast is used in distributed computing

Multiple file servers among which files are replicated for availability

Read a file: read from any server having the file

Write or lock a file: send a multicast requesting update or lock to group of servers having the file

2. Group Types

Type of groups: Fig 1

Peer group

Client/Server group

Diffusion group - each server broadcasts to all servers and clients

Hierarchical group

Expected usage of group

Groups with many members

Dynamic change of membership

Overlapping groups

3. What makes this project unique?

Asynchronous atomic multicast which keeps causality order

Supports every type of group in Section 2

4. Execution Model

4.1 Basic System Model

System = fixed set P of processes with disjoint address spaces

Group g

Subset of P

Processes dynamically join and leave groups

A process leaves all the groups it belongs to when it finishes or aborts

Multicast

A process can multicast only to a group it belongs to :-(

In PVM, for example, a task can broadcast to any group

A multicast to a group results in delivery to all the members of the group

View of group g

Dynamic member list of g

View sequence: sequence of membership snapshots, which changes by a process joining or leaving

Message transport layer: reliable point-to-point & multicast service

Transient message from a failed process is intercepted and discarded

Execution of a process = a partially ordered sequence of atomic events--all or nothing & not interleaved by another action--such as:

send(p, m, g): transmission of msg m by p to group g

rcv(p, m, g): reception of msg m addressed to group g(= sent by a process of group g) by process p

deliver(p, m, g): actual delivery of msg m to process p

Always happens after rcv, i.e. rcv(p, m, g) -> deliver(p, m, g): the transport system may hold the msg for some reason, for example to keep causal order

System causality relation

If an event x occurs before y within a process, x -> y for the system

Every rcv occurs after send: send -> rcv

Liveness

A message is eventually received by a process unless the sender or receiver fails

Failures will eventually be detected through view changes

4.2 Virtual Synchrony Properties Required of Multicast Protocols

Virtual synchronous programming environment: user can program as if the system schedules one distributed event at a time

Aspects of virtual synchrony system

Expansion of the destination list and message delivery appear as a single, instantaneous event

All members of a single view of the recipient group, and only they, receive the message

Delivery atomicity: all operational receivers receive the msg, or none of them do

Delivery order--synchronization of events

CBCAST order: send(m) -> send(m') => deliver(m) -> deliver(m'), for all recipients who receive both of m and m'

ABCAST order: deliver(p, m, g) -> deliver(p, m', g), for p in g => deliver(q, m, g) -> deliver(q, m', g), for all q in g

ABCAST does not talk about sending order -> No causality

Fault tolerance: fault atomicity + CBCAST delivery order

if send(m) -> send(m') and process p received m', then all receivers of m are guaranteed to receive m

4.3 Vector Time

How can we say that send(m) -> send(m')? We need some virtual time

VT(p): vector time of process p

Array of counters

At each send, VT[pid] is incremented by one

At each delivery of a message, the receiving process's VT is set to the componentwise max of VT(receiver) and VT(msg); under the CBCAST delivery rule, only the sender's entry can actually change

VT of message

Initialized with the VT of the sending process

VT(sender)[sender pid] = # of previous messages sent by the process

VT(sender)[other pid] = updated values upon reception of messages

Comparison of VTs: componentwise comparison of the counters (VT1 <= VT2 iff VT1[i] <= VT2[i] for all i)
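The VT rules above can be sketched as a small vector-clock class (a minimal sketch; the names `VectorTime`, `on_send`, `on_deliver`, and `vt_leq` are mine, not from the paper):

```python
# Minimal sketch of the vector time (VT) rules in Section 4.3.
# Process ids are assumed to be integers 0..n-1.

class VectorTime:
    def __init__(self, n_procs, pid):
        self.vt = [0] * n_procs   # one counter per process
        self.pid = pid            # this process's own index

    def on_send(self):
        """Increment own entry, then stamp the outgoing message."""
        self.vt[self.pid] += 1
        return list(self.vt)      # VT(msg) = copy of VT(sender)

    def on_deliver(self, msg_vt):
        """Componentwise max; under the CBCAST delivery rule only
        the sender's entry can actually change."""
        self.vt = [max(a, b) for a, b in zip(self.vt, msg_vt)]

def vt_leq(u, v):
    """VT comparison: u <= v iff every counter of u is <= the
    corresponding counter of v."""
    return all(a <= b for a, b in zip(u, v))
```

For example, a process `p = VectorTime(3, 0)` stamps its first multicast `[1, 0, 0]`; a receiver that delivers it merges that stamp into its own VT.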

5. The CBCAST and ABCAST Protocol

Single process group with fixed membership

5.1 CBCAST Protocol

CBCAST order can be stated as

send(x, m, a) -> send(y, m', b) => deliver(p, m, a) -> deliver(p, m', b), or

send(m) -> send(m') => deliver(p, m) -> deliver(p, m')

CBCAST protocol

if sender = receiver, deliver immediately

else, delay the delivery of the msg until

VT(msg)[pid of sender] = VT(receiver)[pid of sender] + 1, and

VT(msg)[other pid] <= VT(receiver)[other pid]

See Fig. 2

CBCAST always guarantees that causal order is kept

Proof: in terms of safety (never violate causal ordering) and liveness (never delay a message indefinitely)
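The delay condition of the protocol can be written as a small predicate (a sketch of the rule stated above; the function name is mine):

```python
# Sketch of the CBCAST delivery condition (Section 5.1).
# vt_msg is VT(msg), vt_recv is VT(receiver); sender/receiver are pids.

def cbcast_deliverable(vt_msg, vt_recv, sender, receiver):
    if sender == receiver:
        return True  # a process delivers its own multicast immediately
    # the msg must be the next one expected from the sender...
    if vt_msg[sender] != vt_recv[sender] + 1:
        return False
    # ...and must not depend on any msg the receiver has not yet seen
    return all(vt_msg[k] <= vt_recv[k]
               for k in range(len(vt_msg)) if k != sender)
```

A msg that fails the predicate is simply held and re-tested after each delivery, which is why no message is delayed indefinitely (liveness).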

5.2 Causal ABCAST Protocol

ABCAST does not keep causality order

Causal ABCAST: a modified ABCAST which keeps causality

Idea

one of the receiving group's members (the token holder) delivers msgs in causal order by treating every msg as a CBCAST

the other members deliver ABCAST msgs in the same order as the token holder

Protocol

if sender = token holder, it sends msgs as CBCASTs -> the order of sending = the order of delivery

otherwise,

sends as undeliverable CBCAST msg ->

if receiver = token holder

-> deliver the msg as usual CBCAST

-> sends a set-order msg, which describes the delivery order (=> requires unique msg ids), as a CBCAST

else

-> ABCAST msgs queue up at receivers, but other CBCAST msgs which causally precede these ABCAST msgs are delivered

-> upon receiving the set-order CBCAST:

wait until all ABCAST msgs described in set-order are received and until the other CBCAST msgs preceding the set-order msg are delivered

-> deliver ABCAST msgs in the order described in set-order msg
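The non-token-holder side of these steps might look like the following (a rough sketch under my own naming; the real protocol also handles causal delivery of the set-order msg itself, retries, and failures):

```python
# Sketch of a non-token-holder receiver in the causal ABCAST
# protocol (Section 5.2). ABCAST msgs arrive as undeliverable
# CBCASTs and are held until a set-order msg fixes their order.

pending = {}       # msg id -> msg body, held ABCAST msgs
delivered = []     # msgs handed to the application, in order

def on_abcast(msg_id, body):
    pending[msg_id] = body   # queue; do not deliver yet

def on_set_order(order):
    """order = list of msg ids in the order the token holder
    delivered them. Deliver in that order once every listed
    msg has arrived; otherwise report 'not yet'."""
    if not all(mid in pending for mid in order):
        return False         # wait for the missing ABCAST msgs
    for mid in order:
        delivered.append(pending.pop(mid))
    return True
```

In practice `on_set_order` would be retried whenever a new ABCAST msg arrives, until all msgs named in the set-order are present.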

5.3 Multicast Stability

A CBCAST msg is said to be stable if it has been received by all recipients

Stability number n of a process: VT(msg)[pid] <= n, for all stable msg sent by the process

Stability number may be piggybacked with CBCAST msg or sent by itself

5.4 VT Compression

VT(msg) needs only carry vector timestamp fields that have changed since the last multicast by the sender

<- Receiver would have correct timestamp from the preceding msgs

VT compression may save nothing, due to the additional field needed to record which indices are included in the timestamp

For bursty and highly localized traffic, VT compression can save a lot
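The compression idea can be sketched as delta encoding of the timestamp (an illustrative encoding of my own, not the paper's wire format):

```python
# Sketch of VT compression (Section 5.4): a sender transmits only
# the entries that changed since its last multicast, as
# (index, value) pairs; the receiver patches its last-seen copy.

def compress(prev_vt, cur_vt):
    """Return the changed entries as (index, new value) pairs."""
    return [(i, v) for i, (p, v) in enumerate(zip(prev_vt, cur_vt))
            if p != v]

def decompress(prev_vt, delta):
    """Rebuild the full timestamp from the previous one + delta."""
    vt = list(prev_vt)
    for i, v in delta:
        vt[i] = v
    return vt
```

When traffic is bursty and localized, most entries are unchanged between consecutive multicasts, so the delta is short; but each pair also carries an index, which is the overhead noted above.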

5.5 Delivery Atomicity and Group Membership Changes

To achieve atomicity of msg delivery while group membership changes dynamically, several issues must be addressed

Virtually synchronous addressing: no process failure or departure from a group is assumed

A new view(i+1) is defined

-> each member sends 'flush(i+1)' to every other member of the view: n^2 msgs!--a 2n-msg algorithm exists using a flush coordinator

-> members of view(i+1) do not send any msgs until 'flush' msgs from all the other members have been received

=> the network is FIFO -> receiving all 'flush' msgs means that the network has been virtually flushed

Reinitialization of VT fields

After flushing the network, reinitialize VT fields to zero

From now on, we assume that view index is incorporated in msgs

Delivery atomicity and virtual synchrony when failures occur

Failure

Problem: a msg is delivered to some but not all of the recipients -> Solution: cache received msgs & forward them to the other recipients if the sender fails

Problem: a dead process won't send a flush msg on a new view -> Solution: wait for flush msgs only from the intersection of the view and the new view that will be generated because of the dead process

Integrated solution:

Cache received msgs & forward them to the other recipients if the sender fails

Only after forwarding all the pending msgs, send out the flush msg

Generate new msgs only after getting all the flush msgs from the intersection of the two views (= processes which are still operational in the subsequent view)

Departure

The only change from the failure case is that the departing process should be included in the flushing mechanism <- otherwise, what if the departing process were the only one which had received a multicast whose sender has now aborted?

ABCAST ordering when token holder fails

How can the order in which the token holder delivered ABCAST msgs be determined without receiving the set-order msg?

6. Extensions to the Basic Protocol

6.1 Extension of CBCAST to Multiple Groups

VT counts the number of msgs sent -> with multiple groups, a msg is delivered only to some of the processes -> when a process receives a msg with VT[i] = k, it does NOT know how many msgs from process i should precede the msg -> need a VT for each group: denoted VT(msg, group) or VT(sender/receiver, group)

Also stability number for each group

CBCAST protocol for multiple group

if sender = receiver, deliver immediately

else, delay the delivery of the msg until

VT(msg, sender's group)[pid of sender] = VT(receiver, sender's group)[pid of sender] + 1, and

VT(msg, sender's group)[other pid] <= VT(receiver, sender's group)[other pid]

VT(msg, g) <= VT(receiver, g), for the other groups the receiver belongs to

See Fig. 4
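The multi-group condition extends the single-group predicate with one extra check per group (a sketch; keeping the per-group VTs in dicts keyed by group name is my own representation):

```python
# Sketch of the multiple-group CBCAST delivery condition
# (Section 6.1). vts_msg / vts_recv map group name -> vector
# timestamp; sender/receiver are pids.

def mg_cbcast_deliverable(vts_msg, vts_recv, sender, receiver,
                          sender_group, recv_groups):
    if sender == receiver:
        return True
    m, r = vts_msg[sender_group], vts_recv[sender_group]
    # next msg expected from the sender within the sender's group
    if m[sender] != r[sender] + 1:
        return False
    if any(m[k] > r[k] for k in range(len(m)) if k != sender):
        return False
    # for every other group the receiver belongs to:
    # VT(msg, g) <= VT(receiver, g)
    return all(all(a <= b for a, b in zip(vts_msg[g], vts_recv[g]))
               for g in recv_groups
               if g != sender_group and g in vts_msg)
```

The last check is what stops a msg from overtaking a causally earlier multicast that traveled through a different group.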

=> VT for every group should be included in each msg -> 6.3 Extended VT Compression

6.2 Multiple-Group ABCAST

A causal ABCAST which uses CBCAST no longer keeps the order of delivery among different groups -> a global ordering would be needed, but it is of little practical use and out of the scope of this paper

6.3 Extended VT Compression

VT compression by sending only the delta (changed fields); even the entire VT for a group can be omitted

By changing the CBCAST protocol to deliver concurrent messages in the order of reception, the VT fields can be further compressed

Locality will increase the efficiency of compression

6.4 Garbage Collection of Vector Timestamps

Messages carry VTs for (virtually) all groups in the system -> processes hold the VTs of groups to which they do not belong indefinitely -> big overhead

Solution: Periodic flush(= setting fields of VT to zero) for every active group & throwing away VT for groups not being refreshed within the timeout

6.5 Atomicity and Group Membership Changes

(1) Virtually synchronous addressing: no process failure or departure from a group is assumed => 5.5 method still works

(3) Delivery atomicity and virtual synchrony when failures occur

An example of the problem newly introduced by multiple groups: see page 295

Solution: do not send a new multicast to another group until all multicasts to the current group become stable -> less concurrency

6.6 Use of Communication Structure

The VTs carried with msgs are still a headache

If communication structure is known in advance to have a particular pattern, the number of VTs can be reduced drastically

Communication pattern: a graph with vertices = groups and an edge connecting two groups iff the two groups share members

Biconnected component: a subgraph in which every pair of vertices is connected by at least two vertex-disjoint paths

k-bounded: max(|biconnected component|) = k

Theorem: if a system uses the VT protocol to maintain causality, it is necessary and sufficient for a process p to maintain and transmit the VT timestamps corresponding to the groups in the biconnected components to which p belongs

Acyclic graph: the VTs corresponding to the groups to which p belongs are enough
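Finding the biconnected components of the group graph is standard; a sketch using the classical Hopcroft-Tarjan edge-stack DFS (the adjacency-dict representation is my own):

```python
# Sketch: biconnected components of the group graph (Section 6.6).
# Vertices are groups; an edge joins two groups that share members.
# adj maps each group to a list of its neighbors.

def biconnected_components(adj):
    disc, low, comps, stack = {}, {}, [], []
    timer = [0]

    def dfs(u, parent):
        disc[u] = low[u] = timer[0]
        timer[0] += 1
        for v in adj[u]:
            if v not in disc:                 # tree edge
                stack.append((u, v))
                dfs(v, u)
                low[u] = min(low[u], low[v])
                if low[v] >= disc[u]:         # u separates v's subtree
                    comp = set()
                    while True:
                        e = stack.pop()
                        comp.update(e)
                        if e == (u, v):
                            break
                    comps.append(comp)
            elif v != parent and disc[v] < disc[u]:
                stack.append((u, v))          # back edge
                low[u] = min(low[u], disc[v])

    for u in adj:
        if u not in disc:
            dfs(u, None)
    return comps
```

With the components in hand, a process keeps only the VTs for groups in its own component(s), which is the reduction the theorem permits.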

6.7 Extensions to Arbitrary, Dynamic Communication Structures

If processes join and leave groups dynamically, it is almost impossible to obtain the graph of 6.6

Conservative solution

A process p may multicast to group g iff g is the only active group for p or p has no active group: only the VT for g is required

A delay must be inserted before multicasting a msg to a new group, until the current active group becomes inactive

(1) Multicast epochs: processes leave a group only due to failure

Pessimistic diagnosis of whether a group is in a cycle: for example, multiple neighbors -> assume it is in a cycle

If a process has received a msg addressed to group g and is going to send one to group g', and g or g' is in a cycle

-> flush g, so that all the msgs addressed to g are forced to be delivered before a msg addressed to g' is injected into the network

else, use only the VTs for the groups to which the process belongs (= the acyclic rule)

(2) Multicast epochs: process leaves a group for other reasons, remaining active and possibly joining other groups

Problem: the current snapshot of communication pattern does not reflect the causality

For example, a process p sends a msg m to group g, leaves g, joins g', and then sends msg m' to g': m -> m', but the current graph says nothing about g

Insert a flush whenever a process leaves a group and before it joins another one

7. Applying the Protocols to ISIS

7.1 Optimization for Client/Server Groups

Client/Server model in ISIS

forms a complete graph -> size of timestamp = O(number of servers * number of clients)

Using the fact that clients do not communicate with each other, the timestamp size can be reduced to O(number of servers + number of clients)

Diffusion model

Clients do not send any msg -> O(number of servers)

Hierarchical model: asynch -> O(number of members of subgroup)

7.2 Point-to-Point Messages

ISIS uses RPC for point-to-point message

RPC blocks the sender until an ack arrives

=> no need to worry about causality for the pair RPC -> multicast sent by the same process

=> but need to worry about causality for multicast -> RPC and RPC -> multicast involving other processes

-> use VTs with RPC too

7.3 System Interface Issues

How can the system be informed of application structure, such as an acyclic comm pattern? The current ISIS uses a conservative method to detect the comm pattern.

Possible extension: introduction of causality domain

Each domain is composed of groups, uses different policy about causality

Interdomain causality is forced by flush

7.4 Group View Management

How should membership changes be detected and managed? Use CBCAST to announce a membership change -> flush

Also need a mechanism to detect the failure of a member; covered in another paper

Evaluation

Is a causality relation defined only in terms of msg sends and receives sufficient?

VT is too complex and heavy