maria-developers team mailing list archive

Re: Extended FederatedX storage engine to support updating Sphinx RT indexes

 

Hello Sergey,

sorry for the delay. The sphinxql:// scheme now has its own parser, and delete_row() is much simpler.

If I have still missed something, please feel free to contact me.


Best regards,


Markus Lidel

On 16.06.2013 19:58, Markus Lidel wrote:
Hello Sergey,

thank you very much for reviewing my patch!

On 16.06.2013 15:48, Sergey Vojtovich wrote:
Hi Markus,

thanks for your contribution. Connecting to a Sphinx RT index via
federatedx sounds like a great idea.

I reviewed the attached patch and, from what I can see, the relevant
differences between the dialects are as follows:

1. Connecting to an RT index doesn't require a database name, and you made it
    optional in parse_url(). That's acceptable with the sphinxql scheme, but
    with the mysql scheme it's a bit ambiguous. What do you think if we keep
    the database name mandatory and document that the connection string to
    an RT index should look like
    sphinxql://root@localhost:9306//rt
    or
    sphinxql://root@localhost:9306/dummy/rt

Hmm, I think both ways are workarounds. Probably the best approach is to
implement a separate parse_url() function for each of sphinxql and mysql.
That way you don't have to specify a database, which SphinxQL doesn't use,
and for MySQL you can't forget to set the database. The only reason I
didn't implement it already is that I wanted to change as little as
possible. I'll look into it and send an updated patch.
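
To illustrate the idea of a scheme-specific parser, here is a minimal standalone sketch of parsing the part of a connection string that follows "sphinxql://". It is not the code from the attached patch, which fills a FEDERATEDX_SHARE in place. The names SphinxqlTarget and parse_sphinxql_url are hypothetical, the default port of 9306 is an assumption, and passwords are not handled.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical result type for this sketch only.
struct SphinxqlTarget {
  std::string username;
  std::string hostname;
  unsigned port;        // assumed default: 9306
  std::string index;    // RT index name, used where MySQL has a table name
};

// Parses e.g. "root@localhost:9306/rt" (the text after "sphinxql://").
// Returns false on malformed input and leaves `out` untouched.
bool parse_sphinxql_url(const std::string &rest, SphinxqlTarget &out) {
  SphinxqlTarget t;
  t.port = 9306;

  const std::string::size_type at = rest.find('@');
  if (at == std::string::npos) return false;
  t.username = rest.substr(0, at);

  const std::string::size_type slash = rest.find('/', at + 1);
  if (slash == std::string::npos) return false;
  std::string hostport = rest.substr(at + 1, slash - at - 1);

  // Exactly one path component: no database part, no extra '/'.
  t.index = rest.substr(slash + 1);
  if (t.index.empty() || t.index.find('/') != std::string::npos) return false;

  const std::string::size_type colon = hostport.find(':');
  if (colon != std::string::npos) {
    const std::string port = hostport.substr(colon + 1);
    if (!port.empty()) t.port = static_cast<unsigned>(std::atoi(port.c_str()));
    hostport.erase(colon);
  }
  t.hostname = hostport;

  out = t;
  return true;
}
```

With this shape, "root@localhost:9306/rt" is accepted without a database name, while a string with a second path component such as "root@localhost:9306/dummy/rt" is rejected instead of being silently reinterpreted.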

2. delete_row(): looks nice, but could be greatly simplified because sphinxql
    accepts only the record identifier in the WHERE clause.

Yes, that's true.
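
As a rough sketch of what the simplified statement could look like: because only the document id may appear in the WHERE clause, the DELETE can be built from the index name and the id alone. The helper name build_sphinxql_delete is hypothetical, and the sketch assumes the index name is a plain identifier that needs no quoting.

```cpp
#include <cstdint>
#include <string>

// Builds "DELETE FROM <index> WHERE id = <id>".
// Assumption: `index` is a plain identifier and needs no quoting.
std::string build_sphinxql_delete(const std::string &index, std::uint64_t id) {
  return "DELETE FROM " + index + " WHERE id = " + std::to_string(id);
}
```

Compared with a generic delete that lists every column with AND, there is no trailing " AND " to chop off and no NULL handling to consider.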

3. delete_all_rows(): FWICS there is a "TRUNCATE RTINDEX" statement in sphinx.
    Why can't we use it? Even better, patch sphinx to make the RTINDEX word
    optional?

TRUNCATE RTINDEX was first implemented in Sphinx 2.1.1-beta; it is not
available in the current release.

4. table_metadata(): I believe it should be easy and natural to patch sphinx
    to support SHOW TABLE STATUS. What do you think?

Yes, that would be the best solution, but I haven't looked at the Sphinx
source yet.

5. query(): Same here, sphinxql shouldn't add implicit limit.

SphinxQL by default applies an implicit limit of 20 rows. So if you don't
specify a LIMIT, you get only 20 rows back instead of all rows. If, for
example, you have 100 rows in your Sphinx index and want to delete all of
them, you could delete only 20 at once. So I thought the best way is to add
an explicit limit.

6. test_connection(): What's the problem with test query?

Sphinx simply doesn't understand the test query:

SELECT * FROM <table> WHERE 1=0

so I changed it to

SELECT * FROM <table> LIMIT 0

which should do the same, and which Sphinx does understand.
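
The difference between the two probes can be sketched as a small helper that picks the statement by scheme. This is only an illustration with a hypothetical function name; in the attached patch the MySQL variant is issued from federatedx_io_mysql::test_connection(), a virtual method that each scheme can override.

```cpp
#include <string>

// Picks a probe statement that returns no rows for the given scheme.
// Assumption: `table` is a plain identifier that needs no quoting.
std::string build_test_query(const std::string &scheme, const std::string &table) {
  const std::string base = "SELECT * FROM " + table;
  if (scheme == "sphinxql")
    return base + " LIMIT 0";      // Sphinx rejects "WHERE 1=0"
  return base + " WHERE 1=0";      // form used for the mysql scheme
}
```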

Did I miss anything else?

If so, please feel free to contact me.


Best regards,


Markus Lidel



Thanks,
Sergey

On 29.05.2013, at 1:46, Markus Lidel <Markus.Lidel@xxxxxxxxxxxxxxxxx> wrote:

Hello,

the Sphinx search engine has RT indexes, which can be updated with
a subset of SQL commands, referred to as SphinxQL:

http://sphinxsearch.com/docs/2.1.1/sphinxql-reference.html

Because the FederatedX storage engine uses some commands that
Sphinx does not know, here is a patch which extends the FederatedX
storage engine to support this subset of commands. Now it's possible
to update Sphinx RT indexes directly from MariaDB. To create a
connection to the RT index from the default Sphinx configuration:

index rt
{
  type = rt

  path = @CONFDIR@/data/rt

  rt_field = title
  rt_field = content

  rt_attr_string = title
  rt_attr_string = content

  rt_attr_uint = gid
}

you just have to call:

CREATE TABLE `rt` (
  `id` BIGINT UNSIGNED NOT NULL,
  `title` TEXT,
  `content` TEXT,
  `gid` INT UNSIGNED
) ENGINE=FEDERATED CONNECTION='sphinxql://root@localhost:9306/rt';

It would be great if the attached patch could be integrated into
MariaDB. If you have any suggestions, please feel free to contact me.

Best regards,


Markus Lidel


--
------------------------------------------
Markus Lidel (Senior IT Consultant)

Shadow Connect GmbH
Carl-Reisch-Weg 12
D-86381 Krumbach
Germany

Phone:  +49 82 82/99 51-0
Fax:    +49 82 82/99 51-11

E-Mail: Markus.Lidel@xxxxxxxxxxxxxxxxx
URL:    http://www.shadowconnect.com

Geschäftsführer/CEO: Markus Lidel
HRB 10357, Amtsgericht Memmingen
diff -u -r -N federatedx/CMakeLists.txt federatedx/CMakeLists.txt
--- federatedx/CMakeLists.txt	2013-08-15 16:36:08.000000000 +0200
+++ federatedx/CMakeLists.txt	2013-09-02 11:20:34.891886300 +0200
@@ -1,4 +1,4 @@
 SET(FEDERATEDX_PLUGIN_STATIC  "federatedx")
 SET(FEDERATEDX_PLUGIN_DYNAMIC "ha_federatedx")
-SET(FEDERATEDX_SOURCES  ha_federatedx.cc federatedx_txn.cc federatedx_io.cc federatedx_io_null.cc federatedx_io_mysql.cc)
+SET(FEDERATEDX_SOURCES  ha_federatedx.cc federatedx_txn.cc federatedx_io.cc federatedx_io_null.cc federatedx_io_mysql.cc federatedx_io_sphinxql.cc)
 MYSQL_ADD_PLUGIN(federatedx ${FEDERATEDX_SOURCES} STORAGE_ENGINE)
diff -u -r -N federatedx/federatedx_io.cc federatedx/federatedx_io.cc
--- federatedx/federatedx_io.cc	2013-08-15 16:36:08.000000000 +0200
+++ federatedx/federatedx_io.cc	2013-09-02 15:36:56.971504000 +0200
@@ -27,13 +27,20 @@
 */
 
 
-/*#define MYSQL_SERVER 1*/
+#define MYSQL_SERVER 1
 #include "sql_priv.h"
 #include <mysql/plugin.h>
 
 #include "ha_federatedx.h"
 
 #include "m_string.h"
+#include "sql_servers.h"
+#include "sql_analyse.h"                        // append_escaped()
+
+#include "federatedx_io.h"
+#include "federatedx_io_null.h"
+#include "federatedx_io_mysql.h"
+#include "federatedx_io_sphinxql.h"
 
 #ifdef USE_PRAGMA_IMPLEMENTATION
 #pragma implementation                          // gcc: Class implementation
@@ -44,18 +51,26 @@
 struct io_schemes_st
 {
   const char *scheme;
+  bool (*parse)(FEDERATEDX_SHARE *share, char *connection_string);
   instantiate_io_type instantiate;
 };
 
 
 static const io_schemes_st federated_io_schemes[] =
 {
-  { "mysql", &instantiate_io_mysql },
-  { "null", instantiate_io_null } /* must be last element */
+  { "mysql", &federatedx_io_mysql::parse, &federatedx_io_mysql::construct },
+  { "sphinxql", &federatedx_io_sphinxql::parse, &federatedx_io_sphinxql::construct },
+  { "null", &federatedx_io_null::parse, &federatedx_io_null::construct } /* must be last element */
 };
 
 const uint federated_io_schemes_count= array_elements(federated_io_schemes);
 
+/* Variables used when chopping off trailing characters */
+static const uint sizeof_trailing_comma= sizeof(", ") - 1;
+static const uint sizeof_trailing_closeparen= sizeof(") ") - 1;
+static const uint sizeof_trailing_and= sizeof(" AND ") - 1;
+static const uint sizeof_trailing_where= sizeof(" WHERE ") - 1;
+
 federatedx_io::federatedx_io(FEDERATEDX_SERVER *aserver)
   : server(aserver), owner_ptr(0), txn_next(0), idle_next(0),
     active(FALSE), busy(FALSE), readonly(TRUE)
@@ -69,7 +84,6 @@
   DBUG_VOID_RETURN;
 }
 
-
 federatedx_io::~federatedx_io()
 {
   DBUG_ENTER("federatedx_io::~federatedx_io");
@@ -79,17 +93,20 @@
   DBUG_VOID_RETURN;
 }
 
-
-bool federatedx_io::handles_scheme(const char *scheme)
+bool federatedx_io::handles_scheme(FEDERATEDX_SHARE *share, char *connection_string)
 {
   const io_schemes_st *ptr = federated_io_schemes;
   const io_schemes_st *end = ptr + array_elements(federated_io_schemes);
-  while (ptr != end && strcasecmp(scheme, ptr->scheme))
+  while (ptr != end)
+  {
+    if (!strcasecmp(share->scheme, ptr->scheme))
+      return ptr->parse(share, connection_string);
+
     ++ptr;
-  return ptr != end;
+  }
+  return false;
 }
 
-
 federatedx_io *federatedx_io::construct(MEM_ROOT *server_root,
                                         FEDERATEDX_SERVER *server)
 {
@@ -100,4 +117,131 @@
   return ptr->instantiate(server_root, server);
 }
 
+void federatedx_io::reset()
+{
+}
+
+int federatedx_io::commit()
+{
+  return 0;
+}
+
+int federatedx_io::rollback()
+{
+  return 0;
+}
+
+ulong federatedx_io::last_savepoint() const
+{
+  return 0;
+}
+
+ulong federatedx_io::actual_savepoint() const
+{
+  return 0;
+}
+
+bool federatedx_io::is_autocommit() const
+{
+  return TRUE;
+}
+
+int federatedx_io::savepoint_set(ulong sp)
+{
+  return 0;
+}
+
+ulong federatedx_io::savepoint_release(ulong sp)
+{
+  return 0;
+}
+
+ulong federatedx_io::savepoint_rollback(ulong sp)
+{
+  return 0;
+}
+
+void federatedx_io::savepoint_restrict(ulong sp)
+{
+}
+
+size_t federatedx_io::max_query_size() const
+{
+  return INT_MAX;
+}
 
+int federatedx_io::test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share)
+{
+  return 0;
+}
+
+int federatedx_io::delete_row(FEDERATEDX_SHARE *share, TABLE *table, const uchar *buf)
+{
+  char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
+  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
+  uint found= 0;
+  DBUG_ENTER("federatedx_io::delete_row");
+
+  delete_string.length(0);
+  delete_string.append(STRING_WITH_LEN("DELETE FROM "));
+  append_ident(&delete_string, share->table_name,
+               share->table_name_length, ident_quote_char);
+  delete_string.append(STRING_WITH_LEN(" WHERE "));
+
+  for (Field **field= table->field; *field; field++)
+  {
+    Field *cur_field= *field;
+    found++;
+    if (bitmap_is_set(table->read_set, cur_field->field_index))
+    {
+      append_ident(&delete_string, (*field)->field_name,
+                   strlen((*field)->field_name), ident_quote_char);
+      data_string.length(0);
+      if (cur_field->is_null())
+      {
+        delete_string.append(STRING_WITH_LEN(" IS NULL "));
+      }
+      else
+      {
+        bool needs_quote= cur_field->str_needs_quotes();
+        delete_string.append(STRING_WITH_LEN(" = "));
+        cur_field->val_str(&data_string);
+        if (needs_quote)
+          delete_string.append(value_quote_char);
+        data_string.print(&delete_string);
+        if (needs_quote)
+          delete_string.append(value_quote_char);
+      }
+      delete_string.append(STRING_WITH_LEN(" AND "));
+    }
+  }
+
+  // Remove trailing AND
+  delete_string.length(delete_string.length() - sizeof_trailing_and);
+  if (!found)
+    delete_string.length(delete_string.length() - sizeof_trailing_where);
+
+  delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
+  DBUG_PRINT("info",
+             ("Delete sql: %s", delete_string.c_ptr_quick()));
+
+  DBUG_RETURN(this->query(delete_string.ptr(), delete_string.length()));
+}
+
+int federatedx_io::delete_all_rows(FEDERATEDX_SHARE *share, TABLE *table)
+{
+  char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+  DBUG_ENTER("federatedx_io::delete_all_rows");
+
+  query.length(0);
+
+  query.set_charset(system_charset_info);
+  query.append(STRING_WITH_LEN("TRUNCATE "));
+  append_ident(&query, share->table_name, share->table_name_length,
+               ident_quote_char);
+
+  DBUG_RETURN(this->query(query.ptr(), query.length()));
+}
diff -u -r -N federatedx/federatedx_io.h federatedx/federatedx_io.h
--- federatedx/federatedx_io.h	1970-01-01 01:00:00.000000000 +0100
+++ federatedx/federatedx_io.h	2013-09-02 12:53:46.071055200 +0200
@@ -0,0 +1,140 @@
+/*
+Copyright (c) 2008, Patrick Galbraith
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+    * Neither the name of Patrick Galbraith nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface
+#endif
+
+class federatedx_io
+{
+  friend class federatedx_txn;
+  FEDERATEDX_SERVER * const server;
+  federatedx_io **owner_ptr;
+  federatedx_io *txn_next;
+  federatedx_io *idle_next;
+  bool active;  /* currently participating in a transaction */
+  bool busy;    /* in use by a ha_federated instance */
+  bool readonly;/* indicates that no updates have occurred */
+
+protected:
+  void set_active(bool new_active)
+  { active= new_active; }
+
+public:
+  static bool handles_scheme(FEDERATEDX_SHARE *share, char *connection_string);
+  static federatedx_io *construct(MEM_ROOT *server_root,
+                                  FEDERATEDX_SERVER *server);
+
+  static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
+  { return alloc_root(mem_root, size); }
+  static void operator delete(void *ptr, size_t size)
+  { TRASH(ptr, size); }
+
+  federatedx_io(FEDERATEDX_SERVER *);
+  virtual ~federatedx_io();
+
+  bool is_readonly() const { return readonly; }
+  bool is_active() const { return active; }
+
+  const char * get_charsetname() const
+  { return server->csname ? server->csname : "latin1"; }
+
+  const char * get_hostname() const { return server->hostname; }
+  const char * get_username() const { return server->username; }
+  const char * get_password() const { return server->password; }
+  const char * get_database() const { return server->database; }
+  ushort       get_port() const     { return server->port; }
+  const char * get_socket() const   { return server->socket; }
+
+  /* query and metadata functions */
+
+  virtual int query(const char *buffer, uint length)=0;
+
+  virtual my_ulonglong affected_rows() const=0;
+  virtual my_ulonglong last_insert_id() const=0;
+
+  virtual size_t max_query_size() const;
+
+  virtual bool table_metadata(ha_statistics *stats, const char *table_name,
+                              uint table_name_length, uint flag) = 0;
+
+  /* error handling functions */
+
+  virtual int error_code()=0;
+  virtual const char *error_str()=0;
+
+  /* transactional functions */
+
+  virtual bool is_autocommit() const;
+
+  virtual void reset();
+  virtual int commit();
+  virtual int rollback();
+
+  virtual int savepoint_set(ulong sp);
+  virtual ulong savepoint_release(ulong sp);
+  virtual ulong savepoint_rollback(ulong sp);
+  virtual void savepoint_restrict(ulong sp);
+
+  virtual ulong last_savepoint() const;
+  virtual ulong actual_savepoint() const;
+
+  /* resultset operations */
+
+  virtual FEDERATEDX_IO_RESULT *store_result()=0;
+  virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
+
+  virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
+  virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
+
+  virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0;
+  virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
+
+  virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
+                                      unsigned int column)=0;
+  virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
+                              unsigned int column) const=0;
+
+  virtual size_t get_ref_length() const=0;
+
+  virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
+                             void *ref)=0;
+  virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
+                            const void *ref)=0;
+
+  /* extended functions */
+  virtual int test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share);
+
+  virtual int delete_row(FEDERATEDX_SHARE *share, TABLE *table, const uchar *buf);
+  virtual int delete_all_rows(FEDERATEDX_SHARE *share, TABLE *table);
+};
diff -u -r -N federatedx/federatedx_io_mysql.cc federatedx/federatedx_io_mysql.cc
--- federatedx/federatedx_io_mysql.cc	2013-08-15 16:36:01.000000000 +0200
+++ federatedx/federatedx_io_mysql.cc	2013-09-02 12:50:37.059781000 +0200
@@ -35,6 +35,10 @@
 
 #include "m_string.h"
 #include "sql_servers.h"
+#include "sql_show.h"                           // append_identifier()
+
+#include "federatedx_io.h"
+#include "federatedx_io_mysql.h"
 
 #ifdef USE_PRAGMA_IMPLEMENTATION
 #pragma implementation                          // gcc: Class implementation
@@ -59,71 +63,78 @@
 };
 
 
-class federatedx_io_mysql :public federatedx_io
+federatedx_io *federatedx_io_mysql::construct(MEM_ROOT *server_root,
+                                              FEDERATEDX_SERVER *server)
 {
-  MYSQL mysql; /* MySQL connection */
-  DYNAMIC_ARRAY savepoints;
-  bool requested_autocommit;
-  bool actual_autocommit;
-
-  int actual_query(const char *buffer, uint length);
-  bool test_all_restrict() const;
-public:
-  federatedx_io_mysql(FEDERATEDX_SERVER *);
-  ~federatedx_io_mysql();
-
-  int simple_query(const char *fmt, ...);
-  int query(const char *buffer, uint length);
-  virtual FEDERATEDX_IO_RESULT *store_result();
-
-  virtual size_t max_query_size() const;
-
-  virtual my_ulonglong affected_rows() const;
-  virtual my_ulonglong last_insert_id() const;
-
-  virtual int error_code();
-  virtual const char *error_str();
-
-  void reset();
-  int commit();
-  int rollback();
-
-  int savepoint_set(ulong sp);
-  ulong savepoint_release(ulong sp);
-  ulong savepoint_rollback(ulong sp);
-  void savepoint_restrict(ulong sp);
-
-  ulong last_savepoint() const;
-  ulong actual_savepoint() const;
-  bool is_autocommit() const;
-
-  bool table_metadata(ha_statistics *stats, const char *table_name,
-                      uint table_name_length, uint flag);
-
-  /* resultset operations */
-
-  virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
-  virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
-  virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
-  virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result);
-  virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
-  virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
-                                      unsigned int column);
-  virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
-                              unsigned int column) const;
-
-  virtual size_t get_ref_length() const;
-  virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
-                             void *ref);
-  virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
-                            const void *ref);
-};
+  return new (server_root) federatedx_io_mysql(server);
+}
 
 
-federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
-                                    FEDERATEDX_SERVER *server)
+bool federatedx_io_mysql::parse(FEDERATEDX_SHARE *share, char *connection_string)
 {
-  return new (server_root) federatedx_io_mysql(server);
+  share->username = connection_string;
+  if (!(share->hostname= strchr(share->username, '@')))
+    return false;
+  *share->hostname++= '\0';                   // End username
+
+  if ((share->password= strchr(share->username, ':')))
+  {
+    *share->password++= '\0';                 // End username
+
+    /* make sure there isn't an extra / or @ */
+    if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
+      return false;
+    /*
+      Found that if the string is:
+      user:@hostname:port/db/table
+      Then password is a null string, so set to NULL
+    */
+    if (share->password[0] == '\0')
+      share->password= NULL;
+  }
+
+  /* make sure there isn't an extra / or @ */
+  if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
+    return false;
+
+  if (!(share->database= strchr(share->hostname, '/')))
+    return false;
+  *share->database++= '\0';
+
+  if ((share->sport= strchr(share->hostname, ':')))
+  {
+    *share->sport++= '\0';
+    if (share->sport[0] == '\0')
+      share->sport= NULL;
+    else
+      share->port= atoi(share->sport);
+  }
+
+  if (!(share->table_name= strchr(share->database, '/')))
+  {
+    share->table_name = share->database;
+    --share->database;
+  } else {
+    *share->table_name++= '\0';
+  }
+
+  share->table_name_length= strlen(share->table_name);
+
+  /* make sure there's not an extra / */
+  if ((strchr(share->table_name, '/')))
+    return false;
+
+  if (share->hostname[0] == '\0')
+    share->hostname= NULL;
+
+  if (!share->port)
+  {
+    if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
+      share->socket= (char *) MYSQL_UNIX_ADDR;
+    else
+      share->port= MYSQL_PORT;
+  }
+  return true;
 }
 
 
@@ -653,3 +664,30 @@
   return 0;
 }
 
+int federatedx_io_mysql::test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share)
+{
+  char buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String str(buffer, sizeof(buffer), &my_charset_bin);
+  FEDERATEDX_IO_RESULT *resultset= NULL;
+  int retval;
+
+  str.length(0);
+  str.append(STRING_WITH_LEN("SELECT * FROM "));
+  append_identifier(thd, &str, share->table_name,
+                    share->table_name_length);
+  str.append(STRING_WITH_LEN(" WHERE 1=0"));
+
+  if ((retval= this->query(str.ptr(), str.length())))
+  {
+    sprintf(buffer, "database: '%s'  username: '%s'  hostname: '%s'",
+            share->database, share->username, share->hostname);
+    DBUG_PRINT("info", ("error-code: %d", this->error_code()));
+    my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
+  }
+  else
+    resultset= this->store_result();
+
+  this->free_result(resultset);
+
+  return retval;
+}
diff -u -r -N federatedx/federatedx_io_mysql.h federatedx/federatedx_io_mysql.h
--- federatedx/federatedx_io_mysql.h	1970-01-01 01:00:00.000000000 +0100
+++ federatedx/federatedx_io_mysql.h	2013-09-02 12:32:00.729044800 +0200
@@ -0,0 +1,100 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+    * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface
+#endif
+
+class federatedx_io_mysql :public federatedx_io
+{
+protected:
+  MYSQL mysql; /* MySQL connection */
+  DYNAMIC_ARRAY savepoints;
+  bool requested_autocommit;
+  bool actual_autocommit;
+
+  bool test_all_restrict() const;
+
+  int actual_query(const char *buffer, uint length);
+
+public:
+  static federatedx_io *construct(MEM_ROOT *server_root,
+                                  FEDERATEDX_SERVER *server);
+  static bool parse(FEDERATEDX_SHARE *share, char *connection_string);
+
+  federatedx_io_mysql(FEDERATEDX_SERVER *);
+  ~federatedx_io_mysql();
+
+  int simple_query(const char *fmt, ...);
+  virtual int query(const char *buffer, uint length);
+  FEDERATEDX_IO_RESULT *store_result();
+
+  size_t max_query_size() const;
+
+  my_ulonglong affected_rows() const;
+  my_ulonglong last_insert_id() const;
+
+  int error_code();
+  const char *error_str();
+
+  void reset();
+  int commit();
+  int rollback();
+
+  int savepoint_set(ulong sp);
+  ulong savepoint_release(ulong sp);
+  ulong savepoint_rollback(ulong sp);
+  void savepoint_restrict(ulong sp);
+
+  ulong last_savepoint() const;
+  ulong actual_savepoint() const;
+  bool is_autocommit() const;
+
+  bool table_metadata(ha_statistics *stats, const char *table_name,
+                      uint table_name_length, uint flag);
+
+  int test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share);
+
+  /* resultset operations */
+
+  void free_result(FEDERATEDX_IO_RESULT *io_result);
+  unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
+  my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
+  FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result);
+  ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
+  const char *get_column_data(FEDERATEDX_IO_ROW *row,
+                              unsigned int column);
+  bool is_column_null(const FEDERATEDX_IO_ROW *row,
+                      unsigned int column) const;
+
+  size_t get_ref_length() const;
+  void mark_position(FEDERATEDX_IO_RESULT *io_result,
+                     void *ref);
+  int seek_position(FEDERATEDX_IO_RESULT **io_result,
+                    const void *ref);
+};
diff -u -r -N federatedx/federatedx_io_null.cc federatedx/federatedx_io_null.cc
--- federatedx/federatedx_io_null.cc	2013-08-15 16:36:08.000000000 +0200
+++ federatedx/federatedx_io_null.cc	2013-09-02 12:51:48.534614300 +0200
@@ -35,79 +35,24 @@
 
 #include "m_string.h"
 
+#include "federatedx_io.h"
+#include "federatedx_io_null.h"
+
 #ifdef USE_PRAGMA_IMPLEMENTATION
 #pragma implementation                          // gcc: Class implementation
 #endif
 
 
-#define SAVEPOINT_REALIZED  1
-#define SAVEPOINT_RESTRICT  2
-#define SAVEPOINT_EMITTED 4
-
-
-typedef struct federatedx_savepoint
+federatedx_io *federatedx_io_null::construct(MEM_ROOT *server_root,
+                                             FEDERATEDX_SERVER *server)
 {
-  ulong level;
-  uint  flags;
-} SAVEPT;
-
-
-class federatedx_io_null :public federatedx_io
-{
-public:
-  federatedx_io_null(FEDERATEDX_SERVER *);
-  ~federatedx_io_null();
-
-  int query(const char *buffer, uint length);
-  virtual FEDERATEDX_IO_RESULT *store_result();
-
-  virtual size_t max_query_size() const;
-
-  virtual my_ulonglong affected_rows() const;
-  virtual my_ulonglong last_insert_id() const;
-
-  virtual int error_code();
-  virtual const char *error_str();
-  
-  void reset();
-  int commit();
-  int rollback();
-  
-  int savepoint_set(ulong sp);
-  ulong savepoint_release(ulong sp);
-  ulong savepoint_rollback(ulong sp);
-  void savepoint_restrict(ulong sp);
-  
-  ulong last_savepoint() const;
-  ulong actual_savepoint() const;
-  bool is_autocommit() const;
-
-  bool table_metadata(ha_statistics *stats, const char *table_name,
-                      uint table_name_length, uint flag);
-  
-  /* resultset operations */
-  
-  virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
-  virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
-  virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
-  virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result);
-  virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
-  virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
-                                      unsigned int column);
-  virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
-                              unsigned int column) const;
-  virtual size_t get_ref_length() const;
-  virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
-                             void *ref);
-  virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
-                            const void *ref);
-};
+  return new (server_root) federatedx_io_null(server);
+}
 
 
-federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
-                                   FEDERATEDX_SERVER *server)
+bool federatedx_io_null::parse(FEDERATEDX_SHARE *share, char *connection_string)
 {
-  return new (server_root) federatedx_io_null(server);
+  return true;
 }
 
 
@@ -122,74 +67,12 @@
 }
 
 
-void federatedx_io_null::reset()
-{
-}
-
-
-int federatedx_io_null::commit()
-{
-  return 0;
-}
-
-int federatedx_io_null::rollback()
-{
-  return 0;
-}
-
-
-ulong federatedx_io_null::last_savepoint() const
-{
-  return 0;
-}
-
-
-ulong federatedx_io_null::actual_savepoint() const
-{
-  return 0;
-}
-
-bool federatedx_io_null::is_autocommit() const
-{
-  return 0;
-}
-
-
-int federatedx_io_null::savepoint_set(ulong sp)
-{
-  return 0;
-}
-
-
-ulong federatedx_io_null::savepoint_release(ulong sp)
-{
-  return 0;
-}
-
-
-ulong federatedx_io_null::savepoint_rollback(ulong sp)
-{
-  return 0;
-}
-
-
-void federatedx_io_null::savepoint_restrict(ulong sp)
-{
-}
-
-
 int federatedx_io_null::query(const char *buffer, uint length)
 {
   return 0;
 }
 
 
-size_t federatedx_io_null::max_query_size() const
-{
-  return INT_MAX;
-}
-
-
 my_ulonglong federatedx_io_null::affected_rows() const
 {
   return 0;
diff -u -r -N federatedx/federatedx_io_null.h federatedx/federatedx_io_null.h
--- federatedx/federatedx_io_null.h	1970-01-01 01:00:00.000000000 +0100
+++ federatedx/federatedx_io_null.h	2013-09-02 12:32:08.049369400 +0200
@@ -0,0 +1,71 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+    * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface
+#endif
+
+class federatedx_io_null :public federatedx_io
+{
+public:
+  static federatedx_io *construct(MEM_ROOT *server_root,
+                                  FEDERATEDX_SERVER *server);
+  static bool parse(FEDERATEDX_SHARE *share, char *connection_string);
+
+  federatedx_io_null(FEDERATEDX_SERVER *);
+  ~federatedx_io_null();
+
+  int query(const char *buffer, uint length);
+  FEDERATEDX_IO_RESULT *store_result();
+
+  my_ulonglong affected_rows() const;
+  my_ulonglong last_insert_id() const;
+
+  int error_code();
+  const char *error_str();
+
+  bool table_metadata(ha_statistics *stats, const char *table_name,
+                      uint table_name_length, uint flag);
+
+  /* resultset operations */
+
+  void free_result(FEDERATEDX_IO_RESULT *io_result);
+  unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
+  my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
+  FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result);
+  ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
+  const char *get_column_data(FEDERATEDX_IO_ROW *row,
+                              unsigned int column);
+  bool is_column_null(const FEDERATEDX_IO_ROW *row,
+                      unsigned int column) const;
+  size_t get_ref_length() const;
+  void mark_position(FEDERATEDX_IO_RESULT *io_result,
+                     void *ref);
+  int seek_position(FEDERATEDX_IO_RESULT **io_result,
+                    const void *ref);
+};
diff -u -r -N federatedx/federatedx_io_sphinxql.cc federatedx/federatedx_io_sphinxql.cc
--- federatedx/federatedx_io_sphinxql.cc	1970-01-01 01:00:00.000000000 +0100
+++ federatedx/federatedx_io_sphinxql.cc	2013-09-02 17:39:53.608359600 +0200
@@ -0,0 +1,241 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+    * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+
+#define MYSQL_SERVER 1
+#include "sql_priv.h"
+#include <mysql/plugin.h>
+
+#include "ha_federatedx.h"
+
+#include "m_string.h"
+#include "sql_servers.h"
+#include "sql_show.h"                           // append_identifier()
+
+#include "federatedx_io.h"
+#include "federatedx_io_mysql.h"
+#include "federatedx_io_sphinxql.h"
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation                          // gcc: Class implementation
+#endif
+
+#define SPHINX_MAX_ROWS 1000000
+#define SPHINXQL_PORT 9306
+
+federatedx_io *federatedx_io_sphinxql::construct(MEM_ROOT *server_root,
+                                                 FEDERATEDX_SERVER *server)
+{
+  return new (server_root) federatedx_io_sphinxql(server);
+}
+
+bool federatedx_io_sphinxql::parse(FEDERATEDX_SHARE *share, char *connection_string)
+{
+  share->username = connection_string;
+  if (!(share->hostname= strchr(share->username, '@')))
+    return false;
+  share->database= share->hostname;           // "" once '@' is cut below
+  *share->hostname++= '\0';                   // End username
+
+  if ((share->password= strchr(share->username, ':')))
+  {
+    *share->password++= '\0';                 // End username
+
+    /* make sure there isn't an extra / or @ */
+    if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
+      return false;
+    /*
+      Found that if the string is:
+      user:@hostname:port/db/table
+      Then password is a null string, so set to NULL
+    */
+    if (share->password[0] == '\0')
+      share->password= NULL;
+  }
+
+  /* make sure there isn't an extra / or @ */
+  if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
+    return false;
+
+  if (!(share->table_name= strchr(share->hostname, '/')))
+    return false;
+  *share->table_name++= '\0';
+  share->table_name_length= strlen(share->table_name);
+
+  if ((share->sport= strchr(share->hostname, ':')))
+  {
+    *share->sport++= '\0';
+    if (share->sport[0] == '\0')
+      share->sport= NULL;
+    else
+      share->port= atoi(share->sport);
+  }
+
+  /* make sure there's not an extra / */
+  if ((strchr(share->table_name, '/')))
+    return false;
+
+  if (share->hostname[0] == '\0')
+    share->hostname= NULL;
+
+  if (!share->port)
+    share->port= SPHINXQL_PORT;
+
+  return true;
+}
+
+federatedx_io_sphinxql::federatedx_io_sphinxql(FEDERATEDX_SERVER *aserver)
+  : federatedx_io_mysql(aserver)
+{
+  DBUG_ENTER("federatedx_io_sphinxql::federatedx_io_sphinxql");
+
+  DBUG_VOID_RETURN;
+}
+
+federatedx_io_sphinxql::~federatedx_io_sphinxql()
+{
+  DBUG_ENTER("federatedx_io_sphinxql::~federatedx_io_sphinxql");
+
+  DBUG_VOID_RETURN;
+}
+
+int federatedx_io_sphinxql::query(const char *buffer, uint length)
+{
+  char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+  int error;
+  bool wants_autocommit= requested_autocommit | is_readonly();
+  DBUG_ENTER("federatedx_io_sphinxql::query");
+
+  if (!wants_autocommit && test_all_restrict())
+    wants_autocommit= TRUE;
+
+  if (wants_autocommit != actual_autocommit)
+  {
+    if ((error= actual_query(wants_autocommit ? "SET AUTOCOMMIT=1"
+                                            : "SET AUTOCOMMIT=0", 16)))
+      DBUG_RETURN(error);
+    mysql.reconnect= wants_autocommit ? 1 : 0;
+    actual_autocommit= wants_autocommit;
+  }
+
+  query.copy(buffer, length, &my_charset_bin);
+
+  if (!strncasecmp(query.c_ptr(), STRING_WITH_LEN("SELECT ")) &&
+      !strstr(query.c_ptr(), " LIMIT "))
+  {
+    query.append(" LIMIT ");
+    query.append_ulonglong(SPHINX_MAX_ROWS);
+  }
+
+  if (!(error= actual_query(query.ptr(), query.length())))
+    set_active(is_active() || !actual_autocommit);
+
+  DBUG_RETURN(error);
+}
+
+int federatedx_io_sphinxql::test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share)
+{
+  char buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String str(buffer, sizeof(buffer), &my_charset_bin);
+  FEDERATEDX_IO_RESULT *resultset= NULL;
+  int retval;
+
+  str.length(0);
+  str.append(STRING_WITH_LEN("SELECT * FROM "));
+  str.append(share->table_name, share->table_name_length);
+  str.append(STRING_WITH_LEN(" LIMIT 0"));
+
+  if ((retval= this->query(str.ptr(), str.length())))
+  {
+    sprintf(buffer, "database: '%s'  username: '%s'  hostname: '%s'",
+            share->database, share->username, share->hostname);
+    DBUG_PRINT("info", ("error-code: %d", this->error_code()));
+    my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
+  }
+  else
+    resultset= this->store_result();
+
+  this->free_result(resultset);
+
+  return retval;
+}
+
+bool federatedx_io_sphinxql::table_metadata(ha_statistics *stats,
+                                         const char *table_name,
+                                         uint table_name_length, uint flag)
+{
+  return false;                               // nothing queried; stats stay untouched
+}
+
+int federatedx_io_sphinxql::delete_row(FEDERATEDX_SHARE *share, TABLE *table, const uchar *buf)
+{
+  char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
+  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
+  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
+  DBUG_ENTER("federatedx_io_sphinxql::delete_row");
+
+  Field *field= *table->field;
+  if (field && bitmap_is_set(table->read_set, field->field_index))
+  {
+    delete_string.length(0);
+    delete_string.append(STRING_WITH_LEN("DELETE FROM "));
+
+    append_ident(&delete_string, share->table_name,
+                 share->table_name_length, ident_quote_char);
+
+    delete_string.append(STRING_WITH_LEN(" WHERE "));
+
+    append_ident(&delete_string, field->field_name,
+                 strlen(field->field_name), ident_quote_char);
+
+    delete_string.append(STRING_WITH_LEN(" = "));
+
+    delete_string.append(*field->val_str(&data_string));
+  } else {
+    DBUG_RETURN(0);
+  }
+
+  DBUG_PRINT("info",
+             ("Delete sql: %s", delete_string.c_ptr_quick()));
+
+  if (this->query(delete_string.ptr(), delete_string.length()))
+  {
+    DBUG_RETURN(1);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int federatedx_io_sphinxql::delete_all_rows(FEDERATEDX_SHARE *share, TABLE *table)
+{
+  DBUG_ENTER("federatedx_io_sphinxql::delete_all_rows");
+
+  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+}
diff -u -r -N federatedx/federatedx_io_sphinxql.h federatedx/federatedx_io_sphinxql.h
--- federatedx/federatedx_io_sphinxql.h	1970-01-01 01:00:00.000000000 +0100
+++ federatedx/federatedx_io_sphinxql.h	2013-09-02 12:32:15.784936300 +0200
@@ -0,0 +1,51 @@
+/*
+Copyright (c) 2007, Antony T Curtis
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+    * Neither the name of FederatedX nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifdef USE_PRAGMA_INTERFACE
+#pragma interface
+#endif
+
+class federatedx_io_sphinxql :public federatedx_io_mysql
+{
+public:
+  static federatedx_io *construct(MEM_ROOT *server_root,
+                                  FEDERATEDX_SERVER *server);
+  static bool parse(FEDERATEDX_SHARE *share, char *connection_string);
+
+  federatedx_io_sphinxql(FEDERATEDX_SERVER *);
+  ~federatedx_io_sphinxql();
+
+  int query(const char *buffer, uint length);
+
+  bool table_metadata(ha_statistics *stats, const char *table_name,
+                      uint table_name_length, uint flag);
+
+  int test_connection(MYSQL_THD thd, FEDERATEDX_SHARE *share);
+  int delete_row(FEDERATEDX_SHARE *share, TABLE *table, const uchar *buf);
+  int delete_all_rows(FEDERATEDX_SHARE *share, TABLE *table);
+};
diff -u -r -N federatedx/federatedx_txn.cc federatedx/federatedx_txn.cc
--- federatedx/federatedx_txn.cc	2013-08-15 16:36:08.000000000 +0200
+++ federatedx/federatedx_txn.cc	2013-09-02 11:20:34.935457300 +0200
@@ -40,6 +40,8 @@
 #include "table.h"
 #include "sql_servers.h"
 
+#include "federatedx_io.h"
+
 federatedx_txn::federatedx_txn()
   : txn_list(0), savepoint_level(0), savepoint_stmt(0), savepoint_next(0)
 {
@@ -232,12 +234,11 @@
       int rc= 0;
 
       if (io->active)
-	rc= io->commit();
-      else
-	io->rollback();
-
-      if (io->active && rc)
-	error= -1;
+      {
+        if ((rc= io->commit()))
+          error= -1;
+      } else
+        io->rollback();
 
       io->reset();
     }
diff -u -r -N federatedx/ha_federatedx.cc federatedx/ha_federatedx.cc
--- federatedx/ha_federatedx.cc	2013-08-15 16:36:01.000000000 +0200
+++ federatedx/ha_federatedx.cc	2013-09-02 12:18:13.236364600 +0200
@@ -316,7 +316,8 @@
 #include "ha_federatedx.h"
 #include "sql_servers.h"
 #include "sql_analyse.h"                        // append_escaped()
-#include "sql_show.h"                           // append_identifier()
+
+#include "federatedx_io.h"
 
 #ifdef I_AM_PARANOID
 #define MIN_PORT 1023
@@ -614,6 +615,13 @@
     CONNECTION="scheme://username@hostname:port/database/table"
     CONNECTION="scheme://username:password@hostname/database/table"
 
+    _OR_
+
+    CONNECTION="scheme://username:password@hostname:port/table"
+    CONNECTION="scheme://username@hostname/table"
+    CONNECTION="scheme://username@hostname:port/table"
+    CONNECTION="scheme://username:password@hostname/table"
+
     _OR_
 
     CONNECTION="connection name"
@@ -649,6 +657,7 @@
 static int parse_url(MEM_ROOT *mem_root, FEDERATEDX_SHARE *share,
                      TABLE_SHARE *table_s, uint table_create_flag)
 {
+  char *tmp;
   uint error_num= (table_create_flag ?
                    ER_FOREIGN_DATA_STRING_INVALID_CANT_CREATE :
                    ER_FOREIGN_DATA_STRING_INVALID);
@@ -739,72 +748,12 @@
       Remove addition of null terminator and store length
       for each string  in share
     */
-    if (!(share->username= strstr(share->scheme, "://")))
-      goto error;
-    share->scheme[share->username - share->scheme]= '\0';
-
-    if (!federatedx_io::handles_scheme(share->scheme))
+    if (!(tmp= strstr(share->scheme, "://")))
       goto error;
+    share->scheme[tmp - share->scheme]= '\0';
 
-    share->username+= 3;
-
-    if (!(share->hostname= strchr(share->username, '@')))
+    if (!federatedx_io::handles_scheme(share, tmp + 3))
       goto error;
-    *share->hostname++= '\0';                   // End username
-
-    if ((share->password= strchr(share->username, ':')))
-    {
-      *share->password++= '\0';                 // End username
-
-      /* make sure there isn't an extra / or @ */
-      if ((strchr(share->password, '/') || strchr(share->hostname, '@')))
-        goto error;
-      /*
-        Found that if the string is:
-        user:@hostname:port/db/table
-        Then password is a null string, so set to NULL
-      */
-      if (share->password[0] == '\0')
-        share->password= NULL;
-    }
-
-    /* make sure there isn't an extra / or @ */
-    if ((strchr(share->username, '/')) || (strchr(share->hostname, '@')))
-      goto error;
-
-    if (!(share->database= strchr(share->hostname, '/')))
-      goto error;
-    *share->database++= '\0';
-
-    if ((share->sport= strchr(share->hostname, ':')))
-    {
-      *share->sport++= '\0';
-      if (share->sport[0] == '\0')
-        share->sport= NULL;
-      else
-        share->port= atoi(share->sport);
-    }
-
-    if (!(share->table_name= strchr(share->database, '/')))
-      goto error;
-    *share->table_name++= '\0';
-
-    share->table_name_length= strlen(share->table_name);
-
-    /* make sure there's not an extra / */
-    if ((strchr(share->table_name, '/')))
-      goto error;
-
-    if (share->hostname[0] == '\0')
-      share->hostname= NULL;
-
-  }
-  if (!share->port)
-  {
-    if (!share->hostname || strcmp(share->hostname, my_localhost) == 0)
-      share->socket= (char *) MYSQL_UNIX_ADDR;
-    else
-      share->port= MYSQL_PORT;
   }
 
   DBUG_PRINT("info",
@@ -2431,61 +2380,13 @@
 
 int ha_federatedx::delete_row(const uchar *buf)
 {
-  char delete_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
-  char data_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
-  String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
-  String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
-  uint found= 0;
   int error;
   DBUG_ENTER("ha_federatedx::delete_row");
 
-  delete_string.length(0);
-  delete_string.append(STRING_WITH_LEN("DELETE FROM "));
-  append_ident(&delete_string, share->table_name,
-               share->table_name_length, ident_quote_char);
-  delete_string.append(STRING_WITH_LEN(" WHERE "));
-
-  for (Field **field= table->field; *field; field++)
-  {
-    Field *cur_field= *field;
-    found++;
-    if (bitmap_is_set(table->read_set, cur_field->field_index))
-    {
-      append_ident(&delete_string, (*field)->field_name,
-                   strlen((*field)->field_name), ident_quote_char);
-      data_string.length(0);
-      if (cur_field->is_null())
-      {
-        delete_string.append(STRING_WITH_LEN(" IS NULL "));
-      }
-      else
-      {
-        bool needs_quote= cur_field->str_needs_quotes();
-        delete_string.append(STRING_WITH_LEN(" = "));
-        cur_field->val_str(&data_string);
-        if (needs_quote)
-          delete_string.append(value_quote_char);
-        data_string.print(&delete_string);
-        if (needs_quote)
-          delete_string.append(value_quote_char);
-      }
-      delete_string.append(STRING_WITH_LEN(" AND "));
-    }
-  }
-
-  // Remove trailing AND
-  delete_string.length(delete_string.length() - sizeof_trailing_and);
-  if (!found)
-    delete_string.length(delete_string.length() - sizeof_trailing_where);
-
-  delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
-  DBUG_PRINT("info",
-             ("Delete sql: %s", delete_string.c_ptr_quick()));
-
   if ((error= txn->acquire(share, FALSE, &io)))
     DBUG_RETURN(error);
 
-  if (io->query(delete_string.ptr(), delete_string.length()))
+  if (io->delete_row(share, table, buf))
   {
     DBUG_RETURN(stash_remote_error());
   }
@@ -3205,18 +3106,9 @@
 
 int ha_federatedx::delete_all_rows()
 {
-  char query_buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
-  String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
   int error;
   DBUG_ENTER("ha_federatedx::delete_all_rows");
 
-  query.length(0);
-
-  query.set_charset(system_charset_info);
-  query.append(STRING_WITH_LEN("TRUNCATE "));
-  append_ident(&query, share->table_name, share->table_name_length,
-               ident_quote_char);
-
   /* no need for savepoint in autocommit mode */
   if (!(ha_thd()->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
     txn->stmt_autocommit();
@@ -3228,13 +3120,19 @@
   if ((error= txn->acquire(share, FALSE, &io)))
     DBUG_RETURN(error);
 
-  if (io->query(query.ptr(), query.length()))
+  switch (io->delete_all_rows(share, table))
   {
+  case HA_ERR_WRONG_COMMAND:
+    DBUG_RETURN(HA_ERR_WRONG_COMMAND);
+
+  case 0:
+    stats.deleted+= stats.records;
+    stats.records= 0;
+    DBUG_RETURN(0);
+
+  default:
     DBUG_RETURN(stash_remote_error());
   }
-  stats.deleted+= stats.records;
-  stats.records= 0;
-  DBUG_RETURN(0);
 }
 
 
@@ -3305,36 +3203,6 @@
   DBUG_RETURN(to);
 }
 
-
-static int test_connection(MYSQL_THD thd, federatedx_io *io,
-                           FEDERATEDX_SHARE *share)
-{
-  char buffer[FEDERATEDX_QUERY_BUFFER_SIZE];
-  String str(buffer, sizeof(buffer), &my_charset_bin);
-  FEDERATEDX_IO_RESULT *resultset= NULL;
-  int retval;
-
-  str.length(0);
-  str.append(STRING_WITH_LEN("SELECT * FROM "));
-  append_identifier(thd, &str, share->table_name,
-                    share->table_name_length);
-  str.append(STRING_WITH_LEN(" WHERE 1=0"));
-
-  if ((retval= io->query(str.ptr(), str.length())))
-  {
-    sprintf(buffer, "database: '%s'  username: '%s'  hostname: '%s'",
-            share->database, share->username, share->hostname);
-    DBUG_PRINT("info", ("error-code: %d", io->error_code()));
-    my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
-  }
-  else
-    resultset= io->store_result();
-
-  io->free_result(resultset);
-
-  return retval;
-}
-
 /*
   create() does nothing, since we have no local setup of our own.
   FUTURE: We should potentially connect to the foreign database and
@@ -3372,7 +3240,7 @@
     tmp_txn= get_txn(thd);
     if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io)))
     {
-      retval= test_connection(thd, tmp_io, &tmp_share);
+      retval= tmp_io->test_connection(thd, &tmp_share);
       tmp_txn->release(&tmp_io);
     }
     free_server(tmp_txn, tmp_share.s);
@@ -3399,7 +3267,7 @@
 
     tmp_io= federatedx_io::construct(thd->mem_root, &server);
 
-    retval= test_connection(thd, tmp_io, &tmp_share);
+    retval= tmp_io->test_connection(thd, &tmp_share);
 
 #ifndef DBUG_OFF
     mysql_mutex_unlock(&server.mutex);
diff -u -r -N federatedx/ha_federatedx.h federatedx/ha_federatedx.h
--- federatedx/ha_federatedx.h	2013-08-15 16:36:03.000000000 +0200
+++ federatedx/ha_federatedx.h	2013-09-02 11:20:34.949464900 +0200
@@ -36,7 +36,6 @@
 #pragma interface			/* gcc class implementation */
 #endif
 
-//#include <mysql.h>
 #include <my_global.h>
 #include <thr_lock.h>
 #include "handler.h"
@@ -131,93 +130,6 @@
 typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
 typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
 
-class federatedx_io
-{
-  friend class federatedx_txn;
-  FEDERATEDX_SERVER * const server;
-  federatedx_io **owner_ptr;
-  federatedx_io *txn_next;
-  federatedx_io *idle_next;
-  bool active;  /* currently participating in a transaction */
-  bool busy;    /* in use by a ha_federated instance */
-  bool readonly;/* indicates that no updates have occurred */
-
-protected:
-  void set_active(bool new_active)
-  { active= new_active; }
-public:
-  federatedx_io(FEDERATEDX_SERVER *);
-  virtual ~federatedx_io();
-
-  bool is_readonly() const { return readonly; }
-  bool is_active() const { return active; }
-
-  const char * get_charsetname() const
-  { return server->csname ? server->csname : "latin1"; }
-
-  const char * get_hostname() const { return server->hostname; }
-  const char * get_username() const { return server->username; }
-  const char * get_password() const { return server->password; }
-  const char * get_database() const { return server->database; }
-  ushort       get_port() const     { return server->port; }
-  const char * get_socket() const   { return server->socket; }
-
-  static bool handles_scheme(const char *scheme);
-  static federatedx_io *construct(MEM_ROOT *server_root,
-                                  FEDERATEDX_SERVER *server);
-
-  static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
-  { return alloc_root(mem_root, size); }
-  static void operator delete(void *ptr, size_t size)
-  { TRASH(ptr, size); }
-
-  virtual int query(const char *buffer, uint length)=0;
-  virtual FEDERATEDX_IO_RESULT *store_result()=0;
-
-  virtual size_t max_query_size() const=0;
-
-  virtual my_ulonglong affected_rows() const=0;
-  virtual my_ulonglong last_insert_id() const=0;
-
-  virtual int error_code()=0;
-  virtual const char *error_str()=0;
-
-  virtual void reset()=0;
-  virtual int commit()=0;
-  virtual int rollback()=0;
-
-  virtual int savepoint_set(ulong sp)=0;
-  virtual ulong savepoint_release(ulong sp)=0;
-  virtual ulong savepoint_rollback(ulong sp)=0;
-  virtual void savepoint_restrict(ulong sp)=0;
-
-  virtual ulong last_savepoint() const=0;
-  virtual ulong actual_savepoint() const=0;
-  virtual bool is_autocommit() const=0;
-
-  virtual bool table_metadata(ha_statistics *stats, const char *table_name,
-                              uint table_name_length, uint flag) = 0;
-
-  /* resultset operations */
-
-  virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
-  virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
-  virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
-  virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0;
-  virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
-  virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
-                                      unsigned int column)=0;
-  virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
-                              unsigned int column) const=0;
-
-  virtual size_t get_ref_length() const=0;
-  virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
-                             void *ref)=0;
-  virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
-                            const void *ref)=0;
-
-};
-
 
 class federatedx_txn
 {
@@ -451,9 +363,3 @@
 
 extern bool append_ident(String *string, const char *name, uint length,
                          const char quote_char);
-
-
-extern federatedx_io *instantiate_io_mysql(MEM_ROOT *server_root,
-                                           FEDERATEDX_SERVER *server);
-extern federatedx_io *instantiate_io_null(MEM_ROOT *server_root,
-                                          FEDERATEDX_SERVER *server);
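
For readers who want to try the sphinxql:// connection string rules outside the server, here is a minimal standalone sketch of what federatedx_io_sphinxql::parse() in the patch above appears to accept: user[:password]@host[:port]/table, with no database part and port 9306 as the default. The struct SphinxqlTarget and the function parse_sphinxql_target() are hypothetical names used only for this illustration; the patch itself writes into FEDERATEDX_SHARE and cuts the string in place.

```cpp
#include <cstddef>
#include <cstdlib>
#include <string>

// Hypothetical holder for the parsed pieces (the patch uses FEDERATEDX_SHARE).
struct SphinxqlTarget {
  std::string username;
  std::string password;  // empty when no password was given
  std::string hostname;
  std::string table;
  int port = 0;
};

// Parses the part after "sphinxql://": user[:password]@host[:port]/table.
// Returns false on malformed input and leaves *out untouched in that case.
bool parse_sphinxql_target(const std::string &rest, SphinxqlTarget *out) {
  SphinxqlTarget t;

  const std::size_t at = rest.find('@');
  if (at == std::string::npos) return false;
  std::string cred = rest.substr(0, at);   // user[:password]
  std::string addr = rest.substr(at + 1);  // host[:port]/table

  // No '/' inside the credentials and no second '@' after the first one.
  if (cred.find('/') != std::string::npos ||
      addr.find('@') != std::string::npos)
    return false;

  const std::size_t colon = cred.find(':');
  if (colon != std::string::npos) {
    t.password = cred.substr(colon + 1);
    cred.erase(colon);
  }
  t.username = cred;

  // Everything after the first '/' is the index name; a second '/' is an error.
  const std::size_t slash = addr.find('/');
  if (slash == std::string::npos) return false;
  t.table = addr.substr(slash + 1);
  addr.erase(slash);
  if (t.table.find('/') != std::string::npos) return false;

  const std::size_t port_colon = addr.find(':');
  if (port_colon != std::string::npos) {
    t.port = std::atoi(addr.c_str() + port_colon + 1);
    addr.erase(port_colon);
  }
  t.hostname = addr;

  if (t.port == 0) t.port = 9306;  // same default as SPHINXQL_PORT in the patch

  *out = t;
  return true;
}
```

Under these rules "root@localhost:9306/rt" is accepted, while the workaround form "root@localhost:9306/dummy/rt" discussed earlier in the thread is rejected because of the extra '/'.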

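The query() override in federatedx_io_sphinxql appends " LIMIT 1000000" to every SELECT that carries no LIMIT yet, presumably because searchd otherwise applies its own small implicit limit. The rule can be sketched in isolation as follows. The helper names with_default_limit() and starts_with_nocase() are made up for this example and are not part of the patch. As in the patch, the " LIMIT " check is case-sensitive, so a lower-case " limit " would not be recognised.

```cpp
#include <cctype>
#include <cstddef>
#include <string>

// Case-insensitive prefix test (hypothetical helper, stands in for the
// strncasecmp() call used in the patch).
static bool starts_with_nocase(const std::string &s, const std::string &prefix) {
  if (s.size() < prefix.size()) return false;
  for (std::size_t i = 0; i < prefix.size(); ++i) {
    if (std::toupper(static_cast<unsigned char>(s[i])) !=
        std::toupper(static_cast<unsigned char>(prefix[i])))
      return false;
  }
  return true;
}

// Returns the query with " LIMIT <max_rows>" appended when it is a SELECT
// without an upper-case " LIMIT " clause; any other statement is returned
// unchanged. 1000000 mirrors SPHINX_MAX_ROWS in the patch.
std::string with_default_limit(const std::string &query,
                               unsigned long long max_rows = 1000000ULL) {
  const bool is_select = starts_with_nocase(query, "SELECT ");
  const bool has_limit = query.find(" LIMIT ") != std::string::npos;
  if (is_select && !has_limit)
    return query + " LIMIT " + std::to_string(max_rows);
  return query;
}
```

A plain substring test keeps the change small, but it also means a string literal that happens to contain " LIMIT " suppresses the appended clause.
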
References