среда, 1 августа 2012 г.

threads: wait()/notify()

x
x
>> x


    Синтаксис

    synchronized/wait()/notify()/notifyAll() крепко увязаны вместе в концепции монитора (monitor). Это приводит к интересным побочным эффектам - нельзя вызывать у объекта wait()/notify()/notifyAll() вне блока синхронизации (или синхронизированного метода) по этому объекту (иначе будет IllegalMonitorStateException):

public class WaitNotifySynchronizedTest_0 {
    public static void main(String[] args) {
        new Object().notify();
    }
}

>> IllegalMonitorStateException


public class WaitNotifySynchronizedTest_1 {
    public static void main(String[] args) {
        new Object().notifyAll();
    }
}
>> IllegalMonitorStateException



public class WaitNotifySynchronizedTest_2 {
    public static void main(String[] args) throws InterruptedException {
        new Object().wait();
    }
}
>> IllegalMonitorStateException

    А так можно (ничего не произойдет):

public class Example {
    public static void main(String[] args) {
        Object ref = new Object();
        synchronized (ref) {
            ref.notify();
        }
    }
}

    И вот так можно (ничего не произойдет):
public class Example {
    public static void main(String[] args) {
        Object ref = new Object();
        synchronized (ref) {
            ref.notifyAll();
        }
    }
}

    А вот так исключения не будет, но программа "повиснет":
public class Example {
    public static void main(String[] args) throws InterruptedException {
        Object ref = new Object();
        synchronized (ref) {
            ref.wait();
        }
    }
}

    Синхронизация делается по конкретному объекту, а не по ссылке (в данном примере две ссылки ref0 и ref1 ссылаются на один и тот же объект):

public class WaitNotifySynchronizedTest_3 {
    public static void main(String[] args) {
        Object ref0 = new Object();
        Object ref1 = ref0;
        synchronized (ref1) {
            ref0.notify();
        }
    }
}

    Исключение, так как синхронизация делается по одному объекту, а метод вызываем у другого:

public class WaitNotifySynchronizedTest_4 {
    public static void main(String[] args){
        Object ref0 = new Object();
        Object ref1 = new Object();
        synchronized (ref0) {
            ref1.notify();
        }
    }
}
>> IllegalMonitorStateException

    Аналогично:

public class WaitNotifySynchronizedTest_4 {
    public static void main(String[] args){
        synchronized (new Object()) {
            new Object().notify();
        }
    }
}
>> IllegalMonitorStateException


    Можно многократно синхронизироваться по одному объекту (идентично однократной синхронизации) - в данной программе ничего не произойдет:

public class WaitNotifySynchronizedTest_5 {
    public static void main(String[] args) {
        Object ref0 = new Object();
        Object ref1 = new Object();
        synchronized (ref0) {
            synchronized (ref0) {
                ref0.notify();
            }
        }
    }
}


    Можно многократно вложено синхронизироваться по разным объектам (строить вложенные конструкции) - в данной программе ничего не произойдет:

public class WaitNotifySynchronizedTest_5 {
    public static void main(String[] args) {
        Object ref0 = new Object();
        Object ref1 = new Object();
        synchronized (ref0) {
            synchronized (ref1) {
                ref0.notify();
                ref1.notify();
            }
        }
    }
}


    Синхронизированные методы аналогичны синхронизированным секциям:

public class WaitNotifySynchronizedTest_6A {
    public static void main(String[] args) {
        new WaitNotifySynchronizedTest_6A().f();
    }
    public synchronized void f() {
        this.notify();
    }
}

    Предыдущий пример идентичен этому:

public class WaitNotifySynchronizedTest_6B {
    public static void main(String[] args) throws InterruptedException {
        new WaitNotifySynchronizedTest_6B().f();
    }
    public void f() {
        synchronized (this) {
            this.notify();
        }
    }
}


    Со статиками сложнее - они синхронизируются по объекту класса:

public class WaitNotifySynchronizedTest_7A {
    public static void main(String[] args) {
        f();
    }
    public static synchronized void f() {
        Class clazz = WaitNotifySynchronizedTest_7A.class;
        clazz.notify();
    }
}

    Предыдущий пример идентичен этому:

public class WaitNotifySynchronizedTest_7B {
    public static void main(String[] args) {
        f();
    }

    public static void f() {
        Class clazz = WaitNotifySynchronizedTest_7B.class;
        synchronized (clazz) {
            clazz.notify();
        }
    }
}



    Демонстрация простого поведения

    Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
>> x
public class BlockedMethodCaller_A implements Runnable {
    private final BlockedSetExample_A ref;
    private final int k;

    public BlockedMethodCaller_A(BlockedSetExample_A ref, int k) {
        this.ref = ref;
        this.k = k;
    }

    @Override
    public void run() {
        try {
            ref.f(k);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    Стартуем одновременно 5 потоков и заставляем из одновременно вызвать НЕ синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
>> x
public class BlockedSetExample_A {

    public static void main(String[] args) throws InterruptedException {
        BlockedSetExample_A ref = new BlockedSetExample_A();
        for (int k = 0; k < 5; k++) {
            new Thread(new BlockedMethodCaller_A(ref, k)).start();
        }
    }

    public void f(int x) throws InterruptedException {
        System.out.print(" +" + x);
        Thread.sleep(1000);
        System.out.print(" -" + x);
    }
}
>>   +0 +4 +3 +1 +2 
... 1 секунда задержки и
>>   +0 +4 +3 +1 +2 -0 -1 -2 -4 -3
    Объяснение: пять разных потоков могут одновременно зайти в не синхронизированный метод одного объекта.



    Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
>> x
public class BlockedMethodCaller_B implements Runnable {
    private final BlockedSetExample_B ref;
    private final int k;

    public BlockedMethodCaller_B(BlockedSetExample_B ref, int k) {
        this.ref = ref;
        this.k = k;
    }

    @Override
    public void run() {
        try {
            ref.f(k);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится число со знаком "+", через секунду - пара (число со знаком "-" и число со знаком "+") (порядок вывода чисел - не детерминирован = за "+x" всегда идет "-x", но за "-x" не определено какой именно будет "+y"):
x
x
>> x
public class BlockedSetExample_B {

    public static void main(String[] args) throws InterruptedException {
        BlockedSetExample_B ref = new BlockedSetExample_B();
        for (int k = 0; k < 5; k++) {
            new Thread(new BlockedMethodCaller_B(ref, k)).start();
        }
    }

    public synchronized void f(int x) throws InterruptedException {
        System.out.print(" +" + x);
        Thread.sleep(1000);
        System.out.print(" -" + x);
    }
}

>> +0
... 1 секунда задержки и
>> +0 -0 +4 
... 1 секунда задержки и
>> +0 -0 +4 -4 +3 

... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2

... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2 -2 +1

... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2 -2 +1 -1





    Объяснение: пять разных потоков НЕ могут одновременно зайти в синхронизированный метод одного объекта. Они помещаются в blocking-set этого объекта и виртуальная машина запускает их по одному. 



    Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
>> x
public class BlockedMethodCaller_C implements Runnable {
    private final BlockedSetExample_C ref;
    private final int k;

    public BlockedMethodCaller_C(BlockedSetExample_C ref, int k) {
        this.ref = ref;
        this.k = k;
    }

    @Override
    public void run() {
        try {
            ref.f(k);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() разных объектов одного класса. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
>> x
public class BlockedSetExample_C {

    public static void main(String[] args) throws InterruptedException {
        for (int k = 0; k < 5; k++) {
            new Thread(new BlockedMethodCaller_C(new BlockedSetExample_C(), k)).start();
        }
    }

    public synchronized void f(int x) throws InterruptedException {
        System.out.println("+" + x);
        Thread.sleep(1000);
        System.out.println("-" + x);
    }
}
>>   +0 +4 +3 +1 +2 
... 1 секунда задержки и
>>   +0 +4 +3 +1 +2 -0 -1 -2 -4 -3

    Объяснение: пять разных потоков могут одновременно зайти в синхронизированный метод разных объектов (синхронизация по разным объектам). 


    Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
>> x
public class WaitMethodCaller implements Runnable {
    private final WaitSetExample ref;
    private final int k;

    public WaitMethodCaller(WaitSetExample ref, int k) {
        this.ref = ref;
        this.k = k;
    }

    @Override
    public void run() {
        try {
            ref.f(k);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
>> x
public class WaitSetExample {

    public static void main(String[] args) throws InterruptedException {
        WaitSetExample ref = new WaitSetExample();
        for (int k = 0; k < 5; k++) {
            new Thread(new WaitMethodCaller(ref, k)).start();
        }
    }

    public synchronized void f(int x) throws InterruptedException {
        System.out.println("+" + x);
        this.wait();
        System.out.println("-" + x);
    }
}


>>   +0 +4 +3 +1 +2 
... 1 секунда задержки и
>>   +0 +4 +3 +1 +2 -0 -1 -2 -4 -3


    Объяснение: пять разных потоков НЕ могут одновременно быть активными в синхронизированном методе одного объекта. Но при вызове wait() потоки "замирают" на этой строчке и попадают в wait-set. При этом они "отпускают блокировку" и позволяют зайти внутрь другим потокам.


    Демонстрация сложного поведения

    BoundedBuffer (ограниченный буфер) на один элемент.
x
x
>> x
public class SingleElementBuffer {
    private Integer elem = null;

    public synchronized void put(Integer newElem) throws InterruptedException {
        while (elem != null) {
            this.wait();
        }
        this.elem = newElem;
        this.notifyAll();
    }

    public synchronized Integer get() throws InterruptedException {
        while (elem == null) {
            this.wait();
        }
        int result = this.elem;
        this.elem = null;
        this.notifyAll();
        return result;
    }
}

    Класс-производитель (producer), производит последовательно числа начиная со startValue (startValue, startValue+1, startValue+2, startValue+3, ...) и помещает их в буфер (buffer.put(...)), спит period миллисекунд, повторяет (while(true) {...}).
x
x
>> x
public class Producer implements Runnable {
    private int startValue;
    private final int period;
    private final SingleElementBuffer buffer;

    public Producer(int startValue, int period, SingleElementBuffer buffer) {
        this.buffer = buffer;
        this.period = period;
        this.startValue = startValue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println(startValue + " produced");
                buffer.put(startValue++);
                Thread.sleep(period);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " stopped.");
                return;
            }
        }
    }
}

    Класс-потребитель (consumer), с максимальной скоростью изымает числа из буфера (buffer.get()), выводит в консоль, повторяет (while(true) {...}).
x
x
>> x
public class Consumer implements Runnable {
    private final SingleElementBuffer buffer;

    public Consumer(SingleElementBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            try {
                int elem = buffer.get();
                System.out.println(elem + " consumed");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " stopped.");
                return;
            }
        }
    }
}

    Система с одним потребителем сразу же блокируется (потребитель висит на очереди ожидая данных)
x
x
>> x
public class ProducerConsumerExample_0x1 {
    public static void main(String[] args) {
        SingleElementBuffer buffer = new SingleElementBuffer();
        new Thread(new Consumer(buffer)).start();
    }
}
>> ... повисла

    Система с одним производителем блокируется позже (успевает произвести 2 числа и повисает пытаясь поместить второе число в очередь еще занятую первым числом)
x
x
>> x
public class ProducerConsumerExample_1x0 {
    public static void main(String[] args) {
        SingleElementBuffer buffer = new SingleElementBuffer();
        new Thread(new Producer(1, 1000, buffer)).start();
    }
}

>> 1 produced
... задержка 1 секунда
>> 2 produced

... повисла

    Система из потребителя и производителя работает стабильно
x
x
>> x
public class ProducerConsumerExample_1x1 {
    public static void main(String[] args) {
        SingleElementBuffer buffer = new SingleElementBuffer();
        new Thread(new Producer(1, 1000, buffer)).start();
        new Thread(new Consumer(buffer)).start();
    }
}

>> 1 produced
>> 1 consumed
... задержка 1 секунда
>> 2 produced
>> 2 consumed
... задержка 1 секунда
>> 3 produced
>> 3 consumed
... задержка 1 секунда
>> 4 produced
>> 4 consumed
... и так далее



    Система с 3-мя производителями (с разной скоростью помещения элементов в буфер - 300, 500 и 700 миллисекунд) и 2-мя потребителями работает с "легкими рывками"
x
x
>> x
public class ProducerConsumerExample_3x2 {
    public static void main(String[] args) {
        SingleElementBuffer buffer = new SingleElementBuffer();
        Thread[] producers = new Thread[]{
                new Thread(new Producer(1, 950, buffer)),
                new Thread(new Producer(100, 1550, buffer)),
                new Thread(new Producer(1000, 2010, buffer)),
        };
        for (Thread producer : producers) {
            producer.start();
        }
        Thread[] consumers = new Thread[]{
                new Thread(new Consumer(buffer)),
                new Thread(new Consumer(buffer)),
        };
        for (Thread consumer : consumers) {
            consumer.start();
        }
    }
}

>>100 produced
>>1000 produced
>>1 produced
>>100 consumed
>>1 consumed
>>1000 consumed
>>2 produced
>>2 consumed
>>101 produced
>>101 consumed
>>3 produced
>>3 consumed
>>1001 produced
>>1001 consumed

... и так далее


    История вопроса

    Многопоточность сегодня
    "Software and the Concurrency Revolution". Herb Sutter, James Larus.

    Проблемы с потоками
    "The Problem with Threads". Edward A. Lee. 


    Литература




    Monitor
    "Java synchronization is based on the variety of monitors that were developed in the early 1970s by Per Brinch-Hansen and C. A. R. Hoare, but Java synchronization is not as well developed as Brinch-Hansen and Hoare monitors are."

    "Brinch-Hansen and Hoare called their solution monitors. They gave this definition of a monitor:
    A monitor is essentially a shared class with explicit queues."

    "This led Brinch-Hansen to express a great disappointment with Java’s synchronization. He writes: 
    Java’s most serious mistake was the decision to use the sequential part of the language to implement the run-time support for the parallel features. In 1975, Concurrent Pascal demonstrated that platform-independent parallel programs (even small operating systems) can be written as a secure programming language with monitors. It is astounding to me that Java’s insecure parallelism is taken seriously by the programming language community a quarter of a century after the invention of monitors and Concurrent Pascal. It has no merit.Brinch-Hansen, Per, “Java’s Insecure Parallelism,” ACM SIGPLAN Notices, 34(4) April 1999.


    Терминология

    Monitor, conditional variable, wait-set, blocked-set, spirious wakeup, monitor owner.