← Back to team overview

maria-developers team mailing list archive

review of the MDEV-377 Name support for dynamic columns

 

Hi, Sanja,

See my review comments below:

> === modified file 'include/ma_dyncol.h'
> --- include/ma_dyncol.h	2011-09-22 09:04:00 +0000
> +++ include/ma_dyncol.h	2012-11-12 15:11:23 +0000
> @@ -39,6 +39,12 @@
>  */
>  #define MAX_DYNAMIC_COLUMN_LENGTH 0X1FFFFFFFL
>  
> +/*
> +  Limits of implementation
> +*/
> +#define MAX_NAME_LENGTH 255

If we store only offsets, this limit can go away

> +#define MAX_TOTAL_NAME_LENGTH 65535

I'd suggest to store the offset length in the header, just as
you do for data offsets. You might hard-code a fixed offset length of 2,
but if it's in the header, you can later change it, if needed.

> +
>  /* NO and OK is the same used just to show semantics */
>  #define ER_DYNCOL_NO ER_DYNCOL_OK
>  
> @@ -81,6 +88,7 @@ struct st_dynamic_column_value
>      struct {
>        LEX_STRING value;
>        CHARSET_INFO *charset;
> +      my_bool nonfreeable;

Yuck. That doesn't look correct. Dynamic column code does not allocate
or free values. The concept of "nonfreeable" values simply has
no place in the dynamic column universe.

I realize that ha_cassandra needs to know whether to free certain strings
or not. But it's the internal problem of ha_cassandra, it has nothing
to do with dynamic columns, besides the fact that these certain strings
are sometimes stored in the struct st_dynamic_column_value.

>      } string;
>      struct {
>        decimal_digit_t buffer[DECIMAL_BUFF_LENGTH];
> @@ -103,6 +111,20 @@ dynamic_column_create_many(DYNAMIC_COLUM
>                             DYNAMIC_COLUMN_VALUE *values);
>  
>  enum enum_dyncol_func_result
> +dynamic_column_create_many_fmt(DYNAMIC_COLUMN *str,
> +                               uint column_count,
> +                               uchar *column_keys,
> +                               DYNAMIC_COLUMN_VALUE *values,
> +                               my_bool names);

I think this should rather be

dynamic_column_create_many_named(DYNAMIC_COLUMN *str,
                                 uint column_count,
                                 LEX_STRING *column_keys,
                                 DYNAMIC_COLUMN_VALUE *values);

> +enum enum_dyncol_func_result
> +dynamic_column_create_many_internal_fmt(DYNAMIC_COLUMN *str,
> +                                        uint column_count,
> +                                        void *column_keys,
> +                                        DYNAMIC_COLUMN_VALUE *values,
> +                                        my_bool new_str,
> +                                        my_bool string_keys);

*internal* should not be in include/, it's not part of the API

> +
> +enum enum_dyncol_func_result
>  dynamic_column_update(DYNAMIC_COLUMN *org, uint column_nr,
>                        DYNAMIC_COLUMN_VALUE *value);
>  enum enum_dyncol_func_result
> @@ -110,16 +132,30 @@ dynamic_column_update_many(DYNAMIC_COLUM
>                             uint add_column_count,
>                             uint *column_numbers,
>                             DYNAMIC_COLUMN_VALUE *values);
> +enum enum_dyncol_func_result
> +dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
> +                               uint add_column_count,
> +                               void *column_keys,
> +                               DYNAMIC_COLUMN_VALUE *values,
> +                               my_bool string_keys);

same as above, make it dynamic_column_update_many_named

>  
>  enum enum_dyncol_func_result
>  dynamic_column_delete(DYNAMIC_COLUMN *org, uint column_nr);
>  
>  enum enum_dyncol_func_result
>  dynamic_column_exists(DYNAMIC_COLUMN *org, uint column_nr);
> +enum enum_dyncol_func_result
> +dynamic_column_exists_str(DYNAMIC_COLUMN *str, LEX_STRING *name);
> +enum enum_dyncol_func_result
> +dynamic_column_exists_fmt(DYNAMIC_COLUMN *str, void *key, my_bool string_keys);

remove the second one, and rename the first s/_str/_named/

>  
>  /* List of not NULL columns */
>  enum enum_dyncol_func_result
>  dynamic_column_list(DYNAMIC_COLUMN *org, DYNAMIC_ARRAY *array_of_uint);
> +enum enum_dyncol_func_result
> +dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr);
> +enum enum_dyncol_func_result
> +dynamic_column_list_fmt(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array, my_bool string_keys);

Same.

>  
>  /*
>     if the column do not exists it is NULL
> @@ -127,10 +163,36 @@ dynamic_column_list(DYNAMIC_COLUMN *org,
>  enum enum_dyncol_func_result
>  dynamic_column_get(DYNAMIC_COLUMN *org, uint column_nr,
>                     DYNAMIC_COLUMN_VALUE *store_it_here);
> +enum enum_dyncol_func_result
> +dynamic_column_get_str(DYNAMIC_COLUMN *str, LEX_STRING *name,
> +                           DYNAMIC_COLUMN_VALUE *store_it_here);

s/_str/_named

> +
> +my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str);
> +
> +enum enum_dyncol_func_result
> +dynamic_column_check(DYNAMIC_COLUMN *str);
> +
> +enum enum_dyncol_func_result
> +dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json);
>  
>  #define dynamic_column_initialize(A) memset((A), 0, sizeof(*(A)))
>  #define dynamic_column_column_free(V) dynstr_free(V)
>  
> +/* conversion of values to 3 base types */
> +enum enum_dyncol_func_result
> +dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
> +                       CHARSET_INFO *cs, my_bool quote);

instead of my_bool quote, better pass a char quote_char.
when it's 0 - no quoting. Otherwise it can be " or `
To make it less error prone, you can create an enum
NOT_QUOTED=0, QUOTED='`', ANSI_QUOTED='"'

> +enum enum_dyncol_func_result
> +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val);
> +enum enum_dyncol_func_result
> +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val);
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_vals(DYNAMIC_COLUMN *str,
> +                    DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
> +                    char **free_names);

Eh. Maybe dynamic_column_unpack()?

> +
>  /***************************************************************************
>   Internal functions, don't use if you don't know what you are doing...
>  ***************************************************************************/
> 
> === modified file 'mysql-test/t/dyncol.test'
> --- mysql-test/t/dyncol.test	2012-02-29 20:55:04 +0000
> +++ mysql-test/t/dyncol.test	2012-11-12 15:11:23 +0000
> @@ -550,3 +550,125 @@ select hex(COLUMN_CREATE(0, COLUMN_GET(@
>  
>  select hex(COLUMN_CREATE(0, COLUMN_GET(COLUMN_CREATE(0, 0.0 as decimal), 0 as decimal)));
>  select hex(COLUMN_CREATE(0, 0.0 as decimal));
> +
> +--echo #
> +--echo # test of symbolic names
> +--echo #
> +--echo # creation test (names)
> +set names utf8;
> +select hex(column_create("адын", 1212));
> +select hex(column_create("1212", 1212));
> +select hex(column_create(1212, 2, "www", 3));
> +select hex(column_create("1212", 2, "www", 3));
> +select hex(column_create("1212", 2, 3, 3));
> +select hex(column_create("1212", 2, "адын", 1, 3, 3));

to test quoting, add a column with a ` (backtick) in the name
and a few other strange names - with a single quote, double quote, backslash,
space, dot, comma, whatever

> +set names default;
...
> === modified file 'sql/sql_yacc.yy'
> --- sql/sql_yacc.yy	2012-06-13 23:28:47 +0000
> +++ sql/sql_yacc.yy	2012-11-12 15:11:23 +0000
> @@ -8799,6 +8801,13 @@ function_call_nonkeyword:
>                MYSQL_YYABORT;
>            }
>          |
> +          COLUMN_CHECK_SYM '(' expr ')'
> +          {
> +            $$= new (YYTHD->mem_root) Item_func_dyncol_check($3);
> +            if ($$ == NULL)
> +              MYSQL_YYABORT;
> +          }
> +        |

This should be removed completely. See, for example, how EXP function
is created (hint: in item_create.cc) and do the same

>            COLUMN_EXISTS_SYM '(' expr ',' expr ')'
>            {
>              $$= new (YYTHD->mem_root) Item_func_dyncol_exists($3, $5);
> @@ -8820,6 +8829,13 @@ function_call_nonkeyword:
>                MYSQL_YYABORT;
>            }
>          |
> +          COLUMN_JSON_SYM '(' expr ')'
> +          {
> +            $$= create_func_dyncol_json(YYTHD, $3);
> +            if ($$ == NULL)
> +              MYSQL_YYABORT;
> +          }
> +        |

same. please, remove the dedicated rule for this and most of the other
COLUMN_xxx functions.

>            COLUMN_GET_SYM '(' expr ',' expr AS cast_type ')'
>            {
>              LEX *lex= Lex;
> 
> === modified file 'mysys/ma_dyncol.c'
> --- mysys/ma_dyncol.c	2011-11-22 17:04:38 +0000
> +++ mysys/ma_dyncol.c	2012-11-12 15:11:23 +0000
> @@ -37,19 +41,42 @@
>  */
>  /* mask to get above bits */
>  #define DYNCOL_FLG_OFFSET  3
> +#define DYNCOL_FLG_NAMES  4
>  /* All known flags mask */
> -#define DYNCOL_FLG_KNOWN  3
> +#define DYNCOL_FLG_KNOWN  7
> +
> +/* formats */
> +#define DYNCOL_FMT_NUM 0
> +#define DYNCOL_FMT_STR 1

why not an enum?

>  
>  /* dynamic column size reserve */
>  #define DYNCOL_SYZERESERVE 80
>  
> +#define DYNCOL_OFFSET_ERROR 0xffffffff
> +
>  /* length of fixed string header 1 byte - flags, 2 bytes - columns counter */
>  #define FIXED_HEADER_SIZE 3
> +/*
> +  length of fixed string header with names
> +  1 byte - flags, 2 bytes - columns counter,  2 bytes - name pool size
> +*/
> +#define FIXED_HEADER_SIZE_NM 5
>  
>  #define COLUMN_NUMBER_SIZE 2
> +/* 1 byte name length + 2 bytes offset from the name pool */
> +#define COLUMN_NAMEPTR_SIZE 3
>  
>  #define MAX_OFFSET_LENGTH  5
>  
> +#define DYNCOL_NUM_CHAR 6
> +
> +my_bool dynamic_column_has_names(DYNAMIC_COLUMN *str)
> +{
> +  if (str->length < 1)
> +    return FALSE;
> +  return test(str->str[0] & DYNCOL_FLG_NAMES);
> +}
> +
>  static enum enum_dyncol_func_result
>  dynamic_column_time_store(DYNAMIC_COLUMN *str,
>                            MYSQL_TIME *value);
> @@ -62,6 +89,311 @@ dynamic_column_time_read_internal(DYNAMI
>  static enum enum_dyncol_func_result
>  dynamic_column_date_read_internal(DYNAMIC_COLUMN_VALUE *store_it_here,
>                                    uchar *data, size_t length);
> +static enum enum_dyncol_func_result
> +dynamic_column_get_internal(DYNAMIC_COLUMN *str,
> +                                DYNAMIC_COLUMN_VALUE *store_it_here,
> +                                uint num_key, LEX_STRING *str_key);
> +static enum enum_dyncol_func_result
> +dynamic_column_exists_internal(DYNAMIC_COLUMN *str, uint num_key,
> +                               LEX_STRING *str_key);
> +enum enum_dyncol_func_result
> +dynamic_column_update_many_fmt(DYNAMIC_COLUMN *str,
> +                               uint add_column_count,
> +                               void *column_keys,
> +                               DYNAMIC_COLUMN_VALUE *values,
> +                               my_bool string_keys);
> +static int plan_sort_num(const void *a, const void *b);
> +static int plan_sort_str(const void *a, const void *b);
> +
> +/*
> +  Structure to hold information about dynamic columns record and
> +  iterate through it.
> +*/
> +
> +struct st_dyn_header
> +{
> +  uchar *header, *nmpool, *dtpool, *data_end;
> +  size_t offset_size;
> +  size_t entry_size;
> +  size_t header_size;
> +  size_t nmpool_size;
> +  size_t data_size;
> +  /* DYNCOL_FMT_NUM - numeric columns, DYNCOL_FMT_STR - column names */
> +  uint format;
> +  uint column_count;
> +
> +  uchar *entry, *data, *name;
> +  size_t offset;
> +  uint length;
> +  enum enum_dynamic_column_type type;
> +};
> +
> +typedef struct st_dyn_header DYN_HEADER;
> +
> +static inline my_bool read_fixed_header(DYN_HEADER *hdr,
> +                                        DYNAMIC_COLUMN *str);
> +static void set_fixed_header(DYNAMIC_COLUMN *str,
> +                             uint offset_size,
> +                             uint column_count);
> +static my_bool type_and_offset_store(uchar *place, size_t offset_size,
> +                                     DYNAMIC_COLUMN_TYPE type,
> +                                     size_t offset);
> +
> +/*
> +  Calculate entry size (E) and header size (H) by offset size (O) and column
> +  count (C) and fixed part of entry size (F).
> +*/
> +
> +#define calc_param(E,H,F,O,C) do { \
> +  (*(E))= (O) + F;                 \
> +  (*(H))= (*(E)) * (C);            \
> +}while(0);
> +
> +
> +/**
> +  Name pool size functions, for numeric format it is 0
> +*/
> +
> +size_t name_size_num(void *keys __attribute__((unused)),
> +                     uint i __attribute__((unused)))
> +{
> +  return 0;
> +}
> +
> +
> +/**
> +  Name pool size functions.
> +*/
> +size_t name_size_str(void *keys, uint i)
> +{
> +  return ((LEX_STRING *) keys)[i].length;
> +}
> +
> +
> +/**
> +  Comparator function for references on column numbers for qsort
> +  (numeric format)
> +*/
> +
> +static int column_sort_num(const void *a, const void *b)
> +{
> +  return **((uint **)a) - **((uint **)b);
> +}
> +
> +
> +/**
> +  Comparator function for references on column numbers for qsort
> +  (names format)
> +*/
> +
> +static int column_sort_str(const void *a, const void *b)
> +{
> +  LEX_STRING *s1= *((LEX_STRING **)a);
> +  LEX_STRING *s2= *((LEX_STRING **)b);
> +  int rc= s1->length - s2->length;
> +  if (rc == 0)
> +    rc= memcmp((void *)s1->str, (void *)s2->str, (size_t) s1->length);
> +  return rc;
> +}
> +
> +
> +/**
> +  Check limit function (numeric format)
> +*/
> +
> +static my_bool check_limit_num(const void *val)
> +{
> +  return **((uint **)val) > UINT_MAX16;
> +}
> +
> +
> +/**
> +  Check limit function (names format)
> +*/
> +
> +static my_bool check_limit_str(const void *val)
> +{
> +  return (*((LEX_STRING **)val))->length > MAX_NAME_LENGTH;
> +}
> +
> +
> +/**
> +  Write numeric format static header part.
> +*/
> +
> +void set_fixed_header_num(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
> +{
> +  set_fixed_header(str, hdr->offset_size, hdr->column_count);
> +  hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE;
> +  hdr->nmpool= hdr->dtpool= hdr->header + hdr->header_size;
> +}
> +
> +
> +/**
> +  Write names format static header part.
> +*/
> +
> +void set_fixed_header_str(DYNAMIC_COLUMN *str, DYN_HEADER *hdr)
> +{
> +  set_fixed_header(str, hdr->offset_size, hdr->column_count);

There's no need to write the column count there; omitting it saves two bytes per record.
The column count = (offset of the first name - size of the fixed header) / (size of the fixed per column header)

> +  str->str[0]|= DYNCOL_FLG_NAMES;
> +  int2store(str->str + 3, hdr->nmpool_size);
> +  hdr->header= (uchar *)str->str + FIXED_HEADER_SIZE_NM;
> +  hdr->nmpool= hdr->header + hdr->header_size;
> +  hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
> +}
> +
> +
> +/**
> +  Write numeric format header entry
> +   2 bytes - column number
> +   1-4 bytes - data offset combined with type
> +
> +  @param hdr             descriptor of dynamic column record
> +  @param column_key      pointer to uint (column number)
> +  @param value           value which will be written (only type used)
> +  @param offset          offset of the data
> +*/
> +
> +my_bool put_header_entry_num(DYN_HEADER *hdr,

this and many other functions should be static

> +                             void *column_key,
> +                             DYNAMIC_COLUMN_VALUE *value,
> +                             size_t offset)
> +{
> +  uint *column_number= (uint *)column_key;
> +  int2store(hdr->entry, *column_number);
> +  DBUG_ASSERT(hdr->nmpool_size == 0);
> +  if (type_and_offset_store(hdr->entry, hdr->offset_size,
> +                            value->type,
> +                            offset))
> +      return TRUE;
> +  hdr->entry= hdr->entry + hdr->entry_size;
> +  return FALSE;
> +}
> +
> +
> +/**
> +  Write names format header entry
> +   1 byte - name length
> +   2 bytes - name offset in the name pool

as discussed on the irc, please store either lengths or offsets, but not both

> +   1-4 bytes - data offset combined with type
> +
> +  @param hdr             descriptor of dynamic column record
> +  @param column_key      pointer to LEX_STRING (column name)
> +  @param value           value which will be written (only type used)
> +  @param offset          offset of the data
> +*/
> +
> +my_bool put_header_entry_str(DYN_HEADER *hdr,
> +                             void *column_key,
> +                             DYNAMIC_COLUMN_VALUE *value,
> +                             size_t offset)
> +{
> +  LEX_STRING *column_name= (LEX_STRING *)column_key;
> +  DBUG_ASSERT(column_name->length <= MAX_NAME_LENGTH);
> +  hdr->entry[0]= column_name->length;
> +  DBUG_ASSERT(hdr->name - hdr->nmpool < (long) 0x10000L);
> +  int2store(hdr->entry + 1, hdr->name - hdr->nmpool);
> +  memcpy(hdr->name, column_name->str, column_name->length);
> +  DBUG_ASSERT(hdr->nmpool_size != 0 || column_name->length == 0);
> +  if (type_and_offset_store(hdr->entry + 1, hdr->offset_size,
> +                            value->type,
> +                            offset))
> +    return TRUE;
> +  hdr->entry+= hdr->entry_size;
> +  hdr->name+= column_name->length;
> +  return FALSE;
> +}
> +
> +
> +/**
> +  Format descriptor, contain constants and function references for
> +  format processing
> +*/
> +
> +struct st_service_funcs
> +{
> +  /* size of fixed header */
> +  uint fixed_hdr;
> +  /* size of fixed part of header entry */
> +  uint fixed_hdr_entry;
> +
> +  /*size of array element which stores keys */
> +  uint key_size_in_array;
> +
> +  size_t (*name_size)
> +    (void *, uint);
> +  int (*column_sort)
> +    (const void *a, const void *b);
> +  my_bool (*check_limit)
> +    (const void *val);
> +  void (*set_fixed_hdr)
> +    (DYNAMIC_COLUMN *str, DYN_HEADER *hdr);
> +  my_bool (*put_header_entry)(DYN_HEADER *hdr,
> +                              void *column_key,
> +                              DYNAMIC_COLUMN_VALUE *value,
> +                              size_t offset);
> +  int (*plan_sort)(const void *a, const void *b);
> +};
> +
> +
> +/**
> +  Actual our 2 format descriptors
> +*/
> +
> +static struct st_service_funcs fmt_data[2]=
> +{
> +  {
> +    FIXED_HEADER_SIZE,
> +    COLUMN_NUMBER_SIZE,
> +    sizeof(uint),
> +    &name_size_num,
> +    &column_sort_num,
> +    &check_limit_num,
> +    &set_fixed_header_num,
> +    &put_header_entry_num,
> +    &plan_sort_num
> +  },
> +  {
> +    FIXED_HEADER_SIZE_NM,
> +    COLUMN_NAMEPTR_SIZE,
> +    sizeof(LEX_STRING),
> +    &name_size_str,
> +    &column_sort_str,
> +    &check_limit_str,
> +    &set_fixed_header_str,
> +    &put_header_entry_str,
> +    &plan_sort_str
> +  }
> +};
> +
> +
> +/**
> +  Read dynamic column record header and fill the descriptor
> +
> +  @param hdr             dynamic columns record descriptor to fill
> +  @param str             dynamic columns record
> +
> +  @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +init_read_hdr(DYN_HEADER *hdr, DYNAMIC_COLUMN *str)
> +{
> +  if (read_fixed_header(hdr, str))
> +    return ER_DYNCOL_FORMAT;
> +  hdr->header= (uchar*)str->str + fmt_data[hdr->format].fixed_hdr;
> +  calc_param(&hdr->entry_size, &hdr->header_size,
> +             fmt_data[hdr->format].fixed_hdr_entry, hdr->offset_size,
> +             hdr->column_count);
> +  hdr->nmpool= hdr->header + hdr->header_size;
> +  hdr->dtpool= hdr->nmpool + hdr->nmpool_size;
> +  hdr->data_size= str->length - fmt_data[hdr->format].fixed_hdr -
> +    hdr->header_size - hdr->nmpool_size;
> +  hdr->data_end= (uchar*)str->str + str->length;
> +  return ER_DYNCOL_OK;
> +}
> +
>  
>  /**
>    Initialize dynamic column string with (make it empty but correct format)
> @@ -1640,83 +2132,763 @@ dynamic_column_list(DYNAMIC_COLUMN *str,
>  
>  
>  /**
> +  List not-null columns in the packed string (any format)
> +
> +  @param str             The packed string
> +  @param array_of_lexstr Where to put reference on created array
> +
> +  @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_list_str(DYNAMIC_COLUMN *str, DYNAMIC_ARRAY *array_of_lexstr)
> +{
> +  DYN_HEADER header;
> +  uchar *read;
> +  struct st_service_funcs *fmt;
> +  uint i;
> +  enum enum_dyncol_func_result rc;
> +
> +  bzero(array_of_lexstr, sizeof(*array_of_lexstr)); /* In case of errors */
> +  if (str->length == 0)
> +    return ER_DYNCOL_OK;                        /* no columns */
> +
> +  if ((rc= init_read_hdr(&header, str)) < 0)
> +    return rc;
> +
> +  fmt= fmt_data + header.format;
> +
> +  if (header.entry_size * header.column_count + fmt->fixed_hdr >
> +      str->length)
> +    return ER_DYNCOL_FORMAT;
> +
> +  if (init_dynamic_array(array_of_lexstr, sizeof(LEX_STRING),
> +                         header.column_count, 0))
> +    return ER_DYNCOL_RESOURCE;
> +
> +  for (i= 0, read= header.header;
> +       i < header.column_count;
> +       i++, read+= header.entry_size)
> +  {
> +    LEX_STRING tmp;
> +    if (header.format == DYNCOL_FMT_NUM)
> +    {
> +      uint nm= uint2korr(read);
> +      tmp.str= my_malloc(DYNCOL_NUM_CHAR, MYF(0));

ouch. do you really need to do header.column_count small mallocs here?

> +      if (!tmp.str)
> +        return ER_DYNCOL_RESOURCE;
> +      tmp.length= snprintf(tmp.str, DYNCOL_NUM_CHAR, "%u", nm);

longlong2str or longlong10_to_str

> +    }
> +    else
> +    {
> +      tmp.length= read[0];
> +      tmp.str= my_malloc(tmp.length + 1, MYF(0));

ouch. do you really need to do header.column_count small mallocs here?

> +      if(!tmp.str)
> +        return ER_DYNCOL_RESOURCE;
> +      memcpy(tmp.str, (const void *)header.nmpool + uint2korr(read + 1),
> +             tmp.length);
> +      tmp.str[tmp.length]= '\0'; // just for safety
> +    }
> +    /* Insert can't never fail as it's pre-allocated above */
> +    (void) insert_dynamic(array_of_lexstr, (uchar *)&tmp);
> +  }
> +  return ER_DYNCOL_OK;
> +}
> +
> +/**
>    Find the place of the column in the header or place where it should be put
>  
> -  @param num             Number of the column
> -  @param header          Pointer to the header
> -  @param entry_size      Size of a header entry
> -  @param column_count    Number of columns in the packed string
> -  @param entry           Return pointer to the entry or next entry
> +  @param hdr             descriptor of dynamic column record
> +  @param key             Name or number of column to fetch
> +                         (depends on string_key)
> +  @param string_key      True if we gave pointer to LEX_STRING.
>  
>    @retval TRUE found
>    @retval FALSE pointer set to the next row
>  */
>  
>  static my_bool
> -find_place(uint num, uchar *header, size_t entry_size,
> -           uint column_count, uchar **entry)
> +find_place(DYN_HEADER *hdr, void *key, my_bool string_keys)
>  {
>    uint mid, start, end, val;
>    int flag;
> +  LEX_STRING str;
> +  char buff[DYNCOL_NUM_CHAR];
> +  my_bool need_conversion= ((string_keys ? DYNCOL_FMT_STR : DYNCOL_FMT_NUM) !=
> +                            hdr->format);
>    LINT_INIT(flag);                              /* 100 % safe */

UNINIT_VAR is preferable, when possible (but it doesn't work with structures)

> +  /* new format can't be numeric if the old one is names */
> +  DBUG_ASSERT(string_keys ||
> +              hdr->format == DYNCOL_FMT_NUM);
>  
>    start= 0;
> -  end= column_count -1;
> +  end= hdr->column_count -1;
>    mid= 1;
>    while (start != end)
>    {
> -   uint val;
> -   mid= (start + end) / 2;
> -   val= uint2korr(header + mid * entry_size);
> -   if ((flag= CMP_NUM(num, val)) <= 0)
> -     end= mid;
> -   else
> -     start= mid + 1;
> +    uint val;
> +    mid= (start + end) / 2;
> +    hdr->entry= hdr->header + mid * hdr->entry_size;
> +    if (!string_keys)
> +    {
> +      val= uint2korr(hdr->entry);
> +      flag= CMP_NUM(*((uint *)key), val);
> +    }
> +    else
> +    {
> +      if (need_conversion)
> +      {
> +        str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
> +        str.length= (buff + sizeof(buff)) - str.str;
> +      }
> +      else
> +      {
> +        DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
> +        str.length= hdr->entry[0];
> +        str.str= (char *)hdr->nmpool + uint2korr(hdr->entry + 1);
> +      }
> +      flag= ((LEX_STRING *) key)->length - str.length;
> +      if (flag == 0)
> +        flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
> +    }
> +    if (flag <= 0)
> +      end= mid;
> +    else
> +      start= mid + 1;
>    }
> +  hdr->entry= hdr->header + start * hdr->entry_size;
>    if (start != mid)
>    {
> -    val= uint2korr(header + start * entry_size);
> -    flag= CMP_NUM(num, val);
> +    if (!string_keys)
> +    {
> +      val= uint2korr(hdr->entry);
> +      flag= CMP_NUM(*((uint *)key), val);
> +    }
> +    else
> +    {
> +      if (need_conversion)
> +      {
> +        str.str= backwritenum(buff + sizeof(buff), uint2korr(hdr->entry));
> +        str.length= (buff + sizeof(buff)) - str.str;
> +      }
> +      else
> +      {
> +        DBUG_ASSERT(hdr->format == DYNCOL_FMT_STR);
> +        str.length= hdr->entry[0];
> +        str.str= (char*) hdr->nmpool + uint2korr(hdr->entry + 1);
> +      }
> +      flag= ((LEX_STRING *) key)->length - str.length;
> +      if (flag == 0)
> +        flag= memcmp(((LEX_STRING *) key)->str, str.str, str.length);
> +    }
>    }
> -  *entry= header + start * entry_size;
>    if (flag > 0)
> -    *entry+= entry_size;        /* Point at next bigger key */
> +    hdr->entry+= hdr->entry_size; /* Point at next bigger key */
>    return flag == 0;
>  }
>  
>  
>  /*
> -  Description of plan of adding/removing/updating a packed string
> +  It is internal structure which describes plan of chenging the record

s/chenging/changing/

what do you mean "a plan" ?

> +  of dynamic columns
>  */
>  
>  typedef enum {PLAN_REPLACE, PLAN_ADD, PLAN_DELETE, PLAN_NOP} PLAN_ACT;
>  
>  struct st_plan {
>    DYNAMIC_COLUMN_VALUE *val;
> -  uint *num;
> +  void *key;
>    uchar *place;
>    size_t length;
> -  int hdelta, ddelta;
> +  int hdelta, ddelta, ndelta;
> +  uint mv_offset, mv_length, mv_end;
>    PLAN_ACT act;
>  };
>  typedef struct st_plan PLAN;
>  
>  
> -static int plan_sort(const void *a, const void *b)
> +/**
> +  Sort function for plan by column number
> +*/
> +
> +static int plan_sort_num(const void *a, const void *b)
>  {
> -  return ((PLAN *)a)->num[0] - ((PLAN *)b)->num[0];
> +  return *((uint *)((PLAN *)a)->key) - *((uint *)((PLAN *)b)->key);
> +}
> +
> +
> +/**
> +  Sort function for plan by column name
> +*/
> +
> +static int plan_sort_str(const void *a, const void *b)
> +{
> +  int res= (((LEX_STRING *)((PLAN *)a)->key)->length -
> +            ((LEX_STRING *)((PLAN *)b)->key)->length);
> +  if (res == 0)
> +    res= memcmp(((LEX_STRING *)((PLAN *)a)->key)->str,
> +                ((LEX_STRING *)((PLAN *)b)->key)->str,
> +                ((LEX_STRING *)((PLAN *)a)->key)->length);
> +  return res;
> +}
> +
> +#define DELTA_CHECK(S, D, C)        \
> +  if ((S) == 0)                     \
> +    (S)= (D);                       \
> +  else if (((S) > 0 && (D) < 0) ||  \
> +            ((S) < 0 && (D) > 0))   \
> +  {                                 \
> +    (C)= TRUE;                      \
> +  }
> +
> +/**
> +  Update dynamic column by copying in a new record (string).
> +
> +  @param str             Dynamic column record to change
> +  @param plan            Plan of changing the record
> +  @param add_column_count number of records in the plan array.
> +  @param hdr             descriptor of old dynamic column record
> +  @param new_hdr         descriptor of new dynamic column record
> +  @param convert         need conversion from numeric to names format
> +
> +  @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_copy(DYNAMIC_COLUMN *str, PLAN *plan,
> +                           uint add_column_count,
> +                           DYN_HEADER *hdr, DYN_HEADER *new_hdr,
> +                           my_bool convert)
> +{
> +  DYNAMIC_COLUMN tmp;
> +  struct st_service_funcs *fmt= fmt_data + hdr->format,
> +                          *new_fmt= fmt_data + new_hdr->format;
> +  uint i, j, k;
> +  size_t all_headers_size;
> +
> +  if (dynamic_column_init_str(&tmp,
> +                              (new_fmt->fixed_hdr + new_hdr->header_size +
> +                               new_hdr->nmpool_size +
> +                               new_hdr->data_size + DYNCOL_SYZERESERVE)))
> +  {
> +    return ER_DYNCOL_RESOURCE;
> +  }
> +  bzero(tmp.str, new_fmt->fixed_hdr);
> +  (*new_fmt->set_fixed_hdr)(&tmp, new_hdr);
> +  /* Adjust tmp to contain whole the future header */
> +  tmp.length= new_fmt->fixed_hdr + new_hdr->header_size + new_hdr->nmpool_size;
> +
> +
> +  /*
> +    Copy data to the new string
> +    i= index in array of changes
> +    j= index in packed string header index
> +  */
> +  new_hdr->entry= new_hdr->header;
> +  new_hdr->name= new_hdr->nmpool;
> +  all_headers_size= new_fmt->fixed_hdr +
> +    new_hdr->header_size + new_hdr->nmpool_size;
> +  for (i= 0, j= 0; i < add_column_count || j < hdr->column_count; i++)
> +  {
> +    size_t first_offset;
> +    uint start= j, end;
> +    LINT_INIT(first_offset);
> +
> +    /*
> +      Search in i and j for the next column to add from i and where to
> +      add.
> +    */
> +
> +    while (i < add_column_count && plan[i].act == PLAN_NOP)
> +      i++;                                    /* skip NOP */
> +
> +    if (i == add_column_count)
> +      j= end= hdr->column_count;
> +    else
> +    {
> +      /*
> +        old data portion. We don't need to check that j < column_count
> +        as plan[i].place is guaranteed to have a pointer inside the
> +        data.
> +      */
> +      while (hdr->header + j * hdr->entry_size < plan[i].place)
> +        j++;
> +      end= j;
> +      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> +        j++;                              /* data at 'j' will be removed */
> +    }
> +
> +    /*
> +      Adjust all headers since last loop.
> +      We have to do this as the offset for data has moved
> +    */
> +    for (k= start; k < end; k++)
> +    {
> +      uchar *read= hdr->header + k * hdr->entry_size;
> +      void *key;
> +      LEX_STRING name;
> +      size_t offs;
> +      uint nm;
> +      DYNAMIC_COLUMN_TYPE tp;
> +      char buff[DYNCOL_NUM_CHAR];
> +
> +      if (hdr->format == DYNCOL_FMT_NUM)
> +      {
> +        if (convert)
> +        {
> +          name.str= backwritenum(buff + sizeof(buff), uint2korr(read));
> +          name.length= (buff + sizeof(buff)) - name.str;
> +          key= &name;
> +        }
> +        else
> +        {
> +          nm= uint2korr(read);                    /* Column nummber */
> +          key= &nm;
> +        }
> +      }
> +      else
> +      {
> +        name.length= read[0];
> +        name.str= (char *) hdr->nmpool +  uint2korr(read + 1);
> +        key= &name;
> +      }
> +      if (type_and_offset_read(&tp, &offs,
> +                               read + fmt->fixed_hdr_entry, hdr->offset_size))
> +          goto err;
> +      if (k == start)
> +        first_offset= offs;
> +      else if (offs < first_offset)
> +        goto err;
> +
> +      offs+= plan[i].ddelta;
> +      {
> +        DYNAMIC_COLUMN_VALUE val;
> +        val.type= tp; // only the type used in the header
> +        if ((*new_fmt->put_header_entry)(new_hdr, key, &val, offs))
> +          goto err;
> +      }
> +    }
> +
> +    /* copy first the data that was not replaced in original packed data */
> +    if (start < end)
> +    {
> +      size_t data_size;
> +      /* Add old data last in 'tmp' */
> +      hdr->entry= hdr->header + start * hdr->entry_size;
> +      data_size=
> +        hdr_interval_length(hdr, hdr->header + end * hdr->entry_size);
> +      if (data_size == DYNCOL_OFFSET_ERROR ||
> +          (long) data_size < 0 ||
> +          data_size > hdr->data_size - first_offset)
> +        goto err;
> +
> +      memcpy(tmp.str + tmp.length, (char *)hdr->dtpool + first_offset,
> +             data_size);
> +      tmp.length+= data_size;
> +    }
> +
> +    /* new data adding */
> +    if (i < add_column_count)
> +    {
> +      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> +      {
> +        if ((*new_fmt->put_header_entry)(new_hdr, plan[i].key,
> +                                         plan[i].val,
> +                                         tmp.length - all_headers_size))
> +          goto err;
> +        data_store(&tmp, plan[i].val);        /* Append new data */
> +      }
> +    }
> +  }
> +  dynamic_column_column_free(str);
> +  *str= tmp;
> +  return ER_DYNCOL_OK;
> +err:
> +  dynamic_column_column_free(&tmp);
> +  return ER_DYNCOL_FORMAT;
> +}
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_move_left(DYNAMIC_COLUMN *str, PLAN *plan,
> +                                size_t offset_size,
> +                                size_t entry_size,
> +                                size_t header_size,
> +                                size_t new_offset_size,
> +                                size_t new_entry_size,
> +                                size_t new_header_size,
> +                                uint column_count,
> +                                uint new_column_count,
> +                                uint add_column_count,
> +                                uchar *header_end,
> +                                size_t max_offset)
> +{
> +  uchar *write;
> +  uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
> +  uint i, j, k;
> +  size_t curr_offset;
> +
> +  write= (uchar *)str->str + FIXED_HEADER_SIZE;
> +  set_fixed_header(str, new_offset_size, new_column_count);
> +
> +  /*
> +    Move headers first.
> +    i= index in array of changes
> +    j= index in packed string header index
> +  */
> +  for (curr_offset= 0, i= 0, j= 0;
> +       i < add_column_count || j < column_count;
> +       i++)
> +  {
> +    size_t first_offset;
> +    uint start= j, end;
> +    LINT_INIT(first_offset);
> +
> +    /*
> +      Search in i and j for the next column to add from i and where to
> +      add.
> +    */
> +
> +    while (i < add_column_count && plan[i].act == PLAN_NOP)
> +      i++;                                    /* skip NOP */
> +
> +    if (i == add_column_count)
> +      j= end= column_count;
> +    else
> +    {
> +      /*
> +        old data portion. We don't need to check that j < column_count
> +        as plan[i].place is guaranteed to have a pointer inside the
> +        data.
> +      */
> +      while (header_base + j * entry_size < plan[i].place)
> +        j++;
> +      end= j;
> +      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> +        j++;                              /* data at 'j' will be removed */
> +    }
> +    plan[i].mv_end= end;
> +
> +    {
> +      DYNAMIC_COLUMN_TYPE tp;
> +      if (type_and_offset_read(&tp, &first_offset,
> +                               header_base + start * entry_size +
> +                               COLUMN_NUMBER_SIZE, offset_size))
> +        return ER_DYNCOL_FORMAT;
> +    }
> +    /* find data to be moved */
> +    if (start < end)
> +    {
> +      size_t data_size=
> +        get_length_interval(header_base + start * entry_size,
> +                            header_base + end * entry_size,
> +                            header_end, offset_size, max_offset);
> +      if (data_size == DYNCOL_OFFSET_ERROR ||
> +          (long) data_size < 0 ||
> +          data_size > max_offset - first_offset)
> +      {
> +        str->length= 0; // just something valid
> +        return ER_DYNCOL_FORMAT;
> +      }
> +      DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
> +      plan[i].mv_offset= first_offset;
> +      plan[i].mv_length= data_size;
> +      curr_offset+= data_size;
> +    }
> +    else
> +    {
> +      plan[i].mv_length= 0;
> +      plan[i].mv_offset= curr_offset;
> +    }
> +
> +    if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
> +        plan[i].act != PLAN_DELETE)
> +      write+= entry_size * (end - start);
> +    else
> +    {
> +      /*
> +        Adjust all headers since last loop.
> +        We have to do this as the offset for data has moved
> +      */
> +      for (k= start; k < end; k++)
> +      {
> +        uchar *read= header_base + k * entry_size;
> +        size_t offs;
> +        uint nm;
> +        DYNAMIC_COLUMN_TYPE tp;
> +
> +        nm= uint2korr(read);                    /* Column nummber */
> +        if (type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE,
> +                                 offset_size))
> +          return ER_DYNCOL_FORMAT;
> +
> +        if (k > start && offs < first_offset)
> +        {
> +          str->length= 0; // just something valid
> +          return ER_DYNCOL_FORMAT;
> +        }
> +
> +        offs+= plan[i].ddelta;
> +        int2store(write, nm);
> +        /* write rest of data at write + COLUMN_NUMBER_SIZE */
> +        type_and_offset_store(write, new_offset_size, tp, offs);
> +        write+= new_entry_size;
> +      }
> +    }
> +
> +    /* new data adding */
> +    if (i < add_column_count)
> +    {
> +      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> +      {
> +        int2store(write, *((uint *)plan[i].key));
> +        type_and_offset_store(write, new_offset_size,
> +                              plan[i].val[0].type,
> +                              curr_offset);
> +        write+= new_entry_size;
> +        curr_offset+= plan[i].length;
> +      }
> +    }
> +  }
> +
> +  /*
> +    Move data.
> +    i= index in array of changes
> +    j= index in packed string header index
> +  */
> +  str->length= (FIXED_HEADER_SIZE + new_header_size);
> +  for (i= 0, j= 0;
> +       i < add_column_count || j < column_count;
> +       i++)
> +  {
> +    uint start= j, end;
> +
> +    /*
> +      Search in i and j for the next column to add from i and where to
> +      add.
> +    */
> +
> +    while (i < add_column_count && plan[i].act == PLAN_NOP)
> +      i++;                                    /* skip NOP */
> +
> +    j= end= plan[i].mv_end;
> +    if (i != add_column_count &&
> +        (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> +      j++;
> +
> +    /* copy first the data that was not replaced in original packed data */
> +    if (start < end && plan[i].mv_length)
> +    {
> +      memmove((header_base + new_header_size +
> +               plan[i].mv_offset + plan[i].ddelta),
> +              header_base + header_size + plan[i].mv_offset,
> +              plan[i].mv_length);
> +    }
> +    str->length+= plan[i].mv_length;
> +
> +    /* new data adding */
> +    if (i < add_column_count)
> +    {
> +      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> +      {
> +        data_store(str, plan[i].val);        /* Append new data */
> +      }
> +    }
> +  }
> +  return ER_DYNCOL_OK;
> +}
> +
> +enum enum_dyncol_func_result
> +dynamic_column_update_move_right(DYNAMIC_COLUMN *str, PLAN *plan,
> +                                 size_t offset_size,
> +                                 size_t entry_size,
> +                                 size_t header_size,
> +                                 size_t new_offset_size,
> +                                 size_t new_entry_size,
> +                                 size_t new_header_size,
> +                                 uint column_count,
> +                                 uint new_column_count,
> +                                 uint add_column_count,
> +                                 uchar *header_end,
> +                                 size_t max_offset)
> +{
> +  uchar *write;
> +  uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE;
> +  uint i, j, k;
> +  size_t curr_offset;
> +
> +  write= (uchar *)str->str + FIXED_HEADER_SIZE;
> +  set_fixed_header(str, new_offset_size, new_column_count);
> +
> +  /*
> +    Move data first.
> +    i= index in array of changes
> +    j= index in packed string header index
> +  */
> +  for (curr_offset= 0, i= 0, j= 0;
> +       i < add_column_count || j < column_count;
> +       i++)
> +  {
> +    size_t first_offset;
> +    uint start= j, end;
> +    LINT_INIT(first_offset);
> +
> +    /*
> +      Search in i and j for the next column to add from i and where to
> +      add.
> +    */
> +
> +    while (i < add_column_count && plan[i].act == PLAN_NOP)
> +      i++;                                    /* skip NOP */
> +
> +    if (i == add_column_count)
> +      j= end= column_count;
> +    else
> +    {
> +      /*
> +        old data portion. We don't need to check that j < column_count
> +        as plan[i].place is guaranteed to have a pointer inside the
> +        data.
> +      */
> +      while (header_base + j * entry_size < plan[i].place)
> +        j++;
> +      end= j;
> +      if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> +        j++;                              /* data at 'j' will be removed */
> +    }
> +    plan[i].mv_end= end;
> +
> +    {
> +      DYNAMIC_COLUMN_TYPE tp;
> +      type_and_offset_read(&tp, &first_offset,
> +                           header_base + start * entry_size + COLUMN_NUMBER_SIZE, offset_size);
> +    }
> +    /* find data to be moved */
> +    if (start < end)
> +    {
> +      size_t data_size=
> +        get_length_interval(header_base + start * entry_size,
> +                            header_base + end * entry_size,
> +                            header_end, offset_size, max_offset);
> +      if (data_size == DYNCOL_OFFSET_ERROR ||
> +          (long) data_size < 0 ||
> +          data_size > max_offset - first_offset)
> +      {
> +        str->length= 0; // just something valid
> +        return ER_DYNCOL_FORMAT;
> +      }
> +      DBUG_ASSERT(curr_offset == first_offset + plan[i].ddelta);
> +      plan[i].mv_offset= first_offset;
> +      plan[i].mv_length= data_size;
> +      curr_offset+= data_size;
> +    }
> +    else
> +    {
> +      plan[i].mv_length= 0;
> +      plan[i].mv_offset= curr_offset;
> +    }
> +
> +    if (plan[i].ddelta == 0 && offset_size == new_offset_size &&
> +        plan[i].act != PLAN_DELETE)
> +      write+= entry_size * (end - start);
> +    else
> +    {
> +      /*
> +        Adjust all headers since last loop.
> +        We have to do this as the offset for data has moved
> +      */
> +      for (k= start; k < end; k++)
> +      {
> +        uchar *read= header_base + k * entry_size;
> +        size_t offs;
> +        uint nm;
> +        DYNAMIC_COLUMN_TYPE tp;
> +
> +        nm= uint2korr(read);                    /* Column nummber */
> +        type_and_offset_read(&tp, &offs, read + COLUMN_NUMBER_SIZE, offset_size);
> +        if (k > start && offs < first_offset)
> +        {
> +          str->length= 0; // just something valid
> +          return ER_DYNCOL_FORMAT;
> +        }
> +
> +        offs+= plan[i].ddelta;
> +        int2store(write, nm);
> +        /* write rest of data at write + COLUMN_NUMBER_SIZE */
> +        if (type_and_offset_store(write, new_offset_size, tp, offs))
> +        {
> +          str->length= 0; // just something valid
> +          return ER_DYNCOL_FORMAT;
> +        }
> +        write+= new_entry_size;
> +      }
> +    }
> +
> +    /* new data adding */
> +    if (i < add_column_count)
> +    {
> +      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> +      {
> +        int2store(write, *((uint *)plan[i].key));
> +        if (type_and_offset_store(write, new_offset_size,
> +                                  plan[i].val[0].type,
> +                                  curr_offset))
> +        {
> +          str->length= 0; // just something valid
> +          return ER_DYNCOL_FORMAT;
> +        }
> +        write+= new_entry_size;
> +        curr_offset+= plan[i].length;
> +      }
> +    }
> +  }
> +
> +  /*
> +    Move headers.
> +    i= index in array of changes
> +    j= index in packed string header index
> +  */
> +  str->length= (FIXED_HEADER_SIZE + new_header_size);
> +  for (i= 0, j= 0;
> +       i < add_column_count || j < column_count;
> +       i++)
> +  {
> +    uint start= j, end;
> +
> +    /*
> +      Search in i and j for the next column to add from i and where to
> +      add.
> +    */
> +
> +    while (i < add_column_count && plan[i].act == PLAN_NOP)
> +      i++;                                    /* skip NOP */
> +
> +    j= end= plan[i].mv_end;
> +    if (i != add_column_count &&
> +        (plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> +      j++;
> +
> +    /* copy first the data that was not replaced in original packed data */
> +    if (start < end && plan[i].mv_length)
> +    {
> +      memmove((header_base + new_header_size +
> +               plan[i].mv_offset + plan[i].ddelta),
> +              header_base + header_size + plan[i].mv_offset,
> +              plan[i].mv_length);
> +    }
> +    str->length+= plan[i].mv_length;
> +
> +    /* new data adding */
> +    if (i < add_column_count)
> +    {
> +      if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> +      {
> +        data_store(str, plan[i].val);        /* Append new data */
> +      }
> +    }
> +  }
> +  return ER_DYNCOL_OK;
>  }
>  
> -#define DELTA_CHECK(S, D, C)        \
> -  if ((S) == 0)                     \
> -    (S)= (D);                       \
> -  else if (((S) > 0 && (D) < 0) ||  \
> -            ((S) < 0 && (D) > 0))   \
> -  {                                 \
> -    (C)= TRUE;                      \
> -    break;                          \
> -  }                                 \
> -
>  
>  /**
>    Update the packed string with the given columns
> @@ -1826,26 +3044,36 @@ dynamic_column_update_many(DYNAMIC_COLUM
>  
>      /* Set common variables for all plans */
>      plan[i].ddelta= data_delta;
> +    plan[i].ndelta= name_delta;
>      /* get header delta in entries */
>      plan[i].hdelta= header_delta;
>      plan[i].length= 0;                          /* Length if NULL */
>  
> -    if (find_place(plan[i].num[0],
> -                   (uchar *)str->str + FIXED_HEADER_SIZE,
> -                   entry_size, column_count, &entry))
> +    if (find_place(&header, plan[i].key, string_keys))
>      {
> -      size_t entry_data_size;
> +      size_t entry_data_size, entry_name_size= 0;
>  
>        /* Data existed; We have to replace or delete it */
>  
> -      entry_data_size= get_length(entry, header_end,
> -                                  offset_size, max_offset);
> -      if ((long) entry_data_size < 0)
> +      entry_data_size= hdr_interval_length(&header, header.entry +
> +                                           header.entry_size);
> +      if (entry_data_size == DYNCOL_OFFSET_ERROR ||
> +          (long) entry_data_size < 0)
>        {
>          rc= ER_DYNCOL_FORMAT;
>          goto end;
>        }
>  
> +        //get_length(header.entry, header.dtpool, header.offset_size,
> +        //header.data_size);

Did you forget to delete this commented-out get_length() call?

> +      if (new_header.format == DYNCOL_FMT_STR)
> +      {
> +        if (header.format == DYNCOL_FMT_STR)
> +          entry_name_size= header.entry[0];
> +        else
> +          entry_name_size= numlen(uint2korr(header.entry));
> +      }
> +
>        if (plan[i].val->type == DYN_COL_NULL)
>        {
>          /* Inserting a NULL means delete the old data */
> @@ -1891,219 +3124,759 @@ dynamic_column_update_many(DYNAMIC_COLUM
>            goto end;
>          }
>          data_delta+= plan[i].length;
> +        if (new_header.format == DYNCOL_FMT_STR)
> +          name_delta+= ((LEX_STRING *)plan[i].key)->length;
>        }
>      }
> -    plan[i].place= entry;
> +    plan[i].place= header.entry;
>    }
>    plan[add_column_count].hdelta= header_delta;
>    plan[add_column_count].ddelta= data_delta;
> -  new_column_count= column_count + header_delta;
> +  plan[add_column_count].act= PLAN_NOP;
> +  plan[add_column_count].place= header.dtpool;
> +
> +  new_header.column_count= header.column_count + header_delta;
>  
>    /*
>      Check if it is only "increasing" or only "decreasing" plan for (header
>      and data separately).
>    */
> -  data_size= str->length - header_size - FIXED_HEADER_SIZE;
> -  new_data_size= data_size + data_delta;
> -  if ((new_offset_size= dynamic_column_offset_bytes(new_data_size)) >=
> +  new_header.data_size= header.data_size + data_delta;
> +  new_header.nmpool_size= new_header.nmpool_size + name_delta;
> +  DBUG_ASSERT(new_header.format != DYNCOL_FMT_NUM ||
> +              new_header.nmpool_size == 0);
> +  if ((new_header.offset_size=
> +       dynamic_column_offset_bytes(new_header.data_size)) >=
>        MAX_OFFSET_LENGTH)
>    {
>      rc= ER_DYNCOL_LIMIT;
>      goto end;
>    }
>  
> -#ifdef NOT_IMPLEMENTED
> -  /* if (new_offset_size != offset_size) then we have to rewrite header */
> -  header_delta_sign= new_offset_size - offset_size;
> +  copy= ((header.format != new_header.format) ||
> +         (new_header.format == DYNCOL_FMT_STR));
> +  /* if (new_header.offset_size!=offset_size) then we have to rewrite header */
> +  header_delta_sign=
> +    ((int)new_header.offset_size + new_fmt->fixed_hdr_entry) -
> +    ((int)header.offset_size + fmt->fixed_hdr_entry);
>    data_delta_sign= 0;
> -  for (i= 0; i < add_column_count; i++)
> +  // plan[add_column_count] contains last deltas.
> +  for (i= 0; i <= add_column_count && !copy; i++)
>    {
>      /* This is the check for increasing/decreasing */
>      DELTA_CHECK(header_delta_sign, plan[i].hdelta, copy);
>      DELTA_CHECK(data_delta_sign, plan[i].ddelta, copy);
>    }
> -#endif
> -  calc_param(&new_entry_size, &new_header_size,
> -             new_offset_size, new_column_count);
> +  calc_param(&new_header.entry_size, &new_header.header_size,
> +             new_fmt->fixed_hdr_entry,
> +             new_header.offset_size, new_header.column_count);
>  
>    /*
> -    The following code always make a copy. In future we can do a more
> -    optimized version when data is only increasing / decreasing.
> +    Need copy because:
> +    1. Header/data parts moved in different directions.
> +    2. There is no enough allocated space in the string.
> +    3. Header and data moved in different directions.
>    */
> +  if (copy || /*1*/

Did you forget to delete the /*1*/ marker?

> +      str->max_length < str->length + header_delta + data_delta || /*2*/
> +      ((header_delta_sign < 0 && data_delta_sign > 0) ||
> +       (header_delta_sign > 0 && data_delta_sign < 0))) /*3*/
> +    rc= dynamic_column_update_copy(str, plan, add_column_count,
> +                                   &header, &new_header,
> +                                   convert);
> +  else
> +    if (header_delta_sign < 0)
> +      rc= dynamic_column_update_move_left(str, plan, header.offset_size,
> +                                          header.entry_size,
> +                                          header.header_size,
> +                                          new_header.offset_size,
> +                                          new_header.entry_size,
> +                                          new_header.header_size,
> +                                          header.column_count,
> +                                          new_header.column_count,
> +                                          add_column_count, header.dtpool,
> +                                          header.data_size);
> +    else
> +      /*
> +      rc= dynamic_column_update_move_right(str, plan, offset_size,
> +                                           entry_size,  header_size,
> +                                           new_header.offset_size,
> +                                           new_header.entry_size,
> +                                           new_heder.header_size, column_count,
> +                                           new_header.column_count,
> +                                           add_column_count, header_end,
> +                                           header.data_size);
> +                                         */
> +      rc= dynamic_column_update_copy(str, plan, add_column_count,
> +                                     &header, &new_header,
> +                                     convert);
> +end:
> +  my_free(alloc_plan);
> +  return rc;
> +
> +create_new_string:
> +  /* There is no columns from before, so let's just add the new ones */
> +  rc= ER_DYNCOL_OK;
> +  my_free(alloc_plan);
> +  if (not_null != 0)
> +    rc= dynamic_column_create_many_internal_fmt(str, add_column_count,
> +                                                (uint*)column_keys, values,
> +                                                str->str == NULL,
> +                                                string_keys);
> +  goto end;
> +}
> +
> +
> +/**
> +  Update the packed string with the given column
> +
> +  @param str             String where to write the data
> +  @param column_number   Array of columns number
> +  @param values          Array of columns values
> +
> +  @return ER_DYNCOL_* return code
> +*/
> +
> +
> +int dynamic_column_update(DYNAMIC_COLUMN *str, uint column_nr,
> +                          DYNAMIC_COLUMN_VALUE *value)
> +{
> +  return dynamic_column_update_many(str, 1, &column_nr, value);
> +}
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_check(DYNAMIC_COLUMN *str)
> +{
> +  struct st_service_funcs *fmt;
> +  enum enum_dyncol_func_result rc= ER_DYNCOL_FORMAT;
> +  DYN_HEADER header;
> +  uint i;
> +  size_t data_offset= 0, name_offset= 0;
> +  size_t prev_data_offset= 0, prev_name_offset= 0;
> +  LEX_STRING name= {0,0}, prev_name= {0,0};
> +  uint num= 0, prev_num= 0;
> +  void *key, *prev_key;
> +  enum enum_dynamic_column_type type= DYN_COL_NULL, prev_type= DYN_COL_NULL;
> +
> +  DBUG_ENTER("dynamic_column_check");
>  
> -  /*if (copy) */
> +  if (str->length == 0)
>    {
> -    DYNAMIC_COLUMN tmp;
> -    uchar *header_base= (uchar *)str->str + FIXED_HEADER_SIZE,
> -          *write;
> -    if (dynamic_column_init_str(&tmp,
> -                                (FIXED_HEADER_SIZE + new_header_size +
> -                                 new_data_size + DYNCOL_SYZERESERVE)))
> -    {
> -      rc= ER_DYNCOL_RESOURCE;
> -      goto end;
> -    }
> -    write= (uchar *)tmp.str + FIXED_HEADER_SIZE;
> -    /* Adjust tmp to contain whole the future header */
> -    tmp.length= FIXED_HEADER_SIZE + new_header_size;
> -    set_fixed_header(&tmp, new_offset_size, new_column_count);
> -    data_delta= 0;
> +    DBUG_PRINT("info", ("empty string is OK"));
> +    DBUG_RETURN(ER_DYNCOL_OK);
> +  }
>  
> -    /*
> -      Copy data to the new string
> -      i= index in array of changes
> -      j= index in packed string header index
> -    */
> +  bzero(&header, sizeof(header));
>  
> -    for (i= 0, j= 0; i < add_column_count || j < column_count; i++)
> -    {
> -      size_t first_offset;
> -      uint start= j, end;
> -      LINT_INIT(first_offset);
> +  /* Check that header is OK */
> +  if (read_fixed_header(&header, str))
> +  {
> +    DBUG_PRINT("info", ("Reading fixed string header failed"));
> +    goto end;
> +  }
> +  fmt= fmt_data + header.format;
> +  calc_param(&header.entry_size, &header.header_size,
> +             fmt->fixed_hdr_entry, header.offset_size,
> +             header.column_count);
> +  /* headers are out of string length (no space for data and part of headers) */
> +  if (fmt->fixed_hdr + header.header_size + header.nmpool_size > str->length)
> +  {
> +    DBUG_PRINT("info", ("Fixed header: %u  Header size: %u  "
> +                        "Name pool size: %u  but Strig length: %u",
> +                        (uint)fmt->fixed_hdr,
> +                        (uint)header.header_size,
> +                        (uint)header.nmpool_size,
> +                        (uint)str->length));
> +    goto end;
> +  }
> +  header.header= (uchar*)str->str + fmt->fixed_hdr;
> +  header.nmpool= header.header + header.header_size;
> +  header.dtpool= header.nmpool + header.nmpool_size;
> +  header.data_size= str->length - fmt->fixed_hdr -
> +    header.header_size - header.nmpool_size;
>  
> -      /*
> -        Search in i and j for the next column to add from i and where to
> -        add.
> -      */
> +  /* read and check headers */
> +  if (header.format == DYNCOL_FMT_NUM)
> +  {
> +    key= &num;
> +    prev_key= &prev_num;
> +  }
> +  else
> +  {
> +    key= &name;
> +    prev_key= &prev_name;
> +  }
> +  for (i= 0, header.entry= header.header;
> +       i < header.column_count;
> +       i++, header.entry+= header.entry_size)
> +  {
>  
> -      while (i < add_column_count && plan[i].act == PLAN_NOP)
> -        i++;                                    /* skip NOP */
> -      if (i == add_column_count)
> -        j= end= column_count;
> -      else
> +    if (header.format == DYNCOL_FMT_NUM)
> +    {
> +       num= uint2korr(header.entry);
> +    }
> +    else
> +    {
> +      DBUG_ASSERT(header.format == DYNCOL_FMT_STR);
> +      name.length= header.entry[0];
> +      name_offset= uint2korr(header.entry + 1);
> +      name.str= (char *)header.nmpool + name_offset;
> +    }
> +    if (type_and_offset_read(&type, &data_offset,
> +                             header.entry + fmt->fixed_hdr_entry,
> +                             header.offset_size))
> +      goto end;
> +
> +    DBUG_ASSERT(type != DYN_COL_NULL);
> +    if (data_offset > header.data_size)
> +    {
> +      DBUG_PRINT("info", ("Field order: %u  Data offset: %u"
> +                          " > Data pool size: %u",
> +                          (uint)i,
> +                          (uint)name_offset,
> +                          (uint)header.nmpool_size));
> +      goto end;
> +    }
> +    if (name_offset > header.nmpool_size)
> +    {
> +      DBUG_PRINT("info", ("Field order: %u  Name offset: %u"
> +                          " > Name pool size: %u",
> +                          (uint)i,
> +                          (uint)name_offset,
> +                          (uint)header.nmpool_size));
> +      goto end;
> +    }
> +    if (prev_type != DYN_COL_NULL)
> +    {
> +      /* It is not first entry */
> +      if (prev_data_offset >= data_offset)
>        {
> -        /*
> -          old data portion. We don't need to check that j < column_count
> -          as plan[i].place is guaranteed to have a pointer inside the
> -          data.
> -        */
> -        while (header_base + j * entry_size < plan[i].place)
> -          j++;
> -        end= j;
> -        if ((plan[i].act == PLAN_REPLACE || plan[i].act == PLAN_DELETE))
> -          j++;                              /* data at 'j' will be removed */
> +        DBUG_PRINT("info", ("Field order: %u  Previous data offset: %u"
> +                            " >= Current data offset: %u",
> +                            (uint)i,
> +                            (uint)prev_data_offset,
> +                            (uint)data_offset));
> +        goto end;
>        }
> -
> -      if (plan[i].ddelta == 0 && offset_size == new_offset_size)
> +      if (prev_name_offset > name_offset)
>        {
> -        uchar *read= header_base + start * entry_size;
> -        DYNAMIC_COLUMN_TYPE tp;
> -        /*
> -          It's safe to copy the header unchanged. This is usually the
> -          case for the first header block before any changed data.
> -        */
> -        if (start < end)                        /* Avoid memcpy with 0 */
> -        {
> -          size_t length= entry_size * (end - start);
> -          memcpy(write, read, length);
> -          write+= length;
> -        }
> -        /* Read first_offset */
> -        type_and_offset_read(&tp, &first_offset, read, offset_size);
> +        DBUG_PRINT("info", ("Field order: %u  Previous name offset: %u"
> +                            " > Current name offset: %u",
> +                            (uint)i,
> +                            (uint)prev_data_offset,
> +                            (uint)data_offset));
> +        goto end;
>        }
> -      else
> +      if ((*fmt->column_sort)(&prev_key, &key) >= 0)
>        {
> -        /*
> -          Adjust all headers since last loop.
> -          We have to do this as the offset for data has moved
> -        */
> -        for (k= start; k < end; k++)
> -        {
> -          uchar *read= header_base + k * entry_size;
> -          size_t offs;
> -          uint nm;
> -          DYNAMIC_COLUMN_TYPE tp;
> +        DBUG_PRINT("info", ("Field order: %u  Previous key >= Current key",
> +                            (uint)i));
> +        goto end;
> +      }
> +    }
> +    prev_num= num;
> +    prev_name= name;
> +    prev_data_offset= data_offset;
> +    prev_name_offset= name_offset;
> +    prev_type= type;
> +  }
> +
> +  /* check data, which we can */
> +  for (i= 0, header.entry= header.header;
> +       i < header.column_count;
> +       i++, header.entry+= header.entry_size)
> +  {
> +    DYNAMIC_COLUMN_VALUE store;
> +    // already checked by previouse pass
> +    type_and_offset_read(&header.type, &header.offset,
> +                         header.entry + fmt->fixed_hdr_entry,
> +                         header.offset_size);
> +    header.length=
> +      hdr_interval_length(&header, header.entry + header.entry_size);
> +    header.data= header.dtpool + header.offset;
> +    switch ((header.type)) {
> +    case DYN_COL_INT:
> +      rc= dynamic_column_sint_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_UINT:
> +      rc= dynamic_column_uint_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_DOUBLE:
> +      rc= dynamic_column_double_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_STRING:
> +      rc= dynamic_column_string_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_DECIMAL:
> +      rc= dynamic_column_decimal_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_DATETIME:
> +      rc= dynamic_column_date_time_read(&store, header.data,
> +                                        header.length);
> +      break;
> +    case DYN_COL_DATE:
> +      rc= dynamic_column_date_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_TIME:
> +      rc= dynamic_column_time_read(&store, header.data, header.length);
> +      break;
> +    case DYN_COL_NULL:
> +    default:
> +      rc= ER_DYNCOL_FORMAT;
> +      goto end;
> +    }
> +    if (rc != ER_DYNCOL_OK)
> +    {
> +      DBUG_ASSERT(rc < 0);
> +      DBUG_PRINT("info", ("Field order: %u  Can't read data: %i",
> +                          (uint)i, (int) rc));
> +      goto end;
> +    }
> +  }
>  
> -          nm= uint2korr(read);                    /* Column nummber */
> -          type_and_offset_read(&tp, &offs, read, offset_size);
> -          if (k == start)
> -            first_offset= offs;
> -          else if (offs < first_offset)
> +  rc= ER_DYNCOL_OK;
> +end:
> +  DBUG_RETURN(rc);
> +}
> +
> +
> +enum enum_dyncol_func_result
> +dynamic_column_val_str(DYNAMIC_STRING *str, DYNAMIC_COLUMN_VALUE *val,
> +                       CHARSET_INFO *cs, my_bool quote)
> +{
> +  char buff[40];
> +  int len;
> +  switch (val->type) {
> +  case DYN_COL_INT:
> +      len= snprintf(buff, sizeof(buff), "%lld", val->x.long_value);
> +      if (dynstr_append_mem(str, buff, len))
> +        return ER_DYNCOL_RESOURCE;
> +      break;
> +    case DYN_COL_UINT:
> +      len= snprintf(buff, sizeof(buff), "%llu", val->x.ulong_value);
> +      if (dynstr_append_mem(str, buff, len))
> +        return ER_DYNCOL_RESOURCE;
> +      break;
> +    case DYN_COL_DOUBLE:
> +      len= snprintf(buff, sizeof(buff), "%lg", val->x.double_value);
> +      if (dynstr_realloc(str, len + (quote ? 2 : 0)))
> +        return ER_DYNCOL_RESOURCE;
> +      if (quote)
> +        str->str[str->length++]= '"';
> +      dynstr_append_mem(str, buff, len);
> +      if (quote)
> +        str->str[str->length++]= '"';
> +      break;
> +    case DYN_COL_STRING:
> +      {
> +        char *alloc= NULL;
> +        char *from= val->x.string.value.str;
> +        uint bufflen;
> +        my_bool conv= !my_charset_same(val->x.string.charset, cs);
> +        my_bool rc;
> +        len= val->x.string.value.length;
> +        bufflen= (len * (conv ? cs->mbmaxlen : 1));
> +        if (dynstr_realloc(str, bufflen))
> +            return ER_DYNCOL_RESOURCE;
> +
> +        // guaranty UTF-8 string for value
> +        if (!my_charset_same(val->x.string.charset, cs))
> +        {
> +          uint dummy_errors;
> +          if (!quote)
>            {
> -            dynamic_column_column_free(&tmp);
> -            rc= ER_DYNCOL_FORMAT;
> -            goto end;
> +            /* convert to the destination */
> +            str->length+= copy_and_convert_extended(str->str, bufflen,
> +                                                    cs,
> +                                                    from, len,
> +                                                    val->x.string.charset,
> +                                                    &dummy_errors);
> +            return ER_DYNCOL_OK;
>            }
> -
> -          offs+= plan[i].ddelta;
> -          int2store(write, nm);
> -          /* write rest of data at write + COLUMN_NUMBER_SIZE */
> -          type_and_offset_store(write, new_offset_size, tp, offs);
> -          write+= new_entry_size;
> +          if ((alloc= (char *)my_malloc(bufflen, MYF(0))))
> +          {
> +            len=
> +              copy_and_convert_extended(alloc, bufflen, cs,
> +                                        from, len, val->x.string.charset,
> +                                        &dummy_errors);
> +            from= alloc;
> +          }
> +          else
> +            return ER_DYNCOL_RESOURCE;
>          }
> +        if (quote)
> +          rc= dynstr_append_quoted(str, from, len);
> +        else
> +          rc= dynstr_append_mem(str, from, len);
> +        if (alloc)
> +          my_free(alloc);
> +        if (rc)
> +          return ER_DYNCOL_RESOURCE;
> +        break;
>        }
> +    case DYN_COL_DECIMAL:
> +      len= sizeof(buff);
> +      decimal2string(&val->x.decimal.value, buff, &len,
> +                     0, val->x.decimal.value.frac,
> +                     '0');
> +      if (dynstr_append_mem(str, buff, len))
> +        return ER_DYNCOL_RESOURCE;
> +      break;
> +    case DYN_COL_DATETIME:
> +    case DYN_COL_DATE:
> +    case DYN_COL_TIME:
> +      len= my_TIME_to_str(&val->x.time_value, buff, AUTO_SEC_PART_DIGITS);
> +      if (dynstr_realloc(str, len + (quote ? 2 : 0)))
> +        return ER_DYNCOL_RESOURCE;
> +      if (quote)
> +        str->str[str->length++]= '"';
> +      dynstr_append_mem(str, buff, len);
> +      if (quote)
> +        str->str[str->length++]= '"';
> +      break;
> +    case DYN_COL_NULL:
> +      if (dynstr_append_mem(str, "null", 4))
> +        return ER_DYNCOL_RESOURCE;
> +      break;
> +    default:
> +      return(ER_DYNCOL_FORMAT);
> +  }
> +  return(ER_DYNCOL_OK);
> +}
>  
> -      /* copy first the data that was not replaced in original packed data */
> -      if (start < end)
> +
> +enum enum_dyncol_func_result
> +dynamic_column_val_long(longlong *ll, DYNAMIC_COLUMN_VALUE *val)
> +{
> +  enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
> +  *ll= 0;
> +  switch (val->type) {
> +  case DYN_COL_INT:
> +      *ll= val->x.long_value;
> +      break;
> +    case DYN_COL_UINT:
> +      *ll= (longlong)val->x.ulong_value;
> +      if (val->x.ulong_value > ULONGLONG_MAX)
> +         rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_DOUBLE:
> +      *ll= (longlong)val->x.double_value;
> +      if (((double) *ll) != val->x.double_value)
> +        rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_STRING:
>        {
> -        /* Add old data last in 'tmp' */
> -        size_t data_size=
> -          get_length_interval(header_base + start * entry_size,
> -                              header_base + end * entry_size,
> -                              header_end, offset_size, max_offset);
> -        if ((long) data_size < 0 ||
> -            data_size > max_offset - first_offset)
> +        longlong i= 0, sign= 1;
> +        char *src= val->x.string.value.str;
> +        uint len= val->x.string.value.length;
> +
> +        while (len && my_isspace(&my_charset_latin1, *src)) src++,len--;
> +
> +        if (len)
>          {
> -          dynamic_column_column_free(&tmp);
> -          rc= ER_DYNCOL_FORMAT;
> -          goto end;
> +          if (*src == '-')
> +          {
> +            sign= -1;
> +            src++;
> +          } else if (*src == '-')
> +            src++;
> +          while(len && my_isdigit(&my_charset_latin1, *src))
> +          {
> +            i= i * 10 + (*src - '0');
> +            src++;
> +          }
>          }
> -
> -        memcpy(tmp.str + tmp.length, (char *)header_end + first_offset,
> -               data_size);
> -        tmp.length+= data_size;
> +        else
> +          rc= ER_DYNCOL_TRUNCATED;
> +        if (len)
> +          rc= ER_DYNCOL_TRUNCATED;
> +        *ll= i * sign;
> +        break;
>        }
> +    case DYN_COL_DECIMAL:
> +      if (decimal2longlong(&val->x.decimal.value, ll) != E_DEC_OK)
> +        rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_DATETIME:
> +      *ll= (val->x.time_value.year * 10000000000L +
> +            val->x.time_value.month * 100000000L +
> +            val->x.time_value.day * 1000000 +
> +            val->x.time_value.hour * 10000 +
> +            val->x.time_value.minute * 100 +
> +            val->x.time_value.second) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_DATE:
> +      *ll= (val->x.time_value.year * 10000 +
> +            val->x.time_value.month * 100 +
> +            val->x.time_value.day) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_TIME:
> +      *ll= (val->x.time_value.hour * 10000 +
> +            val->x.time_value.minute * 100 +
> +            val->x.time_value.second) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_NULL:
> +      rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    default:
> +      return(ER_DYNCOL_FORMAT);
> +  }
> +  return(rc);
> +}
> +
>  
> -      /* new data adding */
> -      if (i < add_column_count)
> +enum enum_dyncol_func_result
> +dynamic_column_val_double(double *dbl, DYNAMIC_COLUMN_VALUE *val)
> +{
> +  enum enum_dyncol_func_result rc= ER_DYNCOL_OK;
> +  *dbl= 0;
> +  switch (val->type) {
> +  case DYN_COL_INT:
> +      *dbl= (double)val->x.long_value;
> +      if (((longlong) *dbl) != val->x.long_value)
> +        rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_UINT:
> +      *dbl= (double)val->x.ulong_value;
> +      if (((ulonglong) *dbl) != val->x.ulong_value)
> +        rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_DOUBLE:
> +      *dbl= val->x.double_value;
> +      break;
> +    case DYN_COL_STRING:
>        {
> -        if( plan[i].act == PLAN_ADD || plan[i].act == PLAN_REPLACE)
> -        {
> -          int2store(write, plan[i].num[0]);
> -          type_and_offset_store(write, new_offset_size,
> -                                plan[i].val[0].type,
> -                                tmp.length -
> -                                (FIXED_HEADER_SIZE + new_header_size));
> -          write+= new_entry_size;
> -          data_store(&tmp, plan[i].val);        /* Append new data */
> -        }
> -        data_delta= plan[i].ddelta;
> +        char *str, *end;
> +        if ((str= malloc(val->x.string.value.length + 1)))
> +          return ER_DYNCOL_RESOURCE;
> +        memcpy(str, val->x.string.value.str, val->x.string.value.length);
> +        str[val->x.string.value.length]= '\0';
> +        *dbl= strtod(str, &end);
> +        if (*end != '\0')
> +          rc= ER_DYNCOL_TRUNCATED;
>        }
> -    }
> -    dynamic_column_column_free(str);
> -    *str= tmp;
> +    case DYN_COL_DECIMAL:
> +      if (decimal2double(&val->x.decimal.value, dbl) != E_DEC_OK)
> +        rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    case DYN_COL_DATETIME:
> +      *dbl= (double)(val->x.time_value.year * 10000000000L +
> +                     val->x.time_value.month * 100000000L +
> +                     val->x.time_value.day * 1000000 +
> +                     val->x.time_value.hour * 10000 +
> +                     val->x.time_value.minute * 100 +
> +                     val->x.time_value.second) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_DATE:
> +      *dbl= (double)(val->x.time_value.year * 10000 +
> +                     val->x.time_value.month * 100 +
> +                     val->x.time_value.day) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_TIME:
> +      *dbl= (double)(val->x.time_value.hour * 10000 +
> +                     val->x.time_value.minute * 100 +
> +                     val->x.time_value.second) *
> +        (val->x.time_value.neg ? -1 : 1);
> +      break;
> +    case DYN_COL_NULL:
> +      rc= ER_DYNCOL_TRUNCATED;
> +      break;
> +    default:
> +      return(ER_DYNCOL_FORMAT);
>    }
> +  return(rc);
> +}
>  
> -  rc= ER_DYNCOL_OK;
>  
> -end:
> -  my_free(plan);
> -  return rc;
> +/**
> +  Convert to JSON
>  
> -create_new_string:
> -  /* There is no columns from before, so let's just add the new ones */
> -  rc= ER_DYNCOL_OK;
> -  if (not_null != 0)
> -    rc= dynamic_column_create_many_internal(str, add_column_count,
> -                                            column_numbers, values,
> -                                            str->str == NULL);
> -  goto end;
> +  @param str             The packed string
> +  @param json            Where to put json result
> +
> +  @return ER_DYNCOL_* return code
> +*/
> +
> +enum enum_dyncol_func_result
> +dynamic_column_json(DYNAMIC_COLUMN *str, DYNAMIC_STRING *json)
> +{
> +  DYN_HEADER header;
> +  uint i;
> +  enum enum_dyncol_func_result rc;
> +
> +  bzero(json, sizeof(DYNAMIC_STRING));          /* In case of errors */
> +  if (str->length == 0)
> +    return ER_DYNCOL_OK;                        /* no columns */
> +
> +  if ((rc= init_read_hdr(&header, str)) < 0)
> +    return rc;
> +
> +  if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
> +      str->length)
> +    return ER_DYNCOL_FORMAT;
> +
> +  if (init_dynamic_string(json, NULL, str->length * 2, 100))
> +    return ER_DYNCOL_RESOURCE;
> +
> +  if (dynstr_append_mem(json, "[", 1))
> +    return ER_DYNCOL_RESOURCE;
> +  rc= ER_DYNCOL_RESOURCE;
> +  for (i= 0, header.entry= header.header;
> +       i < header.column_count;
> +       i++, header.entry+= header.entry_size)
> +  {
> +    DYNAMIC_COLUMN_VALUE val;
> +    if (i != 0 && dynstr_append_mem(json, ",", 1))
> +      goto err;
> +    header.length=
> +      hdr_interval_length(&header, header.entry + header.entry_size);
> +    header.data= header.dtpool + header.offset;
> +    /*
> +      Check that the found data is withing the ranges. This can happen if
> +      we get data with wrong offsets.
> +    */
> +    if (header.length == DYNCOL_OFFSET_ERROR ||
> +        header.length > INT_MAX || header.offset > header.data_size)
> +    {
> +      rc= ER_DYNCOL_FORMAT;
> +      goto err;
> +    }
> +    if ((rc= dynamic_column_get_value(&header, &val)) < 0 ||
> +        dynstr_append_mem(json, "{", 1))
> +      goto err;
> +    if (header.format == DYNCOL_FMT_NUM)
> +    {
> +      uint nm= uint2korr(header.entry);
> +      if (dynstr_realloc(json, DYNCOL_NUM_CHAR + 3))
> +        goto err;
> +      json->str[json->length++]= '"';
> +      json->length+= (snprintf(json->str + json->length,
> +                               DYNCOL_NUM_CHAR, "%u", nm));
> +    }
> +    else
> +    {
> +      uint len= header.entry[0];
> +      if (dynstr_realloc(json, len + 3))
> +        goto err;
> +      json->str[json->length++]= '"';
> +      memcpy(json->str + json->length, (const void *)header.nmpool +
> +             uint2korr(header.entry + 1), len);
> +      json->length+= len;
> +    }
> +    json->str[json->length++]= '"';
> +    json->str[json->length++]= ':';
> +    if ((rc= dynamic_column_val_str(json, &val,
> +                                    &my_charset_utf8_general_ci, TRUE)) < 0 ||
> +        dynstr_append_mem(json, "}", 1))
> +      goto err;
> +  }
> +  if (dynstr_append_mem(json, "]", 1))
> +    return ER_DYNCOL_RESOURCE;
> +  return ER_DYNCOL_OK;
> +
> +err:
> +  json->length= 0;
> +  return rc;
>  }
>  
>  
>  /**
> -  Update the packed string with the given column
> +  Convert to DYNAMIC_COLUMN_VALUE values and names (LEX_STING) dynamic array
>  
> -  @param str             String where to write the data
> -  @param column_number   Array of columns number
> -  @param values          Array of columns values
> +  @param str             The packed string
> +  @param names           Where to put names
> +  @param vals            Where to put values
> +  @param free_names      pointer to free names buffer if there is it.
>  
>    @return ER_DYNCOL_* return code
>  */
>  
> -
> -int dynamic_column_update(DYNAMIC_COLUMN *str, uint column_nr,
> -                          DYNAMIC_COLUMN_VALUE *value)
> +enum enum_dyncol_func_result
> +dynamic_column_vals(DYNAMIC_COLUMN *str,
> +                    DYNAMIC_ARRAY *names, DYNAMIC_ARRAY *vals,
> +                    char **free_names)
>  {
> -  return dynamic_column_update_many(str, 1, &column_nr, value);
> +  DYN_HEADER header;
> +  char *nm;
> +  uint i;
> +  enum enum_dyncol_func_result rc;
> +
> +  *free_names= 0;
> +  bzero(names, sizeof(DYNAMIC_ARRAY));        /* In case of errors */
> +  bzero(vals, sizeof(DYNAMIC_ARRAY));         /* In case of errors */
> +  if (str->length == 0)
> +    return ER_DYNCOL_OK;                      /* no columns */
> +
> +  if ((rc= init_read_hdr(&header, str)) < 0)
> +    return rc;
> +
> +  if (header.entry_size * header.column_count + FIXED_HEADER_SIZE >
> +      str->length)
> +    return ER_DYNCOL_FORMAT;
> +
> +  if (init_dynamic_array(names, sizeof(LEX_STRING),
> +                         header.column_count, 0) ||
> +      init_dynamic_array(vals, sizeof(DYNAMIC_COLUMN_VALUE),
> +                         header.column_count, 0) ||
> +      (header.format == DYNCOL_FMT_NUM &&
> +       !(*free_names= (char *)malloc(DYNCOL_NUM_CHAR * header.column_count))))

Why do you need a special malloc() for names? You can put them
in the buffer of the 'names' dynarray. It'll be a lot more convenient for
the caller (no free_names to care about).

> +  {
> +    rc= ER_DYNCOL_RESOURCE;
> +    goto err;
> +  }
> +  nm= *free_names;
> +
> +  for (i= 0, header.entry= header.header;
> +       i < header.column_count;
> +       i++, header.entry+= header.entry_size)
> +  {
> +    DYNAMIC_COLUMN_VALUE val;
> +    LEX_STRING name;
> +    header.length=
> +      hdr_interval_length(&header, header.entry + header.entry_size);
> +    header.data= header.dtpool + header.offset;
> +    /*
> +      Check that the found data is withing the ranges. This can happen if
> +      we get data with wrong offsets.
> +    */
> +    if (header.length == DYNCOL_OFFSET_ERROR ||
> +        header.length > INT_MAX || header.offset > header.data_size)
> +    {
> +      rc= ER_DYNCOL_FORMAT;
> +      goto err;
> +    }
> +    if ((rc= dynamic_column_get_value(&header, &val)) < 0)
> +      goto err;
> +
> +    if (header.format == DYNCOL_FMT_NUM)
> +    {
> +      uint num= uint2korr(header.entry);
> +      name.str= nm;
> +      name.length= snprintf(nm, DYNCOL_NUM_CHAR, "%u", num);
> +      nm+= name.length + 1;
> +    }
> +    else
> +    {
> +      name.length= header.entry[0];
> +      name.str= (char *)header.nmpool + uint2korr(header.entry + 1);
> +    }
> +    /* following is preallocated and so do not fail */
> +    (void) insert_dynamic(names, (uchar *)&name);
> +    (void) insert_dynamic(vals, (uchar *)&val);
> +  }
> +  return ER_DYNCOL_OK;
> +
> +err:
> +  delete_dynamic(names);
> +  delete_dynamic(vals);
> +  if (*free_names)
> +    my_free(*free_names);
> +  *free_names= 0;
> +  return rc;
>  }
> 

Regards,
Sergei