Skip to content

Commit

Permalink
Convert __sync* built-ins to std::atomic
Browse files Browse the repository at this point in the history
Also convert thread_base to use std::mutex instead of pthread_mutex
  • Loading branch information
kannibalox committed Dec 1, 2024
1 parent d43eb0a commit 6321d77
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 149 deletions.
2 changes: 1 addition & 1 deletion src/thread_disk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ thread_disk::call_events() {
if ((m_flags & flag_did_shutdown))
throw internal_error("Already trigged shutdown.");

__sync_or_and_fetch(&m_flags, flag_did_shutdown);
m_flags |= flag_did_shutdown;
throw shutdown_exception();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,11 @@ PollEPoll::do_poll(int64_t timeout_usec, int flags) {

if (!(flags & poll_worker_thread)) {
thread_base::release_global_lock();
thread_base::entering_main_polling();
}

int status = poll((timeout.usec() + 999) / 1000);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_kqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,11 @@ PollKQueue::do_poll(int64_t timeout_usec, int flags) {

if (!(flags & poll_worker_thread)) {
thread_base::release_global_lock();
thread_base::entering_main_polling();
}

int status = poll((timeout.usec() + 999) / 1000);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,12 @@ PollSelect::do_poll(int64_t timeout_usec, int flags) {
timeval t = timeout.tval();

if (!(flags & poll_worker_thread)) {
thread_base::entering_main_polling();
thread_base::release_global_lock();
}

int status = select(maxFd + 1, read_set, write_set, error_set, &t);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
4 changes: 2 additions & 2 deletions src/torrent/utils/signal_bitfield.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ signal_bitfield::add_signal(slot_type slot) {
throw internal_error("signal_bitfield::add_signal(...): Cannot add empty slot.");

unsigned int index = m_size;
__sync_add_and_fetch(&m_size, 1);
++m_size;

m_slots[index] = slot;
return index;
Expand All @@ -28,7 +28,7 @@ void
signal_bitfield::work() {
bitfield_type bitfield;

while (!__sync_bool_compare_and_swap(&m_bitfield, (bitfield = m_bitfield), 0))
while (!m_bitfield.compare_exchange_weak(bitfield, 0))
; // Do nothing.

unsigned int i = 0;
Expand Down
14 changes: 8 additions & 6 deletions src/torrent/utils/signal_bitfield.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
#ifndef LIBTORRENT_UTILS_SIGNAL_BITFIELD_H
#define LIBTORRENT_UTILS_SIGNAL_BITFIELD_H

#include "config.h"

#include <functional>
#include <atomic>

#include <torrent/common.h>

Expand All @@ -48,22 +51,21 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned signal_bitfield {
typedef uint32_t bitfield_type;
typedef std::function<void ()> slot_type;

static const unsigned int max_size = 32;
static constexpr unsigned int max_size = 32;

signal_bitfield() : m_bitfield(0), m_size(0) {}
signal_bitfield() = default;

bool has_signal(unsigned int index) const { return m_bitfield & (1 << index); }

// Do the interrupt from the thread?
void signal(unsigned int index) { __sync_or_and_fetch(&m_bitfield, 1 << index); }
void signal(unsigned int index) { m_bitfield |= 1 << index; }
void work();

unsigned int add_signal(slot_type slot);

private:

bitfield_type m_bitfield;
unsigned int m_size;
std::atomic<bitfield_type> m_bitfield{0};
std::atomic<unsigned int> m_size{0};
slot_type m_slots[max_size] lt_cacheline_aligned;
};

Expand Down
13 changes: 7 additions & 6 deletions src/torrent/utils/thread_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

namespace torrent {

thread_base::global_lock_type lt_cacheline_aligned thread_base::m_global = { 0, 0, PTHREAD_MUTEX_INITIALIZER };
std::mutex thread_base::m_globalLock;
std::atomic<int> thread_base::m_waiting;

thread_base::thread_base() :
m_state(STATE_UNKNOWN),
Expand Down Expand Up @@ -56,7 +57,7 @@ thread_base::start_thread() {

void
thread_base::stop_thread() {
__sync_fetch_and_or(&m_flags, flag_do_shutdown);
m_flags |= flag_do_shutdown;
interrupt();
}

Expand Down Expand Up @@ -94,7 +95,7 @@ thread_base::event_loop(thread_base* thread) {
if (!thread->is_initialized())
throw internal_error("thread_base::event_loop call on an uninitialized object");

__sync_lock_test_and_set(&thread->m_state, STATE_ACTIVE);
thread->m_state = STATE_ACTIVE;

#if defined(HAS_PTHREAD_SETNAME_NP_DARWIN)
pthread_setname_np(thread->name());
Expand All @@ -117,7 +118,7 @@ thread_base::event_loop(thread_base* thread) {
thread->call_events();
thread->signal_bitfield()->work();

__sync_fetch_and_or(&thread->m_flags, flag_polling);
thread->m_flags |= flag_polling;

// Call again after setting flag_polling to ensure we process
// any events set while it was working.
Expand Down Expand Up @@ -152,7 +153,7 @@ thread_base::event_loop(thread_base* thread) {
instrumentation_update(INSTRUMENTATION_POLLING_EVENTS, event_count);
instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_EVENTS + thread->m_instrumentation_index), event_count);

__sync_fetch_and_and(&thread->m_flags, ~(flag_polling | flag_no_timeout));
thread->m_flags &= ~(flag_polling | flag_no_timeout);
}

// #ifdef USE_INTERRUPT_SOCKET
Expand All @@ -163,7 +164,7 @@ thread_base::event_loop(thread_base* thread) {
lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Shutting down thread.", thread->name());
}

__sync_lock_test_and_set(&thread->m_state, STATE_INACTIVE);
thread->m_state = STATE_INACTIVE;
return NULL;
}

Expand Down
51 changes: 13 additions & 38 deletions src/torrent/utils/thread_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define LIBTORRENT_UTILS_THREAD_BASE_H

#import <functional>
#import <mutex>
#import <pthread.h>
#import <sys/types.h>

Expand Down Expand Up @@ -69,36 +70,29 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
slot_void& slot_do_work() { return m_slot_do_work; }
slot_timer& slot_next_timeout() { return m_slot_next_timeout; }

static inline int global_queue_size() { return m_global.waiting; }
static inline int global_queue_size() { return m_waiting; }

static inline void acquire_global_lock();
static inline bool trylock_global_lock();
static inline void release_global_lock();
static inline void waive_global_lock();

static inline bool is_main_polling() { return m_global.main_polling; }
static inline void entering_main_polling();
static inline void leaving_main_polling();

static bool should_handle_sigusr1();

static void* event_loop(thread_base* thread);

protected:
struct lt_cacheline_aligned global_lock_type {
int waiting;
int main_polling;
pthread_mutex_t lock;
};
static std::mutex lt_cacheline_aligned m_globalLock;
static std::atomic<int> lt_cacheline_aligned m_waiting;

virtual void call_events() = 0;
virtual int64_t next_timeout_usec() = 0;

static global_lock_type m_global;

pthread_t m_thread;
state_type m_state lt_cacheline_aligned;
int m_flags lt_cacheline_aligned;

std::atomic<state_type> lt_cacheline_aligned m_state{STATE_UNKNOWN};
std::atomic<int> lt_cacheline_aligned m_flags{0};

int m_instrumentation_index;

Expand All @@ -124,13 +118,11 @@ thread_base::is_current() const {

inline int
thread_base::flags() const {
__sync_synchronize();
return m_flags;
}

inline thread_base::state_type
thread_base::state() const {
__sync_synchronize();
return m_state;
}

Expand All @@ -144,46 +136,29 @@ thread_base::send_event_signal(unsigned int index, bool do_interrupt) {

inline void
thread_base::acquire_global_lock() {
__sync_add_and_fetch(&thread_base::m_global.waiting, 1);
pthread_mutex_lock(&thread_base::m_global.lock);
__sync_sub_and_fetch(&thread_base::m_global.waiting, 1);
++m_waiting;
m_globalLock.lock();
--m_waiting;
}

inline bool
thread_base::trylock_global_lock() {
return pthread_mutex_trylock(&thread_base::m_global.lock) == 0;
return thread_base::m_globalLock.try_lock();
}

inline void
thread_base::release_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);
return thread_base::m_globalLock.unlock();
}

inline void
thread_base::waive_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);
thread_base::m_globalLock.unlock();

// Do we need to sleep here? Make a CppUnit test for this.
acquire_global_lock();
}

// 'entering/leaving_main_polling' is used by the main polling thread
// to indicate to other threads when it is safe to change the main
// thread's event entries.
//
// A thread should first aquire global lock, then if it needs to
// change poll'ed sockets on the main thread it should call
// 'interrupt_main_polling' unless 'is_main_polling() == false'.
inline void
thread_base::entering_main_polling() {
__sync_lock_test_and_set(&thread_base::m_global.main_polling, 1);
}

inline void
thread_base::leaving_main_polling() {
__sync_lock_test_and_set(&thread_base::m_global.main_polling, 0);
}

}

#endif
6 changes: 4 additions & 2 deletions src/torrent/utils/thread_interrupt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ thread_interrupt::~thread_interrupt() {

bool
thread_interrupt::poke() {
if (!__sync_bool_compare_and_swap(&m_other->m_poking, false, true))
bool flag = false;
if (!m_other->m_poking.compare_exchange_strong(flag, true))
return true;

int result = ::send(m_fileDesc, "a", 1, 0);
Expand Down Expand Up @@ -104,7 +105,8 @@ thread_interrupt::event_read() {

instrumentation_update(INSTRUMENTATION_POLLING_INTERRUPT_READ_EVENT, 1);

__sync_bool_compare_and_swap(&m_poking, true, false);
m_poking = false;
}

}

3 changes: 2 additions & 1 deletion src/torrent/utils/thread_interrupt.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#define LIBTORRENT_UTILS_THREAD_INTERRUPT_H

#include <utility>
#include <atomic>
#include <torrent/event.h>

namespace torrent {
Expand Down Expand Up @@ -66,7 +67,7 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_interrupt : public Event {
SocketFd& get_fd() { return *reinterpret_cast<SocketFd*>(&m_fileDesc); }

thread_interrupt* m_other;
bool m_poking lt_cacheline_aligned;
std::atomic<bool> m_poking lt_cacheline_aligned;
};

inline bool
Expand Down
Loading

0 comments on commit 6321d77

Please sign in to comment.