Каждый поток ассоциирован с классом java.lang.Thread. Есть два основных способа использования объектов Thread в многопоточном программировании:
- Прямое создание и управление потоками с помощью создания экземпляров класса Thread.
- Абстрагирование от управления потоками и передача задач в executor.
java.lang.Thread
Объявление и запуск потока
Приложение, создающее экземпляр Thread, должно предоставить код, который будет выполняться в отдельном потоке. Есть два способа сделать это:
- Предоставить экземпляр класса, реализующего интерфейсjava.lang.Runnable. Этот класс имеет один метод run(), который должен содержать код, который будет выполняться в отдельном потоке. Экземпляр класса java.lang.Runnable передаётся в конструктор классаThread вот так:
1
2
3
4
5
6
7
8
9
10
11
|
public class HelloRunnable implements Runnable {
public void run() {
System.out.println(«Hello from a thread!»);
}
public static void main(String args[]) {
(new Thread(new HelloRunnable())).start();
}
}
|
- Написать подкласс класса Thread. Класс Thread сам реализует интерфейс java.lang.Runnable, но его метод run() ничего не делает. Приложение может унаследовать класс от Thread и переопределить метод run():
1
2
3
4
5
6
7
8
9
10
11
|
public class HelloThread extends Thread {
public void run() {
System.out.println(«Hello from a thread!»);
}
public static void main(String args[]) {
(new HelloThread()).start();
}
}
|
Обратите внимание, что оба примера вызывают методThread.start() для запуска нового потока. Именно он запускает отдельный поток. Если просто вызывать метод run(), то код будет выполняться в том же потоке, отдельный поток создаваться не будет.
Какой способ использовать? Первый способ, где предоставляется экземпляр класса, реализующего Runnable, более общий, так как в этом случае класс может наследоваться от отличного от Thread класса. Второй способ проще использовать в простых приложениях, но он ограничен тем, что ваш класс будет наследником Thread.
Приостанавливаем исполнение с помощью метода sleep
Метод sleep класса Thread останавливает выполнение текущего потока на указанное время. Он используется, когда нужно освободить процессор, чтобы он занялся другими потоками или процессами, либо для задания интервала между какими-нибудь действиями.
Есть два варианта метода sleep: первый принимает в качестве параметра количество миллисекунд, на которое нужно остановить текущий поток, второй дополнительно принимает второй параметр, в котором указывается количество наносекунд, на которые нужно дополнительно остановить поток.
Время остановки потока не точно, оно зависит от возможностей системы. К тому же состояние ожидания для потока может быть прервано извне.
Пример:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class SleepMessages {
public static void main(String args[])
throws InterruptedException {
String importantInfo[] = {
«Mares eat oats»,
«Does eat oats»,
«Little lambs eat ivy»,
«A kid will eat ivy too»
};
for (int i = 0;
i < importantInfo.length;
i++) {
// Ждём 4 секунды
Thread.sleep(4000);
// Выводим сообщение
System.out.println(importantInfo[i]);
}
}
}
|
Обратите внимание, что метод main объявляет, что он throwsInterruptedException . Это исключение бросается методом sleep, если поток прерывается во время ожидания внутри sleep. Так как эта программа не объявила никаких других потоков, которые могут прерывать текущий, то ей вовсе не обязательно обрабатывать это исключение.
Прерывание потока
Прерывание (interrupt) — это сигнал для потока, что он должен прекратить делать то, что он делает сейчас, и делать что-то другое. Что должен делать поток в ответ на прерывание, решает программист, но обычно поток завершается.
Поток отправляет прерывание вызывая метод public void interrupt() класса Thread. Для того чтобы механизм прерывания работал корректно, прерываемый поток должен поддерживать возможность прерывания своей работы.
Как поток должен поддерживать прерывание своей работы? Это зависит от того, что он сейчас делает. Если поток часто вызывает методы, которые могут бросить InterruptedException, то он просто вызывает return при перехвате подобного исключения. Пример:
1
2
3
4
5
6
7
8
9
10
11
|
for (int i = 0; i < importantInfo.length; i++) {
// Пауза 4 секунды
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
// Ожидание было прервано! Больше не нужно сообщений.
return;
}
// Пишем сообщение
System.out.println(importantInfo[i]);
}
|
Многие методы, которые бросают InterruptedException, например методы sleep, останавливают своё выполнение и возвращают управление в вызвавший их код при получении прерывания (interrupt).
Что если поток выполняется длительное время без вызова методов, которые бросают исключение InterruptedException? Тогда он может периодически вызывать метод Thread.interrupted(), который возвращает true, если получен сигнал о прерывании. Например:
1
2
3
4
5
6
7
|
for (int i = 0; i < inputs.length; i++) {
heavyCrunch(inputs[i]);
if (Thread.interrupted()) {
// Мы были прерваны: no more crunching.
return;
}
}
|
В этом примере код просто проверяет на наличие сигнала о прерывании, и выходит из потока, если сигнал есть. В более сложных приложениях имеет смысл бросить исключениеInterruptedException:
1
2
3
|
if (Thread.interrupted()) {
throw new InterruptedException();
}
|
Это позволяет располагать код обработки прерывания потока в одной клаузе catch.
Механизм прерывания реализован с помощью внутреннего флага, известного как статус прерывания (interrupt status). ВызовThread.interrupt() устанавливает этот флаг. Когда поток проверяет наличие прерывания вызовов Thread.interrupted(), то флаг статуса прерывания сбрасывается. Нестатический метод isInterrupted(), который используется одним потоком для проверки статуса прерывания другого потока, не меняет флаг статуса прерывания.
По соглашению любой метод, который прерывает свою выполнение бросая исключение InterruptedException, очищает флаг статуса прерывания, когда он бросает это исключение. Однако есть вероятность, что флаг статуса прерывания будет сразу же установлен ещё раз, если другой поток вызовет interrupt().
Соединение
Метод join позволяет одному потоку ждать завершения другого потока. Если t является экземпляром класса Thread, чей поток в данный момент продолжает выполняться, то
t.join();
приведёт к приостановке выполнения текущего потока до тех пор, пока поток t не завершит свою работу. Метод join() имеет варианты с параметрами:
1
2
|
public final void join(long millis)
throws InterruptedException
|
1
2
3
|
public final void join(long millis,
int nanos)
throws InterruptedException
|
Они позволяют задать время в миллисекундах и дополнительно количество наносекунд, в течение которых ждать завершения выполнения потока. Однако, как и с методами sleep, методы join зависят от возможностей операционной системы, поэтому вы не должны полагаться на то, что join будет ждать точно указанное время.
Как и методы sleep, методы join отвечают на сигнал прерывания, останавливая процесс ожидания и бросая исключениеInterruptedException.
Простой пример
Пример состоит из двух потоков. Первый поток является главным потоком приложения, который имеет каждая программа на Java. Главный поток создаёт новый поток и ждёт его завершения. Если второй поток выполняется слишком долго, то главный поток прерывает его.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
public class SimpleThreads {
// Выводим сообщение
// с именем текущего потока в начале.
static void threadMessage(String message) {
String threadName =
Thread.currentThread().getName();
System.out.format(«%s: %s%n»,
threadName,
message);
}
private static class MessageLoop
implements Runnable {
public void run() {
String importantInfo[] = {
«Mares eat oats»,
«Does eat oats»,
«Little lambs eat ivy»,
«A kid will eat ivy too»
};
try {
for (int i = 0;
i < importantInfo.length;
i++) {
// Ждём 4 секунды
Thread.sleep(4000);
// Пишем сообщение
threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
threadMessage(«I wasn’t done!»);
}
}
}
public static void main(String args[])
throws InterruptedException {
// Задержка в миллисекундах
// перед тем как мы прерываем MessageLoop
// (по умолчанию один час).
long patience = 1000 * 60 * 60;
// Если есть аргумент командной строки,
// то он указывает ожидание в секундах.
if (args.length > 0) {
try {
patience = Long.parseLong(args[0]) * 1000;
} catch (NumberFormatException e) {
System.err.println(«Argument must be an integer.»);
System.exit(1);
}
}
threadMessage(«Starting MessageLoop thread»);
long startTime = System.currentTimeMillis();
Thread t = new Thread(new MessageLoop());
t.start();
threadMessage(«Waiting for MessageLoop thread to finish»);
// ждём пока MessageLoop
// существует
while (t.isAlive()) {
threadMessage(«Still waiting…»);
// Ждём максимум 1 секунду
// завершения потока MessageLoop
t.join(1000);
if (((System.currentTimeMillis() — startTime) > patience)
&& t.isAlive()) {
threadMessage(«Tired of waiting!»);
t.interrupt();
// Должно быть недолго теперь.
// — Ждём до конца
t.join();
}
}
threadMessage(«Finally!»);
}
}
|
Синхронизация
Потоки общаются в основном разделяя свои поля и поля объектов между собой. Эта форма общения очень эффективна, но делает возможным два типа ошибок: вмешательство в поток (thread interference) и ошибки консистентности памяти (memory consistency errors). Для того чтобы предотвратить эти ошибки, нужно использовать синхронизацию потоков.
Однако синхронизация может привести к конкуренции потоков(thread contention), которая возникает, когда два или более потока пытаются получить доступ к одному и тому же ресурсу одновременно, что приводит к тому, что среда выполнения Java выполняет один или более этих потоков более медленно или даже приостанавливает их выполнение. Голодание (starvation) и активная блокировка (livelock) — это формы конкуренции потоков. Смотрите пункт «Живучесть (Liveness)».
Вмешательство в поток (thread interference)
Рассмотрим простой класс Counter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class Counter {
private int c = 0;
public void increment() {
c++;
}
public void decrement() {
c—;
}
public int value() {
return c;
}
}
|
Counter спроектирован так, что каждый вызов метода increment добавляет 1 к c, а каждый вызов decrement вычитает 1 из c. Однако если объект Counter используется несколькими потоками, то вмешательство в поток может помешать этому коду работать как ожидалось.
Вмешательство в поток происходит, когда два действия выполняются разными потоками, но используют одни и те же данные. Это означает, что два действия, которые содержат несколько шагов, и последовательность шагов частично перекрывается.
Может показаться, что операции над экземплярами Counter не могут перекрываться, так как все операции над c являются одиночными простыми инструкциями. Однако даже простые инструкции могут транслироваться виртуальной машиной в несколько шагов. Выражение c++ может быть разложено на три шага:
- Получить текущее значение c.
- Увеличить полученное значение на 1.
- Сохранить увеличенное значение в c.
Предположим, что поток A вызывает increment, и в то же самое время поток B вызывает decrement. Начальное значение c равно 0, их пересечённые действия могут породить следующую последовательность шагов:
- Поток A получает c.
- Поток B получает c.
- Поток A увеличивает полученное значение, в результате получает 1.
- Поток B уменьшает полученное значение, в результате получает -1.
- Поток A сохраняет результат 1 в c.
- Поток B сохраняет результат -1 в c.
Результат потока A потерян, он был перезаписан потоком B. Такое частичное перекрытие действий — это только одна из возможностей. В некоторых других ситуациях может оказаться, что результат потока B будет потерян, либо ошибок не будет совсем. Из-за этого ошибки вмешательства в поток трудно обнаруживать и исправлять.
Ошибки консистентности памяти (memory consistency errors)
Ошибки консистентности памяти (memory consistency errors) возникают, когда разные потоки имеют несовместимое представление о том, что должно быть общими данными. Причины ошибок консистентности памяти сложны и выходят за рамки этой статьи, но вам достаточно будет знать стратегию избегания подобных ошибок.
Ключ к исключению ошибок консистентности памяти — это пониманию связи происходит-до (happens-before). Эта связь гарантирует, что данные, записанные в память одной инструкцией, видимы в другой. Рассмотрим следующий пример. Предположим, что поле типа int объявлено и инициализировано:
1
|
int counter = 0;
|
Поле counter используется совместно двумя потоками A и B. Предположим, что поток A увеличивает counter:
1
|
counter++;
|
сразу же после этого поток B выводит в консоль значение counter:
1
|
System.out.println(counter);
|
Если бы обе инструкции были выполнены одним потоком, то можно было бы смело предположить, что в консоль выведется число 1. Но если две инструкции выполняются разными потоками, то может быть выведено 0, так как нет гарантии, что изменениеcounter потоком A будет видимо потоком B, до тех пор пока программист не обеспечит связь происходит-до (happens-before) между этими инструкциями.
Есть разные способы создания связи происходит-до (happens-before). Один из них — это синхронизация, она будет расписана в следующих пунктах.
Мы уже видели два действия, которые порождают связь происходит-до (happens-before):
- Когда инструкция вызывает Thread.start, каждая инструкция, которая имеет связь происходит-до (happens-before) с этой инструкцией, также имеет связь происходит-до (happens-before) с каждой инструкцией, выполняемой новым потоком. Все последствия действий кода, который был выполнен до создания нового потока, видимы новым потоком.
- Когда поток завершается и приводит Thread.join другого потока к возврату выполнения, то все инструкции, которые были выполнены завершённым потоком, имеют связь происходит-до (happens-before) со всеми инструкциями, которые следуют за успешным соединением потока. Все последствия действий кода в потоке теперь видимы потоком, который осуществил соединение.
Синхронизированные (synchronized) методы
Язык программирования Java предоставляет два базовых способа синхронизации: синхронизированные методы (synchronized methods) и синхронизированные инструкции (synchronized statements). Есть другие, более сложные, способы синхронизации, они будут рассмотрены в дальнейшем.
Чтобы сделать метод синхронизированным (synchronized), просто добавьте ключевое слово synchronized к его объявлению:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c—;
}
public synchronized int value() {
return c;
}
}
|
Если count является экземпляром класса SynchronizedCounter, то синхронизированные методы имеют два эффекта:
- Во-первых, два вызова синхронизированных метода на одном и том же объекте не могут пересекаться. Когда один поток выполняет синхронизированный метод объекта, то другие потоки, которые вызывают синхронизированные методы того же самого объекта, блокируются (приостанавливают своё выполнение) до тех пор, пока первый поток не завершит работу с объектом.
- Во-вторых, когда синхронизированный метод завершает своё выполнение, то он автоматически делает связь происходит-до (happens-before) со всеми последующими вызовами синхронизированных методов того же самого объекта. Это гарантирует, что изменения состояния объекта будут видимы для других потоков.
Заметьте, что конструкторы не могут быть синхронизированными. Использование ключевого слова synchronized для конструктора приведёт к ошибке компиляции. Синхронизированные конструкторы не имеют смысла, так как только один поток, который создаёт объект, должен иметь доступ к нему во время создания.
Предупреждение: Когда создаёте объект, который будет совместно использоваться разными потоками, то будьте очень осторожны, чтобы ссылка на объект не «утекла» раньше времени. Например, предположим, что вы хотите сделать список List, который содержит экземпляры каждого классы. Вы можете захотеть добавить следующую строку в ваш конструктор:
1
|
instances.add(this);
|
Но тогда другие потоки смогут использовать instances для получения доступа к объекту до того, как его создание будет завершено.
Синхронизированные методы — простая стратегия для предотвращения вмешательства в поток (thread interference) и ошибок консистентности памяти (memory consistency errors): Если объект видим более чем одному потоку, то все чтения и записи полей объекта должны происходить через синхронизированные методы. (Важное исключение: поля с модификатором final, которые не могут быть изменены после создания экземпляра объекта, могут безопасно читаться из несинхронизированных методов после создания конструктора) Эта стратегия эффективна, но может содержать проблемы с живучестью (liveness).
Внутренние блокировки и синхронизация
Синхронизация построена вокруг внутренней сущности, известной как внутренняя блокировка или блокировка монитора. Внутренняя блокировка играет роль в обоих аспектах синхронизации: обеспечивает эксклюзивный доступ к внутреннему состоянию объекта и обеспечивает связь происходит-до (happens-before).
Каждый объект имеет внутреннюю блокировку (монитор), который с ним связан. По соглашению поток, которому требуется эксклюзивный и согласованный доступ к полям объекта, должен получить внутреннюю блокировку объекта перед доступом к ним и освободить внутреннюю блокировку объекта после совершения необходимых действий с ними. Поток владеет блокировкой объекта между временем получения и временем освобождения блокировки. Пока поток держит внутреннюю блокировку (внутренний монитор) никакой другой объект не может получить ту же самую блокировку. Другой поток будет блокирован (приостановлен) при попытке получить эту блокировку.
Когда поток вызывает синхронизированный метод, то он автоматически получает внутреннюю блокировку этого объекта и освобождает её по завершении метода. Освобождение блокировки происходит даже при возникновении неперехваченного исключения.
Если вызывается статический синхронизированный метод, то поток получает внутреннюю блокировку объекта Class, связанного с этим классом. Таким образом доступ к статическим полям контролируется другой блокировкой, отличной от блокировки любого из экземпляров класса.
В отличие от синхронизированных методов синхронизированные инструкции должны указать объект, который предоставляет внутреннюю блокировку:
1
2
3
4
5
6
7
|
public void addName(String name) {
synchronized(this) {
lastName = name;
nameCount++;
}
nameList.add(name);
}
|
В этом примере метод addName должен синхронизировать изменениеlastName и nameCount, но он также должен избежать синхронизированных вызовов методов других объектов. (Вызов методов других объектов из синхронизированного кода может привести к проблемам, описанным в пункте «Живучесть (Liveness)» Без синхронизированных инструкций это мог бы быть отдельный несинхронизируемый метод для единственного вызоваnameList.add.
Синхронизированные инструкции также полезны для улучшения многопоточности с небольшими блокировками. Например, классMsLunch имеет два поля экземпляров, c1 и c2, которые никогда не используются вместе. Все изменения этих полей должны быть синхронизированы, но нет никакого смысла запрещать изменениеc1 при изменении только c2. Вместо использования синхронизированных методов или использования блокировкиthis мы создадим два объекта, которые будут предоставлять блокировки:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class MsLunch {
private long c1 = 0;
private long c2 = 0;
private Object lock1 = new Object();
private Object lock2 = new Object();
public void inc1() {
synchronized(lock1) {
c1++;
}
}
public void inc2() {
synchronized(lock2) {
c2++;
}
}
}
|
Используйте этот способ с осторожностью. Вы должны быть абсолютно уверены, что безопасно делать одновременный доступ к этим полям для разных потоков.
Поток не может получить блокировку, которой владеет другой поток. Но поток МОЖЕТ получить блокировку, которой он уже владеет. Возможность потоков получать одну и ту же блокировку несколько раз называется повторная синхронизация (reentrant synchronization). Это может быть, например, ситуация, когда синхронизированный код напрямую или ненапрямую вызывает метод, который тоже содержит синхронизированный код, и оба кода используют ту же самую блокировку. Без reentrant synchronization синхронизированному коду пришлось бы использовать много предосторожностей, чтобы исключить блокировку потоком самого себя.
Атомарный доступ
В программировании атомарное действие — это действие которое происходит полностью и сразу. Атомарное действие не может остановиться посередине: оно либо завершается полностью, либо не происходит совсем. Никаких эффектов от атомарного действия не видно снаружи до тех пор, пока действие не завершится.
Мы уже видели, что операция инкремента не является атомарным действием. Даже самые простые выражения могут содержать в себе составные действия, которые могут быть разложены на другие действия. Однако следующие действия атомарны:
- Чтения и запись атомарны для ссылочных переменных и большинства примитивных типов (все типы кроме long и double )
- Чтение и запись атомарны для всех переменных, объявленных какvolatile (включая long и double ).
Атомарные действия не могут пересекаться, и они могут использоваться без опасений о вмешательстве в поток. Однако это не это не устраняет все потребности синхронизации атомарных действий, так как ошибки консистенции памяти всё ещё возможны. Использование volatile-переменных уменьшает риск ошибок консистенции памяти, потому что любая запись в volatile-переменную делает связь происходит-до (happens-before) для последующих чтений из этой переменной. Это означает, что изменения volatile-переменных всегда видны для других потоков. Это также означает, что когда поток читает volatile-переменную, он видит не только последнее изменение, но и все побочные эффекты кода, которые приводят к этому изменению.
Использование простого атомарного доступа к переменным более эффективно, чем доступ к этим переменным из синхронизированного кода, но он требует большей внимательности от программиста, чтобы исключить ошибки консистентности памяти.
Живучесть (Liveness)
Взаимная блокировка (Deadlock)
Взаимная блокировка (deadlock) описывает ситуацию, когда два или более потока блокируются навсегда, каждый ожидая другого. Вот пример.
Алиса и Боб — друзья и большие приверженцы вежливости. Строгое правило вежливости: когда вы кланяетесь другу вы должны оставаться в поклоне до тех пор, пока ваш друг тоже не поклонится вам. Однако это правило не учитывает возможность, когда оба друга кланяются одновременно:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public class Deadlock {
static class Friend {
private final String name;
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public synchronized void bow(Friend bower) {
System.out.format(«%s: %s»
+ » has bowed to me!%n»,
this.name, bower.getName());
bower.bowBack(this);
}
public synchronized void bowBack(Friend bower) {
System.out.format(«%s: %s»
+ » has bowed back to me!%n»,
this.name, bower.getName());
}
}
public static void main(String[] args) {
final Friend alice =
new Friend(«Alice»);
final Friend bob =
new Friend(«Bob»);
new Thread(new Runnable() {
public void run() { alice.bow(bob); }
}).start();
new Thread(new Runnable() {
public void run() { bob.bow(alice); }
}).start();
}
}
|
Когда этот пример запустится, то наиболее вероятно, что каждый из потоков будет заблокирован во время попытки вызова bowBack. Ни одна из блокировок никогда не закончится, потому что каждый поток ожидает выхода другого потока из метода bow.
Голодание (starvation)
Голодание (starvation) описывает ситуацию, когда поток не может получить доступ к совместно используемым ресурсам и не может продвинуться в своём выполнении дальше. Это возникает, когда совместно используемый ресурс делается недоступным на долгое время «жадными» потоками. Например, предположим, что объект предоставляет синхронизированный метод, который обычно выполняется достаточно долго. Если один поток вызывает этот метод часто, то другие потоки, которым тоже нужен частый синхронизированный доступ к тому же самому объекту, будут часто блокироваться.
Активная блокировка (livelock)
Поток часто реагирует на события из другого потока. Если действие другого потока тоже является ответом на событие из другого потока, то может произойти активная блокировка (livelock). Как и взаимная блокировка (deadlock), активно заблокированные потоки не могут продвинуться дальше в своём выполнении. Однако эти потоки не заблокированы — они просто слишком заняты, отвечая друг другу, чтобы вернуться к работе. Это можно сравнить с двумя людьми, которые пытаются пройти через друг друга в коридоре: Алиса двигается влево, чтобы Боб мог пройти, в это же время Боб двигается вправо, чтобы Алиса могла пройти. Видя, что они всё ещё блокируют друг друга, Боб двигается влево, а Алиса вправо, но они всё ещё блокируют друг друга.
Защищённые блокировки (guarded blocks)
Потокам зачастую приходится согласовывать свои действия. Наиболее часто используемый способ согласования — защищённые блокировки (guarded blocks). Такой блок начинается с выбора условия, которое должно быть true, перед тем как может осуществиться блокировка. Есть несколько шагов, которые нужно выполнить, чтобы осуществить блокировку правильно.
Предположим, что guardedJoy — это метод, который не должен выполняться до тех пор, пока разделяемая между потоками переменная joy не будет установлена другим потоком. Такой метод теоретически должен просто выполнять цикл, пока условие не выполниться, но это было бы расточительно, так как это выполняется в течение всего времени ожидания.
1
2
3
4
5
6
|
public void guardedJoy() {
// Простой цикл. Тратит процессорное время
// не делайте так!
while(!joy) {}
System.out.println(«Joy has been achieved!»);
}
|
Наиболее эффективно использовать Object.wait(), чтобы приостановить работу текущего потока. Вызов метода wait не возвращает управление до тех пор, пока другой поток не обработает уведомление о том, что произошло некоторое специальное событие, однако не имеет значения, какое событие ожидает поток:
1
2
3
4
5
6
7
8
9
10
11
|
public synchronized void guardedJoy() {
// Этот цикл выполняется только один раз
// для каждого специального события,
// которое может быть событием, которое мы ожидаем.
while(!joy) {
try {
wait();
} catch (InterruptedException e) {}
}
System.out.println(«Joy and efficiency have been achieved!»);
}
|
Замечание: всегда вызывайте wait внутри цикла, который проверяет условие, которое ожидается. Не предполагайте, что прерывание было вызвано конкретным условием, которое нам нужно, или что это условие до сих пор выполняется.
Как и многие другие методы, которые приостанавливают выполнение, wait может бросить InterruptedException. В этом примере мы просто игнорируем это исключение, мы беспокоимся только о значении переменной joy.
Почему эта версия guardedJoy синхронизирована ( synchronized )? Предположим, что d — это объект, который мы используем для вызова wait. Когда поток вызывает d.wait, то он должен обладать внутренней блокировкой для d, иначе бросится исключение. Вызов wait внутри синхронизированного метода — это наиболее простой способ получить внутреннюю блокировку.
При вызове метода wait поток освобождает блокировку и приостанавливает выполнение. Затем, спустя время, другой поток получает ту же самую блокировку и вызывает Object.notifyAll, сообщая всем ожидающим потокам, что произошло что-то существенное:
1
2
3
4
|
public synchronized notifyJoy() {
joy = true;
notifyAll();
}
|
Спустя какое-то время второй поток освобождает блокировку, первый поток снова получает блокировку и возвращается из вызова wait.
Замечание: Есть второй метод уведомления — notify, который пробуждает только один поток, но так как он не позволяет указать, какой поток пробудить, то он полезен только в программах, которые используют большое количество потоков, и каждый поток выполняет похожую работу. В таком приложении вам не нужно беспокоиться о том, какой поток будет пробуждён.
Давайте сделаем приложение поставщик-потребитель, используя защищённые блокировки. Этот тип приложений разделяет данные между двумя потоками: поставщик, который создаёт данные, и потребитель, который делает что-либо с ними. Два потока общаются с помощью общего объекта. Согласование их действий очень важно: поток потребителя не должен пытаться получать данные до того, как поставщик доставит их, и поток поставщика не должен пытаться доставить новые данные до того, пока потребитель не получил старые данные.
В этом примере данными являются последовательность текстовых сообщений, которые разделяются с помощью объекта класса Drop:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public class Drop {
// Сообщение, отправленное от поставщика потребителю.
private String message;
// True, если потребитель должен ждать поставщика,
// пока тот не отправит сообщение.
// false если поставщик должен ждать, пока потребитель
// не получит сообщение
private boolean empty = true;
public synchronized String take() {
// Ждём, пока нет сообщения.
while (empty) {
try {
wait();
} catch (InterruptedException e) {}
}
// Меняем статус
empty = true;
// Уведомляем поставщика, что статус был изменён.
notifyAll();
return message;
}
public synchronized void put(String message) {
// Ждём, пока сообщение не доставлено.
while (!empty) {
try {
wait();
} catch (InterruptedException e) {}
}
// Переключаем статус.
empty = false;
// Сохраняем сообщение.
this.message = message;
// Уведомляем потребителя, что статус был изменён.
notifyAll();
}
}
|
Поток поставщика отправляет несколько сообщений. Строка «DONE» обозначает, что все сообщения были успешно отправлены. Чтобы имитировать непредсказуемое поведение реального приложения, поток поставщика делает случайную паузу между сообщениями.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
import java.util.Random;
public class Producer implements Runnable {
private Drop drop;
public Producer(Drop drop) {
this.drop = drop;
}
public void run() {
String importantInfo[] = {
«Mares eat oats»,
«Does eat oats»,
«Little lambs eat ivy»,
«A kid will eat ivy too»
};
Random random = new Random();
for (int i = 0;
i < importantInfo.length;
i++) {
drop.put(importantInfo[i]);
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {}
}
drop.put(«DONE»);
}
}
|
Поток потребителя просто получает сообщения и выводит их, пока не получит сообщение «DONE». Он тоже делает случайную паузу между сообщениями.
Главный поток запускает поток поставщика и поток потребителя:
1
2
3
4
5
6
7
|
public class ProducerConsumerExample {
public static void main(String[] args) {
Drop drop = new Drop();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
|
Замечание: Класс Drop написан только для демонстрации потоков. Постарайтесь не изобретать колесо, посмотрите существующие структуры данных в Java Collection Framework, перед тем как пытаться написать свой собственный объект для разделения данных между несколькими потоками.
Неизменяемые объекты (immutable objects)
Объект считается неизменяемым, если его внутреннее состояние не может быть изменено после создания. Использование неизменяемых объектов — широко распространённая стратегия для создания простого и надёжного кода.
Неизменяемые объекты особенно полезны в многопоточных приложениях. Так как они не могут менять своего внутреннего состояния, то они не могут быть испорчены вмешательством в поток (thread interference) или прочитаны в некорректном состоянии.
Программисты часто ленятся использовать неизменяемые объекты, так как они беспокоятся о цене создания нового объекта вместо изменения старого. Влияние создания экземпляров объектов часто переоценивается и может быть компенсировано преимуществами использования неизменяемых объектов.
Пример синхронизированного класса
Класс SynchronizedRGB определяет объекты, которые хранят цвет. Каждый объект содержит три числа (красный, зелёный, синий) и имя цвета.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
public class SynchronizedRGB {
// Values must be between 0 and 255.
private int red;
private int green;
private int blue;
private String name;
private void check(int red,
int green,
int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public SynchronizedRGB(int red,
int green,
int blue,
String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public void set(int red,
int green,
int blue,
String name) {
check(red, green, blue);
synchronized (this) {
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
}
public synchronized int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public synchronized String getName() {
return name;
}
public synchronized void invert() {
red = 255 — red;
green = 255 — green;
blue = 255 — blue;
name = «Inverse of « + name;
}
}
|
SynchronizedRGB нужно использовать с осторожностью, чтобы исключить его видимость другими потоками в противоречивом состоянии. Например, предположим, что поток выполняет следующий код:
1
2
3
4
5
|
SynchronizedRGB color =
new SynchronizedRGB(0, 0, 0, «Pitch Black»);
...
int myColorInt = color.getRGB(); // инструкция 1
String myColorName = color.getName(); // инструкция 2
|
Если другой поток вызывает color.set после инструкции 1, но перед инструкцией 2, то значение myColorInt не будет совпадать со значением myColorName. Чтобы предотвратить это, две инструкции должны быт связаны вместе:
1
2
3
4
|
synchronized (color) {
int myColorInt = color.getRGB();
String myColorName = color.getName();
}
|
Такой тип противоречивого состояния возможен только для изменяемых объектов, он не будет происходить с неизменяемой версией SynchronizedRGB.
Как определять неизменяемые объекты (immutable objects)
Ниже перечислены правила определения неизменяемых объектов. Не все классы, документированные как «неизменяемые», следуют этим правилам. Это не обязательно значит, что создатели этих классов были идиотами — они могли иметь веские причины считать, что экземпляры их объектов никогда не будут меняться после создания. Однако такие стратегии требуют сложного анализа и не подходят для начинающих.
- Не создавайте методов установки значений (setter-ов, которые меняют значения полей или объектов, на которые ссылаются эти поля).
- Делайте все поля final и private.
- Не позволяйте дочерним классам переопределять методы. Самый просто способ добиться этого — объявить класс как final. Более сложный способ — это сделать конструктор приватным и создавать экземпляры класса с помощью методов фабрик.
- Если поля экземпляров ссылаются на изменяемые объекты, то не позволяйте менять состояние этих объектов: не предоставляйте методов, которые меняют внутреннее состояние изменяемых объектов, не позволяйте стороннему коду получить ссылки на изменяемые объекты (возвращайте копии этих объектов), не используйте те объекты, которые были переданы в конструктор (создавайте их копии, если нужно).
Применение этих правил к SynchronizedRGB состоит из следующих шагов:
- Есть два метода установки значений. Первый произвольно меняет объект, и его нужно убрать из неизменяемой версии класса. Второй метод invert может быть адаптирован так, что он будет создавать новый объект, вместо изменения существующего.
- Все поля сделать final, они уже приватные.
- Сам класс объявить как final.
- Только одно поле ссылается на объект, и этот объект уже неизменяемый, поэтому других предосторожностей не нужно.
Результат:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
final public class ImmutableRGB {
// Values must be between 0 and 255.
final private int red;
final private int green;
final private int blue;
final private String name;
private void check(int red,
int green,
int blue) {
if (red < 0 || red > 255
|| green < 0 || green > 255
|| blue < 0 || blue > 255) {
throw new IllegalArgumentException();
}
}
public ImmutableRGB(int red,
int green,
int blue,
String name) {
check(red, green, blue);
this.red = red;
this.green = green;
this.blue = blue;
this.name = name;
}
public int getRGB() {
return ((red << 16) | (green << 8) | blue);
}
public String getName() {
return name;
}
public ImmutableRGB invert() {
return new ImmutableRGB(255 — red,
255 — green,
255 — blue,
«Inverse of « + name);
}
}
|
Высокоуровневые объекты для многопоточного приложения
До сих пор мы обсуждали низкоуровневое API, которое было частью Java с самого начала. Это API подходит для очень простых задач, но для более сложных задач нужно что-нибудь более высокоуровневое. Это особенно важно для больших многопоточных приложений, которые полностью используют современные системы с несколькими процессорами и несколькими ядрами.
Объекты Lock
Синхронизированный код полагается на простой тип reentrant lock (блокировка, которую можно брать несколько раз). Этот тип легко использовать, но он имеет определённые ограничение. Более сложные способы блокировки поддерживаются пакетом java.util.concurrent.locks. Этот пакет имеет довольно много классов, но здесь будет рассмотрен его наиболее базовый интерфейс Lock.
Объекты Lock работают очень похоже на внутренние блокировки, используемые синхронизированным кодом. Так же как и для внутренних блокировок только один поток может держать блокировку объекта Lock в одно время. Объекты Lock также поддерживают механизм wait / notify через ассоциированные с ними объекты Condition.
Преимущество объектов Lock над внутренними блокировками в том, что они могут отказаться от участия в попытке приобрести блокировку. Метод tryLock сразу же завершается, если блокировка недоступна сразу же, либо после истечения указанного времени (если время указано). Метод lockInterruptibly отказывается от попытки получить блокировку, если другой поток отправляет interrupt до получения блокировки.
Используем объекты Lock, чтобы решить проблему взаимной блокировки, которую мы видели в «Живучесть (Liveness)». Алиса и Боб научили себя замечать, когда другой собирается сделать поклон. Мы смоделируем это улучшение, добавив необходимость для наших объектов Friend получить блокировку на обоих участниках, перед тем как сделать поклон. Чтобы продемонстрировать универсальность этого способа, мы предположим, что Алиса и Боб так воодушевлены своей возможностью кланяться безопасно, что они не могут остановиться:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;
public class Safelock {
static class Friend {
private final String name;
private final Lock lock = new ReentrantLock();
public Friend(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public boolean impendingBow(Friend bower) {
Boolean myLock = false;
Boolean yourLock = false;
try {
myLock = lock.tryLock();
yourLock = bower.lock.tryLock();
} finally {
if (! (myLock && yourLock)) {
if (myLock) {
lock.unlock();
}
if (yourLock) {
bower.lock.unlock();
}
}
}
return myLock && yourLock;
}
public void bow(Friend bower) {
if (impendingBow(bower)) {
try {
System.out.format(«%s: %s has»
+ » bowed to me!%n»,
this.name, bower.getName());
bower.bowBack(this);
} finally {
lock.unlock();
bower.lock.unlock();
}
} else {
System.out.format(«%s: %s started»
+ » to bow to me, but saw that»
+ » I was already bowing to»
+ » him.%n»,
this.name, bower.getName());
}
}
public void bowBack(Friend bower) {
System.out.format(«%s: %s has» +
» bowed back to me!%n»,
this.name, bower.getName());
}
}
static class BowLoop implements Runnable {
private Friend bower;
private Friend bowee;
public BowLoop(Friend bower, Friend bowee) {
this.bower = bower;
this.bowee = bowee;
}
public void run() {
Random random = new Random();
for (;;) {
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {}
bowee.bow(bower);
}
}
}
public static void main(String[] args) {
final Friend alice =
new Friend(«Alice»);
final Friend bob =
new Friend(«Bob»);
new Thread(new BowLoop(alice, bob)).start();
new Thread(new BowLoop(bob, alice)).start();
}
}
|
Executors
java.util.concurrent.Executor
Интерфейс java.util.concurrent.Executor предоставляет один методexecute, который является заменой обычного создания потока. Еслиr реализует интерфейс Runnable, а e реализует интерфейс Executor, то вы можете заменить
1
|
(new Thread(r)).start();
|
следующим кодом:
1
|
e.execute(r);
|
Однако определение метода execute несколько отличается. Низкоуровневый вариант создаёт новый поток и сразу же его запускает. В зависимости от реализации Executor-а метод execute может делать то же самое, но обычно он использует уже существующий рабочий поток для запуска r, либо r помещается в очередь, где дожидается освобождения рабочего потока.
Реализации executor-ов в java.util.concurrent созданы для использования с более продвинутыми интерфейсами ExecutorService иScheduledExecutorService, но они также работают и с интерфейсом Executor.
java.util.concurrent.ExecutorService
Интерфейс java.util.concurrent.ExecutorService расширяет интерфейс Executor, добавляя множество новых методов. Основной метод — метод submit, который принимает как Runnable, так и интерфейс java.util.concurrent.Callable<V> с единственным методом Vcall() , который позволяет заданиям возвращать значение. Методsubmit возвращает интерфейс java.util.concurrent.Future, который используется для получения результата и контролирования состояния потока.
java.util.concurrent.ScheduledExecutorService
Интерфейс java.util.concurrent.ScheduledExecutorService расширяет интерфейс java.util.concurrent.ExecutorService и добавляет методы schedule*, которые позволяют запланировать выполнение задания.
Пулы потоков
Многие реализации executor-ов из пакета java.util.concurrent используют пул потоков, который состоит из работающих потоков. Такой тип потока существует отдельно от Runnable и Callable, который он выполняет, и часто используется для выполнения нескольких заданий.
Использование работающих потоков минимизирует издержки создания потоков. Потоки используют много памяти, и в больших приложениях создание и уничтожение большого количества потоков значительно увеличивает потребление памяти.
Наиболее часто использующийся тип пула потоков — это фиксированный пул потоков. Этот тип пула всегда держит работающими указанное количество потоков. Если поток как-нибудь завершается в то время, пока он всё ещё используется, то он автоматически заменяется вновь созданным потоком. Задачи отправляются в пул через внутреннюю очередь, которая хранит дополнительные задачи, если их больше, чем потоков.
Важное преимущество фиксированного пула потоков в том, что приложения, которые их используют не подвисают из-за того, что они создали слишком большое количество потоков, которое превышает возможности системы.
Самый простой способ создать executor, который использует фиксированный пул потоков, — это вызвать newFixedThreadPool у классаjava.util.concurrent.Executors:
1
|
public static ExecutorService newFixedThreadPool(int nThreads)
|
Класс Executors также имеет следующие фабричные методы:
1
|
public static ExecutorService newCachedThreadPool()
|
Создаёт новые потоки по мере надобности. Повторно использует предыдущие потоки, если они свободны.
1
|
public static ExecutorService newSingleThreadExecutor()
|
Пул потоков, состоящий из одного потока.
Если ни один из стандартных executor-ов не удовлетворяет вашим потребностям, то вы можете создать экземпляры java.util.concurrent.ThreadPoolExecutor или java.util.concurrent.ScheduleThreadPoolExecutor.
Fork/Join Framework
Fork/Join Framework является реализацией интерфейса ExecutorService, который помогает вам получить преимущество при использовании мультипроцессорной системы. Он спроектирован для такой работы, которая может быть разбита рекурсивно на множество маленьких частей. Цель фреймворка — использовать всю доступную мощь, чтобы увеличить производительность вашего приложения.
Как и с любой реализацией ExecutorService fork/join framework распределяет задачи между рабочими потоками в пуле потоков. Fork/join фрейморк отличается тем, что он использует алгоритм воровства работы. Рабочие потоки, для которых кончилась работа, могут воровать задачи других потоков, которые всё ещё заняты.
Основным классом fork/join фреймворка является класс java.util.concurrent.ForkJoinPool, который расширяет класс java.util.concurrent.AbstractExecutorService. Класс ForkJoinPool реализует основной алгоритм воровства работы и может выполнять задачи java.util.concurrent.ForkJoinTask.
Первый шаг использования fork/join framework — это написать код, который выполняет кусок работы. Ваш код должен быть похож на следующий псевдокод:
1
2
3
4
5
|
если моя порция работы слишком мала
сделать работу напрямую
иначе
разделить работу на два куска
вызывать два куска и подождать результата
|
Оберните этот код в какой-нибудь дочерний класс от ForkJoinTask, обычно используя один из более специализированных типов java.util.concurrent.RecursiveTask (который может вернуть результат) либо java.util.concurrent.RecursiveAction.
Когда ваш ForkJoinTask будет готов, создайте объект, который представляет из себя всю работу и передайте его в метод invoke() экземпляра ForkJoinPool.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Размер окна. Должен быть нечётным
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
protected void computeDirectly() {
int sidePixels = (mBlurWidth — 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// вычисляем средний цвет.
float rt = 0, gt = 0, bt = 0;
for (int mi = —sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length — 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}
// создаём заново конечный пиксель.
int dpixel = (0xff000000 ) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}
...
|
Теперь мы реализуем абстрактный метод compute(), который реализует размывание напрямую, либо делит на мелкие задачи.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
protected static int sThreshold = 100000;
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength — split,
mDestination));
}
|
Пример запуска:
1
2
3
4
5
6
7
|
// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(fb);
|
Атомарные переменные
Пакет java.util.concurrent.atomic содержит классы, которые поддерживают атомарные операции над простыми переменными. Все классы имеют методы get и set, которые работают так же, как чтение и запись volatile переменных, то есть set имеет связь произошло-до (happens-before) с любым последующим get над той же самой переменной. Атомарный метод compareAndSet также имеет ту же особенность.
Чтобы посмотреть класс в деле давайте посмотрим класс Counter:
Один из способов избавить Counter от вмешательства в поток (thread interference) — это сделать его синхронизированным:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c—;
}
public synchronized int value() {
return c;
}
}
|
Для такого простого класса это вполне приемлемое решение. Но для более сложных классов, мы хотим ибежать живучести (liveness). Замена int на AtomicInteger позволит защититься от вмешательства в поток (thread interference) без synchronized :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.incrementAndGet();
}
public void decrement() {
c.decrementAndGet();
}
public int value() {
return c.get();
}
}
|