Рубрики
Многопоточность Программирование

Введение в многопоточность: POSIX

Работа с многопоточностью в библиотеке POSIX

Работа с потоками

Для работы с POSIX-многопоточностью в языке Си требуется подключить заголовочный файл pthread.h и добавить флаг компилятору с библиотекой pthread: -lpthread.

Создание потоков

pthread_create — эта функция используется для создания нового потока. Она возвращает 0 в случае успешного создания потока или код ошибки. Функция принимает четыре аргумента:

  • указатель pthread_t*, который будет хранить идентификатор нового потока;
  • указатель на структуру атрибутов потока (pthread_attr_t* или NULL для атрибутов по умолчанию);
  • указатель на функцию, которая будет выполнена в новом потоке;
  • указатель на аргумент, передаваемый в эту функцию (либо NULL).

Формат функции, которая будет выполнена в новом потоке:

void* threadFunction(void* data)
{
    int idata1 = *((int*)data); // Для обращения к копии данных как int-переменной
    int* idata2 = (int*) data; // Для обращения к данным как int-указателю
// работа функции
    return 0; // или адрес переменной с результатами работы
}

Примечание: передаваемый аргумент может быть объектом структуры.

Пример создания потока:

pthread_t thread;
int argument;
int result = pthread_create(&thread, NULL, threadFunction, &argument);

Если необходимо создать несколько потоков, то можно сделать это в цикле:

pthread_t threads[THREADS_COUNT];
int arguments[THREADS_COUNT];
for (i = 0; i < THREADS_COUNT; ++i) 
{
    pthread_create(&threads[i], NULL, threadFunction, &arguments[i]);
}

Примечание: поток стартует сразу после того, как создан.

Ожидание завершения потока

pthread_join — эта функция блокирует поток из которого она была вызвана до тех пор, пока указанный поток не завершится. Это позволяет программе дождаться результатов выполнения потока. Функция принимает дескриптор pthread_t, а так же указатель void* для возвращаемого адреса с результатом.

Пример ожидания всех потоков, которые были созданы в предыдущем примере:

void *result;
for (i = 0; i < THREADS_COUNT; ++i) 
{
    pthread_join(threads[i], &result); // Или NULL заместо &result
    // Действия после завершения i-ого потока (например с result)
}

Завершение потока

Есть четыре пути завершения потока:

  • вызов оператора return (из стартовой функции потока);
  • вызов функции pthread_exit (из любой функции в стеке вызовов потока);
  • вызов функции pthread_cancel из других потоков;
  • использование переменной-флага о необходимости завершения.

При завершении работы потока важно не забыть освободить выделенные ресурсы включая те, которые были выделены в куче (через new или malloc).

Вызов оператора return из стартовой функции потока завершает его выполнение. В качестве возвращаемого значения используется NULL или адрес переменной. Важно помнить, что все локальные переменные, размещенные в стеке будут удалены.

Функция pthread_exit используется в библиотеке POSIX для завершения выполнения вызывающего потока. Это один из способов явно завершить поток, помимо нормального возврата из запускаемой функции потока — функция может вызываться из любой подфункции, вызываемой потоком. Функция pthread_exit особенно полезна, когда нужно завершить выполнение потока в середине его выполнения, например, в случае ошибки или когда поток завершил свою работу досрочно. Данная функция принимает в качестве аргумента адрес, который поток может вернуть другим потокам, которые ждут его завершения с помощью функции pthread_join. Этот параметр может быть NULL, если поток не возвращает значение.

Важное замечание: М. Митчелл в книге «Программирование для Linux. Профессиональный подход» (Advanced Linux Programming) говорит — «Если поток вызывает pthread_exit, C++ не гарантирует вызов деструкторов для всех автоматических переменных на стеке потока.» — данное высказывание справедливо только для старых ядер Linux (например 2.4.32 2010 года, g++ версии 2.95.4). Данная проблема не возникает при вызове оператора return.

Данная функция обладает следующими свойствами:

  • функция pthread_exit не возвращает управление вызывающему коду, поскольку она завершает выполнение текущего потока;
  • если pthread_exit вызывается из главного потока (то есть из функции main), это приведет к ожиданию завершения всех потоков в процессе (занимает некоторое время), а не мгновенного завершения всех — это отличается от вызова exit или return (из main), которые немедленно завершает весь процесс и все его потоки;

Функция pthread_cancel посылает запрос на отмену к указанному потоку. В зависимости от атрибутов потока поведение может быть следующим:

  • поток игнорирует запросы на отмену;
  • поток разрешает запросы на отмену:
    • отложенная отмена — поток самостоятельно проверяет запросы на отмену или отмена происходит при вызове функций ожидания или блокирующих операций,
    • асинхронная отмена — поток может быть немедленно отменен в любой момент

Жирным выделено поведение по умолчанию.

Для явной проверки запроса на отмену можно вызвать функцию pthread_testcancel().

Поток может изменять свое поведение по отношению к запросам на отмену с помощью следующих функций:

  • pthread_setcancelstate — устанавливает состояние восприимчивости потока к отмене:
    • PTHREAD_CANCEL_ENABLE — включает возможность отмены,
    • PTHREAD_CANCEL_DISABLE — отключает возможность отмены;
  • pthread_setcanceltype — устанавливает тип отмены потока:
    • PTHREAD_CANCEL_DEFERRED — отложенная отмена потока,
    • PTHREAD_CANCEL_ASYNCHRONOUS — асинхронная отмена (немедленная).

Обе функции в качестве второго аргумента принимают указатель на переменную, куда будет записано старое состояние, либо NULL. Пример использования:

void* threadFunction(void* data)
{
    int oldstate;

    // Временно отключаем возможность отмены потока, чтобы например безопасно выполнить инициализацию
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);

    // ...

    // Закончив, включаем возможность отмены потока
    pthread_setcancelstate(oldstate, NULL);

    // ...

    // Завершаем поток
    return NULL;
}

Последний вариант — использование переменной-флага, указывающей на необходимость завершения работы. В примере используются атомарные переменные (подробнее далее в заметке), которые позволяют обеспечить корректность передаваемого в поток флага:

// Флаг о завершении потока
atomic_int thread_should_exit = 0;

void* threadFunction(void* data)
{
    while(!atomic_load(&thread_should_exit)) 
    {
        // Нормальная работа потока
        // ...
    }

    // Корректное завершение работы
    printf("Поток завершает свою работу\n");
    return NULL;
}

int main()
{
    // ...

    // Установить флаг для завершения потока
    atomic_store(&thread_should_exit, 1);
    
    // Ждать завершения потока
    pthread_join(thread, NULL);

    // ...
}

Атрибуты потока

Атрибуты потока в POSIX Threads позволяют настраивать различные параметры потоков при их создании. Доступны следующие атрибуты:

  • состояние отсоединения (detach state, pthread_attr_setdetachstate) — определяет, будет ли поток в отсоединенном состоянии после создания: автоматически освобождает свои ресурсы при завершении (не требуется вызывать pthread_join);
  • политика планирования (scheduling policy, pthread_attr_setschedpolicy);
  • приоритет планирования (scheduling priority, pthread_attr_setschedparam) — устанавливает приоритет потока в рамках его политики планирования (более высокий приоритет может увеличивать шансы потока на более раннее выполнение);
  • размер стека (stack size, pthread_attr_setstacksize) — задает размер стека для потока, увеличение которого может позволить создать больше локальных переменных или выполнять более глубокую рекурсию;
  • адрес стека (stack address, pthread_attr_setstack) — позволяет указать точный адрес в памяти для стека потока;
  • размер защитного блока (guard size, pthread_attr_setguardsize) — определяет размер «защитного блока» в конце стека потока, который используется для предотвращения переполнения стека.

В POSIX имеются следующие политики планирования:

  • SCHED_FIFO (First-In, First-Out) — политика планирования реального времени с фиксированным приоритетом: потоки с более высоким приоритетом всегда будут выполняться раньше потоков с более низким приоритетом, а потоки с одинаковым приоритетом будут выполняться в порядке их добавления в очередь, в свою очередь поток, который уже выполняется, будет продолжать своё выполнение до тех пор, пока явно не освободит процессор (например, вызовом sleep или блокировкой на операции ввода-вывода);
  • SCHED_RR (Round-Robin) — политика реального времени, похожая на SCHED_FIFO, но добавляющая квант времени (time slice) для каждого потока: все потоки с одинаковым приоритетом будут получать равные кванты времени для выполнения в циклическом порядке, как только поток исчерпает свой квант времени, то он возвращается в конец очереди и следующий поток с тем же приоритетом начинает выполнение;
  • SCHED_OTHER — стандартная политика планирования, используемая для обычных потоков выполнения: в этом режиме операционная система автоматически управляет планированием потоков, основываясь на динамически меняющихся приоритетах.

Важное замечание: для политик реального времени могут потребоваться права суперпользователя.

Структура pthread_attr_t используется для хранения атрибутов потока. Функция pthread_attr_init позволяет инициализировать экземпляр структуры значениями по умолчанию. Необходимо вызывать pthread_attr_destroy после создания потока, так как pthread_attr_t имеет указатель на динамически-выделяемую память.

Пример создания потока с заданием атрибутов — поток с максимальным приоритетом FIFO:

#include <sched.h>
#include <pthread.h>
#include <stdio.h>

void* threadFunction(void* data)
{
    // ... 
}

int main() 
{
    pthread_t thread;
    pthread_attr_t attr;
    struct sched_param param;
    int policy = SCHED_FIFO;

    // Инициализация атрибутов потока
    pthread_attr_init(&attr);

    // Установка политики планирования FIFO
    pthread_attr_setschedpolicy(&attr, policy);

    param.sched_priority = sched_get_priority_max(policy);
    pthread_attr_setschedparam(&attr, &param);

    // Установка атрибута явного использования заданных атрибутов планирования
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

    // Создание потока с заданными атрибутами
    if(pthread_create(&thread, &attr, threadFunction, NULL))
    {
        fprintf(stderr, "Ошибка при создании потока\n");
        return 1;
    }

    printf("Поток создан. Политика: %d, Приоритет: %d\n", policy, param.sched_priority);

    // Удаление атрибутов потока
    pthread_attr_destroy(&attr);

    // Ожидание завершения потока
    pthread_join(thread, NULL);

    return 0;
}

Созданный поток так же можно отсоединить (detach) в процессе работы вызовом функции pthread_detach:

pthread_detach(thread);

Средства синхронизации

Мьютексы (Mutexes)

Мьютексы («mutual exclusion» — взаимное исключение) используются для обеспечения взаимного исключения, что позволяет предотвратить одновременный доступ к общим ресурсам в многопоточных приложениях. Они представляют собой блокировку, которая может находиться в одном из двух состояний: заблокированном и разблокированном.

Для работы с мьютексами используются следующие функции:

  • pthread_mutex_init — инициализирует мьютекс, где можно задать атрибуты мьютекса через второй параметр или использовать NULL для стандартных атрибутов;
  • pthread_mutex_lock — блокирует мьютекс, если мьютекс уже заблокирован, вызывающий поток блокируется до тех пор, пока мьютекс не будет разблокирован;
  • pthread_mutex_unlock — разблокирует мьютекс, делая его доступным для других потоков (позволяет спящему потоку забрать мьютекс себе);
  • pthread_mutex_destroy — уничтожает мьютекс, освобождая ресурсы, связанные с ним.

Рассмотрим пример использования мьютекса для решения проблемы состояния гонки при записи в многопоточном приложении:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// Глобальная переменная и мьютекс
int shared_data = 0;
pthread_mutex_t mutex;

// Функция потока
void* thread_function(void* args) 
{
    // Блокировка мьютекса
    pthread_mutex_lock(&mutex);
    
    // Критическая секция
    shared_data++;
    printf("Thread %ld: shared_data = %d\n", (long)args, shared_data);
    
    // Разблокировка мьютекса
    pthread_mutex_unlock(&mutex);
    
    return NULL;
}

int main() 
{
    pthread_t threads[2];
    pthread_mutex_init(&mutex, NULL); // Инициализация мьютекса

    // Создание потоков
    for (long i = 0; i < 2; i++) 
    {
        pthread_create(&threads[i], NULL, thread_function, (void*)i);
    }

    // Ожидание завершения потоков
    for (int i = 0; i < 2; i++) 
    {
        pthread_join(threads[i], NULL);
    }

    pthread_mutex_destroy(&mutex); // Уничтожение мьютекса
    return 0;
}

В этом примере два потока инкрементируют глобальную переменную shared_data. Для обеспечения потокобезопасности изменений этой переменной используется мьютекс mutex, который блокируется перед изменением переменной и разблокируется после. Это гарантирует, что в критической секции (где изменяется shared_data) в любой момент времени может находиться только один поток.

Мьютекс может быть инициализирован по умолчанию без использования функции pthread_mutex_init, применяя макрос PTHREAD_MUTEX_INITIALIZER. Этот способ инициализации особенно удобен для статически распределенных мьютексов. В таком случае мьютекс инициализируется с атрибутами по умолчанию:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Не требует вызова pthread_mutex_init

Как говорилось ранее: в качестве второго аргумента функции pthread_mutex_init может быть передан указатель на атрибуты мьютекса (pthread_mutexattr_t). Данная структура должна быть инициализирована значениями по умолчанию с помощью функции pthread_mutexattr_init.

Для изменения атрибутов создаваемого мьютекса используются следующие функции:

  • pthread_mutexattr_settype — определяет поведение мьютекса при попытке повторной блокировке тем же самым потоком (например в случае рекурсии):
    • PTHREAD_MUTEX_NORMAL — если поток пытается повторно заблокировать мьютекс, это может привести к взаимной блокировке (deadlock),
    • PTHREAD_MUTEX_ERRORCHECK — при попытке разблокировки мьютекса другим потоком возвращается ошибка;
    • PTHREAD_MUTEX_RECURSIVE — позволяет потоку, который уже заблокировал мьютекс, повторно его блокировать без взаимной блокировки (при каждой последующей блокировке счётчик блокировок увеличивается и поток должен выполнить соответствующее количество разблокировок, чтобы полностью освободить мьютекс),
    • PTHREAD_MUTEX_DEFAULT — варьируется от конечной реализации мьютекса на целевой платформе и компиляторе;
  • pthread_mutexattr_setprotocol — определяет, как мьютекс будет взаимодействовать с приоритетами потоков для предотвращения инверсии приоритетов:
    • PTHREAD_PRIO_NONE — никакой специальной поддержки инверсии приоритетов,
    • PTHREAD_PRIO_INHERIT — поток, владеющий мьютексом, временно наследует приоритет наивысшего приоритетного потока, который ждёт этот мьютекс,
    • PTHREAD_PRIO_PROTECT — устанавливается определённый приоритет для мьютекса, и любой поток, пытающийся заблокировать мьютекс, будет выполняться с этим приоритетом;
  • pthread_mutexattr_setpshared — определяет, может ли мьютекс использоваться между несколькими процессами:
    • PTHREAD_PROCESS_PRIVATE — мьютекс может использоваться только внутри процесса, который его инициализировал,
    • PTHREAD_PROCESS_SHARED — мьютекс может использоваться между несколькими процессами (для этого мьютекс должен находиться в области памяти, разделяемой между этими процессами);
  • pthread_mutexattr_setrobust — определяет поведение мьютекса, если владелец мьютекса завершается не освободив его:
    • PTHREAD_MUTEX_STALLED — никаких специальных действий (другие потоки могут оставаться заблокированными),
    • PTHREAD_MUTEX_ROBUST — позволяет другому потоку разблокировать мьютекс, если владелец мьютекса завершил выполнение без его разблокировки.

Пример инициализации мьютекса:

void initialize_mutex(pthread_mutex_t* mutex) 
{
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr); // Инициализируем атрибуты мьютекса
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); // Устанавливаем тип мьютекса как рекурсивный

    pthread_mutex_init(mutex, &attr); // Инициализируем мьютекс по указателю с заданными атрибутами

    pthread_mutexattr_destroy(&attr); // Уничтожаем объект атрибутов, так как он больше не нужен
}

Иногда может возникнуть ситуация, когда необходимо не блокировать поток, а лишь пропустить критическую секцию — для таких ситуаций можно использовать функцию pthread_mutex_trylock. Данная функция возвращает -1 в случае ошибки. Для получения кода ошибки используется заголовочный файл errno.h и результат ошибки в переменной errno:

  • 0 — если мьютекс был успешно заблокирован текущим потоком;
  • EBUSY — если мьютекс уже занят другим потоком.

Пример:

#include <errno.h>   
// ...

    if(pthread_mutex_trylock(&lock) == 0) 
    {
        // Мьютекс успешно заблокирован
        // Критическая секция
        pthread_mutex_unlock(&lock); // Освобождаем мьютекс
    } 
    else 
    if(errno == EBUSY) 
    {
        // Мьютекс уже заблокирован другим потоком
        printf("Мьютекс уже занят другим потоком.\n");
    } 
    else 
    {
        // Другая ошибка
        printf("Ошибка блокировки мьютекса: %d\n", reterrno;
    }

Так же существует возможность заблокировать мьютекс на определенное время с помощью функции pthread_mutex_timedlock, что может быть полезным в ситуации, когда поток может подождать, но ограниченное количество времени. Функция принимает в качестве второго аргумента адрес объекта структуры struct timespec — время, когда потоку следует проснуться. Данная функция возвращает -1 в случае ошибки. Для получения кода ошибки используется заголовочный файл errno.h и результат ошибки в переменной errno:

  • 0 — если мьютекс был успешно заблокирован текущим потоком;
  • ETIMEDOUT — если заданное время ожидания истекло.

Пример использования:

#include <errno.h>   
// ...

    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += 1; // Ждем блокировку максимум 1 секунду

    if(pthread_mutex_timedlock(&lock, &ts) == 0) 
    {
        // Мьютекс успешно заблокирован
        // Критическая секция
        pthread_mutex_unlock(&lock);
    } 
    else 
    if(errno == ETIMEDOUT) 
    {
        // Время ожидания истекло
        printf("Время ожидания мьютекса истекло.\n");
    } 
    else 
    {
        // Другая ошибка
        printf("Ошибка блокировки мьютекса: %d\n", errno);
    }

Важное замечание: структура struct timespec содержит время в секундах и наносекундах. При ожидании не кратном секунде требуется проверять, что время в наносекундах не превышает 109. В качестве примера можно использовать функцию:

void add_nanoseconds(struct timespec *ts, long nanoseconds) {
    const long sec_nsec = 1000000000L;
    long new_nsec = ts->tv_nsec + nanoseconds;

    // Проверяем, не превышает ли количество наносекунд 10^9
    if (new_nsec >= sec_nsec) {
        ts->tv_sec += new_nsec / sec_nsec; // Добавляем секунды
        ts->tv_nsec = new_nsec % sec_nsec; // Оставляем остаток от наносекунд
    } 
    else 
        ts->tv_nsec = new_nsec;
}

Примечание: существует код EAGAIN, который возвращается при достижении лимита рекурсивной блокировки.

Семафоры (Semaphores)

Семафоры представляют собой механизм синхронизации, предназначенный для управления доступом к общим ресурсам в многопоточных программных приложениях.

Для работы с семафорами используются следующие функции:

  • sem_init — используется для задания начального значения семафора и определения, будет ли семафор использоваться между несколькими процессами;
  • sem_wait — уменьшение значения семафора на 1, если значение семафора уже равно 0, вызывающий поток блокируется до тех пор, пока семафор не будет увеличен другим потоком;
  • sem_trywait — для неблокирующей попытки;
  • sem_timedwait — для неблокирующей попытки с таймаутом;
  • sem_post — увеличение значения семафора на 1. Эта операция может разблокировать потоки, ожидающие уменьшения значения семафора;
  • sem_destroy — уничтожение семафора и освобождение всех связанных с ним ресурсов.

Пример использования семафора:

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>

sem_t sem;

// Функция потока
void* thread_func(void* arg) 
{
    // Пытаемся войти в критическую секцию
    sem_wait(&sem);
    printf("Поток %ld вошел в критическую секцию.\n", (long)arg);

    // Имитация выполнения работы в критической секции
    sleep(1);

    printf("Поток %ld вышел из критической секции.\n", (long)arg);
    sem_post(&sem); // Освобождаем критическую секцию
    return NULL;
}

int main() 
{
    pthread_t t1, t2;

    // Инициализация семафора со значением 1 (разрешается доступ только одному потоку)
    // 0 - запрещает доступ из других процессов
    sem_init(&sem, 0, 1);

    // Создание двух потоков
    pthread_create(&t1, NULL, thread_func, (void*)1);
    pthread_create(&t2, NULL, thread_func, (void*)2);

    // Ожидание завершения потоков
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    // Уничтожение семафора
    sem_destroy(&sem);

    return 0;
}

В этом примере семафор инициализируется с начальным значением 1, что означает, что к критической секции одновременно может получить доступ только один поток, а так же вторым аргументом, равным 0, запрещает доступ из других процессов. Каждый поток пытается «захватить» семафор с помощью sem_wait перед входом в критическую секцию и «освобождает» его после выхода, вызывая sem_post. Это обеспечивает синхронизацию доступа к критической секции между потоками.

Пример использования sem_trywait:

#include <errno.h>
//...

    if(sem_trywait(&sem) == 0) 
    {
        // Семафор успешно захвачен
        // Критическая секция
        sem_post(&sem); // Освобождение семафора
    } 
    else 
    if(errno == EAGAIN) 
    {
        // Семафор уже заблокирован
        printf("Семафор уже заблокирован.\n");
    } 
    else 
    {
        // Другая ошибка
        perror("sem_trywait ошибка: %d\n", errno);
    }

Пример использования sem_timedwait:

#include <errno.h>
//...

    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += 10; // Установка таймаута в 10 секунд

    if(sem_timedwait(&sem, &ts) == 0) 
    {
        // Семафор успешно захвачен
        // Критическая секция
        sem_post(&sem); // Освобождение семафора
    } 
    else 
    if(errno == ETIMEDOUT) 
    {
        // Семафор уже заблокирован
        printf("Семафор уже заблокирован.\n");
    } 
    else 
    {
        // Другая ошибка
        perror("sem_trywait ошибка: %d\n", errno);
    }

Семафор может так же вернуть ошибку: EINTR — вызов был прерван обработчиком сигнала (interrupt).

Условные переменные (Condition variables)

Условные переменные представляют собой механизм синхронизации, который позволяет потокам приостанавливать выполнение до тех пор, пока не наступит определённое условие. Условные переменные используются в сочетании с мьютексами для координации действий между несколькими потоками, позволяя одним потокам ждать («спать»), пока другие потоки не изменят состояние программы и не сигнализируют об этом.

Для работы с условными переменными используются следующие функции:

  • pthread_cond_init — инициализация условной переменной;
  • pthread_cond_wait — блокировка потока до тех пор, пока другой поток не разбудит его, сигнализируя условную переменную (функция атомарно разблокирует мьютекс и ждёт сигнала на условной переменной, после чего повторно захватывает мьютекс);
  • pthread_cond_signal — пробуждение хотя бы одного из потоков, ожидающих на условной переменной;
  • pthread_cond_broadcast — пробуждение всех потоков, ожидающих на условной переменной;
  • pthread_cond_destroy — уничтожение условной переменной.

Примечание: условную переменную можно проинициализировать макросом PTHREAD_COND_INITIALIZER

Пример:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

// Глобальные переменные
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int ready = 0; // Условие, на котором основано ожидание

// Функция потока, ожидающего события
void* waiting_thread(void *arg) 
{
    pthread_mutex_lock(&mutex);
    while (ready == 0) 
    {
        printf("Ожидающий поток: ждем сигнала...\n");
        pthread_cond_wait(&cond, &mutex);
    }
    printf("Ожидающий поток: получен сигнал.\n");
    pthread_mutex_unlock(&mutex);
    return NULL;
}

// Функция потока, посылающего сигнал
void* signaling_thread(void *arg) 
{
    sleep(1); // Имитация работы
    pthread_mutex_lock(&mutex);
    ready = 1; // Изменяем условие, на котором основано ожидание
    pthread_cond_signal(&cond); // Посылаем сигнал одному ожидающему потоку
    printf("Сигнализирующий поток: сигнал отправлен.\n");
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main() 
{
    pthread_t w_thread, s_thread;

    pthread_create(&w_thread, NULL, waiting_thread, NULL);
    pthread_create(&s_thread, NULL, signaling_thread, NULL);

    pthread_join(w_thread, NULL);
    pthread_join(s_thread, NULL);

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

    return 0;
}

В этом примере один поток (waiting_thread) ожидает сигнала от другого потока (signaling_thread) перед продолжением выполнения. waiting_thread использует pthread_cond_wait для ожидания сигнала, что атомарно разблокирует мьютекс и приостанавливает выполнение потока до получения сигнала. signaling_thread изменяет состояние, на котором основано ожидание (ready = 1), и затем сигнализирует об этом, используя pthread_cond_signal, что пробуждает ожидающий поток.

Барьеры (Barriers)

Барьеры представляют собой синхронизационный механизм, который используется для ожидания в определённой точке выполнения несколькими потоками до тех пор, пока не достигнут определённый барьер — условие, при котором все потоки должны остановиться и дождаться друг друга. Это позволяет координировать работу потоков, гарантируя, что все потоки достигли определённой точки выполнения, прежде чем продолжить дальше. Барьеры особенно полезны в распределённых вычислениях и для задач, требующих разделения на фазы.

Для работы с условными переменными используются следующие функции:

  • pthread_barrier_init — создаёт барьер и задаёт количество потоков, которое должно достигнуть барьера, прежде чем все потоки могут продолжить выполнение;
  • pthread_barrier_wait — поток, вызывающий эту функцию, блокируется до тех пор, пока заданное количество потоков не достигнет барьера;
  • pthread_barrier_destroy — уничтожение барьера и освобождение всех связанных с ним ресурсов.

Примечание: функция pthread_barrier_wait возвращает PTHREAD_BARRIER_SERIAL_THREAD для последнего потока при разблокировке и 0 всем остальным. Такой подход позволяет выполнить сегмент кода только одному потоку среди всех разблокированных.

Пример использования барьеров:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 5

pthread_barrier_t barrier;

// Функция потока
void* barrier_test(void* tid) 
{
    long thread_id = (long)tid;

    printf("Поток %ld ожидает у барьера.\n", thread_id);
    // Ожидание у барьера
    int rc = pthread_barrier_wait(&barrier);

    // Проверка, является ли поток последним, который разблокирует барьер
    if(rc == PTHREAD_BARRIER_SERIAL_THREAD) 
    {
        printf("Поток %ld проходит через барьер последним.\n", thread_id);
    } 
    else 
    {
        printf("Поток %ld проходит через барьер.\n", thread_id);
    }

    pthread_exit(NULL);
}

int main() 
{
    pthread_t threads[NUM_THREADS];
    long t;

    // Инициализация барьера с указанием, что барьер должен ожидать NUM_THREADS потоков
    pthread_barrier_init(&barrier, NULL, NUM_THREADS);

    for(t = 0; t < NUM_THREADS; t++) 
    {
        pthread_create(&threads[t], NULL, barrier_test, (void*)t);
    }

    for(t = 0; t < NUM_THREADS; t++) 
    {
        pthread_join(threads[t], NULL);
    }

    // Уничтожение барьера после завершения работы
    pthread_barrier_destroy(&barrier);

    return 0;
}

В этом примере инициализируется барьер для пяти потоков (NUM_THREADS). Каждый поток выполняет функцию barrier_test, где они ожидают у барьера, вызывая pthread_barrier_wait. Потоки блокируются до тех пор, пока все пять потоков не соберутся у барьера. Как только последний поток достигает барьера, все потоки одновременно разблокируются и продолжают выполнение. Это демонстрирует, как барьеры могут быть использованы для синхронизации потоков на определённом этапе выполнения программы.

Атомарные операции

Атомарные операции — это операции, которые выполняются как единое целое без возможности прерывания. В контексте многопоточного программирования это означает, что когда один поток выполняет атомарную операцию, никакой другой поток не может видеть промежуточное состояние этой операции; он видит либо состояние до начала операции, либо после её завершения. Атомарные операции критически важны для синхронизации данных между потоками и предотвращения состояний гонки.

В POSIX атомарные операции чаще всего связаны с управлением потоками и синхронизацией, например, через использование семафоров (sem_t) и атомарные операции над переменными типа sig_atomic_t для обработки сигналов.

Однако в языке программирования Си с приходом стандарта C11 появилась возможность использования атомарных переменных и операций над ними. Однако стандарт C11 не предоставляет прямой поддержки атомарных операций для типов с плавающей точкой (float или double). Это связано с тем, что основное назначение атомарных операций — обеспечение целостности данных при доступе из нескольких потоков.

Заголовочный файл <stdatomic.h> предоставляет типы и функции для выполнения атомарных операций.

Атомарные переменные

Существует три подхода по объявлению атомарных переменных в языке Си:

  • использование квалификатора типа _Atomic;
  • использование макроса с параметром типа _Atomic(type);
  • использование атомарного типа данных.

Примечание: для работы с атомарными типами необходимо подключить заголовочный файл stdatomic.h

Пример объявления целочисленных атомарных переменных:

_Atomic const int i1;  
const _Atomic(int) i3;
const atomic_int i2;   

Важное замечание: если определена макро-константа __STDC_NO_ATOMICS__, то ключевое слово _Atomic не предоставляется. Таким образом можно проверить поддержку атомарных переменных компилятором на целевой платформе и, например, выдать соответствующую ошибку:

#ifdef __STDC_NO_ATOMICS__
    #error "Атомарные переменные не поддерживаются"
#endif

Важное предупреждение: далее будут приведены примеры работы, как правильные, так и те, работа которых не гарантируется на всех поддерживаемых платформах, а так же может зависеть от версии компилятора.

Переменные можно инициализировать двумя путями:

atomic_int atomicVar = ATOMIC_VAR_INIT(10); // Статическая инициализация
// или динамическая:
atomic_int atomicVarDynamic;
atomic_init(&atomicVarDynamic, 10);

Так же можно вызвать обычный оператор присваивания, но данный подход не гарантирует, что переменная будет инициализирована таким образом, чтобы быть безопасно использованной в многопоточных программах:

atomic_int atomicVar = 10; // Не рекомендуется

Для безопасной записи и получения значения используются функции:

atomic_store(&atomicVar, 20); // Запись значения
int value = atomic_load(&atomicVar); // Получение значения

По аналогии можно попробовать использовать оператор присваивания:

atomicVar = 20; // Запись значения (не рекомендуется)
value = atomicVar; // Получение значения (не рекомендуется)

Функции atomic_load и atomic_store имеют дополнительные формы с суффиксом _explicit.

Функции с суффиксом _explicit принимают дополнительный аргумент, указывающий модель памяти для операции:

  • memory_order_relaxed — операции не вводят никаких ограничений на порядок выполнения относительно других операций (самый слабый порядок, который позволяет достигнуть максимальной производительности на мультипроцессорных системах, но не гарантирует порядок видимости изменений между потоками);
  • memory_order_consume — редко используемая модель памяти, предназначенная для операций, зависящих от данных — ограничивает переупорядочивание операций так, чтобы чтение не могло быть выполнено до завершения операций, от которых оно зависит (часто заменяется на memory_order_acquire из-за сложности корректной реализации и поддержки компиляторами);
  • memory_order_acquire — предотвращает все чтения и записи, выполненные в текущем потоке после операции acquire, от перемещения до этой операции (гарантирует, что изменения, выполненные другими потоками до операции release, будут видимы после acquire);
  • memory_order_release — предотвращает все чтения и записи, выполненные в текущем потоке до операции release, от перемещения после этой операции (обеспечивает, что операции, выполненные до release, будут видимы в других потоках, выполняющих соответствующую операцию acquire);
  • memory_order_acq_rel — сочетает в себе эффекты memory_order_acquire и memory_order_release (полезно для операций чтения-модификации-записи, таких как atomic_compare_exchange, где необходимо обеспечить видимость изменений до и после операции);
  • memory_order_seq_cst — самый строгий порядок. Операции с этой моделью памяти выполняются с полным упорядочиванием относительно других seq_cst операций — все потоки видят операции в одном и том же порядке (обеспечивает наибольшие гарантии порядка, но может быть менее производительным по сравнению с более слабыми порядками).

Такой подход позволяет контролировать, как операции могут быть переупорядочены друг относительно друга, что позволяет выставить баланс между производительностью и строгостью гарантий синхронизации.

Для увеличения и уменьшения атомарной переменной нужно использовать функции:

// Прибавление
int oldValAdd = atomic_fetch_add(&atomicVar, 5); // Увеличивает atomicVar на 5, возвращает старое значение

// Вычитание
int oldValSub = atomic_fetch_sub(&atomicVar, 3); // Уменьшает atomicVar на 3, возвращает старое значение

Данные функции возвращают оригинальное значение (до изменений).

Для логических операций, требующих изменения атомарной переменной, необходимо использовать:

// Битовое И
int oldValAnd = atomic_fetch_and(&atomicVar, 0xF); // Применяет битовое И к atomicVar с 0xF, возвращает старое значение

// Битовое ИЛИ
int oldValOr = atomic_fetch_or(&atomicVar, 0x1); // Применяет битовое ИЛИ к atomicVar с 0x1, возвращает старое значение

// Исключающее ИЛИ
int oldValXor = atomic_fetch_xor(&atomicVar, 0xFF); // Применяет исключающее ИЛИ к atomicVar с 0xFF, возвращает старое значение

Для реализации более сложных манипуляций используются следующие функции — атомарный обмен и сравнительный обмен:

// Обмен
int oldValExchange = atomic_exchange(&atomicVar, 50); // Устанавливает atomicVar в 50, возвращает старое значение

// Сравнительный обмен
int expected = 50;
int exchanged = atomic_compare_exchange_strong(&atomicVar, &expected, 100);
// Если atomicVar равен expected (50), устанавливает atomicVar в 100
// Возвращает не ноль, если обмен выполнен, и ноль в противном случае

Первая функция atomic_exchange записывает в атомарную переменную заданное значение, возвращая её предыдущее. Вторая функция atomic_compare_exchange_strong принимает в качестве дополнительного аргумента указатель на ожидаемое значение. Данное «ожидаемое значение» используется для сравнения со значением, записанным в атомарной переменной, и может быть полезно для проверки «устаревания» оригинального значения атомарной переменной, использованного в расчетах.

Функция atomic_compare_exchange имеет несколько разновидностей:

  • atomic_compare_exchange_strong — обновляет значение атомарной переменной в случае совпадения с ожидаемым значением;
  • atomic_compare_exchange_weak — может «ложно сбоить», когда ожидаемое значение совпадает со значением в атомарной переменной: при таком подходе «weak» версии могут быть более эффективны на некоторых платформах за счет меньших требований к синхронизации (например в циклах, где операции могут быть повторены без значительных накладных расходов);
  • atomic_compare_exchange_weak_explicit — то же самое что и atomic_compare_exchange_weak, но с возможностью указать модель памяти для операции;
  • atomic_compare_exchange_strong_explicit — то же самое что и atomic_compare_exchange_strong, но с возможностью указать модель памяти для операции.

Так как C11 не предоставляет функций для атомарного умножения — можно использовать функцию atomic_compare_exchange_strong для её реализации:

int atomic_multiply(atomic_int* obj, int multiplier) 
{
    int old_val = atomic_load(obj);
    int new_val;
    do 
    {
        new_val = old_val * multiplier;
        // Пытаемся обновить значение, если оно не изменилось с момента последнего чтения
    } 
    while (!atomic_compare_exchange_strong(obj, &old_val, new_val));

    return old_val;
}

Примечание: используется именно strong версия во избежание повторных вычислений произведения.

Атомарные барьеры памяти

Стандарт C11 ввел барьеры памяти, которые используются для определения строгих гарантий порядка видимости модификаций памяти между потоками.

Барьеры памяти могут быть полезны для:

  • предотвращения непредсказуемого переупорядочивания чтений и записей памяти компилятором и процессором, что может привести к нарушению предполагаемого порядка выполнения и видимости данных;
  • обеспечения корректной синхронизации данных между потоками, гарантируя, что изменения, выполненные одним потоком, будут видны другим потокам в строго определенном порядке;
  • реализации механизмов синхронизации без блокировок, таких как очереди сообщений, атомарные счетчики и другие структуры данных, требующие точного контроля порядка операций.

Функция atomic_thread_fence устанавливает барьер памяти для операций в текущем потоке. Аргумент memory_order данной функции определяет тип барьера. гарантирует, что все операции с памятью до барьера, определенные его memory_order, будут выполнены до всех операций с памятью после барьера.

Функция atomic_signal_fence используется для установки барьера памяти, который влияет только на операции с памятью, оптимизированные компилятором, и не влияет на порядок, с которым изменения памяти становятся видимы другим потокам. Этот барьер в основном используется для синхронизации потока с обработчиками сигналов в том же потоке.

Рассмотрим пример использования atomic_thread_fence.

Предположим, что есть поток-писатель, который инициализирует структуру данных и затем устанавливает флаг, сигнализирующий о готовности данных, а так же есть поток-читатель, который ожидает этот флаг и затем безопасно обращается к данным. Чтобы гарантировать, что поток-читатель видит полностью инициализированную структуру данных после проверки флага, будут использованы барьеры памяти.

#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

typedef struct 
{
    int value1;
    int value2;
} ComplexData;

ComplexData* shared_data = NULL;
atomic_int data_ready = ATOMIC_VAR_INIT(0);

void* writer_thread(void* arg) 
{
    // Имитация работы
    sleep(1);

    // Подготовка данных
    ComplexData* data = (ComplexData*)malloc(sizeof(ComplexData));
    data->value1 = 42;
    data->value2 = 84;

    // Используем барьер памяти для гарантии публикации данных
    shared_data = data;
    atomic_thread_fence(memory_order_release);
    
    atomic_store_explicit(&data_ready, 1, memory_order_release);

    return NULL;
}

void* reader_thread(void* arg) 
{
    // Ожидание готовности данных
    while (atomic_load_explicit(&data_ready, memory_order_acquire) == 0) 
    {
        // Активное ожидание
    }

    // Данные гарантированно опубликованы, можно безопасно читать
    atomic_thread_fence(memory_order_acquire);
    ComplexData* data = shared_data;
    
    printf("Получены данные: value1 = %d, value2 = %d\n", data->value1, data->value2);
    
    free(data);

    return NULL;
}

int main() 
{
    pthread_t writer, reader;

    pthread_create(&writer, NULL, writer_thread, NULL);
    pthread_create(&reader, NULL, reader_thread, NULL);

    pthread_join(writer, NULL);
    pthread_join(reader, NULL);

    return 0;
}

Здесь atomic_thread_fence может быть использован для обеспечения корректной видимости инициализированных данных между потоками в многопоточной программе без необходимости блокировок.

Поток-писатель (writer_thread) инициализирует структуру данных и устанавливает shared_data для указания на неё. Затем он использует atomic_thread_fence(memory_order_release) перед установкой флага data_ready, чтобы гарантировать, что все предыдущие записи в память (инициализация данных) станут видимы другим потокам до или одновременно с записью флага data_ready.

Поток-читатель (reader_thread) ожидает, пока data_ready станет равным 1, используя memory_order_acquire для загрузки этого флага. Это обеспечивает, что поток-читатель видит все записи в память, сделанные потоком-писателем до установки флага data_ready.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.