linuxdcpp-team team mailing list archive
-
linuxdcpp-team team
-
Mailing list archive
-
Message #01741
[Merge] lp:~gpr/linuxdcpp/sync into lp:linuxdcpp
Gennady Proskurin has proposed merging lp:~gpr/linuxdcpp/sync into lp:linuxdcpp.
Requested reviews:
LinuxDC++ Team (linuxdcpp-team)
Related bugs:
#617021 Semaphore potentially may underflow and become negative
https://bugs.launchpad.net/bugs/617021
#617591 Pointer.h/intrusive_ptr_base class is too heavy-weight
https://bugs.launchpad.net/bugs/617591
#617757 portable FastCriticalSection implementation
https://bugs.launchpad.net/bugs/617757
#617988 atomic counters implemented
https://bugs.launchpad.net/bugs/617988
+ includes all patches submitted in corresponding bug reports
+ Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations
+ Thread::safeDec/Inc/Exchange functions removed (as unused)
Bug #617021: Semaphore potentially may underflow and become negative
Bug #617591: Pointer.h/intrusive_ptr_base class is too heavy-weight
Bug #617757: portable FastCriticalSection implementation
Bug #617988: atomic counters implemented
--
https://code.launchpad.net/~gpr/linuxdcpp/sync/+merge/32714
Your team LinuxDC++ Team is requested to review the proposed merge of lp:~gpr/linuxdcpp/sync into lp:linuxdcpp.
=== added file 'dcpp/Atomic.h'
--- dcpp/Atomic.h 1970-01-01 00:00:00 +0000
+++ dcpp/Atomic.h 2010-08-15 19:35:53 +0000
@@ -0,0 +1,122 @@
+#if !defined(DCPP_ATOMIC_H)
+#define DCPP_ATOMIC_H
+
+#include "CriticalSection.h"
+
+#include <boost/interprocess/detail/atomic.hpp>
+#include <boost/cstdint.hpp>
+
+namespace dcpp {
+
+// Ordering arguments:
+
+// memory_ordering_weak
+// Suitable only for thread-safe accounting of some statistics.
+// Value can not be used as "flag" (you cannot do any multi-thread action, based
+// on this value) since it does not garantees necessary memory barriers.
+class memory_ordering_weak {};
+
+// memory_ordering_strong
+// Suitable for any multi-thread purpose
+class memory_ordering_strong {};
+
+template <typename DataType, class Ordering = memory_ordering_strong>
+class Atomic;
+
+
+// uint32_t
+template <>
+class Atomic<boost::uint32_t, memory_ordering_weak> {
+ typedef boost::uint32_t value_type;
+public:
+ Atomic(value_type val) { assign(val); }
+ Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
+
+ // operator=
+ // return void to be safe
+ void operator=(value_type val) { assign(val); }
+ void operator=(const Atomic& other) {
+ return operator=(static_cast<value_type>(other));
+ }
+
+ // type cast
+ operator value_type() const {
+ return boost::interprocess::detail::atomic_read32(&m_value);
+ }
+
+ // increment
+ void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
+
+ // decrement
+ void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
+
+private:
+ mutable value_type m_value;
+ void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
+};
+
+// int32_t
+// just forward all operations to underlying Atomic<uint32_t, ...> variable
+template <>
+class Atomic<boost::int32_t, memory_ordering_weak> {
+ typedef boost::int32_t value_type;
+public:
+ Atomic(value_type val) : m_value(val) {}
+ Atomic(const Atomic& other) : m_value(other) {}
+
+ void operator=(value_type val) { m_value=val; }
+ void operator=(const Atomic& other) { m_value=other; }
+ operator value_type() const { return static_cast<value_type>(m_value); }
+
+ void inc() { m_value.inc(); }
+ void dec() { m_value.dec(); }
+private:
+ Atomic<boost::uint32_t,memory_ordering_weak> m_value;
+};
+
+// memory_ordering_strong
+template <typename DataType>
+class Atomic<DataType, memory_ordering_strong> {
+ typedef DataType value_type;
+public:
+ Atomic(value_type new_value) : m_value(new_value) {}
+ Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {}
+
+ void operator=(value_type new_value) {
+ FastLock Lock(cs);
+ m_value = new_value;
+ }
+ void operator=(const Atomic& other) {
+ FastLock Lock(cs);
+ m_value = other;
+ }
+ operator value_type() const {
+ FastLock Lock(cs); // shared (read-only) lock would be sufficient here
+ return m_value;
+ }
+
+ void inc() {
+ FastLock Lock(cs);
+ ++m_value;
+ }
+ void dec() {
+ FastLock Lock(cs);
+ --m_value;
+ }
+
+ // assign new value, return old value
+ value_type exchange(value_type new_val) {
+ FastLock Lock(cs);
+ value_type old_val = m_value;
+ m_value = new_val;
+ return old_val;
+ }
+private:
+ value_type m_value;
+ mutable FastCriticalSection cs;
+};
+
+
+} // namespace dcpp
+
+#endif // !defined(DCPP_ATOMIC_H)
=== modified file 'dcpp/BufferedSocket.cpp'
--- dcpp/BufferedSocket.cpp 2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.cpp 2010-08-15 19:35:53 +0000
@@ -40,13 +40,13 @@
{
start();
- Thread::safeInc(sockets);
+ sockets.inc();
}
-volatile long BufferedSocket::sockets = 0;
+Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
BufferedSocket::~BufferedSocket() throw() {
- Thread::safeDec(sockets);
+ sockets.dec();
}
void BufferedSocket::setMode (Modes aMode, size_t aRollback) {
=== modified file 'dcpp/BufferedSocket.h'
--- dcpp/BufferedSocket.h 2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.h 2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
#include "Speaker.h"
#include "Util.h"
#include "Socket.h"
+#include "Atomic.h"
namespace dcpp {
@@ -153,7 +154,10 @@
void threadSendData() throw(Exception);
void fail(const string& aError);
- static volatile long sockets;
+
+ // For this counter we must use "strong" variant of Atomic
+ // We do some actions after checking this value, while it changes in other threads
+ static Atomic<long,memory_ordering_strong> sockets;
bool checkEvents() throw(Exception);
void checkSocket() throw(Exception);
=== modified file 'dcpp/Client.cpp'
--- dcpp/Client.cpp 2009-08-15 04:40:26 +0000
+++ dcpp/Client.cpp 2010-08-15 19:35:53 +0000
@@ -158,24 +158,24 @@
void Client::updateCounts(bool aRemove) {
// We always remove the count and then add the correct one if requested...
if(countType == COUNT_NORMAL) {
- Thread::safeDec(counts.normal);
+ counts.normal.dec();
} else if(countType == COUNT_REGISTERED) {
- Thread::safeDec(counts.registered);
+ counts.registered.dec();
} else if(countType == COUNT_OP) {
- Thread::safeDec(counts.op);
+ counts.op.dec();
}
countType = COUNT_UNCOUNTED;
if(!aRemove) {
if(getMyIdentity().isOp()) {
- Thread::safeInc(counts.op);
+ counts.op.inc();
countType = COUNT_OP;
} else if(getMyIdentity().isRegistered()) {
- Thread::safeInc(counts.registered);
+ counts.registered.inc();
countType = COUNT_REGISTERED;
} else {
- Thread::safeInc(counts.normal);
+ counts.normal.inc();
countType = COUNT_NORMAL;
}
}
=== modified file 'dcpp/Client.h'
--- dcpp/Client.h 2009-08-15 04:40:26 +0000
+++ dcpp/Client.h 2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
#include "BufferedSocketListener.h"
#include "TimerManager.h"
#include "ClientListener.h"
+#include "Atomic.h"
namespace dcpp {
@@ -72,7 +73,10 @@
static string getCounts() {
char buf[128];
- return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));
+ return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
+ static_cast<long>(counts.normal),
+ static_cast<long>(counts.registered),
+ static_cast<long>(counts.op)));
}
StringMap& escapeParams(StringMap& sm) {
@@ -112,12 +116,16 @@
friend class ClientManager;
Client(const string& hubURL, char separator, bool secure_);
virtual ~Client() throw();
+
struct Counts {
- Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }
- volatile long normal;
- volatile long registered;
- volatile long op;
- bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }
+ private:
+ typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
+ public:
+ typedef boost::int32_t value_type;
+ Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
+ atomic_counter_t normal;
+ atomic_counter_t registered;
+ atomic_counter_t op;
};
enum States {
=== modified file 'dcpp/CriticalSection.h'
--- dcpp/CriticalSection.h 2009-02-23 01:47:25 +0000
+++ dcpp/CriticalSection.h 2010-08-15 19:35:53 +0000
@@ -19,7 +19,8 @@
#if !defined(CRITICAL_SECTION_H)
#define CRITICAL_SECTION_H
-#include "Thread.h"
+// header-only implementation of mutex
+#include <boost/signals2/mutex.hpp>
namespace dcpp {
@@ -78,32 +79,11 @@
*/
class FastCriticalSection {
public:
-#ifdef _WIN32
- FastCriticalSection() : state(0) { }
-
- void enter() {
- while(Thread::safeExchange(state, 1) == 1) {
- Thread::yield();
- }
- }
- void leave() {
- Thread::safeDec(state);
- }
-private:
- volatile long state;
-
-#else
- // We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
- FastCriticalSection() {
- static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
- mtx = fastmtx;
- }
- ~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
- void enter() { pthread_mutex_lock(&mtx); }
- void leave() { pthread_mutex_unlock(&mtx); }
-private:
- pthread_mutex_t mtx;
-#endif
+ void enter() { mtx.lock(); }
+ void leave() { mtx.unlock(); }
+private:
+ typedef boost::signals2::mutex mutex_t;
+ mutex_t mtx;
};
template<class T>
=== modified file 'dcpp/Pointer.h'
--- dcpp/Pointer.h 2009-08-15 04:40:26 +0000
+++ dcpp/Pointer.h 2010-08-15 19:35:53 +0000
@@ -20,7 +20,7 @@
#define DCPLUSPLUS_DCPP_POINTER_H
#include <boost/intrusive_ptr.hpp>
-#include "Thread.h"
+#include <boost/smart_ptr/detail/atomic_count.hpp>
namespace dcpp {
@@ -36,10 +36,10 @@
intrusive_ptr_base() throw() : ref(0) { }
private:
- friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }
- friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }
+ friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
+ friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
- volatile long ref;
+ boost::detail::atomic_count ref;
};
=== modified file 'dcpp/Semaphore.h'
--- dcpp/Semaphore.h 2009-02-23 01:47:25 +0000
+++ dcpp/Semaphore.h 2010-08-15 19:35:53 +0000
@@ -59,7 +59,7 @@
bool wait() throw() {
Lock l(cs);
- if(count == 0) {
+ while (count == 0) {
pthread_cond_wait(&cond, &cs.getMutex());
}
count--;
@@ -74,7 +74,10 @@
millis+=timev.tv_usec/1000;
t.tv_sec = timev.tv_sec + (millis/1000);
t.tv_nsec = (millis%1000)*1000*1000;
- int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
+ int ret;
+ do {
+ ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
+ } while (ret==0 && count==0);
if(ret != 0) {
return false;
}
=== modified file 'dcpp/ShareManager.cpp'
--- dcpp/ShareManager.cpp 2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.cpp 2010-08-15 19:35:53 +0000
@@ -52,7 +52,7 @@
namespace dcpp {
ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
- xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),
+ xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
{
SettingsManager::getInstance()->addListener(this);
@@ -812,7 +812,7 @@
}
void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
- if(Thread::safeExchange(refreshing, 1) == 1) {
+ if(refreshing.exchange(true) == true) {
LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
return;
}
@@ -883,7 +883,7 @@
if(update) {
ClientManager::getInstance()->infoUpdated();
}
- refreshing = 0;
+ refreshing = false;
return 0;
}
=== modified file 'dcpp/ShareManager.h'
--- dcpp/ShareManager.h 2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.h 2010-08-15 19:35:53 +0000
@@ -33,6 +33,7 @@
#include "FastAlloc.h"
#include "MerkleTree.h"
#include "Pointer.h"
+#include "Atomic.h"
namespace dcpp {
@@ -249,7 +250,7 @@
int listN;
- volatile long refreshing;
+ Atomic<bool,memory_ordering_strong> refreshing;
uint64_t lastXmlUpdate;
uint64_t lastFullUpdate;
=== modified file 'dcpp/Thread.cpp'
--- dcpp/Thread.cpp 2009-02-23 01:47:25 +0000
+++ dcpp/Thread.cpp 2010-08-15 19:35:53 +0000
@@ -23,10 +23,6 @@
namespace dcpp {
-#ifndef _WIN32
-pthread_mutex_t Thread::mtx = PTHREAD_MUTEX_INITIALIZER;
-#endif
-
#ifdef _WIN32
void Thread::start() throw(ThreadException) {
join();
=== modified file 'dcpp/Thread.h'
--- dcpp/Thread.h 2009-03-01 05:27:41 +0000
+++ dcpp/Thread.h 2010-08-15 19:35:53 +0000
@@ -21,7 +21,6 @@
#ifndef _WIN32
#include <pthread.h>
-#include <sched.h>
#include <sys/resource.h>
#endif
@@ -62,18 +61,6 @@
void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }
static void sleep(uint32_t millis) { ::Sleep(millis); }
- static void yield() { ::Sleep(1); }
-
-#ifdef __MINGW32__
- static long safeInc(volatile long& v) { return InterlockedIncrement((long*)&v); }
- static long safeDec(volatile long& v) { return InterlockedDecrement((long*)&v); }
- static long safeExchange(volatile long& target, long value) { return InterlockedExchange((long*)&target, value); }
-
-#else
- static long safeInc(volatile long& v) { return InterlockedIncrement(&v); }
- static long safeDec(volatile long& v) { return InterlockedDecrement(&v); }
- static long safeExchange(volatile long& target, long value) { return InterlockedExchange(&target, value); }
-#endif
#else
@@ -99,26 +86,6 @@
void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }
static void sleep(uint32_t millis) { ::usleep(millis*1000); }
- static void yield() { ::sched_yield(); }
- static long safeInc(volatile long& v) {
- pthread_mutex_lock(&mtx);
- long ret = ++v;
- pthread_mutex_unlock(&mtx);
- return ret;
- }
- static long safeDec(volatile long& v) {
- pthread_mutex_lock(&mtx);
- long ret = --v;
- pthread_mutex_unlock(&mtx);
- return ret;
- }
- static long safeExchange(volatile long& target, long value) {
- pthread_mutex_lock(&mtx);
- long ret = target;
- target = value;
- pthread_mutex_unlock(&mtx);
- return ret;
- }
#endif
protected:
@@ -133,7 +100,6 @@
return 0;
}
#else
- static pthread_mutex_t mtx;
pthread_t threadHandle;
static void* starter(void* p) {
Thread* t = (Thread*)p;