zeitgeist team mailing list archive
-
zeitgeist team
-
Mailing list archive
-
Message #01913
[Merge] lp:~zeitgeist/zeitgeist/fix-641968 into lp:zeitgeist
Seif Lotfy has proposed merging lp:~zeitgeist/zeitgeist/fix-641968 into lp:zeitgeist.
Requested reviews:
Zeitgeist Framework Team (zeitgeist)
Related bugs:
#641968 querying on LeastRecentActor and MostRecentActor for a certain subject_uri is not working
https://bugs.launchpad.net/bugs/641968
Fixing the LeastRecentActor issues we have:
I came to the point where I think we had a fundamental understanding issue.
IMHO our current understanding of LeastRecentActor is wrong...
Let's assume we have sequential events. (The actors are defined by numbers)
2, 1, 3, 2, 1, 4
So we have 4 different actors (1,2,3,4) and we want to sort them by least recent.
the least recent is not 2 or 1 since they are used again at the end. the least recent is 3
This means LeastRecentActors should return the latest actors sorted ASC:
3, 2, 1, 4
and not
2, 1, 3, 4
MostRecentActors should return the same last 4 files but in reversed sorting:
4, 3, 1, 2
--
https://code.launchpad.net/~zeitgeist/zeitgeist/fix-641968/+merge/36424
Your team Zeitgeist Framework Team is requested to review the proposed merge of lp:~zeitgeist/zeitgeist/fix-641968 into lp:zeitgeist.
=== modified file '_zeitgeist/engine/main.py'
--- _zeitgeist/engine/main.py 2010-09-17 08:37:39 +0000
+++ _zeitgeist/engine/main.py 2010-09-23 10:01:19 +0000
@@ -346,7 +346,7 @@
if order == ResultType.LeastRecentActor:
sql += """
NATURAL JOIN (
- SELECT actor, min(timestamp) AS timestamp
+ SELECT actor, timestamp
FROM event_view
GROUP BY actor)
"""
=== modified file 'test/blacklist-test.py'
--- test/blacklist-test.py 2010-04-26 19:42:07 +0000
+++ test/blacklist-test.py 2010-09-23 10:01:19 +0000
@@ -5,7 +5,6 @@
import sys
import os
import unittest
-import dbus
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from zeitgeist.client import ZeitgeistDBusInterface
@@ -19,6 +18,9 @@
self.blacklist = None
def setUp(self):
+ # lazy import to get a chance to use the private bus
+ import dbus
+
# We set up the connection lazily in order to wait for the
# engine to come up
super(BlacklistTest, self).setUp()
=== modified file 'test/engine-extension-test.py'
--- test/engine-extension-test.py 2010-08-02 10:13:12 +0000
+++ test/engine-extension-test.py 2010-09-23 10:01:19 +0000
@@ -7,30 +7,23 @@
import weakref
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
-import _zeitgeist.engine
-from _zeitgeist.engine import constants
-from _zeitgeist.engine import get_engine
-from _zeitgeist.engine.extension import Extension
-
import unittest
from testutils import import_events
-class _Extension1(Extension):
- PUBLIC_METHODS = ["return_hallo", "return_engine"]
-
- def return_hallo(self):
- return "Hallo"
-
- def return_boo(self):
- return "boo"
-
- def return_engine(self):
- return self.engine
-
+Extension = None
class _engineTestClass(unittest.TestCase):
def setUp (self):
+ global Extension
+
+ from _zeitgeist.engine import constants
+ from _zeitgeist.engine import get_engine
+
+ if Extension is None:
+ from _zeitgeist.engine.extension import Extension as _Extension
+ Extension = _Extension
+
constants.DATABASE_FILE = ":memory:"
self.save_default_ext = os.environ.get("ZEITGEIST_DEFAULT_EXTENSIONS")
self.save_extra_ext = os.environ.get("ZEITGEIST_EXTRA_EXTENSIONS")
@@ -39,6 +32,7 @@
self.engine = get_engine()
def tearDown (self):
+ import _zeitgeist.engine
if self.save_default_ext is not None:
os.environ["ZEITGEIST_DEFAULT_EXTENSIONS"] = self.save_default_ext
else:
@@ -54,13 +48,27 @@
class TestExtensions(_engineTestClass):
def testCreateEngine(self):
- engine = get_engine()
+
+ class _Extension1(Extension):
+ PUBLIC_METHODS = ["return_hallo", "return_engine"]
+
+ def return_hallo(self):
+ return "Hallo"
+
+ def return_boo(self):
+ return "boo"
+
+ def return_engine(self):
+ return self.engine
+
+ engine = self.engine
self.assertEqual(len(engine.extensions), 0)
self.assertRaises(AttributeError, engine.extensions.__getattr__, "return_hallo")
engine.extensions.load(_Extension1)
self.assertEqual(engine.extensions.return_hallo(), "Hallo")
self.assertRaises(AttributeError, engine.extensions.__getattr__, "return_boo")
self.assertEqual(engine.extensions.return_engine(), weakref.proxy(engine))
+
class TestExtensionHooks(_engineTestClass):
=== modified file 'test/engine-test.py'
--- test/engine-test.py 2010-09-19 09:24:40 +0000
+++ test/engine-test.py 2010-09-23 10:01:19 +0000
@@ -9,7 +9,6 @@
import _zeitgeist.engine
from _zeitgeist.engine import constants
from _zeitgeist.engine import get_engine
-from _zeitgeist.engine.sql import WhereClause
from zeitgeist.datamodel import *
from testutils import import_events
@@ -602,7 +601,26 @@
events = self.engine.find_events(
TimeRange.always(), [], StorageState.Any, 0, ResultType.LeastRecentActor)
- self.assertEquals([e[0][1] for e in events], ["100", "101", "105"])
+ self.assertEquals([e[0][1] for e in events], ["105", "114", "119"])
+
+ def testResultTypesLeastRecentActorForSubject(self):
+ import_events("test/data/twenty_events.js", self.engine)
+
+ event_templates = [Event.new_for_values(subject_uri="file:///tmp/foo.txt"),]
+
+ events = self.engine.find_events(
+ TimeRange.always(), event_templates, StorageState.Any, 0, ResultType.LeastRecentActor)
+ self.assertEquals([e[0][1] for e in events], ["105", "114", "119"])
+
+ def testResultTypesMostRecentActorForSubject(self):
+ import_events("test/data/twenty_events.js", self.engine)
+
+ event_templates = [Event.new_for_values(subject_uri="file:///tmp/foo.txt"),]
+
+ events = self.engine.find_events(
+ TimeRange.always(), event_templates, StorageState.Any, 0, ResultType.MostRecentActor)
+ self.assertEquals([e[0][1] for e in events], ["119", "114", "105"])
+
def testResultTypesMostPopularOrigin(self):
import_events("test/data/twenty_events.js", self.engine)
@@ -933,72 +951,7 @@
[template,], StorageState.Any, 10, ResultType.MostRecentEvents
)
self.assertEquals(5, len(ids))
-
- def testWildcardOptimization(self):
- cursor = self.engine._cursor
- strings = [
- (u"hällö, I'm gürmen - Ã¥ge drikker øl - â bug",),
- (u"ä â åø",),
- (u"h" + unichr(0x10ffff),),
- (unichr(0x10ffff),),
- ("",),
- (unichr(0x10ffff) + unichr(0x10ffff) + "aa",),
- ]
-
- # does it work for ascii chars?
- cursor.executemany("INSERT INTO uri(value) VALUES(?)", strings)
- stm = WhereClause.optimize_glob("value", "uri", u"h")
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute("SELECT value FROM uri WHERE value GLOB ?", ("h*",)).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), 2)
-
- # bunch of unicode in the prefix
- stm = WhereClause.optimize_glob("value", "uri", u"ä â Ã¥")
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute("SELECT value FROM uri WHERE value GLOB ?", (u"ä â Ã¥*",)).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), 1)
-
- # bunch of unicode in the prefix, prefix is not 'utf-8' decoded
- stm = WhereClause.optimize_glob("value", "uri", "ä â Ã¥")
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute("SELECT value FROM uri WHERE value GLOB ?", ("ä â Ã¥*",)).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), 1)
-
- # select all
- stm = WhereClause.optimize_glob("value", "uri", "")
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute("SELECT value FROM uri WHERE value GLOB ?", ("*",)).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), len(strings))
-
- # what if the biggest char is the last character of the search prefix?
- prefix = u"h" + unichr(0x10ffff)
- stm = WhereClause.optimize_glob("value", "uri", prefix)
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute(
- "SELECT value FROM uri WHERE value GLOB ?", (u"%s*" %prefix,)
- ).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), 1)
-
- # what if the search prefix only contains the biggest char
- prefix = unichr(0x10ffff) + unichr(0x10ffff)
- stm = WhereClause.optimize_glob("value", "uri", prefix)
- self.assertEquals(
- cursor.execute(*stm).fetchall(),
- cursor.execute(
- "SELECT value FROM uri WHERE value GLOB ?", (u"%s*" %prefix,)
- ).fetchall()
- )
- self.assertEquals(len(cursor.execute(*stm).fetchall()), 1)
if __name__ == "__main__":
unittest.main()
+
=== modified file 'test/loggers-datasources-recent-test.py'
--- test/loggers-datasources-recent-test.py 2009-11-30 07:57:58 +0000
+++ test/loggers-datasources-recent-test.py 2010-09-23 10:01:19 +0000
@@ -7,16 +7,28 @@
import unittest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
-from _zeitgeist.loggers.datasources.recent import SimpleMatch, MimeTypeSet
-
-class SimpleMatchTest(unittest.TestCase):
+
+SimpleMatch = None
+MimeTypeSet = None
+
+class BaseTestCase(unittest.TestCase):
+
+ def setUp(self):
+ global SimpleMatch
+ global MimeTypeSet
+ if None in (SimpleMatch, MimeTypeSet):
+ from _zeitgeist.loggers.datasources.recent import SimpleMatch as _SM, MimeTypeSet as _MTS
+ SimpleMatch = _SM
+ MimeTypeSet = _MTS
+
+class SimpleMatchTest(BaseTestCase):
def testmatch(self):
self.assertTrue(SimpleMatch("boo/*").match("boo/bar"))
self.assertTrue(SimpleMatch("boo/bar.*").match("boo/bar.foo"))
self.assertFalse(SimpleMatch("boo/bar.*").match("boo/barfoo"))
-class MimeTypeSetTest(unittest.TestCase):
+class MimeTypeSetTest(BaseTestCase):
def testinit(self):
self.assertEquals(repr(MimeTypeSet("boo", "bar", "foo")), "MimeTypeSet('bar', 'boo', 'foo')")
=== modified file 'test/remote-test.py'
--- test/remote-test.py 2010-09-15 14:20:21 +0000
+++ test/remote-test.py 2010-09-23 10:01:19 +0000
@@ -6,6 +6,8 @@
import logging
import signal
import time
+import tempfile
+import shutil
from subprocess import Popen, PIPE
# DBus setup
@@ -18,7 +20,6 @@
from zeitgeist.client import ZeitgeistDBusInterface, ZeitgeistClient
from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation,
TimeRange, StorageState)
-from _zeitgeist.engine.remote import RemoteInterface
import testutils
from testutils import parse_events
@@ -322,9 +323,31 @@
class ZeitgeistRemoteInterfaceTest(unittest.TestCase):
+ def setUp(self):
+ from _zeitgeist import engine
+ from _zeitgeist.engine import sql, constants
+ engine._engine = None
+ sql.unset_cursor()
+ self.saved_data = {
+ "datapath": constants.DATA_PATH,
+ "database": constants.DATABASE_FILE,
+ "extensions": constants.USER_EXTENSION_PATH,
+ }
+ constants.DATA_PATH = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
+ constants.DATABASE_FILE = ":memory:"
+ constants.USER_EXTENSION_PATH = os.path.join(constants.DATA_PATH, "extensions")
+
+ def tearDown(self):
+ from _zeitgeist.engine import constants
+ shutil.rmtree(constants.DATA_PATH)
+ constants.DATA_PATH = self.saved_data["datapath"]
+ constants.DATABASE_FILE = self.saved_data["database"]
+ constants.USER_EXTENSION_PATH = self.saved_data["extensions"]
+
def testQuit(self):
"""calling Quit() on the remote interface should shutdown the
engine in a clean way"""
+ from _zeitgeist.engine.remote import RemoteInterface
interface = RemoteInterface()
self.assertEquals(interface._engine.is_closed(), False)
interface.Quit()
@@ -332,17 +355,21 @@
class ZeitgeistDaemonTest(unittest.TestCase):
+ def setUp(self):
+ self.env = os.environ.copy()
+ self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
+ self.env.update({
+ "ZEITGEIST_DATABASE_PATH": ":memory:",
+ "ZEITGEIST_DATA_PATH": self.datapath,
+ })
+
+ def tearDown(self):
+ shutil.rmtree(self.datapath)
+
def testSIGHUP(self):
"""sending a SIGHUP signal to a running deamon instance results
in a clean shutdown"""
- daemon = Popen(
- ["./zeitgeist-daemon.py", "--no-datahub"], stderr=PIPE, stdout=PIPE
- )
- # give the daemon some time to wake up
- time.sleep(3)
- err = daemon.poll()
- if err:
- raise RuntimeError("Could not start daemon, got err=%i" % err)
+ daemon = testutils.RemoteTestCase._safe_start_daemon(env=self.env)
os.kill(daemon.pid, signal.SIGHUP)
err = daemon.wait()
self.assertEqual(err, 0)
=== modified file 'test/run-all-tests.py'
--- test/run-all-tests.py 2010-09-02 14:33:04 +0000
+++ test/run-all-tests.py 2010-09-23 10:01:19 +0000
@@ -6,36 +6,98 @@
import doctest
import logging
import sys
+import tempfile
+import shutil
from optparse import OptionParser
-parser = OptionParser()
-parser.add_option("-v", action="count", dest="verbosity")
-(options, args) = parser.parse_args()
-
-if options.verbosity:
- # do more fine grained stuff later
- # redirect all debugging output to stderr
- logging.basicConfig(stream=sys.stderr)
-else:
- logging.basicConfig(filename="/dev/null")
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# Find the test/ directory
-testdir = os.path.dirname(os.path.abspath(__file__))
-doctests = glob.glob(os.path.join(testdir, "*.rst"))
-
-# Create a test suite to run all tests
-# first, add all doctests
-arguments = {"module_relative": False, "globs": {"sys": sys}}
-suite = doctest.DocFileSuite(*doctests, **arguments)
-
-# Add all of the tests from each file that ends with "-test.py"
-for fname in os.listdir(testdir):
- if fname.endswith("-test.py"):
- fname = os.path.basename(fname)[:-3] # Get the filename and chop off ".py"
- module = __import__(fname)
- suite.addTest(unittest.defaultTestLoader.loadTestsFromModule(module))
-
-# Run all of the tests
-unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)
+TESTDIR = os.path.dirname(os.path.abspath(__file__))
+DOCTESTS = glob.glob(os.path.join(TESTDIR, "*.rst"))
+
+def doctest_setup(test):
+ test._datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
+ test._env = os.environ.copy()
+ os.environ.update({
+ "ZEITGEIST_DATABASE_PATH": ":memory:",
+ "ZEITGEIST_DATA_PATH": test._datapath
+ })
+
+def doctest_teardown(test):
+ shutil.rmtree(test._datapath)
+ os.environ = test._env
+
+def iter_tests(suite):
+ for test in suite:
+ if isinstance(test, unittest.TestSuite):
+ for t in iter_tests(test):
+ yield t
+ else:
+ yield test
+
+def get_test_name(test):
+ return ".".join((test.__class__.__module__, test.__class__.__name__, test._testMethodName))
+
+def load_tests(module, pattern):
+ suite = unittest.defaultTestLoader.loadTestsFromModule(module)
+ for test in iter_tests(suite):
+ name = get_test_name(test)
+ if pattern is not None:
+ for p in pattern:
+ if name.startswith(p):
+ yield test
+ break
+ else:
+ yield test
+
+def compile_suite(pattern=None):
+ # Create a test suite to run all tests
+
+ # first, add all doctests
+ arguments = {
+ "module_relative": False,
+ "globs": {"sys": sys},
+ "setUp": doctest_setup,
+ "tearDown": doctest_teardown,
+ }
+ suite = doctest.DocFileSuite(*DOCTESTS, **arguments)
+
+ # Add all of the tests from each file that ends with "-test.py"
+ for fname in os.listdir(TESTDIR):
+ if fname.endswith("-test.py"):
+ fname = os.path.basename(fname)[:-3] # Get the filename and chop off ".py"
+ module = __import__(fname)
+ tests = list(load_tests(module, pattern))
+ suite.addTests(tests)
+ return suite
+
+if __name__ == "__main__":
+ parser = OptionParser()
+ parser.add_option("-v", action="count", dest="verbosity")
+ (options, args) = parser.parse_args()
+
+ if options.verbosity:
+ # do more fine grained stuff later
+ # redirect all debugging output to stderr
+ logging.basicConfig(stream=sys.stderr)
+ else:
+ logging.basicConfig(filename="/dev/null")
+
+ from testutils import DBusPrivateMessageBus
+ bus = DBusPrivateMessageBus()
+ err = bus.run(ignore_errors=True)
+ if err:
+ print >> sys.stderr, "*** Failed to setup private bus, error was: %s" %err
+ else:
+ print >> sys.stderr, "*** Testsuite is running using a private dbus bus"
+ config = bus.dbus_config.copy()
+ config.update({"DISPLAY": bus.DISPLAY, "pid.Xvfb": bus.display.pid})
+ print >> sys.stderr, "*** Configuration: %s" %config
+ try:
+ suite = compile_suite(args or None)
+ # Run all of the tests
+ unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)
+ finally:
+ bus.quit(ignore_errors=True)
=== modified file 'test/sql-test.py'
--- test/sql-test.py 2010-09-19 11:49:41 +0000
+++ test/sql-test.py 2010-09-23 10:01:19 +0000
@@ -23,10 +23,16 @@
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import unittest
-from _zeitgeist.engine.sql import *
+WhereClause = None
class SQLTest (unittest.TestCase):
+ def setUp(self):
+ global WhereClause
+ if WhereClause is None:
+ from _zeitgeist.engine.sql import WhereClause as _WhereClause
+ WhereClause = _WhereClause
+
def testFlat (self):
where = WhereClause(WhereClause.AND)
where.add ("foo = %s", 10)
=== modified file 'test/testutils.py'
--- test/testutils.py 2010-07-22 09:52:53 +0000
+++ test/testutils.py 2010-09-23 10:01:19 +0000
@@ -3,6 +3,7 @@
# Zeitgeist
#
# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2009-2010 Markus Korn <thekorn@xxxxxx>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
@@ -22,6 +23,8 @@
import time
import sys
import signal
+import tempfile
+import shutil
from subprocess import Popen, PIPE
# DBus setup
@@ -41,8 +44,6 @@
# maybe the user is using python < 2.6
import simplejson as json
-from zeitgeist.datamodel import Event, Subject
-
def dict2event(d):
ev = Event()
ev[0][Event.Id] = d.get("id", "").encode("UTF-8")
@@ -74,7 +75,7 @@
"""
Load a collection of JSON event definitions into 'engine'. Fx:
- import_events("test/data/single_event.js", self.engine)
+ import_events("test/data/single_event.js", self.engine)
"""
events = parse_events(path)
@@ -86,21 +87,50 @@
remote Zeitgeist process
"""
+ @staticmethod
+ def _get_pid(matching_string):
+ p1 = Popen(["ps", "aux"], stdout=PIPE, stderr=PIPE)
+ p2 = Popen(["grep", matching_string], stdin=p1.stdout, stderr=PIPE, stdout=PIPE)
+ return p2.communicate()[0]
+
+ @staticmethod
+ def _safe_start_subprocess(cmd, env, timeout=1, error_callback=None):
+ """ starts `cmd` in a subprocess and check after `timeout`
+ if everything goes well"""
+ process = Popen(cmd, stderr=PIPE, stdout=PIPE, env=env)
+ # give the process some time to wake up
+ time.sleep(timeout)
+ error = process.poll()
+ if error:
+ cmd = " ".join(cmd)
+ error = "'%s' exits with error %i." %(cmd, error)
+ if error_callback:
+ error += " *** %s" %error_callback(*process.communicate())
+ raise RuntimeError(error)
+ return process
+
+ @staticmethod
+ def _safe_start_daemon(env=None, timeout=1):
+ if env is None:
+ env = os.environ.copy()
+
+ def error_callback(stdout, stderr):
+ if "--replace" in stderr:
+ return "%r | %s" %(stderr, RemoteTestCase._get_pid("zeitgeist-daemon").replace("\n", "|"))
+ else:
+ return stderr
+
+ return RemoteTestCase._safe_start_subprocess(
+ ("./zeitgeist-daemon.py", "--no-datahub"), env, timeout, error_callback
+ )
+
def __init__(self, methodName):
super(RemoteTestCase, self).__init__(methodName)
self.daemon = None
self.client = None
def spawn_daemon(self):
- os.environ.update({"ZEITGEIST_DATABASE_PATH": ":memory:"})
- self.daemon = Popen(
- ["./zeitgeist-daemon.py", "--no-datahub"], stderr=sys.stderr, stdout=sys.stderr
- )
- # give the daemon some time to wake up
- time.sleep(3)
- err = self.daemon.poll()
- if err:
- raise RuntimeError("Could not start daemon, got err=%i" % err)
+ self.daemon = self._safe_start_daemon(env=self.env)
def kill_daemon(self):
os.kill(self.daemon.pid, signal.SIGKILL)
@@ -109,6 +139,12 @@
def setUp(self):
assert self.daemon is None
assert self.client is None
+ self.env = os.environ.copy()
+ self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
+ self.env.update({
+ "ZEITGEIST_DATABASE_PATH": ":memory:",
+ "ZEITGEIST_DATA_PATH": self.datapath,
+ })
self.spawn_daemon()
# hack to clear the state of the interface
@@ -119,6 +155,7 @@
assert self.daemon is not None
assert self.client is not None
self.kill_daemon()
+ shutil.rmtree(self.datapath)
def insertEventsAndWait(self, events):
"""
@@ -220,3 +257,45 @@
num_events=num_events, result_type=result_type)
mainloop.run()
return result
+
+class DBusPrivateMessageBus(object):
+ DISPLAY = ":27"
+
+ def _run(self):
+ os.environ.update({"DISPLAY": self.DISPLAY})
+ self.display = Popen(["Xvfb", self.DISPLAY, "-screen", "0", "1024x768x8"])
+ # give the display some time to wake up
+ time.sleep(1)
+ err = self.display.poll()
+ if err:
+ raise RuntimeError("Could not start Xvfb on display %s, got err=%i" %(self.DISPLAY, err))
+ dbus = Popen(["dbus-launch"], stdout=PIPE)
+ time.sleep(1)
+ self.dbus_config = dict(l.split("=", 1) for l in dbus.communicate()[0].split("\n") if l)
+ os.environ.update(self.dbus_config)
+
+ def run(self, ignore_errors=False):
+ try:
+ return self._run()
+ except Exception, e:
+ if ignore_errors:
+ return e
+ raise
+
+ def _quit(self):
+ os.kill(self.display.pid, signal.SIGKILL)
+ self.display.wait()
+ pid = int(self.dbus_config["DBUS_SESSION_BUS_PID"])
+ os.kill(pid, signal.SIGKILL)
+ try:
+ os.waitpid(pid, 0)
+ except OSError:
+ pass
+
+ def quit(self, ignore_errors=False):
+ try:
+ return self._quit()
+ except Exception, e:
+ if ignore_errors:
+ return e
+ raise