среда, 1 августа 2012 г.

core-labs: threads

Многопоточность в Java


    Thread/Runnable
        - thread.thread.play_the_accordion (обязательно)
        - thread.thread.rabbit_attack (обязательно)       

    synchronized/wait()/notify()/notifyAll()
        - thread.sync_wait_notify.optimized_buffer_a
        - thread.sync_wait_notify.optimized_buffer_b
        - thread.sync_wait_notify.timed_buffer (обязательно)       

    interrupt() / InterruptedException
        - thread.interrupt.interruptable_buffer (обязательно)       


    Thread/Runnable

    В лабораторных ниже используется общий класс PrintRunnable

public class PrintRunnable implements Runnable {
    private String msg;
    private long sleepMillis;

    public PrintRunnable(String msg, long sleepMillis) {
        this.msg = msg;
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void run() {
        for (int k = 0; k < 10; k++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(msg);
        }
    }
}




    thread.thread.play_the_accordion (обязательно)
public class Lab_Thread_Thread_Play_The_Accordion {
    public static void main(String[] args) throws InterruptedException {
        for (int k = 0; k < 10; k++) {
            // A + B
            Runnable printerA = new PrintRunnable("A   .", 100);
            Thread threadA = new Thread(printerA);
            threadA.start();
            Runnable printerB = new PrintRunnable(".   B", 99);
            Thread threadB = new Thread(printerB);
            threadB.start();
            threadA.join();
            threadB.join();
            // C
            System.out.println("-----");
            Runnable printerC = new PrintRunnable("  C", 100);
            printerC.run();
            System.out.println("-----");
        }
    }
}
    Модифицируйте/допишите код сохранив функциональность, но увеличив количество потоков. 
    Сейчас есть 3 потока:
1) поток метода main(...)
2) поток threadA
3) поток threadB
    Требуется добавить потоки threadC и threadCoordinator.
4) в threadC из потока метода main() вынести печать буквы 'C'
5) в threadCoordinator вынести из метода main(...) функционал создания, старта и ожидания завершения потоков threadA+threadB+threadC.

    Общий сценарий должен быть следующим:
- поток метода main(...) создает, стартует и ожидает завершения потока threadCoordinator
- поток threadCoordinator циклически создает, запускает потоки threadA и threadB (они работают одновременно), дожидается их окончания, по окончанию создает, стартует и ожидает завершения потока threadC.

    thread.thread.rabbit_attack (обязательно)
    Модифицируйте/допишите код: сейчас поток в котором работает метод main() циклически выбрасывает потоки-"сигнальные ракеты", те пишут 10 раз в консоль и умирают. На лицо асимметрия (main - порождает, Printer - печатает). Сделайте новый класс RabbitPrinter, который И пишет в консоль И циклически порождает каждую секунду новые RabbitPrinter-ы.
public class Lab_Thread_Thread_Rabbit_Attack {

    public static void main(String[] args) throws InterruptedException {
        for (int k = 1; k < 10000000; k++) {
            String spaces = spaces(k);
            Runnable printer = new PrintRunnable(spaces + k, 100);
            Thread thread = new Thread(printer);
            thread.start();
            Thread.sleep(100);
        }
    }

    private static String spaces(int count) {
        String result = "";
        for (int i = 0; i < count; i++) {
            result += " ";
        }
        return result;
    }
}

public clacc RabbitLab {
    public static void main(String[] args) {
        new Thread(new RabbitPrinter()).start();        
    }
}

class RabbitPrinter implements Runnable {
    public void run() {
        System.out.println("New rabbit born!");
        ...
        while (...) {
            new Thread().start(new RabbitPrinter());
            ....      
        }        
    }
}

    synchronized/wait()/notify()/notifyAll()

    thread.sync_wait_notify.optimized_buffer_a 


    В качестве ответа на лабораторную напишите - будет ли корректно или некорректно работать эта реализация ограниченного блокирующего буфера и почему
public class SingleElementBufferOptimized_A {
    private int waitedProducers = 0;
    private int waitedConsumers = 0;
    private Integer elem = null;

    public synchronized void put(Integer newElem) throws InterruptedException {
        while (elem != null) {
            waitedProducers++;
            this.wait();
            waitedProducers--;
        }
        this.elem = newElem;
        if (waitedConsumers > 0) {
            this.notify();
        }
    }

    public synchronized Integer get() throws InterruptedException {
        while (elem == null) {
            waitedConsumers++;
            this.wait();
            waitedConsumers--;
        }
        int result = this.elem;
        this.elem = null;
        if (waitedProducers > 0) {
            this.notify();
        }
        return result;
    }
}

    thread.sync_wait_notify.optimized_buffer_b 

    В качестве ответа на лабораторную напишите - будет ли корректно или некорректно работать эта реализация ограниченного блокирующего буфера и почему


public class SingleElementBufferOptimized_B {
    private Integer elem = null;

    public synchronized void put(Integer newElem) throws InterruptedException {
        while (elem != null) {
            this.wait();
            if (elem != null) {
                this.notify();
            }
        }
        this.elem = newElem;
        this.notify();
    }

    public synchronized Integer get() throws InterruptedException {
        while (elem == null) {
            this.wait();
            if (elem == null) {
                this.notify();
            }
        }
        int result = this.elem;
        this.elem = null;
        this.notify();
        return result;
    }
}

   
    thread.sync_wait_notify.timed_buffer (обязательно)       

    Заметим, что если нет потребителей, но есть производители, то производители блокируются навечно (в реализации буфера SingleElementBuffer с лекции):
public class Test {
    public static void main(String[] args) {    
        SingleElementBuffer buffer = new SingleElementBuffer();
        new Thread(new Producer(1, 1000, buffer)).start();
    }
}
    Аналогично, если нет производителей, но есть потребители, то потребители блокируются навечно (в реализации буфера SingleElementBuffer с лекции):
public class Test {
    public static void main(String[] args) {    
        SingleElementBuffer buffer = new SingleElementBuffer();
        new Thread(new Consumer(buffer)).start();
    }
}

    Задание: Вам дана "заготовка" класса SingleElementBufferTimed, который должен в случае превышения времени ожидания в wait(long) выбросить исключение TimeoutException. В данный момент он ожидает корректно, но исключение не выбрасывает. Ваша задача дописать что-то в местах комментариев, что решает бросать исключение или нет и, в случае надобности, бросает:

import java.util.concurrent.TimeoutException;

public class SingleElementBufferTimed_question {
    private Integer elem = null;

    public synchronized void put(Integer newElem, long timeout) throws InterruptedException, TimeoutException {
        long waitTime = timeout;
        while (elem != null && waitTime > 0) {
            long t0 = System.currentTimeMillis();
            wait(waitTime);
            long t1 = System.currentTimeMillis();
            long elapsedTime = t1 - t0;
            waitTime -= elapsedTime;
        }
        // todo: insert throw new TimeoutException
        this.elem = newElem;
        this.notifyAll();
    }

    public synchronized Integer get(long timeout) throws InterruptedException, TimeoutException {
        long waitTime = timeout;
        while (elem == null && waitTime > 0) {
            long t0 = System.currentTimeMillis();
            wait(waitTime);
            long t1 = System.currentTimeMillis();
            long elapsedTime = t1 - t0;
            waitTime -= elapsedTime;
        }
        // todo: insert throw new TimeoutException
        int result = this.elem;
        this.elem = null;
        this.notifyAll();
        return result;
    }
}


    Это Производитель для такого буфера
public class ProducerTimed implements Runnable {
    private int startValue;
    private final int period;
    private final SingleElementBufferTimed buffer;
    private final long timeout;

    public ProducerTimed(int startValue, int period, SingleElementBufferTimed buffer, long timeout) {
        this.buffer = buffer;
        this.period = period;
        this.startValue = startValue;
        this.timeout = timeout;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println(startValue + " produced");
                buffer.put(startValue++, timeout);
                Thread.sleep(period);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " stopped.");
                return;
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + " time out.");
                return;
            }
        }
    }
}

    Это Потребитель для такого буфера
public class ConsumerTimed implements Runnable {
    private final SingleElementBufferTimed buffer;
    private final long timeout;

    public ConsumerTimed(SingleElementBufferTimed buffer, long timeout) {
        this.buffer = buffer;
        this.timeout = timeout;
    }

    @Override
    public void run() {
        while (true) {
            try {
                int elem = buffer.get(timeout);
                System.out.println(elem + " consumed");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + " stopped.");
                return;
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + " time out.");
                return;
            }
        }
    }
}



    С ним одинокий Производитель не ждет вечно
public class SingleElementBufferTimedTest {
    public static void main(String[] args) {
        SingleElementBufferTimed buffer = new SingleElementBufferTimed();
        new Thread(new ProducerTimed(1, 1000, buffer, 100)).start();
    }
}

>> 1 produced
>> 2 produced
... 3000 миллисекунд
>> Thread-0 time out.



    С ним одинокий Потребитель не ждет вечно
public class SingleConsumerTimedExample {
    public static void main(String[] args) {
        SingleElementBufferTimed buffer = new SingleElementBufferTimed();
        new Thread(new ConsumerTimed(buffer, 3000), "Consumer").start();
    }
}
... 3000 миллисекунд
>> Consumer time out.



    Нетерпеливый потребитель не дожидается медленного производителя и умирает, после - умирает одинокий производитель (спасибо Шекспиру за сюжет)
public class SlowProducerExample {
    public static void main(String[] args) {
        int producerSleepTime = 1500;
        int consumerWaitTime = 1000;

        SingleElementBufferTimed buffer = new SingleElementBufferTimed();
        new Thread(new ProducerTimed(1, producerSleepTime, buffer, 100), "Producer").start();
        new Thread(new ConsumerTimed(buffer, consumerWaitTime), "Consumer").start();
    }
}

>> 1 produced
>> 1 consumed
... 1000 миллисекунд
>> Consumer time out.
... 500 миллисекунд
>> 2 produced
... 1500 миллисекунд
>> 3 produced
... 100 миллисекунд
>> Producer time out.



    interrupt() / InterruptedException

    thread.interrupt.interruptable_buffer (обязательно)   

    Предлагается сделать блокирующий буфер на один элемент, но передачу сообщения делать не на основе wait()/notify(), а на основе wait()/interrupt(). В следующей ниже "заготовке" замените '// ?' на свои строки. 
    Основная идея заключается в том, что мы явно поддерживаем односвязные списки ожидающих производителей и потребителей. Если нет работы, то поток сам себя добавляет в соответствующий список. Если поток создал работу, то он удаляет поток из соответствующего списка и "будит" его. 

    "Заготовка" буфера:

public class InterruptBuffer_q {
    private ThreadNode producers = null;
    private ThreadNode consumers = null;
    private Integer elem = null;
    
    public synchronized void put(int newElem) {
        while (elem != null) {
            try {
                // ?
                this.wait();
            } catch (InterruptedException e) {/*NOP*/}
        }
        elem = newElem;
        if (consumers != null) {
            consumers.thread.interrupt();
            // ?
        }
    }

    public synchronized int get() {
        while (elem == null) {
            try {
                // ?
                this.wait();
            } catch (InterruptedException e) {/*NOP*/}
        }
        int result = elem;
        elem = null;
        if (producers != null) {
            producers.thread.interrupt();
            // ?
        }
        return result;
    }
}
    Звено односвязного списка



public class ThreadNode {
    public final Thread thread;
    public final ThreadNode nextNode;

    public ThreadNode(Thread thread, ThreadNode nextNode) {
        this.thread = thread;
        this.nextNode = nextNode;
    }
}
    Производитель





public class InterruptedProducer implements Runnable {
    private int startValue;
    private final InterruptBuffer buffer;

    public InterruptedProducer(int startValue, InterruptBuffer buffer) {
        this.buffer = buffer;
        this.startValue = startValue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println(startValue + " produced");
                buffer.put(startValue++);
                Thread.sleep((int) (1000 * Math.random()));
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}
    Потребитель





public class InterruptedConsumer implements Runnable {
    private final InterruptBuffer buffer;

    public InterruptedConsumer(InterruptBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (true) {
            int elem = buffer.get();
            System.out.println(elem + " consumed");
        }
    }
}
    Пример использования





public class ProducerConsumerExample_3x2 {
    public static void main(String[] args) {
        InterruptBuffer buffer = new InterruptBuffer();
        Thread[] producers = new Thread[]{
                new Thread(new InterruptedProducer(1, buffer)),
                new Thread(new InterruptedProducer(100, buffer)),
                new Thread(new InterruptedProducer(1000, buffer)),
        };
        for (Thread producer : producers) {
            producer.start();
        }
        Thread[] consumers = new Thread[]{
                new Thread(new InterruptedConsumer(buffer)),
                new Thread(new InterruptedConsumer(buffer)),
        };
        for (Thread consumer : consumers) {
            consumer.start();
        }
    }
}
>> 1 produced
>> 100 produced
>> 1000 produced
>> 1 consumed
>> 100 consumed
>> 1000 consumed
>> 1001 produced
>> 1001 consumed
>> 101 produced
>> 101 consumed
>> 2 produced
>> 2 consumed
... и так до бесконечности