← Back to team overview

maria-developers team mailing list archive

MDEV-3792: review of the handler part of the cassandra engine

 

Hi, Sergey,

The handler is quite OK. I have only a few, mostly cosmetic, comments.

> === modified file 'cmake/build_configurations/mysql_release.cmake'
> --- cmake/build_configurations/mysql_release.cmake	2012-06-06 12:15:29 +0000
> +++ cmake/build_configurations/mysql_release.cmake	2012-11-12 15:11:23 +0000
> @@ -46,6 +46,8 @@ SET(FEATURE_SET_large   5)
>  SET(FEATURE_SET_xlarge  6)
>  SET(FEATURE_SET_community 7)
>  
> +SET(WITH_CASSANDRA_STORAGE_ENGINE  ON)
> +

This probably should be reverted before pushing into main, right?

>  IF(FEATURE_SET)
>    STRING(TOLOWER ${FEATURE_SET} feature_set)
>    SET(num ${FEATURE_SET_${feature_set}})
> 
> === modified file 'sql/sql_join_cache.cc'
> --- sql/sql_join_cache.cc	2012-03-24 17:21:22 +0000
> +++ sql/sql_join_cache.cc	2012-11-12 15:11:23 +0000
> @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m
>  {
>    last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
>    if (no_association &&
> -      (curr_matching_chain= get_matching_chain_by_join_key()))
> +      !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'

Is that something that should be pushed into the main branch, or
is it a temporary hack for Cassandra that should be reverted?

>      return 1;
>    last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
>    return 0;
> 
> === added file 'storage/cassandra/CMakeLists.txt'
> --- storage/cassandra/CMakeLists.txt	1970-01-01 00:00:00 +0000
> +++ storage/cassandra/CMakeLists.txt	2012-11-12 15:11:23 +0000
> @@ -0,0 +1,25 @@
> +
> +SET(cassandra_sources 
> +     ha_cassandra.cc 
> +     ha_cassandra.h 
> +     cassandra_se.h
> +     cassandra_se.cc
> +     gen-cpp/Cassandra.cpp
> +     gen-cpp/cassandra_types.h 
> +     gen-cpp/cassandra_types.cpp
> +     gen-cpp/cassandra_constants.h 
> +     gen-cpp/cassandra_constants.cpp
> +     gen-cpp/Cassandra.h)
> +
> +#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
> +
> +#INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift)
> +INCLUDE_DIRECTORIES(AFTER /home/buildbot/build/thrift-inst/include/thrift/) 

This needs to be fixed before pushing into the main tree.
I suppose you'll need to detect here whether thrift is available
and disable the engine otherwise.

> +
> +# 
> +STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +#LINK_DIRECTORIES(/home/psergey/cassandra/thrift/lib)
> +
> +MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE LINK_LIBRARIES thrift)
> +# was: STORAGE_ENGINE MANDATORY 
> 
> === added file 'storage/cassandra/cassandra_se.cc'
> --- storage/cassandra/cassandra_se.cc	1970-01-01 00:00:00 +0000
> +++ storage/cassandra/cassandra_se.cc	2012-11-12 15:11:23 +0000
> @@ -0,0 +1,809 @@
> +
> +// Cassandra includes:
> +#include <inttypes.h>
> +#include <netinet/in.h>
> +#include <sys/time.h>
> +#include <stdio.h>
> +#include <stdarg.h>
> +
> +#include "Thrift.h"
> +#include "transport/TSocket.h"
> +#include "transport/TTransport.h"
> +#include "transport/TBufferTransports.h"
> +#include "protocol/TProtocol.h"
> +#include "protocol/TBinaryProtocol.h"
> +#include "gen-cpp/Cassandra.h"
> +// cassandra includes end
> +
> +#include "cassandra_se.h"
> +
> +struct st_mysql_lex_string
> +{
> +  char *str;
> +  size_t length;
> +};
> +
> +using namespace std;
> +using namespace apache::thrift;
> +using namespace apache::thrift::transport;
> +using namespace apache::thrift::protocol;
> +using namespace org::apache::cassandra;
> +
> +
> +void Cassandra_se_interface::print_error(const char *format, ...)
> +{
> +  va_list ap;
> +  va_start(ap, format);
> +  // it's not a problem if output was truncated
> +  vsnprintf(err_buffer, sizeof(err_buffer), format, ap);

my_vsnprintf please

> +  va_end(ap);
> +}
> +
...
> === added file 'storage/cassandra/ha_cassandra.h'
> --- storage/cassandra/ha_cassandra.h	1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.h	2012-11-12 15:11:23 +0000
> @@ -0,0 +1,328 @@
> +/*
> +   Copyright (c) 2012, Monty Program Ab
> +
> +   This program is free software; you can redistribute it and/or modify
> +   it under the terms of the GNU General Public License as published by
> +   the Free Software Foundation; version 2 of the License.
> +
> +   This program is distributed in the hope that it will be useful,
> +   but WITHOUT ANY WARRANTY; without even the implied warranty of
> +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +   GNU General Public License for more details.
> +
> +   You should have received a copy of the GNU General Public License
> +   along with this program; if not, write to the Free Software
> +   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
> +#ifdef USE_PRAGMA_INTERFACE
> +#pragma interface			/* gcc class implementation */
> +#endif
> +
> +
> +#include "my_global.h"                   /* ulonglong */
> +#include "thr_lock.h"                    /* THR_LOCK, THR_LOCK_DATA */
> +#include "handler.h"                     /* handler */
> +#include "my_base.h"                     /* ha_rows */
> +
> +#include "cassandra_se.h"
> +
> +/** @brief
> +  CASSANDRA_SHARE is a structure that will be shared among all open handlers.
> +  This example implements the minimum of what you will probably need.
> +*/
> +typedef struct st_cassandra_share {
> +  char *table_name;
> +  uint table_name_length,use_count;
> +  mysql_mutex_t mutex;
> +  THR_LOCK lock;
> +} CASSANDRA_SHARE;
> +
> +class ColumnDataConverter;
> +
> +struct ha_table_option_struct;
> +
> +
> +struct st_dynamic_column_value;
> +
> +typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
> +                                   int cass_data_len,
> +                                   struct st_dynamic_column_value *value);
> +typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
> +                                   char **cass_data,
> +                                   int *cass_data_len,
> +                                   void *buf, void **freemem);
> +struct cassandra_type_def
> +{
> +  const char *name;
> +  CAS2DYN_CONVERTER cassandra_to_dynamic;
> +  DYN2CAS_CONVERTER dynamic_to_cassandra;
> +};
> +
> +typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
> +
> +enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
> +  CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
> +  CT_DECIMAL};
> +
> +typedef enum cassandtra_type_enum CASSANDRA_TYPE;
> +
> +
> +
> +/** @brief
> +  Class definition for the storage engine
> +*/
> +class ha_cassandra: public handler
> +{
> +  friend class Column_name_enumerator_impl;
> +  THR_LOCK_DATA lock;      ///< MySQL lock
> +  CASSANDRA_SHARE *share;    ///< Shared lock info
> +
> +  Cassandra_se_interface *se;
> +
> +  /* description of static part of the table definition */
> +  ColumnDataConverter **field_converters;
> +  uint n_field_converters;
> +
> +  CASSANDRA_TYPE_DEF *default_type_def;
> +  /* description of dynamic columns part */
> +  CASSANDRA_TYPE_DEF *special_type_field_converters;
> +  LEX_STRING *special_type_field_names;
> +  uint n_special_type_fields;
> +  DYNAMIC_ARRAY dynamic_values, dynamic_names;
> +  DYNAMIC_STRING dynamic_rec;
> +
> +  ColumnDataConverter *rowkey_converter;
> +
> +  bool setup_field_converters(Field **field, uint n_fields);
> +  void free_field_converters();
> +
> +  int read_cassandra_columns(bool unpack_pk);
> +  int check_table_options(struct ha_table_option_struct* options);
> +
> +  bool doing_insert_batch;
> +  ha_rows insert_rows_batched;
> +
> +  uint dyncol_field;
> +  bool dyncol_set;
> +
> +  /* Used to produce 'wrong column %s at row %lu' warnings */
> +  ha_rows insert_lineno;
> +  void print_conversion_error(const char *field_name,
> +                              char *cass_value, int cass_value_len);
> +  int connect_and_check_options(TABLE *table_arg);
> +public:
> +  ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
> +  ~ha_cassandra()
> +  {
> +    free_field_converters();
> +    delete se;
> +  }
> +
> +  /** @brief
> +    The name that will be used for display purposes.
> +   */
> +  const char *table_type() const { return "CASSANDRA"; }
> +
> +  /** @brief
> +    The name of the index type that will be used for display.
> +    Don't implement this method unless you really have indexes.
> +   */
> +  const char *index_type(uint inx) { return "HASH"; }
> +
> +  /** @brief
> +    The file extensions.
> +   */
> +  const char **bas_ext() const;
> +
> +  /** @brief
> +    This is a list of flags that indicate what functionality the storage engine
> +    implements. The current table flags are documented in handler.h
> +  */
> +  ulonglong table_flags() const
> +  {
> +    /*
> +      HA_BINLOG_STMT_CAPABLE
> +        We are saying that this engine is just statement capable to have
> +        an engine that can only handle statement-based logging. This is
> +        used in testing.
> +      HA_REC_NOT_IN_SEQ
> +        If we don't set it, filesort crashes, because it assumes rowids are
> +        1..8 byte numbers
> +    */
> +    return HA_BINLOG_STMT_CAPABLE |
> +           HA_REC_NOT_IN_SEQ;

HA_NO_TRANSACTIONS is unset. Is this engine transactional?
HA_PARTIAL_COLUMN_READ is unset. Do you always work with all columns, ignoring read_set and write_set?
HA_TABLE_SCAN_ON_INDEX is unset. Do you store the data MyISAM-style, index and data in separate files?
HA_FAST_KEY_READ is unset. Is reading keys in order faster in Cassandra than random reads?
HA_REQUIRE_PRIMARY_KEY is unset. but a primary key is required.
HA_PRIMARY_KEY_IN_READ_INDEX is unset. but as far as I see, primary key columns are always returned (and needed for position())
HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is unset, but a primary key is required for position()
HA_NO_AUTO_INCREMENT is unset. is auto-increment supported?

> +
> +  }
> +
> +  /** @brief
> +    This is a bitmap of flags that indicates how the storage engine
> +    implements indexes. The current index flags are documented in
> +    handler.h. If you do not implement indexes, just return zero here.
> +
> +      @details
> +    part is the key part to check. First key part is 0.
> +    If all_parts is set, MySQL wants to know the flags for the combined
> +    index, up to and including 'part'.
> +  */
> +  ulong index_flags(uint inx, uint part, bool all_parts) const
> +  {
> +    return 0;
> +  }
> +
> +  /** @brief
> +    unireg.cc will call max_supported_record_length(), max_supported_keys(),
> +    max_supported_key_parts(), uint max_supported_key_length()
> +    to make sure that the storage engine can handle the data it is about to
> +    send. Return *real* limits of your storage engine here; MySQL will do
> +    min(your_limits, MySQL_limits) automatically.
> +   */
> +  uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
> +
> +  /* Support only one Primary Key, for now */
> +  uint max_supported_keys()          const { return 1; }
> +  uint max_supported_key_parts()     const { return 1; }
> +
> +  /** @brief
> +    unireg.cc will call this to make sure that the storage engine can handle
> +    the data it is about to send. Return *real* limits of your storage engine
> +    here; MySQL will do min(your_limits, MySQL_limits) automatically.
> +
> +      @details
> +    There is no need to implement ..._key_... methods if your engine doesn't
> +    support indexes.
> +   */
> +  uint max_supported_key_length()    const { return 16*1024; /* just to return something*/ }
> +
> +  int index_init(uint idx, bool sorted);
> +
> +  int index_read_map(uchar * buf, const uchar * key,
> +                     key_part_map keypart_map,
> +                     enum ha_rkey_function find_flag);
> +
> +  /** @brief
> +    Called in test_quick_select to determine if indexes should be used.
> +  */
> +  virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }

Does this cost formula make any sense? (I understand that it's copy-paste.)
stats.deleted, for example, is never set.

> +
> +  /** @brief
> +    This method will never be called if you do not implement indexes.
> +  */
> +  virtual double read_time(uint, uint, ha_rows rows)
> +  { return (double) rows /  20.0+1; }

same question

> +
> +  virtual void start_bulk_insert(ha_rows rows);
> +  virtual int end_bulk_insert();
> +
> +  virtual int reset();
> +
> +
> +  int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
> +                            uint n_ranges, uint mode, HANDLER_BUFFER *buf);
> +  int multi_range_read_next(range_id_t *range_info);
> +  ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
> +                                      void *seq_init_param,
> +                                      uint n_ranges, uint *bufsz,
> +                                      uint *flags, COST_VECT *cost);
> +  ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
> +                                uint key_parts, uint *bufsz,
> +                                uint *flags, COST_VECT *cost);
> +  int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
> +
> +private:
> +  bool source_exhausted;
> +  bool mrr_start_read();
> +  int check_field_options(Field **fields);
> +  int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> +                  String *valcol, char **freenames);
> +  int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
> +  void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> +                               char *free_names);
> +  CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
> +                                               int cass_name_length);
> +public:
> +
> +  /*
> +    Everything below are methods that we implement in ha_example.cc.

Maybe it'd be better to remove the references to ha_example.cc (and to the
example engine in general) before pushing this into the main tree?

> +
> +    Most of these methods are not obligatory, skip them and
> +    MySQL will treat them as not implemented
> +  */
> +  /** @brief
> +    We implement this in ha_example.cc; it's a required method.
> +  */
> +  int open(const char *name, int mode, uint test_if_locked);    // required
> +
> +  /** @brief
> +    We implement this in ha_example.cc; it's a required method.
> +  */
> +  int close(void);                                              // required
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int write_row(uchar *buf);
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int update_row(const uchar *old_data, uchar *new_data);
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int delete_row(const uchar *buf);
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int index_next(uchar *buf);
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int index_prev(uchar *buf);
> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int index_first(uchar *buf);

Why have unimplemented methods here?
I'd rather remove them.

> +
> +  /** @brief
> +    We implement this in ha_example.cc. It's not an obligatory method;
> +    skip it and and MySQL will treat it as not implemented.
> +  */
> +  int index_last(uchar *buf);
> +
> +  /** @brief
> +    Unlike index_init(), rnd_init() can be called two consecutive times
> +    without rnd_end() in between (it only makes sense if scan=1). In this
> +    case, the second call should prepare for the new table scan (e.g if
> +    rnd_init() allocates the cursor, the second call should position the
> +    cursor to the start of the table; no need to deallocate and allocate
> +    it again. This is a required method.
> +  */
> +  int rnd_init(bool scan);                                      //required
> +  int rnd_end();
> +  int rnd_next(uchar *buf);                                     ///< required
> +  int rnd_pos(uchar *buf, uchar *pos);                          ///< required
> +  void position(const uchar *record);                           ///< required
> +  int info(uint);                                               ///< required
> +  int extra(enum ha_extra_function operation);
> +  int external_lock(THD *thd, int lock_type);                   ///< required
> +  int delete_all_rows(void);
> +  ha_rows records_in_range(uint inx, key_range *min_key,
> +                           key_range *max_key);
> +  int delete_table(const char *from);
> +  int create(const char *name, TABLE *form,
> +             HA_CREATE_INFO *create_info);                      ///< required
> +  bool check_if_incompatible_data(HA_CREATE_INFO *info,
> +                                  uint table_changes);
> +
> +  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
> +                             enum thr_lock_type lock_type);     ///< required
> +};
> 
> === added file 'storage/cassandra/ha_cassandra.cc'
> --- storage/cassandra/ha_cassandra.cc	1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.cc	2012-11-12 15:11:23 +0000
> @@ -0,0 +1,2674 @@
> +/*
> +   Copyright (c) 2012, Monty Program Ab
> +
> +   This program is free software; you can redistribute it and/or modify
> +   it under the terms of the GNU General Public License as published by
> +   the Free Software Foundation; version 2 of the License.
> +
> +   This program is distributed in the hope that it will be useful,
> +   but WITHOUT ANY WARRANTY; without even the implied warranty of
> +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +   GNU General Public License for more details.
> +
> +   You should have received a copy of the GNU General Public License
> +   along with this program; if not, write to the Free Software
> +   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
> +
> +#ifdef USE_PRAGMA_IMPLEMENTATION
> +#pragma implementation        // gcc: Class implementation
> +#endif
> +
> +#include <mysql/plugin.h>
> +#include "ha_cassandra.h"
> +#include "sql_class.h"
> +
> +#define DYNCOL_USUAL 20
> +#define DYNCOL_DELTA 100
> +#define DYNCOL_USUAL_REC 1024
> +#define DYNCOL_DELTA_REC 1024
> +
> +static handler *cassandra_create_handler(handlerton *hton,
> +                                       TABLE_SHARE *table,
> +                                       MEM_ROOT *mem_root);
> +
> +extern int dynamic_column_error_message(enum_dyncol_func_result rc);
> +
> +handlerton *cassandra_hton;
> +
> +
> +/*
> +   Hash used to track the number of open tables; variable for example share
> +   methods
> +*/
> +static HASH cassandra_open_tables;
> +
> +/* The mutex used to init the hash; variable for example share methods */
> +mysql_mutex_t cassandra_mutex;
> +
> +
> +/**
> +  Structure for CREATE TABLE options (table options).
> +  It needs to be called ha_table_option_struct.
> +
> +  The option values can be specified in the CREATE TABLE at the end:
> +  CREATE TABLE ( ... ) *here*
> +*/
> +
> +struct ha_table_option_struct
> +{
> +  const char *thrift_host;
> +  int         thrift_port;
> +  const char *keyspace;
> +  const char *column_family;
> +};
> +
> +
> +ha_create_table_option cassandra_table_option_list[]=
> +{
> +  /*
> +    one option that takes an arbitrary string
> +  */
> +  HA_TOPTION_STRING("thrift_host", thrift_host),
> +  HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
> +  HA_TOPTION_STRING("keyspace", keyspace),
> +  HA_TOPTION_STRING("column_family", column_family),
> +  HA_TOPTION_END
> +};
> +
> +/**
> +  Structure for CREATE TABLE options (field options).
> +*/
> +
> +struct ha_field_option_struct
> +{
> +  bool dyncol_field;
> +};
> +
> +ha_create_table_option cassandra_field_option_list[]=
> +{
> +  /*
> +    Collect all other columns as dynamic here,
> +    the valid values are YES/NO, ON/OFF, 1/0.
> +    The default is 0, that is true, yes, on.

Really? This looks like a typo: 0 is false, 1 is true.

> +  */
> +  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
> +  HA_FOPTION_END
> +};
> +
> +static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
> +  "Number of rows in an INSERT batch",
> +  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
> +  "Number of rows in a multiget(MRR) batch",
> +  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
> +  "Number of rows in an rnd_read (full scan) batch",
> +  NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
> +  "Number of times to retry Cassandra calls that failed due to timeouts or "
> +  "network communication problems. The default, 0, means not to retry.",
> +  NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
> +
> +/* These match values in enum_cassandra_consistency_level */
> +const char *cassandra_consistency_level[] =
> +{
> +  "ONE",
> +  "QUORUM",
> +  "LOCAL_QUORUM",
> +  "EACH_QUORUM",
> +  "ALL",
> +  "ANY",
> +  "TWO",
> +  "THREE",
> +   NullS
> +};
> +
> +TYPELIB cassandra_consistency_level_typelib= {
> +  array_elements(cassandra_consistency_level) - 1, "",
> +  cassandra_consistency_level, NULL
> +};
> +
> +
> +static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
> +  "Cassandra consistency level to use for write operations", NULL, NULL,
> +  ONE, &cassandra_consistency_level_typelib);
> +
> +static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
> +  "Cassandra consistency level to use for read operations", NULL, NULL,
> +  ONE, &cassandra_consistency_level_typelib);
> +
> +
> +mysql_mutex_t cassandra_default_host_lock;
> +static char* cassandra_default_thrift_host = NULL;
> +static char cassandra_default_host_buf[256]="";
> +
> +static void
> +cassandra_default_thrift_host_update(THD *thd,
> +                                     struct st_mysql_sys_var* var,
> +                                     void* var_ptr, /*!< out: where the
> +                                                    formal string goes */
> +                                     const void* save) /*!< in: immediate result
> +                                                       from check function */
> +{
> +  const char *new_host= *((char**)save);
> +  const size_t max_len= sizeof(cassandra_default_host_buf);
> +
> +  mysql_mutex_lock(&cassandra_default_host_lock);
> +
> +  if (new_host)
> +  {
> +    strncpy(cassandra_default_host_buf, new_host, max_len-1);
> +    cassandra_default_host_buf[max_len-1]= 0;
> +    cassandra_default_thrift_host= cassandra_default_host_buf;
> +  }
> +  else
> +  {
> +    cassandra_default_host_buf[0]= 0;
> +    cassandra_default_thrift_host= NULL;
> +  }
> +
> +  *((const char**)var_ptr)= cassandra_default_thrift_host;
> +
> +  mysql_mutex_unlock(&cassandra_default_host_lock);
> +}
> +
> +
> +static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
> +                        PLUGIN_VAR_RQCMDARG,
> +                        "Default host for Cassandra thrift connections",
> +                        /*check*/NULL,
> +                        cassandra_default_thrift_host_update,
> +                        /*default*/NULL);
> +
> +static struct st_mysql_sys_var* cassandra_system_variables[]= {
> +  MYSQL_SYSVAR(insert_batch_size),
> +  MYSQL_SYSVAR(multiget_batch_size),
> +  MYSQL_SYSVAR(rnd_batch_size),
> +
> +  MYSQL_SYSVAR(default_thrift_host),
> +  MYSQL_SYSVAR(write_consistency),
> +  MYSQL_SYSVAR(read_consistency),
> +  MYSQL_SYSVAR(failure_retries),
> +  NULL
> +};
> +
> +
> +static SHOW_VAR cassandra_status_variables[]= {
> +  {"row_inserts",
> +    (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
> +  {"row_insert_batches",
> +    (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},
> +
> +  {"multiget_reads",
> +    (char*) &cassandra_counters.multiget_reads,      SHOW_LONG},
> +  {"multiget_keys_scanned",
> +    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
> +  {"multiget_rows_read",
> +    (char*) &cassandra_counters.multiget_rows_read,  SHOW_LONG},
> +
> +  {"timeout_exceptions",
> +    (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
> +  {"unavailable_exceptions",
> +    (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
> +  {NullS, NullS, SHOW_LONG}
> +};
> +
> +
> +Cassandra_status_vars cassandra_counters;
> +Cassandra_status_vars cassandra_counters_copy;
> +
> +
> +/**
> +  @brief
> +  Function we use in the creation of our hash to get key.
> +*/
> +
> +static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
> +                             my_bool not_used __attribute__((unused)))
> +{
> +  *length=share->table_name_length;
> +  return (uchar*) share->table_name;
> +}
> +
> +#ifdef HAVE_PSI_INTERFACE
> +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;
> +
> +static PSI_mutex_info all_cassandra_mutexes[]=
> +{
> +  { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
> +  { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
> +};
> +
> +static void init_cassandra_psi_keys()
> +{
> +  const char* category= "cassandra";
> +  int count;
> +
> +  if (PSI_server == NULL)
> +    return;
> +
> +  count= array_elements(all_cassandra_mutexes);
> +  PSI_server->register_mutex(category, all_cassandra_mutexes, count);
> +}
> +#endif
> +
> +static int cassandra_init_func(void *p)
> +{
> +  DBUG_ENTER("cassandra_init_func");
> +
> +#ifdef HAVE_PSI_INTERFACE
> +  init_cassandra_psi_keys();
> +#endif
> +
> +  cassandra_hton= (handlerton *)p;
> +  mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
> +  (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
> +                      (my_hash_get_key) cassandra_get_key,0,0);
> +
> +  cassandra_hton->state=   SHOW_OPTION_YES;
> +  cassandra_hton->create=  cassandra_create_handler;
> +  /*
> +    Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
> +    TABLE to create an *empty* table from scratch. Cassandra table won't be
> +    emptied if re-created.
> +  */

HTON_ALTER_NOT_SUPPORTED is not set. ALTER works?
HTON_TEMPORARY_NOT_SUPPORTED is not set. CREATE TEMPORARY TABLE works?
HTON_NO_PARTITION is not set. Partitioning works?

> +  cassandra_hton->flags=   0;
> +  cassandra_hton->table_options= cassandra_table_option_list;
> +  cassandra_hton->field_options= cassandra_field_option_list;
> +
> +  mysql_mutex_init(0 /* no instrumentation */,

Why no instrumentation?

> +                   &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
> +
> +  DBUG_RETURN(0);
> +}
> +
> +
> +static int cassandra_done_func(void *p)
> +{
> +  int error= 0;
> +  DBUG_ENTER("cassandra_done_func");
> +  if (cassandra_open_tables.records)
> +    error= 1;
> +  my_hash_free(&cassandra_open_tables);
> +  mysql_mutex_destroy(&cassandra_mutex);
> +  mysql_mutex_destroy(&cassandra_default_host_lock);
> +  DBUG_RETURN(error);
> +}
> +
> +
> +/**
> +  @brief
> +  Example of simple lock controls. The "share" it creates is a
> +  structure we will pass to each cassandra handler. Do you have to have
> +  one of these? Well, you have pieces that are used for locking, and
> +  they are needed to function.
> +*/
> +
> +static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
> +{
> +  CASSANDRA_SHARE *share;
> +  uint length;
> +  char *tmp_name;
> +
> +  mysql_mutex_lock(&cassandra_mutex);
> +  length=(uint) strlen(table_name);
> +
> +  if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
> +                                              (uchar*) table_name,
> +                                              length)))
> +  {
> +    if (!(share=(CASSANDRA_SHARE *)
> +          my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
> +                          &share, sizeof(*share),
> +                          &tmp_name, length+1,
> +                          NullS)))
> +    {
> +      mysql_mutex_unlock(&cassandra_mutex);
> +      return NULL;
> +    }
> +
> +    share->use_count=0;
> +    share->table_name_length=length;
> +    share->table_name=tmp_name;
> +    strmov(share->table_name,table_name);
> +    if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
> +      goto error;
> +    thr_lock_init(&share->lock);
> +    mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
> +                     &share->mutex, MY_MUTEX_INIT_FAST);
> +  }
> +  share->use_count++;
> +  mysql_mutex_unlock(&cassandra_mutex);
> +
> +  return share;
> +
> +error:
> +  mysql_mutex_destroy(&share->mutex);
> +  my_free(share);
> +
> +  return NULL;
> +}
> +
> +
> +/**
> +  @brief
> +  Free lock controls. We call this whenever we close a table. If the table had
> +  the last reference to the share, then we free memory associated with it.
> +*/
> +
> +static int free_share(CASSANDRA_SHARE *share)
> +{
> +  mysql_mutex_lock(&cassandra_mutex);
> +  if (!--share->use_count)
> +  {
> +    my_hash_delete(&cassandra_open_tables, (uchar*) share);
> +    thr_lock_delete(&share->lock);
> +    mysql_mutex_destroy(&share->mutex);
> +    my_free(share);
> +  }
> +  mysql_mutex_unlock(&cassandra_mutex);
> +
> +  return 0;
> +}
> +
> +
> +static handler* cassandra_create_handler(handlerton *hton,
> +                                       TABLE_SHARE *table,
> +                                       MEM_ROOT *mem_root)
> +{
> +  return new (mem_root) ha_cassandra(hton, table);
> +}
> +
> +
> +ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
> +  :handler(hton, table_arg),
> +   se(NULL), field_converters(NULL),
> +   special_type_field_converters(NULL),
> +   special_type_field_names(NULL), n_special_type_fields(0),
> +   rowkey_converter(NULL),
> +   dyncol_field(0), dyncol_set(0)
> +{}
> +
> +
> +static const char *ha_cassandra_exts[] = {
> +  NullS
> +};
> +
> +const char **ha_cassandra::bas_ext() const
> +{
> +  return ha_cassandra_exts;
> +}
> +
> +
> +int ha_cassandra::connect_and_check_options(TABLE *table_arg)
> +{
> +  ha_table_option_struct *options= table_arg->s->option_struct;
> +  int res;
> +  DBUG_ENTER("ha_cassandra::connect_and_check_options");
> +
> +  if ((res= check_field_options(table_arg->s->field)) ||
> +      (res= check_table_options(options)))
> +    DBUG_RETURN(res);
> +
> +  se= create_cassandra_se();
> +  se->set_column_family(options->column_family);
> +  const char *thrift_host= options->thrift_host? options->thrift_host:
> +                           cassandra_default_thrift_host;
> +  if (se->connect(thrift_host, options->thrift_port, options->keyspace))
> +  {
> +    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
> +    DBUG_RETURN(HA_ERR_NO_CONNECTION);
> +  }
> +
> +  if (setup_field_converters(table_arg->field, table_arg->s->fields))
> +  {
> +    DBUG_RETURN(HA_ERR_NO_CONNECTION);
> +  }
> +
> +  DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::check_field_options(Field **fields)
> +{
> +  Field **field;
> +  uint i;
> +  DBUG_ENTER("ha_cassandra::check_field_options");
> +  for (field= fields, i= 0; *field; field++, i++)
> +  {
> +    ha_field_option_struct *field_options= (*field)->option_struct;
> +    if (field_options && field_options->dyncol_field)
> +    {
> +      if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
> +      {
> +         my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
> +         DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> +      }
> +      dyncol_set= 1;
> +      dyncol_field= i;
> +      bzero(&dynamic_values, sizeof(dynamic_values));
> +      bzero(&dynamic_names, sizeof(dynamic_names));
> +      bzero(&dynamic_rec, sizeof(dynamic_rec));
> +    }
> +  }
> +  DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
> +{
> +  DBUG_ENTER("ha_cassandra::open");
> +
> +  if (!(share = get_share(name, table)))
> +    DBUG_RETURN(1);
> +  thr_lock_data_init(&share->lock,&lock,NULL);
> +
> +  DBUG_ASSERT(!se);
> +  /*
> +    Don't do the following on open: it prevents SHOW CREATE TABLE when the server
> +    has gone away.
> +  */
> +  /*
> +  int res;
> +  if ((res= connect_and_check_options(table)))
> +  {
> +    DBUG_RETURN(res);
> +  }
> +  */
> +
> +  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
> +  insert_lineno= 0;
> +
> +  DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::close(void)
> +{
> +  DBUG_ENTER("ha_cassandra::close");
> +  delete se;
> +  se= NULL;
> +  free_field_converters();
> +  DBUG_RETURN(free_share(share));
> +}
> +
> +
> +int ha_cassandra::check_table_options(ha_table_option_struct *options)
> +{
> +  if (!options->thrift_host && (!cassandra_default_thrift_host ||
> +                                !cassandra_default_thrift_host[0]))
> +  {
> +    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> +             "thrift_host table option must be specified, or "
> +             "@@cassandra_default_thrift_host must be set");
> +    return HA_WRONG_CREATE_OPTION;
> +  }
> +
> +  if (!options->keyspace || !options->column_family)
> +  {
> +    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> +             "keyspace and column_family table options must be specified");
> +    return HA_WRONG_CREATE_OPTION;
> +  }
> +  return 0;
> +}
> +
> +
> +/**
> +  @brief
> +  create() is called to create a table. The variable name will have the name
> +  of the table.
> +
> +  @details
> +  When create() is called you do not need to worry about
> +  opening the table. Also, the .frm file will have already been
> +  created so adjusting create_info is not necessary. You can overwrite
> +  the .frm file at this point if you wish to change the table
> +  definition, but there are no methods currently provided for doing
> +  so.
> +
> +  Called from handle.cc by ha_create_table().
> +
> +  @see
> +  ha_create_table() in handle.cc
> +*/
> +
> +int ha_cassandra::create(const char *name, TABLE *table_arg,
> +                         HA_CREATE_INFO *create_info)
> +{
> +  int res;
> +  DBUG_ENTER("ha_cassandra::create");
> +
> +  Field **pfield= table_arg->s->field;
> +  if (!((*pfield)->flags & NOT_NULL_FLAG))
> +  {
> +    my_error(ER_WRONG_COLUMN_NAME, MYF(0), "First column must be NOT NULL");
> +    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> +  }

this is unnecessary. the second check guarantees that the first column is
NOT NULL, that is, this if() is redundant

> +
> +  if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
> +      table_arg->key_info[0].key_parts != 1 ||
> +      table_arg->key_info[0].key_part[0].fieldnr != 1)
> +  {
> +    my_error(ER_WRONG_COLUMN_NAME, MYF(0),
> +             "Table must have PRIMARY KEY defined over the first column");
> +    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> +  }
...
> +CASSANDRA_TYPE get_cassandra_type(const char *validator)
> +{
> +  CASSANDRA_TYPE rc;
> +  switch(validator[32])
> +  {
> +  case 'L':
> +    rc= CT_BIGINT;
> +    break;
> +  case 'I':
> +    rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
> +    rc= CT_INT;
> +    break;
> +  case 'C':
> +    rc= CT_COUNTER;
> +    break;
> +  case 'F':
> +    rc= CT_FLOAT;
> +    break;
> +  case 'D':
> +    switch (validator[33])
> +    {
> +    case 'o':
> +      rc= CT_DOUBLE;
> +      break;
> +    case 'a':
> +      rc= CT_TIMESTAMP;
> +      break;
> +    case 'e':
> +      rc= CT_DECIMAL;
> +      break;
> +    default:
> +      rc= CT_BLOB;
> +      break;
> +    }
> +    break;
> +  case 'B':
> +    rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
> +    break;
> +  case 'A':
> +    rc= CT_ASCII;
> +    break;
> +  case 'U':
> +    rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
> +    break;
> +  default:
> +    rc= CT_BLOB;
> +  }
> +  DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
> +  return rc;
> +}
> +
> +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
> +{
> +  ColumnDataConverter *res= NULL;
> +
> +  switch(field->type()) {
> +    case MYSQL_TYPE_TINY:
> +      if (!strcmp(validator_name, validator_boolean))

why do you strcmp here, while you only check selected characters in
get_cassandra_type() ?
you could've called get_cassandra_type() for validator_name instead
and compared enum values, instead of strings.

> +      {
> +        res= new TinyintDataConverter;
> +        break;
> +      }
> +      /* fall through: */
> +    case MYSQL_TYPE_SHORT:
> +    case MYSQL_TYPE_LONGLONG:
> +    {
> +      bool is_counter= false;
> +      if (!strcmp(validator_name, validator_bigint) ||
> +          !strcmp(validator_name, validator_timestamp) ||
> +          (is_counter= !strcmp(validator_name, validator_counter)))
> +        res= new BigintDataConverter(!is_counter);
> +      break;
> +    }
> +    case MYSQL_TYPE_FLOAT:
> +      if (!strcmp(validator_name, validator_float))
> +        res= new FloatDataConverter;
> +      break;
> +
> +    case MYSQL_TYPE_DOUBLE:
> +      if (!strcmp(validator_name, validator_double))
> +        res= new DoubleDataConverter;
> +      break;
> +
> +    case MYSQL_TYPE_TIMESTAMP:
> +      if (!strcmp(validator_name, validator_timestamp))
> +        res= new TimestampDataConverter;
> +      break;
> +
> +    case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
> +      if (!strcmp(validator_name, validator_uuid) &&
> +          field->real_type() == MYSQL_TYPE_STRING &&
> +          field->field_length == 36)
> +      {
> +        // UUID maps to CHAR(36), its text representation
> +        res= new UuidDataConverter;
> +        break;
> +      }
> +      /* fall through: */
> +    case MYSQL_TYPE_VAR_STRING:
> +    case MYSQL_TYPE_VARCHAR:
> +    {
> +      /*
> +        Cassandra's "varint" type is a binary-encoded arbitary-length
> +        big-endian number.
> +        - It can be mapped to VARBINARY(N), with sufficiently big N.
> +        - If the value does not fit into N bytes, it is an error. We should not
> +          truncate it, because that is just as good as returning garbage.
> +        - varint should not be mapped to BINARY(N), because BINARY(N) values
> +          are zero-padded, which will work as multiplying the value by
> +          2^k for some value of k.
> +      */
> +      if (field->type() == MYSQL_TYPE_VARCHAR &&
> +          field->binary() &&
> +          (!strcmp(validator_name, validator_varint) ||
> +           !strcmp(validator_name, validator_decimal)))
> +      {
> +        res= new StringCopyConverter(field->field_length);
> +        break;
> +      }
> +
> +      if (!strcmp(validator_name, validator_blob) ||
> +          !strcmp(validator_name, validator_ascii) ||
> +          !strcmp(validator_name, validator_text))
> +      {
> +        res= new StringCopyConverter((size_t)-1);
> +      }
> +      break;
> +    }
> +    case MYSQL_TYPE_LONG:
> +      if (!strcmp(validator_name, validator_int))
> +        res= new Int32DataConverter;
> +      break;
> +
> +    default:;
> +  }
> +  return res;
> +}
> +
> +
> +bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
> +{
> +  char *col_name;
> +  int  col_name_len;
> +  char *col_type;
> +  int col_type_len;
> +  size_t ddl_fields= se->get_ddl_size();
> +  const char *default_type= se->get_default_validator();
> +  uint max_non_default_fields;
> +  DBUG_ENTER("ha_cassandra::setup_field_converters");
> +  DBUG_ASSERT(default_type);
> +
> +  DBUG_ASSERT(!field_converters);
> +  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
> +
> +  /*
> +    We always should take into account that in case of using dynamic columns
> +    sql description contain one field which does not described in
> +    Cassandra DDL also key field is described separately. So that
> +    is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
> +  */
> +  max_non_default_fields= ddl_fields + 2 - n_fields;
> +  if (ddl_fields < (n_fields - dyncol_set - 1))
> +  {
> +    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +    DBUG_RETURN(true);
> +  }
> +
> +  /* allocate memory in one chunk */
> +  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
> +    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
> +    (dyncol_set ? max_non_default_fields : 0);
> +  if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
> +    DBUG_RETURN(true);
> +  bzero(field_converters, memsize);
> +  n_field_converters= n_fields;
> +
> +  if (dyncol_set)
> +  {
> +    special_type_field_converters=
> +      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
> +    special_type_field_names=
> +      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
> +  }
> +
> +  if (dyncol_set)
> +  {
> +    if (init_dynamic_array(&dynamic_values,
> +                           sizeof(DYNAMIC_COLUMN_VALUE),
> +                           DYNCOL_USUAL, DYNCOL_DELTA))
> +      DBUG_RETURN(true);
> +    else
> +      if (init_dynamic_array(&dynamic_names,
> +                             sizeof(LEX_STRING),
> +                             DYNCOL_USUAL, DYNCOL_DELTA))
> +      {
> +        delete_dynamic(&dynamic_values);
> +        DBUG_RETURN(true);
> +      }
> +      else
> +        if (init_dynamic_string(&dynamic_rec, NULL,
> +                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
> +        {
> +          delete_dynamic(&dynamic_values);
> +          delete_dynamic(&dynamic_names);
> +          DBUG_RETURN(true);
> +        }
> +
> +    /* Dynamic column field has special processing */
> +    field_converters[dyncol_field]= NULL;
> +
> +    default_type_def= cassandra_types + get_cassandra_type(default_type);
> +  }
> +
> +  se->first_ddl_column();
> +  uint n_mapped= 0;
> +  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
> +                              &col_type_len))
> +  {
> +    Field **field;
> +    uint i;
> +    /* Mapping for the 1st field is already known */
> +    for (field= field_arg + 1, i= 1; *field; field++, i++)
> +    {
> +      if ((!dyncol_set || dyncol_field != i) &&
> +          !strcmp((*field)->field_name, col_name))
> +      {
> +        n_mapped++;
> +        ColumnDataConverter **conv= field_converters + (*field)->field_index;
> +        if (!(*conv= map_field_to_validator(*field, col_type)))
> +        {
> +          se->print_error("Failed to map column %s to datatype %s",
> +                          (*field)->field_name, col_type);
> +          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +          DBUG_RETURN(true);
> +        }
> +        (*conv)->field= *field;
> +      }
> +    }
> +    if (dyncol_set && !(*field)) // is needed and not found
> +    {
> +      DBUG_PRINT("info",("Field not found: %s", col_name));
> +      if (strcmp(col_type, default_type))
> +      {
> +        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
> +                           col_name, col_type));
> +        special_type_field_names[n_special_type_fields].length= col_name_len;
> +        special_type_field_names[n_special_type_fields].str= col_name;
> +        special_type_field_converters[n_special_type_fields]=
> +          cassandra_types[get_cassandra_type(col_type)];
> +        n_special_type_fields++;
> +      }
> +    }
> +  }
> +
> +  if (n_mapped != n_fields - 1 - dyncol_set)
> +  {
> +    Field *first_unmapped= NULL;
> +    /* Find the first field */
> +    for (uint i= 1; i < n_fields;i++)
> +    {
> +      if (!field_converters[i])
> +      {
> +        first_unmapped= field_arg[i];
> +        break;
> +      }
> +    }
> +    DBUG_ASSERT(first_unmapped);
> +
> +    se->print_error("Field `%s` could not be mapped to any field in Cassandra",

generally (here and everywhere) use %`s instead of `%s`

> +                    first_unmapped->field_name);
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +    DBUG_RETURN(true);
> +  }
> +
> +  /*
> +    Setup type conversion for row_key.
> +  */
> +  se->get_rowkey_type(&col_name, &col_type);
> +  if (col_name && strcmp(col_name, (*field_arg)->field_name))
> +  {
> +    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
> +                    col_name);
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +    DBUG_RETURN(true);
> +  }
> +  if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
> +  {
> +    se->print_error("target column family has no key_alias defined, "
> +                    "PRIMARY KEY column must be named 'rowkey'");
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +    DBUG_RETURN(true);
> +  }
> +
> +  if (col_type != NULL)
> +  {
> +    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
> +    {
> +      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
> +      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +      DBUG_RETURN(true);
> +    }
> +    rowkey_converter->field= *field_arg;
> +  }
> +  else
> +  {
> +    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +    DBUG_RETURN(true);
> +  }
> +
> +  DBUG_RETURN(false);
> +}
> +
> +
> +void ha_cassandra::free_field_converters()
> +{
> +  delete rowkey_converter;
> +  rowkey_converter= NULL;
> +
> +  if (dyncol_set)
> +  {
> +    delete_dynamic(&dynamic_values);
> +    delete_dynamic(&dynamic_names);
> +    dynstr_free(&dynamic_rec);
> +  }
> +  if (field_converters)
> +  {
> +    for (uint i=0; i < n_field_converters; i++)
> +      if (field_converters[i])
> +      {
> +        DBUG_ASSERT(!dyncol_set || i == dyncol_field);
> +        delete field_converters[i];
> +      }
> +    my_free(field_converters);
> +    field_converters= NULL;
> +  }
> +}
> +
> +
> +int ha_cassandra::index_init(uint idx, bool sorted)
> +{
> +  int ires;
> +  if (!se && (ires= connect_and_check_options(table)))
> +    return ires;
> +  return 0;
> +}
> +
> +void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
> +
> +int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
> +                                 key_part_map keypart_map,
> +                                 enum ha_rkey_function find_flag)
> +{
> +  int rc= 0;
> +  DBUG_ENTER("ha_cassandra::index_read_map");
> +
> +  if (find_flag != HA_READ_KEY_EXACT)
> +    DBUG_RETURN(HA_ERR_WRONG_COMMAND);

did you verify that the server expects HA_ERR_WRONG_COMMAND from
this method and correctly handles it?
or, perhaps, find_flag is always HA_READ_KEY_EXACT here,
because of your index_flags() ?

> +
> +  uint key_len= calculate_key_len(table, active_index, key, keypart_map);
> +  store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> +  char *cass_key;
> +  int cass_key_len;
> +  my_bitmap_map *old_map;
> +
> +  old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> +  if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
> +  {
> +    /* We get here when making lookups like uuid_column='not-an-uuid' */
> +    dbug_tmp_restore_column_map(table->read_set, old_map);
> +    DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
> +  }
> +
> +  dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> +  bool found;
> +  if (se->get_slice(cass_key, cass_key_len, &found))
> +  {
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());

generally it's better not to use my_error() directly,
but only return an error code, and return your se->error_str()
from ha_cassandra::get_error_message()

> +    rc= HA_ERR_INTERNAL_ERROR;
> +  }
> +
> +  /* TODO: what if we're not reading all columns?? */
> +  if (!found)
> +    rc= HA_ERR_KEY_NOT_FOUND;
> +  else
> +    rc= read_cassandra_columns(false);
> +
> +  DBUG_RETURN(rc);
> +}
> +
> +
> +void ha_cassandra::print_conversion_error(const char *field_name,
> +                                          char *cass_value,
> +                                          int cass_value_len)
> +{
> +  char buf[32];
> +  char *p= cass_value;
> +  size_t i= 0;
> +  for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)

why do you cast here?

> +  {
> +    buf[i++]= map2number[(*p >> 4) & 0xF];
> +    buf[i++]= map2number[*p & 0xF];
> +  }
> +  buf[i]=0;
> +
> +  se->print_error("Unable to convert value for field `%s` from Cassandra's data"
> +                  " format. Source data is %d bytes, 0x%s%s",
> +                  field_name, cass_value_len, buf,
> +                  (i == sizeof(buf) - 1)? "..." : "");
> +  my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +}
> +
> +
> +void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
> +{
> +  for (uint i= 0; i < num; i++)
> +    if (vals[i].type == DYN_COL_STRING &&
> +        !vals[i].x.string.nonfreeable)
> +      my_free(vals[i].x.string.value.str);
> +}
> +
> +
> +CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
> +                                                           int cass_name_len)
> +{
> +  CASSANDRA_TYPE_DEF *type= default_type_def;
> +  for(uint i= 0; i < n_special_type_fields; i++)
> +  {
> +    if (cass_name_len == (int)special_type_field_names[i].length &&
> +        memcmp(cass_name, special_type_field_names[i].str,
> +               cass_name_len) == 0)
> +    {
> +      type= special_type_field_converters + i;
> +      break;
> +    }
> +  }
> +  return type;
> +}
> +
> +int ha_cassandra::read_cassandra_columns(bool unpack_pk)
> +{
> +  char *cass_name;
> +  char *cass_value;
> +  int cass_value_len, cass_name_len;
> +  Field **field;
> +  int res= 0;
> +  ulong total_name_len= 0;
> +
> +  /*
> +    cassandra_to_mariadb() calls will use field->store(...) methods, which
> +    require that the column is in the table->write_set
> +  */
> +  my_bitmap_map *old_map;
> +  old_map= dbug_tmp_use_all_columns(table, table->write_set);
> +
> +  /* Start with all fields being NULL */
> +  for (field= table->field + 1; *field; field++)
> +    (*field)->set_null();
> +
> +  while (!se->get_next_read_column(&cass_name, &cass_name_len,
> +                                   &cass_value, &cass_value_len))
> +  {
> +    // map to our column. todo: use hash or something..
> +    bool found= 0;
> +    for (field= table->field + 1; *field; field++)
> +    {
> +      uint fieldnr= (*field)->field_index;
> +      if ((!dyncol_set || dyncol_field != fieldnr) &&
> +          !strcmp((*field)->field_name, cass_name))
> +      {
> +        found= 1;
> +        (*field)->set_notnull();
> +        if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
> +                                                            cass_value_len))
> +        {
> +          print_conversion_error((*field)->field_name, cass_value,
> +                                 cass_value_len);
> +          res=1;
> +          goto err;
> +        }
> +        break;
> +      }
> +    }
> +    if (dyncol_set && !found)
> +    {
> +      DYNAMIC_COLUMN_VALUE val;
> +      LEX_STRING nm;
> +      CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
> +                                                        cass_name_len);
> +      nm.str= cass_name;
> +      nm.length= cass_name_len;
> +      if (nm.length > MAX_NAME_LENGTH)
> +      {
> +        se->print_error("Unable to convert value for field `%s`"
> +                        " from Cassandra's data format. Name"
> +                        " length exceed limit of %u: '%s'",
> +                        table->field[dyncol_field]->field_name,
> +                        (uint)MAX_NAME_LENGTH, cass_name);
> +        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +        res=1;
> +        goto err;
> +      }
> +      total_name_len+= cass_name_len;
> +      if (nm.length > MAX_TOTAL_NAME_LENGTH)
> +      {
> +        se->print_error("Unable to convert value for field `%s`"
> +                        " from Cassandra's data format. Sum of all names"
> +                        " length exceed limit of %lu",
> +                        table->field[dyncol_field]->field_name,
> +                        cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
> +        my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +        res=1;
> +        goto err;
> +      }
> +
> +      if ((res= (*(type->cassandra_to_dynamic))(cass_value,
> +                                                cass_value_len, &val)) ||
> +          insert_dynamic(&dynamic_names, (uchar *) &nm) ||
> +          insert_dynamic(&dynamic_values, (uchar *) &val))
> +      {
> +        if (res)
> +        {
> +          print_conversion_error(cass_name, cass_value, cass_value_len);
> +        }
> +        free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> +                     dynamic_values.elements);
> +        // EOM shouldm be already reported if happened
> +        res=1;
> +        goto err;
> +      }
> +    }
> +  }
> +
> +  dynamic_rec.length= 0;
> +  if (dyncol_set)
> +  {
> +    if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
> +                                                dynamic_names.elements,
> +                                                dynamic_names.buffer,
> +                                                (DYNAMIC_COLUMN_VALUE *)
> +                                                dynamic_values.buffer,
> +                                                FALSE,
> +                                                TRUE) < 0)
> +      dynamic_rec.length= 0;
> +
> +    free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> +                 dynamic_values.elements);
> +    dynamic_values.elements= dynamic_names.elements= 0;
> +  }
> +  if (dyncol_set)

why a separate if() ?

> +  {
> +    if (dynamic_rec.length == 0)
> +      table->field[dyncol_field]->set_null();
> +    else
> +    {
> +      Field_blob *blob= (Field_blob *)table->field[dyncol_field];
> +      blob->set_notnull();
> +      blob->store_length(dynamic_rec.length);
> +      *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
> +        dynamic_rec.str;
> +    }
> +  }
> +
> +  if (unpack_pk)
> +  {
> +    /* Unpack rowkey to primary key */
> +    field= table->field;
> +    (*field)->set_notnull();
> +    se->get_read_rowkey(&cass_value, &cass_value_len);
> +    if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
> +    {
> +      print_conversion_error((*field)->field_name, cass_value, cass_value_len);
> +      res=1;
> +      goto err;
> +    }
> +  }
> +
> +err:
> +  dbug_tmp_restore_column_map(table->write_set, old_map);
> +  return res;
> +}
...
> +int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
> +{
> +  int rc;
> +  DBUG_ENTER("ha_cassandra::rnd_pos");
> +
> +  int save_active_index= active_index;
> +  active_index= 0; /* The primary key */
> +  rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
> +
> +  active_index= save_active_index;

a bit nicer would be to implement index_read_map_idx
and call it from here and from index_read_map

> +
> +  DBUG_RETURN(rc);
> +}
> +
...
> +bool ha_cassandra::mrr_start_read()
> +{
> +  uint key_len;
> +
> +  my_bitmap_map *old_map;
> +  old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> +  se->new_lookup_keys();
> +
> +  while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
> +  {
> +    char *cass_key;
> +    int cass_key_len;
> +
> +    DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
> +
> +    uchar *key= (uchar*)mrr_cur_range.start_key.key;
> +    key_len= mrr_cur_range.start_key.length;
> +    //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
> +    store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> +    rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
> +
> +    // Primitive buffer control
> +    if (se->add_lookup_key(cass_key, cass_key_len) >
> +        THDVAR(table->in_use, multiget_batch_size))
> +      break;

I'd suggest adding a status variable to count
how many times the buffer was refilled

> +  }
> +
> +  dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> +  return se->multiget_slice();
> +}
...
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations start
> +/////////////////////////////////////////////////////////////////////////////
> +
> +
> +int ha_cassandra::index_next(uchar *buf)

Why did you do all these dummy methods?

> +{
> +  int rc;
> +  DBUG_ENTER("ha_cassandra::index_next");
> +  rc= HA_ERR_WRONG_COMMAND;
> +  DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_prev(uchar *buf)
> +{
> +  int rc;
> +  DBUG_ENTER("ha_cassandra::index_prev");
> +  rc= HA_ERR_WRONG_COMMAND;
> +  DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_first(uchar *buf)
> +{
> +  int rc;
> +  DBUG_ENTER("ha_cassandra::index_first");
> +  rc= HA_ERR_WRONG_COMMAND;
> +  DBUG_RETURN(rc);
> +}
> +
> +int ha_cassandra::index_last(uchar *buf)
> +{
> +  int rc;
> +  DBUG_ENTER("ha_cassandra::index_last");
> +  rc= HA_ERR_WRONG_COMMAND;
> +  DBUG_RETURN(rc);
> +}
> +
> +
> +ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
> +                                     key_range *max_key)
> +{
> +  DBUG_ENTER("ha_cassandra::records_in_range");
> +  //DBUG_RETURN(10);                         // low number to force index usage
> +  DBUG_RETURN(HA_POS_ERROR);
> +}
> +
> +
> +class Column_name_enumerator_impl : public Column_name_enumerator
> +{
> +  ha_cassandra *obj;
> +  uint idx;
> +public:
> +  Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
> +  const char* get_next_name()
> +  {
> +    if (idx == obj->table->s->fields)
> +      return NULL;
> +    else
> +      return obj->table->field[idx++]->field_name;
> +  }
> +};
> +
> +
> +int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
> +{
> +  DYNAMIC_ARRAY oldvals, oldnames, vals, names;
> +  String oldvalcol, valcol;
> +  char *oldfree_names= NULL, *free_names= NULL;
> +  my_bitmap_map *old_map;
> +  int res;
> +  DBUG_ENTER("ha_cassandra::update_row");
> +  /* Currently, it is guaranteed that new_data == table->record[0] */
> +  DBUG_ASSERT(new_data == table->record[0]);
> +  /* For now, just rewrite the full record */
> +  se->clear_insert_buffer();
> +
> +  old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> +  char *old_key;
> +  int old_key_len;
> +  se->get_read_rowkey(&old_key, &old_key_len);
> +
> +  /* Get the key we're going to write */
> +  char *new_key;
> +  int new_key_len;
> +  if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
> +  {
> +    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> +             rowkey_converter->field->field_name, insert_lineno);
> +    dbug_tmp_restore_column_map(table->read_set, old_map);
> +    DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> +  }
> +
> +  /*
> +    Compare it to the key we've read. For all types that Cassandra supports,
> +    binary byte-wise comparison can be used
> +  */
> +  bool new_primary_key;
> +  if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
> +    new_primary_key= true;
> +  else
> +    new_primary_key= false;
> +
> +  if (dyncol_set)
> +  {
> +    Field *field= table->field[dyncol_field];
> +    /* move to get old_data */
> +    my_ptrdiff_t diff;
> +    diff= (my_ptrdiff_t) (old_data - new_data);
> +    field->move_field_offset(diff);      // Points now at old_data
> +    if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
> +      DBUG_RETURN(res);
> +    field->move_field_offset(-diff);     // back to new_data
> +    if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
> +    {
> +      free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> +      DBUG_RETURN(res);
> +    }
> +  }
> +
> +  if (new_primary_key)
> +  {
> +    /*
> +      Primary key value changed. This is essentially a DELETE + INSERT.
> +      Add a DELETE operation into the batch
> +    */
> +    Column_name_enumerator_impl name_enumerator(this);
> +    se->add_row_deletion(old_key, old_key_len, &name_enumerator,
> +                         (LEX_STRING *)oldnames.buffer,
> +                         (dyncol_set ? oldnames.elements : 0));
> +    oldnames.elements= oldvals.elements= 0; // they will be deleted
> +  }
> +
> +  se->start_row_insert(new_key, new_key_len);
> +
> +  /* Convert other fields */
> +  for (uint i= 1; i < table->s->fields; i++)

I probably would've simply done delete+insert in all cases :)

but as you've gone to the trouble of updating column by column,
it really makes little sense to rewrite columns that didn't change.
could you check table->write_set here and memcmp with the old
column value before updating?

> +  {
> +    char *cass_data;
> +    int cass_data_len;
> +    if (dyncol_set && dyncol_field == i)
> +    {
> +      DBUG_ASSERT(field_converters[i] == NULL);
> +      if ((res= write_dynamic_row(&vals, &names)))
> +        goto err;
> +    }
> +    else
> +    {
> +      if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
> +      {
> +        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> +                 field_converters[i]->field->field_name, insert_lineno);
> +        dbug_tmp_restore_column_map(table->read_set, old_map);
> +        DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> +      }
> +      se->add_insert_column(field_converters[i]->field->field_name, 0,
> +                            cass_data, cass_data_len);
> +    }
> +  }
> +  if (dyncol_set)
> +  {
> +    /* find removed fields */
> +    uint i= 0, j= 0;
> +    LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
> +    LEX_STRING *nnames= (LEX_STRING *)names.buffer;
> +    /* both array are sorted */
> +    for(; i < oldnames.elements; i++)
> +    {
> +      int scmp= 0;
> +      while (j < names.elements &&
> +             (nnames[j].length < onames[i].length ||
> +             (nnames[j].length == onames[i].length &&
> +              (scmp= memcmp(nnames[j].str, onames[i].str,
> +                            onames[i].length)) < 0)))
> +        j++;
> +      if (j < names.elements &&
> +          nnames[j].length == onames[i].length &&
> +          scmp == 0)
> +        j++;
> +      else
> +        se->add_insert_delete_column(onames[i].str, onames[i].length);
> +    }
> +  }
> +
> +  dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> +  res= se->do_insert();
> +
> +  if (res)
> +    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +
> +err:
> +  if (dyncol_set)
> +  {
> +    free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> +    free_dynamic_row(&names, &vals, free_names);
> +  }
> +
> +  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
> +}
> +
> +
> +int ha_cassandra::extra(enum ha_extra_function operation)
> +{
> +  DBUG_ENTER("ha_cassandra::extra");
> +  DBUG_RETURN(0);
> +}

please. why do you like dummy methods that much? :)

> +
> +
> +/* The following function was copied from ha_blackhole::store_lock: */
> +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
> +                                         THR_LOCK_DATA **to,
> +                                         enum thr_lock_type lock_type)
> +{
> +  DBUG_ENTER("ha_cassandra::store_lock");
> +  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
> +  {
> +    /*
> +      Here is where we get into the guts of a row level lock.
> +      If TL_UNLOCK is set
> +      If we are not doing a LOCK TABLE or DISCARD/IMPORT
> +      TABLESPACE, then allow multiple writers
> +    */
> +
> +    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
> +         lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
> +        && !thd_tablespace_op(thd))

1. tablespace op in cassandra? really? too much copy-paste is confusing.
(here and in the comments too)
2. did you test LOCK TABLES?

> +      lock_type = TL_WRITE_ALLOW_WRITE;

that makes all changes to cassandra immediately visible
by concurrently running threads. okay, if that's intentional,
but it needs to be documented, as it is not the SQL semantics.

> +
> +    /*
> +      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
> +      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
> +      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
> +      to t2. Convert the lock to a normal read lock to allow
> +      concurrent inserts to t2.
> +    */
> +
> +    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
> +      lock_type = TL_READ;

this also breaks SBR for INSERT ... SELECT. same as above - okay if
intentional, but should be thoroughly documented.

> +
> +    lock.type= lock_type;
> +  }
> +  *to++= &lock;
> +  DBUG_RETURN(to);
> +}
> +
> +
> +int ha_cassandra::external_lock(THD *thd, int lock_type)
> +{
> +  DBUG_ENTER("ha_cassandra::external_lock");
> +  DBUG_RETURN(0);
> +}
> +
> +int ha_cassandra::delete_table(const char *name)
> +{
> +  DBUG_ENTER("ha_cassandra::delete_table");
> +  /*
> +    Cassandra table is just a view. Dropping it doesn't affect the underlying
> +    column family.
> +  */
> +  DBUG_RETURN(0);
> +}

more dummies...

> +
> +
> +/**
> +  check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
> +  if new and old definition are compatible
> +
> +  @details If there are no other explicit signs like changed number of
> +  fields this function will be called by compare_tables()
> +  (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
> +  file.
> +
> +*/
> +
> +bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
> +                                            uint table_changes)
> +{
> +  DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
> +  /* Checked, we intend to have this empty for Cassandra SE. */
> +  DBUG_RETURN(COMPATIBLE_DATA_YES);
> +}
> +
> +
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations end
> +/////////////////////////////////////////////////////////////////////////////

::update_row and ::store_lock are hardly dummies :)

> +
> +static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
> +{
> +  cassandra_counters_copy= cassandra_counters;
> +
> +  var->type= SHOW_ARRAY;
> +  var->value= (char *) &cassandra_status_variables;

what's the point? If you don't do anything in this function,
you can just as easily list all status variables below
in the func_status[] array.

> +  return 0;
> +}
> +
> +
> +struct st_mysql_storage_engine cassandra_storage_engine=
> +{ MYSQL_HANDLERTON_INTERFACE_VERSION };
> +
> +static struct st_mysql_show_var func_status[]=
> +{
> +  {"Cassandra",  (char *)show_cassandra_vars, SHOW_FUNC},
> +  {0,0,SHOW_UNDEF}
> +};
> +
> +maria_declare_plugin(cassandra)
> +{
> +  MYSQL_STORAGE_ENGINE_PLUGIN,
> +  &cassandra_storage_engine,
> +  "CASSANDRA",
> +  "Monty Program Ab",
> +  "Cassandra storage engine",
> +  PLUGIN_LICENSE_GPL,
> +  cassandra_init_func,                            /* Plugin Init */
> +  cassandra_done_func,                            /* Plugin Deinit */
> +  0x0001,                                       /* version number (0.1) */
> +  func_status,                                  /* status variables */
> +  cassandra_system_variables,                     /* system variables */
> +  "0.1",                                        /* string version */
> +  MariaDB_PLUGIN_MATURITY_EXPERIMENTAL          /* maturity */
> +}
> +maria_declare_plugin_end;

Regards,
Sergei


Follow ups