Go up to Top Go forward to Footnotes |
// -------------------------------------------------------------------------- // $Id: snapshot.tex,v 1.3 1997/11/12 21:59:23 schreine Exp schreine $ // the Chandy-Lamport algorithm for taking distributed snapshots // // (c) 1997, Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at> // http://www.risc.uni-linz.ac.at/software/daj // -------------------------------------------------------------------------- import daj.*; import java.util.*; // ---------------------------------------------------------------------------- // // the application itself // // ---------------------------------------------------------------------------- public class Main extends Application { // -------------------------------------------------------------------------- // main function of application // -------------------------------------------------------------------------- public static void main(String[] args) { new Main().run(); } // -------------------------------------------------------------------------- // constructor for application // -------------------------------------------------------------------------- public Main() { super("Distributed Snapshots", 580, 450); } // -------------------------------------------------------------------------- // construction of a hexagon network (just for fun) // -------------------------------------------------------------------------- public void construct() { // length of first row and half height of hexagon int w = 3; int h = 2; int size = (w+h-1)*(w+h)-w*(w-1)+(w+h); Node nodes[][] = new Node[w+h][2*h+1]; // left upper corner and vertical/horizontal distance int x0 = 170; int y0 = 50; int x = 100; int y = 86; // random generator for initialization of deposit Random random = new Random(); // create and place nodes int number = 0; for (int j = 0; j < h+1; j++) { for (int i = 0; i < w+j; i++) { int deposit = Math.abs(random.nextInt())%1000; nodes[i][j] = node(new Prog(size, number, deposit), String.valueOf(number), x0+i*x-j*x/2, y0+j*y); number++; } } for (int j = h+1; j < 2*h+1; j++) { for (int i = 0; i < w+2*h-j; i++) { int deposit = Math.abs(random.nextInt())%1000; nodes[i][j] = node(new Prog(size, number, deposit), String.valueOf(number), x0+i*x-(2*h-j)*x/2, y0+j*y); number++; } } // link nodes for (int j = 0; j < h+1; j++) { for (int i = 0; i < w+j; i++) { if (i < w+j-1) link(nodes[i][j], nodes[i+1][j]); if (i > 0) link(nodes[i][j], nodes[i-1][j]); if (j < h) { link(nodes[i][j], nodes[i][j+1]); link(nodes[i][j], nodes[i+1][j+1]); } else { if (i < w+j-1) link(nodes[i][j], nodes[i][j+1]); if (i> 0) link(nodes[i][j], nodes[i-1][j+1]); } if (j > 0) { if (i < w+j-1) link(nodes[i][j], nodes[i][j-1]); if (i > 0) link(nodes[i][j], nodes[i-1][j-1]); } } } for (int j = h+1; j < 2*h+1; j++) { for (int i = 0; i < w+2*h-j; i++) { if (i < w+2*h-j-1) link(nodes[i][j], nodes[i+1][j]); if (i > 0) link(nodes[i][j], nodes[i-1][j]); if (j < 2*h) { if (i > 0) link(nodes[i][j], nodes[i-1][j+1]); if (i < w+2*h-j-1) link(nodes[i][j], nodes[i][j+1]); } if (j > 0) { link(nodes[i][j], nodes[i][j-1]); link(nodes[i][j], nodes[i+1][j-1]); } } } } // -------------------------------------------------------------------------- // informative message // -------------------------------------------------------------------------- public String getText() { return "Distributed Snapshots\n \n" + "The Chandy-Lamport algorithm\n" + "for taking consistent global snapshots\n" + "of a running network."; } } // ---------------------------------------------------------------------------- // // a program class // // ---------------------------------------------------------------------------- class Prog extends Program { // state variables public int size; // network size public int number; // current node number public int deposit; // current deposit public Message message; // message currently being processed public int index; // index of channel from which message was received public int mode; // current execution mode // snapped local state public int snapValue; // current snap value public boolean snapped[]; // snap state of input channels public int missingSnaps; // number of missing snaps of input channels // snapped total state public int totalValue; // current network value public boolean done[]; // true iff node has reported its snap value public int missingValues; // number of outstanding node reports // execution modes private final int RUNNING = 0; // initial state private final int SNAPPING = 1; // constructing snapshot private final int BROADCASTING = 2; // broadcasting result private final int TERMINATED = 3; // terminal state // -------------------------------------------------------------------------- // construct program in network of size `s` at node `n` with deposit `d` // -------------------------------------------------------------------------- public Prog(int s, int n, int d) { size = s; number = n; deposit = d; message = null; mode = RUNNING; } // -------------------------------------------------------------------------- // return random integer `r` with 0 <= `r` < `n` // -------------------------------------------------------------------------- private Random rand = new Random(); private int random(int n) { return Math.abs(rand.nextInt())%n; } // -------------------------------------------------------------------------- // go into snapping mode // -------------------------------------------------------------------------- public void snap() { // go into snap mode, store local deposit, reset total value mode = SNAPPING; snapValue = deposit; totalValue = 0; // initialize snapping state of channels int s = in().getSize(); snapped = new boolean[s]; for (int i = 0; i < s; i++) snapped[i] = false; missingSnaps = s; // initialize report state of nodes done = new boolean[size]; for (int i = 0; i < size; i++) done[i] = false; missingValues = size; } // -------------------------------------------------------------------------- // collect `value` from `sender` // -------------------------------------------------------------------------- public void collect(int sender, int value) { totalValue += value; done[sender] = true; missingValues--; if (missingValues == 0) mode = TERMINATED; } // -------------------------------------------------------------------------- // main function of program // -------------------------------------------------------------------------- public void main() { while (mode != TERMINATED) { // node 0 triggers snapping at its own will if (number == 0 && mode == RUNNING && random(10) == 0) { snap(); interrupt(); out().send(new SnapMessage()); } // wait some time for message index = in().select(random(5)); if (index == -1 && deposit > 0) { // --------------------------------------------------------------- // send some money from deposit to some neighbor node // --------------------------------------------------------------- int value = random(deposit); deposit -= value; message = new ValueMessage(value); index = random(out().getSize()); out(index).send(message); message = null; continue; } // message was received message = in(index).receive(); // --------------------------------------------------------------- // value message was received // --------------------------------------------------------------- if (message instanceof ValueMessage) { ValueMessage valueMessage = (ValueMessage)message; int value = valueMessage.getValue(); deposit += value; // snap message if (mode == SNAPPING && !snapped[index]) { snapValue += value; } } // --------------------------------------------------------------- // snap message was received // --------------------------------------------------------------- else if (message instanceof SnapMessage) { SnapMessage snapMessage = (SnapMessage)message; if (mode == RUNNING) { snap(); out().send(snapMessage); } if (!snapped[index]) { snapped[index] = true; missingSnaps--; if (missingSnaps == 0) { mode = BROADCASTING; collect(number, snapValue); if (number == 0) interrupt(); out().send(new ResultMessage(number, snapValue)); } } } // --------------------------------------------------------------- // result message was received // --------------------------------------------------------------- else if (message instanceof ResultMessage) { ResultMessage resultMessage = (ResultMessage)message; int sender = resultMessage.getSender(); if (!done[sender]) { collect(sender, resultMessage.getValue()); out().send(resultMessage); } } // message was handled message = null; } // assert that total amount of money equals the determined value assert(new TotalValue(totalValue)); } // -------------------------------------------------------------------------- // called for display of program state // -------------------------------------------------------------------------- public String getText() { String messageString; if (message == null) messageString = "(null)"; else messageString = message.getText(); String modeString; switch (mode) { case RUNNING: { modeString = "RUNNING"; break; } case SNAPPING: { modeString = "SNAPPING"; break; } case BROADCASTING: { modeString = "BROADCASTING"; break; } case TERMINATED: { modeString = "TERMINATED"; break; } default: { modeString = "(unknown)"; break; } } if (message == null) messageString = "(null)"; else messageString = message.getText(); if (mode == RUNNING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit); else if (mode == SNAPPING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\nmissingSnaps: " + String.valueOf(missingSnaps); else if (mode == BROADCASTING) return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\ntotalValue: " + String.valueOf(totalValue) + "\nmissingValues: " + String.valueOf(missingValues); else return "mode: " + modeString + "\ndeposit: " + String.valueOf(deposit) + "\nsnapValue: " + String.valueOf(snapValue) + "\ntotalValue: " + String.valueOf(totalValue); } } // ---------------------------------------------------------------------------- // // the message classes // // ---------------------------------------------------------------------------- // ---------------------------------------------------------------------------- // carries `value` // ---------------------------------------------------------------------------- class ValueMessage extends Message { private int value; public ValueMessage(int v) { value = v; } public int getValue() { return value; } public String getText() { return "VALUE[" + String.valueOf(value) + "]"; } } // ---------------------------------------------------------------------------- // carries no information // ---------------------------------------------------------------------------- class SnapMessage extends Message { public String getText() { return "SNAP"; } } // ---------------------------------------------------------------------------- // carries `value` of `sender` // ---------------------------------------------------------------------------- class ResultMessage extends Message { private int sender; private int value; public ResultMessage(int s, int v) { sender = s; value = v; } public int getSender() { return sender; } public int getValue() { return value; } public String getText() { return "RESULT[" + String.valueOf(sender) + ", " + String.valueOf(value) + "]"; } } // ---------------------------------------------------------------------------- // // global assertion stating that total values of network equals `value` // // ---------------------------------------------------------------------------- class TotalValue extends GlobalAssertion { int value; // expected value int sum; // counted value public TotalValue(int v) { value = v; } public String getText() { return "invalid value: " + String.valueOf(sum) + "(" + String.valueOf(value) + " expected)"; } // -------------------------------------------------------------------------- // add values of all messages in `msg` // -------------------------------------------------------------------------- private int add(Message msg[]) { int s = 0; for (int i = 0; i < msg.length; i++) { if (msg[i] instanceof ValueMessage) { s += ((ValueMessage)msg[i]).getValue(); } } return s; } // -------------------------------------------------------------------------- // check whether total value of network equals `value` // -------------------------------------------------------------------------- public boolean assert(Program prog[]) { sum = 0; for (int j = 0; j < prog.length; j++) { Prog program = (Prog)prog[j]; // value deposited in node sum += program.deposit; // value in output channels int n = program.out().getSize(); for (int i = 0; i < n; i++) { Message msg[] = getMessages(program.out(i)); sum += add(msg); } // value in pending message if (program.message != null && program.message instanceof ValueMessage) { sum += ((ValueMessage)program.message).getValue(); } } return sum == value; } }