x
Синтаксис
synchronized/wait()/notify()/notifyAll() крепко увязаны вместе в концепции монитора (monitor). Это приводит к интересным побочным эффектам - нельзя вызывать у объекта wait()/notify()/notifyAll() вне блока синхронизации (или синхронизированного метода) по этому объекту (иначе будет IllegalMonitorStateException):
Класс-производитель (producer), производит последовательно числа начиная со startValue (startValue, startValue+1, startValue+2, startValue+3, ...) и помещает их в буфер (buffer.put(...)), спит period миллисекунд, повторяет (while(true) {...}).
x
>> 1 produced
... задержка 1 секунда
>> 2 produced
Система из потребителя и производителя работает стабильно
x
>> 1 produced
>> 1 consumed
... задержка 1 секунда
>> 2 produced
>> 2 consumed
... задержка 1 секунда
>> 3 produced
>> 3 consumed
... задержка 1 секунда
>> 4 produced
>> 4 consumed
... и так далее
>>100 produced
>>1000 produced
>>1 produced
>>100 consumed
>>1 consumed
>>1000 consumed
>>2 produced
>>2 consumed
>>101 produced
>>101 consumed
>>3 produced
>>3 consumed
>>1001 produced
>>1001 consumed
История вопроса
"Brinch-Hansen and Hoare called their solution monitors. They gave this definition of a monitor:
A monitor is essentially a shared class with explicit queues."
"This led Brinch-Hansen to express a great disappointment with Java’s synchronization. He writes:
Java’s most serious mistake was the decision to use the sequential part of the language to implement the run-time support for the parallel features. In 1975, Concurrent Pascal demonstrated that platform-independent parallel programs (even small operating systems) can be written as a secure programming language with monitors. It is astounding to me that Java’s insecure parallelism is taken seriously by the programming language community a quarter of a century after the invention of monitors and Concurrent Pascal. It has no merit." Brinch-Hansen, Per, “Java’s Insecure Parallelism,” ACM SIGPLAN Notices, 34(4) April 1999.
Терминология
Monitor, conditional variable, wait-set, blocked-set, spirious wakeup, monitor owner.
x
>> x
Синтаксис
synchronized/wait()/notify()/notifyAll() крепко увязаны вместе в концепции монитора (monitor). Это приводит к интересным побочным эффектам - нельзя вызывать у объекта wait()/notify()/notifyAll() вне блока синхронизации (или синхронизированного метода) по этому объекту (иначе будет IllegalMonitorStateException):
public class WaitNotifySynchronizedTest_0 {
public static void main(String[] args) {
new Object().notify();
}
}
>> IllegalMonitorStateException
public class WaitNotifySynchronizedTest_1 { public static void main(String[] args) { new Object().notifyAll(); } }
>> IllegalMonitorStateException
public class WaitNotifySynchronizedTest_2 { public static void main(String[] args) throws InterruptedException { new Object().wait(); } }
>> IllegalMonitorStateException
А так можно (ничего не произойдет):
public class Example {
public static void main(String[] args) {
Object ref = new Object();
synchronized (ref) {
ref.notify();
}
}
}
И вот так можно (ничего не произойдет):
public class Example { public static void main(String[] args) { Object ref = new Object(); synchronized (ref) { ref.notifyAll(); } } }
А вот так исключения не будет, но программа "повиснет":
public class Example { public static void main(String[] args) throws InterruptedException { Object ref = new Object(); synchronized (ref) { ref.wait(); } } }
Синхронизация делается по конкретному объекту, а не по ссылке (в данном примере две ссылки ref0 и ref1 ссылаются на один и тот же объект):
public class WaitNotifySynchronizedTest_3 { public static void main(String[] args) { Object ref0 = new Object(); Object ref1 = ref0; synchronized (ref1) { ref0.notify(); } } }
Исключение, так как синхронизация делается по одному объекту, а метод вызываем у другого:
public class WaitNotifySynchronizedTest_4 {
public static void main(String[] args){
Object ref0 = new Object();
Object ref1 = new Object();
synchronized (ref0) {
ref1.notify();
}
}
}
>> IllegalMonitorStateException
Аналогично:
public class WaitNotifySynchronizedTest_4 {
public static void main(String[] args){
synchronized (new Object()) {
new Object().notify();
}
}
}
>> IllegalMonitorStateException
Можно многократно синхронизироваться по одному объекту (идентично однократной синхронизации) - в данной программе ничего не произойдет:
public class WaitNotifySynchronizedTest_5 { public static void main(String[] args) { Object ref0 = new Object(); Object ref1 = new Object(); synchronized (ref0) { synchronized (ref0) { ref0.notify(); } } } }
Можно многократно вложено синхронизироваться по разным объектам (строить вложенные конструкции) - в данной программе ничего не произойдет:
public class WaitNotifySynchronizedTest_5 { public static void main(String[] args) { Object ref0 = new Object(); Object ref1 = new Object(); synchronized (ref0) { synchronized (ref1) { ref0.notify(); ref1.notify(); } } } }
Синхронизированные методы аналогичны синхронизированным секциям:
public class WaitNotifySynchronizedTest_6A { public static void main(String[] args) { new WaitNotifySynchronizedTest_6A().f(); } public synchronized void f() { this.notify(); } }
Предыдущий пример идентичен этому:
public class WaitNotifySynchronizedTest_6B {
public static void main(String[] args) throws InterruptedException {
new WaitNotifySynchronizedTest_6B().f();
}
public void f() {
synchronized (this) {
this.notify();
}
}
}
Со статиками сложнее - они синхронизируются по объекту класса:
public class WaitNotifySynchronizedTest_7A { public static void main(String[] args) { f(); } public static synchronized void f() { Class clazz = WaitNotifySynchronizedTest_7A.class; clazz.notify(); } }
Предыдущий пример идентичен этому:
public class WaitNotifySynchronizedTest_7B {
public static void main(String[] args) {
f();
}
public static void f() {
Class clazz = WaitNotifySynchronizedTest_7B.class;
synchronized (clazz) {
clazz.notify();
}
}
}
Демонстрация простого поведения
Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
x
>> x
public class BlockedMethodCaller_A implements Runnable {
private final BlockedSetExample_A ref;
private final int k;
public BlockedMethodCaller_A(BlockedSetExample_A ref, int k) {
this.ref = ref;
this.k = k;
}
@Override
public void run() {
try {
ref.f(k);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Стартуем одновременно 5 потоков и заставляем из одновременно вызвать НЕ синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
x
>> x
public class BlockedSetExample_A {
public static void main(String[] args) throws InterruptedException {
BlockedSetExample_A ref = new BlockedSetExample_A();
for (int k = 0; k < 5; k++) {
new Thread(new BlockedMethodCaller_A(ref, k)).start();
}
}
public void f(int x) throws InterruptedException {
System.out.print(" +" + x);
Thread.sleep(1000);
System.out.print(" -" + x);
}
}
>> +0 +4 +3 +1 +2
... 1 секунда задержки и
>> +0 +4 +3 +1 +2 -0 -1 -2 -4 -3
Объяснение: пять разных потоков могут одновременно зайти в не синхронизированный метод одного объекта.
Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
x
>> x
public class BlockedMethodCaller_B implements Runnable {
private final BlockedSetExample_B ref;
private final int k;
public BlockedMethodCaller_B(BlockedSetExample_B ref, int k) {
this.ref = ref;
this.k = k;
}
@Override
public void run() {
try {
ref.f(k);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится число со знаком "+", через секунду - пара (число со знаком "-" и число со знаком "+") (порядок вывода чисел - не детерминирован = за "+x" всегда идет "-x", но за "-x" не определено какой именно будет "+y"):
x
x
x
>> x
public class BlockedSetExample_B {
public static void main(String[] args) throws InterruptedException {
BlockedSetExample_B ref = new BlockedSetExample_B();
for (int k = 0; k < 5; k++) {
new Thread(new BlockedMethodCaller_B(ref, k)).start();
}
}
public synchronized void f(int x) throws InterruptedException {
System.out.print(" +" + x);
Thread.sleep(1000);
System.out.print(" -" + x);
}
}
>> +0
... 1 секунда задержки и
>> +0 -0 +4
... 1 секунда задержки и
>> +0 -0 +4 -4 +3
... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2
... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2 -2 +1
... 1 секунда задержки и
>> +0 -0 +4 -4 +3 -3 +2 -2 +1 -1
Объяснение: пять разных потоков НЕ могут одновременно зайти в синхронизированный метод одного объекта. Они помещаются в blocking-set этого объекта и виртуальная машина запускает их по одному.
Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
x
>> x
public class BlockedMethodCaller_C implements Runnable {
private final BlockedSetExample_C ref;
private final int k;
public BlockedMethodCaller_C(BlockedSetExample_C ref, int k) {
this.ref = ref;
this.k = k;
}
@Override
public void run() {
try {
ref.f(k);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() разных объектов одного класса. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
x
>> x
public class BlockedSetExample_C {
public static void main(String[] args) throws InterruptedException {
for (int k = 0; k < 5; k++) {
new Thread(new BlockedMethodCaller_C(new BlockedSetExample_C(), k)).start();
}
}
public synchronized void f(int x) throws InterruptedException {
System.out.println("+" + x);
Thread.sleep(1000);
System.out.println("-" + x);
}
}
>> +0 +4 +3 +1 +2
... 1 секунда задержки и
>> +0 +4 +3 +1 +2 -0 -1 -2 -4 -3
Объяснение: пять разных потоков могут одновременно зайти в синхронизированный метод разных объектов (синхронизация по разным объектам).
Вспомогательный класс (в методе run() вызовет метод f() объекта, полученного в конструкторе):
x
x
x
>> x
public class WaitMethodCaller implements Runnable {
private final WaitSetExample ref;
private final int k;
public WaitMethodCaller(WaitSetExample ref, int k) {
this.ref = ref;
this.k = k;
}
@Override
public void run() {
try {
ref.f(k);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Стартуем одновременно 5 потоков и заставляем из одновременно вызвать синхронизированный метод f() одного объекта. Отметьте временную динамику - сразу выводится 5 чисел со знаком "+", через секунду - 5 чисел со знаком "-" (порядок вывода чисел - не детерминирован):
x
x
x
>> x
public class WaitSetExample {
public static void main(String[] args) throws InterruptedException {
WaitSetExample ref = new WaitSetExample();
for (int k = 0; k < 5; k++) {
new Thread(new WaitMethodCaller(ref, k)).start();
}
}
public synchronized void f(int x) throws InterruptedException {
System.out.println("+" + x);
this.wait();
System.out.println("-" + x);
}
}
>> +0 +4 +3 +1 +2
... 1 секунда задержки и
>> +0 +4 +3 +1 +2 -0 -1 -2 -4 -3
Объяснение: пять разных потоков НЕ могут одновременно быть активными в синхронизированном методе одного объекта. Но при вызове wait() потоки "замирают" на этой строчке и попадают в wait-set. При этом они "отпускают блокировку" и позволяют зайти внутрь другим потокам.
Демонстрация сложного поведения
BoundedBuffer (ограниченный буфер) на один элемент.
x
x
x
>> x
public class SingleElementBuffer {
private Integer elem = null;
public synchronized void put(Integer newElem) throws InterruptedException {
while (elem != null) {
this.wait();
}
this.elem = newElem;
this.notifyAll();
}
public synchronized Integer get() throws InterruptedException {
while (elem == null) {
this.wait();
}
int result = this.elem;
this.elem = null;
this.notifyAll();
return result;
}
}
Класс-производитель (producer), производит последовательно числа начиная со startValue (startValue, startValue+1, startValue+2, startValue+3, ...) и помещает их в буфер (buffer.put(...)), спит period миллисекунд, повторяет (while(true) {...}).
x
x
>> x
public class Producer implements Runnable {
private int startValue;
private final int period;
private final SingleElementBuffer buffer;
public Producer(int startValue, int period, SingleElementBuffer buffer) {
this.buffer = buffer;
this.period = period;
this.startValue = startValue;
}
@Override
public void run() {
while (true) {
try {
System.out.println(startValue + " produced");
buffer.put(startValue++);
Thread.sleep(period);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " stopped.");
return;
}
}
}
}
Класс-потребитель (consumer), с максимальной скоростью изымает числа из буфера (buffer.get()), выводит в консоль, повторяет (while(true) {...}).
x
x
x
>> x
public class Consumer implements Runnable {
private final SingleElementBuffer buffer;
public Consumer(SingleElementBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
try {
int elem = buffer.get();
System.out.println(elem + " consumed");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " stopped.");
return;
}
}
}
}
Система с одним потребителем сразу же блокируется (потребитель висит на очереди ожидая данных)
x
x
x
>> x
public class ProducerConsumerExample_0x1 {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
new Thread(new Consumer(buffer)).start();
}
}
>> ... повисла
Система с одним производителем блокируется позже (успевает произвести 2 числа и повисает пытаясь поместить второе число в очередь еще занятую первым числом)
x
x
x
>> x
public class ProducerConsumerExample_1x0 {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
new Thread(new Producer(1, 1000, buffer)).start();
}
}
>> 1 produced
... задержка 1 секунда
>> 2 produced
... повисла
Система из потребителя и производителя работает стабильно
x
x
>> x
public class ProducerConsumerExample_1x1 {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
new Thread(new Producer(1, 1000, buffer)).start();
new Thread(new Consumer(buffer)).start();
}
}
>> 1 produced
>> 1 consumed
... задержка 1 секунда
>> 2 produced
>> 2 consumed
... задержка 1 секунда
>> 3 produced
>> 3 consumed
... задержка 1 секунда
>> 4 produced
>> 4 consumed
... и так далее
Система с 3-мя производителями (с разной скоростью помещения элементов в буфер - 300, 500 и 700 миллисекунд) и 2-мя потребителями работает с "легкими рывками"
x
x
x
>> x
public class ProducerConsumerExample_3x2 {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
Thread[] producers = new Thread[]{
new Thread(new Producer(1, 950, buffer)),
new Thread(new Producer(100, 1550, buffer)),
new Thread(new Producer(1000, 2010, buffer)),
};
for (Thread producer : producers) {
producer.start();
}
Thread[] consumers = new Thread[]{
new Thread(new Consumer(buffer)),
new Thread(new Consumer(buffer)),
};
for (Thread consumer : consumers) {
consumer.start();
}
}
}
>>100 produced
>>1000 produced
>>1 produced
>>100 consumed
>>1 consumed
>>1000 consumed
>>2 produced
>>2 consumed
>>101 produced
>>101 consumed
>>3 produced
>>3 consumed
>>1001 produced
>>1001 consumed
... и так далее
История вопроса
Многопоточность сегодня
"Software and the Concurrency Revolution". Herb Sutter, James Larus.
Проблемы с потоками
"The Problem with Threads". Edward A. Lee.
"Why Threads Are A Bad Idea (for most purposes)". John Ousterhout.
Monitor
"Java synchronization is based on the variety of monitors that were developed in the early 1970s by Per Brinch-Hansen and C. A. R. Hoare, but Java synchronization is not as well developed as Brinch-Hansen and Hoare monitors are."
"Brinch-Hansen and Hoare called their solution monitors. They gave this definition of a monitor:
A monitor is essentially a shared class with explicit queues."
"This led Brinch-Hansen to express a great disappointment with Java’s synchronization. He writes:
Java’s most serious mistake was the decision to use the sequential part of the language to implement the run-time support for the parallel features. In 1975, Concurrent Pascal demonstrated that platform-independent parallel programs (even small operating systems) can be written as a secure programming language with monitors. It is astounding to me that Java’s insecure parallelism is taken seriously by the programming language community a quarter of a century after the invention of monitors and Concurrent Pascal. It has no merit." Brinch-Hansen, Per, “Java’s Insecure Parallelism,” ACM SIGPLAN Notices, 34(4) April 1999.
Терминология
Monitor, conditional variable, wait-set, blocked-set, spirious wakeup, monitor owner.