Skip to content

Commit

Permalink
Fix/atomic types 2 (#267)
Browse files Browse the repository at this point in the history
Migrated __sync_* calls to std::atomic.
  • Loading branch information
rakshasa committed Jan 20, 2025
1 parent 7f761f8 commit 1150f62
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ AlignConsecutiveDeclarations:
AlignConsecutiveMacros:
Enabled: true
AlignConsecutiveAssignments:
Enabled: true
Enabled: true

IncludeCategories:
- Regex: "^(config|globals)\\.h"
Expand Down
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ CheckOptions:
- key: readability-identifier-naming.PrivateMemberPrefix
value: m_
- key: readability-identifier-naming.PrivateMemberCase
value: camelBack
value: lower_case
- key: readability-identifier-naming.ClassConstantCase
value: lower_case
20 changes: 8 additions & 12 deletions src/torrent/utils/thread_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,27 @@

namespace torrent {

thread_base::global_lock_type lt_cacheline_aligned thread_base::m_global = { 0, PTHREAD_MUTEX_INITIALIZER };
thread_base::global_lock_type thread_base::m_global;

thread_base::thread_base() :
m_state(STATE_UNKNOWN),
m_flags(0),
m_instrumentation_index(INSTRUMENTATION_POLLING_DO_POLL_OTHERS - INSTRUMENTATION_POLLING_DO_POLL),

m_poll(NULL),
m_interrupt_sender(NULL),
m_interrupt_receiver(NULL)
m_poll(NULL)
{
std::memset(&m_thread, 0, sizeof(pthread_t));

// #ifdef USE_INTERRUPT_SOCKET
thread_interrupt::pair_type interrupt_sockets = thread_interrupt::create_pair();

m_interrupt_sender = interrupt_sockets.first;
m_interrupt_receiver = interrupt_sockets.second;
m_interrupt_sender = std::move(interrupt_sockets.first);
m_interrupt_receiver = std::move(interrupt_sockets.second);

// #endif
}

thread_base::~thread_base() {
delete m_interrupt_sender;
delete m_interrupt_receiver;
}
thread_base::~thread_base() = default;

void
thread_base::start_thread() {
Expand Down Expand Up @@ -106,7 +102,7 @@ thread_base::event_loop(thread_base* thread) {
try {

// #ifdef USE_INTERRUPT_SOCKET
thread->m_poll->insert_read(thread->m_interrupt_receiver);
thread->m_poll->insert_read(thread->m_interrupt_receiver.get());
// #endif

while (true) {
Expand Down Expand Up @@ -155,7 +151,7 @@ thread_base::event_loop(thread_base* thread) {
}

// #ifdef USE_INTERRUPT_SOCKET
thread->m_poll->remove_write(thread->m_interrupt_receiver);
thread->m_poll->remove_write(thread->m_interrupt_receiver.get());
// #endif

} catch (torrent::shutdown_exception& e) {
Expand Down
25 changes: 12 additions & 13 deletions src/torrent/utils/thread_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <atomic>
#include <functional>
#include <mutex>
#include <pthread.h>
#include <sys/types.h>

Expand Down Expand Up @@ -83,8 +84,8 @@ class LIBTORRENT_EXPORT thread_base {

protected:
struct global_lock_type {
int waiting;
pthread_mutex_t lock;
std::atomic_int waiting{0};
std::mutex mutex;
};

virtual void call_events() = 0;
Expand All @@ -94,7 +95,7 @@ class LIBTORRENT_EXPORT thread_base {

pthread_t m_thread;
std::atomic<state_type> m_state;
std::atomic<int> m_flags;
std::atomic_int m_flags;

int m_instrumentation_index;

Expand All @@ -104,8 +105,8 @@ class LIBTORRENT_EXPORT thread_base {
slot_void m_slot_do_work;
slot_timer m_slot_next_timeout;

thread_interrupt* m_interrupt_sender;
thread_interrupt* m_interrupt_receiver;
std::unique_ptr<thread_interrupt> m_interrupt_sender;
std::unique_ptr<thread_interrupt> m_interrupt_receiver;
};

inline bool
Expand All @@ -128,26 +129,24 @@ thread_base::send_event_signal(unsigned int index, bool do_interrupt) {

inline void
thread_base::acquire_global_lock() {
__sync_add_and_fetch(&thread_base::m_global.waiting, 1);
pthread_mutex_lock(&thread_base::m_global.lock);
__sync_sub_and_fetch(&thread_base::m_global.waiting, 1);
thread_base::m_global.waiting++;
thread_base::m_global.mutex.lock();
thread_base::m_global.waiting--;
}

inline bool
thread_base::trylock_global_lock() {
return pthread_mutex_trylock(&thread_base::m_global.lock) == 0;
return thread_base::m_global.mutex.try_lock();
}

inline void
thread_base::release_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);
thread_base::m_global.mutex.unlock();
}

inline void
thread_base::waive_global_lock() {
pthread_mutex_unlock(&thread_base::m_global.lock);

// Do we need to sleep here? Make a CppUnit test for this.
release_global_lock();
acquire_global_lock();
}

Expand Down
12 changes: 7 additions & 5 deletions src/torrent/utils/thread_interrupt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ thread_interrupt::create_pair() {
if (!SocketFd::open_socket_pair(fd1, fd2))
throw internal_error("Could not create socket pair for thread_interrupt: " + std::string(rak::error_number::current().c_str()) + ".");

thread_interrupt* t1 = new thread_interrupt(fd1);
thread_interrupt* t2 = new thread_interrupt(fd2);
pair_type result;

t1->m_other = t2;
t2->m_other = t1;
result.first = std::unique_ptr<thread_interrupt>(new thread_interrupt(fd1));
result.second = std::unique_ptr<thread_interrupt>(new thread_interrupt(fd2));

return pair_type(t1, t2);
result.first->m_other = result.second.get();
result.second->m_other = result.first.get();

return result;
}

void
Expand Down
3 changes: 2 additions & 1 deletion src/torrent/utils/thread_interrupt.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define LIBTORRENT_UTILS_THREAD_INTERRUPT_H

#include <atomic>
#include <memory>
#include <utility>
#include <torrent/event.h>

Expand All @@ -11,7 +12,7 @@ class SocketFd;

class LIBTORRENT_EXPORT thread_interrupt : public Event {
public:
typedef std::pair<thread_interrupt*, thread_interrupt*> pair_type;
typedef std::pair<std::unique_ptr<thread_interrupt>, std::unique_ptr<thread_interrupt>> pair_type;

~thread_interrupt();

Expand Down
64 changes: 13 additions & 51 deletions src/utils/instrumentation.cc
Original file line number Diff line number Diff line change
@@ -1,67 +1,29 @@
// libTorrent - BitTorrent library
// Copyright (C) 2005-2011, Jari Sundell
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations
// including the two.
//
// You must obey the GNU General Public License in all respects for
// all of the code used other than OpenSSL. If you modify file(s)
// with this exception, you may extend this exception to your version
// of the file(s), but you are not obligated to do so. If you do not
// wish to do so, delete this exception statement from your version.
// If you delete this exception statement from all source files in the
// program, then also delete it here.
//
// Contact: Jari Sundell <[email protected]>
//
// Skomakerveien 33
// 3185 Skoppum, NORWAY

#include "config.h"

#include "instrumentation.h"

namespace torrent {

std::array<int64_t, INSTRUMENTATION_MAX_SIZE> instrumentation_values lt_cacheline_aligned;
std::array<std::atomic_int64_t, INSTRUMENTATION_MAX_SIZE> instrumentation_values;

inline int64_t
instrumentation_fetch_and_clear(instrumentation_enum type) {
#ifdef LT_INSTRUMENTATION
return __sync_fetch_and_and(&instrumentation_values[type], int64_t());
return instrumentation_values[type].exchange(0);
#else
return instrumentation_values[type] = 0;
return 0;
#endif
}

void
instrumentation_tick() {
// Since the values are updated with __sync_add, they can be read
// without any memory barriers.
lt_log_print(LOG_INSTRUMENTATION_MEMORY,
"%" PRIi64 " %" PRIi64 " %" PRIi64 " %" PRIi64 " %" PRIi64,
instrumentation_values[INSTRUMENTATION_MEMORY_CHUNK_USAGE],
instrumentation_values[INSTRUMENTATION_MEMORY_CHUNK_COUNT],
instrumentation_values[INSTRUMENTATION_MEMORY_HASHING_CHUNK_USAGE],
instrumentation_values[INSTRUMENTATION_MEMORY_HASHING_CHUNK_COUNT],
instrumentation_values[INSTRUMENTATION_MEMORY_BITFIELDS]);
instrumentation_values[INSTRUMENTATION_MEMORY_CHUNK_USAGE].load(),
instrumentation_values[INSTRUMENTATION_MEMORY_CHUNK_COUNT].load(),
instrumentation_values[INSTRUMENTATION_MEMORY_HASHING_CHUNK_USAGE].load(),
instrumentation_values[INSTRUMENTATION_MEMORY_HASHING_CHUNK_COUNT].load(),
instrumentation_values[INSTRUMENTATION_MEMORY_BITFIELDS].load());

lt_log_print(LOG_INSTRUMENTATION_MINCORE,
"%" PRIi64 " %" PRIi64 " %" PRIi64 " %" PRIi64 " %" PRIi64
Expand Down Expand Up @@ -117,24 +79,24 @@ instrumentation_tick() {
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_QUEUED_ADDED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_QUEUED_MOVED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_QUEUED_REMOVED),
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_QUEUED_TOTAL],
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_QUEUED_TOTAL].load(),

instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_UNORDERED_ADDED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_UNORDERED_MOVED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_UNORDERED_REMOVED),
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_UNORDERED_TOTAL],
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_UNORDERED_TOTAL].load(),

instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_STALLED_ADDED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_STALLED_MOVED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_STALLED_REMOVED),
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_STALLED_TOTAL],
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_STALLED_TOTAL].load(),

instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_CHOKED_ADDED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_CHOKED_MOVED),
instrumentation_fetch_and_clear(INSTRUMENTATION_TRANSFER_REQUESTS_CHOKED_REMOVED),
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_CHOKED_TOTAL],
instrumentation_values[INSTRUMENTATION_TRANSFER_REQUESTS_CHOKED_TOTAL].load(),

instrumentation_values[INSTRUMENTATION_TRANSFER_PEER_INFO_UNACCOUNTED]);
instrumentation_values[INSTRUMENTATION_TRANSFER_PEER_INFO_UNACCOUNTED].load());
}

void
Expand Down
41 changes: 3 additions & 38 deletions src/utils/instrumentation.h
Original file line number Diff line number Diff line change
@@ -1,44 +1,9 @@
// libTorrent - BitTorrent library
// Copyright (C) 2005-2011, Jari Sundell
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// In addition, as a special exception, the copyright holders give
// permission to link the code of portions of this program with the
// OpenSSL library under certain conditions as described in each
// individual source file, and distribute linked combinations
// including the two.
//
// You must obey the GNU General Public License in all respects for
// all of the code used other than OpenSSL. If you modify file(s)
// with this exception, you may extend this exception to your version
// of the file(s), but you are not obligated to do so. If you do not
// wish to do so, delete this exception statement from your version.
// If you delete this exception statement from all source files in the
// program, then also delete it here.
//
// Contact: Jari Sundell <[email protected]>
//
// Skomakerveien 33
// 3185 Skoppum, NORWAY

#ifndef LIBTORRENT_UTILS_INSTRUMENTATION_H
#define LIBTORRENT_UTILS_INSTRUMENTATION_H

#include <algorithm>
#include <array>
#include <atomic>

#include "torrent/common.h"
#include "torrent/utils/log.h"
Expand Down Expand Up @@ -106,7 +71,7 @@ enum instrumentation_enum {
INSTRUMENTATION_MAX_SIZE
};

extern std::array<int64_t, INSTRUMENTATION_MAX_SIZE> instrumentation_values lt_cacheline_aligned;
extern std::array<std::atomic_int64_t, INSTRUMENTATION_MAX_SIZE> instrumentation_values;

void instrumentation_initialize();
void instrumentation_update(instrumentation_enum type, int64_t change);
Expand All @@ -125,7 +90,7 @@ instrumentation_initialize() {
inline void
instrumentation_update(instrumentation_enum type, int64_t change) {
#ifdef LT_INSTRUMENTATION
__sync_add_and_fetch(&instrumentation_values[type], change);
instrumentation_values[type] += change;
#endif
}

Expand Down
10 changes: 5 additions & 5 deletions test/helpers/test_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ test_thread::init_thread() {
void
test_thread::call_events() {
if ((m_test_flags & test_flag_pre_stop) && m_test_state == TEST_PRE_START && m_state == STATE_ACTIVE)
__sync_lock_test_and_set(&m_test_state, TEST_PRE_STOP);
m_test_state = TEST_PRE_STOP;

if ((m_test_flags & test_flag_acquire_global)) {
acquire_global_lock();
__sync_and_and_fetch(&m_test_flags, ~test_flag_acquire_global);
__sync_or_and_fetch(&m_test_flags, test_flag_has_global);
m_test_flags &= ~test_flag_acquire_global;
m_test_flags |= test_flag_has_global;
}

if ((m_flags & flag_do_shutdown)) {
if ((m_flags & flag_did_shutdown))
throw torrent::internal_error("Already trigged shutdown.");

__sync_or_and_fetch(&m_flags, flag_did_shutdown);
m_flags |= flag_did_shutdown;
throw torrent::shutdown_exception();
}

Expand All @@ -55,7 +55,7 @@ test_thread::call_events() {

if ((m_test_flags & test_flag_do_work)) {
usleep(10 * 1000); // TODO: Don't just sleep, as that give up core.
__sync_and_and_fetch(&m_test_flags, ~test_flag_do_work);
m_test_flags &= ~test_flag_do_work;
}

if ((m_test_flags & test_flag_post_poke)) {
Expand Down
Loading

0 comments on commit 1150f62

Please sign in to comment.