|
Go up to Top Go forward to Footnotes |
|
// --------------------------------------------------------------------------
// $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp schreine $
// the Chandy-Lamport algorithm for taking distributed snapshots
//
// (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at>
// http://www.risc.uni-linz.ac.at/software/daj
// --------------------------------------------------------------------------
import daj.*;
import java.util.*;
// ----------------------------------------------------------------------------
//
// the application itself
//
// ----------------------------------------------------------------------------
public class Main extends Application
{
// --------------------------------------------------------------------------
// main function of application
// --------------------------------------------------------------------------
public static void main(String[] args)
{
new Main().run();
}
// --------------------------------------------------------------------------
// constructor for application
// --------------------------------------------------------------------------
public Main()
{
super("Distributed Snapshots", 580, 450);
}
// --------------------------------------------------------------------------
// construction of a hexagon network (just for fun)
// --------------------------------------------------------------------------
public void construct()
{
// length of first row and half height of hexagon
int w = 3;
int h = 2;
int size = (w+h-1)*(w+h)-w*(w-1)+(w+h);
Node nodes[][] = new Node[w+h][2*h+1];
// left upper corner and vertical/horizontal distance
int x0 = 170;
int y0 = 50;
int x = 100;
int y = 86;
// random generator for initialization of deposit
Random random = new Random();
// create and place nodes
int number = 0;
for (int j = 0; j < h+1; j++)
{
for (int i = 0; i < w+j; i++)
{
int deposit = Math.abs(random.nextInt())%1000;
nodes[i][j] =
node(new Prog(size, number, deposit), String.valueOf(number),
x0+i*x-j*x/2, y0+j*y);
number++;
}
}
for (int j = h+1; j < 2*h+1; j++)
{
for (int i = 0; i < w+2*h-j; i++)
{
int deposit = Math.abs(random.nextInt())%1000;
nodes[i][j] =
node(new Prog(size, number, deposit), String.valueOf(number),
x0+i*x-(2*h-j)*x/2, y0+j*y);
number++;
}
}
// link nodes
for (int j = 0; j < h+1; j++)
{
for (int i = 0; i < w+j; i++)
{
if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]);
if (i > 0) link(nodes[i][j], nodes[i-1][j]);
if (j < h)
{
link(nodes[i][j], nodes[i][j+1]);
link(nodes[i][j], nodes[i+1][j+1]);
}
else
{
if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]);
if (i> 0) link(nodes[i][j], nodes[i-1][j+1]);
}
if (j > 0)
{
if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]);
if (i > 0) link(nodes[i][j], nodes[i-1][j-1]);
}
}
}
for (int j = h+1; j < 2*h+1; j++)
{
for (int i = 0; i < w+2*h-j; i++)
{
if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]);
if (i > 0) link(nodes[i][j], nodes[i-1][j]);
if (j < 2*h)
{
if (i > 0) link(nodes[i][j], nodes[i-1][j+1]);
if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]);
}
if (j > 0)
{
link(nodes[i][j], nodes[i][j-1]);
link(nodes[i][j], nodes[i+1][j-1]);
}
}
}
}
// --------------------------------------------------------------------------
// informative message
// --------------------------------------------------------------------------
public String getText()
{
return "Distributed Snapshots\n \n" +
"The Chandy-Lamport algorithm\n" +
"for taking consistent global snapshots\n" +
"of a running network.";
}
}
// ----------------------------------------------------------------------------
//
// a program class
//
// ----------------------------------------------------------------------------
class Prog extends Program
{
// state variables
public int size; // network size
public int number; // current node number
public int deposit; // current deposit
public Message message; // message currently being processed
public int index; // index of channel from which message was received
public int mode; // current execution mode
// snapped local state
public int snapValue; // current snap value
public boolean snapped[]; // snap state of input channels
public int missingSnaps; // number of missing snaps of input channels
// snapped total state
public int totalValue; // current network value
public boolean done[]; // true iff node has reported its snap value
public int missingValues; // number of outstanding node reports
// execution modes
private final int RUNNING = 0; // initial state
private final int SNAPPING = 1; // constructing snapshot
private final int BROADCASTING = 2; // broadcasting result
private final int TERMINATED = 3; // terminal state
// --------------------------------------------------------------------------
// construct program in network of size `s` at node `n` with deposit `d`
// --------------------------------------------------------------------------
public Prog(int s, int n, int d)
{
size = s;
number = n;
deposit = d;
message = null;
mode = RUNNING;
}
// --------------------------------------------------------------------------
// return random integer `r` with 0 <= `r` < `n`
// --------------------------------------------------------------------------
private Random rand = new Random();
private int random(int n)
{
return Math.abs(rand.nextInt())%n;
}
// --------------------------------------------------------------------------
// go into snapping mode
// --------------------------------------------------------------------------
public void snap()
{
// go into snap mode, store local deposit, reset total value
mode = SNAPPING;
snapValue = deposit;
totalValue = 0;
// initialize snapping state of channels
int s = in().getSize();
snapped = new boolean[s];
for (int i = 0; i < s; i++)
snapped[i] = false;
missingSnaps = s;
// initialize report state of nodes
done = new boolean[size];
for (int i = 0; i < size; i++)
done[i] = false;
missingValues = size;
}
// --------------------------------------------------------------------------
// collect `value` from `sender`
// --------------------------------------------------------------------------
public void collect(int sender, int value)
{
totalValue += value;
done[sender] = true;
missingValues--;
if (missingValues == 0)
mode = TERMINATED;
}
// --------------------------------------------------------------------------
// main function of program
// --------------------------------------------------------------------------
public void main()
{
while (mode != TERMINATED)
{
// node 0 triggers snapping at its own will
if (number == 0 && mode == RUNNING && random(10) == 0)
{
snap();
interrupt();
out().send(new SnapMessage());
}
// wait some time for message
index = in().select(random(5));
if (index == -1 && deposit > 0)
{
// ---------------------------------------------------------------
// send some money from deposit to some neighbor node
// ---------------------------------------------------------------
int value = random(deposit);
deposit -= value;
message = new ValueMessage(value);
index = random(out().getSize());
out(index).send(message);
message = null;
continue;
}
// message was received
message = in(index).receive();
// ---------------------------------------------------------------
// value message was received
// ---------------------------------------------------------------
if (message instanceof ValueMessage)
{
ValueMessage valueMessage = (ValueMessage)message;
int value = valueMessage.getValue();
deposit += value;
// snap message
if (mode == SNAPPING && !snapped[index])
{
snapValue += value;
}
}
// ---------------------------------------------------------------
// snap message was received
// ---------------------------------------------------------------
else if (message instanceof SnapMessage)
{
SnapMessage snapMessage = (SnapMessage)message;
if (mode == RUNNING)
{
snap();
out().send(snapMessage);
}
if (!snapped[index])
{
snapped[index] = true;
missingSnaps--;
if (missingSnaps == 0)
{
mode = BROADCASTING;
collect(number, snapValue);
if (number == 0) interrupt();
out().send(new ResultMessage(number, snapValue));
}
}
}
// ---------------------------------------------------------------
// result message was received
// ---------------------------------------------------------------
else if (message instanceof ResultMessage)
{
ResultMessage resultMessage = (ResultMessage)message;
int sender = resultMessage.getSender();
if (!done[sender])
{
collect(sender, resultMessage.getValue());
out().send(resultMessage);
}
}
// message was handled
message = null;
}
// assert that total amount of money equals the determined value
assert(new TotalValue(totalValue));
}
// --------------------------------------------------------------------------
// called for display of program state
// --------------------------------------------------------------------------
public String getText()
{
String messageString;
if (message == null)
messageString = "(null)";
else
messageString = message.getText();
String modeString;
switch (mode)
{
case RUNNING:
{
modeString = "RUNNING";
break;
}
case SNAPPING:
{
modeString = "SNAPPING";
break;
}
case BROADCASTING:
{
modeString = "BROADCASTING";
break;
}
case TERMINATED:
{
modeString = "TERMINATED";
break;
}
default:
{
modeString = "(unknown)";
break;
}
}
if (message == null)
messageString = "(null)";
else
messageString = message.getText();
if (mode == RUNNING)
return
"mode: " + modeString +
"\ndeposit: " + String.valueOf(deposit);
else if (mode == SNAPPING)
return
"mode: " + modeString +
"\ndeposit: " + String.valueOf(deposit) +
"\nsnapValue: " + String.valueOf(snapValue) +
"\nmissingSnaps: " + String.valueOf(missingSnaps);
else if (mode == BROADCASTING)
return
"mode: " + modeString +
"\ndeposit: " + String.valueOf(deposit) +
"\nsnapValue: " + String.valueOf(snapValue) +
"\ntotalValue: " + String.valueOf(totalValue) +
"\nmissingValues: " + String.valueOf(missingValues);
else
return
"mode: " + modeString +
"\ndeposit: " + String.valueOf(deposit) +
"\nsnapValue: " + String.valueOf(snapValue) +
"\ntotalValue: " + String.valueOf(totalValue);
}
}
// ----------------------------------------------------------------------------
//
// the message classes
//
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// carries `value`
// ----------------------------------------------------------------------------
class ValueMessage extends Message
{
private int value;
public ValueMessage(int v)
{
value = v;
}
public int getValue()
{
return value;
}
public String getText()
{
return "VALUE[" + String.valueOf(value) + "]";
}
}
// ----------------------------------------------------------------------------
// carries no information
// ----------------------------------------------------------------------------
class SnapMessage extends Message
{
public String getText()
{
return "SNAP";
}
}
// ----------------------------------------------------------------------------
// carries `value` of `sender`
// ----------------------------------------------------------------------------
class ResultMessage extends Message
{
private int sender;
private int value;
public ResultMessage(int s, int v)
{
sender = s;
value = v;
}
public int getSender()
{
return sender;
}
public int getValue()
{
return value;
}
public String getText()
{
return "RESULT[" + String.valueOf(sender) + ", " +
String.valueOf(value) + "]";
}
}
// ----------------------------------------------------------------------------
//
// global assertion stating that total values of network equals `value`
//
// ----------------------------------------------------------------------------
class TotalValue extends GlobalAssertion
{
int value; // expected value
int sum; // counted value
public TotalValue(int v)
{
value = v;
}
public String getText()
{
return "invalid value: " + String.valueOf(sum) +
"(" + String.valueOf(value) + " expected)";
}
// --------------------------------------------------------------------------
// add values of all messages in `msg`
// --------------------------------------------------------------------------
private int add(Message msg[])
{
int s = 0;
for (int i = 0; i < msg.length; i++)
{
if (msg[i] instanceof ValueMessage)
{
s += ((ValueMessage)msg[i]).getValue();
}
}
return s;
}
// --------------------------------------------------------------------------
// check whether total value of network equals `value`
// --------------------------------------------------------------------------
public boolean assert(Program prog[])
{
sum = 0;
for (int j = 0; j < prog.length; j++)
{
Prog program = (Prog)prog[j];
// value deposited in node
sum += program.deposit;
// value in output channels
int n = program.out().getSize();
for (int i = 0; i < n; i++)
{
Message msg[] = getMessages(program.out(i));
sum += add(msg);
}
// value in pending message
if (program.message != null && program.message instanceof ValueMessage)
{
sum += ((ValueMessage)program.message).getValue();
}
}
return sum == value;
}
}