Verbindung verschiedener Verabeitungsschritte

Häufig beinhaltet eine Software verschiedene Verarbeitungsschritte, die von den Daten nacheinander durchlaufen werden.

Im einfachsten Fall kann man mit Verabeitungsschritten f, g und h man so etwas machen wie (Pseudocode):


for x in input_data {
y = h(g(f(x));
  store_partial_result(y);
}

Das stimmt, wenn die Datenmengen jeweils übereinstimmen. Häufig ist aber in der Praxis der Fall, dass die Datengrößen nicht so übereinstimmen. Man muss also Daten zwischenspeichern, bis für den jeweiligen Verarbeitungsschritt genug Eingaben da sind. Im Grunde genommen ist ein sehr einfacher und robuster Mechanismuns seit Jahrhunderten bewährt, zumindest wenn man in Jahrhunderten von zehn Jahren Dauer rechnet.. 😉


cat input_data_file| process_f | process_g | process_h > output_data_file

Der Ansatz lässt sich in einem Programm direkt nachbauen. Man muss named oder anonymous Pipes, TCP/IP, Message-Queues oder Messaging (wie JMS, RabbitMQ, MQ-Series,…) verwenden, um die Programmteile miteinander zu verbinden. Wenn man mit Akka arbeitet, kann man für jeden Verarbeitungsschritt einen (oder mehrere parallele) Aktor verwenden und diese mit Messages verknüpfen. Die Aktoren müssen und Daten sammeln, bis sie genug haben, um sie sinnvoll zu verarbeiten. Etwas unschön ist, dass man hier plötzlich „state“ im Aktor braucht.

Mehrere Threads sind auch möglich, wenn man für die Daten jeweils den augenblicklichen „Owner“ kennt und nur diesem Zugriff darauf erlaubt. Die zu implementierende Logik passt eigentlich sehr gut dazu, da definiert sein sollte, welchem Verarbeitungsschritt die Daten gerade unterzogen werden.

Es lässt sich aber auch mit einem Thread etwas machen, etwa entlang dieser Linie:

while (true) {
  if (enough_for_h(g_out)) {
    h_in = get_block(g_out, required_size_for_h)
    h_out = h(h_in)
    store_result(h_out)
  } else if (enough_for_g(f_out)) {
    g_in = get_block(f_out, required_size_for_g)
    append(g_out, g(g_in));
  } else if (enough_for_f(new_data)) {
    f_in = get_block(new_data, required_size_for_f)
    append(f_out, f(f_in));
  } else {
    append(new_data, read_block(input_data));
  }
}

Man muss immer aufpassen, dass sich das System nicht totläuft, was bei schlechten Implementierungen leicht passieren kann, weil entweder die Eingabedaten ausgehen oder die Buffer überlaufen. Die obige Implementierung strebt an, die Buffer immer auf der minimalen Füllung zu belassen.

Oft hat man natürlich die Möglichkeit und auch die Notwendigkeit, alle Daten auf einmal ins Memory zu laden. Dann gehen viele Dinge einfacher, man denke nur an Sortierfunktionen, die ja alle Daten sehen müssen.

Für große Datenmengen ist das aber nicht möglich und da sind die richtigen Ansätze nützlich, um nur eine verträgliche Datenmenge im Speicher zu halten.

Ein recht radikaler Ansatz ist es, alle Zwischenergebnisse in Dateien oder Datenbanken zu speichern. Dann kann man die Schritte sogar nacheinander ausführen.

In jedem Fall sollte man sich über die richtige Architektur Gedanken machen, bevor man munter drauf los programmiert.

In der Regel fährt man am besten, wenn die Quelldaten den Prozess treiben, also wenn diese sequentiell gelesen werden und zwar jeweils in den Mengen, die auch verabeitet werden. Das ist das Prinzip des „Backpressure“.

Share Button

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

*