linuxdcpp-team team mailing list archive
-
linuxdcpp-team team
-
Mailing list archive
-
Message #06087
[Branch ~dcplusplus-team/dcplusplus/trunk] Rev 3061: use lock-free queues for the DL queue rechecker & file mover
------------------------------------------------------------
revno: 3061
committer: poy <poy@xxxxxxxxxx>
branch nick: trunk
timestamp: Fri 2012-09-21 21:56:41 +0200
message:
use lock-free queues for the DL queue rechecker & file mover
modified:
dcpp/QueueManager.cpp
dcpp/QueueManager.h
dcpp/atomic.h
--
lp:dcplusplus
https://code.launchpad.net/~dcplusplus-team/dcplusplus/trunk
Your team Dcplusplus-team is subscribed to branch lp:dcplusplus.
To unsubscribe from this branch go to https://code.launchpad.net/~dcplusplus-team/dcplusplus/trunk/+edit-subscription
=== modified file 'dcpp/QueueManager.cpp'
--- dcpp/QueueManager.cpp 2012-09-15 20:36:00 +0000
+++ dcpp/QueueManager.cpp 2012-09-21 19:56:41 +0000
@@ -30,6 +30,7 @@
#include "FinishedManager.h"
#include "HashManager.h"
#include "LogManager.h"
+#include "ScopedFunctor.h"
#include "SearchManager.h"
#include "SearchResult.h"
#include "SFVReader.h"
@@ -52,6 +53,9 @@
using boost::adaptors::map_values;
using boost::range::for_each;
+atomic_flag QueueManager::Rechecker::active = ATOMIC_FLAG_INIT;
+atomic_flag QueueManager::FileMover::active = ATOMIC_FLAG_INIT;
+
QueueManager::FileQueue::~FileQueue() {
for_each(queue | map_values, DeleteFunction());
}
@@ -296,51 +300,40 @@
}
void QueueManager::FileMover::moveFile(const string& source, const string& target) {
- Lock l(cs);
- files.emplace_back(source, target);
- if(!active) {
- active = true;
+ files.push(make_pair(source, target));
+ if(!active.test_and_set()) {
start();
}
}
int QueueManager::FileMover::run() {
- for(;;) {
+ ScopedFunctor([this] { active.clear(); });
+
+ while(true) {
FilePair next;
- {
- Lock l(cs);
- if(files.empty()) {
- active = false;
- return 0;
- }
- next = files.back();
- files.pop_back();
+ if(!files.pop(next)) {
+ return 0;
}
+
moveFile_(next.first, next.second);
}
+ return 0;
}
void QueueManager::Rechecker::add(const string& file) {
- Lock l(cs);
- files.push_back(file);
- if(!active) {
- active = true;
+ files.push(file);
+ if(!active.test_and_set()) {
start();
}
}
int QueueManager::Rechecker::run() {
+ ScopedFunctor([this] { active.clear(); });
+
while(true) {
string file;
- {
- Lock l(cs);
- auto i = files.begin();
- if(i == files.end()) {
- active = false;
- return 0;
- }
- file = *i;
- files.erase(i);
+ if(!files.pop(file)) {
+ return 0;
}
QueueItem* q;
=== modified file 'dcpp/QueueManager.h'
--- dcpp/QueueManager.h 2012-07-01 18:41:13 +0000
+++ dcpp/QueueManager.h 2012-09-21 19:56:41 +0000
@@ -22,21 +22,22 @@
#include <functional>
#include <unordered_map>
-#include "TimerManager.h"
+#include <boost/lockfree/queue.hpp>
+#include "atomic.h"
+#include "BundleItem.h"
+#include "ClientManagerListener.h"
#include "CriticalSection.h"
+#include "DirectoryListing.h"
#include "Exception.h"
-#include "User.h"
#include "File.h"
+#include "MerkleTree.h"
#include "QueueItem.h"
-#include "Singleton.h"
-#include "DirectoryListing.h"
-#include "MerkleTree.h"
-
#include "QueueManagerListener.h"
#include "SearchManagerListener.h"
-#include "ClientManagerListener.h"
-#include "BundleItem.h"
+#include "Singleton.h"
+#include "TimerManager.h"
+#include "User.h"
namespace dcpp {
@@ -141,23 +142,20 @@
GETSET(string, queueFile, QueueFile);
private:
- enum { MOVER_LIMIT = 10*1024*1024 };
+ static const int64_t MOVER_LIMIT = 10*1024*1024;
+
class FileMover : public Thread {
public:
- FileMover() : active(false) { }
+ FileMover() : files(8) { }
virtual ~FileMover() { join(); }
void moveFile(const string& source, const string& target);
virtual int run();
+
private:
+ static atomic_flag active;
typedef pair<string, string> FilePair;
- typedef vector<FilePair> FileList;
- typedef FileList::iterator FileIter;
-
- bool active;
-
- FileList files;
- CriticalSection cs;
+ boost::lockfree::queue<FilePair> files;
} mover;
class Rechecker : public Thread {
@@ -167,7 +165,7 @@
};
public:
- explicit Rechecker(QueueManager* qm_) : qm(qm_), active(false) { }
+ explicit Rechecker(QueueManager* qm_) : qm(qm_), files(8) { }
virtual ~Rechecker() { join(); }
void add(const string& file);
@@ -175,10 +173,8 @@
private:
QueueManager* qm;
- bool active;
-
- StringList files;
- CriticalSection cs;
+ static atomic_flag active;
+ boost::lockfree::queue<string> files;
} rechecker;
/** All queue items by target */
=== modified file 'dcpp/atomic.h'
--- dcpp/atomic.h 2012-09-21 18:35:34 +0000
+++ dcpp/atomic.h 2012-09-21 19:56:41 +0000
@@ -21,8 +21,6 @@
// GCC has issues with atomic - see https://bugs.launchpad.net/dcplusplus/+bug/735512
/// @todo check this again when GCC improves their threading support
-/// @todo track the progress of <https://svn.boost.org/trac/boost/ticket/7403> when updating boost
-/// (failure to apply the patch makes MSVC lockfree lists not actually lock-free).
#if defined(__GNUC__)
#include <boost/atomic.hpp>