[Branch ~zeitgeist/zeitgeist/bluebird] Rev 290: Merge in the standalone fts indexer
------------------------------------------------------------
revno: 290
committer: Michal Hruby <michal.mhr@xxxxxxxxx>
branch nick: bb-fts
timestamp: Mon 2011-10-10 16:07:42 +0200
message:
Merge in the standalone fts indexer
added:
extensions/fts-python/
extensions/fts-python/Makefile.am
extensions/fts-python/constants.py
extensions/fts-python/datamodel.py
extensions/fts-python/fts.py
extensions/fts-python/lrucache.py
extensions/fts-python/sql.py
modified:
configure.ac
extensions/Makefile.am
extra/Makefile.am
extra/org.gnome.zeitgeist.service.in
--
lp:~zeitgeist/zeitgeist/bluebird
https://code.launchpad.net/~zeitgeist/zeitgeist/bluebird
=== modified file 'configure.ac'
--- configure.ac 2011-09-15 17:57:10 +0000
+++ configure.ac 2011-10-10 14:07:42 +0000
@@ -35,10 +35,28 @@
AC_SUBST(BLUEBIRD_CFLAGS)
AC_SUBST(BLUEBIRD_LIBS)
+#################################################
+# DBus service
+#################################################
+
+AC_ARG_WITH([session_bus_services_dir],
+ AC_HELP_STRING([--with-session-bus-services-dir], [Path to DBus services directory]))
+
+if test "x$with_session_bus_services_dir" = "x" ; then
+ PKG_CHECK_MODULES(DBUS_MODULE, "dbus-1")
+ services_dir="`$PKG_CONFIG --variable session_bus_services_dir dbus-1`"
+else
+ services_dir="$with_session_bus_services_dir"
+fi
+
+DBUS_SERVICES_DIR="$services_dir"
+AC_SUBST(DBUS_SERVICES_DIR)
+
AC_CONFIG_FILES([
Makefile
src/Makefile
extensions/Makefile
+ extensions/fts-python/Makefile
extra/Makefile
extra/ontology/Makefile
test/Makefile
=== modified file 'extensions/Makefile.am'
--- extensions/Makefile.am 2011-10-10 11:01:12 +0000
+++ extensions/Makefile.am 2011-10-10 14:07:42 +0000
@@ -1,3 +1,5 @@
+SUBDIRS = fts-python
+
NULL =
#extensionsdir = $(libdir)/zeitgeist/extensions
=== added directory 'extensions/fts-python'
=== added file 'extensions/fts-python/Makefile.am'
--- extensions/fts-python/Makefile.am 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/Makefile.am 2011-10-10 14:07:42 +0000
@@ -0,0 +1,20 @@
+NULL =
+
+ftsdir = $(pkgdatadir)/fts-python
+dist_fts_SCRIPTS = \
+ datamodel.py \
+ constants.py \
+ fts.py \
+ lrucache.py \
+ sql.py \
+ $(NULL)
+
+servicedir = $(DBUS_SERVICES_DIR)
+service_DATA = org.gnome.zeitgeist.fts.service
+
+org.gnome.zeitgeist.fts.service: org.gnome.zeitgeist.fts.service.in
+ $(AM_V_GEN)sed -e s!\@pkgdatadir\@!$(pkgdatadir)! < $< > $@
+org.gnome.zeitgeist.fts.service: Makefile
+
+EXTRA_DIST = org.gnome.zeitgeist.fts.service.in
+CLEANFILES = org.gnome.zeitgeist.fts.service
=== added file 'extensions/fts-python/constants.py'
--- extensions/fts-python/constants.py 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/constants.py 2011-10-10 14:07:42 +0000
@@ -0,0 +1,71 @@
+# -.- coding: utf-8 -.-
+
+# Zeitgeist
+#
+# Copyright © 2009 Markus Korn <thekorn@xxxxxx>
+# Copyright © 2009-2010 Siegfried-Angel Gevatter Pujals <rainct@xxxxxxxxxx>
+# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import logging
+from xdg import BaseDirectory
+
+from zeitgeist.client import ZeitgeistDBusInterface
+
+__all__ = [
+ "log",
+ "get_engine",
+ "constants"
+]
+
+log = logging.getLogger("zeitgeist.engine")
+
+_engine = None
+def get_engine():
+ """ Get the running engine instance or create a new one. """
+ global _engine
+ if _engine is None or _engine.is_closed():
+ import main # _zeitgeist.engine.main
+ _engine = main.ZeitgeistEngine()
+ return _engine
+
+class _Constants:
+ # Directories
+ DATA_PATH = os.environ.get("ZEITGEIST_DATA_PATH",
+ BaseDirectory.save_data_path("zeitgeist"))
+ DATABASE_FILE = os.environ.get("ZEITGEIST_DATABASE_PATH",
+ os.path.join(DATA_PATH, "activity.sqlite"))
+ DATABASE_FILE_BACKUP = os.environ.get("ZEITGEIST_DATABASE_BACKUP_PATH",
+ os.path.join(DATA_PATH, "activity.sqlite.bck"))
+ DEFAULT_LOG_PATH = os.path.join(BaseDirectory.xdg_cache_home,
+ "zeitgeist", "daemon.log")
+
+ # D-Bus
+ DBUS_INTERFACE = ZeitgeistDBusInterface.INTERFACE_NAME
+ SIG_EVENT = "asaasay"
+
+ # Required version of DB schema
+ CORE_SCHEMA="core"
+ CORE_SCHEMA_VERSION = 4
+
+ USER_EXTENSION_PATH = os.path.join(DATA_PATH, "extensions")
+
+ # configure runtime cache for events
+ # default size is 2000
+ CACHE_SIZE = int(os.environ.get("ZEITGEIST_CACHE_SIZE", 2000))
+ log.debug("Cache size = %i" %CACHE_SIZE)
+
+constants = _Constants()
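
Note that the constants above are resolved once at import time, so any environment overrides have to be in place before the module is imported. A minimal sketch, not part of the patch, assuming the zeitgeist Python modules are importable:

    import os
    # Point the indexer at a throwaway data directory before importing.
    os.environ["ZEITGEIST_DATA_PATH"] = "/tmp/zg-test"
    os.environ["ZEITGEIST_DATABASE_PATH"] = "/tmp/zg-test/activity.sqlite"
    os.environ["ZEITGEIST_CACHE_SIZE"] = "500"

    from constants import constants
    print(constants.DATABASE_FILE)   # /tmp/zg-test/activity.sqlite
    print(constants.CACHE_SIZE)      # 500
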
=== added file 'extensions/fts-python/datamodel.py'
--- extensions/fts-python/datamodel.py 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/datamodel.py 2011-10-10 14:07:42 +0000
@@ -0,0 +1,83 @@
+# -.- coding: utf-8 -.-
+
+# Zeitgeist
+#
+# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2009 Markus Korn <thekorn@xxxxxx>
+# Copyright © 2009 Seif Lotfy <seif@xxxxxxxxx>
+# Copyright © 2009-2010 Siegfried-Angel Gevatter Pujals <rainct@xxxxxxxxxx>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from zeitgeist.datamodel import Event as OrigEvent, Subject as OrigSubject, \
+ DataSource as OrigDataSource
+
+class Event(OrigEvent):
+
+ @staticmethod
+ def _to_unicode(obj):
+ """
+ Return a unicode representation of the given object.
+ If obj is None, return an empty string.
+ """
+ return unicode(obj) if obj is not None else u""
+
+ @staticmethod
+ def _make_dbus_sendable(obj):
+ """
+ Ensure that all fields in the event struct are non-None
+ """
+ for n, value in enumerate(obj[0]):
+ obj[0][n] = obj._to_unicode(value)
+ for subject in obj[1]:
+ for n, value in enumerate(subject):
+ subject[n] = obj._to_unicode(value)
+ # The payload requires special handling, since it is binary data
+ # If there is indeed data here, we must not unicode encode it!
+ if obj[2] is None:
+ obj[2] = u""
+ elif isinstance(obj[2], unicode):
+ obj[2] = str(obj[2])
+ return obj
+
+ @staticmethod
+ def get_plain(ev):
+ """
+ Ensure that an Event instance is a Plain Old Python Object (popo),
+ without DBus wrappings etc.
+ """
+ popo = []
+ popo.append(map(unicode, ev[0]))
+ popo.append([map(unicode, subj) for subj in ev[1]])
+ # We need the check here so that if D-Bus gives us an empty
+ # byte array we don't serialize the text "dbus.Array(...)".
+ popo.append(str(ev[2]) if ev[2] else u'')
+ return popo
+
+class Subject(OrigSubject):
+ pass
+
+class DataSource(OrigDataSource):
+
+ @staticmethod
+ def get_plain(datasource):
+ for plaintype, props in {
+ unicode: (DataSource.Name, DataSource.Description),
+ lambda x: map(Event.get_plain, x): (DataSource.EventTemplates,),
+ bool: (DataSource.Running, DataSource.Enabled),
+ int: (DataSource.LastSeen,),
+ }.iteritems():
+ for prop in props:
+ datasource[prop] = plaintype(datasource[prop])
+ return tuple(datasource)
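
The wrappers above only add (de)serialisation helpers on top of the public zeitgeist classes. A rough sketch of how they are meant to be used, not part of the patch; it assumes the zeitgeist Python bindings are installed and that Event.new_for_values behaves as in the public API:

    from datamodel import Event

    ev = Event.new_for_values(interpretation="foo://SomeInterpretation",
                              subject_uri="file:///tmp/example.txt",
                              subject_text="example.txt")

    plain = Event.get_plain(ev)               # plain lists/unicode, no DBus types
    sendable = Event._make_dbus_sendable(ev)  # every field coerced to non-None
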
=== added file 'extensions/fts-python/fts.py'
--- extensions/fts-python/fts.py 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/fts.py 2011-10-10 14:07:42 +0000
@@ -0,0 +1,1272 @@
+#!/usr/bin/env python
+# -.- coding: utf-8 -.-
+
+# Zeitgeist
+#
+# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2010 Canonical Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# TODO
+#
+# - Delete events hook
+# - ? Filter on StorageState
+# - Throttle IO and CPU where possible
+
+import os, sys
+import time
+import pickle
+import dbus
+import sqlite3
+import dbus.service
+from xdg import BaseDirectory
+from xdg.DesktopEntry import DesktopEntry, xdg_data_dirs
+import logging
+import subprocess
+from xml.dom import minidom
+import xapian
+import os
+from Queue import Queue, Empty
+import threading
+from urllib import quote as url_escape, unquote as url_unescape
+import gobject, gio
+from cStringIO import StringIO
+
+from collections import defaultdict
+from array import array
+from zeitgeist.datamodel import Event as OrigEvent, StorageState, TimeRange, \
+ ResultType, get_timestamp_for_now, Interpretation, Symbol, NEGATION_OPERATOR, WILDCARD
+from datamodel import Event, Subject
+from constants import constants
+from zeitgeist.client import ZeitgeistClient, ZeitgeistDBusInterface
+from sql import get_default_cursor, unset_cursor, TableLookup, WhereClause
+from lrucache import LRUCache
+
+ZG_CLIENT = ZeitgeistClient()
+
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger("zeitgeist.fts")
+
+INDEX_FILE = os.path.join(constants.DATA_PATH, "bb.fts.index")
+INDEX_VERSION = "1"
+INDEX_LOCK = threading.Lock()
+FTS_DBUS_BUS_NAME = "org.gnome.zeitgeist.SimpleIndexer"
+FTS_DBUS_OBJECT_PATH = "/org/gnome/zeitgeist/index/activity"
+FTS_DBUS_INTERFACE = "org.gnome.zeitgeist.Index"
+
+FILTER_PREFIX_EVENT_INTERPRETATION = "ZGEI"
+FILTER_PREFIX_EVENT_MANIFESTATION = "ZGEM"
+FILTER_PREFIX_ACTOR = "ZGA"
+FILTER_PREFIX_SUBJECT_URI = "ZGSU"
+FILTER_PREFIX_SUBJECT_INTERPRETATION = "ZGSI"
+FILTER_PREFIX_SUBJECT_MANIFESTATION = "ZGSM"
+FILTER_PREFIX_SUBJECT_ORIGIN = "ZGSO"
+FILTER_PREFIX_SUBJECT_MIMETYPE = "ZGST"
+FILTER_PREFIX_SUBJECT_STORAGE = "ZGSS"
+FILTER_PREFIX_XDG_CATEGORY = "AC"
+
+VALUE_EVENT_ID = 0
+VALUE_TIMESTAMP = 1
+
+MAX_CACHE_BATCH_SIZE = constants.CACHE_SIZE/2
+
+# When sorting by one of the COALESCING_RESULT_TYPES result types,
+# we need to fetch some extra events from the Xapian index because
+# the final result set will be coalesced on some property of the event
+COALESCING_RESULT_TYPES = [ \
+ ResultType.MostRecentSubjects,
+ ResultType.LeastRecentSubjects,
+ ResultType.MostPopularSubjects,
+ ResultType.LeastPopularSubjects,
+ ResultType.MostRecentActor,
+ ResultType.LeastRecentActor,
+ ResultType.MostPopularActor,
+ ResultType.LeastPopularActor,
+]
+
+MAX_TERM_LENGTH = 245
+
+
+class NegationNotSupported(ValueError):
+ pass
+
+class WildcardNotSupported(ValueError):
+ pass
+
+def parse_negation(kind, field, value, parse_negation=True):
+ """checks if value starts with the negation operator,
+ if value starts with the negation operator but the field does
+ not support negation a ValueError is raised.
+ This function returns a (value_without_negation, negation)-tuple
+ """
+ negation = False
+ if parse_negation and value.startswith(NEGATION_OPERATOR):
+ negation = True
+ value = value[len(NEGATION_OPERATOR):]
+ if negation and field not in kind.SUPPORTS_NEGATION:
+ raise NegationNotSupported("This field does not support negation")
+ return value, negation
+
+def parse_wildcard(kind, field, value):
+ """checks if value ends with the a wildcard,
+ if value ends with a wildcard but the field does not support wildcards
+ a ValueError is raised.
+ This function returns a (value_without_wildcard, wildcard)-tuple
+ """
+ wildcard = False
+ if value.endswith(WILDCARD):
+ wildcard = True
+ value = value[:-len(WILDCARD)]
+ if wildcard and field not in kind.SUPPORTS_WILDCARDS:
+ raise WildcardNotSupported("This field does not support wildcards")
+ return value, wildcard
+
+def parse_operators(kind, field, value):
+ """runs both (parse_negation and parse_wildcard) parser functions
+ on query values, and handles the special case of Subject.Text correctly.
+ returns a (value_without_negation_and_wildcard, negation, wildcard)-tuple
+ """
+ try:
+ value, negation = parse_negation(kind, field, value)
+ except ValueError:
+ if kind is Subject and field == Subject.Text:
+ # we do not support negation of the text field,
+ # the text field starts with the NEGATION_OPERATOR
+ # so we handle this string as the content instead
+ # of an operator
+ negation = False
+ else:
+ raise
+ value, wildcard = parse_wildcard(kind, field, value)
+ return value, negation, wildcard
+
+
+def synchronized(lock):
+ """ Synchronization decorator. """
+ def wrap(f):
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
+class Deletion:
+ """
+ A marker class that marks an event id for deletion
+ """
+ def __init__ (self, event_id):
+ self.event_id = event_id
+
+class Reindex:
+ """
+ Marker class that tells the worker thread to rebuild the entire index.
+ At construction time all events are pulled out of the zg_engine
+ argument and stored for later processing in the worker thread.
+ This avoids concurrent access to the ZG sqlite db from the worker thread.
+ """
+ def __init__ (self, zg_engine):
+ all_events = zg_engine._find_events(1, TimeRange.always(),
+ [], StorageState.Any,
+ sys.maxint,
+ ResultType.MostRecentEvents)
+ self.all_events = all_events
+
+class SearchEngineExtension (dbus.service.Object):
+ """
+ Full text indexing and searching extension for Zeitgeist
+ """
+ PUBLIC_METHODS = []
+
+ def __init__ (self):
+ bus_name = dbus.service.BusName(FTS_DBUS_BUS_NAME, bus=dbus.SessionBus())
+ dbus.service.Object.__init__(self, bus_name, FTS_DBUS_OBJECT_PATH)
+ self._indexer = Indexer()
+
+ ZG_CLIENT.install_monitor((0, 2**63 - 1), [],
+ self.pre_insert_event, self.post_delete_events)
+
+ def pre_insert_event(self, timerange, events):
+ for event in events:
+ self._indexer.index_event (event)
+
+ def post_delete_events (self, ids):
+ for _id in ids:
+ self._indexer.delete_event (_id)
+
+ @dbus.service.method(FTS_DBUS_INTERFACE,
+ in_signature="s(xx)a("+constants.SIG_EVENT+")uuu",
+ out_signature="a("+constants.SIG_EVENT+")u")
+ def Search(self, query_string, time_range, filter_templates, offset, count, result_type):
+ """
+ DBus method to perform a full text search against the contents of the
+ Zeitgeist log. Returns an array of events.
+ """
+ time_range = TimeRange(time_range[0], time_range[1])
+ filter_templates = map(Event, filter_templates)
+ events, hit_count = self._indexer.search(query_string, time_range,
+ filter_templates,
+ offset, count, result_type)
+ return self._make_events_sendable (events), hit_count
+
+ @dbus.service.method(FTS_DBUS_INTERFACE,
+ in_signature="",
+ out_signature="")
+ def ForceReindex(self):
+ """
+ DBus method to force a reindex of the entire Zeitgeist log.
+ This method is only intended for debugging purposes and is not
+ considered blessed public API.
+ """
+ log.debug ("Received ForceReindex request over DBus.")
+ self._indexer._queue.put (Reindex (self.engine))
+
+ def _make_events_sendable(self, events):
+ return [NULL_EVENT if event is None else Event._make_dbus_sendable(event) for event in events]
+
+def mangle_uri (uri):
+ """
+ Converts a URI into an index- and query friendly string. The problem
+ is that Xapian doesn't handle CAPITAL letters or most non-alphanumeric
+ symbols in a boolean term when it does prefix matching. The mangled
+ URIs returned from this function are suitable for boolean prefix searches.
+
+ IMPORTANT: This is a 1-way function! You can not convert back.
+ """
+ result = ""
+ for c in uri.lower():
+ if c in (": /"):
+ result += "_"
+ else:
+ result += c
+ return result
+
+def cap_string (s, nbytes=MAX_TERM_LENGTH):
+ """
+ If s has more than nbytes bytes (not characters) then cap it off
+ after nbytes bytes in a way still producing a valid utf-8 string.
+
+ Assumes that s is a utf-8 string.
+
+ This function is useful for working with Xapian terms because Xapian has
+ a max term length of 245 (which is not very well documented, but see
+ http://xapian.org/docs/omega/termprefixes.html).
+ """
+ # Check if we can fast-path this string
+ if (len(s.encode("utf-8")) <= nbytes):
+ return s
+
+ # We use a StringIO here to avoid memory thrashing via naive
+ # string concatenation. See e.g. http://www.skymind.com/~ocrow/python_string/
+ buf = StringIO()
+ for char in s :
+ if buf.tell() >= nbytes - 1 :
+ return buf.getvalue()
+ buf.write(char.encode("utf-8"))
+
+ return unicode(buf.getvalue().decode("utf-8"))
+
+
+def expand_type (type_prefix, uri):
+ """
+ Return a string with a Xapian query matching all child types of 'uri'
+ inside the Xapian prefix 'type_prefix'.
+ """
+ is_negation = uri.startswith(NEGATION_OPERATOR)
+ uri = uri[1:] if is_negation else uri
+ children = Symbol.find_child_uris_extended(uri)
+ children = [ "%s:%s" % (type_prefix, child) for child in children ]
+
+ result = " OR ".join(children)
+ return result if not is_negation else "NOT (%s)" % result
+
+class Indexer:
+ """
+ Abstraction of the FT indexer and search engine
+ """
+
+ QUERY_PARSER_FLAGS = xapian.QueryParser.FLAG_PHRASE | \
+ xapian.QueryParser.FLAG_BOOLEAN | \
+ xapian.QueryParser.FLAG_PURE_NOT | \
+ xapian.QueryParser.FLAG_LOVEHATE | \
+ xapian.QueryParser.FLAG_WILDCARD
+
+ def __init__ (self):
+
+ self._cursor = cursor = get_default_cursor()
+ os.environ["XAPIAN_CJK_NGRAM"] = "1"
+ self._interpretation = TableLookup(cursor, "interpretation")
+ self._manifestation = TableLookup(cursor, "manifestation")
+ self._mimetype = TableLookup(cursor, "mimetype")
+ self._actor = TableLookup(cursor, "actor")
+ self._event_cache = LRUCache(constants.CACHE_SIZE)
+
+ log.debug("Opening full text index: %s" % INDEX_FILE)
+ try:
+ self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OPEN)
+ except xapian.DatabaseError, e:
+ log.warn("Full text index corrupted: '%s'. Rebuilding index." % e)
+ self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OVERWRITE)
+ self._tokenizer = indexer = xapian.TermGenerator()
+ self._query_parser = xapian.QueryParser()
+ self._query_parser.set_database (self._index)
+ self._query_parser.add_prefix("name", "N")
+ self._query_parser.add_prefix("title", "N")
+ self._query_parser.add_prefix("site", "S")
+ self._query_parser.add_prefix("app", "A")
+ self._query_parser.add_boolean_prefix("zgei", FILTER_PREFIX_EVENT_INTERPRETATION)
+ self._query_parser.add_boolean_prefix("zgem", FILTER_PREFIX_EVENT_MANIFESTATION)
+ self._query_parser.add_boolean_prefix("zga", FILTER_PREFIX_ACTOR)
+ self._query_parser.add_prefix("zgsu", FILTER_PREFIX_SUBJECT_URI)
+ self._query_parser.add_boolean_prefix("zgsi", FILTER_PREFIX_SUBJECT_INTERPRETATION)
+ self._query_parser.add_boolean_prefix("zgsm", FILTER_PREFIX_SUBJECT_MANIFESTATION)
+ self._query_parser.add_prefix("zgso", FILTER_PREFIX_SUBJECT_ORIGIN)
+ self._query_parser.add_boolean_prefix("zgst", FILTER_PREFIX_SUBJECT_MIMETYPE)
+ self._query_parser.add_boolean_prefix("zgss", FILTER_PREFIX_SUBJECT_STORAGE)
+ self._query_parser.add_prefix("category", FILTER_PREFIX_XDG_CATEGORY)
+ self._query_parser.add_valuerangeprocessor(
+ xapian.NumberValueRangeProcessor(VALUE_EVENT_ID, "id", True))
+ self._query_parser.add_valuerangeprocessor(
+ xapian.NumberValueRangeProcessor(VALUE_TIMESTAMP, "ms", False))
+ self._query_parser.set_default_op(xapian.Query.OP_AND)
+ self._enquire = xapian.Enquire(self._index)
+
+ self._desktops = {}
+
+ gobject.threads_init()
+ self._may_run = True
+ self._queue = Queue(0)
+ self._worker = threading.Thread(target=self._worker_thread,
+ name="IndexWorker")
+ self._worker.daemon = True
+
+ # We need to defer the index checking until after ZG has completed
+ # full setup. Hence the idle handler.
+ # We also don't start the worker until after we've checked the index
+ gobject.idle_add (self._check_index_and_start_worker)
+
+ @synchronized (INDEX_LOCK)
+ def _check_index_and_start_worker (self):
+ """
+ Check whether we need a rebuild of the index.
+ Returns True if the index is good. False if a reindexing has
+ been commenced.
+
+ This method should be called from the main thread and only once.
+ It starts the worker thread as a side effect.
+
+ We are clearing the queue, because there may be a race when an
+ event insertion / deletion is already queued and our index
+ is corrupted. Creating a new queue instance should be safe,
+ because we're running in main thread as are the index_event
+ and delete_event methods, and the worker thread wasn't yet
+ started.
+ """
+ if self._index.get_metadata("fts_index_version") != INDEX_VERSION:
+ log.info("Index must be upgraded. Doing full rebuild")
+ self._queue = Queue(0)
+ self._queue.put(Reindex(self))
+ elif self._index.get_doccount() == 0:
+ # If the index is empty we trigger a rebuild
+ # We must delay reindexing until after the engine is done setting up
+ log.info("Empty index detected. Doing full rebuild")
+ self._queue = Queue(0)
+ self._queue.put(Reindex(self))
+
+ # Now that we've checked the index from the main thread we can start the worker
+ self._worker.start()
+
+ def index_event (self, event):
+ """
+ This method schedules an event for indexing. It returns immediately and
+ defers the actual work to a bottom half thread. This means that it
+ will not block the main loop of the Zeitgeist daemon while indexing
+ (which may be a heavy operation)
+ """
+ self._queue.put (event)
+ return event
+
+ def delete_event (self, event_id):
+ """
+ Remove an event from the index given its event id
+ """
+ self._queue.put (Deletion(event_id))
+ return
+
+ @synchronized (INDEX_LOCK)
+ def search (self, query_string, time_range=None, filters=None, offset=0, maxhits=10, result_type=100):
+ """
+ Do a full text search over the indexed corpus. The `result_type`
+ parameter may be a zeitgeist.datamodel.ResultType or 100. In case it is
+ 100 the textual relevancy of the search engine will be used to sort the
+ results. Result type 100 is the fastest (and default) mode.
+
+ The filters argument should be a list of event templates.
+ """
+ # Expand event template filters if necessary
+ if filters:
+ query_string = "(%s) AND (%s)" % (query_string, self._compile_event_filter_query (filters))
+
+ # Expand time range value query
+ if time_range and not time_range.is_always():
+ query_string = "(%s) AND (%s)" % (query_string, self._compile_time_range_filter_query (time_range))
+
+ # If the result type coalesces the events we need to fetch some extra
+ # events from the index to have a chance of actually holding 'maxhits'
+ # unique events
+ if result_type in COALESCING_RESULT_TYPES:
+ raw_maxhits = maxhits * 3
+ else:
+ raw_maxhits = maxhits
+
+ # When not sorting by relevance, we fetch the results from Xapian sorted
+ # by timestamp. That minimizes the skew we get from otherwise doing a
+ # relevancy ranked Xapian query and then resorting with Zeitgeist. The
+ # "skew" is that low-relevancy results may still have the highest timestamp
+ if result_type == 100:
+ self._enquire.set_sort_by_relevance()
+ else:
+ self._enquire.set_sort_by_value(VALUE_TIMESTAMP, True)
+
+ # Allow wildcards
+ query_start = time.time()
+ query = self._query_parser.parse_query (query_string,
+ self.QUERY_PARSER_FLAGS)
+ self._enquire.set_query (query)
+ hits = self._enquire.get_mset (offset, raw_maxhits)
+ hit_count = hits.get_matches_estimated()
+ log.debug("Search '%s' gave %s hits in %sms" %
+ (query_string, hits.get_matches_estimated(), (time.time() - query_start)*1000))
+
+ if result_type == 100:
+ event_ids = []
+ for m in hits:
+ event_id = int(xapian.sortable_unserialise(
+ m.document.get_value(VALUE_EVENT_ID)))
+ event_ids.append (event_id)
+ if event_ids:
+ return self.get_events(event_ids), hit_count
+ else:
+ return [], 0
+ else:
+ templates = []
+ for m in hits:
+ event_id = int(xapian.sortable_unserialise(
+ m.document.get_value(VALUE_EVENT_ID)))
+ ev = Event()
+ ev[0][Event.Id] = str(event_id)
+ templates.append(ev)
+ if templates:
+ x = self._find_events(1, TimeRange.always(),
+ templates,
+ StorageState.Any,
+ maxhits,
+ result_type), hit_count
+ return x
+ else:
+ return [], 0
+
+ def _worker_thread (self):
+ is_dirty = False
+ while self._may_run:
+ # FIXME: Throttle IO and CPU
+ try:
+ # If we are dirty wait a while before we flush,
+ # or if we are clean wait indefinitely to avoid
+ # needless wakeups
+ if is_dirty:
+ event = self._queue.get(True, 0.5)
+ else:
+ event = self._queue.get(True)
+
+ if isinstance (event, Deletion):
+ self._delete_event_real (event.event_id)
+ elif isinstance (event, Reindex):
+ self._reindex (event.all_events)
+ else:
+ self._index_event_real (event)
+
+ is_dirty = True
+ except Empty:
+ if is_dirty:
+ # Write changes to disk
+ log.debug("Committing FTS index")
+ self._index.flush()
+ is_dirty = False
+ else:
+ log.debug("No changes to index. Sleeping")
+
+ @synchronized (INDEX_LOCK)
+ def _reindex (self, event_list):
+ """
+ Index everything in the ZG log. The argument must be a list
+ of events. Typically extracted by a Reindex instance.
+ Only call from worker thread as it writes to the db and Xapian
+ is *not* thread safe (only single-writer-multiple-reader).
+ """
+ self._index.close ()
+ self._index = xapian.WritableDatabase(INDEX_FILE, xapian.DB_CREATE_OR_OVERWRITE)
+ self._query_parser.set_database (self._index)
+ self._enquire = xapian.Enquire(self._index)
+ # Register that this index was built with CJK enabled
+ self._index.set_metadata("fts_index_version", INDEX_VERSION)
+ log.info("Preparing to rebuild index with %s events" % len(event_list))
+ for e in event_list : self._queue.put(e)
+
+ @synchronized (INDEX_LOCK)
+ def _delete_event_real (self, event_id):
+ """
+ Look up the doc id given an event id and remove the xapian.Document
+ for that doc id.
+ Note: This is slow, but there's not much we can do about it
+ """
+ try:
+ _id = xapian.sortable_serialise(float(event_id))
+ query = xapian.Query(xapian.Query.OP_VALUE_RANGE,
+ VALUE_EVENT_ID, _id, _id)
+
+ self._enquire.set_query (query)
+ hits = self._enquire.get_mset (0, 10)
+
+ total = hits.get_matches_estimated()
+ if total > 1:
+ log.warning ("More than one event found with id '%s'" % event_id)
+ elif total <= 0:
+ log.debug ("No event for id '%s'" % event_id)
+ return
+
+ for m in hits:
+ log.debug("Deleting event '%s' with docid '%s'" %
+ (event_id, m.docid))
+ self._index.delete_document(m.docid)
+ except Exception, e:
+ log.error("Failed to delete event '%s': %s" % (event_id, e))
+
+ def _split_uri (self, uri):
+ """
+ Returns a triple of (scheme, host, and path) extracted from `uri`
+ """
+ i = uri.find(":")
+ if i == -1 :
+ scheme = ""
+ host = ""
+ path = uri
+ else:
+ scheme = uri[:i]
+ host = ""
+ path = ""
+
+ if uri[i+1] == "/" and uri[i+2] == "/":
+ j = uri.find("/", i+3)
+ if j == -1 :
+ host = uri[i+3:]
+ else:
+ host = uri[i+3:j]
+ path = uri[j:]
+ else:
+ host = uri[i+1:]
+
+ # Strip out URI query part
+ i = path.find("?")
+ if i != -1:
+ path = path[:i]
+
+ return scheme, host, path
+
+ def _get_desktop_entry (self, app_id):
+ """
+ Return the xdg.DesktopEntry.DesktopEntry for `app_id`, or None in case
+ no file is found for the given desktop id
+ """
+ if app_id in self._desktops:
+ return self._desktops[app_id]
+
+ for datadir in xdg_data_dirs:
+ path = os.path.join(datadir, "applications", app_id)
+ if os.path.exists(path):
+ try:
+ desktop = DesktopEntry(path)
+ self._desktops[app_id] = desktop
+ return desktop
+ except Exception, e:
+ log.warning("Unable to load %s: %s" % (path, e))
+ return None
+
+ return None
+
+ def _index_actor (self, actor):
+ """
+ Takes an actor as a path to a .desktop file or app:// uri
+ and indexes the contents of the corresponding .desktop file
+ into the document currently set for self._tokenizer.
+ """
+ if not actor : return
+
+ # Get the path of the .desktop file and convert it to
+ # an app id (eg. 'gedit.desktop')
+ scheme, host, path = self._split_uri(url_unescape (actor))
+ if not path:
+ path = host
+
+ if not path :
+ log.debug("Unable to determine application id for %s" % actor)
+ return
+
+ if path.startswith("/") :
+ path = os.path.basename(path)
+
+ desktop = self._get_desktop_entry(path)
+ if desktop:
+ if not desktop.getNoDisplay():
+ self._tokenizer.index_text(desktop.getName(), 5)
+ self._tokenizer.index_text(desktop.getName(), 5, "A")
+ self._tokenizer.index_text(desktop.getGenericName(), 5)
+ self._tokenizer.index_text(desktop.getGenericName(), 5, "A")
+ self._tokenizer.index_text(desktop.getComment(), 2)
+ self._tokenizer.index_text(desktop.getComment(), 2, "A")
+
+ doc = self._tokenizer.get_document()
+ for cat in desktop.getCategories():
+ doc.add_boolean_term(FILTER_PREFIX_XDG_CATEGORY+cat.lower())
+ else:
+ log.debug("Unable to look up app info for %s" % actor)
+
+
+ def _index_uri (self, uri):
+ """
+ Index `uri` into the document currently set on self._tokenizer
+ """
+ # File URIs and paths are indexed in one way, and all other,
+ # usually web URIs, are indexed in another way because there may
+ # be domain name etc. in there we want to rank differently
+ scheme, host, path = self._split_uri (url_unescape (uri))
+ if scheme == "file" or not scheme:
+ path, name = os.path.split(path)
+ self._tokenizer.index_text(name, 5)
+ self._tokenizer.index_text(name, 5, "N")
+
+ # Index parent names with descending weight
+ weight = 5
+ while path and name:
+ weight = weight / 1.5
+ path, name = os.path.split(path)
+ self._tokenizer.index_text(name, int(weight))
+
+ elif scheme == "mailto":
+ tokens = host.split("@")
+ name = tokens[0]
+ self._tokenizer.index_text(name, 6)
+ if len(tokens) > 1:
+ self._tokenizer.index_text(" ".join[1:], 1)
+ else:
+ # We're cautious about indexing the path components of
+ # non-file URIs as some websites practice *extremely* long
+ # and useless URLs
+ path, name = os.path.split(path)
+ if len(name) > 30 : name = name[:30]
+ if len(path) > 30 : path = path[:30]
+ if name:
+ self._tokenizer.index_text(name, 5)
+ self._tokenizer.index_text(name, 5, "N")
+ if path:
+ self._tokenizer.index_text(path, 1)
+ self._tokenizer.index_text(path, 1, "N")
+ if host:
+ self._tokenizer.index_text(host, 2)
+ self._tokenizer.index_text(host, 2, "N")
+ self._tokenizer.index_text(host, 2, "S")
+
+ def _index_text (self, text):
+ """
+ Index `text` as raw text data for the document currently
+ set on self._tokenizer. The text is assumed to be a primary
+ description of the subject, such as the basename of a file.
+
+ Primary use is for subject.text
+ """
+ self._tokenizer.index_text(text, 5)
+
+ def _index_contents (self, uri):
+ # xmlindexer doesn't extract words for URIs only for file paths
+
+ # FIXME: IONICE and NICE on xmlindexer
+
+ path = uri.replace("file://", "")
+ xmlindexer = subprocess.Popen(['xmlindexer', path],
+ stdout=subprocess.PIPE)
+ xml = xmlindexer.communicate()[0].strip()
+ xmlindexer.wait()
+
+ dom = minidom.parseString(xml)
+ text_nodes = dom.getElementsByTagName("text")
+ lines = []
+ if text_nodes:
+ for line in text_nodes[0].childNodes:
+ lines.append(line.data)
+
+ if lines:
+ self._tokenizer.index_text (" ".join(lines))
+
+
+ def _add_doc_filters (self, event, doc):
+ """Adds the filtering rules to the doc. Filtering rules will
+ not affect the relevancy ranking of the event/doc"""
+ if event.interpretation:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_EVENT_INTERPRETATION+event.interpretation))
+ if event.manifestation:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_EVENT_MANIFESTATION+event.manifestation))
+ if event.actor:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_ACTOR+mangle_uri(event.actor)))
+
+ for su in event.subjects:
+ if su.uri:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_URI+mangle_uri(su.uri)))
+ if su.interpretation:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_INTERPRETATION+su.interpretation))
+ if su.manifestation:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_MANIFESTATION+su.manifestation))
+ if su.origin:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_ORIGIN+mangle_uri(su.origin)))
+ if su.mimetype:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_MIMETYPE+su.mimetype))
+ if su.storage:
+ doc.add_boolean_term (cap_string(FILTER_PREFIX_SUBJECT_STORAGE+su.storage))
+
+ @synchronized (INDEX_LOCK)
+ def _index_event_real (self, event):
+ if not isinstance (event, OrigEvent):
+ log.error("Not an Event, found: %s" % type(event))
+ if not event.id:
+ log.warning("Not indexing event. Event has no id")
+ return
+
+ try:
+ doc = xapian.Document()
+ doc.add_value (VALUE_EVENT_ID,
+ xapian.sortable_serialise(float(event.id)))
+ doc.add_value (VALUE_TIMESTAMP,
+ xapian.sortable_serialise(float(event.timestamp)))
+ self._tokenizer.set_document (doc)
+
+ self._index_actor (event.actor)
+
+ for subject in event.subjects:
+ if not subject.uri : continue
+
+ # By spec URIs can have arbitrary length. In reality that's just silly.
+ # The general online "rule" is to keep URLs less than 2k so we just
+ # choose to enforce that
+ if len(subject.uri) > 2000:
+ log.info ("URI too long (%s). Discarding: %s..."% (len(subject.uri), subject.uri[:30]))
+ return
+ log.debug("Indexing '%s'" % subject.uri)
+
+ self._index_uri (subject.uri)
+ self._index_text (subject.text)
+
+ # If the subject URI is an actor, we index the .desktop also
+ if subject.uri.startswith ("application://"):
+ self._index_actor (subject.uri)
+
+ # File contents indexing disabled for now...
+ #self._index_contents (subject.uri)
+
+ # FIXME: Possibly index payloads when we have apriori knowledge
+
+ self._add_doc_filters (event, doc)
+ self._index.add_document (doc)
+
+ except Exception, e:
+ log.error("Error indexing event: %s" % e)
+
+ def _compile_event_filter_query (self, events):
+ """Takes a list of event templates and compiles a filter query
+ based on their, interpretations, manifestations, and actor,
+ for event and subjects.
+
+ All fields within the same event will be ANDed and each template
+ will be ORed with the others. Like elsewhere in Zeitgeist the
+ type tree of the interpretations and manifestations will be expanded
+ to match all child symbols as well
+ """
+ query = []
+ for event in events:
+ if not isinstance(event, Event):
+ raise TypeError("Expected Event. Found %s" % type(event))
+
+ tmpl = []
+ if event.interpretation :
+ tmpl.append(expand_type("zgei", event.interpretation))
+ if event.manifestation :
+ tmpl.append(expand_type("zgem", event.manifestation))
+ if event.actor : tmpl.append("zga:%s" % mangle_uri(event.actor))
+ for su in event.subjects:
+ if su.uri :
+ tmpl.append("zgsu:%s" % mangle_uri(su.uri))
+ if su.interpretation :
+ tmpl.append(expand_type("zgsi", su.interpretation))
+ if su.manifestation :
+ tmpl.append(expand_type("zgsm", su.manifestation))
+ if su.origin :
+ tmpl.append("zgso:%s" % mangle_uri(su.origin))
+ if su.mimetype :
+ tmpl.append("zgst:%s" % su.mimetype)
+ if su.storage :
+ tmpl.append("zgss:%s" % su.storage)
+
+ tmpl = "(" + ") AND (".join(tmpl) + ")"
+ query.append(tmpl)
+
+ return " OR ".join(query)
+
+ def _compile_time_range_filter_query (self, time_range):
+ """Takes a TimeRange and compiles a range query for it"""
+
+ if not isinstance(time_range, TimeRange):
+ raise TypeError("Expected TimeRange, but found %s" % type(time_range))
+
+ return "%s..%sms" % (time_range.begin, time_range.end)
+
+ def _get_event_from_row(self, row):
+ event = Event()
+ event[0][Event.Id] = row["id"] # Id property is read-only in the public API
+ event.timestamp = row["timestamp"]
+ for field in ("interpretation", "manifestation", "actor"):
+ # Try to get event attributes from row using the attributed field id
+ # If attribute does not exist we break the attribute fetching and return
+ # None instead of crashing
+ try:
+ setattr(event, field, getattr(self, "_" + field).value(row[field]))
+ except KeyError, e:
+ log.error("Event %i broken: Table %s has no id %i" \
+ %(row["id"], field, row[field]))
+ return None
+ event.origin = row["event_origin_uri"] or ""
+ event.payload = row["payload"] or "" # default payload: empty string
+ return event
+
+ def _get_subject_from_row(self, row):
+ subject = Subject()
+ for field in ("uri", "text", "storage"):
+ setattr(subject, field, row["subj_" + field])
+ subject.origin = row["subj_origin_uri"]
+ if row["subj_current_uri"]:
+ subject.current_uri = row["subj_current_uri"]
+ for field in ("interpretation", "manifestation", "mimetype"):
+ # Try to get subject attributes from row using the attributed field id
+ # If attribute does not exist we break the attribute fetching and return
+ # None instead of crashing
+ try:
+ setattr(subject, field,
+ getattr(self, "_" + field).value(row["subj_" + field]))
+ except KeyError, e:
+ log.error("Event %i broken: Table %s has no id %i" \
+ %(row["id"], field, row["subj_" + field]))
+ return None
+ return subject
+
+ def get_events(self, ids, sender=None):
+ """
+ Look up a list of events.
+ """
+
+ t = time.time()
+
+ if not ids:
+ return []
+
+ # Split ids into cached and uncached
+ uncached_ids = array("i")
+ cached_ids = array("i")
+
+ # If the batch of ids is greater than MAX_CACHE_BATCH_SIZE, ignore the cache
+ use_cache = True
+ if len(ids) > MAX_CACHE_BATCH_SIZE:
+ use_cache = False
+ if not use_cache:
+ uncached_ids = ids
+ else:
+ for id in ids:
+ if id in self._event_cache:
+ cached_ids.append(id)
+ else:
+ uncached_ids.append(id)
+
+ id_hash = defaultdict(lambda: array("i"))
+ for n, id in enumerate(ids):
+ # the same id can be at multiple places (LP: #673916)
+ # cache all of them
+ id_hash[id].append(n)
+
+ # If we are not able to get an event by the given id
+ # append None instead of raising an Error. The client
+ # might simply have requested an event that has been
+ # deleted
+ events = {}
+ sorted_events = [None]*len(ids)
+
+ for id in cached_ids:
+ event = self._event_cache[id]
+ if event:
+ if event is not None:
+ for n in id_hash[event.id]:
+ # insert the event into all necessary spots (LP: #673916)
+ sorted_events[n] = event
+
+ # Get uncached events
+ rows = self._cursor.execute("""
+ SELECT * FROM event_view
+ WHERE id IN (%s)
+ """ % ",".join("%d" % _id for _id in uncached_ids))
+
+ time_get_uncached = time.time() - t
+ t = time.time()
+
+ t_get_event = 0
+ t_get_subject = 0
+ t_apply_get_hooks = 0
+
+ row_counter = 0
+ for row in rows:
+ row_counter += 1
+ # Assumption: all rows of the same event for its different
+ # subjects are in consecutive order.
+ t_get_event -= time.time()
+ event = self._get_event_from_row(row)
+ t_get_event += time.time()
+
+ if event:
+ # Check for existing event.id in event to attach
+ # other subjects to it
+ if event.id not in events:
+ events[event.id] = event
+ else:
+ event = events[event.id]
+
+ t_get_subject -= time.time()
+ subject = self._get_subject_from_row(row)
+ t_get_subject += time.time()
+ # Check if the subject has a proper value. If not, something went
+ # wrong while trying to fetch the subject from the row, so instead
+ # of failing and raising an error we silently skip the event.
+ if subject:
+ event.append_subject(subject)
+ if use_cache and not event.payload:
+ self._event_cache[event.id] = event
+ if event is not None:
+ for n in id_hash[event.id]:
+ # insert the event into all necessary spots (LP: #673916)
+ sorted_events[n] = event
+ # Avoid caching events with payloads to keep the cache size (in MB)
+ # at a decent level
+
+
+ log.debug("Got %d raw events in %fs" % (row_counter, time_get_uncached))
+ log.debug("Got %d events in %fs" % (len(sorted_events), time.time()-t))
+ log.debug(" Where time spent in _get_event_from_row in %fs" % (t_get_event))
+ log.debug(" Where time spent in _get_subject_from_row in %fs" % (t_get_subject))
+ log.debug(" Where time spent in apply_get_hooks in %fs" % (t_apply_get_hooks))
+ return sorted_events
+
+ def _find_events(self, return_mode, time_range, event_templates,
+ storage_state, max_events, order, sender=None):
+ """
+ Accepts 'event_templates' as either a real list of Events or as
+ a list of tuples (event_data, subject_data) as we do in the
+ DBus API.
+
+ Return modes:
+ - 0: IDs.
+ - 1: Events.
+ """
+ t = time.time()
+
+ where = self._build_sql_event_filter(time_range, event_templates,
+ storage_state)
+
+ if not where.may_have_results():
+ return []
+
+ if return_mode == 0:
+ sql = "SELECT DISTINCT id FROM event_view"
+ elif return_mode == 1:
+ sql = "SELECT id FROM event_view"
+ else:
+ raise NotImplementedError, "Unsupported return_mode."
+
+ wheresql = " WHERE %s" % where.sql if where else ""
+
+ def group_and_sort(field, wheresql, time_asc=False, count_asc=None,
+ aggregation_type='max'):
+
+ args = {
+ 'field': field,
+ 'aggregation_type': aggregation_type,
+ 'where_sql': wheresql,
+ 'time_sorting': 'ASC' if time_asc else 'DESC',
+ 'aggregation_sql': '',
+ 'order_sql': '',
+ }
+
+ if count_asc is not None:
+ args['aggregation_sql'] = ', COUNT(%s) AS num_events' % \
+ field
+ args['order_sql'] = 'num_events %s,' % \
+ ('ASC' if count_asc else 'DESC')
+
+ return """
+ NATURAL JOIN (
+ SELECT %(field)s,
+ %(aggregation_type)s(timestamp) AS timestamp
+ %(aggregation_sql)s
+ FROM event_view %(where_sql)s
+ GROUP BY %(field)s)
+ GROUP BY %(field)s
+ ORDER BY %(order_sql)s timestamp %(time_sorting)s
+ """ % args
+
+ if order == ResultType.MostRecentEvents:
+ sql += wheresql + " ORDER BY timestamp DESC"
+ elif order == ResultType.LeastRecentEvents:
+ sql += wheresql + " ORDER BY timestamp ASC"
+ elif order == ResultType.MostRecentEventOrigin:
+ sql += group_and_sort("origin", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentEventOrigin:
+ sql += group_and_sort("origin", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularEventOrigin:
+ sql += group_and_sort("origin", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularEventOrigin:
+ sql += group_and_sort("origin", wheresql, time_asc=True,
+ count_asc=True)
+ elif order == ResultType.MostRecentSubjects:
+ # Remember, event.subj_id identifies the subject URI
+ sql += group_and_sort("subj_id", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentSubjects:
+ sql += group_and_sort("subj_id", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularSubjects:
+ sql += group_and_sort("subj_id", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularSubjects:
+ sql += group_and_sort("subj_id", wheresql, time_asc=True,
+ count_asc=True)
+ elif order == ResultType.MostRecentCurrentUri:
+ sql += group_and_sort("subj_id_current", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentCurrentUri:
+ sql += group_and_sort("subj_id_current", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularCurrentUri:
+ sql += group_and_sort("subj_id_current", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularCurrentUri:
+ sql += group_and_sort("subj_id_current", wheresql, time_asc=True,
+ count_asc=True)
+ elif order == ResultType.MostRecentActor:
+ sql += group_and_sort("actor", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentActor:
+ sql += group_and_sort("actor", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularActor:
+ sql += group_and_sort("actor", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularActor:
+ sql += group_and_sort("actor", wheresql, time_asc=True,
+ count_asc=True)
+ elif order == ResultType.OldestActor:
+ sql += group_and_sort("actor", wheresql, time_asc=True,
+ aggregation_type="min")
+ elif order == ResultType.MostRecentOrigin:
+ sql += group_and_sort("subj_origin", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentOrigin:
+ sql += group_and_sort("subj_origin", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularOrigin:
+ sql += group_and_sort("subj_origin", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularOrigin:
+ sql += group_and_sort("subj_origin", wheresql, time_asc=True,
+ count_asc=True)
+ elif order == ResultType.MostRecentSubjectInterpretation:
+ sql += group_and_sort("subj_interpretation", wheresql,
+ time_asc=False)
+ elif order == ResultType.LeastRecentSubjectInterpretation:
+ sql += group_and_sort("subj_interpretation", wheresql,
+ time_asc=True)
+ elif order == ResultType.MostPopularSubjectInterpretation:
+ sql += group_and_sort("subj_interpretation", wheresql,
+ time_asc=False, count_asc=False)
+ elif order == ResultType.LeastPopularSubjectInterpretation:
+ sql += group_and_sort("subj_interpretation", wheresql,
+ time_asc=True, count_asc=True)
+ elif order == ResultType.MostRecentMimeType:
+ sql += group_and_sort("subj_mimetype", wheresql, time_asc=False)
+ elif order == ResultType.LeastRecentMimeType:
+ sql += group_and_sort("subj_mimetype", wheresql, time_asc=True)
+ elif order == ResultType.MostPopularMimeType:
+ sql += group_and_sort("subj_mimetype", wheresql, time_asc=False,
+ count_asc=False)
+ elif order == ResultType.LeastPopularMimeType:
+ sql += group_and_sort("subj_mimetype", wheresql, time_asc=True,
+ count_asc=True)
+
+ if max_events > 0:
+ sql += " LIMIT %d" % max_events
+ result = array("i", self._cursor.execute(sql, where.arguments).fetch(0))
+
+ if return_mode == 0:
+ log.debug("Found %d event IDs in %fs" % (len(result), time.time()- t))
+ elif return_mode == 1:
+ log.debug("Found %d events in %fs" % (len(result), time.time()- t))
+ result = self.get_events(ids=result, sender=sender)
+ else:
+ raise Exception("%d" % return_mode)
+
+ return result
+
+ @staticmethod
+ def _build_templates(templates):
+ for event_template in templates:
+ event_data = event_template[0]
+ for subject in (event_template[1] or (Subject(),)):
+ yield Event((event_data, [], None)), Subject(subject)
+
+ def _build_sql_from_event_templates(self, templates):
+
+ where_or = WhereClause(WhereClause.OR)
+
+ for template in templates:
+ event_template = Event((template[0], [], None))
+ if template[1]:
+ subject_templates = [Subject(data) for data in template[1]]
+ else:
+ subject_templates = None
+
+ subwhere = WhereClause(WhereClause.AND)
+
+ if event_template.id:
+ subwhere.add("id = ?", event_template.id)
+
+ try:
+ value, negation, wildcard = parse_operators(Event, Event.Interpretation, event_template.interpretation)
+ # Expand event interpretation children
+ event_interp_where = WhereClause(WhereClause.OR, negation)
+ for child_interp in (Symbol.find_child_uris_extended(value)):
+ if child_interp:
+ event_interp_where.add_text_condition("interpretation",
+ child_interp, like=wildcard, cache=self._interpretation)
+ if event_interp_where:
+ subwhere.extend(event_interp_where)
+
+ value, negation, wildcard = parse_operators(Event, Event.Manifestation, event_template.manifestation)
+ # Expand event manifestation children
+ event_manif_where = WhereClause(WhereClause.OR, negation)
+ for child_manif in (Symbol.find_child_uris_extended(value)):
+ if child_manif:
+ event_manif_where.add_text_condition("manifestation",
+ child_manif, like=wildcard, cache=self._manifestation)
+ if event_manif_where:
+ subwhere.extend(event_manif_where)
+
+ value, negation, wildcard = parse_operators(Event, Event.Actor, event_template.actor)
+ if value:
+ subwhere.add_text_condition("actor", value, wildcard, negation, cache=self._actor)
+
+ value, negation, wildcard = parse_operators(Event, Event.Origin, event_template.origin)
+ if value:
+ subwhere.add_text_condition("origin", value, wildcard, negation)
+
+ if subject_templates is not None:
+ for subject_template in subject_templates:
+ value, negation, wildcard = parse_operators(Subject, Subject.Interpretation, subject_template.interpretation)
+ # Expand subject interpretation children
+ su_interp_where = WhereClause(WhereClause.OR, negation)
+ for child_interp in (Symbol.find_child_uris_extended(value)):
+ if child_interp:
+ su_interp_where.add_text_condition("subj_interpretation",
+ child_interp, like=wildcard, cache=self._interpretation)
+ if su_interp_where:
+ subwhere.extend(su_interp_where)
+
+ value, negation, wildcard = parse_operators(Subject, Subject.Manifestation, subject_template.manifestation)
+ # Expand subject manifestation children
+ su_manif_where = WhereClause(WhereClause.OR, negation)
+ for child_manif in (Symbol.find_child_uris_extended(value)):
+ if child_manif:
+ su_manif_where.add_text_condition("subj_manifestation",
+ child_manif, like=wildcard, cache=self._manifestation)
+ if su_manif_where:
+ subwhere.extend(su_manif_where)
+
+ # FIXME: Expand mime children as well.
+ # Right now we only do exact matching for mimetypes
+ # thekorn: this will be fixed when wildcards are supported
+ value, negation, wildcard = parse_operators(Subject, Subject.Mimetype, subject_template.mimetype)
+ if value:
+ subwhere.add_text_condition("subj_mimetype",
+ value, wildcard, negation, cache=self._mimetype)
+
+ for key in ("uri", "origin", "text"):
+ value = getattr(subject_template, key)
+ if value:
+ value, negation, wildcard = parse_operators(Subject, getattr(Subject, key.title()), value)
+ subwhere.add_text_condition("subj_%s" % key, value, wildcard, negation)
+
+ if subject_template.current_uri:
+ value, negation, wildcard = parse_operators(Subject,
+ Subject.CurrentUri, subject_template.current_uri)
+ subwhere.add_text_condition("subj_current_uri", value, wildcard, negation)
+
+ if subject_template.storage:
+ subwhere.add_text_condition("subj_storage", subject_template.storage)
+
+ except KeyError, e:
+ # Value not in DB
+ log.debug("Unknown entity in query: %s" % e)
+ where_or.register_no_result()
+ continue
+ where_or.extend(subwhere)
+ return where_or
+
+ def _build_sql_event_filter(self, time_range, templates, storage_state):
+
+ where = WhereClause(WhereClause.AND)
+
+ # thekorn: we are using the unary operator here to tell SQLite not to use
+ # the index on the timestamp column in the first place. This `fix` for
+ # (LP: #672965) is based on some benchmarks, which suggest a performance
+ # win, but we might have overlooked some implications.
+ # (see http://www.sqlite.org/optoverview.html section 6.0)
+ min_time, max_time = time_range
+ if min_time != 0:
+ where.add("+timestamp >= ?", min_time)
+ if max_time != sys.maxint:
+ where.add("+timestamp <= ?", max_time)
+
+ if storage_state in (StorageState.Available, StorageState.NotAvailable):
+ where.add("(subj_storage_state = ? OR subj_storage_state IS NULL)",
+ storage_state)
+ elif storage_state != StorageState.Any:
+ raise ValueError, "Unknown storage state '%d'" % storage_state
+
+ where.extend(self._build_sql_from_event_templates(templates))
+
+ return where
+
+if __name__ == "__main__":
+ mainloop = gobject.MainLoop(is_running=True)
+ search_engine = SearchEngineExtension()
+ mainloop.run()
+
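
For reference, a hedged sketch of how a client could call the Search method defined above over D-Bus. This is not part of the patch; it assumes a running session bus on which the service is reachable (e.g. via the installed .service file):

    import dbus

    bus = dbus.SessionBus()
    obj = bus.get_object("org.gnome.zeitgeist.SimpleIndexer",
                         "/org/gnome/zeitgeist/index/activity")
    index = dbus.Interface(obj, "org.gnome.zeitgeist.Index")

    # query string, (begin, end) time range in ms, event templates,
    # offset, maximum hits, result type (100 = sort by relevance)
    events, count = index.Search("report*", (0, 2**63 - 1), [], 0, 10, 100)
    print("%d matches" % count)
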
=== added file 'extensions/fts-python/lrucache.py'
--- extensions/fts-python/lrucache.py 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/lrucache.py 2011-10-10 14:07:42 +0000
@@ -0,0 +1,125 @@
+# -.- coding: utf-8 -.-
+
+# lrucache.py
+#
+# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2009 Markus Korn <thekorn@xxxxxx>
+# Copyright © 2011 Seif Lotfy <seif@xxxxxxxxx>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+class LRUCache:
+ """
+ A simple LRUCache implementation backed by a linked list and a dict.
+ It can be accessed and updated just like a dict. To check if an element
+ exists in the cache the following type of statements can be used:
+ if "foo" in cache
+ """
+
+ class _Item:
+ """
+ A container for each item in LRUCache which knows about the
+ item's position and relations
+ """
+ def __init__(self, item_key, item_value):
+ self.value = item_value
+ self.key = item_key
+ self.next = None
+ self.prev = None
+
+ def __init__(self, max_size):
+ """
+ The size of the cache (in number of cached items) is guaranteed to
+ never exceed 'max_size'
+ """
+ self._max_size = max_size
+ self.clear()
+
+
+ def clear(self):
+ self._list_end = None # The newest item
+ self._list_start = None # Oldest item
+ self._map = {}
+
+ def __len__(self):
+ return len(self._map)
+
+ def __contains__(self, key):
+ return key in self._map
+
+ def __delitem__(self, key):
+ item = self._map[key]
+ if item.prev:
+ item.prev.next = item.next
+ else:
+ # we are deleting the first item, so we need a new first one
+ self._list_start = item.next
+ if item.next:
+ item.next.prev = item.prev
+ else:
+ # we are deleting the last item, get a new last one
+ self._list_end = item.prev
+ del self._map[key], item
+
+ def __setitem__(self, key, value):
+ if key in self._map:
+ item = self._map[key]
+ item.value = value
+ self._move_item_to_end(item)
+ else:
+ new = LRUCache._Item(key, value)
+ self._append_to_list(new)
+
+ if len(self._map) > self._max_size :
+ # Remove eldest entry from list
+ self.remove_eldest_item()
+
+ def __getitem__(self, key):
+ item = self._map[key]
+ self._move_item_to_end(item)
+ return item.value
+
+ def __iter__(self):
+ """
+ Iteration is in order from eldest to newest,
+ and returns (key,value) tuples
+ """
+ iter = self._list_start
+ while iter != None:
+ yield (iter.key, iter.value)
+ iter = iter.next
+
+ def _move_item_to_end(self, item):
+ del self[item.key]
+ self._append_to_list(item)
+
+ def _append_to_list(self, item):
+ self._map[item.key] = item
+ if not self._list_start:
+ self._list_start = item
+ if self._list_end:
+ self._list_end.next = item
+ item.prev = self._list_end
+ item.next = None
+ self._list_end = item
+
+ def remove_eldest_item(self):
+ if self._list_start == self._list_end:
+ self._list_start = None
+ self._list_end = None
+ return
+ old = self._list_start
+ old.next.prev = None
+ self._list_start = old.next
+ del self[old.key], old
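
A short usage sketch for the cache (not part of the patch): it behaves like a bounded dict that silently drops the least recently used entry once max_size is exceeded.

    from lrucache import LRUCache

    cache = LRUCache(2)
    cache["a"] = 1
    cache["b"] = 2
    cache["a"]              # touching "a" makes "b" the eldest entry
    cache["c"] = 3          # exceeds max_size, so "b" is evicted

    print("b" in cache)     # False
    print(list(cache))      # [('a', 1), ('c', 3)], eldest to newest
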
=== added file 'extensions/fts-python/sql.py'
--- extensions/fts-python/sql.py 1970-01-01 00:00:00 +0000
+++ extensions/fts-python/sql.py 2011-10-10 14:07:42 +0000
@@ -0,0 +1,686 @@
+# -.- coding: utf-8 -.-
+
+# Zeitgeist
+#
+# Copyright © 2009-2010 Siegfried-Angel Gevatter Pujals <rainct@xxxxxxxxxx>
+# Copyright © 2009 Mikkel Kamstrup Erlandsen <mikkel.kamstrup@xxxxxxxxx>
+# Copyright © 2009-2011 Markus Korn <thekorn@xxxxxxx>
+# Copyright © 2009 Seif Lotfy <seif@xxxxxxxxx>
+# Copyright © 2011 J.P. Lacerda <jpaflacerda@xxxxxxxxx>
+# Copyright © 2011 Collabora Ltd.
+# By Siegfried-Angel Gevatter Pujals <rainct@xxxxxxxxxx>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sqlite3
+import logging
+import time
+import os
+import shutil
+
+from constants import constants
+
+log = logging.getLogger("siis.zeitgeist.sql")
+
+TABLE_MAP = {
+ "origin": "uri",
+ "subj_mimetype": "mimetype",
+ "subj_origin": "uri",
+ "subj_uri": "uri",
+ "subj_current_uri": "uri",
+}
+
+def explain_query(cursor, statement, arguments=()):
+ plan = ""
+ for r in cursor.execute("EXPLAIN QUERY PLAN "+statement, arguments).fetchall():
+ plan += str(list(r)) + "\n"
+ log.debug("Got query:\nQUERY:\n%s (%s)\nPLAN:\n%s" % (statement, arguments, plan))
+
+class UnicodeCursor(sqlite3.Cursor):
+
+ debug_explain = os.getenv("ZEITGEIST_DEBUG_QUERY_PLANS")
+
+ @staticmethod
+ def fix_unicode(obj):
+ if isinstance(obj, (int, long)):
+ # thekorn: as long as we are using the unary operator for timestamp
+ # related queries we have to make sure that integers are not
+ # converted to strings, same applies for long numbers.
+ return obj
+ if isinstance(obj, str):
+ obj = obj.decode("UTF-8")
+ # seif: Python's default encoding is ASCII, so whenever a character with
+ # an ASCII value > 127 is in the input data, you'll get a UnicodeDecodeError
+ # because that character can't be handled by the ASCII encoding.
+ try:
+ obj = unicode(obj)
+ except UnicodeDecodeError, ex:
+ pass
+ return obj
+
+ def execute(self, statement, parameters=()):
+ parameters = [self.fix_unicode(p) for p in parameters]
+ if UnicodeCursor.debug_explain:
+ explain_query(super(UnicodeCursor, self), statement, parameters)
+ return super(UnicodeCursor, self).execute(statement, parameters)
+
+ def fetch(self, index=None):
+ if index is not None:
+ for row in self:
+ yield row[index]
+ else:
+ for row in self:
+ yield row
+
+def _get_schema_version (cursor, schema_name):
+ """
+ Returns the schema version for schema_name or returns 0 in case
+ the schema doesn't exist.
+ """
+ try:
+ schema_version_result = cursor.execute("""
+ SELECT version FROM schema_version WHERE schema=?
+ """, (schema_name,))
+ result = schema_version_result.fetchone()
+ return result[0] if result else 0
+ except sqlite3.OperationalError, e:
+ # The schema isn't there...
+ log.debug ("Schema '%s' not found: %s" % (schema_name, e))
+ return 0
+
+def _set_schema_version (cursor, schema_name, version):
+ """
+ Sets the version of `schema_name` to `version`
+ """
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS schema_version
+ (schema VARCHAR PRIMARY KEY ON CONFLICT REPLACE, version INT)
+ """)
+
+ # The 'ON CONFLICT REPLACE' on the PK converts INSERT to UPDATE
+ # when appropriate
+ cursor.execute("""
+ INSERT INTO schema_version VALUES (?, ?)
+ """, (schema_name, version))
+ cursor.connection.commit()
+
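The 'ON CONFLICT REPLACE' clause on the primary key is what lets _set_schema_version issue a plain INSERT both for the first write and for later updates. A small standalone sketch of that behaviour against an in-memory database (illustrative only, not part of the module):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS schema_version
        (schema VARCHAR PRIMARY KEY ON CONFLICT REPLACE, version INT)
    """)
    conn.execute("INSERT INTO schema_version VALUES (?, ?)", ("core", 3))
    conn.execute("INSERT INTO schema_version VALUES (?, ?)", ("core", 4))
    # The second INSERT replaced the first row instead of raising an error
    print conn.execute("SELECT version FROM schema_version WHERE schema=?",
        ("core",)).fetchone()[0]   # -> 4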
+def _do_schema_upgrade (cursor, schema_name, old_version, new_version):
+ """
+ Try and upgrade schema `schema_name` from version `old_version` to
+ `new_version`. This is done by executing a series of upgrade modules
+ named '_zeitgeist.engine.upgrades.$schema_name_$(i)_$(i+1)' and executing
+ the run(cursor) method of those modules until new_version is reached
+ """
+ _do_schema_backup()
+ _set_schema_version(cursor, schema_name, -1)
+ for i in xrange(old_version, new_version):
+ # Fire off the right upgrade module
+ log.info("Upgrading database '%s' from version %s to %s. "
+ "This may take a while" % (schema_name, i, i+1))
+ upgrader_name = "%s_%s_%s" % (schema_name, i, i+1)
+ module = __import__ ("_zeitgeist.engine.upgrades.%s" % upgrader_name)
+ eval("module.engine.upgrades.%s.run(cursor)" % upgrader_name)
+
+ # Update the schema version
+ _set_schema_version(cursor, schema_name, new_version)
+
+ log.info("Upgrade successful")
+
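_do_schema_upgrade expects one module per version step under _zeitgeist.engine.upgrades, each exposing a run(cursor) entry point. The real upgrade scripts live in the main zeitgeist tree and are not part of this diff; a purely hypothetical core_3_4.py showing the expected shape:

    # _zeitgeist/engine/upgrades/core_3_4.py (hypothetical illustration)

    def run(cursor):
        # Migrate exactly one schema version step; _do_schema_upgrade
        # records the new schema version and commits afterwards.
        cursor.execute("ALTER TABLE event ADD COLUMN some_new_column INTEGER")
        cursor.connection.commit()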
+def _check_core_schema_upgrade (cursor):
+ """
+ Checks whether the schema is good or, if it is outdated, triggers any
+ necessary upgrade scripts. This method will also attempt to restore a
+ database backup in case a previous upgrade was cancelled midway.
+
+ It returns a boolean indicating whether the schema was good and the
+ database cursor (which will have changed if the database was restored).
+ """
+ # See if we have the right schema version, and try an upgrade if needed
+ core_schema_version = _get_schema_version(cursor, constants.CORE_SCHEMA)
+ if core_schema_version >= constants.CORE_SCHEMA_VERSION:
+ return True, cursor
+ else:
+ try:
+ if core_schema_version <= -1:
+ cursor.connection.commit()
+ cursor.connection.close()
+ _do_schema_restore()
+ cursor = _connect_to_db(constants.DATABASE_FILE)
+ core_schema_version = _get_schema_version(cursor,
+ constants.CORE_SCHEMA)
+ log.exception("Database corrupted at upgrade -- "
+ "upgrading from version %s" % core_schema_version)
+
+ _do_schema_upgrade (cursor,
+ constants.CORE_SCHEMA,
+ core_schema_version,
+ constants.CORE_SCHEMA_VERSION)
+
+ # Don't return here. The upgrade process might depend on the
+ # tables, indexes, and views being set up (to avoid code dup)
+ log.info("Running post upgrade setup")
+ return False, cursor
+ except sqlite3.OperationalError:
+ # Something went wrong while applying the upgrade; this is
+ # probably due to a nonexistent table (this occurs when
+ # applying core_3_4, for example). We just need to fall through
+ # the rest of create_db to fix this...
+ log.exception("Database corrupted -- proceeding")
+ return False, cursor
+ except Exception, e:
+ log.exception(
+ "Failed to upgrade database '%s' from version %s to %s: %s" % \
+ (constants.CORE_SCHEMA, core_schema_version,
+ constants.CORE_SCHEMA_VERSION, e))
+ raise SystemExit(27)
+
+def _do_schema_backup ():
+ shutil.copyfile(constants.DATABASE_FILE, constants.DATABASE_FILE_BACKUP)
+
+def _do_schema_restore ():
+ shutil.move(constants.DATABASE_FILE_BACKUP, constants.DATABASE_FILE)
+
+def _connect_to_db(file_path):
+ conn = sqlite3.connect(file_path)
+ conn.row_factory = sqlite3.Row
+ cursor = conn.cursor(UnicodeCursor)
+ return cursor
+
+def create_db(file_path):
+ """Create the database and return a default cursor for it"""
+ start = time.time()
+ log.info("Using database: %s" % file_path)
+ new_database = not os.path.exists(file_path)
+ cursor = _connect_to_db(file_path)
+
+ # Seif: as a result of the optimization story (LP: #639737) we are setting
+ # journal_mode to WAL if possible; this change is irreversible but
+ # gains us a big speedup. For more information see http://www.sqlite.org/wal.html
+ # FIXME: Set journal_mode to WAL once a team decision has been taken.
+ # cursor.execute("PRAGMA journal_mode = WAL")
+ cursor.execute("PRAGMA journal_mode = DELETE")
+ # Seif: as another result of the performance tweaks discussed in (LP: #639737)
+ # we decided to set locking_mode to EXCLUSIVE; from now on only one
+ # connection to the database is allowed. To revert this setting, set
+ # locking_mode to NORMAL.
+
+ # thekorn: as part of the workaround for (LP: #598666) we need to
+ # create the '_fix_cache' TEMP table on every start,
+ # this table gets purged once the engine gets closed.
+ # When a cached value gets deleted we automatically store the name
+ # of the cache and the value's id to this table. It's then up to
+ # the python code to delete items from the cache based on the content
+ # of this table.
+ cursor.execute("CREATE TEMP TABLE _fix_cache (table_name VARCHAR, id INTEGER)")
+
+ # Always assume that temporary memory backed DBs have good schemas
+ if constants.DATABASE_FILE != ":memory:" and not new_database:
+ do_upgrade, cursor = _check_core_schema_upgrade(cursor)
+ if do_upgrade:
+ _time = (time.time() - start)*1000
+ log.debug("Core schema is good. DB loaded in %sms" % _time)
+ return cursor
+
+ # the following sql statements are only executed if a new database
+ # is created or an update of the core schema was done
+ log.debug("Updating sql schema")
+ # uri
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS uri
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS uri_value ON uri(value)
+ """)
+
+ # interpretation
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS interpretation
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS interpretation_value
+ ON interpretation(value)
+ """)
+
+ # manifestation
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS manifestation
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS manifestation_value
+ ON manifestation(value)""")
+
+ # mimetype
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS mimetype
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS mimetype_value
+ ON mimetype(value)""")
+
+ # actor
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS actor
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS actor_value
+ ON actor(value)""")
+
+ # text
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS text
+ (id INTEGER PRIMARY KEY, value VARCHAR UNIQUE)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS text_value
+ ON text(value)""")
+
+ # payload: there's no value index for payloads,
+ # they can only be fetched by id
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS payload
+ (id INTEGER PRIMARY KEY, value BLOB)
+ """)
+
+ # storage, represented by a StatefulEntityTable
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS storage
+ (id INTEGER PRIMARY KEY,
+ value VARCHAR UNIQUE,
+ state INTEGER,
+ icon VARCHAR,
+ display_name VARCHAR)
+ """)
+ cursor.execute("""
+ CREATE UNIQUE INDEX IF NOT EXISTS storage_value
+ ON storage(value)""")
+
+ # event - the primary table for log statements
+ # - Note that event.id is NOT unique, we can have multiple subjects per ID
+ # - Timestamps are integers.
+ # - (event-)origin and subj_id_current are added to the end of the table
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS event (
+ id INTEGER,
+ timestamp INTEGER,
+ interpretation INTEGER,
+ manifestation INTEGER,
+ actor INTEGER,
+ payload INTEGER,
+ subj_id INTEGER,
+ subj_interpretation INTEGER,
+ subj_manifestation INTEGER,
+ subj_origin INTEGER,
+ subj_mimetype INTEGER,
+ subj_text INTEGER,
+ subj_storage INTEGER,
+ origin INTEGER,
+ subj_id_current INTEGER,
+ CONSTRAINT interpretation_fk FOREIGN KEY(interpretation)
+ REFERENCES interpretation(id) ON DELETE CASCADE,
+ CONSTRAINT manifestation_fk FOREIGN KEY(manifestation)
+ REFERENCES manifestation(id) ON DELETE CASCADE,
+ CONSTRAINT actor_fk FOREIGN KEY(actor)
+ REFERENCES actor(id) ON DELETE CASCADE,
+ CONSTRAINT origin_fk FOREIGN KEY(origin)
+ REFERENCES uri(id) ON DELETE CASCADE,
+ CONSTRAINT payload_fk FOREIGN KEY(payload)
+ REFERENCES payload(id) ON DELETE CASCADE,
+ CONSTRAINT subj_id_fk FOREIGN KEY(subj_id)
+ REFERENCES uri(id) ON DELETE CASCADE,
+ CONSTRAINT subj_id_current_fk FOREIGN KEY(subj_id_current)
+ REFERENCES uri(id) ON DELETE CASCADE,
+ CONSTRAINT subj_interpretation_fk FOREIGN KEY(subj_interpretation)
+ REFERENCES interpretation(id) ON DELETE CASCADE,
+ CONSTRAINT subj_manifestation_fk FOREIGN KEY(subj_manifestation)
+ REFERENCES manifestation(id) ON DELETE CASCADE,
+ CONSTRAINT subj_origin_fk FOREIGN KEY(subj_origin)
+ REFERENCES uri(id) ON DELETE CASCADE,
+ CONSTRAINT subj_mimetype_fk FOREIGN KEY(subj_mimetype)
+ REFERENCES mimetype(id) ON DELETE CASCADE,
+ CONSTRAINT subj_text_fk FOREIGN KEY(subj_text)
+ REFERENCES text(id) ON DELETE CASCADE,
+ CONSTRAINT subj_storage_fk FOREIGN KEY(subj_storage)
+ REFERENCES storage(id) ON DELETE CASCADE,
+ CONSTRAINT unique_event UNIQUE (timestamp, interpretation, manifestation, actor, subj_id)
+ )
+ """)
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_id
+ ON event(id)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_timestamp
+ ON event(timestamp)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_interpretation
+ ON event(interpretation)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_manifestation
+ ON event(manifestation)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_actor
+ ON event(actor)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_origin
+ ON event(origin)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_id
+ ON event(subj_id)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_id_current
+ ON event(subj_id_current)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_interpretation
+ ON event(subj_interpretation)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_manifestation
+ ON event(subj_manifestation)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_origin
+ ON event(subj_origin)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_mimetype
+ ON event(subj_mimetype)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_text
+ ON event(subj_text)""")
+ cursor.execute("""
+ CREATE INDEX IF NOT EXISTS event_subj_storage
+ ON event(subj_storage)""")
+
+ # SQLite does not enforce the foreign key constraints declared above,
+ # so emulate ON DELETE CASCADE with triggers instead.
+ for table, columns in (
+ ('interpretation', ('interpretation', 'subj_interpretation')),
+ ('manifestation', ('manifestation', 'subj_manifestation')),
+ ('actor', ('actor',)),
+ ('payload', ('payload',)),
+ ('mimetype', ('subj_mimetype',)),
+ ('text', ('subj_text',)),
+ ('storage', ('subj_storage',)),
+ ):
+ for column in columns:
+ cursor.execute("""
+ CREATE TRIGGER IF NOT EXISTS fkdc_event_%(column)s
+ BEFORE DELETE ON event
+ WHEN ((SELECT COUNT(*) FROM event WHERE %(column)s=OLD.%(column)s) < 2)
+ BEGIN
+ DELETE FROM %(table)s WHERE id=OLD.%(column)s;
+ END;
+ """ % {'column': column, 'table': table})
+
+ # ... special cases
+ for num, column in enumerate(('subj_id', 'subj_origin',
+ 'subj_id_current', 'origin')):
+ cursor.execute("""
+ CREATE TRIGGER IF NOT EXISTS fkdc_event_uri_%(num)d
+ BEFORE DELETE ON event
+ WHEN ((
+ SELECT COUNT(*)
+ FROM event
+ WHERE
+ origin=OLD.%(column)s
+ OR subj_id=OLD.%(column)s
+ OR subj_id_current=OLD.%(column)s
+ OR subj_origin=OLD.%(column)s
+ ) < 2)
+ BEGIN
+ DELETE FROM uri WHERE id=OLD.%(column)s;
+ END;
+ """ % {'num': num+1, 'column': column})
+
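To make the cascade emulation concrete, this is what the first loop above generates for the 'actor' column (table 'actor'); printing the formatted template here is purely for illustration:

    template = """
        CREATE TRIGGER IF NOT EXISTS fkdc_event_%(column)s
        BEFORE DELETE ON event
        WHEN ((SELECT COUNT(*) FROM event WHERE %(column)s=OLD.%(column)s) < 2)
        BEGIN
            DELETE FROM %(table)s WHERE id=OLD.%(column)s;
        END;
    """
    print template % {'column': 'actor', 'table': 'actor'}
    # The resulting trigger removes the referenced actor row when the last
    # event pointing at it is about to be deleted.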
+ cursor.execute("DROP VIEW IF EXISTS event_view")
+ cursor.execute("""
+ CREATE VIEW IF NOT EXISTS event_view AS
+ SELECT event.id,
+ event.timestamp,
+ event.interpretation,
+ event.manifestation,
+ event.actor,
+ (SELECT value FROM payload WHERE payload.id=event.payload)
+ AS payload,
+ (SELECT value FROM uri WHERE uri.id=event.subj_id)
+ AS subj_uri,
+ event.subj_id, -- #this directly points to an id in the uri table
+ event.subj_interpretation,
+ event.subj_manifestation,
+ event.subj_origin,
+ (SELECT value FROM uri WHERE uri.id=event.subj_origin)
+ AS subj_origin_uri,
+ event.subj_mimetype,
+ (SELECT value FROM text WHERE text.id = event.subj_text)
+ AS subj_text,
+ (SELECT value FROM storage
+ WHERE storage.id=event.subj_storage) AS subj_storage,
+ (SELECT state FROM storage
+ WHERE storage.id=event.subj_storage) AS subj_storage_state,
+ event.origin,
+ (SELECT value FROM uri WHERE uri.id=event.origin)
+ AS event_origin_uri,
+ (SELECT value FROM uri WHERE uri.id=event.subj_id_current)
+ AS subj_current_uri,
+ event.subj_id_current
+ FROM event
+ """)
+
+ # All good. Set the schema version, so we don't have to do all this
+ # sql the next time around
+ _set_schema_version (cursor, constants.CORE_SCHEMA, constants.CORE_SCHEMA_VERSION)
+ _time = (time.time() - start)*1000
+ log.info("DB set up in %sms" % _time)
+ cursor.connection.commit()
+
+ return cursor
+
+_cursor = None
+def get_default_cursor():
+ global _cursor
+ if not _cursor:
+ dbfile = constants.DATABASE_FILE
+ _cursor = create_db(dbfile)
+ return _cursor
+def unset_cursor():
+ global _cursor
+ _cursor = None
+
+class TableLookup(dict):
+
+ # We are not using an LRUCache here as presumably there won't be thousands
+ # of manifestations/interpretations/mimetypes/actors on most
+ # installations, so we can spare ourselves the overhead of tracking their usage.
+
+ def __init__(self, cursor, table):
+
+ self._cursor = cursor
+ self._table = table
+
+ for row in cursor.execute("SELECT id, value FROM %s" % table):
+ self[row["value"]] = row["id"]
+
+ self._inv_dict = dict((value, key) for key, value in self.iteritems())
+
+ cursor.execute("""
+ CREATE TEMP TRIGGER update_cache_%(table)s
+ BEFORE DELETE ON %(table)s
+ BEGIN
+ INSERT INTO _fix_cache VALUES ("%(table)s", OLD.id);
+ END;
+ """ % {"table": table})
+
+ def __getitem__(self, name):
+ # Use this for inserting new properties into the database
+ if name in self:
+ return super(TableLookup, self).__getitem__(name)
+ try:
+ self._cursor.execute(
+ "INSERT INTO %s (value) VALUES (?)" % self._table, (name,))
+ id = self._cursor.lastrowid
+ except sqlite3.IntegrityError:
+ # This shouldn't happen, but just in case
+ # FIXME: Maybe we should remove it?
+ id = self._cursor.execute("SELECT id FROM %s WHERE value=?"
+ % self._table, (name,)).fetchone()[0]
+ # If we are here it's a newly inserted value, insert it into cache
+ self[name] = id
+ self._inv_dict[id] = name
+ return id
+
+ def value(self, id):
+ # When we fetch an event, it either was already in the database
+ # at the time Zeitgeist started or it was inserted later (using
+ # Zeitgeist), so here we always have the data in memory already.
+ return self._inv_dict[id]
+
+ def id(self, name):
+ # Use this when fetching values which are supposed to be in the
+ # database already. Eg., in find_eventids.
+ return super(TableLookup, self).__getitem__(name)
+
+ def remove_id(self, id):
+ value = self.value(id)
+ del self._inv_dict[id]
+ del self[value]
+
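The TEMP trigger created in __init__ pairs with the '_fix_cache' table set up in create_db: whenever a row disappears from one of the lookup tables, its (table_name, id) pair is recorded so the in-memory caches can be purged. The code consuming that table is not part of this file; a hypothetical invalidation pass could look roughly like this (the 'lookups' dict is an assumed helper, not something this diff defines):

    def purge_deleted_lookup_values(cursor, lookups):
        # 'lookups' maps table names ("actor", "mimetype", ...) to their
        # TableLookup instances.
        for table_name, row_id in cursor.execute(
                "SELECT table_name, id FROM _fix_cache").fetchall():
            lookup = lookups.get(table_name)
            if lookup is None:
                continue
            try:
                lookup.remove_id(row_id)
            except KeyError:
                pass  # this value was never cached in the current process
        cursor.execute("DELETE FROM _fix_cache")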
+def get_right_boundary(text):
+ """ returns the smallest string which is greater than `text` """
+ if not text:
+ # if the search prefix is empty we query for the whole range
+ # of unicode chars
+ return unichr(0x10ffff)
+ if isinstance(text, str):
+ # we need to make sure the text is decoded as 'utf-8' unicode
+ text = unicode(text, "UTF-8")
+ charpoint = ord(text[-1])
+ if charpoint == 0x10ffff:
+ # if the last character is the biggest possible char we need to
+ # look at the second last
+ return get_right_boundary(text[:-1])
+ return text[:-1] + unichr(charpoint+1)
+
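Two worked examples of the boundary helper, as used by the prefix-search optimization in WhereClause.optimize_glob below:

    print repr(get_right_boundary(u"file:///tmp"))  # u'file:///tmq' (last char bumped by one)
    print repr(get_right_boundary(""))              # the largest code point: match everything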
+class WhereClause:
+ """
+ This class provides a convenient representation of a SQL `WHERE' clause,
+ composed of a set of conditions joined together.
+
+ The relation between conditions can be either of type *AND* or *OR*, but
+ not both. To create more complex clauses, use several :class:`WhereClause`
+ instances and join them together using :meth:`extend`.
+
+ Instances of this class can then be used to obtain a line of SQL code and
+ a list of arguments, for use with the sqlite3 module, by accessing the
+ appropriate properties:
+ >>> where.sql, where.arguments
+ """
+
+ AND = " AND "
+ OR = " OR "
+ NOT = "NOT "
+
+ @staticmethod
+ def optimize_glob(column, table, prefix):
+ """returns an optimized version of the GLOB statement as described
+ in http://www.sqlite.org/optoverview.html `4.0 The LIKE optimization`
+ """
+ if isinstance(prefix, str):
+ # we need to make sure the text is decoded as 'utf-8' unicode
+ prefix = unicode(prefix, "UTF-8")
+ if not prefix:
+ # empty prefix means 'select all', no way to optimize this
+ sql = "SELECT %s FROM %s" %(column, table)
+ return sql, ()
+ elif all([i == unichr(0x10ffff) for i in prefix]):
+ sql = "SELECT %s FROM %s WHERE value >= ?" %(column, table)
+ return sql, (prefix,)
+ else:
+ sql = "SELECT %s FROM %s WHERE (value >= ? AND value < ?)" %(column, table)
+ return sql, (prefix, get_right_boundary(prefix))
+
+ def __init__(self, relation, negation=False):
+ self._conditions = []
+ self.arguments = []
+ self._relation = relation
+ self._no_result_member = False
+ self._negation = negation
+
+ def __len__(self):
+ return len(self._conditions)
+
+ def add(self, condition, arguments=None):
+ if not condition:
+ return
+ self._conditions.append(condition)
+ if arguments is not None:
+ if not hasattr(arguments, "__iter__"):
+ self.arguments.append(arguments)
+ else:
+ self.arguments.extend(arguments)
+
+ def add_text_condition(self, column, value, like=False, negation=False, cache=None):
+ if like:
+ assert column in ("origin", "subj_uri", "subj_current_uri",
+ "subj_origin", "actor", "subj_mimetype"), \
+ "prefix search on the %r column is not supported by zeitgeist" % column
+ if column == "subj_uri":
+ # subj_id directly points to the id of an uri entry
+ view_column = "subj_id"
+ elif column == "subj_current_uri":
+ view_column = "subj_id_current"
+ else:
+ view_column = column
+ optimized_glob, value = self.optimize_glob("id", TABLE_MAP.get(column, column), value)
+ sql = "%s %sIN (%s)" %(view_column, self.NOT if negation else "", optimized_glob)
+ if negation:
+ sql += " OR %s IS NULL" % view_column
+ else:
+ if column == "origin":
+ column = "event_origin_uri"
+ elif column == "subj_origin":
+ column = "subj_origin_uri"
+ sql = "%s %s= ?" %(column, "!" if negation else "")
+ if cache is not None:
+ value = cache[value]
+ self.add(sql, value)
+
+ def extend(self, where):
+ self.add(where.sql, where.arguments)
+ if not where.may_have_results():
+ if self._relation == self.AND:
+ self.clear()
+ self.register_no_result()
+
+ @property
+ def sql(self):
+ if self: # Do not return "()" if there are no conditions
+ negation = self.NOT if self._negation else ""
+ return "%s(%s)" %(negation, self._relation.join(self._conditions))
+
+ def register_no_result(self):
+ self._no_result_member = True
+
+ def may_have_results(self):
+ """
+ Return False if we know from our cached data that the query
+ will give no results.
+ """
+ return len(self._conditions) > 0 or not self._no_result_member
+
+ def clear(self):
+ """
+ Reset this WhereClause to the state of a newly created one.
+ """
+ self._conditions = []
+ self.arguments = []
+ self._no_result_member = False
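Putting the pieces together, a rough sketch of how a caller such as fts.py might assemble a clause; the timestamp value and URI prefix are illustrative, and the exact call sites are not part of this file:

    where = WhereClause(WhereClause.AND)
    where.add("timestamp >= ?", 1317000000000)
    # Prefix match on the subject URI: rewritten into a range query on the
    # uri table instead of a GLOB, as done by optimize_glob() above.
    where.add_text_condition("subj_uri", "file:///home/", like=True)

    print where.sql
    # (timestamp >= ? AND subj_id IN
    #  (SELECT id FROM uri WHERE (value >= ? AND value < ?)))
    print where.arguments
    # [1317000000000, u'file:///home/', u'file:///home0']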
=== modified file 'extra/Makefile.am'
--- extra/Makefile.am 2011-10-10 11:01:12 +0000
+++ extra/Makefile.am 2011-10-10 14:07:42 +0000
@@ -1,14 +1,16 @@
SUBDIRS = ontology
-servicedir = $(datadir)/dbus-1/services
-nodist_service_DATA = org.gnome.zeitgeist.service
+servicedir = $(DBUS_SERVICES_DIR)
+service_DATA = org.gnome.zeitgeist.service
org.gnome.zeitgeist.service: org.gnome.zeitgeist.service.in
$(AM_V_GEN)sed -e s!\@prefix\@!$(prefix)! < $< > $@
org.gnome.zeitgeist.service: Makefile
-CLEANFILES = org.gnome.zeitgeist.service \
- PythonSerializer.pyc
+CLEANFILES = \
+ org.gnome.zeitgeist.service \
+ PythonSerializer.pyc \
+ $(NULL)
EXTRA_DIST = \
org.gnome.zeitgeist.service.in \
zeitgeist-daemon.bash_completion \
=== modified file 'extra/org.gnome.zeitgeist.service.in'
--- extra/org.gnome.zeitgeist.service.in 2011-08-02 13:50:15 +0000
+++ extra/org.gnome.zeitgeist.service.in 2011-10-10 14:07:42 +0000
@@ -1,3 +1,3 @@
[D-BUS Service]
Name=org.gnome.zeitgeist.Engine
-Exec=@prefix@/bin/zeitgeist-daemon
+Exec=@prefix@/bin/bluebird