В версии JDК 7 внедрен новый класс синхронизации под названием Phaser. Главное его назначение — синхронизировать потоки исполнения, которые представляют одну или несколько стадий (или фаз) выполнения действия. Например, в прикладной программе может быть несколько потоков исполнения, реализующих три стадии обработки заказов.
На первой стадии отдельные потоки исполнения используются для того, чтобы проверить сведения о клиенте, наличие товара на складе и его цену. По завершении этой стадии остаются два потока исполнения, где на второй стадии вычисляется стоимость доставки и сумма соответствующего налога, а на заключительной стадии подтверждается оплата и определяется ориентировочное время доставки.
В прошлом для синхронизации нескольких потоков исполнения в такой прикладной программе пришлось бы немало потрудиться. А с появлением класса Phaser этот процесс значительно упростился.
Прежде всего следует иметь в виду, что класс Phaser действует подобно описанному ранее классу CyclicBarrier, за исключением того, что он поддерживает несколько фаз.
В итоге класс Phaser позволяет определить объект синхронизации, ожидающий завершения определенной фазы. Затем он переходит к следующей фазе и снова ожидает ее завершения. Следует также иметь в виду, что класс Phaser можно использовать и для синхронизации только одной фазы. В этом отношении он действует подобно классу CyclicBarrier, хотя главное его назначение — синхронизация нескольких фаз. В классе Phaser определяются четыре конструктора.
В данной статье упоминаются следующие два конструктора этого класса:
1 2 |
Phaser() Phaset(int_количество сторон) |
Первый конструктор создает синхронизатор фаз с нулевым регистрационным счетом, а второй устанавливает значение регистрационного счета равным заданному количеству_сторон. Объекты, регистрируемые синхронизатором фаз, зачастую обозначаются термином сторона.
Обычно имеется полное соответствие количества регистрируемых объектов и синхронизируемых потоков исполнения, хотя этого и не требуется. В обоих случаях текущая фаза является нулевой. Поэтому когда создается экземпляр класса Phaser, он первоначально находится в нулевой фазе.
Обычно класс Phaser используется следующим образом. Сначала создается новый экземпляр класса Phaser. Затем синхронизатор фаз регистрирует одну или несколько сторон, вызывая метод register() или указывая нужное количество сторон в конструкторе класса Phaser.
Пользуюсь случаем, хочу сказать что мы желаем изменить шаблон для нашего блога. Нашел интересный сайт htmldownload.com где доступны самые разные HTML шаблоны. Если вам нравится какой-нибудь дизайн вставьте ссылку в комментарии.
Синхронизатор фаз ожидает до тех пор, пока все зарегистрированные стороны не завершат фазу. Сторона извещает об этом, вызывая один из многих методов, предоставляемых классом Phaser, например метод arrive() или arriveAndAwaitAdvance().
Как только все стороны достигнут данной фазы, она считается завершенной, и синхронизатор фаз может перейти к следующей фазе (если она имеется) или завершить свою работу. Далее этот процесс поясняется более подробно.
Для регистрации стороны после создания объекта класса Phaser следует вызвать метод register(). Ниже приведена общая форма этого метода. В итоге он возвратит номер регистрируемой фазы.
1 |
int register() |
Чтобы сообщить о завершении фазы, сторона должна вызвать метод arrive( ) или какой-нибудь его вариант. Когда количество достижений конца фазы сравняется с количеством зарегистрированных сторон, фаза завершится и объект класса Phaser перейдет к следующей фазе (если она имеется). Метод arrive() имеет следующую общую форму:
1 |
int arrive() |
Этот метод сообщает, что сторона (обычно поток исполнения) завершила некоторую задачу (или ее часть) . Он возвращает текущий номер фазы. Если работа синхронизатора фаз завершена, этот метод возвращает отрицательное значение.
Метод arrive() не приостанавливает исполнение вызывающего потока. Это означает, что он не ожидает завершения фазы. Этот метод должен быть вызван только зарегистрированной стороной.
Если требуется указать завершение фазы, а затем ожидать завершения этой фазы всеми остальными зарегистрированными сторонами, следует вызвать метод arriveAndAwaitAdvance(). Ниже приведена общая форма этого метода.
1 |
int arriveAndAwaitAdvance() |
Этот метод ожидает до тех пор, пока все стороны не достигнут данной фазы, а затем возвращает номер следующей фазы или отрицательное значение, если синхронизатор фаз завершил свою работу. Метод ariveAndAwaitAdvance() должен быть вызван только зарегистрированной стороной.
Поток исполнения может достигнуть данной фазы, а затем сняться с регистрации, вызвав метод arriveAndDeregister(). Ниже приведена общая форма этого метода.
1 |
int arriveAndDeregister() |
Этот метод возвращает номер текущей фазы или отрицательное значение,если синхронизатор фаз завершил свою работу. Он не ожидает завершения фазы. Метод arriveAndDeregister() должен быть вызван только зарегистрированной стороной.
Чтобы получить номер текущей фазы, следует вызвать метод getPhase(). Его общая форма выглядит следующим образом:
1 |
final int getPhase() |
Когда создается объект класса Phaser, первая фаза получает нулевой номер, вторая фаза — номер 1, третья фаза — номер 2 и т.д. Если вызывающий объект класса Phaser завершил свою работу, возвращается отрицательное значение.
В приведенном ниже примере программы демонстрируется применение класса Phaser. В этой программе создаются три потока, каждый из которых имеет три фазы своего исполнения. Для синхронизации каждой фазы применяется класс Phaser.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
// Пример применения класса Phaser import java.util.concurrent.*; class PhaserDemo { public static void main(String args[]) { Phaser phsr = new Phaser(1); int curPhase; System.out.println("Запуск потоков"); new MyThread(phsr, "A"); new MyThread(phsr, "B"); new MyThread(phsr, "C"); // ожидать заверешния всеми потоками исполнения первой фазы curPhase = phsr.getPhase(); phsr.arriveAndAwaitAdvance(); System.out.println("Фаза " + curPhase + " завершена"); // ожидать завершения всеми потоками исполнения второй фазы curPhase = phsr.getPhase(); phsr.arriveAndAwaitAdvance(); System.out.println("Фаза " + curPhase + " завершена"); curPhase = phsr.getPhase(); phsr.arriveAndAwaitAdvance(); System.out.println("Фаза " + curPhase + " завершена"); // снять основной поток исполнения с регистрации phsr.arriveAndDeregister(); if(phsr.isTerminated()) { System.out.println("Синхронизатор фаз завершен"); } } } // Поток исполнения, использующий синхронизатор фаз типа Phaser class MyThread implements Runnable { Phaser phsr; String name; MyThread(Phaser p, String n) { phsr = p; name = n; phsr.register(); new Thread(this).start(); } public void run() { System.out.println("Поток " + name + " начинает первую фазу"); phsr.arriveAndAwaitAdvance(); // известить о достижении фазы // Небольшая пауза, чтобы не нарушить порядок вывода. // Только для иллюстрации, но необязательно для правильного // функционирования синхронизатора фаз try { Thread.sleep(10); } catch(InterruptedException e) { System.out.println(e); } System.out.println("Поток " + name + " начинает вторую фазу"); phsr.arriveAndAwaitAdvance(); // известить о достижении фазы // Небольшая пауза, чтобы не нарушить порядок вывода. // Только для иллюстрации, но необязательно для правильного // функционирования синхронизатора фаз try { Thread.sleep(10); } catch(InterruptedException e) { System.out.println(e); } System.out.println("Поток " + name + " начинает третью фазу"); // известить о достижении фазы и снять потоки с регистрации phsr.arriveAndDeregister(); } } |
Ниже приведен результат, выводимый данной программой.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
Поток B начинает первую фазу Поток C начинает первую фазу Поток A начинает первую фазу Фаза 0 завершена Поток A начинает вторую фазу Поток B начинает вторую фазу Поток C начинает вторую фазу Фаза 1 завершена Поток C начинает третью фазу Поток B начинает третью фазу Поток A начинает третью фазу Фаза 2 завершена Синхронизатор фаз завершен |
Рассмотрим подробнее основные части данной программы. Сначала в методе main() создается объект phsr типа Phaser с начальным счетом сторон, равным 1 (что соответствует основному потоку исполнения).
Затем создаются три объекта типа MyThread и запускаются три соответствующих потока исполнения. Обратите внимание на то, что объекту типа MyThread передается ссылка на объект phsr синхронизатор фаз. Объекты типа MyThread используют этот синхронизатор фаз для синхронизации своих действий.
Затем в методе main() вызывается метод getPhase(), чтобы получить номер текущей фазы (который первоначально является нулевым), а после этого — метод arriveAndAwaitAdvance(). В итоге выполнение метода main() приостанавливается до тех пор, пока не завершится нулевая, по существу, первая фаза.
Но этого не произойдет до тех пор, пока все объекты типа MyThread не вызовут метод arriveAndAwaitAdvance(). Как только это произойдет, метод main() возобновит свое выполнение, сообщив о завершении нулевой фазы, и перейдет к следующей фазе. Этот процесс повторяется до завершения всех трех фаз. Затем в методе main( ) вызывается метод arriveAndDeregister() ,чтобы снять с регистрации все три объекта типа MyThread.
В итоге зарегистрированных сторон больше не остается, а следовательно, перейдя к следующей фазе, синхронизатор фаз завершит свою работу.
Теперь рассмотрим объект типа MyThread. Прежде всего обратите вниманиена то, что конструктору передается ссылка на синхронизатор фаз, в котором новый поток исполнения регистрируется далее как отдельная сторона.
Таким образом, каждый новый объект типа MyThread становится стороной, зарегистрированной синхронизатором фаз, переданным конструктору этого объекта.
Обратите также внимание на то, что у каждого потока исполнения имеются три фазы. В данном примере каждая фаза состоит из метки-заполнителя, которая просто отображает имя потока исполнения и то, что он делает.
Безусловно, реальный код выполнял бы в потоке более полезные действия. Между первыми двумя фазами поток исполнения вызывает метод arriveAndAwaitAdvance(). Таким образом, каждый поток исполнения ожидает завершения фазы всеми остальными потоками, включая и основной поток. По завершении всех потоков исполнения, включая и основной, синхронизатор фаз переходит к следующей фазе.
А по завершении третьей фазы каждый поток исполнения снимается с регистрации, вызывая метод arriveAndDeregister(). Как поясняется в комментариях к объектам типа MyThread, метод sleep() вызывается исключительно в иллюстративных целях, чтобы предотвратить нарушение вывода из-за многопоточности, хотя это и не обязательно для правильного функционирования синхронизатора фаз.
Если удалить вызовы метода sleep(), то выводимый результат может выглядеть немного запутанным, но фазы все равно будут синхронизированы правильно.
Следует также иметь в виду, что в рассматриваемом здесь примере используются три однотипных потока исполнения, хотя это и не обязательное требование. Каждая сторона, пользующаяся синхронизатором фаз, может быть совершенно непохожей на остальные, выполняя свою задачу.
Все, что происходит при переходе к следующей фазе, вполне поддается контролю. Для этого следует переопределить метод onAdvance(). Этот метод вызывается исполняющей средой, когда синхронизатор фаз переходит от одной фазы к следующей. Ниже приведена общая форма данного метода.
1 |
protected boolean onAdvance(int фаза, int количество_сторон) |
Здесь параметр фаза обозначает текущий номер фазы перед его приращением, а параметр количество_сторон — число зарегистрированных сторон.
Для тогочтобы завершить работу синхронизатора фаз, метод onAdvance() должен возвратить логическое значение true. А для того чтобы продолжить работу синхронизатора фаз, метод onAdvance() должен возвратить булевское значение false.
В версии по умолчанию метод onAdvance() возвращает логическое значение true, чтобы завершить работу синхронизатора фаз, если зарегистрированныхсторон больше нет. Как правило, переопределяемая версия данного метода должна следовать этой практике.
Метод onAdvance() может быть, в частности, переопределен для того, чтобы дать синхронизатору фаз возможность выполнить заданное количество фаз, а затем остановиться.
Ниже приведен характерный тому пример. В данном примере создается класс MyPhaser, расширяющий класс Phaser таким образом, чтобы выполнять заданное количество фаз. С этой целью переопределяется метод onAdvance().
Конструктор класса MyPhaser принимает один аргумент, задающий количество выполняемых фаз. Обратите внимание на то, что класс MyPhaser автоматически регистрирует одну сторону. Это удобно для целей данного примера,но у конкретной прикладной программы могут быть другие потребности.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
// Расширить класс Phaser и переопределить метод onAdvance() // таким образом, чтобы было выполнено только определенное // количество фаз import java.util.concurrent.*; // Расширить класс MyPhaser, чтобы выполнить только // определенное количество фаз class MyPhaser extends Phaser { int numPhases; MyPhaser(int parties, int phaseCount) { super(parties); numPhases = phaseCount - 1; } // переопределить метод onAdvance(), чтобы выполнить // определенное количество фаз protected boolean onAdvance(int p, int regParties) { // Следующий оператор println() требуется только для // целей иллюстрации. Как правило, метод onAdvance() // не отображает выводимые данные System.out.println("Фаза " + p + " завершена.\n"); // возратить логическое значение true, // если все фазы завершены if(p == numPhases || regParties == 0) return true; // В противном случает возратить логическое значение false; return false; } } class PhaserDemo2 { public static void main(String args[]) { MyPhaser phsr = new MyPhaser(1, 4); System.out.println("Запуск потоков\n"); new MyThread(phsr, "A"); new MyThread(phsr, "B"); new MyThread(phsr, "C"); // ожидать завершения определенного количества фаз while(!phsr.isTerminated()) { phsr.arriveAndAwaitAdvance(); } System.out.println("Синхронизатор фаз завершен"); } } // Поток исполнения, использующий синхронизатор фаз типа Phaser class MyThread implements Runnable { Phaser phsr; String name; MyThread(Phaser p, String n) { phsr = p; name = n; phsr.register(); new Thread(this).start(); } public void run() { while(!phsr.isTerminated()) { System.out.println( "Поток " + name + " начинает фазу " + phsr.getPhase()); phsr.arriveAndAwaitAdvance(); // небольшая пауза, чтобы не нарушить порядок вывода. // Только для иллюстрации, но необязательно для правильного // функционирования синхронизатора фаз try { Thread.sleep(10); } catch(InterruptedException e) { System.out.println(e); } } } } |
Ниже приведен результат, выводимый данной программой.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
Запуск потоков Поток B начинает фазу 0 Поток A начинает фазу 0 Поток C начинает фазу 0 Фаза 0 завершена. Поток C начинает фазу 1 Поток B начинает фазу 1 Поток A начинает фазу 1 Фаза 1 завершена. Поток A начинает фазу 2 Поток C начинает фазу 2 Поток B начинает фазу 2 Фаза 2 завершена. Поток B начинает фазу 3 Поток C начинает фазу 3 Поток A начинает фазу 3 Фаза 3 завершена. Синхронизатор фаз завершен |
В методе main( ) создается один экземпляр класса Phaser. В качестве аргумента ему передается значение 4. Это означает, что он будет выполняться в течениечетырех фаз, а затем завершится. Затем создаются три потока исполнения и начинается следующий цикл, как показано ниже.
1 2 3 4 |
// ожидать завершения определенного количества фаз while(!phsr.isTerminated()) { phsr.arriveAndAwaitAdvance(); } |
В этом цикле метод arriveAndAwaitAdvance() вызывается до завершения работы синхронизатора фаз, а этого не произойдет до тех пор, пока не будет выполнено определенное количество фаз.
В данном случае цикл будет продолжаться до тех пор, пока не завершатся все четыре фазы. Следует также иметь в виду, что метод arriveAndAwaitAdvance() вызывается из потоков исполнения вплоть до завершения работы синхронизатора фаз в данном цикле. Это означает, что потоки исполняются до завершения заданного количества фаз.
А теперь рассмотрим подробнее исходный код метода onAdvance(). Всякий раз, когда вызывается метод onAdvance(), ему передается текущая фаза и количество зарегистрированных сторон. Если текущая фаза соответствует указанной фазе или количество зарегистрированных сторон равно нулю, метод onAdvance() возвращает логическое значение true, прекращая таким образом работу синхронизатора фаз.
Это делается в приведенном ниже фрагменте кода. Как можно заметить, для достижения желаемого результата необходимо совсем немного кода.
1 2 3 |
// возратить логическое значение true, // если все фазы завершены if(p == numPhases || regParties == 0) return true; |
Прежде чем завершить эту статью, следует заметить, что расширять класс Phaser совсем не обязательно. Как и в предыдущем примере, для этого достаточно переопределить метод onAdvance().
В некоторых случаях может быть создан более компактный код с помощью анонимного внутреннего класса, переопределяющего метод onAdvance().
У класса Phaser имеются дополнительные возможности, которые моrут оказаться полезными в прикладных программах. В частности, вызвав метод awaitAdvance(), можно ожидать конкретной фазы, как показано ниже.
1 |
int awaitAdvance(int фаза) |
Здесь параметр фаза обозначает номер фазы, в течение которой метод awaitAdvance() находится в состоянии ожидания до тех пор, пока не произойдет переход к следующей фазе.
Этот метод завершится немедленно, если передаваемый ему параметр фаза не будет равен текущей фазе. Он завершится сразу и в том случае, если синхронизатор фаз завершит работу.
Но если в качестве параметра фазаэтому методу будет передана текущая фаза, то он будет ожидать перехода к следующей фазе. Этот метод должен быть вызван только зарегистрированной стороной. Имеется также прерывающая версия этого метода под названием awaitAdvanceInterruptibly().
Чтобы зарегистрировать несколько сторон, следует вызвать метод bulkRegister( ), чтобы получить количество зарегистрированных сторон — метод getRegisteredParties(), чтобы получить количество сторон, достигших или не достигших своей фазы — метод getArrivedParties() или getUnarrivedParties() соответственно, а чтобы перевести синхронизатор фаз в завершенное состояние — метод forceTermination().
Класс Phaser позволяет также создать дерево синхронизаторов фаз. Он снабжен двумя дополнительными конструкторами, которые позволяют указать родительский синхронизатор фаз и метод getParent().