С появлением стандарта C++11, библиотека STL была расширена, включив в себя прямую поддержку многопоточности через модуль в заголовочном файле <thread>. До стандарта C++11 приходилось использовать POSIX-потоки, а новый подход решал проблемы с портируемостью.
Важное примечание: данная заметка опирается на знания двух предыдущих — теории и POSIX. Ознакомление с ними желательно.
Принудительное завершение потоков в C++11 и более поздних версиях стандарта не поддерживается напрямую из-за потенциальных проблем с безопасностью и надежностью. Вместо этого необходимо использовать другие механизмы для безопасного завершения потоков, такие как флаги завершения, условные переменные или другие средства синхронизации. Задание атрибутов, аналогично POSIX, так же не поддерживается в целях повышения портируемости кода.
Создание потоков
В языке C++ работа с потоком осуществляется с помощью класса std::thread, который может порождать поток с точкой входа в:
- обычных функциях;
- лямбда-функциях;
- методах классов — требует передачи объекта класса в качестве первого аргумента потока;
- статических методах классов;
- функторах.
Если в конструктор не передать точку входа, то поток создан не будет, а лишь пустой объект, который может быть позднее инициализирован.
Кроме того, данный класс позволяет передавать любое число параметров в точку входа потока, что сильно упрощает взаимодействие с потоком по сравнению с POSIX подходом.
Примечание: для обеспечения безопасной работы с выделяемыми ресурсами следует использовать умные указатели.
Вызов метода join() блокирует вызывающий поток до тех пор, пока поток этого объекта не выполнит свою работу. При попытке вызвать функцию для не созданного или отсоединенного потока будет выдано исключение (exception). Для возможности проверки необходимо использовать метод joinable().
Важное замечание: вызов метода join() не возвращает значение, возвращаемое функцией потока. Подробнее о возвращаемых значениях в следующем разделе.
Пример создания потока с точкой входа в обычной функции:
#include <thread>
#include <iostream>
void threadFunction1()
{
std::cout << "Поток 1 с обычной функцией без аргументов\n";
}
void threadFunction2(int i, double d)
{
std::cout << "Поток 2 с обычной функцией с аргументами: " << i << ' ' << d << '\n';
}
int main()
{
std::thread myThread1(threadFunction1);
myThread1.join();
std::thread myThread2(threadFunction2, 1, 2.34);
myThread2.join();
return 0;
}
Важное замечание: в данном примере и далее потоки создаются после завершения предыдущих во избежание гонки за поток вывода.
Пример использования lambda-функции при создании потока с различными подходами объявления:
#include <thread>
#include <iostream>
int main()
{
// Лямбда функции
auto myLambdaFunction1 = []() {
std::cout << "Поток 1 с лямбда-функцией без аргументов\n";
};
auto myLambdaFunction2 = [](int i, double d) {
std::cout << "Поток 2 с лямбда-функцией с аргументами: " << i << ' ' << d << '\n';
};
// Создаем потоки, выполняющие лямбда-функции
std::thread myThread1(myLambdaFunction1);
myThread1.join();
std::thread myThread2(myLambdaFunction2, 1, 2.34);
myThread2.join();
std::thread myThread3([]() {
std::cout << "Поток 3 с лямбда-функцией без аргументов\n";
}); // myThread3
myThread3.join();
std::thread myThread4([](int i, double d) { // Лямбда-функция с параметрами
std::cout << "Поток 4 с лямбда-функцией с аргументами: " << i << ' ' << d << '\n';
},
1, 2.34); // myThread4
myThread4.join();
return 0;
}
Важное замечание: лямбда функции не особо подходят для потоков, но все же могут быть использованы в качестве точки входа нового потока.
Пример использования методов классов при создании потока:
#include <thread>
#include <iostream>
class MyClass
{
public:
void instanceMethod1()
{
std::cout << "Поток 1 с методом объекта без доп. аргументов\n";
}
void instanceMethod2(int i, double d)
{
std::cout << "Поток 2 с методом объекта с доп. аргументами: " << i << ' ' << d << '\n';
}
static void staticMethod1()
{
std::cout << "Поток 3 со статическим методом без аргументов\n";
}
static void staticMethod2(int i, double d)
{
std::cout << "Поток 4 со статическим методом с аргументами: " << i << ' ' << d << '\n';
}
};
int main()
{
MyClass myObject; // Объект нашего класса
// Создание потоков
std::thread thread1(&MyClass::instanceMethod1, &myObject);
thread1.join();
std::thread thread2(&MyClass::instanceMethod2, &myObject, 1, 2.34);
thread2.join();
std::thread thread3(&MyClass::staticMethod1);
thread3.join();
std::thread thread4(&MyClass::staticMethod2, 1, 2.34);
thread4.join();
return 0;
}
Важное замечание: если не передавать адрес объекта, а сам объект класса при создании потока, то будет использована копия этого объекта.
Если в аргументах потока необходимо передать объект по ссылке — требуется использовать std::ref и std::cref. Пример:
#include <thread>
#include <iostream>
class MyClass
{
public:
void instanceMethod1(int& i, double& d)
{
std::cout << "Поток с методом объекта (" << this << ") с доп. аргументами: " << i << " (" << &i << ") и " << d << "(" << &d << ")\n";
}
void instanceMethod2(int i, double d)
{
std::cout << "Поток с методом объекта (" << this << ") с доп. аргументами: " << i << " (" << &i << ") и " << d << "(" << &d << ")\n";
}
};
int main()
{
MyClass myObject; // Объект нашего класса
int i = 1;
double d = 2.34;
// Вывод адресов
std::cout << "myObject (" << &myObject << ") i (" << &i << ") d (" << &d << ")\n";
// Создание потоков
std::thread thread1(&MyClass::instanceMethod1, &myObject, std::ref(i), std::ref(d)); // адрес myObject и ссылки на i, d
thread1.join();
std::thread thread2(&MyClass::instanceMethod1, std::ref(myObject), std::ref(i), std::ref(d)); // ссылка myObject и ссылки на i, d
thread2.join();
std::thread thread3(&MyClass::instanceMethod2, myObject, i, d); // копия myObject и копии i, d
thread3.join();
return 0;
}
Примечание: компилятор не позволит передать объекты без std::ref и std::cref, если функция/метод ожидают ссылку или константную ссылку в качестве аргумента
Функторы — классы с перегруженным operator(), объекты которых могут быть использованы как обычные функции. Пример использования функторов в качестве точки входа потока:
#include <thread>
#include <iostream>
class MyClass
{
public:
void operator()()
{
std::cout << "Поток с функтором (" << this << ") - без аргументов\n";
}
void operator()(int i, double d)
{
std::cout << "Поток с функтором (" << this << ") с аргументами: " << i << ' ' << d << '\n';
}
};
int main()
{
MyClass myObject; // Объект-функтор
// Адрес объекта-функтора
std::cout << "myObject (" << &myObject << ")\n";
// Создание потоков
std::thread thread1(std::ref(myObject)); // ссылка на объект-функтор
thread1.join();
std::thread thread2(myObject); // копия объекта-функтора
thread2.join();
std::thread thread3(std::ref(myObject), 1, 2.3); // ссылка на объект-функтор с аргументами
thread3.join();
return 0;
}
Метод detach() позволяет отсоединить поток от объекта std::thread, позволяя потоку продолжить выполнение независимо. После вызова этого метода объект std::thread может быть использован для другого потока выполнения.
Метод swap(thread& other) позволяет обменять потоки между двумя объектами std::thread. Данный метод может быть использован например алгоритмами сортировки в std::vector.
Метод get_id возвращает уникальный идентификатор потока выполнения, ассоциированного с объектом для которого вызывается данный метод.
Метод native_handle может быть использован для инициализации платформозависимой переменной для работы с потоками (например pthread_t в POSIX).
Так же имеется статический метод hardware_concurrency, который возвращает количество потоков выполнения, которые могут быть эффективно запущены параллельно.
Получение значения из потока
Как было сказано ранее: метод join не возвращает значения из функции. Для реализации такой функциональности используется механизм promise-future, представленный классами std::promise и std::future. Эти инструменты предоставляют механизм для обмена данными между потоками, где один поток обещает предоставить значение (promise), а другой поток ожидает получения этого значения (future).
Примечание: для работы с std::future необходимо подключить заголовочный файл <future>
Внимание: далее немного сложно, но надеюсь пример расставит все по своим местам.
std::promise позволяет потоку обещать предоставление некоторого значения в будущем. Он предоставляет механизм для установки значения или исключения, которое может быть получено в другом потоке с помощью std::future. Обычно std::promise используется в потоке-производителе для установки результата выполнения операции. Для задания значения используется метод set_value. Если в процессе выполнения операции возникает исключение, его можно установить с помощью метода set_exception.
Примечание: если значение или исключение уже установлено, то это приведет к вызову исключения std::future_error.
std::future представляет собой объект, который будет получать значение или исключение из std::promise. Он используется в потоке-потребителе для ожидания результата, который должен быть установлен в соответствующем std::promise. Для создания std::future из std::promise используется метод get_future().
std::future имеет следующие методы:
get()— блокирует поток до тех пор, пока значение не будет установлено в связанномstd::promiseи возвращает его в точку вызова;wait()— блокирует вызывающий поток и ожидает, пока значение будет установлено, но не возвращает его в точку вызова;wait_for(const chrono::duration& relative_time)— ожидает установки значения в течении указанного времени, по истечению возвращает:std::future_status::timeout— время ожидания истекло,std::future_status::ready— значение установлено до истечения времени ожидания;
wait_until(const chrono::time_point& absolute_time)— аналогичноwait_for, но с абсолютной временной точкой.
Примечание: после вызова метода get объект std::future становится не связанным. Его связанность следует проверять методом valid перед вызовами get, wait, wait_for, wait_until.
Для работы std::promise совместно с std::thread можно использовать лямбда функцию:
#include <iostream>
#include <future>
#include <thread>
int threadFunction()
{
// Имитация работы
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42; // Ответ на главный вопрос жизни, вселенной и всего такого
}
int main()
{
// Создание std::promise
std::promise<int> threadPromise;
// Получение std::future из std::promise
std::future<int> threadFuture = threadPromise.get_future();
// Запуск потока, передача std::promise в поток
std::thread myThread([&threadPromise]() {
threadPromise.set_value(threadFunction()); // Установка значения, которое будет получено через std::future
});
// Получение результата работы потока (main засыпает)
int result = threadFuture.get();
std::cout << "Результат работы потока: " << result << std::endl;
// Дождемся завершения потока
myThread.join();
return 0;
}
Здесь в лямбда-функцию (при создании потока) в квадратных скобках передается ссылка на переменную threadPromise, которая использует метод set_value от значения, возвращаемого функцией потока (threadFunction).
Примечание: в примере вызывается myThread.join(), но это не обязательно, так как к моменту вызова threadFuture.get() поток уже завершит выполнение.
Конструкция получается «монструозной». Для упрощения работы используется функция std::async.
Асинхронные задачи
Как сказано ранее: получение значения, возвращаемого из потока является трудоемкой задачей. В C++ существует функция std::async, который представляет собой высокоуровневый способ запуска асинхронных задач. Он возвращает объект std::future, который можно использовать для получения результата задачи.
В конструктор класса передаются следующие аргументы:
- политика запуска — определяет как задача будет выполняться:
- std::launch::async — гарантирует, что задача будет выполнена асинхронно в отдельном потоке,
- std::launch::deferred — задача будет выполнена «лениво» (когда будет запрошен результат),
- не указание политики (использование значения по умолчанию) позволяет системе выбрать как выполнить задачу;
- вызываемая функция;
- передаваемые в функцию аргументы.
Пример использования std::async со стандартной политикой запуска и без аргументов для упрощения получения значения:
#include <iostream>
#include <future>
#include <thread>
int threadFunction()
{
// Имитация работы
std::this_thread::sleep_for(std::chrono::seconds(1));
return 42; // Ответ на главный вопрос жизни, вселенной и всего такого
}
int main()
{
// Запуск асинхронной задачи
std::future<int> asyncFuture = std::async(threadFunction);
// Получение результата работы задачи
int result = asyncFuture.get();
std::cout << "Результат работы асинхронной задачи: " << result << std::endl;
return 0;
}
По примеру можно отметить, что объем кода значительно сократился.
Важное замечание: стандарт гарантирует, что окончание выполнения потока, запущенного вызовом std::async, синхронизировано с вызовом получения результата объектом std::future или освобождением общего состояния — области памяти, ответственной за передачу результата. Рассмотрим пример с работой в отдельном потоке:
#include <future>
#include <iostream>
#include <thread>
int main(int argc, char* argv[])
{
std::async(std::launch::async, []() {
for(int i=0; i < 15; i++)
{
std::cout << "1\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
std::async(std::launch::async, []() {
for(int i=0; i < 15; i++)
{
std::cout << "2\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
return 0;
}
Тут функция std::async возвращает объект std::future, который за ненадобностью в точке вызова функции уничтожается — в следствии чего основной поток функции main блокируется до освобождения общего состояния. Пример наглядно показывает, что числа выводятся последовательно: сначала единицы, потом двойки. Для исправления результата запишем объект std::future в переменную:
#include <future>
#include <iostream>
#include <thread>
int main(int argc, char* argv[])
{
std::future<void> asyncFuture1 = std::async(std::launch::async, []() {
for(int i=0; i < 15; i++)
{
std::cout << "1\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
auto asyncFuture2 = std::async(std::launch::async, []() {
for(int i=0; i < 15; i++)
{
std::cout << "2\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
return 0;
}
В исправленном примере вывод изменился: числа выводятся в случайном порядке.
Использование POSIX для модификации потока
Метод native_handle может быть использован для инициализации переменной pthread_t и модификации потока средствами POSIX.
Например изменим приоритет выполнения:
#include <pthread.h>
#include <thread>
void threadFunction()
{
// Код потока
}
int main()
{
std::thread myThread(threadFunction);
int policy = SCHED_FIFO;
struct sched_param param;
param.sched_priority = sched_get_priority_max(policy);
pthread_t threadID = myThread.native_handle();
pthread_setschedparam(threadID, policy, ¶m); // SCHED_FIFO требует прав суперпользователя
myThread.join();
return 0;
}
Средства синхронизации
Мьютексы (Mutexes)
Мьютексы («mutual exclusion» — взаимное исключение) используются для обеспечения взаимного исключения, что позволяет предотвратить одновременный доступ к общим ресурсам в многопоточных приложениях. Они представляют собой блокировку, которая может находиться в одном из двух состояний: заблокированном и разблокированном.
Доступны с C++11. Требуется подключить заголовочный файл mutex
В C++ есть несколько типов мьютексов:
std::mutex— простейший мьютекс, который не рекомендуется для рекурсивного захвата одним и тем же потоком;std::recursive_mutex— позволяет одному потоку захватывать мьютекс несколько раз;std::timed_mutexиstd::recursive_timed_mutex— позволяют пытаться захватить мьютекс в течение указанного времени.
Пример использования простого мьютекса:
#include <iostream>
#include <mutex>
#include <thread>
// Глобальная переменная и мьютекс
int shared_data = 0;
std::mutex mtx;
// Функция потока
void thread_function()
{
// Блокировка мьютекса
mtx.lock();
// Критическая секция
shared_data++;
printf("Thread: shared_data = %d\n", shared_data);
// Разблокировка мьютекса
mtx.unlock();
}
int main()
{
std::thread threads[2];
// Создание потоков
for (int i = 0; i < 2; i++)
threads[i] = std::thread(thread_function);
// Ожидание завершения потоков
for (int i = 0; i < 2; i++)
threads[i].join();
return 0;
}
Условные переменные (Condition Variables)
Условные переменные используются для блокировки потока до тех пор, пока не наступит определенное условие. Они тесно связаны с мьютексами и часто используются для ожидания определенных условий выполнения задач. std::condition_variable предназначена для использования с std::unique_lock<std::mutex>.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <unistd.h>
// Глобальные переменные
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // Условие, на котором основано ожидание
// Функция потока, ожидающего события
void waiting_thread()
{
std::unique_lock<std::mutex> lck(mtx);
while (!ready)
{
printf("Ожидающий поток: ждем сигнала...\n");
cv.wait(lck);
}
printf("Ожидающий поток: получен сигнал.\n");
}
// Функция потока, посылающего сигнал
void signaling_thread()
{
sleep(1); // Имитация работы
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main()
{
std::thread w_thread(waiting_thread);
std::thread s_thread(signaling_thread);
w_thread.join();
s_thread.join();
return 0;
}
Атомарные переменные
Атомарные переменные позволяют выполнять операции (чтение, запись, модификация) над переменными (в случае C++ над объектами) без использования традиционных средств синхронизации (например мьютексов) в многопоточных приложениях. Для достижения такого результата используются аппаратные инструкции (lock-free).
В C++ атомарные переменные реализованы через шаблонный класс std::atomic, который поддерживает: базовые типы данных, указатели и пользовательские типы. Со стандарта C++20 поддерживаются std::shared_ptr и std::weak_ptr.
Операции над атомарной переменной могут выполняться без блокировок на аппаратном уровне. Проверить отсутствие блокировок можно с помощью метода is_lock_free. Так же есть переменная is_always_lock_free, определяемая на этапе компиляции, но принимает true, если отсутствие блокировок не зависит от аппаратного уровня.
В отличии от POSIX стандарт C++ позволяет безопасно использовать operator= наравне с методом store для записи значения в атомарную переменную. Метод load используется для чтения значения из атомарной переменной, который может быть опущен в пользу неявного приведения. Пример:
std::atomic<int> a(10); // Инициализация значением 10
int value = a.load(); // Чтение
a.store(20); // Запись без возврата значения
a = 30; // Ещё один способ записи с возвратом копии значения
value = a; // Ещё один способ чтения
Есть три метода для сравнения и обмена:
exchange— атомарный способ значения атомарной переменной новым значением, возвращая предыдущее значение.compare_exchange_weak— атомарно сравнивает значение с ожидаемым и, если они равны, устанавливает новое значение, возвращая флаг успешности замены;compare_exchange_strong— аналогичноcompare_exchange_weak, но с гарантией отсутствия ложных отказов, возвращая флаг успешности замены.
Примечание: compare_exchange_weak может «ложно сбоить», когда ожидаемое значение совпадает со значением в атомарной переменной: при таком подходе «weak» версии могут быть более эффективны на некоторых платформах за счет меньших требований к синхронизации (например в циклах, где операции могут быть повторены без значительных накладных расходов).
Функция exchange принимает следующие аргументы:
- desired — новое значение;
- order — опциональный параметр, определяющий порядок памяти для операции (по умолчанию
std::memory_order_seq_cst).
Функции compare_exchange_ принимают следующие аргументы:
- expected — ожидаемое значение в атомарной переменной;
- desired — новое значение;
- order — опциональный параметр, определяющий порядок памяти для операции (по умолчанию
std::memory_order_seq_cst).
Порядок памяти для операций exchange может принимать следующие значения:
std::memory_order_relaxed— операции не вводят никаких ограничений на порядок выполнения относительно других операций (самый слабый порядок, который позволяет достигнуть максимальной производительности на мультипроцессорных системах, но не гарантирует порядок видимости изменений между потоками);std::memory_order_acquire— предотвращает все чтения и записи, выполненные в текущем потоке после операцииacquire, от перемещения до этой операции (гарантирует, что изменения, выполненные другими потоками до операцииrelease, будут видимы послеacquire);std::memory_order_release— предотвращает все чтения и записи, выполненные в текущем потоке до операцииrelease, от перемещения после этой операции (обеспечивает, что операции, выполненные доrelease, будут видимы в других потоках, выполняющих соответствующую операциюacquire);std::memory_order_acq_rel— сочетает в себе эффектыmemory_order_acquireиmemory_order_release(полезно для операций чтения-модификации-записи, таких какatomic_compare_exchange, где необходимо обеспечить видимость изменений до и после операции);std::memory_order_consume— редко используемая модель памяти, предназначенная для операций, зависящих от данных — ограничивает переупорядочивание операций так, чтобы чтение не могло быть выполнено до завершения операций, от которых оно зависит (часто заменяется наmemory_order_acquireиз-за сложности корректной реализации и поддержки компиляторами);std::memory_order_seq_cst— самый строгий порядок. Операции с этой моделью памяти выполняются с полным упорядочиванием относительно другихseq_cstопераций — все потоки видят операции в одном и том же порядке (обеспечивает наибольшие гарантии порядка, но может быть менее производительным по сравнению с более слабыми порядками).
В зависимости от типа данных, используемого в шаблоне доступны следующие методы:
- для целых и указателей (со стандарта C++11), а так же вещественных (со стандарта C++20):
fetch_add— атомарная сумма, возвращающая предыдущее значение,fetch_sub— атомарная разность, возвращающая предыдущее значение,operator+=— атомарная сумма, возвращающая результат суммы,operator-=— атомарная разность, возвращающая результат разности;
- для целых и указателей:
operator++— префиксный и суффиксный атомарный инкремент,operator--— префиксный и суффиксный атомарный декремент;
- только для целых:
fetch_and— атомарное побитовое И, возвращающее предыдущее значение,fetch_or— атомарное побитовое ИЛИ, возвращающее предыдущее значение,fetch_xor— атомарное побитовое исключающее ИЛИ, возвращающее предыдущее значение,operator&=— атомарное побитовое И, возвращающее результат операции,operator|=— атомарное побитовое ИЛИ, возвращающее результат операции,operator^=— атомарное побитовое исключающее ИЛИ, возвращающее результат операции.
Важное замечание: следующий код в различных вариациях не является атомарным:
atomicInt = atomicInt * 2; // и тому подобное
В стандарте C++20 введены дополнительные методы:
test— атомарно возвращает значение флага;wait— блокирует поток, пока значение, переданное в качестве аргумента) не поменяется;notify_one— уведомляет один поток об изменении значения;notify_all— уведомляет все потоки об изменении значения.
В большинстве случаев использование этих методов может быть более производительным, чем использование традиционных примитивов синхронизации, так как они используют низкоуровневые механизмы уведомления потоков.
Атомарные флаги (std::atomic_flag)
Атомарный флаг (со стандарта C++11), определенный классом std::atomic_flag, является простейшим примитивом синхронизации, который гарантирует отсутствие блокировок.
Данный класс имеет следующие методы:
clear— атомарно устанавливает флаг в состояниеfalse;- test_and_set — атомарно ставит флаг в состояние true и возвращает прошлое значение;
Также можно использовать методы атомарных переменных (C++20): test, wait, notify_one, notify_all.
Пример использования атомарных флагов в циклической блокировке (spinlock):
#include <iostream>
#include <atomic>
#include <thread>
#define N_THREADS 5
std::atomic_flag lock = ATOMIC_FLAG_INIT;
void simpleSpinLock(int id)
{
while (lock.test_and_set(std::memory_order_acquire))
{
// Блокировка занята, ждем
}
std::cout << "поток " << id << " захватил блокировку\n";
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Симуляция работы
lock.clear(std::memory_order_release);
}
int main()
{
std::thread threads[N_THREADS];
for (int i = 0; i < N_THREADS; i++)
threads[i] = std::thread(simpleSpinLock, i);
for (auto& thread : threads)
thread.join();
return 0;
}
В данном примере потоки, которые ожидают блокировки тратят процессорное время, что в целом можно решить в добавлением ожиданий через sleep.
Использование атомарных флагов не ограничивается циклическими блокировками, например их можно использовать для установки флагов завершения.
Семафоры (Semaphores)
Семафоры были добавлены в стандарте C++20:
- бинарные семафоры (
std::binary_semaphore) — могут пропустить только один поток в критическую секцию; - семафор со счетчиком (
std::coutnting_semaphore) — позволяют пропускать в критическую секцию N потоков.
Основные методы семафоров:
acquire— захватывает семафор, уменьшая счетчик (если счетчик равен нулю — поток блокируется);release— освобождает семафор, увеличивая счетчик (если есть заблокированный поток — он просыпается);try_acquire— неблокирующая попытка захвата счетчика, возвращаетtrueв случае успешного захвата.
В отличии от мьютексов семафоры не привязаны к потокам выполнения — acquire и release могут быть вызваны в разных потоках.
Начальное значение передается параметром в конструкторе.
Пример использования бинарного семафора:
#include <iostream>
#include <thread>
#include <semaphore>
std::binary_semaphore sem(1); // Создаем бинарный семафор, инициализированный в состояние "открыт"
void worker(int id)
{
sem.acquire(); // Захватываем семафор (блокируем доступ к ресурсу)
std::cout << "Поток " << id << " работает\n";
std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // Симуляция работы
std::cout << "Поток " << id << " закончил\n";
sem.release(); // Освобождаем семафор (разблокируем доступ к ресурсу)
}
int main()
{
std::thread t1(worker, 1);
std::thread t2(worker, 2);
t1.join();
t2.join();
return 0;
}
Бинарный семафор определен через:
using binary_semaphore = std::counting_semaphore<1>;
Семафор со счетчиком требует передачи конкретного числа LeastMaxValue в шаблоне, используемого в качестве «требуемого максимального значения счетчика семафора». По исходным файлам (github.com/gcc-mirror
a945c34) GNU C++ Compiller данное значение используется для static_assert проверок:
static_assert(__least_max_value >= 0);
static_assert(__least_max_value <= __semaphore_impl::_S_max);
Такая проверка позволяет на этапе компиляции проверить, может ли компилятор обеспечить работу семафора с «требуемым максимальным значением счетчика» (0 ≤ LeastMaxValue ≤ std::__semaphore_impl::_S_max).
В GNU C++ Compiller при увеличении текущего значения счетчика семафора методом release не проверяется на превышение верхней границы.
Важное замечание: ноль является допустимым значением LeastMaxValue и позволяет проигнорировать проверку по верхней границе счетчика.
Данная конструкция в объявлении класса:
template< std::ptrdiff_t LeastMaxValue = /* implementation-defined */ >
class counting_semaphore;
Пример использования семафора со счетчиком:
#include <iostream>
#include <sstream>
#include <thread>
#include <semaphore>
#define N_THREADS 5
std::counting_semaphore<3> sem(3); // Создаем счетный семафор с максимальным значением 3
void worker(int id)
{
sem.acquire(); // Захватываем семафор (уменьшаем счетчик)
std::cout << (std::stringstream() << "Поток " << id << " работает\n").str();
std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // Симуляция работы
std::cout << (std::stringstream() << "Поток " << id << " закончил\n").str();
sem.release(); // Освобождаем семафор (увеличиваем счетчик)
}
int main()
{
std::thread threads[N_THREADS];
for (int i = 0; i < N_THREADS; i++)
threads[i] = std::thread(worker, i);
for (auto& t : threads)
t.join();
return 0;
}
В GNU C++ Compiller имеется две реализации семафоров (libstdc++/include/bits/semaphore_base.h a945c34):
__platform_semaphore— использует POSIX семафоры;__atomic_semaphore— использует атомарные функции для работы со счетчиком.
Можно указать необходимость принудительного использования POSIX семафоров с помощью установки флага компиляции _GLIBCXX_USE_POSIX_SEMAPHORE. Выбор реализации осуществляется следующим образом:
// Note: the _GLIBCXX_USE_POSIX_SEMAPHORE macro can be used to force the
// use of Posix semaphores (sem_t). Doing so however, alters the ABI.
#if defined __glibcxx_atomic_wait && !_GLIBCXX_USE_POSIX_SEMAPHORE
using __semaphore_impl = __atomic_semaphore;
#elif _GLIBCXX_HAVE_POSIX_SEMAPHORE
using __semaphore_impl = __platform_semaphore;
#endif
Барьеры (Barriers)
Барьер — это примитив синхронизации, который позволяет потоку или группе потоков ожидать оставшиеся потоки до перехода к следующей фазе выполнения. Барьеры представлены классом std::barrier со стандарта C++20, который содержит в себе счетчик. Барьер может быть использован множество раз (подходит для использования в цикле).
Конструктор класса принимает количество потоков и функцию (необязательный параметр), которая будет вызвана, когда нужное число потоков дойдет до барьера.
Важное замечание: потоки будут ждать завершения функции, вызванной при заполнении барьера.
Данный класс имеет следующие методы:
arrive— уменьшает счетчик, но не блокирует поток, возвращает токен, который передается вwait;wait— блокирует поток пока остальные не достигнут барьера, принимает токен из методаarriveчерезstd::move;arrive_and_wait— уменьшает счетчик и блокирует поток пока остальные не достигнут барьера;arrive_and_drop— уменьшает счетчик и исключает поток из синхронизации (уменьшает общее количество ожидаемых потоков на барьере для следующих использований).
После того, как потоки проходят через барьер, значение счетчика восстанавливается на начальное, что позволяет использовать барьер множество раз.
Важное замечание: метод arrive_and_drop уменьшает число ожидаемых на барьере потоков и если в следующий раз поток зайдет на вызов arrive_and_wait, то это приведет к выдаче исключения Floating point exception. В такой ситуации требуется дополнительно проверить, что поток больше не будет взаимодействовать с барьером.
Пример использования раздельного вызова методов arrive и wait:
auto token = sync_point.arrive(); // Сообщаем о прибытии потока к барьеру и получаем токен прибытия
// ...
sync_point.wait(std::move(token)); // Ожидаем, пока все потоки не достигнут барьера, используя токен
Пример использования двух барьеров (с функцией и без неё):
#include <iostream>
#include <sstream>
#include <barrier>
#include <thread>
#define N_THREADS 5
void sync_function()
{
std::cout << "Потоки пришли на барьер\n";
}
std::barrier sync_point1(N_THREADS); // Синхронизируем потоки
std::barrier sync_point2(N_THREADS, sync_function); // Синхронизируем потоки c функцией
void worker(int id)
{
std::cout << (std::stringstream() << "Поток " << id << " стартовал\n").str();
std::this_thread::sleep_for(std::chrono::milliseconds(400 * id)); // Имитация работы продолжительностью в соответствии с идентификатором потока
std::cout << (std::stringstream() << "Поток " << id << " достиг 1 барьера\n").str();
sync_point1.arrive_and_wait();
std::cout << (std::stringstream() << "Поток " << id << " прошел 1 барьер\n").str();
std::this_thread::sleep_for(std::chrono::milliseconds(400 * (id+1))); // Имитация работы продолжительностью в соответствии с идентификатором потока
std::cout << (std::stringstream() << "Поток " << id << " достиг 2 барьера\n").str();
sync_point2.arrive_and_wait();
std::cout << (std::stringstream() << "Поток " << id << " прошел 2 барьер\n").str();
}
int main()
{
std::thread threads[N_THREADS];
for (int i = 0; i < N_THREADS; i++)
threads[i] = std::thread(worker, i);
for (auto &thread : threads)
thread.join();
return 0;
}
Латч (Latch)
Латч — это примитив синхронизации, который позволяет потоку или группе потоков ожидать оставшиеся потоки до перехода к следующей фазе выполнения. Латчи представлены классом std::latch со стандарта C++20, который содержит в себе одноразовый счетчик — собственно в этом его главное отличие от барьеров.
Примечание: latch можно перевести на русский как защелка, задвижка или щеколда, но автор позволит себе использовать англицизм.
Конструктор данного класса принимает начальное значение счетчика.
Данный класс имеет следующие методы:
count_down— уменьшает счетчик без блокировки потока;try_wait— проверяет достижение счетчиком нуля без блокировки;wait— блокирует поток, пока счетчик не достигнет нуля;arrive_and_wait— уменьшает счетчик и блокирует поток, пока счетчик не достигнет нуля.
Пример использования методов count_down и wait:
latch.count_down(); // Уменьшаем счетчик
// ...
latch.wait(); // Ожидаем, пока все потоки не достигнут латча
Пример использования латча в многопоточном приложении с методом arrive_and_wait:
#include <iostream>
#include <sstream>
#include <thread>
#include <latch>
#define N_THREADS 5
std::latch latch(N_THREADS); // Создаем латч, ожидающий 5 участников
void worker(int id)
{
std::cout << (std::stringstream() << "Поток " << id << " начал работу\n").str();
std::this_thread::sleep_for(std::chrono::milliseconds(400 * (id + 1))); // Симуляция работы
std::cout << (std::stringstream() << "Поток " << id << " закончил свою работу и ждет остальных\n").str();
latch.arrive_and_wait(); // Уменьшаем счетчик и ждем остальных
std::cout << (std::stringstream() << "Поток " << id << " вышел из латча\n").str();
}
int main()
{
std::thread threads[N_THREADS];
for (int i = 0; i < N_THREADS; i++)
threads[i] = std::thread(worker, i);
for (auto &t : threads)
t.join();
return 0;
}
Важное замечание: повторное использование отработанного latch может привести к взаимной блокировке (deadlock).