zeitgeist team mailing list archive
-
zeitgeist team
-
Mailing list archive
-
Message #03857
[Branch ~zeitgeist/zeitgeist/bluebird] Rev 203: added data-source-registry tests from original zeitgeist
------------------------------------------------------------
revno: 203
committer: Seif Lotfy <seif@xxxxxxxxx>
branch nick: bluebird
timestamp: Tue 2011-08-30 20:37:26 +0200
message:
added data-source-registry tests from original zeitgeist
added:
test/dbus/dsr-test.py
--
lp:~zeitgeist/zeitgeist/bluebird
https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird
Your team Zeitgeist Framework Team is subscribed to branch lp:~zeitgeist/zeitgeist/bluebird.
To unsubscribe from this branch go to https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird/+edit-subscription
=== added file 'test/dbus/dsr-test.py'
--- test/dbus/dsr-test.py 1970-01-01 00:00:00 +0000
+++ test/dbus/dsr-test.py 2011-08-30 18:37:26 +0000
@@ -0,0 +1,252 @@
+#! /usr/bin/python
+# -.- coding: utf-8 -.-
+
+# remote-test.py
+#
+# Copyright © 2009-2011 Seif Lotfy <seif@xxxxxxxxx>
+# Copyright © 2009-2011 Siegfried-Angel Gevatter Pujals <siegfried@xxxxxxxxxxxx>
+# Copyright © 2009-2011 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2009-2011 Markus Korn <thekorn@xxxxxx>
+# Copyright © 2011 Collabora Ltd.
+# By Siegfried-Angel Gevatter Pujals <siegfried@xxxxxxxxxxxx>
+# By Seif Lotfy <seif@xxxxxxxxx>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import unittest
+import os
+import sys
+import logging
+import signal
+import time
+import tempfile
+import shutil
+import pickle
+from subprocess import Popen, PIPE
+
+# DBus setup
+import gobject
+from dbus.mainloop.glib import DBusGMainLoop
+DBusGMainLoop(set_as_default=True)
+from dbus.exceptions import DBusException
+
+from zeitgeist.datamodel import (Event, Subject, Interpretation, Manifestation,
+ TimeRange, StorageState, DataSource, NULL_EVENT, ResultType)
+
+import testutils
+from testutils import parse_events, import_events
+
+
+class ZeitgeistRemoteDataSourceRegistryTest(testutils.RemoteTestCase):
+
+ _ds1 = [
+ "www.example.com/foo",
+ "Foo Source",
+ "Awakes the foo in you",
+ [
+ Event.new_for_values(subject_manifestation = "!stfu:File"),
+ Event.new_for_values(interpretation = "stfu:CreateEvent")
+ ],
+ ]
+
+ _ds2 = [
+ "www.example.org/bar",
+ u"© mwhahaha çà éü",
+ u"Thŧ ıs teĦ ünâçÃÃe¡",
+ []
+ ]
+
+ _ds2b = [
+ "www.example.org/bar", # same unique ID as _ds2
+ u"This string has been translated into the ASCII language",
+ u"Now the unicode is gone :(",
+ [
+ Event.new_for_values(subject_manifestation = "nah"),
+ ],
+ ]
+
+ def __init__(self, methodName):
+ super(ZeitgeistRemoteDataSourceRegistryTest, self).__init__(methodName)
+
+ def get_plain(self, ev):
+ """
+ Ensure that an Event instance is a Plain Old Python Object (popo),
+ without DBus wrappings etc.
+ """
+ popo = []
+ popo.append(map(unicode, ev[0]))
+ popo.append([map(unicode, subj) for subj in ev[1]])
+ # We need the check here so that if D-Bus gives us an empty
+ # byte array we don't serialize the text "dbus.Array(...)".
+ popo.append(str(ev[2]) if ev[2] else u'')
+ return popo
+
+ def _assertDataSourceEquals(self, dsdbus, dsref):
+ self.assertEquals(dsdbus[DataSource.UniqueId], dsref[0])
+ self.assertEquals(dsdbus[DataSource.Name], dsref[1])
+ self.assertEquals(dsdbus[DataSource.Description], dsref[2])
+ self.assertEquals(len(dsdbus[DataSource.EventTemplates]), len(dsref[3]))
+ for i, template in enumerate(dsref[3]):
+ tmpl = dsdbus[DataSource.EventTemplates][i]
+ self.assertEquals(self.get_plain(tmpl), self.get_plain(template))
+
+ def testPresence(self):
+ """ Ensure that the DataSourceRegistry extension is there """
+ iface = self.client._iface # we know that client._iface is as clean as possible
+ registry = iface.get_extension("DataSourceRegistry", "data_source_registry")
+ registry.GetDataSources()
+
+ def testGetDataSourcesEmpty(self):
+ self.assertEquals(self.client._registry.GetDataSources(), [])
+
+ def testRegisterDataSource(self):
+ self.client.register_data_source(*self._ds1)
+ datasources = list(self.client._registry.GetDataSources())
+ self.assertEquals(len(datasources), 1)
+ self._assertDataSourceEquals(datasources[0], self._ds1)
+
+ def testRegisterDataSourceUnicode(self):
+ self.client.register_data_source(*self._ds2)
+ datasources = list(self.client._registry.GetDataSources())
+ self.assertEquals(len(datasources), 1)
+ self._assertDataSourceEquals(datasources[0], self._ds2)
+
+ def testRegisterDataSourceWithCallback(self):
+ self.client.register_data_source(*self._ds1, enabled_callback=lambda x: True)
+
+ def testRegisterDataSources(self):
+ # Insert two data-sources
+ self.client._registry.RegisterDataSource(*self._ds1)
+ self.client._registry.RegisterDataSource(*self._ds2)
+
+ # Verify that they have been inserted correctly
+ datasources = list(self.client._registry.GetDataSources())
+ self.assertEquals(len(datasources), 2)
+ datasources.sort(key=lambda x: x[DataSource.UniqueId])
+ self._assertDataSourceEquals(datasources[0], self._ds1)
+ self._assertDataSourceEquals(datasources[1], self._ds2)
+
+ # Change the information of the second data-source
+ self.client._registry.RegisterDataSource(*self._ds2b)
+
+ # Verify that it changed correctly
+ datasources = list(self.client._registry.GetDataSources())
+ self.assertEquals(len(datasources), 2)
+ datasources.sort(key=lambda x: x[DataSource.UniqueId])
+ self._assertDataSourceEquals(datasources[0], self._ds1)
+ self._assertDataSourceEquals(datasources[1], self._ds2b)
+
+ def testSetDataSourceEnabled(self):
+ # Insert a data-source -- it should be enabled by default
+ self.client._registry.RegisterDataSource(*self._ds1)
+ ds = list(self.client._registry.GetDataSources())[0]
+ self.assertEquals(ds[DataSource.Enabled], True)
+
+ # Now we can choose to disable it...
+ self.client._registry.SetDataSourceEnabled(self._ds1[0], False)
+ ds = list(self.client._registry.GetDataSources())[0]
+ self.assertEquals(ds[DataSource.Enabled], False)
+
+ # And enable it again!
+ self.client._registry.SetDataSourceEnabled(self._ds1[0], True)
+ ds = list(self.client._registry.GetDataSources())[0]
+ self.assertEquals(ds[DataSource.Enabled], True)
+
+ def testGetDataSourceFromId(self):
+ # Insert a data-source -- and then retrieve it by id
+ self.client._registry.RegisterDataSource(*self._ds1)
+ ds = self.client._registry.GetDataSourceFromId(self._ds1[0])
+ self._assertDataSourceEquals(ds, self._ds1)
+
+ # Retrieve a data-source from an id that has not been registered
+ self.assertRaises(DBusException,
+ self.client._registry.GetDataSourceFromId,
+ self._ds2[0])
+
+ def testDataSourceSignals(self):
+ mainloop = self.create_mainloop()
+
+ global hit
+ hit = 0
+
+ def cb_registered(datasource):
+ global hit
+ self.assertEquals(hit, 0)
+ hit = 1
+
+ def cb_enabled(unique_id, enabled):
+ global hit
+ if hit == 1:
+ self.assertEquals(enabled, False)
+ hit = 2
+ elif hit == 2:
+ self.assertEquals(enabled, True)
+ hit = 3
+ # We're done -- change this if we figure out how to force a
+ # disconnection from the bus, so we can also check the
+ # DataSourceDisconnected signal.
+ mainloop.quit()
+ else:
+ self.fail("Unexpected number of signals: %d." % hit)
+
+ #def cb_disconnect(datasource):
+ # self.assertEquals(hit, 3)
+ # mainloop.quit()
+
+ # Connect to signals
+ self.client._registry.connect('DataSourceRegistered', cb_registered)
+ self.client._registry.connect('DataSourceEnabled', cb_enabled)
+ #self.client._registry.connect('DataSourceDisconnected', cb_disconnect)
+
+ # Register data-source, disable it, enable it again
+ gobject.idle_add(self.testSetDataSourceEnabled)
+
+ mainloop.run()
+
+ def testRegisterDataSourceEnabledCallbackOnRegister(self):
+ mainloop = self.create_mainloop()
+
+ def callback(enabled):
+ mainloop.quit()
+ self.client.register_data_source(*self._ds1, enabled_callback=callback)
+
+ mainloop.run()
+
+ def testRegisterDataSourceEnabledCallbackOnChange(self):
+ mainloop = self.create_mainloop()
+ global hit
+ hit = 0
+
+ # Register a callback
+ def callback(enabled):
+ global hit
+ if hit == 0:
+ # Register callback
+ hit = 1
+ elif hit == 1:
+ # Disable callback
+ mainloop.quit()
+ else:
+ self.fail("Unexpected number of signals: %d." % hit)
+ self.client.register_data_source(*self._ds1)
+ self.client.set_data_source_enabled_callback(self._ds1[0], callback)
+
+ # Disable the data-source
+ self.client._registry.SetDataSourceEnabled(self._ds1[0], False)
+
+ mainloop.run()
+
+
+if __name__ == "__main__":
+ unittest.main()