Многопоточность в Java
Thread/Runnable
- thread.thread.play_the_accordion (обязательно)
- thread.thread.rabbit_attack (обязательно)
synchronized/wait()/notify()/notifyAll()
- thread.sync_wait_notify.optimized_buffer_a
- thread.sync_wait_notify.optimized_buffer_b
- thread.sync_wait_notify.timed_buffer (обязательно)
interrupt() / InterruptedException
- thread.interrupt.interruptable_buffer (обязательно)
Thread/Runnable
В лабораторных ниже используется общий класс PrintRunnable
synchronized/wait()/notify()/notifyAll()
thread.sync_wait_notify.optimized_buffer_a
В качестве ответа на лабораторную напишите - будет ли корректно или некорректно работать эта реализация ограниченного блокирующего буфера и почему
Заметим, что если нет потребителей, но есть производители, то производители блокируются навечно (в реализации буфера SingleElementBuffer с лекции):
Задание: Вам дана "заготовка" класса SingleElementBufferTimed, который должен в случае превышения времени ожидания в wait(long) выбросить исключение TimeoutException. В данный момент он ожидает корректно, но исключение не выбрасывает. Ваша задача дописать что-то в местах комментариев, что решает бросать исключение или нет и, в случае надобности, бросает:
Это Потребитель для такого буфера
>> 1 produced
>> 2 produced
... 3000 миллисекунд
>> Thread-0 time out.
>> Consumer time out.
>> 1 produced
>> 1 consumed
... 1000 миллисекунд
>> Consumer time out.
... 500 миллисекунд
>> 2 produced
... 1500 миллисекунд
>> 3 produced
... 100 миллисекунд
>> Producer time out.
interrupt() / InterruptedException
thread.interrupt.interruptable_buffer (обязательно)
Предлагается сделать блокирующий буфер на один элемент, но передачу сообщения делать не на основе wait()/notify(), а на основе wait()/interrupt(). В следующей ниже "заготовке" замените '// ?' на свои строки.
Основная идея заключается в том, что мы явно поддерживаем односвязные списки ожидающих производителей и потребителей. Если нет работы, то поток сам себя добавляет в соответствующий список. Если поток создал работу, то он удаляет поток из соответствующего списка и "будит" его.
"Заготовка" буфера:
Thread/Runnable
- thread.thread.play_the_accordion (обязательно)
- thread.thread.rabbit_attack (обязательно)
synchronized/wait()/notify()/notifyAll()
- thread.sync_wait_notify.optimized_buffer_a
- thread.sync_wait_notify.optimized_buffer_b
- thread.sync_wait_notify.timed_buffer (обязательно)
interrupt() / InterruptedException
- thread.interrupt.interruptable_buffer (обязательно)
Thread/Runnable
В лабораторных ниже используется общий класс PrintRunnable
public class PrintRunnable implements Runnable {
private String msg;
private long sleepMillis;
public PrintRunnable(String msg, long sleepMillis) {
this.msg = msg;
this.sleepMillis = sleepMillis;
}
@Override
public void run() {
for (int k = 0; k < 10; k++) {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(msg);
}
}
}
thread.thread.play_the_accordion (обязательно)
public class Lab_Thread_Thread_Play_The_Accordion {
public static void main(String[] args) throws InterruptedException {
for (int k = 0; k < 10; k++) {
// A + B
Runnable printerA = new PrintRunnable("A .", 100);
Thread threadA = new Thread(printerA);
threadA.start();
Runnable printerB = new PrintRunnable(". B", 99);
Thread threadB = new Thread(printerB);
threadB.start();
threadA.join();
threadB.join();
// C
System.out.println("-----");
Runnable printerC = new PrintRunnable(" C", 100);
printerC.run();
System.out.println("-----");
}
}
}
Модифицируйте/допишите код сохранив функциональность, но увеличив количество потоков.
Сейчас есть 3 потока:
1) поток метода main(...)
2) поток threadA
3) поток threadB
Требуется добавить потоки threadC и threadCoordinator.
4) в threadC из потока метода main() вынести печать буквы 'C'
5) в threadCoordinator вынести из метода main(...) функционал создания, старта и ожидания завершения потоков threadA+threadB+threadC.
Общий сценарий должен быть следующим:
- поток метода main(...) создает, стартует и ожидает завершения потока threadCoordinator
- поток threadCoordinator циклически создает, запускает потоки threadA и threadB (они работают одновременно), дожидается их окончания, по окончанию создает, стартует и ожидает завершения потока threadC.
Сейчас есть 3 потока:
1) поток метода main(...)
2) поток threadA
3) поток threadB
Требуется добавить потоки threadC и threadCoordinator.
4) в threadC из потока метода main() вынести печать буквы 'C'
5) в threadCoordinator вынести из метода main(...) функционал создания, старта и ожидания завершения потоков threadA+threadB+threadC.
Общий сценарий должен быть следующим:
- поток метода main(...) создает, стартует и ожидает завершения потока threadCoordinator
- поток threadCoordinator циклически создает, запускает потоки threadA и threadB (они работают одновременно), дожидается их окончания, по окончанию создает, стартует и ожидает завершения потока threadC.
thread.thread.rabbit_attack (обязательно)
Модифицируйте/допишите код: сейчас поток в котором работает метод main() циклически выбрасывает потоки-"сигнальные ракеты", те пишут 10 раз в консоль и умирают. На лицо асимметрия (main - порождает, Printer - печатает). Сделайте новый класс RabbitPrinter, который И пишет в консоль И циклически порождает каждую секунду новые RabbitPrinter-ы.
public class Lab_Thread_Thread_Rabbit_Attack {
public static void main(String[] args) throws InterruptedException {
for (int k = 1; k < 10000000; k++) {
String spaces = spaces(k);
Runnable printer = new PrintRunnable(spaces + k, 100);
Thread thread = new Thread(printer);
thread.start();
Thread.sleep(100);
}
}
private static String spaces(int count) {
String result = "";
for (int i = 0; i < count; i++) {
result += " ";
}
return result;
}
}
public clacc RabbitLab {
public static void main(String[] args) {
new Thread(new RabbitPrinter()).start();
}
}
class RabbitPrinter implements Runnable {
public void run() {
System.out.println("New rabbit born!");
System.out.println("New rabbit born!");
...
while (...) {
new Thread().start(new RabbitPrinter());
....
}
}
}
synchronized/wait()/notify()/notifyAll()
thread.sync_wait_notify.optimized_buffer_a
В качестве ответа на лабораторную напишите - будет ли корректно или некорректно работать эта реализация ограниченного блокирующего буфера и почему
public class SingleElementBufferOptimized_A {
private int waitedProducers = 0;
private int waitedConsumers = 0;
private Integer elem = null;
public synchronized void put(Integer newElem) throws InterruptedException {
while (elem != null) {
waitedProducers++;
this.wait();
waitedProducers--;
}
this.elem = newElem;
if (waitedConsumers > 0) {
this.notify();
}
}
public synchronized Integer get() throws InterruptedException {
while (elem == null) {
waitedConsumers++;
this.wait();
waitedConsumers--;
}
int result = this.elem;
this.elem = null;
if (waitedProducers > 0) {
this.notify();
}
return result;
}
}
thread.sync_wait_notify.optimized_buffer_b
В качестве ответа на лабораторную напишите - будет ли корректно или некорректно работать эта реализация ограниченного блокирующего буфера и почему
public class SingleElementBufferOptimized_B {
private Integer elem = null;
public synchronized void put(Integer newElem) throws InterruptedException {
while (elem != null) {
this.wait();
if (elem != null) {
this.notify();
}
}
this.elem = newElem;
this.notify();
}
public synchronized Integer get() throws InterruptedException {
while (elem == null) {
this.wait();
if (elem == null) {
this.notify();
}
}
int result = this.elem;
this.elem = null;
this.notify();
return result;
}
}
thread.sync_wait_notify.timed_buffer (обязательно)
Заметим, что если нет потребителей, но есть производители, то производители блокируются навечно (в реализации буфера SingleElementBuffer с лекции):
public class Test {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
new Thread(new Producer(1, 1000, buffer)).start();
}
}
Аналогично, если нет производителей, но есть потребители, то потребители блокируются навечно (в реализации буфера SingleElementBuffer с лекции):
public class Test {
public static void main(String[] args) {
SingleElementBuffer buffer = new SingleElementBuffer();
new Thread(new Consumer(buffer)).start();
}
}
Задание: Вам дана "заготовка" класса SingleElementBufferTimed, который должен в случае превышения времени ожидания в wait(long) выбросить исключение TimeoutException. В данный момент он ожидает корректно, но исключение не выбрасывает. Ваша задача дописать что-то в местах комментариев, что решает бросать исключение или нет и, в случае надобности, бросает:
import java.util.concurrent.TimeoutException;
public class SingleElementBufferTimed_question {
private Integer elem = null;
public synchronized void put(Integer newElem, long timeout) throws InterruptedException, TimeoutException {
long waitTime = timeout;
while (elem != null && waitTime > 0) {
long t0 = System.currentTimeMillis();
wait(waitTime);
long t1 = System.currentTimeMillis();
long elapsedTime = t1 - t0;
waitTime -= elapsedTime;
}
// todo: insert throw new TimeoutException
this.elem = newElem;
this.notifyAll();
}
public synchronized Integer get(long timeout) throws InterruptedException, TimeoutException {
long waitTime = timeout;
while (elem == null && waitTime > 0) {
long t0 = System.currentTimeMillis();
wait(waitTime);
long t1 = System.currentTimeMillis();
long elapsedTime = t1 - t0;
waitTime -= elapsedTime;
}
// todo: insert throw new TimeoutException
int result = this.elem;
this.elem = null;
this.notifyAll();
return result;
}
}
Это Производитель для такого буфера
public class ProducerTimed implements Runnable {
private int startValue;
private final int period;
private final SingleElementBufferTimed buffer;
private final long timeout;
public ProducerTimed(int startValue, int period, SingleElementBufferTimed buffer, long timeout) {
this.buffer = buffer;
this.period = period;
this.startValue = startValue;
this.timeout = timeout;
}
@Override
public void run() {
while (true) {
try {
System.out.println(startValue + " produced");
buffer.put(startValue++, timeout);
Thread.sleep(period);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " stopped.");
return;
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName() + " time out.");
return;
}
}
}
}
Это Потребитель для такого буфера
public class ConsumerTimed implements Runnable {
private final SingleElementBufferTimed buffer;
private final long timeout;
public ConsumerTimed(SingleElementBufferTimed buffer, long timeout) {
this.buffer = buffer;
this.timeout = timeout;
}
@Override
public void run() {
while (true) {
try {
int elem = buffer.get(timeout);
System.out.println(elem + " consumed");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " stopped.");
return;
} catch (TimeoutException e) {
System.out.println(Thread.currentThread().getName() + " time out.");
return;
}
}
}
}
С ним одинокий Производитель не ждет вечно
public class SingleElementBufferTimedTest {
public static void main(String[] args) {
SingleElementBufferTimed buffer = new SingleElementBufferTimed();
new Thread(new ProducerTimed(1, 1000, buffer, 100)).start();
}
}
>> 1 produced
>> 2 produced
... 3000 миллисекунд
>> Thread-0 time out.
С ним одинокий Потребитель не ждет вечно
public class SingleConsumerTimedExample {
public static void main(String[] args) {
SingleElementBufferTimed buffer = new SingleElementBufferTimed();
new Thread(new ConsumerTimed(buffer, 3000), "Consumer").start();
}
}
... 3000 миллисекунд>> Consumer time out.
Нетерпеливый потребитель не дожидается медленного производителя и умирает, после - умирает одинокий производитель (спасибо Шекспиру за сюжет)
public class SlowProducerExample {
public static void main(String[] args) {
int producerSleepTime = 1500;
int consumerWaitTime = 1000;
SingleElementBufferTimed buffer = new SingleElementBufferTimed();
new Thread(new ProducerTimed(1, producerSleepTime, buffer, 100), "Producer").start();
new Thread(new ConsumerTimed(buffer, consumerWaitTime), "Consumer").start();
}
}
>> 1 produced
>> 1 consumed
... 1000 миллисекунд
>> Consumer time out.
... 500 миллисекунд
>> 2 produced
... 1500 миллисекунд
>> 3 produced
... 100 миллисекунд
>> Producer time out.
interrupt() / InterruptedException
thread.interrupt.interruptable_buffer (обязательно)
Предлагается сделать блокирующий буфер на один элемент, но передачу сообщения делать не на основе wait()/notify(), а на основе wait()/interrupt(). В следующей ниже "заготовке" замените '// ?' на свои строки.
Основная идея заключается в том, что мы явно поддерживаем односвязные списки ожидающих производителей и потребителей. Если нет работы, то поток сам себя добавляет в соответствующий список. Если поток создал работу, то он удаляет поток из соответствующего списка и "будит" его.
"Заготовка" буфера:
public class InterruptBuffer_q {
private ThreadNode producers = null;
private ThreadNode consumers = null;
private Integer elem = null;
public synchronized void put(int newElem) {
while (elem != null) {
try {
// ?
this.wait();
} catch (InterruptedException e) {/*NOP*/}
}
elem = newElem;
if (consumers != null) {
consumers.thread.interrupt();
// ?
}
}
public synchronized int get() {
while (elem == null) {
try {
// ?
this.wait();
} catch (InterruptedException e) {/*NOP*/}
}
int result = elem;
elem = null;
if (producers != null) {
producers.thread.interrupt();
// ?
}
return result;
}
}
Звено односвязного списка
public class ThreadNode {
public final Thread thread;
public final ThreadNode nextNode;
public ThreadNode(Thread thread, ThreadNode nextNode) {
this.thread = thread;
this.nextNode = nextNode;
}
}
Производитель
public class InterruptedProducer implements Runnable {
private int startValue;
private final InterruptBuffer buffer;
public InterruptedProducer(int startValue, InterruptBuffer buffer) {
this.buffer = buffer;
this.startValue = startValue;
}
@Override
public void run() {
while (true) {
try {
System.out.println(startValue + " produced");
buffer.put(startValue++);
Thread.sleep((int) (1000 * Math.random()));
} catch (InterruptedException e) {
return;
}
}
}
}
Потребитель
public class InterruptedConsumer implements Runnable {
private final InterruptBuffer buffer;
public InterruptedConsumer(InterruptBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while (true) {
int elem = buffer.get();
System.out.println(elem + " consumed");
}
}
}
Пример использования
public class ProducerConsumerExample_3x2 {
public static void main(String[] args) {
InterruptBuffer buffer = new InterruptBuffer();
Thread[] producers = new Thread[]{
new Thread(new InterruptedProducer(1, buffer)),
new Thread(new InterruptedProducer(100, buffer)),
new Thread(new InterruptedProducer(1000, buffer)),
};
for (Thread producer : producers) {
producer.start();
}
Thread[] consumers = new Thread[]{
new Thread(new InterruptedConsumer(buffer)),
new Thread(new InterruptedConsumer(buffer)),
};
for (Thread consumer : consumers) {
consumer.start();
}
}
}
>> 1 produced
>> 100 produced
>> 1000 produced
>> 1 consumed
>> 100 consumed
>> 1000 consumed
>> 1001 produced
>> 1001 consumed
>> 101 produced
>> 101 consumed
>> 2 produced
>> 2 consumed
... и так до бесконечности