Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert __sync* built-ins to std::atomic #265

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/thread_disk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ thread_disk::call_events() {
if ((m_flags & flag_did_shutdown))
throw internal_error("Already trigged shutdown.");

__sync_or_and_fetch(&m_flags, flag_did_shutdown);
m_flags |= flag_did_shutdown;
throw shutdown_exception();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,11 @@ PollEPoll::do_poll(int64_t timeout_usec, int flags) {

if (!(flags & poll_worker_thread)) {
thread_base::release_global_lock();
thread_base::entering_main_polling();
}

int status = poll((timeout.usec() + 999) / 1000);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_kqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,11 @@ PollKQueue::do_poll(int64_t timeout_usec, int flags) {

if (!(flags & poll_worker_thread)) {
thread_base::release_global_lock();
thread_base::entering_main_polling();
}

int status = poll((timeout.usec() + 999) / 1000);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
2 changes: 0 additions & 2 deletions src/torrent/poll_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,12 @@ PollSelect::do_poll(int64_t timeout_usec, int flags) {
timeval t = timeout.tval();

if (!(flags & poll_worker_thread)) {
thread_base::entering_main_polling();
thread_base::release_global_lock();
}

int status = select(maxFd + 1, read_set, write_set, error_set, &t);

if (!(flags & poll_worker_thread)) {
thread_base::leaving_main_polling();
thread_base::acquire_global_lock();
}

Expand Down
4 changes: 2 additions & 2 deletions src/torrent/utils/signal_bitfield.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ signal_bitfield::add_signal(slot_type slot) {
throw internal_error("signal_bitfield::add_signal(...): Cannot add empty slot.");

unsigned int index = m_size;
__sync_add_and_fetch(&m_size, 1);
++m_size;

m_slots[index] = slot;
return index;
Expand All @@ -28,7 +28,7 @@ void
signal_bitfield::work() {
bitfield_type bitfield;

while (!__sync_bool_compare_and_swap(&m_bitfield, (bitfield = m_bitfield), 0))
while (!m_bitfield.compare_exchange_weak(bitfield, 0))
; // Do nothing.

unsigned int i = 0;
Expand Down
14 changes: 8 additions & 6 deletions src/torrent/utils/signal_bitfield.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
#ifndef LIBTORRENT_UTILS_SIGNAL_BITFIELD_H
#define LIBTORRENT_UTILS_SIGNAL_BITFIELD_H

#include "config.h"

#include <functional>
#include <atomic>

#include <torrent/common.h>

Expand All @@ -48,22 +51,21 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned signal_bitfield {
typedef uint32_t bitfield_type;
typedef std::function<void ()> slot_type;

static const unsigned int max_size = 32;
static constexpr unsigned int max_size = 32;

signal_bitfield() : m_bitfield(0), m_size(0) {}
signal_bitfield() = default;

bool has_signal(unsigned int index) const { return m_bitfield & (1 << index); }

// Do the interrupt from the thread?
void signal(unsigned int index) { __sync_or_and_fetch(&m_bitfield, 1 << index); }
void signal(unsigned int index) { m_bitfield |= 1 << index; }
void work();

unsigned int add_signal(slot_type slot);

private:

bitfield_type m_bitfield;
unsigned int m_size;
std::atomic<bitfield_type> m_bitfield{0};
std::atomic<unsigned int> m_size{0};
slot_type m_slots[max_size] lt_cacheline_aligned;
};

Expand Down
13 changes: 7 additions & 6 deletions src/torrent/utils/thread_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

namespace torrent {

thread_base::global_lock_type lt_cacheline_aligned thread_base::m_global = { 0, 0, PTHREAD_MUTEX_INITIALIZER };
std::mutex thread_base::m_globalLock;
std::atomic<int> thread_base::m_waiting;

thread_base::thread_base() :
m_state(STATE_UNKNOWN),
Expand Down Expand Up @@ -56,7 +57,7 @@ thread_base::start_thread() {

void
thread_base::stop_thread() {
__sync_fetch_and_or(&m_flags, flag_do_shutdown);
m_flags |= flag_do_shutdown;
interrupt();
}

Expand Down Expand Up @@ -94,7 +95,7 @@ thread_base::event_loop(thread_base* thread) {
if (!thread->is_initialized())
throw internal_error("thread_base::event_loop call on an uninitialized object");

__sync_lock_test_and_set(&thread->m_state, STATE_ACTIVE);
thread->m_state = STATE_ACTIVE;

#if defined(HAS_PTHREAD_SETNAME_NP_DARWIN)
pthread_setname_np(thread->name());
Expand All @@ -117,7 +118,7 @@ thread_base::event_loop(thread_base* thread) {
thread->call_events();
thread->signal_bitfield()->work();

__sync_fetch_and_or(&thread->m_flags, flag_polling);
thread->m_flags |= flag_polling;

// Call again after setting flag_polling to ensure we process
// any events set while it was working.
Expand Down Expand Up @@ -152,7 +153,7 @@ thread_base::event_loop(thread_base* thread) {
instrumentation_update(INSTRUMENTATION_POLLING_EVENTS, event_count);
instrumentation_update(instrumentation_enum(INSTRUMENTATION_POLLING_EVENTS + thread->m_instrumentation_index), event_count);

__sync_fetch_and_and(&thread->m_flags, ~(flag_polling | flag_no_timeout));
thread->m_flags &= ~(flag_polling | flag_no_timeout);
}

// #ifdef USE_INTERRUPT_SOCKET
Expand All @@ -163,7 +164,7 @@ thread_base::event_loop(thread_base* thread) {
lt_log_print(torrent::LOG_THREAD_NOTICE, "%s: Shutting down thread.", thread->name());
}

__sync_lock_test_and_set(&thread->m_state, STATE_INACTIVE);
thread->m_state = STATE_INACTIVE;
return NULL;
}

Expand Down
51 changes: 13 additions & 38 deletions src/torrent/utils/thread_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define LIBTORRENT_UTILS_THREAD_BASE_H

#import <functional>
#import <mutex>
#import <pthread.h>
#import <sys/types.h>

Expand Down Expand Up @@ -69,36 +70,29 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_base {
slot_void& slot_do_work() { return m_slot_do_work; }
slot_timer& slot_next_timeout() { return m_slot_next_timeout; }

static inline int global_queue_size() { return m_global.waiting; }
static inline int global_queue_size() { return m_waiting; }

static inline void acquire_global_lock();
static inline bool trylock_global_lock();
static inline void release_global_lock();
static inline void waive_global_lock();

static inline bool is_main_polling() { return m_global.main_polling; }
static inline void entering_main_polling();
static inline void leaving_main_polling();

static bool should_handle_sigusr1();

static void* event_loop(thread_base* thread);

protected:
struct lt_cacheline_aligned global_lock_type {
int waiting;
int main_polling;
pthread_mutex_t lock;
};
static std::mutex lt_cacheline_aligned m_globalLock;
static std::atomic<int> lt_cacheline_aligned m_waiting;

virtual void call_events() = 0;
virtual int64_t next_timeout_usec() = 0;

static global_lock_type m_global;

pthread_t m_thread;
state_type m_state lt_cacheline_aligned;
int m_flags lt_cacheline_aligned;

std::atomic<state_type> lt_cacheline_aligned m_state{STATE_UNKNOWN};
std::atomic<int> lt_cacheline_aligned m_flags{0};

int m_instrumentation_index;

Expand All @@ -124,13 +118,11 @@ thread_base::is_current() const {

inline int
thread_base::flags() const {
__sync_synchronize();
return m_flags;
}

inline thread_base::state_type
thread_base::state() const {
__sync_synchronize();
return m_state;
}

Expand All @@ -144,46 +136,29 @@ thread_base::send_event_signal(unsigned int index, bool do_interrupt) {

inline void
thread_base::acquire_global_lock() {
__sync_add_and_fetch(&thread_base::m_global.waiting, 1);
pthread_mutex_lock(&thread_base::m_global.lock);
__sync_sub_and_fetch(&thread_base::m_global.waiting, 1);
++m_waiting;
m_globalLock.lock();
--m_waiting;
}

inline bool
thread_base::trylock_global_lock() {
return pthread_mutex_trylock(&thread_base::m_global.lock) == 0;
return thread_base::m_globalLock.try_lock();
}

inline void
thread_base::release_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);
return thread_base::m_globalLock.unlock();
}

inline void
thread_base::waive_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);
thread_base::m_globalLock.unlock();

// Do we need to sleep here? Make a CppUnit test for this.
acquire_global_lock();
}

// 'entering/leaving_main_polling' is used by the main polling thread
// to indicate to other threads when it is safe to change the main
// thread's event entries.
//
// A thread should first aquire global lock, then if it needs to
// change poll'ed sockets on the main thread it should call
// 'interrupt_main_polling' unless 'is_main_polling() == false'.
inline void
thread_base::entering_main_polling() {
__sync_lock_test_and_set(&thread_base::m_global.main_polling, 1);
}

inline void
thread_base::leaving_main_polling() {
__sync_lock_test_and_set(&thread_base::m_global.main_polling, 0);
}

}

#endif
6 changes: 4 additions & 2 deletions src/torrent/utils/thread_interrupt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ thread_interrupt::~thread_interrupt() {

bool
thread_interrupt::poke() {
if (!__sync_bool_compare_and_swap(&m_other->m_poking, false, true))
bool flag = false;
if (!m_other->m_poking.compare_exchange_strong(flag, true))
return true;

int result = ::send(m_fileDesc, "a", 1, 0);
Expand Down Expand Up @@ -104,7 +105,8 @@ thread_interrupt::event_read() {

instrumentation_update(INSTRUMENTATION_POLLING_INTERRUPT_READ_EVENT, 1);

__sync_bool_compare_and_swap(&m_poking, true, false);
m_poking = false;
}

}

5 changes: 4 additions & 1 deletion src/torrent/utils/thread_interrupt.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
#ifndef LIBTORRENT_UTILS_THREAD_INTERRUPT_H
#define LIBTORRENT_UTILS_THREAD_INTERRUPT_H

#include "config.h"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never be in a header file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no quarrel with that, but it throws off clang-tidy since it can't infer what lt_cacheline_aligned is.


#include <utility>
#include <atomic>
#include <torrent/event.h>

namespace torrent {
Expand Down Expand Up @@ -66,7 +69,7 @@ class LIBTORRENT_EXPORT lt_cacheline_aligned thread_interrupt : public Event {
SocketFd& get_fd() { return *reinterpret_cast<SocketFd*>(&m_fileDesc); }

thread_interrupt* m_other;
bool m_poking lt_cacheline_aligned;
std::atomic<bool> m_poking lt_cacheline_aligned;
};

inline bool
Expand Down
Loading