MDEV-3792: review of the handler part of the cassandra engine
Hi, Sergey,
The handler is quite ok. I've only had a few, mostly cosmetic, comments.
> === modified file 'cmake/build_configurations/mysql_release.cmake'
> --- cmake/build_configurations/mysql_release.cmake 2012-06-06 12:15:29 +0000
> +++ cmake/build_configurations/mysql_release.cmake 2012-11-12 15:11:23 +0000
> @@ -46,6 +46,8 @@ SET(FEATURE_SET_large 5)
> SET(FEATURE_SET_xlarge 6)
> SET(FEATURE_SET_community 7)
>
> +SET(WITH_CASSANDRA_STORAGE_ENGINE ON)
> +
This probably should be reverted before pushing into main, right?
> IF(FEATURE_SET)
> STRING(TOLOWER ${FEATURE_SET} feature_set)
> SET(num ${FEATURE_SET_${feature_set}})
>
> === modified file 'sql/sql_join_cache.cc'
> --- sql/sql_join_cache.cc 2012-03-24 17:21:22 +0000
> +++ sql/sql_join_cache.cc 2012-11-12 15:11:23 +0000
> @@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_m
> {
> last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
> if (no_association &&
> - (curr_matching_chain= get_matching_chain_by_join_key()))
> + !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
Is that something that should be pushed into the main branch, or is it
a temporary hack for cassandra that should be reverted?
> return 1;
> last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
> return 0;
>
> === added file 'storage/cassandra/CMakeLists.txt'
> --- storage/cassandra/CMakeLists.txt 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/CMakeLists.txt 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,25 @@
> +
> +SET(cassandra_sources
> + ha_cassandra.cc
> + ha_cassandra.h
> + cassandra_se.h
> + cassandra_se.cc
> + gen-cpp/Cassandra.cpp
> + gen-cpp/cassandra_types.h
> + gen-cpp/cassandra_types.cpp
> + gen-cpp/cassandra_constants.h
> + gen-cpp/cassandra_constants.cpp
> + gen-cpp/Cassandra.h)
> +
> +#INCLUDE_DIRECTORIES(BEFORE ${Boost_INCLUDE_DIRS})
> +
> +#INCLUDE_DIRECTORIES(AFTER /usr/local/include/thrift)
> +INCLUDE_DIRECTORIES(AFTER /home/buildbot/build/thrift-inst/include/thrift/)
this needs to be fixed before pushing into the main tree.
I suppose you'll need to detect here whether thrift is available,
and disable the engine otherwise.
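Something along these lines, perhaps (a sketch only; the THRIFT_* variable
names are mine, untested):

  FIND_PATH(THRIFT_INCLUDE_DIR Thrift.h PATH_SUFFIXES thrift)
  FIND_LIBRARY(THRIFT_LIBRARY thrift)
  IF(THRIFT_INCLUDE_DIR AND THRIFT_LIBRARY)
    INCLUDE_DIRECTORIES(AFTER ${THRIFT_INCLUDE_DIR})
    MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE
                     LINK_LIBRARIES ${THRIFT_LIBRARY})
  ELSE()
    MESSAGE(STATUS "Thrift not found, cassandra engine is not built")
  ENDIF()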
> +
> +#
> +STRING(REPLACE "-fno-exceptions" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +STRING(REPLACE "-fno-implicit-templates" "" CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
> +#LINK_DIRECTORIES(/home/psergey/cassandra/thrift/lib)
> +
> +MYSQL_ADD_PLUGIN(cassandra ${cassandra_sources} STORAGE_ENGINE LINK_LIBRARIES thrift)
> +# was: STORAGE_ENGINE MANDATORY
>
> === added file 'storage/cassandra/cassandra_se.cc'
> --- storage/cassandra/cassandra_se.cc 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/cassandra_se.cc 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,809 @@
> +
> +// Cassandra includes:
> +#include <inttypes.h>
> +#include <netinet/in.h>
> +#include <sys/time.h>
> +#include <stdio.h>
> +#include <stdarg.h>
> +
> +#include "Thrift.h"
> +#include "transport/TSocket.h"
> +#include "transport/TTransport.h"
> +#include "transport/TBufferTransports.h"
> +#include "protocol/TProtocol.h"
> +#include "protocol/TBinaryProtocol.h"
> +#include "gen-cpp/Cassandra.h"
> +// cassandra includes end
> +
> +#include "cassandra_se.h"
> +
> +struct st_mysql_lex_string
> +{
> + char *str;
> + size_t length;
> +};
> +
> +using namespace std;
> +using namespace apache::thrift;
> +using namespace apache::thrift::transport;
> +using namespace apache::thrift::protocol;
> +using namespace org::apache::cassandra;
> +
> +
> +void Cassandra_se_interface::print_error(const char *format, ...)
> +{
> + va_list ap;
> + va_start(ap, format);
> + // it's not a problem if output was truncated
> + vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
my_vsnprintf please
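i.e., simply (a drop-in replacement, same argument order):

  my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);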
> + va_end(ap);
> +}
> +
...
> === added file 'storage/cassandra/ha_cassandra.h'
> --- storage/cassandra/ha_cassandra.h 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.h 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,328 @@
> +/*
> + Copyright (c) 2012, Monty Program Ab
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +#ifdef USE_PRAGMA_INTERFACE
> +#pragma interface /* gcc class implementation */
> +#endif
> +
> +
> +#include "my_global.h" /* ulonglong */
> +#include "thr_lock.h" /* THR_LOCK, THR_LOCK_DATA */
> +#include "handler.h" /* handler */
> +#include "my_base.h" /* ha_rows */
> +
> +#include "cassandra_se.h"
> +
> +/** @brief
> + CASSANDRA_SHARE is a structure that will be shared among all open handlers.
> + This example implements the minimum of what you will probably need.
> +*/
> +typedef struct st_cassandra_share {
> + char *table_name;
> + uint table_name_length,use_count;
> + mysql_mutex_t mutex;
> + THR_LOCK lock;
> +} CASSANDRA_SHARE;
> +
> +class ColumnDataConverter;
> +
> +struct ha_table_option_struct;
> +
> +
> +struct st_dynamic_column_value;
> +
> +typedef bool (* CAS2DYN_CONVERTER)(const char *cass_data,
> + int cass_data_len,
> + struct st_dynamic_column_value *value);
> +typedef bool (* DYN2CAS_CONVERTER)(struct st_dynamic_column_value *value,
> + char **cass_data,
> + int *cass_data_len,
> + void *buf, void **freemem);
> +struct cassandra_type_def
> +{
> + const char *name;
> + CAS2DYN_CONVERTER cassandra_to_dynamic;
> + DYN2CAS_CONVERTER dynamic_to_cassandra;
> +};
> +
> +typedef struct cassandra_type_def CASSANDRA_TYPE_DEF;
> +
> +enum cassandtra_type_enum {CT_BIGINT, CT_INT, CT_COUNTER, CT_FLOAT, CT_DOUBLE,
> + CT_BLOB, CT_ASCII, CT_TEXT, CT_TIMESTAMP, CT_UUID, CT_BOOLEAN, CT_VARINT,
> + CT_DECIMAL};
> +
> +typedef enum cassandtra_type_enum CASSANDRA_TYPE;
> +
> +
> +
> +/** @brief
> + Class definition for the storage engine
> +*/
> +class ha_cassandra: public handler
> +{
> + friend class Column_name_enumerator_impl;
> + THR_LOCK_DATA lock; ///< MySQL lock
> + CASSANDRA_SHARE *share; ///< Shared lock info
> +
> + Cassandra_se_interface *se;
> +
> + /* description of static part of the table definition */
> + ColumnDataConverter **field_converters;
> + uint n_field_converters;
> +
> + CASSANDRA_TYPE_DEF *default_type_def;
> + /* description of dynamic columns part */
> + CASSANDRA_TYPE_DEF *special_type_field_converters;
> + LEX_STRING *special_type_field_names;
> + uint n_special_type_fields;
> + DYNAMIC_ARRAY dynamic_values, dynamic_names;
> + DYNAMIC_STRING dynamic_rec;
> +
> + ColumnDataConverter *rowkey_converter;
> +
> + bool setup_field_converters(Field **field, uint n_fields);
> + void free_field_converters();
> +
> + int read_cassandra_columns(bool unpack_pk);
> + int check_table_options(struct ha_table_option_struct* options);
> +
> + bool doing_insert_batch;
> + ha_rows insert_rows_batched;
> +
> + uint dyncol_field;
> + bool dyncol_set;
> +
> + /* Used to produce 'wrong column %s at row %lu' warnings */
> + ha_rows insert_lineno;
> + void print_conversion_error(const char *field_name,
> + char *cass_value, int cass_value_len);
> + int connect_and_check_options(TABLE *table_arg);
> +public:
> + ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
> + ~ha_cassandra()
> + {
> + free_field_converters();
> + delete se;
> + }
> +
> + /** @brief
> + The name that will be used for display purposes.
> + */
> + const char *table_type() const { return "CASSANDRA"; }
> +
> + /** @brief
> + The name of the index type that will be used for display.
> + Don't implement this method unless you really have indexes.
> + */
> + const char *index_type(uint inx) { return "HASH"; }
> +
> + /** @brief
> + The file extensions.
> + */
> + const char **bas_ext() const;
> +
> + /** @brief
> + This is a list of flags that indicate what functionality the storage engine
> + implements. The current table flags are documented in handler.h
> + */
> + ulonglong table_flags() const
> + {
> + /*
> + HA_BINLOG_STMT_CAPABLE
> + We are saying that this engine is just statement capable to have
> + an engine that can only handle statement-based logging. This is
> + used in testing.
> + HA_REC_NOT_IN_SEQ
> + If we don't set it, filesort crashes, because it assumes rowids are
> + 1..8 byte numbers
> + */
> + return HA_BINLOG_STMT_CAPABLE |
> + HA_REC_NOT_IN_SEQ;
HA_NO_TRANSACTIONS is unset. Is this engine transactional?
HA_PARTIAL_COLUMN_READ is unset. Do you always work with all columns, ignoring read_set and write_set?
HA_TABLE_SCAN_ON_INDEX is unset. Do you store the data MyISAM-style, index and data in separate files?
HA_FAST_KEY_READ is unset. is reading keys in order faster in cassandra than random reads?
HA_REQUIRE_PRIMARY_KEY is unset, but a primary key is required.
HA_PRIMARY_KEY_IN_READ_INDEX is unset. but as far as I see, primary key columns are always returned (and needed for position())
HA_PRIMARY_KEY_REQUIRED_FOR_POSITION is unset, but a primary key is required for position()
HA_NO_AUTO_INCREMENT is unset. is auto-increment supported?
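If the answers are what I suspect (not transactional, no auto-increment, a
primary key always present and needed for position()), the flags would look
something like this (a sketch only, adjust to the real answers):

  return HA_BINLOG_STMT_CAPABLE | HA_REC_NOT_IN_SEQ |
         HA_NO_TRANSACTIONS | HA_NO_AUTO_INCREMENT |
         HA_REQUIRE_PRIMARY_KEY | HA_PRIMARY_KEY_IN_READ_INDEX |
         HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;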
> +
> + }
> +
> + /** @brief
> + This is a bitmap of flags that indicates how the storage engine
> + implements indexes. The current index flags are documented in
> + handler.h. If you do not implement indexes, just return zero here.
> +
> + @details
> + part is the key part to check. First key part is 0.
> + If all_parts is set, MySQL wants to know the flags for the combined
> + index, up to and including 'part'.
> + */
> + ulong index_flags(uint inx, uint part, bool all_parts) const
> + {
> + return 0;
> + }
> +
> + /** @brief
> + unireg.cc will call max_supported_record_length(), max_supported_keys(),
> + max_supported_key_parts(), uint max_supported_key_length()
> + to make sure that the storage engine can handle the data it is about to
> + send. Return *real* limits of your storage engine here; MySQL will do
> + min(your_limits, MySQL_limits) automatically.
> + */
> + uint max_supported_record_length() const { return HA_MAX_REC_LENGTH; }
> +
> + /* Support only one Primary Key, for now */
> + uint max_supported_keys() const { return 1; }
> + uint max_supported_key_parts() const { return 1; }
> +
> + /** @brief
> + unireg.cc will call this to make sure that the storage engine can handle
> + the data it is about to send. Return *real* limits of your storage engine
> + here; MySQL will do min(your_limits, MySQL_limits) automatically.
> +
> + @details
> + There is no need to implement ..._key_... methods if your engine doesn't
> + support indexes.
> + */
> + uint max_supported_key_length() const { return 16*1024; /* just to return something*/ }
> +
> + int index_init(uint idx, bool sorted);
> +
> + int index_read_map(uchar * buf, const uchar * key,
> + key_part_map keypart_map,
> + enum ha_rkey_function find_flag);
> +
> + /** @brief
> + Called in test_quick_select to determine if indexes should be used.
> + */
> + virtual double scan_time() { return (double) (stats.records+stats.deleted) / 20.0+10; }
does that make any sense? (I understand that it's copy-paste)
stats.deleted, for example, is never set.
> +
> + /** @brief
> + This method will never be called if you do not implement indexes.
> + */
> + virtual double read_time(uint, uint, ha_rows rows)
> + { return (double) rows / 20.0+1; }
same question
> +
> + virtual void start_bulk_insert(ha_rows rows);
> + virtual int end_bulk_insert();
> +
> + virtual int reset();
> +
> +
> + int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
> + uint n_ranges, uint mode, HANDLER_BUFFER *buf);
> + int multi_range_read_next(range_id_t *range_info);
> + ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
> + void *seq_init_param,
> + uint n_ranges, uint *bufsz,
> + uint *flags, COST_VECT *cost);
> + ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
> + uint key_parts, uint *bufsz,
> + uint *flags, COST_VECT *cost);
> + int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
> +
> +private:
> + bool source_exhausted;
> + bool mrr_start_read();
> + int check_field_options(Field **fields);
> + int read_dyncol(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> + String *valcol, char **freenames);
> + int write_dynamic_row(DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals);
> + void static free_dynamic_row(DYNAMIC_ARRAY *vals, DYNAMIC_ARRAY *names,
> + char *free_names);
> + CASSANDRA_TYPE_DEF * get_cassandra_field_def(char *cass_name,
> + int cass_name_length);
> +public:
> +
> + /*
> + Everything below are methods that we implement in ha_example.cc.
maybe it'd be better to remove the references to ha_example.cc (and example
in general) before pushing this into the main tree?
> +
> + Most of these methods are not obligatory, skip them and
> + MySQL will treat them as not implemented
> + */
> + /** @brief
> + We implement this in ha_example.cc; it's a required method.
> + */
> + int open(const char *name, int mode, uint test_if_locked); // required
> +
> + /** @brief
> + We implement this in ha_example.cc; it's a required method.
> + */
> + int close(void); // required
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int write_row(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int update_row(const uchar *old_data, uchar *new_data);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int delete_row(const uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_next(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_prev(uchar *buf);
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_first(uchar *buf);
why have unimplemented methods here?
I'd rather remove them.
> +
> + /** @brief
> + We implement this in ha_example.cc. It's not an obligatory method;
> + skip it and and MySQL will treat it as not implemented.
> + */
> + int index_last(uchar *buf);
> +
> + /** @brief
> + Unlike index_init(), rnd_init() can be called two consecutive times
> + without rnd_end() in between (it only makes sense if scan=1). In this
> + case, the second call should prepare for the new table scan (e.g if
> + rnd_init() allocates the cursor, the second call should position the
> + cursor to the start of the table; no need to deallocate and allocate
> + it again. This is a required method.
> + */
> + int rnd_init(bool scan); //required
> + int rnd_end();
> + int rnd_next(uchar *buf); ///< required
> + int rnd_pos(uchar *buf, uchar *pos); ///< required
> + void position(const uchar *record); ///< required
> + int info(uint); ///< required
> + int extra(enum ha_extra_function operation);
> + int external_lock(THD *thd, int lock_type); ///< required
> + int delete_all_rows(void);
> + ha_rows records_in_range(uint inx, key_range *min_key,
> + key_range *max_key);
> + int delete_table(const char *from);
> + int create(const char *name, TABLE *form,
> + HA_CREATE_INFO *create_info); ///< required
> + bool check_if_incompatible_data(HA_CREATE_INFO *info,
> + uint table_changes);
> +
> + THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
> + enum thr_lock_type lock_type); ///< required
> +};
>
> === added file 'storage/cassandra/ha_cassandra.cc'
> --- storage/cassandra/ha_cassandra.cc 1970-01-01 00:00:00 +0000
> +++ storage/cassandra/ha_cassandra.cc 2012-11-12 15:11:23 +0000
> @@ -0,0 +1,2674 @@
> +/*
> + Copyright (c) 2012, Monty Program Ab
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +
> +#ifdef USE_PRAGMA_IMPLEMENTATION
> +#pragma implementation // gcc: Class implementation
> +#endif
> +
> +#include <mysql/plugin.h>
> +#include "ha_cassandra.h"
> +#include "sql_class.h"
> +
> +#define DYNCOL_USUAL 20
> +#define DYNCOL_DELTA 100
> +#define DYNCOL_USUAL_REC 1024
> +#define DYNCOL_DELTA_REC 1024
> +
> +static handler *cassandra_create_handler(handlerton *hton,
> + TABLE_SHARE *table,
> + MEM_ROOT *mem_root);
> +
> +extern int dynamic_column_error_message(enum_dyncol_func_result rc);
> +
> +handlerton *cassandra_hton;
> +
> +
> +/*
> + Hash used to track the number of open tables; variable for example share
> + methods
> +*/
> +static HASH cassandra_open_tables;
> +
> +/* The mutex used to init the hash; variable for example share methods */
> +mysql_mutex_t cassandra_mutex;
> +
> +
> +/**
> + Structure for CREATE TABLE options (table options).
> + It needs to be called ha_table_option_struct.
> +
> + The option values can be specified in the CREATE TABLE at the end:
> + CREATE TABLE ( ... ) *here*
> +*/
> +
> +struct ha_table_option_struct
> +{
> + const char *thrift_host;
> + int thrift_port;
> + const char *keyspace;
> + const char *column_family;
> +};
> +
> +
> +ha_create_table_option cassandra_table_option_list[]=
> +{
> + /*
> + one option that takes an arbitrary string
> + */
> + HA_TOPTION_STRING("thrift_host", thrift_host),
> + HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
> + HA_TOPTION_STRING("keyspace", keyspace),
> + HA_TOPTION_STRING("column_family", column_family),
> + HA_TOPTION_END
> +};
> +
> +/**
> + Structure for CREATE TABLE options (field options).
> +*/
> +
> +struct ha_field_option_struct
> +{
> + bool dyncol_field;
> +};
> +
> +ha_create_table_option cassandra_field_option_list[]=
> +{
> + /*
> + Collect all other columns as dynamic here,
> + the valid values are YES/NO, ON/OFF, 1/0.
> + The default is 0, that is true, yes, on.
really? looks like a typo. 0 is false, 1 is true.
> + */
> + HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
> + HA_FOPTION_END
> +};
> +
> +static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in an INSERT batch",
> + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in a multiget(MRR) batch",
> + NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
> + "Number of rows in an rnd_read (full scan) batch",
> + NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
> +
> +static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
> + "Number of times to retry Cassandra calls that failed due to timeouts or "
> + "network communication problems. The default, 0, means not to retry.",
> + NULL, NULL, /*default*/ 0, /*min*/ 0, /*max*/ 1024*1024*1024, 0);
> +
> +/* These match values in enum_cassandra_consistency_level */
> +const char *cassandra_consistency_level[] =
> +{
> + "ONE",
> + "QUORUM",
> + "LOCAL_QUORUM",
> + "EACH_QUORUM",
> + "ALL",
> + "ANY",
> + "TWO",
> + "THREE",
> + NullS
> +};
> +
> +TYPELIB cassandra_consistency_level_typelib= {
> + array_elements(cassandra_consistency_level) - 1, "",
> + cassandra_consistency_level, NULL
> +};
> +
> +
> +static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
> + "Cassandra consistency level to use for write operations", NULL, NULL,
> + ONE, &cassandra_consistency_level_typelib);
> +
> +static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
> + "Cassandra consistency level to use for read operations", NULL, NULL,
> + ONE, &cassandra_consistency_level_typelib);
> +
> +
> +mysql_mutex_t cassandra_default_host_lock;
> +static char* cassandra_default_thrift_host = NULL;
> +static char cassandra_default_host_buf[256]="";
> +
> +static void
> +cassandra_default_thrift_host_update(THD *thd,
> + struct st_mysql_sys_var* var,
> + void* var_ptr, /*!< out: where the
> + formal string goes */
> + const void* save) /*!< in: immediate result
> + from check function */
> +{
> + const char *new_host= *((char**)save);
> + const size_t max_len= sizeof(cassandra_default_host_buf);
> +
> + mysql_mutex_lock(&cassandra_default_host_lock);
> +
> + if (new_host)
> + {
> + strncpy(cassandra_default_host_buf, new_host, max_len-1);
> + cassandra_default_host_buf[max_len-1]= 0;
> + cassandra_default_thrift_host= cassandra_default_host_buf;
> + }
> + else
> + {
> + cassandra_default_host_buf[0]= 0;
> + cassandra_default_thrift_host= NULL;
> + }
> +
> + *((const char**)var_ptr)= cassandra_default_thrift_host;
> +
> + mysql_mutex_unlock(&cassandra_default_host_lock);
> +}
> +
> +
> +static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
> + PLUGIN_VAR_RQCMDARG,
> + "Default host for Cassandra thrift connections",
> + /*check*/NULL,
> + cassandra_default_thrift_host_update,
> + /*default*/NULL);
> +
> +static struct st_mysql_sys_var* cassandra_system_variables[]= {
> + MYSQL_SYSVAR(insert_batch_size),
> + MYSQL_SYSVAR(multiget_batch_size),
> + MYSQL_SYSVAR(rnd_batch_size),
> +
> + MYSQL_SYSVAR(default_thrift_host),
> + MYSQL_SYSVAR(write_consistency),
> + MYSQL_SYSVAR(read_consistency),
> + MYSQL_SYSVAR(failure_retries),
> + NULL
> +};
> +
> +
> +static SHOW_VAR cassandra_status_variables[]= {
> + {"row_inserts",
> + (char*) &cassandra_counters.row_inserts, SHOW_LONG},
> + {"row_insert_batches",
> + (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
> +
> + {"multiget_reads",
> + (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
> + {"multiget_keys_scanned",
> + (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
> + {"multiget_rows_read",
> + (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
> +
> + {"timeout_exceptions",
> + (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
> + {"unavailable_exceptions",
> + (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
> + {NullS, NullS, SHOW_LONG}
> +};
> +
> +
> +Cassandra_status_vars cassandra_counters;
> +Cassandra_status_vars cassandra_counters_copy;
> +
> +
> +/**
> + @brief
> + Function we use in the creation of our hash to get key.
> +*/
> +
> +static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
> + my_bool not_used __attribute__((unused)))
> +{
> + *length=share->table_name_length;
> + return (uchar*) share->table_name;
> +}
> +
> +#ifdef HAVE_PSI_INTERFACE
> +static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;
> +
> +static PSI_mutex_info all_cassandra_mutexes[]=
> +{
> + { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
> + { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
> +};
> +
> +static void init_cassandra_psi_keys()
> +{
> + const char* category= "cassandra";
> + int count;
> +
> + if (PSI_server == NULL)
> + return;
> +
> + count= array_elements(all_cassandra_mutexes);
> + PSI_server->register_mutex(category, all_cassandra_mutexes, count);
> +}
> +#endif
> +
> +static int cassandra_init_func(void *p)
> +{
> + DBUG_ENTER("cassandra_init_func");
> +
> +#ifdef HAVE_PSI_INTERFACE
> + init_cassandra_psi_keys();
> +#endif
> +
> + cassandra_hton= (handlerton *)p;
> + mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
> + (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
> + (my_hash_get_key) cassandra_get_key,0,0);
> +
> + cassandra_hton->state= SHOW_OPTION_YES;
> + cassandra_hton->create= cassandra_create_handler;
> + /*
> + Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
> + TABLE to create an *empty* table from scratch. Cassandra table won't be
> + emptied if re-created.
> + */
HTON_ALTER_NOT_SUPPORTED is not set. ALTER works?
HTON_TEMPORARY_NOT_SUPPORTED is not set. CREATE TEMPORARY TABLE works?
HTON_NO_PARTITION is not set. Partitioning works?
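If none of those actually work, then, as a sketch:

  cassandra_hton->flags= HTON_ALTER_NOT_SUPPORTED |
                         HTON_TEMPORARY_NOT_SUPPORTED |
                         HTON_NO_PARTITION;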
> + cassandra_hton->flags= 0;
> + cassandra_hton->table_options= cassandra_table_option_list;
> + cassandra_hton->field_options= cassandra_field_option_list;
> +
> + mysql_mutex_init(0 /* no instrumentation */,
why no instrumentation?
> + &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +static int cassandra_done_func(void *p)
> +{
> + int error= 0;
> + DBUG_ENTER("cassandra_done_func");
> + if (cassandra_open_tables.records)
> + error= 1;
> + my_hash_free(&cassandra_open_tables);
> + mysql_mutex_destroy(&cassandra_mutex);
> + mysql_mutex_destroy(&cassandra_default_host_lock);
> + DBUG_RETURN(error);
> +}
> +
> +
> +/**
> + @brief
> + Example of simple lock controls. The "share" it creates is a
> + structure we will pass to each cassandra handler. Do you have to have
> + one of these? Well, you have pieces that are used for locking, and
> + they are needed to function.
> +*/
> +
> +static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
> +{
> + CASSANDRA_SHARE *share;
> + uint length;
> + char *tmp_name;
> +
> + mysql_mutex_lock(&cassandra_mutex);
> + length=(uint) strlen(table_name);
> +
> + if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
> + (uchar*) table_name,
> + length)))
> + {
> + if (!(share=(CASSANDRA_SHARE *)
> + my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
> + &share, sizeof(*share),
> + &tmp_name, length+1,
> + NullS)))
> + {
> + mysql_mutex_unlock(&cassandra_mutex);
> + return NULL;
> + }
> +
> + share->use_count=0;
> + share->table_name_length=length;
> + share->table_name=tmp_name;
> + strmov(share->table_name,table_name);
> + if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
> + goto error;
> + thr_lock_init(&share->lock);
> + mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
> + &share->mutex, MY_MUTEX_INIT_FAST);
> + }
> + share->use_count++;
> + mysql_mutex_unlock(&cassandra_mutex);
> +
> + return share;
> +
> +error:
> + mysql_mutex_destroy(&share->mutex);
> + my_free(share);
> +
> + return NULL;
> +}
> +
> +
> +/**
> + @brief
> + Free lock controls. We call this whenever we close a table. If the table had
> + the last reference to the share, then we free memory associated with it.
> +*/
> +
> +static int free_share(CASSANDRA_SHARE *share)
> +{
> + mysql_mutex_lock(&cassandra_mutex);
> + if (!--share->use_count)
> + {
> + my_hash_delete(&cassandra_open_tables, (uchar*) share);
> + thr_lock_delete(&share->lock);
> + mysql_mutex_destroy(&share->mutex);
> + my_free(share);
> + }
> + mysql_mutex_unlock(&cassandra_mutex);
> +
> + return 0;
> +}
> +
> +
> +static handler* cassandra_create_handler(handlerton *hton,
> + TABLE_SHARE *table,
> + MEM_ROOT *mem_root)
> +{
> + return new (mem_root) ha_cassandra(hton, table);
> +}
> +
> +
> +ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
> + :handler(hton, table_arg),
> + se(NULL), field_converters(NULL),
> + special_type_field_converters(NULL),
> + special_type_field_names(NULL), n_special_type_fields(0),
> + rowkey_converter(NULL),
> + dyncol_field(0), dyncol_set(0)
> +{}
> +
> +
> +static const char *ha_cassandra_exts[] = {
> + NullS
> +};
> +
> +const char **ha_cassandra::bas_ext() const
> +{
> + return ha_cassandra_exts;
> +}
> +
> +
> +int ha_cassandra::connect_and_check_options(TABLE *table_arg)
> +{
> + ha_table_option_struct *options= table_arg->s->option_struct;
> + int res;
> + DBUG_ENTER("ha_cassandra::connect_and_check_options");
> +
> + if ((res= check_field_options(table_arg->s->field)) ||
> + (res= check_table_options(options)))
> + DBUG_RETURN(res);
> +
> + se= create_cassandra_se();
> + se->set_column_family(options->column_family);
> + const char *thrift_host= options->thrift_host? options->thrift_host:
> + cassandra_default_thrift_host;
> + if (se->connect(thrift_host, options->thrift_port, options->keyspace))
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
> + DBUG_RETURN(HA_ERR_NO_CONNECTION);
> + }
> +
> + if (setup_field_converters(table_arg->field, table_arg->s->fields))
> + {
> + DBUG_RETURN(HA_ERR_NO_CONNECTION);
> + }
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::check_field_options(Field **fields)
> +{
> + Field **field;
> + uint i;
> + DBUG_ENTER("ha_cassandra::check_field_options");
> + for (field= fields, i= 0; *field; field++, i++)
> + {
> + ha_field_option_struct *field_options= (*field)->option_struct;
> + if (field_options && field_options->dyncol_field)
> + {
> + if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
> + {
> + my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name);
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
> + dyncol_set= 1;
> + dyncol_field= i;
> + bzero(&dynamic_values, sizeof(dynamic_values));
> + bzero(&dynamic_names, sizeof(dynamic_names));
> + bzero(&dynamic_rec, sizeof(dynamic_rec));
> + }
> + }
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
> +{
> + DBUG_ENTER("ha_cassandra::open");
> +
> + if (!(share = get_share(name, table)))
> + DBUG_RETURN(1);
> + thr_lock_data_init(&share->lock,&lock,NULL);
> +
> + DBUG_ASSERT(!se);
> + /*
> + Don't do the following on open: it prevents SHOW CREATE TABLE when the server
> + has gone away.
> + */
> + /*
> + int res;
> + if ((res= connect_and_check_options(table)))
> + {
> + DBUG_RETURN(res);
> + }
> + */
> +
> + info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
> + insert_lineno= 0;
> +
> + DBUG_RETURN(0);
> +}
> +
> +
> +int ha_cassandra::close(void)
> +{
> + DBUG_ENTER("ha_cassandra::close");
> + delete se;
> + se= NULL;
> + free_field_converters();
> + DBUG_RETURN(free_share(share));
> +}
> +
> +
> +int ha_cassandra::check_table_options(ha_table_option_struct *options)
> +{
> + if (!options->thrift_host && (!cassandra_default_thrift_host ||
> + !cassandra_default_thrift_host[0]))
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> + "thrift_host table option must be specified, or "
> + "@@cassandra_default_thrift_host must be set");
> + return HA_WRONG_CREATE_OPTION;
> + }
> +
> + if (!options->keyspace || !options->column_family)
> + {
> + my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
> + "keyspace and column_family table options must be specified");
> + return HA_WRONG_CREATE_OPTION;
> + }
> + return 0;
> +}
> +
> +
> +/**
> + @brief
> + create() is called to create a table. The variable name will have the name
> + of the table.
> +
> + @details
> + When create() is called you do not need to worry about
> + opening the table. Also, the .frm file will have already been
> + created so adjusting create_info is not necessary. You can overwrite
> + the .frm file at this point if you wish to change the table
> + definition, but there are no methods currently provided for doing
> + so.
> +
> + Called from handle.cc by ha_create_table().
> +
> + @see
> + ha_create_table() in handle.cc
> +*/
> +
> +int ha_cassandra::create(const char *name, TABLE *table_arg,
> + HA_CREATE_INFO *create_info)
> +{
> + int res;
> + DBUG_ENTER("ha_cassandra::create");
> +
> + Field **pfield= table_arg->s->field;
> + if (!((*pfield)->flags & NOT_NULL_FLAG))
> + {
> + my_error(ER_WRONG_COLUMN_NAME, MYF(0), "First column must be NOT NULL");
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
this is unnecessary: the second check guarantees that the first column is
NOT NULL, so this if() is redundant
> +
> + if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
> + table_arg->key_info[0].key_parts != 1 ||
> + table_arg->key_info[0].key_part[0].fieldnr != 1)
> + {
> + my_error(ER_WRONG_COLUMN_NAME, MYF(0),
> + "Table must have PRIMARY KEY defined over the first column");
> + DBUG_RETURN(HA_WRONG_CREATE_OPTION);
> + }
...
> +CASSANDRA_TYPE get_cassandra_type(const char *validator)
> +{
> + CASSANDRA_TYPE rc;
> + switch(validator[32])
> + {
> + case 'L':
> + rc= CT_BIGINT;
> + break;
> + case 'I':
> + rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
> + rc= CT_INT;
> + break;
> + case 'C':
> + rc= CT_COUNTER;
> + break;
> + case 'F':
> + rc= CT_FLOAT;
> + break;
> + case 'D':
> + switch (validator[33])
> + {
> + case 'o':
> + rc= CT_DOUBLE;
> + break;
> + case 'a':
> + rc= CT_TIMESTAMP;
> + break;
> + case 'e':
> + rc= CT_DECIMAL;
> + break;
> + default:
> + rc= CT_BLOB;
> + break;
> + }
> + break;
> + case 'B':
> + rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
> + break;
> + case 'A':
> + rc= CT_ASCII;
> + break;
> + case 'U':
> + rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
> + break;
> + default:
> + rc= CT_BLOB;
> + }
> + DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
> + return rc;
> +}
> +
> +ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
> +{
> + ColumnDataConverter *res= NULL;
> +
> + switch(field->type()) {
> + case MYSQL_TYPE_TINY:
> + if (!strcmp(validator_name, validator_boolean))
why do you strcmp here, while you only check selected characters in
get_cassandra_type()?
you could've called get_cassandra_type() for validator_name instead,
and compared enum values instead of strings.
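i.e., roughly:

  if (get_cassandra_type(validator_name) == CT_BOOLEAN)
    res= new TinyintDataConverter;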
> + {
> + res= new TinyintDataConverter;
> + break;
> + }
> + /* fall through: */
> + case MYSQL_TYPE_SHORT:
> + case MYSQL_TYPE_LONGLONG:
> + {
> + bool is_counter= false;
> + if (!strcmp(validator_name, validator_bigint) ||
> + !strcmp(validator_name, validator_timestamp) ||
> + (is_counter= !strcmp(validator_name, validator_counter)))
> + res= new BigintDataConverter(!is_counter);
> + break;
> + }
> + case MYSQL_TYPE_FLOAT:
> + if (!strcmp(validator_name, validator_float))
> + res= new FloatDataConverter;
> + break;
> +
> + case MYSQL_TYPE_DOUBLE:
> + if (!strcmp(validator_name, validator_double))
> + res= new DoubleDataConverter;
> + break;
> +
> + case MYSQL_TYPE_TIMESTAMP:
> + if (!strcmp(validator_name, validator_timestamp))
> + res= new TimestampDataConverter;
> + break;
> +
> + case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
> + if (!strcmp(validator_name, validator_uuid) &&
> + field->real_type() == MYSQL_TYPE_STRING &&
> + field->field_length == 36)
> + {
> + // UUID maps to CHAR(36), its text representation
> + res= new UuidDataConverter;
> + break;
> + }
> + /* fall through: */
> + case MYSQL_TYPE_VAR_STRING:
> + case MYSQL_TYPE_VARCHAR:
> + {
> + /*
> + Cassandra's "varint" type is a binary-encoded arbitary-length
> + big-endian number.
> + - It can be mapped to VARBINARY(N), with sufficiently big N.
> + - If the value does not fit into N bytes, it is an error. We should not
> + truncate it, because that is just as good as returning garbage.
> + - varint should not be mapped to BINARY(N), because BINARY(N) values
> + are zero-padded, which will work as multiplying the value by
> + 2^k for some value of k.
> + */
> + if (field->type() == MYSQL_TYPE_VARCHAR &&
> + field->binary() &&
> + (!strcmp(validator_name, validator_varint) ||
> + !strcmp(validator_name, validator_decimal)))
> + {
> + res= new StringCopyConverter(field->field_length);
> + break;
> + }
> +
> + if (!strcmp(validator_name, validator_blob) ||
> + !strcmp(validator_name, validator_ascii) ||
> + !strcmp(validator_name, validator_text))
> + {
> + res= new StringCopyConverter((size_t)-1);
> + }
> + break;
> + }
> + case MYSQL_TYPE_LONG:
> + if (!strcmp(validator_name, validator_int))
> + res= new Int32DataConverter;
> + break;
> +
> + default:;
> + }
> + return res;
> +}
> +
> +
> +bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
> +{
> + char *col_name;
> + int col_name_len;
> + char *col_type;
> + int col_type_len;
> + size_t ddl_fields= se->get_ddl_size();
> + const char *default_type= se->get_default_validator();
> + uint max_non_default_fields;
> + DBUG_ENTER("ha_cassandra::setup_field_converters");
> + DBUG_ASSERT(default_type);
> +
> + DBUG_ASSERT(!field_converters);
> + DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
> +
> + /*
> + We always should take into account that in case of using dynamic columns
> + sql description contain one field which does not described in
> + Cassandra DDL also key field is described separately. So that
> + is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
> + */
> + max_non_default_fields= ddl_fields + 2 - n_fields;
> + if (ddl_fields < (n_fields - dyncol_set - 1))
> + {
> + se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + /* allocate memory in one chunk */
> + size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
> + (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
> + (dyncol_set ? max_non_default_fields : 0);
> + if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
> + DBUG_RETURN(true);
> + bzero(field_converters, memsize);
> + n_field_converters= n_fields;
> +
> + if (dyncol_set)
> + {
> + special_type_field_converters=
> + (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
> + special_type_field_names=
> + ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
> + }
> +
> + if (dyncol_set)
> + {
> + if (init_dynamic_array(&dynamic_values,
> + sizeof(DYNAMIC_COLUMN_VALUE),
> + DYNCOL_USUAL, DYNCOL_DELTA))
> + DBUG_RETURN(true);
> + else
> + if (init_dynamic_array(&dynamic_names,
> + sizeof(LEX_STRING),
> + DYNCOL_USUAL, DYNCOL_DELTA))
> + {
> + delete_dynamic(&dynamic_values);
> + DBUG_RETURN(true);
> + }
> + else
> + if (init_dynamic_string(&dynamic_rec, NULL,
> + DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
> + {
> + delete_dynamic(&dynamic_values);
> + delete_dynamic(&dynamic_names);
> + DBUG_RETURN(true);
> + }
> +
> + /* Dynamic column field has special processing */
> + field_converters[dyncol_field]= NULL;
> +
> + default_type_def= cassandra_types + get_cassandra_type(default_type);
> + }
> +
> + se->first_ddl_column();
> + uint n_mapped= 0;
> + while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
> + &col_type_len))
> + {
> + Field **field;
> + uint i;
> + /* Mapping for the 1st field is already known */
> + for (field= field_arg + 1, i= 1; *field; field++, i++)
> + {
> + if ((!dyncol_set || dyncol_field != i) &&
> + !strcmp((*field)->field_name, col_name))
> + {
> + n_mapped++;
> + ColumnDataConverter **conv= field_converters + (*field)->field_index;
> + if (!(*conv= map_field_to_validator(*field, col_type)))
> + {
> + se->print_error("Failed to map column %s to datatype %s",
> + (*field)->field_name, col_type);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + (*conv)->field= *field;
> + }
> + }
> + if (dyncol_set && !(*field)) // is needed and not found
> + {
> + DBUG_PRINT("info",("Field not found: %s", col_name));
> + if (strcmp(col_type, default_type))
> + {
> + DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
> + col_name, col_type));
> + special_type_field_names[n_special_type_fields].length= col_name_len;
> + special_type_field_names[n_special_type_fields].str= col_name;
> + special_type_field_converters[n_special_type_fields]=
> + cassandra_types[get_cassandra_type(col_type)];
> + n_special_type_fields++;
> + }
> + }
> + }
> +
> + if (n_mapped != n_fields - 1 - dyncol_set)
> + {
> + Field *first_unmapped= NULL;
> + /* Find the first field */
> + for (uint i= 1; i < n_fields;i++)
> + {
> + if (!field_converters[i])
> + {
> + first_unmapped= field_arg[i];
> + break;
> + }
> + }
> + DBUG_ASSERT(first_unmapped);
> +
> + se->print_error("Field `%s` could not be mapped to any field in Cassandra",
generally (here and everywhere) use %`s instead of `%s`
> + first_unmapped->field_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + /*
> + Setup type conversion for row_key.
> + */
> + se->get_rowkey_type(&col_name, &col_type);
> + if (col_name && strcmp(col_name, (*field_arg)->field_name))
> + {
> + se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
> + col_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + if (!col_name && strcmp("rowkey", (*field_arg)->field_name))
> + {
> + se->print_error("target column family has no key_alias defined, "
> + "PRIMARY KEY column must be named 'rowkey'");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + if (col_type != NULL)
> + {
> + if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
> + {
> + se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> + rowkey_converter->field= *field_arg;
> + }
> + else
> + {
> + se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + DBUG_RETURN(true);
> + }
> +
> + DBUG_RETURN(false);
> +}
> +
> +
> +void ha_cassandra::free_field_converters()
> +{
> + delete rowkey_converter;
> + rowkey_converter= NULL;
> +
> + if (dyncol_set)
> + {
> + delete_dynamic(&dynamic_values);
> + delete_dynamic(&dynamic_names);
> + dynstr_free(&dynamic_rec);
> + }
> + if (field_converters)
> + {
> + for (uint i=0; i < n_field_converters; i++)
> + if (field_converters[i])
> + {
> + DBUG_ASSERT(!dyncol_set || i == dyncol_field);
> + delete field_converters[i];
> + }
> + my_free(field_converters);
> + field_converters= NULL;
> + }
> +}
> +
> +
> +int ha_cassandra::index_init(uint idx, bool sorted)
> +{
> + int ires;
> + if (!se && (ires= connect_and_check_options(table)))
> + return ires;
> + return 0;
> +}
> +
> +void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
> +
> +int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
> + key_part_map keypart_map,
> + enum ha_rkey_function find_flag)
> +{
> + int rc= 0;
> + DBUG_ENTER("ha_cassandra::index_read_map");
> +
> + if (find_flag != HA_READ_KEY_EXACT)
> + DBUG_RETURN(HA_ERR_WRONG_COMMAND);
did you verify that the server expects HA_ERR_WRONG_COMMAND from
this method and correctly handles it?
or, perhaps, find_flag is always HA_READ_KEY_EXACT here,
because of your index_flags() ?
> +
> + uint key_len= calculate_key_len(table, active_index, key, keypart_map);
> + store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> + char *cass_key;
> + int cass_key_len;
> + my_bitmap_map *old_map;
> +
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
> + {
> + /* We get here when making lookups like uuid_column='not-an-uuid' */
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + bool found;
> + if (se->get_slice(cass_key, cass_key_len, &found))
> + {
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
generally it's better not to use my_error() directly,
but only return an error code, and return your se->error_str()
from ha_cassandra::get_error_message()
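That is, something like this (a sketch, assuming HA_ERR_INTERNAL_ERROR is
the only error code you'd map this way):

  bool ha_cassandra::get_error_message(int error, String *buf)
  {
    if (error == HA_ERR_INTERNAL_ERROR && se)
    {
      /* pass the Cassandra error text to the client */
      buf->copy(se->error_str(), (uint32)strlen(se->error_str()),
                system_charset_info);
      return false;                       /* not a temporary error */
    }
    return handler::get_error_message(error, buf);
  }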
> + rc= HA_ERR_INTERNAL_ERROR;
> + }
> +
> + /* TODO: what if we're not reading all columns?? */
> + if (!found)
> + rc= HA_ERR_KEY_NOT_FOUND;
> + else
> + rc= read_cassandra_columns(false);
> +
> + DBUG_RETURN(rc);
> +}
> +
> +
> +void ha_cassandra::print_conversion_error(const char *field_name,
> + char *cass_value,
> + int cass_value_len)
> +{
> + char buf[32];
> + char *p= cass_value;
> + size_t i= 0;
> + for (; (i < (int)sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
why do you cast here?
> + {
> + buf[i++]= map2number[(*p >> 4) & 0xF];
> + buf[i++]= map2number[*p & 0xF];
> + }
> + buf[i]=0;
> +
> + se->print_error("Unable to convert value for field `%s` from Cassandra's data"
> + " format. Source data is %d bytes, 0x%s%s",
> + field_name, cass_value_len, buf,
> + (i == sizeof(buf) - 1)? "..." : "");
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +}
> +
> +
> +void free_strings(DYNAMIC_COLUMN_VALUE *vals, uint num)
> +{
> + for (uint i= 0; i < num; i++)
> + if (vals[i].type == DYN_COL_STRING &&
> + !vals[i].x.string.nonfreeable)
> + my_free(vals[i].x.string.value.str);
> +}
> +
> +
> +CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
> + int cass_name_len)
> +{
> + CASSANDRA_TYPE_DEF *type= default_type_def;
> + for(uint i= 0; i < n_special_type_fields; i++)
> + {
> + if (cass_name_len == (int)special_type_field_names[i].length &&
> + memcmp(cass_name, special_type_field_names[i].str,
> + cass_name_len) == 0)
> + {
> + type= special_type_field_converters + i;
> + break;
> + }
> + }
> + return type;
> +}
> +
> +int ha_cassandra::read_cassandra_columns(bool unpack_pk)
> +{
> + char *cass_name;
> + char *cass_value;
> + int cass_value_len, cass_name_len;
> + Field **field;
> + int res= 0;
> + ulong total_name_len= 0;
> +
> + /*
> + cassandra_to_mariadb() calls will use field->store(...) methods, which
> + require that the column is in the table->write_set
> + */
> + my_bitmap_map *old_map;
> + old_map= dbug_tmp_use_all_columns(table, table->write_set);
> +
> + /* Start with all fields being NULL */
> + for (field= table->field + 1; *field; field++)
> + (*field)->set_null();
> +
> + while (!se->get_next_read_column(&cass_name, &cass_name_len,
> + &cass_value, &cass_value_len))
> + {
> + // map to our column. todo: use hash or something..
> + bool found= 0;
> + for (field= table->field + 1; *field; field++)
> + {
> + uint fieldnr= (*field)->field_index;
> + if ((!dyncol_set || dyncol_field != fieldnr) &&
> + !strcmp((*field)->field_name, cass_name))
> + {
> + found= 1;
> + (*field)->set_notnull();
> + if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
> + cass_value_len))
> + {
> + print_conversion_error((*field)->field_name, cass_value,
> + cass_value_len);
> + res=1;
> + goto err;
> + }
> + break;
> + }
> + }
> + if (dyncol_set && !found)
> + {
> + DYNAMIC_COLUMN_VALUE val;
> + LEX_STRING nm;
> + CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
> + cass_name_len);
> + nm.str= cass_name;
> + nm.length= cass_name_len;
> + if (nm.length > MAX_NAME_LENGTH)
> + {
> + se->print_error("Unable to convert value for field `%s`"
> + " from Cassandra's data format. Name"
> + " length exceed limit of %u: '%s'",
> + table->field[dyncol_field]->field_name,
> + (uint)MAX_NAME_LENGTH, cass_name);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + res=1;
> + goto err;
> + }
> + total_name_len+= cass_name_len;
> + if (nm.length > MAX_TOTAL_NAME_LENGTH)
> + {
> + se->print_error("Unable to convert value for field `%s`"
> + " from Cassandra's data format. Sum of all names"
> + " length exceed limit of %lu",
> + table->field[dyncol_field]->field_name,
> + cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> + res=1;
> + goto err;
> + }
> +
> + if ((res= (*(type->cassandra_to_dynamic))(cass_value,
> + cass_value_len, &val)) ||
> + insert_dynamic(&dynamic_names, (uchar *) &nm) ||
> + insert_dynamic(&dynamic_values, (uchar *) &val))
> + {
> + if (res)
> + {
> + print_conversion_error(cass_name, cass_value, cass_value_len);
> + }
> + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> + dynamic_values.elements);
> + // EOM shouldm be already reported if happened
> + res=1;
> + goto err;
> + }
> + }
> + }
> +
> + dynamic_rec.length= 0;
> + if (dyncol_set)
> + {
> + if (dynamic_column_create_many_internal_fmt(&dynamic_rec,
> + dynamic_names.elements,
> + dynamic_names.buffer,
> + (DYNAMIC_COLUMN_VALUE *)
> + dynamic_values.buffer,
> + FALSE,
> + TRUE) < 0)
> + dynamic_rec.length= 0;
> +
> + free_strings((DYNAMIC_COLUMN_VALUE *)dynamic_values.buffer,
> + dynamic_values.elements);
> + dynamic_values.elements= dynamic_names.elements= 0;
> + }
> + if (dyncol_set)
why a separate if() ?
> + {
> + if (dynamic_rec.length == 0)
> + table->field[dyncol_field]->set_null();
> + else
> + {
> + Field_blob *blob= (Field_blob *)table->field[dyncol_field];
> + blob->set_notnull();
> + blob->store_length(dynamic_rec.length);
> + *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
> + dynamic_rec.str;
> + }
> + }
> +
> + if (unpack_pk)
> + {
> + /* Unpack rowkey to primary key */
> + field= table->field;
> + (*field)->set_notnull();
> + se->get_read_rowkey(&cass_value, &cass_value_len);
> + if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
> + {
> + print_conversion_error((*field)->field_name, cass_value, cass_value_len);
> + res=1;
> + goto err;
> + }
> + }
> +
> +err:
> + dbug_tmp_restore_column_map(table->write_set, old_map);
> + return res;
> +}
...
> +int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::rnd_pos");
> +
> + int save_active_index= active_index;
> + active_index= 0; /* The primary key */
> + rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
> +
> + active_index= save_active_index;
a bit nicer would be to implement index_read_idx_map()
and call it from here and from index_read_map()
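That is, roughly (a sketch; index_read_idx_map() would get the body of your
current index_read_map(), using its index argument instead of active_index):

  int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
  {
    DBUG_ENTER("ha_cassandra::rnd_pos");
    /* pos holds the packed primary key value saved by position() */
    DBUG_RETURN(index_read_idx_map(buf, 0, pos, key_part_map(1),
                                   HA_READ_KEY_EXACT));
  }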
> +
> + DBUG_RETURN(rc);
> +}
> +
...
> +bool ha_cassandra::mrr_start_read()
> +{
> + uint key_len;
> +
> + my_bitmap_map *old_map;
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + se->new_lookup_keys();
> +
> + while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
> + {
> + char *cass_key;
> + int cass_key_len;
> +
> + DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
> +
> + uchar *key= (uchar*)mrr_cur_range.start_key.key;
> + key_len= mrr_cur_range.start_key.length;
> + //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
> + store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
> +
> + rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
> +
> + // Primitive buffer control
> + if (se->add_lookup_key(cass_key, cass_key_len) >
> + THDVAR(table->in_use, multiget_batch_size))
> + break;
I'd suggest adding a status variable to count
how many times the buffer was refilled.
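e.g. (the multiget_batches name is mine):

  // a new member in Cassandra_status_vars:
  ulong multiget_batches;
  // incremented once per multiget_slice() call in mrr_start_read():
  cassandra_counters.multiget_batches++;
  // and exposed in cassandra_status_variables[]:
  {"multiget_batches",
   (char*) &cassandra_counters.multiget_batches, SHOW_LONG},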
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + return se->multiget_slice();
> +}
...
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations start
> +/////////////////////////////////////////////////////////////////////////////
> +
> +
> +int ha_cassandra::index_next(uchar *buf)
Why did you do all these dummy methods?
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_next");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_prev(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_prev");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +int ha_cassandra::index_first(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_first");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +int ha_cassandra::index_last(uchar *buf)
> +{
> + int rc;
> + DBUG_ENTER("ha_cassandra::index_last");
> + rc= HA_ERR_WRONG_COMMAND;
> + DBUG_RETURN(rc);
> +}
> +
> +
> +ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
> + key_range *max_key)
> +{
> + DBUG_ENTER("ha_cassandra::records_in_range");
> + //DBUG_RETURN(10); // low number to force index usage
> + DBUG_RETURN(HA_POS_ERROR);
> +}
> +
> +
> +class Column_name_enumerator_impl : public Column_name_enumerator
> +{
> + ha_cassandra *obj;
> + uint idx;
> +public:
> + Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
> + const char* get_next_name()
> + {
> + if (idx == obj->table->s->fields)
> + return NULL;
> + else
> + return obj->table->field[idx++]->field_name;
> + }
> +};
> +
> +
> +int ha_cassandra::update_row(const uchar *old_data, uchar *new_data)
> +{
> + DYNAMIC_ARRAY oldvals, oldnames, vals, names;
> + String oldvalcol, valcol;
> + char *oldfree_names= NULL, *free_names= NULL;
> + my_bitmap_map *old_map;
> + int res;
> + DBUG_ENTER("ha_cassandra::update_row");
> + /* Currently, it is guaranteed that new_data == table->record[0] */
> + DBUG_ASSERT(new_data == table->record[0]);
> + /* For now, just rewrite the full record */
> + se->clear_insert_buffer();
> +
> + old_map= dbug_tmp_use_all_columns(table, table->read_set);
> +
> + char *old_key;
> + int old_key_len;
> + se->get_read_rowkey(&old_key, &old_key_len);
> +
> + /* Get the key we're going to write */
> + char *new_key;
> + int new_key_len;
> + if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
> + {
> + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> + rowkey_converter->field->field_name, insert_lineno);
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> + }
> +
> + /*
> + Compare it to the key we've read. For all types that Cassandra supports,
> + binary byte-wise comparison can be used
> + */
> + bool new_primary_key;
> + if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
> + new_primary_key= true;
> + else
> + new_primary_key= false;
> +
> + if (dyncol_set)
> + {
> + Field *field= table->field[dyncol_field];
> + /* move to get old_data */
> + my_ptrdiff_t diff;
> + diff= (my_ptrdiff_t) (old_data - new_data);
> + field->move_field_offset(diff); // Points now at old_data
> + if ((res= read_dyncol(&oldvals, &oldnames, &oldvalcol, &oldfree_names)))
> + DBUG_RETURN(res);
> + field->move_field_offset(-diff); // back to new_data
> + if ((res= read_dyncol(&vals, &names, &valcol, &free_names)))
> + {
> + free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> + DBUG_RETURN(res);
> + }
> + }
> +
> + if (new_primary_key)
> + {
> + /*
> + Primary key value changed. This is essentially a DELETE + INSERT.
> + Add a DELETE operation into the batch
> + */
> + Column_name_enumerator_impl name_enumerator(this);
> + se->add_row_deletion(old_key, old_key_len, &name_enumerator,
> + (LEX_STRING *)oldnames.buffer,
> + (dyncol_set ? oldnames.elements : 0));
> + oldnames.elements= oldvals.elements= 0; // they will be deleted
> + }
> +
> + se->start_row_insert(new_key, new_key_len);
> +
> + /* Convert other fields */
> + for (uint i= 1; i < table->s->fields; i++)
I probably would've simply done delete+insert in all cases :)
but as you've gone to the trouble of updating column by column,
it really makes little sense to rewrite columns that didn't change.
could you check table->write_set here and memcmp with the old
column value before updating?
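Something like this at the top of the loop body (a sketch; the memcmp part
would additionally need the old value from old_data at hand):

  /* skip columns that this UPDATE did not assign */
  if (!bitmap_is_set(table->write_set, i))
    continue;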
> + {
> + char *cass_data;
> + int cass_data_len;
> + if (dyncol_set && dyncol_field == i)
> + {
> + DBUG_ASSERT(field_converters[i] == NULL);
> + if ((res= write_dynamic_row(&vals, &names)))
> + goto err;
> + }
> + else
> + {
> + if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
> + {
> + my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
> + field_converters[i]->field->field_name, insert_lineno);
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> + DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
> + }
> + se->add_insert_column(field_converters[i]->field->field_name, 0,
> + cass_data, cass_data_len);
> + }
> + }
> + if (dyncol_set)
> + {
> + /* find removed fields */
> + uint i= 0, j= 0;
> + LEX_STRING *onames= (LEX_STRING *)oldnames.buffer;
> + LEX_STRING *nnames= (LEX_STRING *)names.buffer;
> + /* both array are sorted */
> + for(; i < oldnames.elements; i++)
> + {
> + int scmp= 0;
> + while (j < names.elements &&
> + (nnames[j].length < onames[i].length ||
> + (nnames[j].length == onames[i].length &&
> + (scmp= memcmp(nnames[j].str, onames[i].str,
> + onames[i].length)) < 0)))
> + j++;
> + if (j < names.elements &&
> + nnames[j].length == onames[i].length &&
> + scmp == 0)
> + j++;
> + else
> + se->add_insert_delete_column(onames[i].str, onames[i].length);
> + }
> + }
> +
> + dbug_tmp_restore_column_map(table->read_set, old_map);
> +
> + res= se->do_insert();
> +
> + if (res)
> + my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
> +
> +err:
> + if (dyncol_set)
> + {
> + free_dynamic_row(&oldnames, &oldvals, oldfree_names);
> + free_dynamic_row(&names, &vals, free_names);
> + }
> +
> + DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
> +}
> +
> +
> +int ha_cassandra::extra(enum ha_extra_function operation)
> +{
> + DBUG_ENTER("ha_cassandra::extra");
> + DBUG_RETURN(0);
> +}
please. why do you like dummy methods that much? :)
> +
> +
> +/* The following function was copied from ha_blackhole::store_lock: */
> +THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
> + THR_LOCK_DATA **to,
> + enum thr_lock_type lock_type)
> +{
> + DBUG_ENTER("ha_cassandra::store_lock");
> + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
> + {
> + /*
> + Here is where we get into the guts of a row level lock.
> + If TL_UNLOCK is set
> + If we are not doing a LOCK TABLE or DISCARD/IMPORT
> + TABLESPACE, then allow multiple writers
> + */
> +
> + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
> + lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
> + && !thd_tablespace_op(thd))
1. tablespace op in cassandra? really? too much copy-paste is confusing.
(here and in the comments too)
2. did you test LOCK TABLES?
> + lock_type = TL_WRITE_ALLOW_WRITE;
that makes all changes to cassandra immediately visible
to concurrently running threads. okay, if that's intentional,
but it needs to be documented, as it's not the SQL semantics.
> +
> + /*
> + In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
> + MySQL would use the lock TL_READ_NO_INSERT on t2, and that
> + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
> + to t2. Convert the lock to a normal read lock to allow
> + concurrent inserts to t2.
> + */
> +
> + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
> + lock_type = TL_READ;
this also breaks SBR for INSERT ... SELECT. same as above - okay if
intentional, but should be thoroughly documented.
> +
> + lock.type= lock_type;
> + }
> + *to++= &lock;
> + DBUG_RETURN(to);
> +}
> +
> +
> +int ha_cassandra::external_lock(THD *thd, int lock_type)
> +{
> + DBUG_ENTER("ha_cassandra::external_lock");
> + DBUG_RETURN(0);
> +}
> +
> +int ha_cassandra::delete_table(const char *name)
> +{
> + DBUG_ENTER("ha_cassandra::delete_table");
> + /*
> + Cassandra table is just a view. Dropping it doesn't affect the underlying
> + column family.
> + */
> + DBUG_RETURN(0);
> +}
more dummies...
> +
> +
> +/**
> + check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
> + if new and old definition are compatible
> +
> + @details If there are no other explicit signs like changed number of
> + fields this function will be called by compare_tables()
> + (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
> + file.
> +
> +*/
> +
> +bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
> + uint table_changes)
> +{
> + DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
> + /* Checked, we intend to have this empty for Cassandra SE. */
> + DBUG_RETURN(COMPATIBLE_DATA_YES);
> +}
> +
> +
> +/////////////////////////////////////////////////////////////////////////////
> +// Dummy implementations end
> +/////////////////////////////////////////////////////////////////////////////
::update_row and ::store_lock are hardly dummies :)
> +
> +static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
> +{
> + cassandra_counters_copy= cassandra_counters;
> +
> + var->type= SHOW_ARRAY;
> + var->value= (char *) &cassandra_status_variables;
what's the point? If you don't do anything in this function,
you can just as easily list all status variables below
in the func_status[] array.
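That is, just (a sketch; the variables then need the full Cassandra_ prefix
spelled out in their names):

  static struct st_mysql_show_var func_status[]=
  {
    {"Cassandra_row_inserts",
     (char*) &cassandra_counters.row_inserts, SHOW_LONG},
    /* ... and so on for the rest of cassandra_status_variables[] ... */
    {0, 0, SHOW_UNDEF}
  };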
> + return 0;
> +}
> +
> +
> +struct st_mysql_storage_engine cassandra_storage_engine=
> +{ MYSQL_HANDLERTON_INTERFACE_VERSION };
> +
> +static struct st_mysql_show_var func_status[]=
> +{
> + {"Cassandra", (char *)show_cassandra_vars, SHOW_FUNC},
> + {0,0,SHOW_UNDEF}
> +};
> +
> +maria_declare_plugin(cassandra)
> +{
> + MYSQL_STORAGE_ENGINE_PLUGIN,
> + &cassandra_storage_engine,
> + "CASSANDRA",
> + "Monty Program Ab",
> + "Cassandra storage engine",
> + PLUGIN_LICENSE_GPL,
> + cassandra_init_func, /* Plugin Init */
> + cassandra_done_func, /* Plugin Deinit */
> + 0x0001, /* version number (0.1) */
> + func_status, /* status variables */
> + cassandra_system_variables, /* system variables */
> + "0.1", /* string version */
> + MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* maturity */
> +}
> +maria_declare_plugin_end;
Regards,
Sergei