07.112012

Multithreaded-Queue in Java

Es kommt manchmal vor, das man in seinem Java-Programm durch das auftreten von Events oder Notifications/Nachrichten irgendwelcher Art, deren Berarbeitung unterschiedlich lange dauert, in die Situation kommt, diese geordnet asynchron abarbeiten zu wollen.

Hier bietet Java ab Version 1.5 eine bequeme Queue an, die ArrayBlockingQueue, die schon dafür geschaffen ist, sie mit mehreren Threads parallel zu befüllen bzw. abzuarbeiten. Hierbei wird das erste Element, das hinzugefügt wird, auch als erstes wieder abgearbeitet (FIFO - first in fist out). Da die Queue eine fixe Größe hat, muss man sich entscheiden, ob man beim befüllen komplett blockiert (Methode offer(), auch mit Timeout) oder es einfach versucht und das im negativen Fall dann mit einer Exception fehlschlägt (Methode add()).

Für unser Beispiel nehmen wir den Fall an, das unser Programm von einer entfernten Quelle (z.B. über eine HTTP-Schnittstelle) sich viele wartende Ereignisse oder Benachrichtigungen abholt und diese in die Queue schiebt, wo dann mehrere Threads diese parallel abarbeiten. Der Übersichthalber ist der Code etwas vereinfacht, wir starten einfach direkt das Befüllen und Abarbeiten der queue bei Programmstart und kümmern uns nicht um das beenden oder ähnliches.

public class BlockingQueueDemo {
    private ArrayBlockingQueue<Nachricht> queue;
    private final Thread[] handlers;
    protected final Timer timer;

    public static void main(String args[]) throws IOException {
        // Wir legen eine Queue mit max.
        // 20 Einträgen an und arbeiten sie
        // mit 3 Arbeiter-Threads ab
        queue = new ArrayBlockingQueue<Nachricht>(20);
        handlers = new Thread[3];
        timer = new Timer();

        // Handler-Threads erstellen
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(new Arbeiter());
        }

        // Timer anlegen und starten
        TimerTask task = new TimerTask() {
            public void run() {
                nachrichtenAbholen();
            }
        };

        // Einmal alle 5 Sekunden Nachrichten abholen
        timer.schedule(task, 0, 5000);

        // Threads, die die Nachrichten in der
        // Queue bearbeiten starten
        for (Thread handler : handlers) {
            handler.start();
        }
    }

    private void nachrichtenAbholen() {
        // Die Queue ist zur Zeit voll, erst beim
        // nächsten Durchlauf wieder probieren
        if (queue.remainingCapacity() <= 0) {
            return;
        }

        // Pseudo-Code zum holen der Nachrichten
        Set<Nachricht> nachrichten = abrufVomServer();

        // Nachrichten in die Queue einfuegen - das
        // blockiert hier solange die Queue keinen
        // Platz hat
        for (nachricht : nachrichten ) {
            queue.offer(nachricht);
        }
    }

    private class Arbeiterimplements Runnable {
        public void run() {
            while (true) {
                // Nächste Nachricht aus der
                // Queue holen
                try {
                    // Wenn keine Nachrichten da sind
                    // blockiert der Thread hier solange
                    Nachricht nachricht = queue.take();

                    // Pseudo-Code zum Verarbeiten der
                    // Nachricht
                    nachricht.process();
                } catch (Exception e) {
                    // Logging
                }
            }
        }
    }
}

Von oben nach unten gesehen legen wir uns hier also unsere Queue sowie unsere Arbeiter-Threads an und stellen desweiteren noch einen Timer, der unsere Abholfunktion aufruft.
Das Abholen der Nachrichten ist jetzt hier nicht weiter ausgeführt. Die Nachrichten werden dann per offer() in die Queue geschoben, sofern Platz ist, ansonsten wartet der Hauptthread darauf.

Die 3 Arbeiter-Threads die wir am Anfang angelegt haben sind in einer Privaten-Klasse am Ende definiert. Sie machen nichts weiter als sobald die Queue etwas anzubieten hat, dieses per take() entgegenzunehmen und dann abzuarbeiten (das Abarbeiten ist hier nicht ausgeführt). Ist nichts in der Queue, warten die Arbeiter darauf.

Vielleicht ist das eine Idee, von der aus ihr starten könnt, wenn ihr eine solche oder ähnliche Queue benötigt.