Рубрики
Многопоточность Программирование

Введение в многопоточность: C++

Работа с многопоточностью в языке C++ с использованием стандартной библиотеки шаблонов (STL)

С появлением стандарта C++11, библиотека STL была расширена, включив в себя прямую поддержку многопоточности через модуль в заголовочном файле <thread>. До стандарта C++11 приходилось использовать POSIX-потоки, а новый подход решал проблемы с портируемостью.

Важное примечание: данная заметка опирается на знания двух предыдущих — теории и POSIX. Ознакомление с ними желательно.

Принудительное завершение потоков в C++11 и более поздних версиях стандарта не поддерживается напрямую из-за потенциальных проблем с безопасностью и надежностью. Вместо этого необходимо использовать другие механизмы для безопасного завершения потоков, такие как флаги завершения, условные переменные или другие средства синхронизации. Задание атрибутов, аналогично POSIX, так же не поддерживается в целях повышения портируемости кода.

Важное замечание: стандарт C++11 гарантирует, что std::cout выведет строку атомарно, но не гарантирует, что несколько последовательных операторов << не будут в состоянии гонки. Для простоты в примерах строка заранее будет сформирована в std::sstream (string stream), который уже в свою очередь будет передаваться в std::cout, который выведет результат атомарно. Можно использовать средства синхронизации потоков, либо собрать строку на вывод заранее. Рассмотрим примеры:

#include <iostream>
#include <sstream>

// 1
std::cout << "Первая часть строки " << value << " Вторая часть строки\n"; // Будет разована по оператору << в случае состояния гонки потоков

// 2
std::stringstream ss; 
ss << "Первая часть строки " << value << " Вторая часть строки\n";
std::cout << ss; // Выведет строку атормано без разрывов, так как операторы << заранее посчитаны для переменной ss

// 3
std::cout << (std::stringstream() << "Первая часть строки " << value << " Вторая часть строки\n").str(); // Выведет строку атормано без разрывов, так как операторы << заранее посчитаны для объекта, создаваемого конструктором stringstream

В заметке будет использоваться именно третий пример, но важно использовать в качестве конца строки '\n' или "...\n", а не std::endl, который вызывает ошибку при сборке.

Содержание заметки:

Создание потоков

В языке C++ работа с потоком осуществляется с помощью класса std::thread, который может порождать поток с точкой входа в:

  • обычных функциях;
  • лямбда-функциях;
  • методах классов — требует передачи объекта класса в качестве первого аргумента потока;
  • статических методах классов;
  • функторах.

Если в конструктор не передать точку входа, то поток создан не будет, а лишь пустой объект, который может быть позднее инициализирован.

Кроме того, данный класс позволяет передавать любое число параметров в точку входа потока, что сильно упрощает взаимодействие с потоком по сравнению с POSIX подходом.

Примечание: для обеспечения безопасной работы с выделяемыми ресурсами следует использовать умные указатели.

Вызов метода join() блокирует вызывающий поток до тех пор, пока поток этого объекта не выполнит свою работу. При попытке вызвать функцию для не созданного или отсоединенного потока будет выдано исключение (exception). Для возможности проверки необходимо использовать метод joinable().

Важное замечание: вызов метода join() не возвращает значение, возвращаемое функцией потока. Подробнее о возвращаемых значениях в следующем разделе.

Пример создания потока с точкой входа в обычной функции:

#include <thread>
#include <iostream>
 
void threadFunction1()
{
    std::cout << "Поток 1 с обычной функцией без аргументов\n";
}

void threadFunction2(int i, double d)
{
    std::cout << "Поток 2 с обычной функцией с аргументами: " << i << ' ' << d << '\n';
}

int main()
{
    std::thread myThread1(threadFunction1);
    myThread1.join();

    std::thread myThread2(threadFunction2, 1, 2.34);
    myThread2.join();

    return 0;
}

Важное замечание: в данном примере и далее потоки создаются после завершения предыдущих во избежание гонки за поток вывода.

Пример использования lambda-функции при создании потока с различными подходами объявления:

#include <thread>
#include <iostream>

int main() 
{
    // Лямбда функции
    auto myLambdaFunction1 = []() {
        std::cout << "Поток 1 с лямбда-функцией без аргументов\n";
    };
    auto myLambdaFunction2 = [](int i, double d) {
        std::cout << "Поток 2 с лямбда-функцией с аргументами: " << i << ' ' << d << '\n';
    };

    // Создаем потоки, выполняющие лямбда-функции
    std::thread myThread1(myLambdaFunction1);
    myThread1.join();

    std::thread myThread2(myLambdaFunction2, 1, 2.34);
    myThread2.join();

    std::thread myThread3([]() {
        std::cout << "Поток 3 с лямбда-функцией без аргументов\n";
    }); // myThread3
    myThread3.join();

    std::thread myThread4([](int i, double d) { // Лямбда-функция с параметрами
        std::cout << "Поток 4 с лямбда-функцией с аргументами: " << i << ' ' << d << '\n';
    }, 
    1, 2.34); // myThread4
    myThread4.join();

    return 0;
}

Важное замечание: лямбда функции не особо подходят для потоков, но все же могут быть использованы в качестве точки входа нового потока.

Пример использования методов классов при создании потока:

#include <thread>
#include <iostream>

class MyClass
{
public:
    void instanceMethod1() 
    {
        std::cout << "Поток 1 с методом объекта без доп. аргументов\n";
    }

    void instanceMethod2(int i, double d)
    {
        std::cout << "Поток 2 с методом объекта с доп. аргументами: " << i << ' ' << d << '\n';
    }

    static void staticMethod1()
    {
        std::cout << "Поток 3 со статическим методом без аргументов\n";
    }

    static void staticMethod2(int i, double d)
    {
        std::cout << "Поток 4 со статическим методом с аргументами: " << i << ' ' << d << '\n';
    }
};

int main()
{
    MyClass myObject; // Объект нашего класса
    
    // Создание потоков
    std::thread thread1(&MyClass::instanceMethod1, &myObject);
    thread1.join();
    std::thread thread2(&MyClass::instanceMethod2, &myObject, 1, 2.34);
    thread2.join();
    std::thread thread3(&MyClass::staticMethod1);
    thread3.join();
    std::thread thread4(&MyClass::staticMethod2, 1, 2.34);
    thread4.join();
    
    return 0;
}

Важное замечание: если не передавать адрес объекта, а сам объект класса при создании потока, то будет использована копия этого объекта.

Если в аргументах потока необходимо передать объект по ссылке — требуется использовать std::ref и std::cref. Пример:

#include <thread>
#include <iostream>

class MyClass
{
public:
    void instanceMethod1(int& i, double& d)
    {
        std::cout << "Поток с методом объекта (" << this << ") с доп. аргументами: " << i << " (" << &i << ") и " << d << "(" << &d << ")\n";
    }

    void instanceMethod2(int i, double d)
    {
        std::cout << "Поток с методом объекта (" << this << ") с доп. аргументами: " << i << " (" << &i << ") и " << d << "(" << &d << ")\n";
    }
};

int main()
{
    MyClass myObject; // Объект нашего класса
    int i = 1;
    double d = 2.34;
    // Вывод адресов
    std::cout << "myObject (" << &myObject << ") i (" << &i << ") d (" << &d << ")\n";
    // Создание потоков
    std::thread thread1(&MyClass::instanceMethod1, &myObject, std::ref(i), std::ref(d)); // адрес myObject и ссылки на i, d
    thread1.join();
    std::thread thread2(&MyClass::instanceMethod1, std::ref(myObject), std::ref(i), std::ref(d)); // ссылка myObject и ссылки на i, d
    thread2.join();
    std::thread thread3(&MyClass::instanceMethod2, myObject, i, d); // копия myObject и копии i, d
    thread3.join();
    
    return 0;
}

Примечание: компилятор не позволит передать объекты без std::ref и std::cref, если функция/метод ожидают ссылку или константную ссылку в качестве аргумента

Функторы — классы с перегруженным operator(), объекты которых могут быть использованы как обычные функции. Пример использования функторов в качестве точки входа потока:

#include <thread>
#include <iostream>

class MyClass
{
public:
    void operator()()
    {
        std::cout << "Поток с функтором (" << this << ") - без аргументов\n";
    }

    void operator()(int i, double d)
    {
        std::cout << "Поток  с функтором (" << this << ") с аргументами: " << i << ' ' << d << '\n';
    }
};

int main()
{
    MyClass myObject; // Объект-функтор
    // Адрес объекта-функтора
    std::cout << "myObject (" << &myObject << ")\n";
    
    // Создание потоков
    std::thread thread1(std::ref(myObject)); // ссылка на объект-функтор
    thread1.join();
    std::thread thread2(myObject); // копия объекта-функтора
    thread2.join();
    std::thread thread3(std::ref(myObject), 1, 2.3); // ссылка на объект-функтор с аргументами
    thread3.join();
    
    return 0;
}

Метод detach() позволяет отсоединить поток от объекта std::thread, позволяя потоку продолжить выполнение независимо. После вызова этого метода объект std::thread может быть использован для другого потока выполнения.

Метод swap(thread& other) позволяет обменять потоки между двумя объектами std::thread. Данный метод может быть использован например алгоритмами сортировки в std::vector.

Метод get_id возвращает уникальный идентификатор потока выполнения, ассоциированного с объектом для которого вызывается данный метод.

Метод native_handle может быть использован для инициализации платформозависимой переменной для работы с потоками (например pthread_t в POSIX).

Так же имеется статический метод hardware_concurrency, который возвращает количество потоков выполнения, которые могут быть эффективно запущены параллельно.

Получение значения из потока

Как было сказано ранее: метод join не возвращает значения из функции. Для реализации такой функциональности используется механизм promise-future, представленный классами std::promise и std::future. Эти инструменты предоставляют механизм для обмена данными между потоками, где один поток обещает предоставить значение (promise), а другой поток ожидает получения этого значения (future).

Примечание: для работы с std::future необходимо подключить заголовочный файл <future>

Внимание: далее немного сложно, но надеюсь пример расставит все по своим местам.

std::promise позволяет потоку обещать предоставление некоторого значения в будущем. Он предоставляет механизм для установки значения или исключения, которое может быть получено в другом потоке с помощью std::future. Обычно std::promise используется в потоке-производителе для установки результата выполнения операции. Для задания значения используется метод set_value. Если в процессе выполнения операции возникает исключение, его можно установить с помощью метода set_exception.

Примечание: если значение или исключение уже установлено, то это приведет к вызову исключения std::future_error.

std::future представляет собой объект, который будет получать значение или исключение из std::promise. Он используется в потоке-потребителе для ожидания результата, который должен быть установлен в соответствующем std::promise. Для создания std::future из std::promise используется метод get_future().

std::future имеет следующие методы:

  • get() — блокирует поток до тех пор, пока значение не будет установлено в связанном std::promise и возвращает его в точку вызова;
  • wait() — блокирует вызывающий поток и ожидает, пока значение будет установлено, но не возвращает его в точку вызова;
  • wait_for(const chrono::duration& relative_time) — ожидает установки значения в течении указанного времени, по истечению возвращает:
    • std::future_status::timeout — время ожидания истекло,
    • std::future_status::ready — значение установлено до истечения времени ожидания;
  • wait_until(const chrono::time_point& absolute_time) — аналогично wait_for, но с абсолютной временной точкой.

Примечание: после вызова метода get объект std::future становится не связанным. Его связанность следует проверять методом valid перед вызовами get, wait, wait_for, wait_until.

Для работы std::promise совместно с std::thread можно использовать лямбда функцию:

#include <iostream>
#include <future>
#include <thread>

int threadFunction() 
{
    // Имитация работы
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42; // Ответ на главный вопрос жизни, вселенной и всего такого
}

int main() 
{
    // Создание std::promise
    std::promise<int> threadPromise;

    // Получение std::future из std::promise
    std::future<int> threadFuture = threadPromise.get_future();

    // Запуск потока, передача std::promise в поток
    std::thread myThread([&threadPromise]() {
        threadPromise.set_value(threadFunction()); // Установка значения, которое будет получено через std::future
    });

    // Здесь можно проводить вычисления, пока запущенный поток работает

    // Получение результата работы потока (main засыпает)
    int result = threadFuture.get();
    std::cout << "Результат работы потока: " << result << std::endl;

    // Дождемся завершения потока 
    myThread.join();
    return 0;
}

Здесь в лямбда-функцию (при создании потока) в квадратных скобках передается ссылка на переменную threadPromise, которая использует метод set_value от значения, возвращаемого функцией потока (threadFunction).

Примечание: в примере вызывается myThread.join(), но это не обязательно, так как к моменту вызова threadFuture.get() поток уже завершит выполнение.

Конструкция получается «монструозной». Для упрощения работы используется функция std::async.

Асинхронные задачи

Как сказано ранее: получение значения, возвращаемого из потока является трудоемкой задачей. В C++ существует функция std::async, который представляет собой высокоуровневый способ запуска асинхронных задач. Он возвращает объект std::future, который можно использовать для получения результата задачи.

В конструктор класса передаются следующие аргументы:

  • политика запуска — определяет как задача будет выполняться:
    • std::launch::async — гарантирует, что задача будет выполнена асинхронно в отдельном потоке,
    • std::launch::deferred — задача будет выполнена «лениво» (когда будет запрошен результат),
    • не указание политики (использование значения по умолчанию) позволяет системе выбрать как выполнить задачу;
  • вызываемая функция;
  • передаваемые в функцию аргументы.

Пример использования std::async со стандартной политикой запуска и без аргументов для упрощения получения значения:

#include <iostream>
#include <future>
#include <thread>

int threadFunction() 
{
    // Имитация работы
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42; // Ответ на главный вопрос жизни, вселенной и всего такого
}

int main() 
{
    // Запуск асинхронной задачи
    std::future<int> asyncFuture = std::async(threadFunction);

    // Здесь можно проводить вычисления, пока запущенный поток работает

    // Получение результата работы задачи
    int result = asyncFuture.get();
    std::cout << "Результат работы асинхронной задачи: " << result << std::endl;
    
    return 0;
}

По примеру можно отметить, что объем кода значительно сократился.

Важное замечание: стандарт гарантирует, что окончание выполнения потока, запущенного вызовом std::async, синхронизировано с вызовом получения результата объектом std::future или освобождением общего состояния — области памяти, ответственной за передачу результата. Рассмотрим пример с работой в отдельном потоке:

#include <future>
#include <iostream>
#include <thread>

int main(int argc, char* argv[]) 
{
    std::async(std::launch::async, []() {
        for(int i=0; i < 15; i++) 
        {
            std::cout << "1\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    std::async(std::launch::async, []() {
        for(int i=0; i < 15; i++) 
        {
            std::cout << "2\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    return 0;
}

Тут функция std::async возвращает объект std::future, который за ненадобностью в точке вызова функции уничтожается — в следствии чего основной поток функции main блокируется до освобождения общего состояния. Пример наглядно показывает, что числа выводятся последовательно: сначала единицы, потом двойки. Для исправления результата запишем объект std::future в переменную:

#include <future>
#include <iostream>
#include <thread>

int main(int argc, char* argv[]) 
{
    // Явное указание типа возвращаемого объекта
    std::future<void> asyncFuture1 = std::async(std::launch::async, []() {
        for(int i=0; i < 15; i++) 
        {
            std::cout << "1\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    // Автоматический тип возвращаемого объекта
    auto asyncFuture2 = std::async(std::launch::async, []() {
        for(int i=0; i < 15; i++) 
        {
            std::cout << "2\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    return 0;
}

В исправленном примере вывод изменился: числа выводятся в случайном порядке.

Использование POSIX для модификации потока

Метод native_handle может быть использован для инициализации переменной pthread_t и модификации потока средствами POSIX.

Например изменим приоритет выполнения:

#include <pthread.h>
#include <thread>

void threadFunction() 
{
    // Код потока
}

int main() 
{
    std::thread myThread(threadFunction);

    int policy = SCHED_FIFO;
    struct sched_param param;
    param.sched_priority = sched_get_priority_max(policy);

    pthread_t threadID = myThread.native_handle();
    pthread_setschedparam(threadID, policy, &param); // SCHED_FIFO требует прав суперпользователя

    myThread.join();
    return 0;
}

Важное замечание: для данной программы требуется запускать программу с правами суперпользователя.

Средства синхронизации

Мьютексы (Mutexes)

Мьютексы («mutual exclusion» — взаимное исключение) используются для обеспечения взаимного исключения, что позволяет предотвратить одновременный доступ к общим ресурсам в многопоточных приложениях. Они представляют собой блокировку, которая может находиться в одном из двух состояний: заблокированном и разблокированном.

Доступны с C++11. Требуется подключить заголовочный файл <mutex>

В C++ есть несколько типов мьютексов:

  • std::mutex — простейший мьютекс, который не рекомендуется для рекурсивного захвата одним и тем же потоком;
  • std::recursive_mutex — позволяет одному потоку захватывать мьютекс несколько раз;
  • std::timed_mutex и std::recursive_timed_mutex — позволяют пытаться захватить мьютекс в течение указанного времени.

Пример использования простого мьютекса:

#include <iostream>
#include <mutex>
#include <thread>

// Глобальная переменная и мьютекс
int shared_data = 0;
std::mutex mtx;

// Функция потока
void thread_function() 
{
    // Блокировка мьютекса
    mtx.lock();
    
    // Критическая секция
    shared_data++;
    std::cout << "Поток: shared_data = " << shared_data << '\n'; // Тут не будет состояния гонки в вычислении операторов<< для std::cout
    
    // Разблокировка мьютекса
    mtx.unlock();
}

int main() 
{
    std::thread threads[2];

    // Создание потоков
    for (int i = 0; i < 2; i++) 
        threads[i] = std::thread(thread_function);

    // Ожидание завершения потоков
    for (int i = 0; i < 2; i++) 
        threads[i].join();

    return 0;
}

Пример использования мьютекса с блокировкой по времени:

std::timed_mutex tmtx;

// Функция потока
void thread_function() 
{
    if (tmtx.try_lock_for(std::chrono::seconds(10))) 
    { 
        // Блокировка на 10 секунд в течении которых мьютекс стал доступен
        // Критическая секция
        tmtx.unlock(); 
    } 
    else 
    {
        // Мьютекс не был захвачен в течение 1 секунды
    }
}

Не рекомендуется использовать класс std::mutex напрямую, так как на плечи разработчика ложится задача ручного освобождения мьютекса (в том числе при выдаче исключений). Существует четыре шаблонных класса-обертки над мьютексами, позволяющие оптимизировать работу с ними — мьютекс занят, пока существует объект класса-обертки:

  • std::lock_guard — захватывает мьютекс при создании объекта и автоматически освобождает его при уничтожении объекта (не предоставляет явного способа освобождения мьютекса или повторного его захвата);
  • std::unique_lock — позволяет не только автоматически захватывать и освобождать мьютекс, но и предоставляет возможность для ручного управления мьютексом (используется условными переменными);
  • std::shared_lock (со стандарта C++14) — позволяет множеству потоков безопасно и одновременно читать данные, защищенные мьютексом, при этом гарантируя исключительный доступ для потока, выполняющего запись;
  • std::scoped_lock (со стандарта C++17) — предназначен для замены std::lock_guard и std::unique_lock при необходимости одновременного захвата нескольких мьютексов — автоматически и безопасно управляет множеством мьютексов, предотвращая взаимные блокировки и упрощая синхронизацию в сложных многопоточных приложениях.

Рассмотрим данные виды оберток над мьютексами подробнее.

std::lock_guard является одной из наиболее простых и безопасных оберток — захватывает мьютекс пока существует объект, что может быть удобно как для работы с условными операторами, так и для безопасного освобождения мьютекса при выходе из функции потока. Пример использования std::lock_guard:

#include <iostream>
#include <thread>
#include <mutex>

#define N_THREADS 10

std::mutex mtx;

void print_id(int id) 
{
    // Небезопасная секция
    // ...
    // Блок кода после lock_guard автоматически защищен мьютексом
    std::lock_guard<std::mutex> lock(mtx);
    std::cout << "Поток " << id << " работает в безопасной секции\n"; // Тут не будет состояния гонки в вычислении операторов<< для std::cout
    std::this_thread::sleep_for(std::chrono::seconds(1)); // Задержка во времени
    // Мьютекс автоматически освободится при выходе из блока (в данном случае функции)
}

int main() 
{
    std::thread threads[N_THREADS];

    // Создание потоков
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(print_id, i);

    // Ожидание завершения потоков
    for (int i = 0; i < N_THREADS; i++) 
        threads[i].join();

    return 0;
}

Важное замечание: std::lock_guard занимает мьютекс только после создания объекта, а до этого момента блок кода является не потокобезопасным.

std::unique_lock позволяет использовать методы для явного управления мьютексом. Конструктор класса принимает в качестве аргументов:

  • ссылку на мьютекс (если ссылка не указана, то позднее требуется создать новый объект во избежание исключений);
  • параметр захвата мьютекса:
    • таймаут захвата — при создании объекта происходит попытка блокировки мьютекса в течении заданного времени, если мьютекс не становится доступным в течении данного времени, то поток не блокируется,
    • попытка захвата мьютекса — при создании объекта происходит попытка захвата мьютекса, если тот свободен — происходит его захват,
    • отложенный захват мьютекса — при создании объекта мьютекс не будет захвачен (до момента вызова методов lock или try_lock).

Проверить захват мьютекса можно с помощью метода owns_lock. Так же у класса перегружен operator bool, что позволяет использовать объект в условных конструкциях без вызова метода owns_lock.

Объекты std::unique_lock являются перемещаемыми. Владение мьютексом может передаваться между экземплярами std::unique_lock путем перемещения.

Объекты std::unique_lock являются перемещаемыми. Владение мьютексом может передаваться между экземплярами std::unique_lock путем перемещения. Для перемещения необходимо использовать функцию: std::move

Пример различных конструкторов:

std::unique_lock<std::mutex> ul1; // объект без управления мьютексом - вызовет ошибку в случае вызова ul1.lock();

std::unique_lock<std::mutex> ul2(mutex2); // объект с захватом мьютекса, когда он станет доступен (поток спит до этого момента)

std::unique_lock<std::mutex> ul3(mutex3, std::chrono::milliseconds(300)); // объект пытается захватить мьютекс на протяжении 300мс
if (ul3) // проверка блокировки без вызова метода owns_lock
{ ... }

std::unique_lock<std::mutex> ul4(mutex4, std::try_to_lock); // объект пытается захватить мьютекс, если он занят - продолжает без его блокировки
if (ul4.owns_lock()) // проверка блокировки с вызовом метода owns_lock
{ ... }

std::unique_lock<std::mutex> ul5(mutex5, std::defer_lock); // объект не пытается захватить мьютекс 

Пример использования std::unique_lock в потоках:

#include <iostream>
#include <sstream>
#include <mutex>
#include <thread>

#define N_THREADS 10

std::mutex mtx; // Глобальный мьютекс

void perform_task(int id) 
{
    std::unique_lock<std::mutex> ul(mtx, std::defer_lock); // Создаем объект lock, но не захватываем мьютекс сразу

    // Некоторые операции, которые не требуют защиты мьютексом
    std::cout << (std::stringstream() << "Поток " << id << " работает без блокировки мьютекса\n").str();

    ul.lock(); // Явно захватываем мьютекс
    std::cout << (std::stringstream() << "Поток " << id << " захватил мьютекс\n").str();
    
    // Выполнение критической секции, требующей защиты мьютексом
    std::this_thread::sleep_for(std::chrono::seconds(1)); // Имитация длительной операции

    std::cout << (std::stringstream() << "Поток " << id << " освобождает мьютекс\n").str();
    ul.unlock(); // Явное освобождение мьютекса

    // Продолжение работы после критической секции без мьютекса
    std::cout << (std::stringstream() << "Поток " << id << " заканичивает работу\n").str();
}

int main() 
{
    std::thread threads[N_THREADS];

    // Создание потоков
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(perform_task, i);

    // Ожидание завершения потоков
    for (int i = 0; i < N_THREADS; i++) 
        threads[i].join();

    return 0;
}

std::shared_lock обычно используется в совокупности с std::shared_mutex (со стандарта C++17), что в свою очередь требует учета стандарта C++17. Данная конструкция необходима для секций чтения критических данных (читать можно нескольким потокам), а для секции записи критических данных (писать можно только одному потоку): std::unique_lock или std::lock_guard.

Пример использования std::shared_lock и std::unique_lock с std::shared_mutex в классе потокобезопасного счетчика:

#include <iostream>
#include <sstream>
#include <mutex>
#include <shared_mutex>
#include <thread>

#define N_THREADS 5

class ThreadSafeCounter 
{
    public:
        // Увеличивает значение счетчика
        int increment() 
        {
            std::unique_lock lock(mtx);
            return ++value;
        }

        // Получает текущее значение счетчика
        int get() const 
        {
            std::shared_lock lock(mtx);
            return value;
        }

    private:
        mutable std::shared_mutex mtx;
        int value = 0;
};

void reader(ThreadSafeCounter& counter, int id)
{
    int value;
    for (int i = 0; i < 5; i++) 
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        value = counter.get();
        std::cout << (std::stringstream() << "Текущее значение в потоке " << id << ": " << value << '\n').str();
    }
}

void writer(ThreadSafeCounter& counter) 
{
    int value;
    for (int i = 0; i < 5; i++) 
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        value = counter.increment();
        std::cout << (std::stringstream() << "Икремент изменил значение на: " << value  << '\n').str();
    }
}

int main() 
{
    ThreadSafeCounter counter;
    std::thread threads[N_THREADS];

    // Запускаем несколько потоков-чтецов
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(reader, std::ref(counter), i);

    // Запускаем поток-записыватель
    std::thread threadW(writer, std::ref(counter));

    // Ждем завершения всех потоков
    for (auto& t : threads) 
        t.join();

    threadW.join();

    return 0;
}

Важное замечание: для использования мьютексов в методах, квалифицированных как const, требуется добавить ключевое слово mutable, что позволит const-методам блокировать мьютекс.

std::scoped_lock (со стандарта C++17) используется в ситуации, когда требуется одновременный захват нескольких мьютексов, предотвращая взаимный блокировки (deadlock).

Пример использования std::scoped_lock для одновременного захвата двух мьютексов:

#include <iostream>
#include <thread>
#include <mutex>

#define N_THREADS 5

std::mutex mtx1;
std::mutex mtx2;

void threadFunction(int id) 
{
    for (int i = 0; i < 5; i++) 
    {
        {
            std::scoped_lock lock(mtx1, mtx2);  // Одновременное захватывание нескольких мьютексов
            std::cout << "Поток " << id << ": итерация " << i << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main() 
{
    std::thread threads[N_THREADS];

    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(threadFunction, i);
    
    for (auto& thread : threads) 
        thread.join();

    return 0;
}

Условные переменные (Condition Variables)

Условные переменные используются для блокировки потока до тех пор, пока не наступит определенное условие. Они тесно связаны с мьютексами и часто используются для ожидания определенных условий выполнения задач. std::condition_variable предназначена для использования с std::unique_lock<std::mutex>.

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <unistd.h>

// Глобальные переменные
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // Условие, на котором основано ожидание

// Функция потока, ожидающего события
void waiting_thread() 
{
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) 
    {
        std::cout << "Ожидающий поток: ждем сигнала...\n";
        cv.wait(lck);
    }
    std::cout << "Ожидающий поток: получен сигнал.\n";
}

// Функция потока, посылающего сигнал
void signaling_thread() 
{
    sleep(1); // Имитация работы
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    std::cout << "Сигнализирующий поток: выдаем сигнал.\n";
    cv.notify_all();
}

int main() 
{
    std::thread w_thread(waiting_thread);
    std::thread s_thread(signaling_thread);

    w_thread.join();
    s_thread.join();

    return 0;
}

Атомарные переменные

Атомарные переменные позволяют выполнять операции (чтение, запись, модификация) над переменными (в случае C++ над объектами) без использования традиционных средств синхронизации (например мьютексов) в многопоточных приложениях. Для достижения такого результата используются аппаратные инструкции (lock-free).

В C++ атомарные переменные реализованы через шаблонный класс std::atomic, который поддерживает: базовые типы данных, указатели и пользовательские типы. Со стандарта C++20 поддерживаются std::shared_ptr и std::weak_ptr.

Операции над атомарной переменной могут выполняться без блокировок на аппаратном уровне. Проверить отсутствие блокировок можно с помощью метода is_lock_free. Так же есть переменная is_always_lock_free, определяемая на этапе компиляции, но принимает true, если отсутствие блокировок не зависит от аппаратного уровня.

В отличии от POSIX стандарт C++ позволяет безопасно использовать operator= наравне с методом store для записи значения в атомарную переменную. Метод load используется для чтения значения из атомарной переменной, который может быть опущен в пользу неявного приведения. Пример:

std::atomic<int> a(10); // Инициализация значением 10
int value = a.load();   // Чтение
a.store(20);            // Запись без возврата значения
a = 30;                 // Ещё один способ записи с возвратом копии значения
value = a;              // Ещё один способ чтения

Есть три метода для сравнения и обмена:

  • exchange — атомарный способ значения атомарной переменной новым значением, возвращая предыдущее значение.
  • compare_exchange_weak — атомарно сравнивает значение с ожидаемым и, если они равны, устанавливает новое значение, возвращая флаг успешности замены;
  • compare_exchange_strong — аналогично compare_exchange_weak, но с гарантией отсутствия ложных отказов, возвращая флаг успешности замены.

Примечание: compare_exchange_weak может «ложно сбоить», когда ожидаемое значение совпадает со значением в атомарной переменной: при таком подходе «weak» версии могут быть более эффективны на некоторых платформах за счет меньших требований к синхронизации (например в циклах, где операции могут быть повторены без значительных накладных расходов).

Функция exchange принимает следующие аргументы:

  • desired — новое значение;
  • order — опциональный параметр, определяющий порядок памяти для операции (по умолчанию std::memory_order_seq_cst).

Функции compare_exchange_ принимают следующие аргументы:

  • expected — ожидаемое значение в атомарной переменной;
  • desired — новое значение;
  • order — опциональный параметр, определяющий порядок памяти для операции (по умолчанию std::memory_order_seq_cst).

Порядок памяти для операций exchange может принимать следующие значения:

  • std::memory_order_relaxed — операции не вводят никаких ограничений на порядок выполнения относительно других операций (самый слабый порядок, который позволяет достигнуть максимальной производительности на мультипроцессорных системах, но не гарантирует порядок видимости изменений между потоками);
  • std::memory_order_acquire — предотвращает все чтения и записи, выполненные в текущем потоке после операции acquire, от перемещения до этой операции (гарантирует, что изменения, выполненные другими потоками до операции release, будут видимы после acquire);
  • std::memory_order_release — предотвращает все чтения и записи, выполненные в текущем потоке до операции release, от перемещения после этой операции (обеспечивает, что операции, выполненные до release, будут видимы в других потоках, выполняющих соответствующую операцию acquire);
  • std::memory_order_acq_rel — сочетает в себе эффекты memory_order_acquire и memory_order_release (полезно для операций чтения-модификации-записи, таких как atomic_compare_exchange, где необходимо обеспечить видимость изменений до и после операции);
  • std::memory_order_consume — редко используемая модель памяти, предназначенная для операций, зависящих от данных — ограничивает переупорядочивание операций так, чтобы чтение не могло быть выполнено до завершения операций, от которых оно зависит (часто заменяется на memory_order_acquire из-за сложности корректной реализации и поддержки компиляторами);
  • std::memory_order_seq_cst — самый строгий порядок. Операции с этой моделью памяти выполняются с полным упорядочиванием относительно других seq_cst операций — все потоки видят операции в одном и том же порядке (обеспечивает наибольшие гарантии порядка, но может быть менее производительным по сравнению с более слабыми порядками).

В зависимости от типа данных, используемого в шаблоне доступны следующие методы:

  • для целых и указателей (со стандарта C++11), а так же вещественных (со стандарта C++20):
    • fetch_add — атомарная сумма, возвращающая предыдущее значение,
    • fetch_sub — атомарная разность, возвращающая предыдущее значение,
    • operator+= — атомарная сумма, возвращающая результат суммы,
    • operator-= — атомарная разность, возвращающая результат разности;
  • для целых и указателей:
    • operator++ — префиксный и суффиксный атомарный инкремент,
    • operator-- — префиксный и суффиксный атомарный декремент;
  • только для целых:
    • fetch_and — атомарное побитовое И, возвращающее предыдущее значение,
    • fetch_or — атомарное побитовое ИЛИ, возвращающее предыдущее значение,
    • fetch_xor — атомарное побитовое исключающее ИЛИ, возвращающее предыдущее значение,
    • operator&= — атомарное побитовое И, возвращающее результат операции,
    • operator|= — атомарное побитовое ИЛИ, возвращающее результат операции,
    • operator^= — атомарное побитовое исключающее ИЛИ, возвращающее результат операции.

Важное замечание: следующий код в различных вариациях не является атомарным:

atomicInt = atomicInt * 2; // и тому подобное

В стандарте C++20 введены дополнительные методы:

  • test — атомарно возвращает значение флага;
  • wait — блокирует поток, пока значение, переданное в качестве аргумента) не поменяется;
  • notify_one — уведомляет один поток об изменении значения;
  • notify_all — уведомляет все потоки об изменении значения.

В большинстве случаев использование этих методов может быть более производительным, чем использование традиционных примитивов синхронизации, так как они используют низкоуровневые механизмы уведомления потоков.

Атомарные флаги (std::atomic_flag)

Атомарный флаг (со стандарта C++11), определенный классом std::atomic_flag, является простейшим примитивом синхронизации, который гарантирует отсутствие блокировок.

Данный класс имеет следующие методы:

  • clear — атомарно устанавливает флаг в состояние false;
  • test_and_set — атомарно ставит флаг в состояние true и возвращает прошлое значение;

Также можно использовать методы атомарных переменных (C++20): test, wait, notify_one, notify_all.

Пример использования атомарных флагов в циклической блокировке (spinlock):

#include <iostream>
#include <atomic>
#include <thread>

#define N_THREADS 5

std::atomic_flag lock = ATOMIC_FLAG_INIT;

void simpleSpinLock(int id)
{
    while (lock.test_and_set(std::memory_order_acquire)) 
    {
        // Блокировка занята, ждем
    }
    std::cout << "поток " << id << " захватил блокировку\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Симуляция работы
    lock.clear(std::memory_order_release);
}

int main() 
{
    std::thread threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(simpleSpinLock, i);
    for (auto& thread : threads) 
        thread.join();
    
    return 0;
}

В данном примере потоки, которые ожидают блокировки тратят процессорное время, что в целом можно решить в добавлением ожиданий через sleep.

Использование атомарных флагов не ограничивается циклическими блокировками, например их можно использовать для установки флагов завершения.

Семафоры (Semaphores)

Семафоры были добавлены в стандарте C++20:

  • бинарные семафоры (std::binary_semaphore) — могут пропустить только один поток в критическую секцию;
  • семафор со счетчиком (std::coutnting_semaphore) — позволяют пропускать в критическую секцию N потоков.

Основные методы семафоров:

  • acquire — захватывает семафор, уменьшая счетчик (если счетчик равен нулю — поток блокируется);
  • release — освобождает семафор, увеличивая счетчик (если есть заблокированный поток — он просыпается);
  • try_acquire — неблокирующая попытка захвата счетчика, возвращает true в случае успешного захвата.

В отличии от мьютексов семафоры не привязаны к потокам выполнения — acquire и release могут быть вызваны в разных потоках.

Начальное значение передается параметром в конструкторе.

Пример использования бинарного семафора:

#include <iostream>
#include <thread>
#include <semaphore>

std::binary_semaphore sem(1); // Создаем бинарный семафор, инициализированный в состояние "открыт"

void worker(int id) 
{
    sem.acquire(); // Захватываем семафор (блокируем доступ к ресурсу)
    std::cout << "Поток " << id << " работает\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // Симуляция работы
    std::cout << "Поток " << id << " закончил\n";
    sem.release(); // Освобождаем семафор (разблокируем доступ к ресурсу)
}

int main() 
{
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);

    t1.join();
    t2.join();

    return 0;
}

Бинарный семафор определен через:

using binary_semaphore = std::counting_semaphore<1>;

Семафор со счетчиком требует передачи конкретного числа LeastMaxValue в шаблоне, используемого в качестве «требуемого максимального значения счетчика семафора». По исходным файлам (github.com/gcc-mirror
a945c34) GNU C++ Compiller данное значение используется для static_assert проверок:

static_assert(__least_max_value >= 0);
static_assert(__least_max_value <= __semaphore_impl::_S_max);

Такая проверка позволяет на этапе компиляции проверить, может ли компилятор обеспечить работу семафора с «требуемым максимальным значением счетчика» (0 ≤ LeastMaxValuestd::__semaphore_impl::_S_max).

В GNU C++ Compiller при увеличении текущего значения счетчика семафора методом release не проверяется на превышение верхней границы.

Важное замечание: ноль является допустимым значением LeastMaxValue и позволяет проигнорировать проверку по верхней границе счетчика.

Данная конструкция в объявлении класса:

template< std::ptrdiff_t LeastMaxValue = /* implementation-defined */ >
class counting_semaphore;

Пример использования семафора со счетчиком:

#include <iostream>
#include <sstream>
#include <thread>
#include <semaphore>

#define N_THREADS 5

std::counting_semaphore<3> sem(3); // Создаем счетный семафор с максимальным значением 3

void worker(int id) 
{
    sem.acquire(); // Захватываем семафор (уменьшаем счетчик)
    std::cout << (std::stringstream() << "Поток " << id << " работает\n").str();
    std::this_thread::sleep_for(std::chrono::milliseconds(4000)); // Симуляция работы
    std::cout << (std::stringstream() << "Поток " << id << " закончил\n").str();
    sem.release(); // Освобождаем семафор (увеличиваем счетчик)
}

int main() 
{
    std::thread threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(worker, i);

    for (auto& t : threads) 
        t.join();

    return 0;
}

В GNU C++ Compiller имеется две реализации семафоров (libstdc++/include/bits/semaphore_base.h a945c34):

  • __platform_semaphore — использует POSIX семафоры;
  • __atomic_semaphore — использует атомарные функции для работы со счетчиком.

Можно указать необходимость принудительного использования POSIX семафоров с помощью установки флага компиляции _GLIBCXX_USE_POSIX_SEMAPHORE. Выбор реализации осуществляется следующим образом:

// Note: the _GLIBCXX_USE_POSIX_SEMAPHORE macro can be used to force the
// use of Posix semaphores (sem_t). Doing so however, alters the ABI.
#if defined __glibcxx_atomic_wait && !_GLIBCXX_USE_POSIX_SEMAPHORE
  using __semaphore_impl = __atomic_semaphore;
#elif _GLIBCXX_HAVE_POSIX_SEMAPHORE
  using __semaphore_impl = __platform_semaphore;
#endif

Барьеры (Barriers)

Барьер — это примитив синхронизации, который позволяет потоку или группе потоков ожидать оставшиеся потоки до перехода к следующей фазе выполнения. Барьеры представлены классом std::barrier со стандарта C++20, который содержит в себе счетчик. Барьер может быть использован множество раз (подходит для использования в цикле).

Конструктор класса принимает количество потоков и функцию (необязательный параметр), которая будет вызвана, когда нужное число потоков дойдет до барьера.

Важное замечание: потоки будут ждать завершения функции, вызванной при заполнении барьера.

Данный класс имеет следующие методы:

  • arrive — уменьшает счетчик, но не блокирует поток, возвращает токен, который передается в wait;
  • wait — блокирует поток пока остальные не достигнут барьера, принимает токен из метода arrive через std::move;
  • arrive_and_wait — уменьшает счетчик и блокирует поток пока остальные не достигнут барьера;
  • arrive_and_drop — уменьшает счетчик и исключает поток из синхронизации (уменьшает общее количество ожидаемых потоков на барьере для следующих использований).

После того, как потоки проходят через барьер, значение счетчика восстанавливается на начальное, что позволяет использовать барьер множество раз.

Важное замечание: метод arrive_and_drop уменьшает число ожидаемых на барьере потоков и если в следующий раз поток зайдет на вызов arrive_and_wait, то это приведет к выдаче исключения Floating point exception. В такой ситуации требуется дополнительно проверить, что поток больше не будет взаимодействовать с барьером.

Пример использования раздельного вызова методов arrive и wait:

    auto token = sync_point.arrive(); // Сообщаем о прибытии потока к барьеру и получаем токен прибытия
    // ...
    sync_point.wait(std::move(token)); // Ожидаем, пока все потоки не достигнут барьера, используя токен

Пример использования двух барьеров (с функцией и без неё):

#include <iostream>
#include <sstream>
#include <barrier>
#include <thread>

#define N_THREADS 5

void sync_function()
{
    std::cout << "Потоки пришли на барьер\n";
}

std::barrier sync_point1(N_THREADS); // Синхронизируем потоки
std::barrier sync_point2(N_THREADS, sync_function); // Синхронизируем потоки c функцией

void worker(int id)
{
    std::cout << (std::stringstream() << "Поток " << id << " стартовал\n").str();
    std::this_thread::sleep_for(std::chrono::milliseconds(400 * id)); // Имитация работы продолжительностью в соответствии с идентификатором потока
    std::cout << (std::stringstream() << "Поток " << id << " достиг 1 барьера\n").str();
    sync_point1.arrive_and_wait();
    std::cout << (std::stringstream() << "Поток " << id << " прошел 1 барьер\n").str();
    std::this_thread::sleep_for(std::chrono::milliseconds(400 * (id+1))); // Имитация работы продолжительностью в соответствии с идентификатором потока
    std::cout << (std::stringstream() << "Поток " << id << " достиг 2 барьера\n").str();
    sync_point2.arrive_and_wait();
    std::cout << (std::stringstream() << "Поток " << id << " прошел 2 барьер\n").str();
}

int main()
{
    std::thread threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++) 
        threads[i] = std::thread(worker, i);

    for (auto &thread : threads)
        thread.join();

    return 0;
}

Латч (Latch)

Латч — это примитив синхронизации, который позволяет потоку или группе потоков ожидать оставшиеся потоки до перехода к следующей фазе выполнения. Латчи представлены классом std::latch со стандарта C++20, который содержит в себе одноразовый счетчик — собственно в этом его главное отличие от барьеров.

Примечание: latch можно перевести на русский как защелка, задвижка или щеколда, но автор позволит себе использовать англицизм.

Конструктор данного класса принимает начальное значение счетчика.

Данный класс имеет следующие методы:

  • count_down — уменьшает счетчик без блокировки потока;
  • try_wait — проверяет достижение счетчиком нуля без блокировки;
  • wait — блокирует поток, пока счетчик не достигнет нуля;
  • arrive_and_wait — уменьшает счетчик и блокирует поток, пока счетчик не достигнет нуля.

Пример использования методов count_down и wait:

    latch.count_down(); // Уменьшаем счетчик
    // ...
    latch.wait(); // Ожидаем, пока все потоки не достигнут латча 

Пример использования латча в многопоточном приложении с методом arrive_and_wait:

#include <iostream>
#include <sstream>
#include <thread>
#include <latch>

#define N_THREADS 5

std::latch latch(N_THREADS); // Создаем латч, ожидающий 5 участников

void worker(int id)
{
    std::cout << (std::stringstream() << "Поток " << id << " начал работу\n").str();
    std::this_thread::sleep_for(std::chrono::milliseconds(400 * (id + 1))); // Симуляция работы
    std::cout << (std::stringstream() << "Поток " << id << " закончил свою работу и ждет остальных\n").str();
    latch.arrive_and_wait(); // Уменьшаем счетчик и ждем остальных
    std::cout << (std::stringstream() << "Поток " << id << " вышел из латча\n").str();
}

int main()
{
    std::thread threads[N_THREADS];
    for (int i = 0; i < N_THREADS; i++)
        threads[i] = std::thread(worker, i);

    for (auto &t : threads)
        t.join();

    return 0;
}

Важное замечание: повторное использование отработанного latch может привести к взаимной блокировке (deadlock).

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.