Die Thread-Programmierung in C nach dem POSIX Standard P1003.1c ist vor allem für die Systemprogrammierung bzw. für die Erstellung von Client/Server-Anwendungen geeignet. Unter Ausnutzung der Eigenschaften von C++ ist es möglich, wesentlich abstrakterere Programmiermodelle für Threads zu implementieren. Insbesondere können Threads streng getypte Schnittstellen erhalten und mittels automatischer Speicherbereinigung alle belegten Resourcen wieder freigeben.Parallele Programmiermodelle auf Basis von C++ sind zum Teil schon längere Zeit verfügbar [1] aber weit weniger abstrakt als dies für viele Anwendungen wünschenswert wäre. Im zweiten Teil dieses Artikels zur Thread-Programmierung stellen wir die selbst entwickelte C++ Klassenbibliothek RT++ [3] vor, die eine abstrakte Thread-Schnittstelle auf verschiedenen Workstations und Multiprozessor-Systemen implementiert. Insbesondere ist diese Bibliothek auch unter Linux verfügbar (tatsächlich wurde sie zu großen Teilen darauf entwickelt); sobald die zur Zeit im Alpha-Test befindliche Multiprozessor-Version des Linux-Kernels verfügbar ist, laufen mit RT++ entwickelte Programme damit auch unter Linux "echt" parallel.
Im Unterschied zu Thread-Implementierungen im Kern des Betriebssystems werden in RT++ Threads auf der Benutzer-Ebene realisiert (siehe Teil 1 des Artikels) und zwischen verschiedenen Prozessen zur Ausführung geschaltet. Da aber jeder Prozeß einen eigenen Adreßraum besitzt, müssen sowohl die Threads selbst als auch alle Objekte, auf denen die Threads operieren im gemeinsamen Heap gehalten werden.
// Header Files #include <rt++.h> // RT++ header file #include <iostream.h> // C++ header file // Registrierung der Typen Atom(int); // int ist Atom Atom(float); // float ist Atom Ref(Array<float>); // Array ist Referenz // Registrierung der Threads ThreadRes(int); // Ergebnis-Typ ThreadArg3(A, int, Array<float>, int, int); // teile a[l..r] durch e in a[l..m0], a[m0..m1], a[m1..r] void partition(Array<float> a, int l, int r, float e, int *m0, int *m1); // sortiere a[l..r] int qsort(Array<float> a, int l, int r) { if (r-l < 1000) return(sort(a, l, r)); int m0, m1; partition(a, l, r, a[(l+r)/2], &m0, &m1); Thread3<int, Array<float>, int, int> t0(qsort, a, l, m0); Thread3<int, Array<float>, int, int> t1(qsort, a, m1, r); return(Wait(t0)+Wait(t1)); } // RT++ Hauptprogramm int qmain(int argc, char *argv[]) { int L = atoi(argv[1]); for (int i = 0; i < 1000; i++) { Array<float> a(L); for (int j = 0; j < L; j++) a[j] = rand(); int n = qsort(a, 0, L); } return(0); } // C++ Hauptprogramm int main(int argc, char *argv[]) { RT_Argument rt_arg; // RT++ Argumente RT_Result rt_res; // RT++ Ergebnis rt_arg.proc = 3; // 3 Prozesse rt_arg.memory = 1000000; // 1 MB gemeinsamer Speicher rt_arg.heap = 100000; // 100 KB Heap zu Beginn rt_arg.blocks = 4; // 4 Blocks/Prozess rt_arg.stack = 15000; // 15 KB Stack/Thread rtarg.verbose = 1; // Drucke Statistik exit(rt_main(qmain, argc, argv, rt_arg, &rt_res)); }Das Programm ist aus folgenden Bestandteilen aufgebaut:
main
übergibt mittels der Funktion
rt_main
die Kontrolle an das RT++ Laufzeitsystem, das die Funktion
qmain(argc,argv)
ausführt. Das Argument rt_arg
wird zum
Hochstarten der parallelen Umgebung benötigt und gibt u.a. die gewünschte
Anzahl der Prozesse, die Größe des gemeinsamen Speichers, die Anfangsgröße des
Heaps und die Größe des Stacks für jede Thread an. Im Argument rt_res
wird nach Beendigung des RT++ Programms Zustands- und Statistikinformation in
das C++ Hauptprogramm zurückgeliefert. Das Hauptprogramm kann terminieren oder
das RT++ Laufzeitsystem erneut starten.
qmain
deklariert mit Hilfe des RT++ Template-Typs
Array<
T>
ein Feld a
der Länge L
. Objekte
dieses Typs werden im Heap angelegt und von der automatischen
Speicherbereinigung freigegeben, sobald das Objekt nicht mehr referenziert
wird. Das Feld wird mit Zufallswerten initialisert und der Funktion
qsort
zur Sortierung übergeben.
qsort
partitioniert das Feld a
mittels eines
Pivot-Elements in zwei Teile und erzeugt zwei Threads zur rekursiven
Sortierung dieser Teile. Die Deklaration
Thread3<int, Array<float>, int, int> t0(qsort, a, l, m0);erzeugt eine Thread
t0
, die qsort(a, l, m0)
ausführt. Diese
Thread ist streng getypt im Ergebnis-Typ int
und in den drei
Argument-Typen Array<float>
, int
, int
; Typ-Fehler werden
damit bereits vom Compiler erkannt.
Der Ausdruck
Wait(t0)wartet auf die Beendigung der Thread
t0
und liefert deren Ergebniswert
zurück. Thread-Argumente und Ergebnisse können von jedem zugelassenen
RT++ Typ (siehe den nächsten Abschnitt) sein.
Atom(int);gibt bekannt, daß der Typ
int
atomar ist d.h. keine Zeiger
enthält, die bei der Speicherbereinigung berücksichigt werden müssen. Dagegen
gibt die Deklaration
Ref(Array<float>);an, daß
Array<float>
ein Referenztyp ist, d.h. Objekte von
diesem Zeiger auf ein anderes Objekt beinhalten (ein Array
ist als
Zeiger auf den entsprechenden Speicherbereich repräsentiert). Deklarationen
dieser Art sind im wesentlichen für alle vom RT++ Programm verwendeten Typen
notwendig (siehe den Abschnitt über Speicherbereinigung).
------------------------------------------------------------------------------ RT++ 1.0 (C) 1996 Wolfgang Schreiner <Wolfgang.Schreiner@risc.uni-linz.ac.at> proc: 3, mem: 1000000, heap: 100032, blocks: 12, stack: 5000 ------------------------------------------------------------------------------ RT++ GC 1 (1360 ms): 8 blocks with 42560 bytes in 30 (+ 0) ms. RT++ GC 2 (1820 ms): 11 blocks with 64112 bytes in 20 (+ 0) ms. RT++ GC 3 (2640 ms): 9 blocks with 53456 bytes in 20 (+ 0) ms. RT++ GC 4 (3200 ms): 3 blocks with 15984 bytes in 30 (+ 0) ms. RT++ GC: less than 25008 bytes heap (1 block per process) free. RT++ GC: increase heap by 100032 bytes. RT++ GC 5 (4670 ms): 7 blocks with 33520 bytes in 50 (+ 0) ms. ... RT++ GC 13 (13170 ms): 41 blocks with 243984 bytes in 70 (+ 0) ms. ------------------------------------------------------------------------------ RT++ execution Return code: 0 Real time(ms): 16650 Proc time(ms): 16620 All procs(ms): 46900 RT++ garbage collections Number: 13 Increments: 3 Waiting(ms): 30 Collecting(ms): 570 Blocks: 167 Bytes: 880464 ------------------------------------------------------------------------------Die Kopfzeilen werden beim Start der RT++ Laufzeitumgebung ausgegeben und geben die für die Initialisierung verwendeten Parameter an. Jede während der Programmausführung ausgeführte Speicherbereinigung druckt Information über deren Dauer und über die Größe des wiedergewonnenen Speichers aus. Ist zu wenig freier Speicher vorhanden oder dieser zu fragmentiert für die weitere Verwendung, kann der Heap auch vergrößert werden. Bei Beendigung der RT++ Laufzeitumgebung wird Status-Information über das Programm und über die durchgeführten Speicherbereinigungen ausgegeben.
Daß obiges Beispiel auf einem Multiprozessor ausgeführt wurde zeigt sich darin, daß die real time um fast ein Drittel kleiner als die gesamte Ausführungszeit aller Prozesse war; die Prozesse liefen also auf mehreren Prozessoren echt parallel.
Thread
und
Array
haben wir im vorigen Beispiel schon kurz vorgestellt. Im
folgenden wollen wir eine etwas ausführlichere Darstellung geben:
Threadn<R,A_1,...,A_n>
t(f, a_1,...,a_n)
erzeugt eine neue Thread t zur Ausführung von f(a_1,...,a_n). Normalerweise (abhängig von einer Preprozessor-Direktive) sind solche Threads noch nicht aktivert und werden in die Liste der zu aktivierenden Threads gestellt. RT++ stellt auch andere Varianten der Thread-Erzeugung zur Verfügung, in den meisten Fällen ist aber die obige die bequemste. Die Funktion
Wait(t)wartet auf die Beendigung von t und liefert deren Ergebnis vom Typ R. Auf Threads sind außerdem verschiedene Basis-Operatoren wie Zuweisung
=
,
Vergleich ==
und eine Reihe von spezielleren Funktionen definiert.
Array<T>
a(L)
legt im Heap ein Feld mit L Elementen vom Typ T an und liefert eine
Referenz a auf dieses Feld. Auf Feldern ist der übliche Zugriffs-Operator
[]
für das Lesen und Schreiben von Feld-Elementen und eine Funktion
Length(a)definiert, die die Länge L des Feldes liefert. Auf Feldern sind außerdem die Basis-Operationen Zuweisung
=
und Vergleich ==
definiert.
List<T>
l
definiert eine (leere) Liste l mit einer Reihe von Konstruktoren und Selektoren:
l.cons(h,t)bzw. die äquivalente Konstruktion
l = Cons(h,t)
legt eine neue
Liste mit Kopf h vom Typ T und Restliste t vom Typ List<t>
an
und weist l eine Referenz auf diese Liste zu. Selektor-Operationen wie
n = Null(l); h = Head(l); t = Tail(l);sowie verschiedene destruktive Operatoren erlauben Zugriff auf die Listenelemente. Natürlich sind die entsprechenden Basis-Operatoren wie
=
und ==
ebenfalls auf Listen definiert.
Pointer<T>
p(1)
definiert einen Zeiger p auf Objekte vom Typ T und legt eine eine entsprechenden Speicherbereich im Heap an, auf die p verweist. Die Deklaration ist damit äquivalent zu
Pointer<T> p; p = Pointer<T>::Alloc()Die Operatoren
*
und ->
erlauben den Zugriff (Lesen und
Schreiben) auf das referenzierte Objekt; auf Zeigern existieren außerdem
die Basis-Operatoren =
und ==
. Mit Hilfe dieses Typs lassen sich
allgemeine Graphen-Strukturen realisieren.
ThreadBag<T>
b
definiert eine leere Multimenge (bag) b zur Aufnahme von Threads mit Ergebnistyp T. Die Operation
ThreadBagn<T, A_1, ..., A_n>(b, f, a_1, ..., a_n)fügt zu b eine Thread hinzu, die f(a_1,...,a_n) berechnet. Die Operation
Wait(b, i)liefert das Ergebnis derjenigen Thread in b, die als i-te terminiert ist (nicht die Thread, die als i-te der Bag hinzugefügt wurde!). Da im vorhinein im allgemeinen die Reihenfolge der Terminationen nicht bekannt ist, erlaubt
ThreadBag
die Ausnutzung
von Nicht-Determinismus einem parallelen Programm.
int
,
float
, char
, ...) aufgebaut sind, sind zulässige RT++ Typen
und können ihrerseits als Argumente für weitere RT++ Typ-Konstruktionen
bilden. Insbesondere können auch Threads selbst als Argumente und Ergebnise
anderer Threads dienen oder können als Objekte in Feldern, Listen und
Zeiger-Objekten gespeichert werden. Alle derartig aufgebauten Objekte (auch
Threads) werden von der automatischen Speicherbereinigung verwaltet und deren
Speicher freigegeben, sobald die Objekte nicht mehr referenziert werden.
Der Kern eines solchen Programms könnte in RT++ folgendermaßen aussehen:
typedef char Digit; typedef Array<Digit> Number; Number mult(Number a, Number b) { int lb = Length(b); List< Thread<Number> > l; for (int i = 0; i < lb; i++) { Thread2<Number, Number, Digit> t(prod, a, b[i]); l.cons(t, l); // l = Cons(t, l); } int n = 2*lb-1; for (int j = 0; j < n-2 ; j += 2) { Thread<Number> ta(Head(l)); l.tail(); // l = Tail(l); Thread<Number> tb(Head(l)); l.tail(); // l = Tail(l); Thread2<Number, Thread<Number>, Thread<Number> > t(sum, ta, tb); l.cons(t, l); // l = Cons(t, l); } return(Wait(Head(l))); }Diese Lösung verwendet eine Liste l vom Typ
List<Thread<Number>>
die
beliebige Threads vom Ergebnis-Typ Number
aufnehmen kann (der Typ
Thread<Number>
ist zuweisungs-kompatibel zu jedem Typ
Threadn<Number, ...>
). Das Programm läuft in der ersten Schleife
über alle Ziffern der Zahl b, start eine neue Thread für die Ausführung von
prod(a, b[i])
und speichert die Thread in der Liste l.
In der zweiten Schleife läuft das Programm über alle Threads in dieser Liste,
nimmt je zwei Threads heraus und startet eine neue Thread für die Ausführung
von sum(ta, tb)
, die die Ergebnisse der beiden Threads t_a und t_b
addieren soll. Die neue Thread wird ebenfalls wieder in diese Liste gestellt
(um einen balanizerten Thread-Baum zu erzeugen, sollte die neue Thread
allerdings nicht wie oben dargestellt am Anfang sondern am Ende
der Liste eingefügt werden). Das Ergebnis der letzten Thread ist das
Gesamtergebnis des Programms.
Ein wesentlicher Punkt in obigem Programm ist, daß nicht auf das Ergebnis der
prod
Threads gewartet wird, um die sum
Threads zu
erzeugen. Vielmehr erhalten die sum
Threads die prod
Threads als
Argumente und sind für die Synchronisation selbst verantwortlich:
Number sum(Thread<Number> ta, Thread<Number> tb) { Number a = Wait(ta); Number b = Wait(tb); return(sum(a,b)); }Unter Verwendung von Bags kann unsere Lösung noch eleganter gestaltet werden:
Number mult(Number a, Number b) { int lb = Length(b); int n = 2*lb-1; ThreadBag<Number> bag(n); for (int i = 0; i < lb; i++) { ThreadBag2<Number, Number, Digit>(bag, prod, a, b[i]); } for (int j = 0; j < n-2 ; j += 2) { ThreadBag2<Number, ThreadBag<Number>, int>(bag, sum, bag, j); } return(Wait(bag, n-1)); }Das Programm verwendet an stelle einer Liste eine Bag (der optionale Parameter n in der Deklaration gibt die Maximalgröße der Bag an und wird für die Optimierung des Zugriffs verwendet). In der ersten Schleife werden die
prod
Threads erzeugt und in die Bag gestellt, in der zweiten Schleife
werden die sum
Threads erzeugt und ihr die Bag und der Index der
entsprechenden Argument-Threads als Argument übergeben:
Number sum(ThreadBag<Number> a, int j) { Number a = Wait(bag, j); Number b = Wait(bag, j+1); return(sum(a,b)); }Jede
sum
Thread wartet damit nicht auf die Ergebnisse eines bestimmten
Thread-Paars sondern auf die Ergebnisse der Threads, die in Position j
bzw. j+1 "ins Ziel gekommen sind". Das Programm ist damit sehr viel
abstrakter und eleganter als die erste Lösung.
Diese Beispiele illustrieren sehr schön den Programmierstil, der von RT++ vorgeschlagen wird: Threads sind Objekte, die benutzt und herumgereicht werden können wie alle anderen Arten von Objekten. Es ist für den Erzeuger einer Thread nicht notwendig, Synchronisationsabhängigkeiten selbst aufzulösen; er kann diese Aufgabe auch an die Threads übertragen, die die jeweiligen Ergebnisse benötigen. Außerdem kümmert sich das Programm nicht selbst um Anlegen und Freigeben von Resourcen; diese Aufgabe wird vom darunterliegenden Speicherverwaltungs-System gelöst. Der daraus resultierende Programmierstil ähnelt in gewisser Weise dem funktionalen Datenfluß-Stil der parallelen Programmierung.
Der Aufbau von RT++ Threads ist in folgendem Bild skizziert:
SystemThread
,
die das Bindeglied zur System-Schnittstelle der Threads darstellt. Im
wesentlichen enthält diese Klasse den Zeiger auf den Thread-Stack und
realisiert das Umschalten von einer Thread zur anderen (context switch).
SystemThread
ist die Klasse CoreThread
,
die die Schnittstelle zum RT++ Laufzeit-System darstellt. Im wesentlichen ist
hier die Ausführung der benutzerdefinierten Thread-Funktion und die
Auslieferung des Thread-Ergebnisses an wartende Threads implementiert.
CoreThread
ist die Template-Klasse
ResultThread<T>
, die Basis-Klasse für alle Threads, deren Ergebnis
vom Typ T ist. Diese Klasse stellt die Schnittstelle zwischen
Speicherbereinigung und dem Ergebnis einer Thread dar.
ArgumentThreadn<T,A_i>
, die die vollständige Typ-Schnittstelle
einer Thread implementiert und die damit die Schnittstelle zwischen
Speicherbereinigung und den Argumenten einer Thread darstellt.
Threadn<T,A_i>
, die einen Zeiger auf das entsprechende
Thread-Objekt durch Herleitung vom Typ
Pointer<ArgumentThreadn<T,A_i>>
implementiert und um die
Thread-spezifische Schnittstelle erweitert.
Der Mechanismus zur Speicherbereinigung beruht auf einem parallelisierten mark&sweep Verfahren bei dem im ersten Schritt alle erreichbaren Objekte markiert werden und im zweiten Schritt der Heap nach nicht markierten Objekten durchsucht wird, deren Speicher freigegeben werden kann. Dieses Verfahren setzt allerdings voraus, daß einerseits die Menge der Zeiger bekannt ist, über die alle im Heap angelegten Objekte direkt oder indirekt erreicht werden können (root set), und daß andererseits die Menge der in einem Objekt enthaltenen Zeiger auf andere Objekte bestimmt werden kann.
RT++ verwendet eine Methode, bei der jede Variable mit Hilfe des entsprechenden Klassen-Konstruktors bei ihrer Erzeugung ihre Stack-Adresse und ihren Typ registriert. Die "root set" eines Programs ist damit der Stack der Haupt-Thread; alle Objekte (und alle Threads), die nicht über diese Thread referenzierbar sind, können nichts zum Ergebnis beitragen und ihr Speicher kann freigegeben werden.
Bei der registierten Typ-Information handelt es sich um die
scan
-Function des jeweiligen Typs, einer Funktion, die alle von der
jeweiligen Variable aus erreichbaren Zeiger durchläuft und die referenzierten
Objekte markiert. Während der ersten Phase der Speicherbereinigung wird bei
der Markierung einer Thread für alle auf dem Stack angelegten Variablen
die jeweilige scan
-Funktion aufgerufen.
Jeder RT++ Typ-Konstruktor besitzt eine solche scan
-Funktion; die
Deklaration Ref(T)
ist der Aufruf eines Makros, der im wesentlichen
in die Initialisierung
Type<T>::scan = T::scan;der statischen Komponente
scan
der Klasse Type<T>
aufgelöst
wird. Eine Deklaration Atom(T)
dagegen sagt, daß Typ T atomar
ist und eine solche Funktion nicht besitzt; die Auflösung
lautet daher
Type<T>::scan = 0;und wird von der Speicherbereinigung entsprechend behandelt. Der GNU G++ Compiler unterstützt leider noch nicht das Konzept der "Initializer" für statische Mitglieder einer Template-Klasse, damit würden die lästigen
Ref
Deklarationen überflüssig.
Bisher haben wir noch nicht erklärt, wo Adressen und Typen der von einer Thread auf ihrem Stack angelegten Variablen eigentlich registriert werden. Die Lösung ist ganz einfach: es wird dafür ein zweiter Stack verwendet, der am anderen Ende des für den Thread-Stack angelegten Speicherbereiches liegt und in die entgegengesetzte Richtung wächst. Für jede Referenz-Variable, die auf dem Stack angelegt wird, werden zwei Zeiger auf diesem "Spiegel-Stack" registriert: die Adresse der Variable und die scan-Funktion des entsprechenden Typs. Das folgende Bild veranschaulicht diese Idee:
Die eigentliche Speicherbereinigung wird von allen Prozessen des RT++ Laufzeitsystems parallel vorgenommen: in der ersten Phase werden referenzierte Stacks (und die von Variablen auf diesen Stacks referenzierten Objekte) parallel markiert, in der zweiten Phase werden die Blöcke, in die der Heap unterteilt ist, parallel nach unmarkierten Objekten untersucht und deren Speicher freigegeben. Ist nach der Speicherbereinigung der freie Speicher zu gering oder zu fragmentiert, wird ein neuer Cluster von Blöcken aus dem gemeinsamen Speicher allokiert und dem Heap hinzugefügt.
Die Klassenbibliothek ist unter der GNU Library Public License auf einer Vielzahl von Architekturen, Workstations und Multiprozessoren, verfügbar.