← Back to team overview

linuxdcpp-team team mailing list archive

[Branch ~dcplusplus-team/dcplusplus/trunk] Rev 3061: use lock-free queues for the DL queue rechecker & file mover

 

------------------------------------------------------------
revno: 3061
committer: poy <poy@xxxxxxxxxx>
branch nick: trunk
timestamp: Fri 2012-09-21 21:56:41 +0200
message:
  use lock-free queues for the DL queue rechecker & file mover
modified:
  dcpp/QueueManager.cpp
  dcpp/QueueManager.h
  dcpp/atomic.h


--
lp:dcplusplus
https://code.launchpad.net/~dcplusplus-team/dcplusplus/trunk

Your team Dcplusplus-team is subscribed to branch lp:dcplusplus.
To unsubscribe from this branch go to https://code.launchpad.net/~dcplusplus-team/dcplusplus/trunk/+edit-subscription
=== modified file 'dcpp/QueueManager.cpp'
--- dcpp/QueueManager.cpp	2012-09-15 20:36:00 +0000
+++ dcpp/QueueManager.cpp	2012-09-21 19:56:41 +0000
@@ -30,6 +30,7 @@
 #include "FinishedManager.h"
 #include "HashManager.h"
 #include "LogManager.h"
+#include "ScopedFunctor.h"
 #include "SearchManager.h"
 #include "SearchResult.h"
 #include "SFVReader.h"
@@ -52,6 +53,9 @@
 using boost::adaptors::map_values;
 using boost::range::for_each;
 
+atomic_flag QueueManager::Rechecker::active = ATOMIC_FLAG_INIT;
+atomic_flag QueueManager::FileMover::active = ATOMIC_FLAG_INIT;
+
 QueueManager::FileQueue::~FileQueue() {
 	for_each(queue | map_values, DeleteFunction());
 }
@@ -296,51 +300,40 @@
 }
 
 void QueueManager::FileMover::moveFile(const string& source, const string& target) {
-	Lock l(cs);
-	files.emplace_back(source, target);
-	if(!active) {
-		active = true;
+	files.push(make_pair(source, target));
+	if(!active.test_and_set()) {
 		start();
 	}
 }
 
 int QueueManager::FileMover::run() {
-	for(;;) {
+	ScopedFunctor([this] { active.clear(); });
+
+	while(true) {
 		FilePair next;
-		{
-			Lock l(cs);
-			if(files.empty()) {
-				active = false;
-				return 0;
-			}
-			next = files.back();
-			files.pop_back();
+		if(!files.pop(next)) {
+			return 0;
 		}
+
 		moveFile_(next.first, next.second);
 	}
+	return 0;
 }
 
 void QueueManager::Rechecker::add(const string& file) {
-	Lock l(cs);
-	files.push_back(file);
-	if(!active) {
-		active = true;
+	files.push(file);
+	if(!active.test_and_set()) {
 		start();
 	}
 }
 
 int QueueManager::Rechecker::run() {
+	ScopedFunctor([this] { active.clear(); });
+
 	while(true) {
 		string file;
-		{
-			Lock l(cs);
-			auto i = files.begin();
-			if(i == files.end()) {
-				active = false;
-				return 0;
-			}
-			file = *i;
-			files.erase(i);
+		if(!files.pop(file)) {
+			return 0;
 		}
 
 		QueueItem* q;

=== modified file 'dcpp/QueueManager.h'
--- dcpp/QueueManager.h	2012-07-01 18:41:13 +0000
+++ dcpp/QueueManager.h	2012-09-21 19:56:41 +0000
@@ -22,21 +22,22 @@
 #include <functional>
 #include <unordered_map>
 
-#include "TimerManager.h"
+#include <boost/lockfree/queue.hpp>
 
+#include "atomic.h"
+#include "BundleItem.h"
+#include "ClientManagerListener.h"
 #include "CriticalSection.h"
+#include "DirectoryListing.h"
 #include "Exception.h"
-#include "User.h"
 #include "File.h"
+#include "MerkleTree.h"
 #include "QueueItem.h"
-#include "Singleton.h"
-#include "DirectoryListing.h"
-#include "MerkleTree.h"
-
 #include "QueueManagerListener.h"
 #include "SearchManagerListener.h"
-#include "ClientManagerListener.h"
-#include "BundleItem.h"
+#include "Singleton.h"
+#include "TimerManager.h"
+#include "User.h"
 
 namespace dcpp {
 
@@ -141,23 +142,20 @@
 	GETSET(string, queueFile, QueueFile);
 
 private:
-	enum { MOVER_LIMIT = 10*1024*1024 };
+	static const int64_t MOVER_LIMIT = 10*1024*1024;
+
 	class FileMover : public Thread {
 	public:
-		FileMover() : active(false) { }
+		FileMover() : files(8) { }
 		virtual ~FileMover() { join(); }
 
 		void moveFile(const string& source, const string& target);
 		virtual int run();
+
 	private:
+		static atomic_flag active;
 		typedef pair<string, string> FilePair;
-		typedef vector<FilePair> FileList;
-		typedef FileList::iterator FileIter;
-
-		bool active;
-
-		FileList files;
-		CriticalSection cs;
+		boost::lockfree::queue<FilePair> files;
 	} mover;
 
 	class Rechecker : public Thread {
@@ -167,7 +165,7 @@
 		};
 
 	public:
-		explicit Rechecker(QueueManager* qm_) : qm(qm_), active(false) { }
+		explicit Rechecker(QueueManager* qm_) : qm(qm_), files(8) { }
 		virtual ~Rechecker() { join(); }
 
 		void add(const string& file);
@@ -175,10 +173,8 @@
 
 	private:
 		QueueManager* qm;
-		bool active;
-
-		StringList files;
-		CriticalSection cs;
+		static atomic_flag active;
+		boost::lockfree::queue<string> files;
 	} rechecker;
 
 	/** All queue items by target */

=== modified file 'dcpp/atomic.h'
--- dcpp/atomic.h	2012-09-21 18:35:34 +0000
+++ dcpp/atomic.h	2012-09-21 19:56:41 +0000
@@ -21,8 +21,6 @@
 
 // GCC has issues with atomic - see https://bugs.launchpad.net/dcplusplus/+bug/735512
 /// @todo check this again when GCC improves their threading support
-/// @todo track the progress of <https://svn.boost.org/trac/boost/ticket/7403> when updating boost
-/// (failure to apply the patch makes MSVC lockfree lists not actually lock-free).
 #if defined(__GNUC__)
 
 #include <boost/atomic.hpp>