Concurrent Programming - Threads

The Java language natively supports threads: several sequences of instructions that run in parallel but belong to the same application. An example is a server that accepts and manages several connections at the same time. There are two ways of implementing a thread: extending class Thread or implementing interface Runnable.

Both class Thread and interface Runnable define a method called run(). This method is the entry point of the new thread, just as public static void main(String[]) is for the main thread. Method run() can be invoked in two ways:

  • by calling it directly, in which case it runs on the calling thread, like any normal method;
  • by calling method start(), which starts a new thread that executes method run() in parallel.

public class PrintThread extends Thread{

    private int index;

public PrintThread(int _index){
    index = _index;
}

public void run(){
    for(int i=0; i<5; i++){
        System.out.println("This is thread " + index);
        try{
            //pause for 0.5 seconds (500 ms)
            Thread.sleep(500);
        }catch(InterruptedException _ie){
            System.out.println(_ie.getMessage());
        }
    }
}

}

This is a direct call of method run(). The program first displays the text for thread 1 five times, then the text for thread 2 five times, then the text for thread 3, and so on:

public class NormalStarter{

public static void main(String[] _args){
    for(int i=0; i<5; i++){
        PrintThread _thread = new PrintThread(i + 1);
        _thread.run();
    }
}

}

This is the behavior when calling start(). The program displays the texts from all threads interleaved, because the threads run at the same time:

public class ThreadStarter{

public static void main(String[] _args){
    for(int i=0; i<5; i++){
        PrintThread _thread = new PrintThread(i + 1);
        _thread.start();
    }
}

}
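
The difference between the two calls can also be observed by asking which thread actually executes run(). The following is a minimal sketch, not part of the original examples: the class RunVsStart, the method executingThread and the thread name "worker" are hypothetical, and join() is used only so that the caller waits for the started thread to finish.

```java
// Hypothetical demo class; the names are illustrative only.
class RunVsStart {

    // Returns the name of the thread that executed run().
    static String executingThread(boolean useStart) {
        final String[] seen = new String[1];
        Thread worker = new Thread(
                () -> seen[0] = Thread.currentThread().getName(), "worker");
        if (useStart) {
            worker.start();      // run() executes on the new thread "worker"
            try {
                worker.join();   // wait until the new thread has finished
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        } else {
            worker.run();        // plain method call on the calling thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run():   " + executingThread(false));
        System.out.println("start(): " + executingThread(true));
    }
}
```

With a direct call the reported name is that of the caller; with start() it is the name given to the new thread.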

The same thing, using the Runnable interface:

public class PrintRunnable implements Runnable{

    private int index;

public PrintRunnable(int _index){
    index = _index;
}

public void run(){
    for(int i=0; i<10; i++){
        System.out.println("This is thread " + index);
        try{
            //pause for 1 second (1000 ms)
            Thread.sleep(1000);
        }catch(InterruptedException _ie){
            System.out.println(_ie.getMessage());
        }
    }
}

}


public class ThreadStarterRunnable{

public static void main(String[] _args){
    for(int i=0; i<5; i++){
        // polymorphism: PrintRunnable is a Runnable
        Runnable _runnable = new PrintRunnable(i + 1);

        // new Thread(Runnable) creates a plain Thread, not a PrintThread
        Thread _thread = new Thread(_runnable);
        _thread.start();
    }
}

}
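
Because Runnable declares a single method, a task can also be written as a lambda expression (Java 8 or later). The sketch below is an illustration only: the class LambdaRunnableDemo and the method squares are hypothetical names. It also calls join(), which this page does not cover, so that the calling thread waits for every started thread before reading the results.

```java
// Hypothetical demo class; the names are illustrative only.
class LambdaRunnableDemo {

    // Starts one thread per index; each thread writes only its own array slot.
    static int[] squares(int count) {
        int[] results = new int[count];
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            final int index = i;
            // The lambda body plays the role of method run().
            Runnable task = () -> results[index] = index * index;
            threads[i] = new Thread(task);
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();   // wait for this thread to finish
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        for (int value : squares(5)) {
            System.out.println(value);
        }
    }
}
```

Each thread touches a different array element, so no synchronization is needed in this particular case.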

Synchronizing Threads - Semaphores

In multithreaded applications there are situations where multiple threads access a common resource. In some of these situations, if methods or program areas that use the common resource are executed by multiple threads at the same time, the application can malfunction. These areas need to be executed atomically, meaning that while one thread executes them, no other thread can interfere with it or execute the same area.

In the virtual machine, this is achieved with a system of monitors. A monitor is a hidden field defined in class Object (so that, by inheritance, any object can be used as a monitor) which keeps track of the threads that access one or more code sections. These sections are called synchronized. When a thread is about to enter a synchronized section, the virtual machine checks whether the monitor is locked, meaning that another thread is executing instructions in a section synchronized by that monitor. If the monitor is locked by another thread, the current thread stops and waits for its release. If the monitor is unlocked, the thread locks it and continues execution.

In Java, the keyword for synchronizing a code section is synchronized.

public class Stack{

    public static final int MAX_STACK_SIZE = 128;
    
    private Object[] stack;
    private int stackTop;

public Stack(){
    this(MAX_STACK_SIZE);
}

public Stack(int _maxSize){
    stack = new Object[_maxSize];
    stackTop = 0;
}

public synchronized boolean empty(){
    return stackTop == 0;
}

public synchronized boolean full(){
    return stackTop == stack.length;
}


public Object pop() throws Exception{
    synchronized(this){
        if(!empty()){
            return stack[--stackTop];
        }
    }

    throw new Exception("Stack empty");
}

public void push(Object _obj) throws Exception{
    synchronized(this){
        if(!full()){
            stack[stackTop++] = _obj;
            // success: return here so the exception below is not thrown
            return;
        }
    }

    throw new Exception("Stack full");
}

}

In the example above, if two threads called methods push(Object) and pop() at the same time without synchronization, the counter stackTop could be incremented by one thread and then decremented by the second before the first finishes writing into the array stack, so the element would be written at the wrong position. With the keyword synchronized, no two methods or sections synchronized on the same monitor are executed at the same time by different threads.
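
The same kind of interference shows up with a simple shared counter. The sketch below is an assumption-laden illustration, not part of the Stack example: CounterDemo and its methods are hypothetical names. Several threads increment two counters, one without synchronization and one through a synchronized method.

```java
// Hypothetical demo class; the names are illustrative only.
class CounterDemo {
    private int unsafeCount;
    private int safeCount;

    // Unsynchronized read-modify-write: concurrent calls may lose updates.
    void incrementUnsafe() { unsafeCount++; }

    // Synchronized on `this`: only one thread at a time runs the increment.
    synchronized void incrementSafe() { safeCount++; }

    // Runs `threads` threads, each incrementing both counters `perThread` times.
    // Returns {unsafeCount, safeCount} after all threads have finished.
    static int[] runDemo(int threads, int perThread) {
        CounterDemo counter = new CounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementUnsafe();
                    counter.incrementSafe();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();   // wait for the worker to finish
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return new int[] { counter.unsafeCount, counter.safeCount };
    }

    public static void main(String[] args) {
        int[] counts = runDemo(4, 100000);
        System.out.println("unsynchronized: " + counts[0]);
        System.out.println("synchronized:   " + counts[1]);
    }
}
```

The synchronized counter always reaches threads * perThread. The unsynchronized one can end up lower, by an amount that varies from run to run.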

Rule: A thread cannot enter a synchronized section whose monitor is locked by another thread, BUT it can enter a synchronized section whose monitor it already holds. For example, the synchronized section in method push(Object) calls method full(), which is itself synchronized on the same monitor.
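
This re-entrant behavior can be sketched with a synchronized method that calls itself. The class ReentrantCountdown and the method sum are hypothetical names used only for illustration. If a thread could not re-enter a monitor it already holds, the first recursive call would block forever.

```java
// Hypothetical demo class; the names are illustrative only.
class ReentrantCountdown {

    // Every recursive call re-enters the monitor of `this`,
    // which the calling thread already holds.
    synchronized int sum(int n) {
        if (n == 0) {
            return 0;
        }
        return n + sum(n - 1);
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantCountdown().sum(4));   // prints 10
    }
}
```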

If a method is declared synchronized, this is equivalent to placing the whole method body in a synchronized(this) block. Therefore, a method declared synchronized is automatically synchronized using the current object as a monitor.

If a static method is declared synchronized, this is equivalent to placing the whole method body in a synchronized(<class_name>.class) block. Therefore, a static method declared synchronized is automatically synchronized using the object of type Class associated with the current class as a monitor.

public class StaticSync{

public static synchronized void printSmth(){
    System.out.println("Smth");
}

public static void printSmthElse(){
    synchronized(StaticSync.class){
        System.out.println("SmthElse");
    }
}

}
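
Which monitor a synchronized method locks can be checked with the standard method Thread.holdsLock(Object). The following is a small sketch under that assumption; the class MonitorOwnerDemo and its methods are hypothetical names.

```java
// Hypothetical demo class; the names are illustrative only.
class MonitorOwnerDemo {

    // An instance synchronized method locks the current object.
    synchronized boolean instanceHoldsThis() {
        return Thread.holdsLock(this);
    }

    // An instance synchronized method does not lock the Class object.
    synchronized boolean instanceHoldsClass() {
        return Thread.holdsLock(MonitorOwnerDemo.class);
    }

    // A static synchronized method locks the Class object.
    static synchronized boolean staticHoldsClass() {
        return Thread.holdsLock(MonitorOwnerDemo.class);
    }

    public static void main(String[] args) {
        MonitorOwnerDemo demo = new MonitorOwnerDemo();
        System.out.println(demo.instanceHoldsThis());    // true
        System.out.println(demo.instanceHoldsClass());   // false
        System.out.println(staticHoldsClass());          // true
    }
}
```

One consequence is that a static synchronized method and an instance synchronized method of the same class use different monitors, so they do not exclude each other.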

Race conditions - barriers

In the previous example, if we consider two threads, one that pushes elements onto the stack and one that consumes them, we notice that if either thread is faster than the other, an exception is eventually thrown, either because the stack is full or because it has been emptied. This is what is called a race condition: there are one or two program areas executed by two different threads, and one of the threads must reach its area before the other for the program to run correctly. In the previous example, if the stack is empty, the thread that pushes an element must reach method push(Object) before the other thread calls method pop(); otherwise an exception is thrown.

This problem is solved with a system of barriers, that is, a system that stops one of the threads at a certain point until a condition is met (most often, when another thread reaches the right place). In Java, this is done with the methods wait()/ wait(long)/ wait(long, int) and notify()/ notifyAll().

Methods wait()/ wait(long)/ wait(long, int) block the current thread until another thread calls one of the methods notify()/ notifyAll() on the same object, or until the time given as argument has expired. While it waits, the thread releases the monitor, so other threads can enter the synchronized sections.

Rule: Calls to the wait() methods must be made inside a synchronized block, using the monitor object as the reference:

public class Stack{

    public static final int MAX_STACK_SIZE = 128;
    
    private Object[] stack;
    private int stackTop;

public Stack(){
    this(MAX_STACK_SIZE);
}

public Stack(int _maxSize){
    stack = new Object[_maxSize];
    stackTop = 0;
}

public synchronized boolean empty(){
    return stackTop == 0;
}

public synchronized boolean full(){
    return stackTop == stack.length;
}


public Object pop(){
    synchronized(this){
        while(empty()){
            try{
                this.wait();
            }catch(InterruptedException _ie){
                  System.out.println("InterruptedException: " + _ie.getMessage());
            }
        }
        this.notifyAll();
        return stack[--stackTop];
    }
}

public void push(Object _obj){
    synchronized(this){
        while(full()){
            try{
                this.wait();
            }catch(InterruptedException _ie){
                  System.out.println("InterruptedException: " + _ie.getMessage());
            }
        }
        stack[stackTop++] = _obj;
        this.notifyAll();
    }
}

}

The notify() methods wake up threads stopped by the wait() methods. Like wait(), they must be called from a synchronized block or method that holds the monitor; otherwise an IllegalMonitorStateException is thrown. The reference used is the same monitor object on which the wait() methods were called.
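
The monitor-ownership requirement can be checked directly. The sketch below uses hypothetical names (MonitorOwnershipDemo and its two methods) and relies on the documented behavior of Object.notify(), which throws IllegalMonitorStateException when the calling thread does not own the object's monitor.

```java
// Hypothetical demo class; the names are illustrative only.
class MonitorOwnershipDemo {

    // Calls notify() without holding the monitor; returns true if it was rejected.
    static boolean notifyWithoutLockFails(Object monitor) {
        try {
            monitor.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Calls notify() while holding the monitor; with no waiting threads it does nothing.
    static boolean notifyWithLockSucceeds(Object monitor) {
        synchronized (monitor) {
            monitor.notify();
            return true;
        }
    }

    public static void main(String[] args) {
        Object monitor = new Object();
        System.out.println(notifyWithoutLockFails(monitor));   // true
        System.out.println(notifyWithLockSucceeds(monitor));   // true
    }
}
```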

A thread woken by notify() must wait for the monitor to be released before it can execute the synchronized area, like any other thread. Note how method pop() relies on this: notifyAll() is called before the element is removed from the stack, on the following line. The class still works correctly, because the thread that pushes a new element does not resume until the thread that called notifyAll() leaves the synchronized area, that is, after the element has been removed from the stack.
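
To see a blocking stack of this kind in action, the following sketch runs one producer thread and one consumer thread against a small bounded stack. It is a simplified stand-in for the Stack class above, not the same code: the names BoundedStackDemo and transfer are hypothetical, the stack stores int values, and InterruptedException is propagated instead of being printed.

```java
// Hypothetical demo class; the names are illustrative only.
class BoundedStackDemo {
    private final int[] items;
    private int top;

    BoundedStackDemo(int capacity) {
        items = new int[capacity];
    }

    // Blocks while the stack is full, then pushes and wakes up waiting threads.
    synchronized void push(int value) throws InterruptedException {
        while (top == items.length) {
            wait();
        }
        items[top++] = value;
        notifyAll();
    }

    // Blocks while the stack is empty, then pops and wakes up waiting threads.
    synchronized int pop() throws InterruptedException {
        while (top == 0) {
            wait();
        }
        int value = items[--top];
        notifyAll();
        return value;
    }

    // Pushes 1..count from one thread, pops count values from another,
    // and returns the sum of everything the consumer received.
    static long transfer(int count, int capacity) {
        BoundedStackDemo stack = new BoundedStackDemo(capacity);
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) stack.push(i);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) sum[0] += stack.pop();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(100, 4));   // prints 5050
    }
}
```

Neither thread ever sees an exception: whichever one runs ahead simply waits until the other catches up.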

External resources:

  1. http://docs.oracle.com/javase/tutorial/essential/concurrency/
  2. http://www.tutorialspoint.com/java/java_multithreading.htm