← Back to team overview

linuxdcpp-team team mailing list archive

[Merge] lp:~gpr/linuxdcpp/sync into lp:linuxdcpp

 

Gennady Proskurin has proposed merging lp:~gpr/linuxdcpp/sync into lp:linuxdcpp.

Requested reviews:
  LinuxDC++ Team (linuxdcpp-team)
Related bugs:
  #617021 Semaphore potentially may underflow and become negative
  https://bugs.launchpad.net/bugs/617021
  #617591 Pointer.h/intrusive_ptr_base class is too heavy-weight
  https://bugs.launchpad.net/bugs/617591
  #617757 portable FastCriticalSection implementation
  https://bugs.launchpad.net/bugs/617757
  #617988 atomic counters implemented
  https://bugs.launchpad.net/bugs/617988


+ includes all patches submitted in the corresponding bug reports
+ uses of the Thread::safeDec/Inc/Exchange functions are replaced by more fine-grained/lightweight/portable implementations
+ the Thread::safeDec/Inc/Exchange functions themselves are removed (as they are now unused)

Bug #617021: Semaphore potentially may underflow and become negative
Bug #617591: Pointer.h/intrusive_ptr_base class is too heavy-weight
Bug #617757: portable FastCriticalSection implementation
Bug #617988: atomic counters implemented

-- 
https://code.launchpad.net/~gpr/linuxdcpp/sync/+merge/32714
Your team LinuxDC++ Team is requested to review the proposed merge of lp:~gpr/linuxdcpp/sync into lp:linuxdcpp.
=== added file 'dcpp/Atomic.h'
--- dcpp/Atomic.h	1970-01-01 00:00:00 +0000
+++ dcpp/Atomic.h	2010-08-15 19:35:53 +0000
@@ -0,0 +1,122 @@
+#if !defined(DCPP_ATOMIC_H)
+#define DCPP_ATOMIC_H
+
+#include "CriticalSection.h"
+
+#include <boost/interprocess/detail/atomic.hpp>
+#include <boost/cstdint.hpp>
+
+namespace dcpp {
+
+// Ordering arguments (empty tag classes, used only to select an Atomic specialization):
+
+// memory_ordering_weak
+// Suitable only for thread-safe accounting of some statistics.
+// The value cannot be used as a "flag" (you cannot base any multi-threaded action
+// on this value), since it does not guarantee the necessary memory barriers.
+class memory_ordering_weak {};
+
+// memory_ordering_strong
+// Suitable for any multi-threaded purpose; this is the default Ordering argument of Atomic.
+class memory_ordering_strong {};
+
+template <typename DataType, class Ordering = memory_ordering_strong>
+class Atomic; // primary template is only declared; this header defines it through specializations
+
+
+// uint32_t, weak ordering: every access goes through a boost::interprocess::detail 32-bit atomic primitive
+template <>
+class Atomic<boost::uint32_t, memory_ordering_weak> {
+	typedef boost::uint32_t value_type;
+public:
+	Atomic(value_type val) { assign(val); }
+	Atomic(const Atomic& other) { assign(static_cast<value_type>(other)); }
+
+	// operator=
+	// return void to be safe (an assignment can therefore not be chained or used as a value)
+	void operator=(value_type val) { assign(val); }
+	void operator=(const Atomic& other) {
+		return operator=(static_cast<value_type>(other));
+	}
+
+	// type cast: returns the current value as read by atomic_read32
+	operator value_type() const {
+		return boost::interprocess::detail::atomic_read32(&m_value);
+	}
+
+	// increment via atomic_inc32; whatever the primitive returns is discarded
+	void inc() { boost::interprocess::detail::atomic_inc32(&m_value); }
+
+	// decrement via atomic_dec32; whatever the primitive returns is discarded
+	void dec() { boost::interprocess::detail::atomic_dec32(&m_value); }
+
+private:
+	mutable value_type m_value; // mutable: the const conversion operator passes &m_value to atomic_read32
+	void assign(value_type val) { boost::interprocess::detail::atomic_write32(&m_value, val); }
+};
+
+// int32_t, weak ordering
+// just forward all operations to the underlying Atomic<uint32_t, memory_ordering_weak> member;
+template <>
+class Atomic<boost::int32_t, memory_ordering_weak> {
+	typedef boost::int32_t value_type;
+public:
+	Atomic(value_type val) : m_value(val) {} // val is converted to the member's uint32_t value type
+	Atomic(const Atomic& other) : m_value(other) {}
+
+	void operator=(value_type val)		{ m_value=val; }
+	void operator=(const Atomic& other)	{ m_value=other; }
+	operator value_type() const		{ return static_cast<value_type>(m_value); } // uint32_t read converted back to int32_t
+
+	void inc() { m_value.inc(); }
+	void dec() { m_value.dec(); }
+private:
+	Atomic<boost::uint32_t,memory_ordering_weak> m_value; // storage; all operations above delegate to it
+};
+
+// memory_ordering_strong: generic variant; every read or write of m_value happens while holding cs
+template <typename DataType>
+class Atomic<DataType, memory_ordering_strong> {
+	typedef DataType value_type;
+public:
+	Atomic(value_type new_value) : m_value(new_value) {}
+	Atomic(const Atomic& other) : m_value(static_cast<value_type>(other)) {} // reads other under other's lock
+
+	void operator=(value_type new_value) {
+		FastLock Lock(cs);
+		m_value = new_value;
+	}
+	void operator=(const Atomic& other) {
+		// read other first (holding only its lock), then store; the two locks are never held together
+		operator=(static_cast<value_type>(other));
+	}
+	operator value_type() const {
+		FastLock Lock(cs); // shared (read-only) lock would be sufficient here
+		return m_value;
+	}
+
+	void inc() {
+		FastLock Lock(cs);
+		++m_value;
+	}
+	void dec() {
+		FastLock Lock(cs);
+		--m_value;
+	}
+
+	// assign new value, return old value (both steps are done under one lock acquisition)
+	value_type exchange(value_type new_val) {
+		FastLock Lock(cs);
+		value_type old_val = m_value;
+		m_value = new_val;
+		return old_val;
+	}
+private:
+	value_type m_value;
+	mutable FastCriticalSection cs; // mutable so the const conversion operator can lock it
+};
+
+
+} // namespace dcpp
+
+#endif // !defined(DCPP_ATOMIC_H)

=== modified file 'dcpp/BufferedSocket.cpp'
--- dcpp/BufferedSocket.cpp	2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.cpp	2010-08-15 19:35:53 +0000
@@ -40,13 +40,13 @@
 {
 	start();
 
-	Thread::safeInc(sockets);
+	sockets.inc();
 }
 
-volatile long BufferedSocket::sockets = 0;
+Atomic<long,memory_ordering_strong> BufferedSocket::sockets(0);
 
 BufferedSocket::~BufferedSocket() throw() {
-	Thread::safeDec(sockets);
+	sockets.dec();
 }
 
 void BufferedSocket::setMode (Modes aMode, size_t aRollback) {

=== modified file 'dcpp/BufferedSocket.h'
--- dcpp/BufferedSocket.h	2009-08-15 04:40:26 +0000
+++ dcpp/BufferedSocket.h	2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
 #include "Speaker.h"
 #include "Util.h"
 #include "Socket.h"
+#include "Atomic.h"
 
 namespace dcpp {
 
@@ -153,7 +154,10 @@
 	void threadSendData() throw(Exception);
 
 	void fail(const string& aError);
-	static volatile long sockets;
+
+	// For this counter we must use "strong" variant of Atomic
+	// We do some actions after checking this value, while it changes in other threads
+	static Atomic<long,memory_ordering_strong> sockets;
 
 	bool checkEvents() throw(Exception);
 	void checkSocket() throw(Exception);

=== modified file 'dcpp/Client.cpp'
--- dcpp/Client.cpp	2009-08-15 04:40:26 +0000
+++ dcpp/Client.cpp	2010-08-15 19:35:53 +0000
@@ -158,24 +158,24 @@
 void Client::updateCounts(bool aRemove) {
 	// We always remove the count and then add the correct one if requested...
 	if(countType == COUNT_NORMAL) {
-		Thread::safeDec(counts.normal);
+		counts.normal.dec();
 	} else if(countType == COUNT_REGISTERED) {
-		Thread::safeDec(counts.registered);
+		counts.registered.dec();
 	} else if(countType == COUNT_OP) {
-		Thread::safeDec(counts.op);
+		counts.op.dec();
 	}
 
 	countType = COUNT_UNCOUNTED;
 
 	if(!aRemove) {
 		if(getMyIdentity().isOp()) {
-			Thread::safeInc(counts.op);
+			counts.op.inc();
 			countType = COUNT_OP;
 		} else if(getMyIdentity().isRegistered()) {
-			Thread::safeInc(counts.registered);
+			counts.registered.inc();
 			countType = COUNT_REGISTERED;
 		} else {
-			Thread::safeInc(counts.normal);
+			counts.normal.inc();
 			countType = COUNT_NORMAL;
 		}
 	}

=== modified file 'dcpp/Client.h'
--- dcpp/Client.h	2009-08-15 04:40:26 +0000
+++ dcpp/Client.h	2010-08-15 19:35:53 +0000
@@ -26,6 +26,7 @@
 #include "BufferedSocketListener.h"
 #include "TimerManager.h"
 #include "ClientListener.h"
+#include "Atomic.h"
 
 namespace dcpp {
 
@@ -72,7 +73,10 @@
 
 	static string getCounts() {
 		char buf[128];
-		return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld", counts.normal, counts.registered, counts.op));
+		return string(buf, snprintf(buf, sizeof(buf), "%ld/%ld/%ld",
+			static_cast<long>(counts.normal),
+			static_cast<long>(counts.registered),
+			static_cast<long>(counts.op)));
 	}
 
 	StringMap& escapeParams(StringMap& sm) {
@@ -112,12 +116,16 @@
 	friend class ClientManager;
 	Client(const string& hubURL, char separator, bool secure_);
 	virtual ~Client() throw();
+
 	struct Counts {
-		Counts(long n = 0, long r = 0, long o = 0) : normal(n), registered(r), op(o) { }
-		volatile long normal;
-		volatile long registered;
-		volatile long op;
-		bool operator !=(const Counts& rhs) { return normal != rhs.normal || registered != rhs.registered || op != rhs.op; }
+	private:
+		typedef Atomic<boost::int32_t,memory_ordering_weak> atomic_counter_t;
+	public:
+		typedef boost::int32_t value_type;
+		Counts(value_type n = 0, value_type r = 0, value_type o = 0) : normal(n), registered(r), op(o) { }
+		atomic_counter_t normal;
+		atomic_counter_t registered;
+		atomic_counter_t op;
 	};
 
 	enum States {

=== modified file 'dcpp/CriticalSection.h'
--- dcpp/CriticalSection.h	2009-02-23 01:47:25 +0000
+++ dcpp/CriticalSection.h	2010-08-15 19:35:53 +0000
@@ -19,7 +19,8 @@
 #if !defined(CRITICAL_SECTION_H)
 #define CRITICAL_SECTION_H
 
-#include "Thread.h"
+// header-only implementation of mutex
+#include <boost/signals2/mutex.hpp>
 
 namespace dcpp {
 
@@ -78,32 +79,11 @@
  */
 class FastCriticalSection {
 public:
-#ifdef _WIN32
-	FastCriticalSection() : state(0) { }
-
-	void enter() {
-		while(Thread::safeExchange(state, 1) == 1) {
-			Thread::yield();
-		}
-	}
-	void leave() {
-		Thread::safeDec(state);
-	}
-private:
-	volatile long state;
-
-#else
-	// We have to use a pthread (nonrecursive) mutex, didn't find any test_and_set on linux...
-	FastCriticalSection() {
-		static pthread_mutex_t fastmtx = PTHREAD_MUTEX_INITIALIZER;
-		mtx = fastmtx;
-	}
-	~FastCriticalSection() { pthread_mutex_destroy(&mtx); }
-	void enter() { pthread_mutex_lock(&mtx); }
-	void leave() { pthread_mutex_unlock(&mtx); }
-private:
-	pthread_mutex_t mtx;
-#endif
+	void enter() { mtx.lock(); }
+	void leave() { mtx.unlock(); }
+private:
+	typedef boost::signals2::mutex mutex_t;
+	mutex_t mtx;
 };
 
 template<class T>

=== modified file 'dcpp/Pointer.h'
--- dcpp/Pointer.h	2009-08-15 04:40:26 +0000
+++ dcpp/Pointer.h	2010-08-15 19:35:53 +0000
@@ -20,7 +20,7 @@
 #define DCPLUSPLUS_DCPP_POINTER_H
 
 #include <boost/intrusive_ptr.hpp>
-#include "Thread.h"
+#include <boost/smart_ptr/detail/atomic_count.hpp>
 
 namespace dcpp {
 
@@ -36,10 +36,10 @@
 	intrusive_ptr_base() throw() : ref(0) { }
 
 private:
-	friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { Thread::safeInc(p->ref); }
-	friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(Thread::safeDec(p->ref) == 0) { delete static_cast<T*>(p); } }
+	friend void intrusive_ptr_add_ref(intrusive_ptr_base* p) { ++p->ref; }
+	friend void intrusive_ptr_release(intrusive_ptr_base* p) { if(--p->ref == 0) { delete static_cast<T*>(p); } }
 
-	volatile long ref;
+	boost::detail::atomic_count ref;
 };
 
 

=== modified file 'dcpp/Semaphore.h'
--- dcpp/Semaphore.h	2009-02-23 01:47:25 +0000
+++ dcpp/Semaphore.h	2010-08-15 19:35:53 +0000
@@ -59,7 +59,7 @@
 
 	bool wait() throw() {
 		Lock l(cs);
-		if(count == 0) {
+		while (count == 0) {
 			pthread_cond_wait(&cond, &cs.getMutex());
 		}
 		count--;
@@ -74,7 +74,10 @@
 			millis+=timev.tv_usec/1000;
 			t.tv_sec = timev.tv_sec + (millis/1000);
 			t.tv_nsec = (millis%1000)*1000*1000;
-			int ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
+			int ret;
+			do {
+				ret = pthread_cond_timedwait(&cond, &cs.getMutex(), &t);
+			} while (ret==0 && count==0);
 			if(ret != 0) {
 				return false;
 			}

=== modified file 'dcpp/ShareManager.cpp'
--- dcpp/ShareManager.cpp	2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.cpp	2010-08-15 19:35:53 +0000
@@ -52,7 +52,7 @@
 namespace dcpp {
 
 ShareManager::ShareManager() : hits(0), xmlListLen(0), bzXmlListLen(0),
-	xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(0),
+	xmlDirty(true), refreshDirs(false), update(false), initial(true), listN(0), refreshing(false),
 	lastXmlUpdate(0), lastFullUpdate(GET_TICK()), bloom(1<<20)
 {
 	SettingsManager::getInstance()->addListener(this);
@@ -812,7 +812,7 @@
 }
 
 void ShareManager::refresh(bool dirs /* = false */, bool aUpdate /* = true */, bool block /* = false */) throw() {
-	if(Thread::safeExchange(refreshing, 1) == 1) {
+	if(refreshing.exchange(true) == true) {
 		LogManager::getInstance()->message(_("File list refresh in progress, please wait for it to finish before trying to refresh again"));
 		return;
 	}
@@ -883,7 +883,7 @@
 	if(update) {
 		ClientManager::getInstance()->infoUpdated();
 	}
-	refreshing = 0;
+	refreshing = false;
 	return 0;
 }
 

=== modified file 'dcpp/ShareManager.h'
--- dcpp/ShareManager.h	2009-12-27 22:03:53 +0000
+++ dcpp/ShareManager.h	2010-08-15 19:35:53 +0000
@@ -33,6 +33,7 @@
 #include "FastAlloc.h"
 #include "MerkleTree.h"
 #include "Pointer.h"
+#include "Atomic.h"
 
 namespace dcpp {
 
@@ -249,7 +250,7 @@
 
 	int listN;
 
-	volatile long refreshing;
+	Atomic<bool,memory_ordering_strong> refreshing;
 
 	uint64_t lastXmlUpdate;
 	uint64_t lastFullUpdate;

=== modified file 'dcpp/Thread.cpp'
--- dcpp/Thread.cpp	2009-02-23 01:47:25 +0000
+++ dcpp/Thread.cpp	2010-08-15 19:35:53 +0000
@@ -23,10 +23,6 @@
 
 namespace dcpp {
 
-#ifndef _WIN32
-pthread_mutex_t Thread::mtx = PTHREAD_MUTEX_INITIALIZER;
-#endif
-
 #ifdef _WIN32
 void Thread::start() throw(ThreadException) {
 	join();

=== modified file 'dcpp/Thread.h'
--- dcpp/Thread.h	2009-03-01 05:27:41 +0000
+++ dcpp/Thread.h	2010-08-15 19:35:53 +0000
@@ -21,7 +21,6 @@
 
 #ifndef _WIN32
 #include <pthread.h>
-#include <sched.h>
 #include <sys/resource.h>
 #endif
 
@@ -62,18 +61,6 @@
 	void setThreadPriority(Priority p) throw() { ::SetThreadPriority(threadHandle, p); }
 
 	static void sleep(uint32_t millis) { ::Sleep(millis); }
-	static void yield() { ::Sleep(1); }
-
-#ifdef __MINGW32__
-	static long safeInc(volatile long& v) { return InterlockedIncrement((long*)&v); }
-	static long safeDec(volatile long& v) { return InterlockedDecrement((long*)&v); }
-	static long safeExchange(volatile long& target, long value) { return InterlockedExchange((long*)&target, value); }
-
-#else
-	static long safeInc(volatile long& v) { return InterlockedIncrement(&v); }
-	static long safeDec(volatile long& v) { return InterlockedDecrement(&v); }
-	static long safeExchange(volatile long& target, long value) { return InterlockedExchange(&target, value); }
-#endif
 
 #else
 
@@ -99,26 +86,6 @@
 
 	void setThreadPriority(Priority p) { setpriority(PRIO_PROCESS, 0, p); }
 	static void sleep(uint32_t millis) { ::usleep(millis*1000); }
-	static void yield() { ::sched_yield(); }
-	static long safeInc(volatile long& v) {
-		pthread_mutex_lock(&mtx);
-		long ret = ++v;
-		pthread_mutex_unlock(&mtx);
-		return ret;
-	}
-	static long safeDec(volatile long& v) {
-		pthread_mutex_lock(&mtx);
-		long ret = --v;
-		pthread_mutex_unlock(&mtx);
-		return ret;
-	}
-	static long safeExchange(volatile long& target, long value) {
-		pthread_mutex_lock(&mtx);
-		long ret = target;
-		target = value;
-		pthread_mutex_unlock(&mtx);
-		return ret;
-	}
 #endif
 
 protected:
@@ -133,7 +100,6 @@
 		return 0;
 	}
 #else
-	static pthread_mutex_t mtx;
 	pthread_t threadHandle;
 	static void* starter(void* p) {
 		Thread* t = (Thread*)p;