close

Вход

Забыли?

вход по аккаунту

?

Разработка на JAVA. Лекция 11

код для вставкиСкачать
Thread.start()
2
О чем лекция?
●
Что?
●
Зачем?
●
Как?
●
Почему не работает?
3
Зачем?
●
утилизация CPU (одного или нескольких)
●
упрощение моделирования (многозадачность)
●
асинхронная обработка событий
●
производительность
4
Процессы и потоки
●
Процессы обладают собственным адресным
простанством
●
Потоки работают с общей, разделяемой
памятью
●
Планировщик ОС работает с потоками
●
POSIX fork()/pthread_create()
●
WIN CreateProcess()/CreateThread()
5
6
Закон Амдала
7
Что
class MyThread extends Thread {
@Override
public void run() {...}
}
Thread t = new MyThread();
// Запуск кода в отдельном потоке start();
// выполнится код, описанный в run();
t.start();
8
Что
class Task implements Runnable {
@Override
public void run() {...}
}
Thread t = new Thread(new Task());
t.start();
9
Что
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {...}
}, 60 * 1000);
10
BLOCKED
lock on monitor
interrupt()
start()
NEW
TERMINATED
RUNNABLE
notify()
wait()
WAITING
11
Управление потоком
● start()
● static sleep() (TimeUnit.SECONDS.
sleep(10))
● yield() - отдать управление
● setDaemon()
12
Thread join()
void main() {
Thread t = new Thread();
t.start();
// main блокируется
t.join();
}
13
Постой, паровоз!
А как остановить поток?
14
Постой, паровоз!
https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
class MyThread extends Thread {
private volatile boolean pleaseStop;
@Override void run() {
while (!pleaseStop) {
// do what you want to do!
}
}}
MyThread t = new MyThread;
// Подаем сигнал, просим остановиться
t.setStop(true);
15
Постой, паровоз - 2
●
●
есть блокирующие операции
○
sleep()
○
join()
○
wait()
операции ввода-вывода
○
●
read(byte[])
до проверки флага не доходит
16
Постой, паровоз - 2
@Override void run() {
try {
while (!Thread.currentThread.isInterrupted()) {
TimeUnit.SECONDS.sleep(10); // throws IntExcp
}
} catch (InterruptedException e) {
// необходимо проставить флаг прерывания
Thread.currentThread.interrupt();
}
}
MyThread t = new MyThread;
t.interrupt();
17
interrupt()
●
interrupt()
○
○
выставляет флаг потока
Если поток был в состоянии BLOCKED,WAITING, то
выбросится InterruptedException
●
isInterrupted() - проверяет состояние флага
●
static interrupted() - проверяет состояние
флага у текущего потока и сбрасывает его
если не проверять состояние внутри потока, то он
не остановится
●
18
Пул потоков
ExecutorService pool = Executors.newFixedThreadPool(10);
service.submit(new Runnable(...));
Пул потоков - автоматически создает и запускает потоки
●
●
Ограниченное число потоков (fixed)
Создается сколько нужно (cached)
19
Всё здорово?
class Counter {
long counter = 0;
long inc() { return counter++; }
long get() { return counter; }
}
Client1
Client2
long id = counter.inc()
long id = counter.inc();
20
Всё здорово?
class Counter {
long counter = 0;
long inc() { return counter++; }
long get() { return counter; }
}
Client1
Client2
long id = counter.inc()
long id = counter.inc();
Race Condition!
21
Алгоритм страуса
22
Заново учимся считать
T1
T2
read = 9
read = 9
9+1
9+1
write=10
write=10
Инкремент не так-то прост!
movl
addl
movl
{memory}, %eax
$1,%eax
%eax, {memory}
Read-Modify-Write (RMW) operation - не атомарны!
23
Атомарность
Операции, выполняющиеся как единое целое, либо
не выполняющиеся вовсе
● чтение и запись примитивов (кроме long, double)
● чтение и запись ссылок
● чтение и запись volatile примитивов
● специальные атомарные операции
24
CompareAndSwap
// (x86 - CMPXCHG)
int cas(int *val, int old, int new) {
ATOMIC_LOCK();
int old_val = *val;
if (old_val == old) {
*val = new;
}
ATOMIC_UNLOCK();
return old_val;
}
25
AtomicLong.getAndIncrement()
private Long currentValue;
public Long getAndIncrement() {
do {
Long val = currentValue;
} while (!CAS(currentValue, val, val + 1));
return val;
}
26
AtomicCounter
class AtomicCounter {
private AtomicLong counter;
public AtomicLong inc() {
return counter.getAndIncrement();
}
public Long get() {
return counter.get();
}
}
27
Критическая секция
Участок исполняемого кода программы, в котором производится
доступ к общему ресурсу (данным или устройству), который не
должен быть одновременно использован более чем одним
потоком исполнения
enter
read=9
blocked
exit
9+1
write=10
read=10
10+1
write=11
28
lock()
lock - вход в критическую секцию
unlock - выход из критической секции
Lock lock = new ReentrantLock();
lock.lock();
try {
…
} finally {
lock.unlock();
}
29
synchronized
synchronized метод
блокируется на мониторе
текущего объекта (this)
synchronized void do() {
...
}
// or
synchonized блок - можно
указать монитор
void do() {
synchronized(obj) {
...
static synch. - блокируется
на объекте .class
}
}
30
Критическая секция
CS
31
Критическая секция
CS
32
Блокировки и честность
Монитор - примитив синхронизации, позволяющий потокам
эксклюзивно (mutual exclusion) владеть ресурсом, также обладает
набором переменных для ожидания выполнения заданного
условия.
● потоки борются за владение монитором
● fair lock - захватывает, который ждет дольше
всех (ReentrantLock(boolean fair))
● lock contention - из-за нагрузки на критическую
секцию, потоки простаивают
33
Проблемы
●
Race Condition (гонка)
○
●
Starvation
○
●
ограничиваем доступ к общему ресурсу на
запись
честные блокировки new ReentrantLock(true)
Deadlock
34
Deadlock
Thread 1
Thread 2
A
B
1.
2.
3.
4.
5.
T1 acquire A
T2 acquire B
T1 waiting for B
T2 waiting for A
deadlock :(
35
Deadlock
Thread 1
Thread 2
A
B
1.
2.
3.
4.
5.
T1 acquire A
T2 acquire B
T1 waiting for B
T2 waiting for A
deadlock :(
● Упорядочить взятие блокировок
● Использовать tryLock()
● Использовать таймаут
36
Проблемы
●
Race Condition (гонка)
○
●
Starvation
○
●
ограничиваем доступ к общему ресурсу на
запись
честные блокировки new ReentrantLock(true)
Deadlock
○
○
○
упорядочить взятие блокировок
использовать tryLock()
использовать таймаут
37
Read/Write Lock
Более дешевая блокировка, если много читателей
и мало писателей.
Read доступ - если никто не пишет
Write доступ - если никто не пишет и не читает
38
Задача
Семафор - примитив синхронизации, который
ограничивает количество потоков, которые могут
обратиться к блоку кода (КС)
Semaphore sem = new Semaphore(permit, fair);
class RWLock {
Semaphore sem;
// Как реализовать через семафор?
public void lockRead() {...}
public voir lockWrite() {...}
}
39
40
Критическая секция
Участок исполняемого кода программы, в котором производится
доступ к общему ресурсу (данным или устройству), который не
должен быть одновременно использован более чем одним
потоком исполнения
41
Блокировки и честность
Монитор - примитив синхронизации, позволяющий потокам
эксклюзивно (mutual exclusion) владеть ресурсом, также обладает
набором переменных для ожидания выполнения заданного
условия.
● потоки борются за владение монитором
● fair lock - захватывает, который ждет дольше
всех (ReentrantLock(boolean fair))
● lock contention - из-за нагрузки на критическую
секцию, потоки простаивают
42
Задача
Два потока имеют доступ к общему ресурсу S.
Поток А должен ждать, когда поток Б изменить
состояние S и только потом продолжать работу.
43
Задача
Два потока имеют доступ к общему ресурсу S.
Поток А должен ждать, когда поток Б изменить
состояние S и только потом продолжать работу.
S - shared resource, С - условие
Thread A
Thread B
while (!c.isReady()) {}
sync {
if (c.isReady()) {
// process S
}
}
sync {
// process S
c.setReady()
}
44
Object.wait()/Object.notify()
Check/change invariant
S - shared resource, C - condition
Thread A
Thread B
1)
Acquire CS
1 sync (s) {
2 while(!C.isReady())
3
s.wait();
4 // process s
5 }
1 sync (s) {
2
// do smth
3
C.setReady();
4
s.notifyAll()
5 }
2) Release CS
Поток А отпустит монитор и перейдет в состояние
WAIT. Поток будет сохранен в wait set монитора.
Поток Б изменил состояние S, и
переводит потоки, ожидающие в
wait set в состояние RUNNABLE.
45
Схема условного ожидания
46
Замечания
●
●
●
вызывать wait/notify только внутри КС
вызывать wait/notify только на мониторе
объекта захваченной КС
инвариант нужно проверять в цикле while()
while (!isReady) {
wait();
}
●
не ждать на глобальных объектах
47
Задача
Есть 2 типа потоков - первый подготавливает
данные, второй занимается их обработкой.
Требуется передавать данные между потоками.
48
producer-consumer
B
A
49
producer-consumer
put(C)
C
B
A
50
producer-consumer
A = take()
C
B
51
producer-consumer
producer
consumer
52
blocking queue
BlockingQueue<T> {
boolean offer(T t); // не ждет
void put(T t); // ждет
T take(); // ждет
T poll(timeout); // ждет с таймаутом
}
53
thread pool
FIXED THREAD POOL(2);
Worker#1
Worker#2
54
thread pool
FIXED THREAD POOL(2);
submit();
RUNNABLE
RUNNABLE
RUNNABLE
submit(Runnable r) {
queue.put(r);
Worker#1
Worker#2
}
55
thread pool
FIXED THREAD POOL(2);
RUNNABLE
submit();
// in every Worker thread
while (isRunning) {
Runnable r = queue.take();
r.run();
Worker#1
Worker#2
}
56
thread pool
FIXED THREAD POOL(2);
submit();
RUNNABLE
Worker#1
Worker#2
57
thread pool
FIXED THREAD POOL(2);
Worker#1
Worker#2
58
callable / future
interface Callable<T> {
T call(); // Возвращает результат
}
// Зарегистрировали такс на выполнение
Future<T> future = pool.submit(() -> {
// do hard work
return 100;
};
// Дождались результат
T result = future.get();
59
60
Java Memory Model и все-все-все
61
Java Memory Model и все-все-все
Java Memory Model (JMM)
описывает, когда запись сделанная одним
потоком будет видна другому потоку
62
Спасибо за
внимание!
Автор
tekhnostrim
Документ
Категория
Без категории
Просмотров
228
Размер файла
1 010 Кб
Теги
лекция
1/--страниц
Пожаловаться на содержимое документа