zeitgeist team mailing list archive
-
zeitgeist team
-
Mailing list archive
-
Message #03991
[Branch ~zeitgeist/zeitgeist/bluebird] Rev 236: Fix signal test cases so extensions are detected correctly.
------------------------------------------------------------
revno: 236
committer: Siegfried-Angel Gevatter Pujals <siegfried@xxxxxxxxxxxx>
branch nick: bluebird
timestamp: Wed 2011-09-07 22:48:27 +0200
message:
Fix signal test cases so extensions are detected correctly.
removed:
test/dbus/remote-test.orig.py
modified:
test/dbus/blacklist-test.py
test/dbus/dsr-test.py
test/dbus/monitor-test.py
test/dbus/testutils.py
--
lp:~zeitgeist/zeitgeist/bluebird
https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird
Your team Zeitgeist Framework Team is subscribed to branch lp:~zeitgeist/zeitgeist/bluebird.
To unsubscribe from this branch go to https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird/+edit-subscription
=== modified file 'test/dbus/blacklist-test.py'
--- test/dbus/blacklist-test.py 2011-09-07 16:50:05 +0000
+++ test/dbus/blacklist-test.py 2011-09-07 20:48:27 +0000
@@ -32,7 +32,7 @@
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from zeitgeist.client import ZeitgeistDBusInterface
from zeitgeist.datamodel import *
-from testutils import RemoteTestCase
+from testutils import RemoteTestCase, asyncTestMethod
class BlacklistTest(RemoteTestCase):
@@ -121,6 +121,7 @@
global hit
hit = 0
+ @asyncTestMethod(mainloop)
def cb_added(template_id, event_template):
global hit
print "*** cb_added with hit", hit
@@ -129,6 +130,7 @@
self.assertEquals(template_id, "TestTemplate")
self.assertEventsEqual(template1, event_template)
+ @asyncTestMethod(mainloop)
def cb_removed(template_id, event_template):
global hit
self.assertEquals(hit, 1)
=== modified file 'test/dbus/dsr-test.py'
--- test/dbus/dsr-test.py 2011-09-05 13:09:08 +0000
+++ test/dbus/dsr-test.py 2011-09-07 20:48:27 +0000
@@ -45,7 +45,7 @@
TimeRange, StorageState, DataSource, NULL_EVENT, ResultType)
import testutils
-from testutils import parse_events, import_events
+from testutils import parse_events, import_events, asyncTestMethod
class ZeitgeistRemoteDataSourceRegistryTest(testutils.RemoteTestCase):
@@ -208,11 +208,13 @@
global hit
hit = 0
+ @asyncTestMethod(mainloop)
def cb_registered(datasource):
global hit
self.assertEquals(hit, 0)
hit = 1
+ @asyncTestMethod(mainloop)
def cb_enabled(unique_id, enabled):
global hit
if hit == 1:
@@ -228,6 +230,7 @@
else:
self.fail("Unexpected number of signals: %d." % hit)
+ #@asyncTestMethod(mainloop)
#def cb_disconnect(datasource):
# self.assertEquals(hit, 3)
# mainloop.quit()
@@ -245,6 +248,7 @@
def testRegisterDataSourceEnabledCallbackOnRegister(self):
mainloop = self.create_mainloop()
+ @asyncTestMethod(mainloop)
def callback(enabled):
mainloop.quit()
self.client.register_data_source(*self._ds1, enabled_callback=callback)
@@ -257,6 +261,7 @@
hit = 0
# Register a callback
+ @asyncTestMethod(mainloop)
def callback(enabled):
global hit
if hit == 0:
=== modified file 'test/dbus/monitor-test.py'
--- test/dbus/monitor-test.py 2011-08-28 17:30:10 +0000
+++ test/dbus/monitor-test.py 2011-09-07 20:48:27 +0000
@@ -45,7 +45,7 @@
TimeRange, StorageState, DataSource, NULL_EVENT, ResultType)
import testutils
-from testutils import parse_events, import_events
+from testutils import parse_events, import_events, asyncTestMethod
class ZeitgeistMonitorTest(testutils.RemoteTestCase):
@@ -56,10 +56,12 @@
tmpl = Event.new_for_values(interpretation="stfu:OpenEvent")
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
result.extend(events)
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -78,10 +80,12 @@
subjects=[Subject.new_for_values(uri="file:///tmp/bar.txt")])
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
result.extend(events)
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -100,10 +104,12 @@
subjects=[Subject.new_for_values(uri="file:///tmp/*")])
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
result.extend(events)
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -122,10 +128,12 @@
subjects=[Subject.new_for_values(uri="!file:///tmp/*")])
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
result.extend(events)
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -144,10 +152,12 @@
subjects=[Subject.new_for_values(uri="!file:///tmp/bar.txt")])
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
result.extend(events)
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -164,15 +174,16 @@
mainloop = self.create_mainloop()
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
event_ids = map(lambda ev : ev.id, events)
self.client.delete_events(event_ids)
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
result.extend(event_ids)
-
self.client.install_monitor(TimeRange(125, 145), [],
notify_insert_handler, notify_delete_handler)
@@ -186,16 +197,19 @@
mainloop = self.create_mainloop(None)
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def timeout():
# We want this timeout - we should not get informed
# about deletions of non-existing events
mainloop.quit()
return False
+ @asyncTestMethod(mainloop)
def notify_insert_handler(time_range, events):
event_ids = map(lambda ev : ev.id, events)
self.client.delete_events([9999999])
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Notified about deletion of non-existing events %s", events)
@@ -213,18 +227,22 @@
mainloop = self.create_mainloop()
events = parse_events("test/data/five_events.js")
+ @asyncTestMethod(mainloop)
def check_ok():
if len(result1) == 2 and len(result2) == 2:
mainloop.quit()
+ @asyncTestMethod(mainloop)
def notify_insert_handler1(time_range, events):
event_ids = map(lambda ev : ev.id, events)
self.client.delete_events(event_ids)
+ @asyncTestMethod(mainloop)
def notify_delete_handler1(time_range, event_ids):
result1.extend(event_ids)
check_ok()
+ @asyncTestMethod(mainloop)
def notify_delete_handler2(time_range, event_ids):
result2.extend(event_ids)
check_ok()
@@ -246,9 +264,11 @@
mainloop = self.create_mainloop()
tmpl = Event.new_for_values(interpretation="stfu:OpenEvent")
+ @asyncTestMethod(mainloop)
def notify_insert_handler(notification_type, events):
pass
+ @asyncTestMethod(mainloop)
def notify_delete_handler(time_range, event_ids):
mainloop.quit()
self.fail("Unexpected delete notification")
@@ -256,6 +276,7 @@
mon = self.client.install_monitor(TimeRange.always(), [tmpl],
notify_insert_handler, notify_delete_handler)
+ @asyncTestMethod(mainloop)
def removed_handler(result_state):
result.append(result_state)
mainloop.quit()
=== removed file 'test/dbus/remote-test.orig.py'
--- test/dbus/remote-test.orig.py 2011-07-28 18:41:01 +0000
+++ test/dbus/remote-test.orig.py 1970-01-01 00:00:00 +0000
@@ -1,623 +0,0 @@
-#! /usr/bin/python
-# -.- coding: utf-8 -.-
-
-# Zeitgeist
-#
-# Copyright © 2009-2011 Seif Lotfy <seif@xxxxxxxxx>
-# Copyright © 2009-2011 Siegfried-Angel Gevatter Pujals <siegfried@xxxxxxxxxxxx>
-# Copyright © 2009-2011 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
-# Copyright © 2009-2011 Markus Korn <thekorn@xxxxxx>
-# Copyright © 2011 Collabora Ltd.
-# By Siegfried-Angel Gevatter Pujals <siegfried@xxxxxxxxxxxx>
-# By Seif Lotfy <seif@xxxxxxxxx>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License as published by
-# the Free Software Foundation, either version 2.1 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import unittest
-import os
-import sys
-import logging
-import signal
-import time
-import tempfile
-import shutil
-import pickle
-from subprocess import Popen, PIPE
-
-# DBus setup
-import gobject
-from dbus.mainloop.glib import DBusGMainLoop
-DBusGMainLoop(set_as_default=True)
-from dbus.exceptions import DBusException
-
-from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation,
- TimeRange, StorageState, DataSource)
-
-import testutils
-from testutils import parse_events
-
-class ZeitgeistRemoteAPITest(testutils.RemoteTestCase):
-
- def __init__(self, methodName):
- super(ZeitgeistRemoteAPITest, self).__init__(methodName)
-
- def testInsertAndGetEvent(self):
- ev = Event.new_for_values(timestamp=123,
- interpretation=Interpretation.ACCESS_EVENT,
- manifestation=Manifestation.USER_ACTIVITY,
- actor="Freak Mamma")
- subj = Subject.new_for_values(uri="void://foobar",
- interpretation=Interpretation.DOCUMENT,
- manifestation=Manifestation.FILE_DATA_OBJECT)
- ev.append_subject(subj)
- ids = self.insertEventsAndWait([ev])
- events = self.getEventsAndWait(ids)
- self.assertEquals(1, len(ids))
- self.assertEquals(1, len(events))
-
- ev = events[0]
- self.assertTrue(isinstance(ev, Event))
- self.assertEquals("123", ev.timestamp)
- self.assertEquals(Interpretation.ACCESS_EVENT, ev.interpretation)
- self.assertEquals(Manifestation.USER_ACTIVITY, ev.manifestation)
- self.assertEquals("Freak Mamma", ev.actor)
- self.assertEquals(1, len(ev.subjects))
- self.assertEquals("void://foobar", ev.subjects[0].uri)
- self.assertEquals(Interpretation.DOCUMENT, ev.subjects[0].interpretation)
- self.assertEquals(Manifestation.FILE_DATA_OBJECT, ev.subjects[0].manifestation)
-
- def testFindTwoOfThreeEvents(self):
- ev1 = Event.new_for_values(timestamp=400,
- interpretation=Interpretation.ACCESS_EVENT,
- manifestation=Manifestation.USER_ACTIVITY,
- actor="Boogaloo")
- ev2 = Event.new_for_values(timestamp=500,
- interpretation=Interpretation.ACCESS_EVENT,
- manifestation=Manifestation.USER_ACTIVITY,
- actor="Boogaloo")
- ev3 = Event.new_for_values(timestamp=600,
- interpretation=Interpretation.SEND_EVENT,
- manifestation=Manifestation.USER_ACTIVITY,
- actor="Boogaloo")
- subj1 = Subject.new_for_values(uri="foo://bar",
- interpretation=Interpretation.DOCUMENT,
- manifestation=Manifestation.FILE_DATA_OBJECT)
- subj2 = Subject.new_for_values(uri="foo://baz",
- interpretation=Interpretation.IMAGE,
- manifestation=Manifestation.FILE_DATA_OBJECT)
- subj3 = Subject.new_for_values(uri="foo://quiz",
- interpretation=Interpretation.AUDIO,
- manifestation=Manifestation.FILE_DATA_OBJECT)
- ev1.append_subject(subj1)
- ev2.append_subject(subj1)
- ev2.append_subject(subj2)
- ev3.append_subject(subj2)
- ev3.append_subject(subj3)
- ids = self.insertEventsAndWait([ev1, ev2, ev3])
- self.assertEquals(3, len(ids))
-
- events = self.getEventsAndWait(ids)
- self.assertEquals(3, len(events))
- for event in events:
- self.assertTrue(isinstance(event, Event))
- self.assertEquals(Manifestation.USER_ACTIVITY, event.manifestation)
- self.assertEquals("Boogaloo", event.actor)
-
- # Search for everything
- ids = self.findEventIdsAndWait([], num_events=3)
- self.assertEquals(3, len(ids)) # (we can not trust the ids because we don't have a clean test environment)
-
- # Search for some specific templates
- subj_templ1 = Subject.new_for_values(manifestation=Manifestation.FILE_DATA_OBJECT)
- subj_templ2 = Subject.new_for_values(interpretation=Interpretation.IMAGE)
- event_template = Event.new_for_values(
- actor="Boogaloo",
- interpretation=Interpretation.ACCESS_EVENT,
- subjects=[subj_templ1,subj_templ2])
- ids = self.findEventIdsAndWait([event_template],
- num_events=10)
- self.assertEquals(1, len(ids))
-
- def testUnicodeInsert(self):
- events = parse_events("test/data/unicode_event.js")
- ids = self.insertEventsAndWait(events)
- self.assertEquals(len(ids), len(events))
- result_events = self.getEventsAndWait(ids)
- self.assertEquals(len(ids), len(result_events))
-
- def testGetEvents(self):
- events = parse_events("test/data/five_events.js")
- ids = self.insertEventsAndWait(events) + [1000, 2000]
- result = self.getEventsAndWait(ids)
- self.assertEquals(len(filter(None, result)), len(events))
- self.assertEquals(len(filter(lambda event: event is None, result)), 2)
-
- def testMonitorInsertEvents(self):
- result = []
- mainloop = self.create_mainloop()
- tmpl = Event.new_for_values(interpretation="stfu:OpenEvent")
- events = parse_events("test/data/five_events.js")
-
- def notify_insert_handler(time_range, events):
- result.extend(events)
- mainloop.quit()
-
- def notify_delete_handler(time_range, event_ids):
- mainloop.quit()
- self.fail("Unexpected delete notification")
-
- self.client.install_monitor(TimeRange.always(), [tmpl],
- notify_insert_handler, notify_delete_handler)
- self.client.insert_events(events)
- mainloop.run()
-
- self.assertEquals(2, len(result))
-
- def testMonitorDeleteEvents(self):
- result = []
- mainloop = self.create_mainloop()
- events = parse_events("test/data/five_events.js")
-
- def notify_insert_handler(time_range, events):
- event_ids = map(lambda ev : ev.id, events)
- self.client.delete_events(event_ids)
-
- def notify_delete_handler(time_range, event_ids):
- mainloop.quit()
- result.extend(event_ids)
-
-
- self.client.install_monitor(TimeRange(125, 145), [],
- notify_insert_handler, notify_delete_handler)
-
- self.client.insert_events(events)
- mainloop.run()
-
- self.assertEquals(2, len(result))
-
- def testMonitorDeleteNonExistingEvent(self):
- result = []
- mainloop = self.create_mainloop(None)
- events = parse_events("test/data/five_events.js")
-
- def timeout():
- # We want this timeout - we should not get informed
- # about deletions of non-existing events
- mainloop.quit()
- return False
-
- def notify_insert_handler(time_range, events):
- event_ids = map(lambda ev : ev.id, events)
- self.client.delete_events([9999999])
-
- def notify_delete_handler(time_range, event_ids):
- mainloop.quit()
- self.fail("Notified about deletion of non-existing events %s", events)
-
- self.client.install_monitor(TimeRange(125, 145), [],
- notify_insert_handler, notify_delete_handler)
-
- gobject.timeout_add_seconds(5, timeout)
- self.client.insert_events(events)
- mainloop.run()
-
- def testTwoMonitorsDeleteEvents(self):
- result1 = []
- result2 = []
- mainloop = self.create_mainloop()
- events = parse_events("test/data/five_events.js")
-
- def check_ok():
- if len(result1) == 2 and len(result2) == 2:
- mainloop.quit()
-
- def notify_insert_handler1(time_range, events):
- event_ids = map(lambda ev : ev.id, events)
- self.client.delete_events(event_ids)
-
- def notify_delete_handler1(time_range, event_ids):
- result1.extend(event_ids)
- check_ok()
-
- def notify_delete_handler2(time_range, event_ids):
- result2.extend(event_ids)
- check_ok()
-
- self.client.install_monitor(TimeRange(125, 145), [],
- notify_insert_handler1, notify_delete_handler1)
-
- self.client.install_monitor(TimeRange(125, 145), [],
- lambda x, y: x, notify_delete_handler2)
-
- self.client.insert_events(events)
- mainloop.run()
-
- self.assertEquals(2, len(result1))
- self.assertEquals(2, len(result2))
-
- def testMonitorInstallRemoval(self):
- result = []
- mainloop = self.create_mainloop()
- tmpl = Event.new_for_values(interpretation="stfu:OpenEvent")
-
- def notify_insert_handler(notification_type, events):
- pass
-
- def notify_delete_handler(time_range, event_ids):
- mainloop.quit()
- self.fail("Unexpected delete notification")
-
- mon = self.client.install_monitor(TimeRange.always(), [tmpl],
- notify_insert_handler, notify_delete_handler)
-
- def removed_handler(result_state):
- result.append(result_state)
- mainloop.quit()
-
- self.client.remove_monitor(mon, removed_handler)
- mainloop.run()
- self.assertEquals(1, len(result))
- self.assertEquals(1, result.pop())
-
- def testDeleteEvents(self):
- """ delete all events with actor == firefox """
- events = parse_events("test/data/five_events.js")
- self.insertEventsAndWait(events)
-
- event = Event()
- event.actor = "firefox"
-
- # get event ids with actor == firefox
- ff_ids = self.findEventIdsAndWait([event])
- # delete this events
- time_range = self.deleteEventsAndWait(ff_ids)
- # got timerange of deleted events
- self.assertEquals(2, len(time_range))
- # get all events, the one with actor == firefox should
- # not be there
- ids = self.findEventIdsAndWait([])
- self.assertEquals(2, len(ids))
- self.assertEquals(0, len(set(ff_ids) & set(ids)))
-
- def testFindByRandomActorAndGet(self):
- events = parse_events("test/data/five_events.js")
- self.insertEventsAndWait(events)
-
- template = Event.new_for_values(actor="/usr/bliblablu")
-
- ids = self.findEventIdsAndWait([template])
- self.assertEquals(len(ids), 0)
-
- events = self.getEventsAndWait(ids)
- self.assertEquals(len(events), 0)
-
- def testFindRelated(self):
- events = parse_events("test/data/apriori_events.js")
- self.insertEventsAndWait(events)
-
- uris = self.findRelatedAndWait(["i2"], num_events=2, result_type=1)
- self.assertEquals(uris, ["i3", "i1"])
-
- uris = self.findRelatedAndWait(["i2"], num_events=2, result_type=0)
- self.assertEquals(uris, ["i1", "i3"])
-
- def testFindEventsForValues(self):
- events = parse_events("test/data/apriori_events.js")
- self.insertEventsAndWait(events)
-
- result = self.findEventsForValuesAndWait(actor="firefox", num_events=1)
- self.assertEquals(len(result), 1)
- self.assertEquals(result[0].actor, "firefox")
-
- def testFindEventsWithStringPayload(self):
- mainloop = self.create_mainloop()
- payload = "Hello World"
- def callback(ids):
- def callback2(events):
- mainloop.quit()
- self.assertEquals(events[0].payload, map(ord, payload))
- self.client.get_events(ids, callback2)
- events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")]
- events[0].payload = payload
- self.client.insert_events(events, callback)
- mainloop.run()
-
- def testFindEventsWithNonASCIIPayload(self):
- mainloop = self.create_mainloop()
- payload = u"äöü".encode("utf-8")
- def callback(ids):
- def callback2(events):
- mainloop.quit()
- self.assertEquals(events[0].payload, map(ord, payload))
- self.client.get_events(ids, callback2)
- events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")]
- events[0].payload = payload
- self.client.insert_events(events, callback)
- mainloop.run()
-
- def testFindEventsWithBinaryPayload(self):
- mainloop = self.create_mainloop()
- payload = pickle.dumps(1234)
- def callback(ids):
- def callback2(events):
- mainloop.quit()
- self.assertEquals(events[0].payload, map(ord, payload))
- self.client.get_events(ids, callback2)
- events = [Event.new_for_values(actor=u"boo", timestamp=124, subject_uri="file://yomomma")]
- events[0].payload = payload
- self.client.insert_events(events, callback)
- mainloop.run()
-
-class ZeitgeistRemoteInterfaceTest(unittest.TestCase):
-
- def setUp(self):
- from _zeitgeist import engine
- from _zeitgeist.engine import sql, constants
- engine._engine = None
- sql.unset_cursor()
- self.saved_data = {
- "datapath": constants.DATA_PATH,
- "database": constants.DATABASE_FILE,
- "extensions": constants.USER_EXTENSION_PATH,
- }
- constants.DATA_PATH = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
- constants.DATABASE_FILE = ":memory:"
- constants.USER_EXTENSION_PATH = os.path.join(constants.DATA_PATH, "extensions")
-
- def tearDown(self):
- from _zeitgeist.engine import constants
- shutil.rmtree(constants.DATA_PATH)
- constants.DATA_PATH = self.saved_data["datapath"]
- constants.DATABASE_FILE = self.saved_data["database"]
- constants.USER_EXTENSION_PATH = self.saved_data["extensions"]
-
- def testQuit(self):
- """calling Quit() on the remote interface should shutdown the
- engine in a clean way"""
- from _zeitgeist.engine.remote import RemoteInterface
- interface = RemoteInterface()
- self.assertEquals(interface._engine.is_closed(), False)
- interface.Quit()
- self.assertEquals(interface._engine.is_closed(), True)
-
-
-class ZeitgeistRemoteDataSourceRegistryTest(testutils.RemoteTestCase):
-
- _ds1 = [
- "www.example.com/foo",
- "Foo Source",
- "Awakes the foo in you",
- [
- Event.new_for_values(subject_manifestation = "!stfu:File"),
- Event.new_for_values(interpretation = "stfu:CreateEvent")
- ],
- ]
-
- _ds2 = [
- "www.example.org/bar",
- u"© mwhahaha çà éü",
- u"Thŧ ıs teĦ ünâçÃÃe¡",
- []
- ]
-
- _ds2b = [
- "www.example.org/bar", # same unique ID as _ds2
- u"This string has been translated into the ASCII language",
- u"Now the unicode is gone :(",
- [
- Event.new_for_values(subject_manifestation = "nah"),
- ],
- ]
-
- def __init__(self, methodName):
- super(ZeitgeistRemoteDataSourceRegistryTest, self).__init__(methodName)
-
- def _assertDataSourceEquals(self, dsdbus, dsref):
- self.assertEquals(dsdbus[DataSource.UniqueId], dsref[0])
- self.assertEquals(dsdbus[DataSource.Name], dsref[1])
- self.assertEquals(dsdbus[DataSource.Description], dsref[2])
- self.assertEquals(len(dsdbus[DataSource.EventTemplates]), len(dsref[3]))
- #for i, template in enumerate(dsref[3]):
- # tmpl = dsdbus[DataSource.EventTemplates][i]
- # self.assertEquals(ZgEvent.get_plain(tmpl), ZgEvent.get_plain(template))
-
- def testPresence(self):
- """ Ensure that the DataSourceRegistry extension is there """
- iface = self.client._iface # we know that client._iface is as clean as possible
- registry = iface.get_extension("DataSourceRegistry", "data_source_registry")
- registry.GetDataSources()
-
- def testGetDataSourcesEmpty(self):
- self.assertEquals(self.client._registry.GetDataSources(), [])
-
- def testRegisterDataSource(self):
- self.client.register_data_source(*self._ds1)
- datasources = list(self.client._registry.GetDataSources())
- self.assertEquals(len(datasources), 1)
- self._assertDataSourceEquals(datasources[0], self._ds1)
-
- def testRegisterDataSourceUnicode(self):
- self.client.register_data_source(*self._ds2)
- datasources = list(self.client._registry.GetDataSources())
- self.assertEquals(len(datasources), 1)
- self._assertDataSourceEquals(datasources[0], self._ds2)
-
- def testRegisterDataSourceWithCallback(self):
- self.client.register_data_source(*self._ds1, enabled_callback=lambda x: True)
-
- def testRegisterDataSources(self):
- # Insert two data-sources
- self.client._registry.RegisterDataSource(*self._ds1)
- self.client._registry.RegisterDataSource(*self._ds2)
-
- # Verify that they have been inserted correctly
- datasources = list(self.client._registry.GetDataSources())
- self.assertEquals(len(datasources), 2)
- datasources.sort(key=lambda x: x[DataSource.UniqueId])
- self._assertDataSourceEquals(datasources[0], self._ds1)
- self._assertDataSourceEquals(datasources[1], self._ds2)
-
- # Change the information of the second data-source
- self.client._registry.RegisterDataSource(*self._ds2b)
-
- # Verify that it changed correctly
- datasources = list(self.client._registry.GetDataSources())
- self.assertEquals(len(datasources), 2)
- datasources.sort(key=lambda x: x[DataSource.UniqueId])
- self._assertDataSourceEquals(datasources[0], self._ds1)
- self._assertDataSourceEquals(datasources[1], self._ds2b)
-
- def testSetDataSourceEnabled(self):
- # Insert a data-source -- it should be enabled by default
- self.client._registry.RegisterDataSource(*self._ds1)
- ds = list(self.client._registry.GetDataSources())[0]
- self.assertEquals(ds[DataSource.Enabled], True)
-
- # Now we can choose to disable it...
- self.client._registry.SetDataSourceEnabled(self._ds1[0], False)
- ds = list(self.client._registry.GetDataSources())[0]
- self.assertEquals(ds[DataSource.Enabled], False)
-
- # And enable it again!
- self.client._registry.SetDataSourceEnabled(self._ds1[0], True)
- ds = list(self.client._registry.GetDataSources())[0]
- self.assertEquals(ds[DataSource.Enabled], True)
-
- def testGetDataSourceFromId(self):
- # Insert a data-source -- and then retrieve it by id
- self.client._registry.RegisterDataSource(*self._ds1)
- ds = self.client._registry.GetDataSourceFromId(self._ds1[0])
- self._assertDataSourceEquals(ds, self._ds1)
-
- # Retrieve a data-source from an id that has not been registered
- self.assertRaises(DBusException,
- self.client._registry.GetDataSourceFromId,
- self._ds2[0])
-
- def testDataSourceSignals(self):
- mainloop = self.create_mainloop()
-
- global hit
- hit = 0
-
- def cb_registered(datasource):
- global hit
- self.assertEquals(hit, 0)
- hit = 1
-
- def cb_enabled(unique_id, enabled):
- global hit
- if hit == 1:
- self.assertEquals(enabled, False)
- hit = 2
- elif hit == 2:
- self.assertEquals(enabled, True)
- hit = 3
- # We're done -- change this if we figure out how to force a
- # disconnection from the bus, so we can also check the
- # DataSourceDisconnected signal.
- mainloop.quit()
- else:
- self.fail("Unexpected number of signals: %d." % hit)
-
- #def cb_disconnect(datasource):
- # self.assertEquals(hit, 3)
- # mainloop.quit()
-
- # Connect to signals
- self.client._registry.connect('DataSourceRegistered', cb_registered)
- self.client._registry.connect('DataSourceEnabled', cb_enabled)
- #self.client._registry.connect('DataSourceDisconnected', cb_disconnect)
-
- # Register data-source, disable it, enable it again
- gobject.idle_add(self.testSetDataSourceEnabled)
-
- mainloop.run()
-
- def testRegisterDataSourceEnabledCallbackOnRegister(self):
- mainloop = self.create_mainloop()
-
- def callback(enabled):
- mainloop.quit()
- self.client.register_data_source(*self._ds1, enabled_callback=callback)
-
- mainloop.run()
-
- def testRegisterDataSourceEnabledCallbackOnChange(self):
- mainloop = self.create_mainloop()
- global hit
- hit = 0
-
- # Register a callback
- def callback(enabled):
- global hit
- if hit == 0:
- # Register callback
- hit = 1
- elif hit == 1:
- # Disable callback
- mainloop.quit()
- else:
- self.fail("Unexpected number of signals: %d." % hit)
- self.client.register_data_source(*self._ds1)
- self.client.set_data_source_enabled_callback(self._ds1[0], callback)
-
- # Disable the data-source
- self.client._registry.SetDataSourceEnabled(self._ds1[0], False)
-
- mainloop.run()
-
-class ZeitgeistRemotePropertiesTest(testutils.RemoteTestCase):
-
- def __init__(self, methodName):
- super(ZeitgeistRemotePropertiesTest, self).__init__(methodName)
-
- def testVersion(self):
- self.assertTrue(len(self.client.get_version()) >= 2)
-
- def testExtensions(self):
- self.assertEquals(
- sorted(self.client.get_extensions()),
- ["Blacklist", "DataSourceRegistry"]
- )
- self.assertEquals(
- sorted(self.client._iface.extensions()),
- ["Blacklist", "DataSourceRegistry"]
- )
-
-
-class ZeitgeistDaemonTest(unittest.TestCase):
-
- def setUp(self):
- self.env = os.environ.copy()
- self.datapath = tempfile.mkdtemp(prefix="zeitgeist.datapath.")
- self.env.update({
- "ZEITGEIST_DATABASE_PATH": ":memory:",
- "ZEITGEIST_DATA_PATH": self.datapath,
- })
-
- def tearDown(self):
- shutil.rmtree(self.datapath)
-
- def testSIGHUP(self):
- """sending a SIGHUP signal to a running deamon instance results
- in a clean shutdown"""
- daemon = testutils.RemoteTestCase._safe_start_daemon(env=self.env)
- os.kill(daemon.pid, signal.SIGHUP)
- err = daemon.wait()
- self.assertEqual(err, 0)
-
-
-if __name__ == "__main__":
- unittest.main()
=== modified file 'test/dbus/testutils.py'
--- test/dbus/testutils.py 2011-09-05 13:09:08 +0000
+++ test/dbus/testutils.py 2011-09-07 20:48:27 +0000
@@ -85,6 +85,22 @@
events = parse_events(path)
return engine.insertEventsAndWait(events)
+def asyncTestMethod(mainloop):
+ """
+ Any callbacks happening in a MainLoopWithFailure must use
+ this decorator for exceptions raised inside them to be visible.
+ """
+ def wrap(f):
+ def new_f(*args, **kwargs):
+ try:
+ f(*args, **kwargs)
+ except AssertionError, e:
+ mainloop.fail("Assertion failed: %s" % e)
+ except Exception, e:
+ mainloop.fail("Unexpected exception: %s" % e)
+ return new_f
+ return wrap
+
class RemoteTestCase (unittest.TestCase):
"""
Helper class to implement unit tests against a
@@ -295,6 +311,9 @@
def create_mainloop(timeout=5):
class MainLoopWithFailure(object):
+ """
+ Remember to wrap callbacks using the asyncTestMethod decorator.
+ """
def __init__(self):
self._mainloop = gobject.MainLoop()