← Back to team overview

maria-developers team mailing list archive

MDEV-13995 MAX(timestamp) returns a wrong result near DST change

 

Hi Sergei,

Please review a patch for MDEV-13995.

Thanks!
commit 3eb1d600701b743f7b3238e6393436d6b917ad40
Author: Alexander Barkov <bar@xxxxxxxxxxx>
Date:   Mon Dec 4 21:41:28 2017 +0400

    MDEV-13995 MAX(timestamp) returns a wrong result near DST change
    
    The problem happened because timestamp values were converted to
    MYSQL_TIME internally and then compared as MYSQL_TIME, which does
    not work well near DST time zone changes, because two different
    timestamps can map to the same 'YYYY-MM-DD hh:mm:ss' representation.
    
    Introducing a set of methods to transfer and use data
    inside the server using a type-specific raw format:
    - Field::store_raw_native()
    - Field::val_raw_native()
    - Item::val_raw_native()
    - Item::val_raw_with_conversion()
    - Arg_comparator::set_cmp_func_raw()
    - Arg_comparator::compare_raw()
    - Arg_comparator::compare_e_raw()
    (and some others)
    
    A type handler (or a group of type handlers) can now implement its own
    internal raw representation to use with the new methods.
    Raw representation is stored in "String" objects.
    
    The group of TIMESTAMP type handlers (which stand behind Field_timestamp,
    Field_timestamp_hires, Field_timestampf) now uses a raw (instead of MYSQL_TIME)
    representation:
    
    1. Zero datetime '0000-00-00 00:00:00.000000' is represented as an empty String
    2. Real timestamps are represented as a String containing:
      a. Seconds (big-endian, 4-bytes), optionally followed by
      b. Microseconds (big-endian, 0-3 bytes, depending on precision 0-6)
    Two TIMESTAMP raw values are binary-comparable.
    
    This patch does not cover CASE and IN operators yet.
    They will be fixed in a separate patch
    (some more coding will be needed in in_xxx and cmp_item_xxx classes).
    
    This change is an extract from the joint prototype patch implementing:
    - MDEV-4912 Add a plugin to field types (column types)
    - MDEV-274 The data type for IPv6/IPv4 addresses in MariaDB
    (development for these tasks is in progress).
    The new raw data methods will be reused soon for the INET6 data type.

diff --git a/mysql-test/r/timezone2.result b/mysql-test/r/timezone2.result
index 096e996bffb..b673ad02345 100644
--- a/mysql-test/r/timezone2.result
+++ b/mysql-test/r/timezone2.result
@@ -332,3 +332,135 @@ NULL
 #
 # End of 5.3 tests
 #
+#
+# Start of 10.3 tests
+#
+#
+# MDEV-13995 MAX(timestamp) returns a wrong result near DST change
+#
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526)      /*summer time in Moscow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526+3599) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1;
+a	UNIX_TIMESTAMP(a)
+2010-10-31 02:25:26	1288477526
+2010-10-31 02:25:25	1288481125
+SELECT UNIX_TIMESTAMP(MAX(a)) AS a FROM t1;
+a
+1288481125
+CREATE TABLE t2 (a TIMESTAMP);
+INSERT INTO t2 SELECT MAX(a) AS a FROM t1;
+SELECT a, UNIX_TIMESTAMP(a) FROM t2;
+a	UNIX_TIMESTAMP(a)
+2010-10-31 02:25:25	1288481125
+DROP TABLE t2;
+DROP TABLE t1;
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+CREATE TABLE t2 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526)      /*summer time in Moscow*/);
+INSERT INTO t2 VALUES (FROM_UNIXTIME(1288477526+3599) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT UNIX_TIMESTAMP(t1.a), UNIX_TIMESTAMP(t2.a) FROM t1,t2;
+UNIX_TIMESTAMP(t1.a)	UNIX_TIMESTAMP(t2.a)
+1288477526	1288481125
+SELECT * FROM t1,t2 WHERE t1.a < t2.a;
+a	a
+2010-10-31 02:25:26	2010-10-31 02:25:25
+DROP TABLE t1,t2;
+BEGIN NOT ATOMIC
+DECLARE a,b TIMESTAMP;
+SET time_zone='+00:00';
+SET a=FROM_UNIXTIME(1288477526);
+SET b=FROM_UNIXTIME(1288481125);
+SELECT a < b;
+SET time_zone='Europe/Moscow';
+SELECT a < b;
+END;
+$$
+a < b
+1
+a < b
+1
+CREATE OR REPLACE FUNCTION f1(uts INT) RETURNS TIMESTAMP
+BEGIN
+DECLARE ts TIMESTAMP;
+DECLARE tz VARCHAR(64) DEFAULT @@time_zone;
+SET time_zone='+00:00';
+SET ts=FROM_UNIXTIME(uts);
+SET time_zone=tz;
+RETURN ts;
+END;
+$$
+SET time_zone='+00:00';
+SELECT f1(1288477526) < f1(1288481125);
+f1(1288477526) < f1(1288481125)
+1
+SET time_zone='Europe/Moscow';
+SELECT f1(1288477526) < f1(1288481125);
+f1(1288477526) < f1(1288481125)
+1
+DROP FUNCTION f1;
+CREATE TABLE t1 (a TIMESTAMP,b TIMESTAMP);
+SET time_zone='+00:00';
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/,
+FROM_UNIXTIME(1288481125) /*winter time in Moscow*/);
+SELECT *, LEAST(a,b) FROM t1;
+a	b	LEAST(a,b)
+2010-10-30 22:25:26	2010-10-30 23:25:25	2010-10-30 22:25:26
+SET time_zone='Europe/Moscow';
+SELECT *, LEAST(a,b) FROM t1;
+a	b	LEAST(a,b)
+2010-10-31 02:25:26	2010-10-31 02:25:25	2010-10-31 02:25:26
+SELECT UNIX_TIMESTAMP(a), UNIX_TIMESTAMP(b), UNIX_TIMESTAMP(LEAST(a,b)) FROM t1;
+UNIX_TIMESTAMP(a)	UNIX_TIMESTAMP(b)	UNIX_TIMESTAMP(LEAST(a,b))
+1288477526	1288481125	1288477526
+DROP TABLE t1;
+CREATE TABLE t1 (a TIMESTAMP,b TIMESTAMP,c TIMESTAMP);
+SET time_zone='+00:00';
+INSERT INTO t1 VALUES (
+FROM_UNIXTIME(1288477526) /*summer time in Moscow*/,
+FROM_UNIXTIME(1288481125) /*winter time in Moscow*/,
+FROM_UNIXTIME(1288481126) /*winter time in Moscow*/);
+SELECT b BETWEEN a AND c FROM t1;
+b BETWEEN a AND c
+1
+SET time_zone='Europe/Moscow';
+SELECT b BETWEEN a AND c FROM t1;
+b BETWEEN a AND c
+1
+DROP TABLE t1;
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288481125) /*winter time in Moscow*/);
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+a	UNIX_TIMESTAMP(a)
+2010-10-30 22:25:26	1288477526
+2010-10-30 23:25:25	1288481125
+SELECT COALESCE(a) AS a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+a	UNIX_TIMESTAMP(a)
+2010-10-30 22:25:26	1288477526
+2010-10-30 23:25:25	1288481125
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+a	UNIX_TIMESTAMP(a)
+2010-10-31 02:25:26	1288477526
+2010-10-31 02:25:25	1288481125
+SELECT COALESCE(a) AS a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+a	UNIX_TIMESTAMP(a)
+2010-10-31 02:25:26	1288477526
+2010-10-31 02:25:25	1288481125
+DROP TABLE t1;
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288481126) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 GROUP BY a;
+a	UNIX_TIMESTAMP(a)
+2010-10-31 02:25:26	1288477526
+2010-10-31 02:25:26	1288481126
+DROP TABLE t1;
diff --git a/mysql-test/r/type_timestamp.result b/mysql-test/r/type_timestamp.result
index b0405bc4ad7..01c33f22fe8 100644
--- a/mysql-test/r/type_timestamp.result
+++ b/mysql-test/r/type_timestamp.result
@@ -1012,5 +1012,23 @@ t2	CREATE TABLE `t2` (
 DROP TABLE t2;
 DROP TABLE t1;
 #
+# MDEV-13995 MAX(timestamp) returns a wrong result near DST change
+#
+# Testing Item_func_rollup_const::val_raw_native()
+CREATE TABLE t1 (id INT);
+INSERT INTO t1 VALUES (1),(2);
+BEGIN NOT ATOMIC
+DECLARE v INT DEFAULT 10; -- "v" will be wrapped into Item_func_rollup_const
+SELECT id, v AS v, COUNT(*) FROM t1 GROUP BY id,v WITH ROLLUP;
+END;
+$$
+id	v	COUNT(*)
+1	10	1
+1	10	1
+2	10	1
+2	10	1
+NULL	10	2
+DROP TABLE t1;
+#
 # End of 10.3 tests
 #
diff --git a/mysql-test/t/timezone2.test b/mysql-test/t/timezone2.test
index 7a38610ad95..627c46cd430 100644
--- a/mysql-test/t/timezone2.test
+++ b/mysql-test/t/timezone2.test
@@ -308,3 +308,122 @@ SELECT CONVERT_TZ('2001-10-08 00:00:00', MAKE_SET(0,'+01:00'), '+00:00' );
 --echo #
 --echo # End of 5.3 tests
 --echo #
+
+
+--echo #
+--echo # Start of 10.3 tests
+--echo #
+
+
+--echo #
+--echo # MDEV-13995 MAX(timestamp) returns a wrong result near DST change
+--echo #
+
+# MAX()
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526)      /*summer time in Moscow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526+3599) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1;
+SELECT UNIX_TIMESTAMP(MAX(a)) AS a FROM t1;
+CREATE TABLE t2 (a TIMESTAMP);
+INSERT INTO t2 SELECT MAX(a) AS a FROM t1;
+SELECT a, UNIX_TIMESTAMP(a) FROM t2;
+DROP TABLE t2;
+DROP TABLE t1;
+
+
+# Comparison
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+CREATE TABLE t2 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526)      /*summer time in Moscow*/);
+INSERT INTO t2 VALUES (FROM_UNIXTIME(1288477526+3599) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT UNIX_TIMESTAMP(t1.a), UNIX_TIMESTAMP(t2.a) FROM t1,t2;
+SELECT * FROM t1,t2 WHERE t1.a < t2.a;
+DROP TABLE t1,t2;
+
+
+# SP variable comparison
+DELIMITER $$;
+BEGIN NOT ATOMIC
+  DECLARE a,b TIMESTAMP;
+  SET time_zone='+00:00';
+  SET a=FROM_UNIXTIME(1288477526);
+  SET b=FROM_UNIXTIME(1288481125);
+  SELECT a < b;
+  SET time_zone='Europe/Moscow';
+  SELECT a < b;
+END;
+$$
+DELIMITER ;$$
+
+
+# SP function comparison
+DELIMITER $$;
+CREATE OR REPLACE FUNCTION f1(uts INT) RETURNS TIMESTAMP
+BEGIN
+  DECLARE ts TIMESTAMP;
+  DECLARE tz VARCHAR(64) DEFAULT @@time_zone;
+  SET time_zone='+00:00';
+  SET ts=FROM_UNIXTIME(uts);
+  SET time_zone=tz;
+  RETURN ts;
+END;
+$$
+DELIMITER ;$$
+SET time_zone='+00:00';
+SELECT f1(1288477526) < f1(1288481125);
+SET time_zone='Europe/Moscow';
+SELECT f1(1288477526) < f1(1288481125);
+DROP FUNCTION f1;
+
+
+# LEAST()
+CREATE TABLE t1 (a TIMESTAMP,b TIMESTAMP);
+SET time_zone='+00:00';
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/,
+                       FROM_UNIXTIME(1288481125) /*winter time in Moscow*/);
+SELECT *, LEAST(a,b) FROM t1;
+SET time_zone='Europe/Moscow';
+SELECT *, LEAST(a,b) FROM t1;
+SELECT UNIX_TIMESTAMP(a), UNIX_TIMESTAMP(b), UNIX_TIMESTAMP(LEAST(a,b)) FROM t1;
+DROP TABLE t1;
+
+
+# BETWEEN
+CREATE TABLE t1 (a TIMESTAMP,b TIMESTAMP,c TIMESTAMP);
+SET time_zone='+00:00';
+INSERT INTO t1 VALUES (
+  FROM_UNIXTIME(1288477526) /*summer time in Moscow*/,
+  FROM_UNIXTIME(1288481125) /*winter time in Moscow*/,
+  FROM_UNIXTIME(1288481126) /*winter time in Moscow*/);
+SELECT b BETWEEN a AND c FROM t1;
+SET time_zone='Europe/Moscow';
+SELECT b BETWEEN a AND c FROM t1;
+DROP TABLE t1;
+
+
+# ORDER BY
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288481125) /*winter time in Moscow*/);
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+SELECT COALESCE(a) AS a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+SELECT COALESCE(a) AS a, UNIX_TIMESTAMP(a) FROM t1 ORDER BY a;
+DROP TABLE t1;
+
+
+# GROUP BY
+SET time_zone='+00:00';
+CREATE TABLE t1 (a TIMESTAMP);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288477526) /*summer time in Mowcow*/);
+INSERT INTO t1 VALUES (FROM_UNIXTIME(1288481126) /*winter time in Moscow*/);
+SET time_zone='Europe/Moscow';
+SELECT a, UNIX_TIMESTAMP(a) FROM t1 GROUP BY a;
+DROP TABLE t1;
diff --git a/mysql-test/t/type_timestamp.test b/mysql-test/t/type_timestamp.test
index 6d81a86331a..749660db7dc 100644
--- a/mysql-test/t/type_timestamp.test
+++ b/mysql-test/t/type_timestamp.test
@@ -602,6 +602,23 @@ SHOW CREATE TABLE t2;
 DROP TABLE t2;
 DROP TABLE t1;
 
+--echo #
+--echo # MDEV-13995 MAX(timestamp) returns a wrong result near DST change
+--echo #
+
+--echo # Testing Item_func_rollup_const::val_raw_native()
+
+CREATE TABLE t1 (id INT);
+INSERT INTO t1 VALUES (1),(2);
+DELIMITER $$;
+BEGIN NOT ATOMIC
+  DECLARE v INT DEFAULT 10; -- "v" will be wrapped into Item_func_rollup_const
+  SELECT id, v AS v, COUNT(*) FROM t1 GROUP BY id,v WITH ROLLUP;
+END;
+$$
+DELIMITER ;$$
+DROP TABLE t1;
+
 
 --echo #
 --echo # End of 10.3 tests
diff --git a/sql/compat56.h b/sql/compat56.h
index bb5e2670f7d..5cd150f30ff 100644
--- a/sql/compat56.h
+++ b/sql/compat56.h
@@ -19,6 +19,15 @@
 
 
 /** MySQL56 routines and macros **/
+
+/*
+  Buffer size for a raw TIMESTAMP representation, for use with StringBuffer.
+  4 bytes for seconds
+  3 bytes for microseconds
+  1 byte for the trailing '\0' (as String always reserves extra 1 byte for '\0')
+*/
+#define STRING_BUFFER_TIMESTAMP_BINARY_SIZE    8 /* 4 + 3 + 1 */
+
 #define MY_PACKED_TIME_GET_INT_PART(x)     ((x) >> 24)
 #define MY_PACKED_TIME_GET_FRAC_PART(x)    ((x) % (1LL << 24))
 #define MY_PACKED_TIME_MAKE(i, f)          ((((longlong) (i)) << 24) + (f))
diff --git a/sql/field.cc b/sql/field.cc
index cfa9623fe17..76803ddec14 100644
--- a/sql/field.cc
+++ b/sql/field.cc
@@ -1713,10 +1713,10 @@ int Field::store(const char *to, uint length, CHARSET_INFO *cs,
 }
 
 
-static int timestamp_to_TIME(THD *thd, MYSQL_TIME *ltime, my_time_t ts,
-                             ulong sec_part, ulonglong fuzzydate)
+bool THD::timestamp_to_TIME(MYSQL_TIME *ltime, my_time_t ts,
+                            ulong sec_part, ulonglong fuzzydate)
 {
-  thd->time_zone_used= 1;
+  time_zone_used= 1;
   if (ts == 0 && sec_part == 0)
   {
     if (fuzzydate & TIME_NO_ZERO_DATE)
@@ -1725,7 +1725,7 @@ static int timestamp_to_TIME(THD *thd, MYSQL_TIME *ltime, my_time_t ts,
   }
   else
   {
-    thd->variables.time_zone->gmt_sec_to_TIME(ltime, ts);
+    variables.time_zone->gmt_sec_to_TIME(ltime, ts);
     ltime->second_part= sec_part;
   }
   return 0;
@@ -1736,7 +1736,7 @@ int Field::store_timestamp(my_time_t ts, ulong sec_part)
 {
   MYSQL_TIME ltime;
   THD *thd= get_thd();
-  timestamp_to_TIME(thd, &ltime, ts, sec_part, 0);
+  thd->timestamp_to_TIME(&ltime, ts, sec_part, 0);
   return store_time_dec(&ltime, decimals());
 }
 
@@ -4864,6 +4864,22 @@ my_time_t Field_timestamp::get_timestamp(const uchar *pos,
 }
 
 
+bool Field_timestamp::val_raw_native(String *to)
+{
+  int32 nr= sint4korr(ptr);
+  if (!nr)
+  {
+    to->length(0); // Zero datetime '0000-00-00 00:00:00'
+    return false;
+  }
+  if (to->alloc(4))
+    return true;
+  mi_int4store((uchar *) to->ptr(), nr);
+  to->length(4);
+  return false;
+}
+
+
 int Field_timestamp::store_TIME_with_warning(THD *thd, MYSQL_TIME *l_time,
                                              const ErrConv *str,
                                              int was_cut,
@@ -5102,7 +5118,7 @@ bool Field_timestamp::get_date(MYSQL_TIME *ltime, ulonglong fuzzydate)
 {
   ulong sec_part;
   my_time_t ts= get_timestamp(&sec_part);
-  return timestamp_to_TIME(get_thd(), ltime, ts, sec_part, fuzzydate);
+  return get_thd()->timestamp_to_TIME(ltime, ts, sec_part, fuzzydate);
 }
 
 
@@ -5246,6 +5262,17 @@ my_time_t Field_timestamp_hires::get_timestamp(const uchar *pos,
   return mi_uint4korr(pos);
 }
 
+
+bool Field_timestamp_hires::val_raw_native(String *to)
+{
+  ASSERT_COLUMN_MARKED_FOR_READ;
+  struct timeval tm;
+  tm.tv_sec= mi_uint4korr(ptr);
+  tm.tv_usec= sec_part_unshift(read_bigendian(ptr + 4, sec_part_bytes(dec)), dec);
+  return Timestamp_or_zero_datetime(tm.tv_sec == 0, tm).to_raw(to, dec);
+}
+
+
 double Field_timestamp_with_dec::val_real(void)
 {
   MYSQL_TIME ltime;
diff --git a/sql/field.h b/sql/field.h
index 3147374d496..f33ad2cf9f2 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -817,6 +817,15 @@ class Field: public Value_source
   virtual int  store_decimal(const my_decimal *d)=0;
   virtual int  store_time_dec(MYSQL_TIME *ltime, uint dec);
   virtual int  store_timestamp(my_time_t timestamp, ulong sec_part);
+  /**
+    Store a value represented in native raw format
+  */
+  virtual int store_raw_native(const char *str, uint length)
+  {
+    DBUG_ASSERT(0);
+    reset();
+    return 0;
+  }
   int store_time(MYSQL_TIME *ltime)
   { return store_time_dec(ltime, TIME_SECOND_PART_DIGITS); }
   int store(const char *to, uint length, CHARSET_INFO *cs,
@@ -854,6 +863,11 @@ class Field: public Value_source
      This trickery is used to decrease a number of malloc calls.
   */
   virtual String *val_str(String*,String *)=0;
+  virtual bool val_raw_native(String *to)
+  {
+    DBUG_ASSERT(!is_null());
+    return to->copy((const char *) ptr, pack_length(), &my_charset_bin);
+  }
   String *val_int_as_str(String *val_buffer, bool unsigned_flag);
   /*
     Return the field value as a LEX_CSTRING, without padding to full length
@@ -2432,6 +2446,20 @@ class Field_timestamp :public Field_temporal {
   {
     int4store(ptr,timestamp);
   }
+  void store_timeval(const struct timeval &tm)
+  {
+    store_TIME(tm.tv_sec, tm.tv_usec);
+  }
+  int store_raw_native(const char *str, uint length)
+  {
+    Timestamp_or_zero_datetime tm(str, length);
+    if (tm.is_zero_datetime())
+      reset();
+    else
+      store_timeval(tm.tm());
+    return 0;
+  }
+  bool val_raw_native(String *to);
   bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
   uchar *pack(uchar *to, const uchar *from,
               uint max_length __attribute__((unused)))
@@ -2509,6 +2537,7 @@ class Field_timestamp_hires :public Field_timestamp_with_dec {
   {
     DBUG_ASSERT(dec);
   }
+  bool val_raw_native(String *to);
   my_time_t get_timestamp(const uchar *pos, ulong *sec_part) const;
   void store_TIME(my_time_t timestamp, ulong sec_part);
   int cmp(const uchar *,const uchar *);
@@ -2554,6 +2583,16 @@ class Field_timestampf :public Field_timestamp_with_dec {
   }
   void store_TIME(my_time_t timestamp, ulong sec_part);
   my_time_t get_timestamp(const uchar *pos, ulong *sec_part) const;
+  bool val_raw_native(String *to)
+  {
+    // Check if it's '0000-00-00 00:00:00' rather than a real timestamp
+    if (ptr[0] == 0 && ptr[1] == 0 && ptr[2] == 0 && ptr[3] == 0)
+    {
+      to->length(0);
+      return false;
+    }
+    return Field::val_raw_native(to);
+  }
   uint size_of() const { return sizeof(*this); }
 };
 
diff --git a/sql/filesort.cc b/sql/filesort.cc
index 4fffd0dca91..c432321893c 100644
--- a/sql/filesort.cc
+++ b/sql/filesort.cc
@@ -1089,6 +1089,28 @@ Type_handler_temporal_result::make_sort_key(uchar *to, Item *item,
 
 
 void
+Type_handler_timestamp_common::make_sort_key(uchar *to, Item *item,
+                                             const SORT_FIELD_ATTR *sort_field,
+                                             Sort_param *param) const
+{
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> raw;
+  uint binlen= my_timestamp_binary_length(item->decimals);
+  if (item->val_raw_native(&raw) || raw.length() == 0)
+  {
+    // NULL or '0000-00-00 00:00:00'
+    bzero(to, item->maybe_null ? binlen + 1 : binlen);
+  }
+  else
+  {
+    DBUG_ASSERT(raw.length() == binlen);
+    if (item->maybe_null)
+      *to++= 1;
+    memcpy((char *) to, raw.ptr(), binlen);
+  }
+}
+
+
+void
 Type_handler::make_sort_key_longlong(uchar *to,
                                      bool maybe_null,
                                      bool null_value,
@@ -1902,6 +1924,15 @@ Type_handler_temporal_result::sortlength(THD *thd,
 
 
 void
+Type_handler_timestamp_common::sortlength(THD *thd,
+                                          const Type_std_attributes *item,
+                                          SORT_FIELD_ATTR *sortorder) const
+{
+  sortorder->length= my_timestamp_binary_length(item->decimals);
+}
+
+
+void
 Type_handler_int_result::sortlength(THD *thd,
                                         const Type_std_attributes *item,
                                         SORT_FIELD_ATTR *sortorder) const
diff --git a/sql/item.cc b/sql/item.cc
index c9f5462cca9..cf1d0fa0eb0 100644
--- a/sql/item.cc
+++ b/sql/item.cc
@@ -1732,6 +1732,14 @@ String *Item_sp_variable::val_str(String *sp)
 }
 
 
+bool Item_sp_variable::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed);
+  Item *it= this_item();
+  return null_value= it->val_raw_native(raw);
+}
+
+
 my_decimal *Item_sp_variable::val_decimal(my_decimal *decimal_value)
 {
   DBUG_ASSERT(fixed);
@@ -5001,6 +5009,12 @@ bool Item_ref_null_helper::get_date(MYSQL_TIME *ltime, ulonglong fuzzydate)
 }
 
 
+bool Item_ref_null_helper::val_raw_native(String *raw)
+{
+  return (owner->was_null|= null_value= (*ref)->val_raw_native(raw));
+}
+
+
 /**
   Mark item and SELECT_LEXs as dependent if item was resolved in
   outer SELECT.
@@ -7970,6 +7984,25 @@ longlong Item_ref::val_int_result()
 }
 
 
+bool Item_ref::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed);
+  return null_value= (*ref)->val_raw_native(raw);
+}
+
+
+bool Item_ref::val_raw_native_result(String *raw)
+{
+  if (result_field)
+  {
+    if ((null_value= result_field->is_null()))
+      return true;
+    return null_value= result_field->val_raw_native(raw);
+  }
+  return val_raw_native(raw);
+}
+
+
 String *Item_ref::str_result(String* str)
 {
   if (result_field)
@@ -8214,6 +8247,12 @@ bool Item_direct_ref::get_date(MYSQL_TIME *ltime,ulonglong fuzzydate)
 }
 
 
+bool Item_direct_ref::val_raw_native(String *raw)
+{
+  return (null_value= (*ref)->val_raw_native(raw));
+}
+
+
 Item_cache_wrapper::~Item_cache_wrapper()
 {
   DBUG_ASSERT(expr_cache == 0);
@@ -8502,6 +8541,34 @@ String *Item_cache_wrapper::val_str(String* str)
 
 
 /**
+  Get the raw value of the possibly cached item
+*/
+
+bool Item_cache_wrapper::val_raw_native(String* raw)
+{
+  Item *cached_value;
+  DBUG_ENTER("Item_cache_wrapper::val_raw_native");
+  if (!expr_cache)
+  {
+    bool rc= orig_item->val_raw_native(raw);
+    null_value= orig_item->null_value;
+    DBUG_RETURN(rc);
+  }
+
+  if ((cached_value= check_cache()))
+  {
+    bool rc= cached_value->val_raw_native(raw);
+    null_value= cached_value->null_value;
+    DBUG_RETURN(rc);
+  }
+  cache();
+  if ((null_value= expr_value->null_value))
+    DBUG_RETURN(true);
+  DBUG_RETURN(expr_value->val_raw_native(raw));
+}
+
+
+/**
   Get the decimal value of the possibly cached item
 */
 
@@ -9705,6 +9772,132 @@ bool  Item_cache_temporal::cache_value()
 }
 
 
+bool Item_cache_timestamp::val_raw_native(String *raw)
+{
+  if (!has_value())
+  {
+    null_value= true;
+    return true;
+  }
+  return null_value= raw->copy(m_raw.ptr(), m_raw.length(), &my_charset_bin);
+}
+
+
+bool Item_field::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  if ((null_value= field->is_null()))
+    return true;
+  return (null_value= field->val_raw_native(raw));
+}
+
+
+bool Item_field::val_raw_native_result(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  if ((null_value= result_field->is_null()))
+    return true;
+  return (null_value= result_field->val_raw_native(raw));
+}
+
+
+bool Item_cache_timestamp::cache_value()
+{
+  if (!example)
+    return false;
+  value_cached= true;
+  null_value= example->val_raw_with_conversion_result(&m_raw, type_handler());
+  return true;
+}
+
+
+String *Item_cache_timestamp::val_str(String *str)
+{
+  DBUG_ASSERT(fixed == 1);
+  if (!has_value())
+  {
+    null_value= true;
+    return NULL;
+  }
+  return val_string_from_date(str);
+}
+
+
+my_decimal *Item_cache_timestamp::val_decimal(my_decimal *decimal_value)
+{
+  DBUG_ASSERT(fixed == 1);
+  if (!has_value())
+  {
+    null_value= true;
+    return NULL;
+  }
+  return val_decimal_from_date(decimal_value);
+}
+
+
+longlong Item_cache_timestamp::val_int()
+{
+  DBUG_ASSERT(fixed == 1);
+  if (!has_value())
+  {
+    null_value= true;
+    return 0;
+  }
+  return val_int_from_date();
+}
+
+
+longlong Item_cache_timestamp::val_datetime_packed()
+{
+  DBUG_ASSERT(0);
+  return 0;
+}
+
+
+longlong Item_cache_timestamp::val_time_packed()
+{
+  DBUG_ASSERT(0);
+  return 0;
+}
+
+
+double Item_cache_timestamp::val_real()
+{
+  DBUG_ASSERT(fixed == 1);
+  if (!has_value())
+  {
+    null_value= true;
+    return 0;
+  }
+  return val_real_from_date();
+}
+
+
+bool Item_cache_timestamp::get_date(MYSQL_TIME *ltime, ulonglong fuzzydate)
+{
+  if (!has_value())
+  {
+    set_zero_time(ltime, MYSQL_TIMESTAMP_DATETIME);
+    return true;
+  }
+  Timestamp_or_zero_datetime tm(&m_raw);
+  return (null_value= tm.to_TIME(current_thd, ltime, fuzzydate));
+}
+
+
+int Item_cache_timestamp::save_in_field(Field *field, bool no_conversions)
+{
+  if (!has_value())
+    return set_field_to_null_with_conversions(field, no_conversions);
+  if (!null_value && field->type_handler()->is_timestamp_type())
+  {
+    field->set_notnull();
+    return field->store_raw_native(m_raw.ptr(), m_raw.length());
+  }
+  return save_date_in_field(field, no_conversions);
+}
+
+
 bool Item_cache_time::cache_value()
 {
   if (!example)
diff --git a/sql/item.h b/sql/item.h
index 46a65b31b38..38ce5197ddf 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -668,6 +668,12 @@ class Item: public Value_source,
     DBUG_ASSERT(fixed == 1);
     return (null_value= item->get_date_with_conversion(ltime, fuzzydate));
   }
+  bool val_raw_with_conversion_from_item(Item *item, String *raw,
+                                         const Type_handler *handler)
+  {
+    DBUG_ASSERT(fixed == 1);
+    return null_value= item->val_raw_with_conversion(raw, handler);
+  }
 
 public:
   /*
@@ -992,6 +998,55 @@ class Item: public Value_source,
   */
   virtual String *val_str(String *str)=0;
 
+  bool val_raw_with_conversion(String *raw, const Type_handler *handler)
+  {
+    return handler->Item_val_raw_with_conversion(this, raw);
+  }
+  bool val_raw_with_conversion_result(String *raw, const Type_handler *handler)
+  {
+    return handler->Item_val_raw_with_conversion_result(this, raw);
+  }
+
+  virtual bool val_raw_native(String *str)
+  {
+   /*
+     The default implementation for the Items that do not need raw format:
+     - Item_basic_value
+     - Item_ident_for_show
+     - Item_copy
+     - Item_exists_subselect
+     - Item_sum_field
+     - Item_sum_or_func (default implementation)
+     - Item_proc
+     - Item_type_holder (as val_xxx() are never called for it);
+     - TODO: Item_name_const: TIMESTAMP WITH LOCAL TIMEZONE'2001-01-01 00:00:00'
+
+     These hybrid Item types override val_raw_native():
+     - Item_field
+     - Item_param
+     - Item_sp_variable
+     - Item_ref
+     - Item_cache_wrapper
+     - Item_direct_ref
+     - Item_direct_view_ref
+     - Item_ref_null_helper
+     - Item_sum_or_func
+         Note, these hybrid type Item_sum_or_func descendants
+         override the default implementation:
+         * Item_sum_hybrid
+         * Item_func_hybrid_field_type
+         * Item_func_min_max
+         * Item_func_sp
+         * Item_func_last_value
+         * Item_func_rollup_const
+   */
+    DBUG_ASSERT(0);
+    return null_value= true;
+  }
+  virtual bool val_raw_native_result(String *str)
+  {
+    return val_raw_native(str);
+  }
   /*
     Returns string representation of this item in ASCII format.
 
@@ -2299,6 +2354,7 @@ class Item_sp_variable :public Item
   longlong val_int();
   String *val_str(String *sp);
   my_decimal *val_decimal(my_decimal *decimal_value);
+  bool val_raw_native(String *raw);
   bool is_null();
 
 public:
@@ -2771,6 +2827,8 @@ class Item_field :public Item_ident
   void save_result(Field *to);
   double val_result();
   longlong val_int_result();
+  bool val_raw_native(String *str);
+  bool val_raw_native_result(String *str);
   String *str_result(String* tmp);
   my_decimal *val_decimal_result(my_decimal *);
   bool val_bool_result();
@@ -3296,6 +3354,10 @@ class Item_param :public Item_basic_value,
   {
     return can_return_value() ? value.val_str(str, this) : NULL;
   }
+  bool val_raw_native(String *raw)
+  {
+    return Item_param::type_handler()->Item_param_val_raw_native(this, raw);
+  }
   bool get_date(MYSQL_TIME *tm, ulonglong fuzzydate);
   int  save_in_field(Field *field, bool no_conversions);
 
@@ -4527,6 +4589,8 @@ class Item_ref :public Item_ident
   double val_result();
   longlong val_int_result();
   String *str_result(String* tmp);
+  bool val_raw_native(String *str);
+  bool val_raw_native_result(String *str);
   my_decimal *val_decimal_result(my_decimal *);
   bool val_bool_result();
   bool is_null_result();
@@ -4722,6 +4786,7 @@ class Item_direct_ref :public Item_ref
   double val_real();
   longlong val_int();
   String *val_str(String* tmp);
+  bool val_raw_native(String *raw);
   my_decimal *val_decimal(my_decimal *);
   bool val_bool();
   bool is_null();
@@ -4816,6 +4881,7 @@ class Item_cache_wrapper :public Item_result_field,
   double val_real();
   longlong val_int();
   String *val_str(String* tmp);
+  bool val_raw_native(String *raw);
   my_decimal *val_decimal(my_decimal *);
   bool val_bool();
   bool is_null();
@@ -5040,6 +5106,12 @@ class Item_direct_view_ref :public Item_direct_ref
     }
     return Item_direct_ref::get_date(ltime, fuzzydate);
   }
+  bool val_raw_native(String *raw)
+  {
+    if (check_null_ref())
+      return true;
+    return Item_direct_ref::val_raw_native(raw);
+  }
   bool send(Protocol *protocol, st_value *buffer);
   void save_org_in_field(Field *field,
                          fast_field_copier data __attribute__ ((__unused__)))
@@ -5155,6 +5227,7 @@ class Item_ref_null_helper: public Item_ref
   my_decimal *val_decimal(my_decimal *);
   bool val_bool();
   bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
+  bool val_raw_native(String *raw);
   virtual void print(String *str, enum_query_type query_type);
   table_map used_tables() const;
   Item *get_copy(THD *thd, MEM_ROOT *mem_root)
@@ -5957,6 +6030,27 @@ class Item_cache_datetime: public Item_cache_temporal
 };
 
 
+class Item_cache_timestamp: public Item_cache
+{
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> m_raw;
+public:
+  Item_cache_timestamp(THD *thd)
+   :Item_cache(thd, &type_handler_timestamp2) { }
+  Item *get_copy(THD *thd, MEM_ROOT *mem_root)
+  { return get_item_copy<Item_cache_timestamp>(thd, mem_root, this); }
+  bool cache_value();
+  String* val_str(String *str);
+  my_decimal *val_decimal(my_decimal *);
+  longlong val_int();
+  longlong val_datetime_packed();
+  longlong val_time_packed();
+  double val_real();
+  bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
+  int save_in_field(Field *field, bool no_conversions);
+  bool val_raw_native(String *str);
+};
+
+
 class Item_cache_date: public Item_cache_temporal
 {
 public:
diff --git a/sql/item_cmpfunc.cc b/sql/item_cmpfunc.cc
index 587dc14c529..f5f2fc3052e 100644
--- a/sql/item_cmpfunc.cc
+++ b/sql/item_cmpfunc.cc
@@ -591,6 +591,18 @@ bool Arg_comparator::set_cmp_func_datetime()
 }
 
 
+bool Arg_comparator::set_cmp_func_raw()
+{
+  THD *thd= current_thd;
+  m_compare_collation= &my_charset_numeric;
+  func= is_owner_equal_func() ? &Arg_comparator::compare_e_raw :
+                                &Arg_comparator::compare_raw;
+  a= cache_converted_constant(thd, a, &a_cache, compare_type_handler());
+  b= cache_converted_constant(thd, b, &b_cache, compare_type_handler());
+  return false;
+}
+
+
 bool Arg_comparator::set_cmp_func_int()
 {
   THD *thd= current_thd;
@@ -846,6 +858,33 @@ int Arg_comparator::compare_e_string()
 }
 
 
+int Arg_comparator::compare_raw()
+{
+  if (!(*a)->val_raw_with_conversion(&value1, compare_type_handler()))
+  {
+    if (!(*b)->val_raw_with_conversion(&value2, compare_type_handler()))
+    {
+      if (set_null)
+        owner->null_value= 0;
+      return compare_type_handler()->cmp_raw(&value1, &value2);
+    }
+  }
+  if (set_null)
+    owner->null_value= 1;
+  return -1;
+}
+
+
+int Arg_comparator::compare_e_raw()
+{
+  bool res1= (*a)->val_raw_with_conversion(&value1, compare_type_handler());
+  bool res2= (*b)->val_raw_with_conversion(&value2, compare_type_handler());
+  if (res1 || res2)
+    return MY_TEST(res1 == res2);
+  return MY_TEST(compare_type_handler()->cmp_raw(&value1, &value2) == 0);
+}
+
+
 int Arg_comparator::compare_real()
 {
   /*
@@ -2144,6 +2183,28 @@ longlong Item_func_between::val_int_cmp_temporal()
 }
 
 
+longlong Item_func_between::val_int_cmp_raw()
+{
+  const Type_handler *handler= m_comparator.type_handler();
+  StringBuffer<STRING_BUFFER_USUAL_SIZE> probe, low, high;
+  if ((null_value= args[0]->val_raw_with_conversion(&probe, handler)))
+    return 0;
+  const bool low_is_null= args[1]->val_raw_with_conversion(&low, handler);
+  const bool high_is_null= args[2]->val_raw_with_conversion(&high, handler);
+  if (low_is_null || high_is_null)
+  {
+    // One NULL bound: the answer is known only if the other bound excludes it
+    null_value= (low_is_null && high_is_null) ? true :
+                low_is_null ? handler->cmp_raw(&probe, &high) <= 0 :
+                              handler->cmp_raw(&probe, &low) >= 0;
+    return (longlong) (!null_value && negated);
+  }
+  const bool inside= handler->cmp_raw(&probe, &low) >= 0 &&
+                     handler->cmp_raw(&probe, &high) <= 0;
+  return (longlong) (inside != negated);
+}
+
+
 longlong Item_func_between::val_int_cmp_string()
 {
   String *value,*a,*b;
@@ -2328,6 +2389,15 @@ bool Item_func_ifnull::date_op(MYSQL_TIME *ltime, ulonglong fuzzydate)
 }
 
 
+bool Item_func_ifnull::raw_op(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  null_value= args[0]->val_raw_with_conversion(raw, type_handler()) &&
+              args[1]->val_raw_with_conversion(raw, type_handler());
+  return null_value;  // args[1] is evaluated only if args[0] reported NULL
+}
+
+
 /**
   Perform context analysis of an IF item tree.
 
@@ -2803,6 +2873,16 @@ Item_func_nullif::date_op(MYSQL_TIME *ltime, ulonglong fuzzydate)
 
 
 bool
+Item_func_nullif::raw_op(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  null_value= !compare() ||  // compare() == 0 makes the result NULL
+              args[2]->val_raw_with_conversion(raw, type_handler());
+  return null_value;
+}
+
+
+bool
 Item_func_nullif::is_null()
 {
   return (null_value= (!compare() ? 1 : args[2]->null_value));
@@ -2943,6 +3023,16 @@ bool Item_func_case::date_op(MYSQL_TIME *ltime, ulonglong fuzzydate)
 }
 
 
+bool Item_func_case::raw_op(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  Item *chosen= find_item();  // NULL pointer: the result is SQL NULL
+  null_value= chosen == NULL ||
+              chosen->val_raw_with_conversion(raw, type_handler());
+  return null_value;
+}
+
+
 bool Item_func_case::fix_fields(THD *thd, Item **ref)
 {
   /*
@@ -3356,6 +3446,18 @@ bool Item_func_coalesce::date_op(MYSQL_TIME *ltime, ulonglong fuzzydate)
 }
 
 
+bool Item_func_coalesce::raw_op(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  // Skip arguments whose fetch reports NULL; none left means a NULL result
+  uint pos= 0;
+  while (pos < arg_count &&
+         args[pos]->val_raw_with_conversion(raw, type_handler()))
+    pos++;
+  return (null_value= (pos == arg_count));
+}
+
+
 my_decimal *Item_func_coalesce::decimal_op(my_decimal *decimal_value)
 {
   DBUG_ASSERT(fixed == 1);
diff --git a/sql/item_cmpfunc.h b/sql/item_cmpfunc.h
index e18315fde87..042be5b2c2a 100644
--- a/sql/item_cmpfunc.h
+++ b/sql/item_cmpfunc.h
@@ -89,6 +89,7 @@ class Arg_comparator: public Sql_alloc
   bool set_cmp_func_string();
   bool set_cmp_func_time();
   bool set_cmp_func_datetime();
+  bool set_cmp_func_raw();
   bool set_cmp_func_int();
   bool set_cmp_func_real();
   bool set_cmp_func_decimal();
@@ -119,6 +120,8 @@ class Arg_comparator: public Sql_alloc
   int compare_e_real_fixed();
   int compare_datetime();
   int compare_e_datetime();
+  int compare_raw();
+  int compare_e_raw();
   int compare_time();
   int compare_e_time();
   int compare_json_str_basic(Item *j, Item *s);
@@ -928,6 +931,7 @@ class Item_func_between :public Item_func_opt_neg
 
   longlong val_int_cmp_string();
   longlong val_int_cmp_temporal();
+  longlong val_int_cmp_raw();
   longlong val_int_cmp_int();
   longlong val_int_cmp_real();
   longlong val_int_cmp_decimal();
@@ -1002,6 +1006,7 @@ class Item_func_coalesce :public Item_func_case_expression
   String *str_op(String *);
   my_decimal *decimal_op(my_decimal *);
   bool date_op(MYSQL_TIME *ltime, ulonglong fuzzydate);
+  bool raw_op(String *raw);
   void fix_length_and_dec()
   {
     if (!aggregate_for_result(func_name(), args, arg_count, true))
@@ -1074,6 +1079,7 @@ class Item_func_ifnull :public Item_func_case_abbreviation2
   String *str_op(String *str);
   my_decimal *decimal_op(my_decimal *);
   bool date_op(MYSQL_TIME *ltime, ulonglong fuzzydate);
+  bool raw_op(String *raw);
   void fix_length_and_dec()
   {
     Item_func_case_abbreviation2::fix_length_and_dec2(args);
@@ -1125,6 +1131,10 @@ class Item_func_case_abbreviation2_switch: public Item_func_case_abbreviation2
   {
     return val_str_from_item(find_item(), str);
   }
+  bool raw_op(String *raw)  // raw value of the item picked by find_item()
+  {
+    return val_raw_with_conversion_from_item(find_item(), raw, type_handler());
+  }
 };
 
 
@@ -1223,6 +1233,7 @@ class Item_func_nullif :public Item_func_case_expression
   longlong int_op();
   String *str_op(String *str);
   my_decimal *decimal_op(my_decimal *);
+  bool raw_op(String *raw);
   void fix_length_and_dec();
   bool walk(Item_processor processor, bool walk_subquery, void *arg);
   const char *func_name() const { return "nullif"; }
@@ -2108,6 +2119,7 @@ class Item_func_case :public Item_func_case_expression
   String *str_op(String *);
   my_decimal *decimal_op(my_decimal *);
   bool date_op(MYSQL_TIME *ltime, ulonglong fuzzydate);
+  bool raw_op(String *raw);
   bool fix_fields(THD *thd, Item **ref);
   table_map not_null_tables() const { return 0; }
   const char *func_name() const { return "case"; }
diff --git a/sql/item_func.cc b/sql/item_func.cc
index be5e5064d60..6bb9d2e14d8 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -2830,6 +2830,36 @@ my_decimal *Item_func_min_max::val_decimal_native(my_decimal *dec)
 }
 
 
+bool Item_func_min_max::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  const Type_handler *handler= Item_hybrid_func::type_handler();
+  /*
+    For now we don't use val_raw_native() with traditional data types other
+    than TIMESTAMP. If we ever start to use, we'll possibly need new methods:
+      Type_handler::Item_func_min_max_val_raw_native()
+      Item::val_raw_native_from_{int|decimal|real|str|date}()
+    It may happen to be cheaper to do the operation in e.g. INT format and
+    then convert INT to raw (instead of doing the operation in raw format).
+  */
+  StringBuffer<STRING_BUFFER_USUAL_SIZE> cur;  // holds args[i] for i > 0
+  for (uint i= 0; i < arg_count; i++)
+  {
+    // "true" means NULL or a failed conversion: never compare a stale buffer
+    if (args[i]->val_raw_with_conversion(i == 0 ? raw : &cur, handler))
+      return null_value= true;
+    if (i > 0)
+    {
+      int cmp= handler->cmp_raw(raw, &cur);
+      if ((cmp_sign < 0 ? cmp : -cmp) < 0 &&
+         raw->copy(cur.ptr(), cur.length(), &my_charset_bin))
+        return null_value= true;
+    }
+  }
+  return null_value= false;  // "raw" now holds the selected argument
+}
+
+
 longlong Item_func_bit_length::val_int()
 {
   DBUG_ASSERT(fixed == 1);
@@ -6729,6 +6759,16 @@ String *Item_func_last_value::val_str(String *str)
   return tmp;
 }
 
+
+bool Item_func_last_value::val_raw_native(String *str)
+{
+  evaluate_sideeffects();  // must run before last_value is read
+  bool rc= last_value->val_raw_native(str);
+  null_value= last_value->null_value;  // mirror the NULL flag of last_value
+  return rc;
+}
+
+
 longlong Item_func_last_value::val_int()
 {
   longlong tmp;
diff --git a/sql/item_func.h b/sql/item_func.h
index 58df7b8b3c6..31a058f176c 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -519,6 +519,14 @@ class Item_func_hybrid_field_type: public Item_hybrid_func
   bool get_date_from_decimal_op(MYSQL_TIME *ltime, ulonglong fuzzydate);
   bool get_date_from_int_op(MYSQL_TIME *ltime, ulonglong fuzzydate);
 
+  /*
+    For now only the TIMESTAMP data type uses val_raw_native().
+    For conversion purposes (e.g. to string and numbers),
+    it uses DATETIME representation.
+    Later (e.g. for INET6) we'll implement the following methods:
+      val_{str|decimal|int|real}_from_raw_op()
+      get_date_from_raw_op()
+  */
 public:
   Item_func_hybrid_field_type(THD *thd):
     Item_hybrid_func(thd)
@@ -569,6 +577,23 @@ class Item_func_hybrid_field_type: public Item_hybrid_func
            Item_func_hybrid_field_type_get_date(this, res, fuzzy_date);
   }
 
+  bool val_raw_native(String *raw)
+  {
+    DBUG_ASSERT(fixed);
+    /*
+      For now only the TIMESTAMP data type uses val_raw_native().
+      Later we'll probably need new methods:
+        Item_func_hybrid_field_type::val_raw_native_from_{str|int|real|date}()
+        Type_handler::Item_func_hybrid_field_type_val_raw_native(),
+      for symmetry with val_xxx() and get_date() above,
+      so the type handler can decide how to perform the hybrid operation
+      and when and how to do the data type conversion to raw format.
+      It may happen to be cheaper to do the operation in e.g. INT format and
+      then convert INT to raw (instead of doing the operation in raw format).
+      For now it's OK just to call raw_op() directly here.
+    */
+    return raw_op(raw);
+  }
   /**
      @brief Performs the operation that this functions implements when the
      result type is INT.
@@ -612,6 +637,7 @@ class Item_func_hybrid_field_type: public Item_hybrid_func
   */
   virtual bool date_op(MYSQL_TIME *res, ulonglong fuzzy_date)= 0;
 
+  virtual bool raw_op(String *raw)= 0;
 };
 
 
@@ -674,6 +700,11 @@ class Item_func_numhybrid: public Item_func_hybrid_field_type
     DBUG_ASSERT(0);
     return true;
   }
+  bool raw_op(String *raw)
+  {
+    DBUG_ASSERT(0);  // not expected to be called for this class
+    return true;
+  }
 };
 
 
@@ -1527,6 +1558,7 @@ class Item_func_min_max :public Item_hybrid_func
     return Item_func_min_max::type_handler()->
              Item_func_min_max_get_date(this, res, fuzzy_date);
   }
+  bool val_raw_native(String *raw);
   void aggregate_attributes_real(Item **items, uint nitems)
   {
     /*
@@ -1589,6 +1621,7 @@ class Item_func_rollup_const :public Item_func
   double val_real() { return args[0]->val_real(); }
   longlong val_int() { return args[0]->val_int(); }
   String *val_str(String *str) { return args[0]->val_str(str); }
+  bool val_raw_native(String *raw) { return args[0]->val_raw_native(raw); }
   my_decimal *val_decimal(my_decimal *dec) { return args[0]->val_decimal(dec); }
   const char *func_name() const { return "rollup_const"; }
   bool const_item() const { return 0; }
@@ -2835,6 +2868,13 @@ class Item_func_sp :public Item_func
     return str;
   }
 
+  bool val_raw_native(String *raw)
+  {
+    // When execute() returns true the result field is not read at all
+    return execute() ? true :
+           (null_value= sp_result_field->val_raw_native(raw));
+  }
+
   void update_null_value()
   { 
     execute();
@@ -2970,6 +3010,7 @@ class Item_func_last_value :public Item_func
   longlong val_int();
   String *val_str(String *);
   my_decimal *val_decimal(my_decimal *);
+  bool val_raw_native(String *);
   void fix_length_and_dec();
   const char *func_name() const { return "last_value"; }
   const Type_handler *type_handler() const { return last_value->type_handler(); }
diff --git a/sql/item_subselect.cc b/sql/item_subselect.cc
index 8e4c5dcbbf1..201bb818eae 100644
--- a/sql/item_subselect.cc
+++ b/sql/item_subselect.cc
@@ -1331,6 +1331,24 @@ String *Item_singlerow_subselect::val_str(String *str)
 }
 
 
+bool Item_singlerow_subselect::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  if (forced_const)
+    return value->val_raw_native(raw);
+  /*
+    If exec() returns true or the cached value is NULL: reset() and return true
+  */
+  if (exec() || value->null_value)
+  {
+    reset();
+    return true;
+  }
+  null_value= false;
+  return value->val_raw_native(raw);
+}
+
+
 my_decimal *Item_singlerow_subselect::val_decimal(my_decimal *decimal_value)
 {
   DBUG_ASSERT(fixed == 1);
diff --git a/sql/item_subselect.h b/sql/item_subselect.h
index 812352e4c4f..716330a4168 100644
--- a/sql/item_subselect.h
+++ b/sql/item_subselect.h
@@ -301,6 +301,7 @@ class Item_singlerow_subselect :public Item_subselect
   double val_real();
   longlong val_int ();
   String *val_str (String *);
+  bool val_raw_native(String *);
   my_decimal *val_decimal(my_decimal *);
   bool val_bool();
   bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
diff --git a/sql/item_sum.cc b/sql/item_sum.cc
index 4fecd7de269..52d737ee4ed 100644
--- a/sql/item_sum.cc
+++ b/sql/item_sum.cc
@@ -2079,6 +2079,15 @@ Item_sum_hybrid::val_str(String *str)
 }
 
 
+bool Item_sum_hybrid::val_raw_native(String *raw)
+{
+  DBUG_ASSERT(fixed == 1);
+  // An already NULL result is returned without consulting "value"
+  return null_value ? true :
+         (null_value= value->val_raw_native(raw));
+}
+
+
 void Item_sum_hybrid::cleanup()
 {
   DBUG_ENTER("Item_sum_hybrid::cleanup");
diff --git a/sql/item_sum.h b/sql/item_sum.h
index 0cb5be391a2..b95048b4eb1 100644
--- a/sql/item_sum.h
+++ b/sql/item_sum.h
@@ -1037,6 +1037,7 @@ class Item_sum_hybrid :public Item_sum, public Type_handler_hybrid_field_type
   bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
   void reset_field();
   String *val_str(String *);
+  bool val_raw_native(String *);
   const Type_handler *real_type_handler() const
   {
     return get_arg(0)->real_type_handler();
diff --git a/sql/item_timefunc.cc b/sql/item_timefunc.cc
index a0fdd4bb8fc..07700a52317 100644
--- a/sql/item_timefunc.cc
+++ b/sql/item_timefunc.cc
@@ -1203,14 +1203,16 @@ bool Item_func_unix_timestamp::get_timestamp_value(my_time_t *seconds,
     }
   }
 
-  MYSQL_TIME ltime;
-  if (get_arg0_date(&ltime, TIME_NO_ZERO_IN_DATE))
-    return 1;
-
-  uint error_code;
-  *seconds= TIME_to_timestamp(current_thd, &ltime, &error_code);
-  *second_part= ltime.second_part;
-  return (null_value= (error_code == ER_WARN_DATA_OUT_OF_RANGE));
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> raw;
+  if ((null_value= args[0]->val_raw_with_conversion(&raw,
+                                                    &type_handler_timestamp2)))
+    return true;
+  Timestamp_or_zero_datetime tm(&raw);
+  if (tm.is_zero_datetime())
+    return null_value= true;
+  *seconds= tm.tm().tv_sec;
+  *second_part= tm.tm().tv_usec;
+  return false;
 }
 
 
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 8e99d57d0b4..fb8ae4e1ccb 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -3238,6 +3238,8 @@ class THD :public Statement,
     return !MY_TEST(variables.sql_mode & MODE_NO_BACKSLASH_ESCAPES);
   }
   const Type_handler *type_handler_for_date() const;
+  bool timestamp_to_TIME(MYSQL_TIME *ltime, my_time_t ts,
+                         ulong sec_part, ulonglong fuzzydate);
   inline my_time_t query_start() { query_start_used=1; return start_time; }
   inline ulong query_start_sec_part()
   { query_start_sec_part_used=1; return start_time_sec_part; }
diff --git a/sql/sql_type.cc b/sql/sql_type.cc
index dd4cb2b8068..45d7959c9c6 100644
--- a/sql/sql_type.cc
+++ b/sql/sql_type.cc
@@ -65,6 +65,83 @@ Type_handler_geometry    type_handler_geometry;
 #endif
 
 
+uint Timestamp::binary_length_to_precision(uint length)
+{
+  /*
+    Every byte beyond the first four maps to two more decimal digits
+    of precision: 4->0, 5->2, 6->4, 7->6. Other lengths are unexpected.
+  */
+  if (length >= 4 && length <= 7)
+    return (length - 4) * 2;
+  DBUG_ASSERT(0);
+  return 0;
+}
+
+
+void Timestamp::from_raw(const char *str, uint length)
+{
+  DBUG_ASSERT(4 <= length && length <= 7);  // the only supported lengths
+  my_timestamp_from_binary(&m_tm, (const uchar *) str,
+                           binary_length_to_precision(length));
+}
+
+
+bool Timestamp::to_raw(String *raw, uint decimals) const
+{
+  uint len= my_timestamp_binary_length(decimals);  // bytes to be written
+  if (raw->reserve(len))
+    return true;  // reserve() returned true: give up
+  my_timestamp_to_binary(&m_tm, (uchar *) raw->ptr(), decimals);
+  raw->length(len);  // the value occupies len bytes starting at ptr()
+  return false;
+}
+
+
+bool Timestamp::to_TIME(THD *thd, MYSQL_TIME *to, ulonglong fuzzydate) const
+{ // the conversion itself is delegated to THD::timestamp_to_TIME()
+  return thd->timestamp_to_TIME(to, m_tm.tv_sec, m_tm.tv_usec, fuzzydate);
+}
+
+
+Timestamp_or_zero_datetime::Timestamp_or_zero_datetime(THD *thd,
+                                                       const MYSQL_TIME *ltime,
+                                                       uint *error_code)
+{
+  m_tm.tv_sec= TIME_to_timestamp(thd, ltime, error_code);
+  m_tm.tv_usec= ltime->second_part;
+  m_is_zero_datetime= (*error_code == ER_WARN_DATA_OUT_OF_RANGE);
+  // These two cases are not reported as errors to the caller:
+  // - out of range, but ltime was '0000-00-00 00:00:00'
+  // - ltime fell into spring time gap, adjusted
+  if (m_is_zero_datetime ? !non_zero_date(ltime) :
+                           *error_code == ER_WARN_INVALID_TIMESTAMP)
+    *error_code= 0;
+}
+
+
+bool Timestamp_or_zero_datetime::to_TIME(THD *thd, MYSQL_TIME *to,
+                                         ulonglong fuzzydate) const
+{
+  if (!m_is_zero_datetime)
+    return Timestamp::to_TIME(thd, to, fuzzydate);
+  // Zero datetime: "to" is filled with a zero DATETIME value and
+  // false is returned; thd and fuzzydate are not used on this path.
+  set_zero_time(to, MYSQL_TIMESTAMP_DATETIME);
+  return false;
+}
+
+
+bool Timestamp_or_zero_datetime::to_raw(String *raw, uint decimals) const
+{
+  if (!m_is_zero_datetime)
+    return Timestamp::to_raw(raw, decimals);
+  // Zero datetime is represented by an empty raw value,
+  // "decimals" is not used for it.
+  raw->length(0);
+  return false;
+}
+
+
 bool Type_handler_data::init()
 {
 #ifdef HAVE_SPATIAL
@@ -486,7 +563,7 @@ const Type_handler *Type_handler_datetime_common::type_handler_for_comparison()
 
 const Type_handler *Type_handler_timestamp_common::type_handler_for_comparison() const
 {
-  return &type_handler_datetime;
+  return &type_handler_timestamp;  // TIMESTAMP is compared as TIMESTAMP now
 }
 
 
@@ -671,6 +748,15 @@ Type_handler_hybrid_field_type::aggregate_for_comparison(const Type_handler *h)
       */
       if (b == TIME_RESULT)
         m_type_handler= h; // Temporal types bit non-temporal types
+      /*
+        Compare TIMESTAMP to a non-temporal type as DATETIME.
+        This is needed to make queries with fuzzy dates work:
+        SELECT * FROM t1
+        WHERE
+          ts BETWEEN '0000-00-00' AND '2010-00-01 00:00:00';
+      */
+      if (m_type_handler->is_timestamp_type())
+        m_type_handler= &type_handler_datetime;
     }
     else
     {
@@ -754,7 +840,16 @@ Type_handler_hybrid_field_type::aggregate_for_min_max(const Type_handler *h)
   }
   else if (a == TIME_RESULT || b == TIME_RESULT)
   {
-    if ((a == TIME_RESULT) + (b == TIME_RESULT) == 1)
+    if (m_type_handler->is_timestamp_type() + h->is_timestamp_type() == 1)
+    {
+      /*
+        Handle LEAST(TIMESTAMP, non-TIMESTAMP) as DATETIME,
+        to make sure fuzzy dates work in this context:
+          LEAST('2001-00-00', timestamp_field)
+      */
+      m_type_handler= &type_handler_datetime2;
+    }
+    else if ((a == TIME_RESULT) + (b == TIME_RESULT) == 1)
     {
       /*
         We're here if there's only one temporal data type:
@@ -2468,6 +2563,22 @@ int Type_handler_temporal_with_date::Item_save_in_field(Item *item,
 }
 
 
+int Type_handler_timestamp_common::Item_save_in_field(Item *item,
+                                                      Field *field,
+                                                      bool no_conversions)
+                                                      const
+{
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> buf;
+  if (Item_val_raw_with_conversion(item, &buf))
+    return set_field_to_null_with_conversions(field, no_conversions);
+  field->set_notnull();
+  const Timestamp_or_zero_datetime ts(&buf);
+  if (ts.is_zero_datetime())
+    return field->store_timestamp(0, 0);  // zero datetime: both parts are 0
+  return field->store_timestamp(ts.tm().tv_sec, ts.tm().tv_usec);
+}
+
+
 int Type_handler_string_result::Item_save_in_field(Item *item, Field *field,
                                                    bool no_conversions) const
 {
@@ -2534,6 +2645,10 @@ Type_handler_temporal_with_date::set_comparator_func(Arg_comparator *cmp) const
   return cmp->set_cmp_func_datetime();
 }
 
+bool Type_handler_timestamp_common::set_comparator_func(Arg_comparator *cmp) const
+{
+  return cmp->set_cmp_func_raw();  // selects compare_raw()/compare_e_raw()
+}
 
 /*************************************************************************/
 
@@ -2663,7 +2778,7 @@ Type_handler_string_result::Item_get_cache(THD *thd, const Item *item) const
 Item_cache *
 Type_handler_timestamp_common::Item_get_cache(THD *thd, const Item *item) const
 {
-  return new (thd->mem_root) Item_cache_datetime(thd);
+  return new (thd->mem_root) Item_cache_timestamp(thd);  // has val_raw_native()
 }
 
 Item_cache *
@@ -3553,6 +3668,12 @@ longlong Type_handler_temporal_result::
   return func->val_int_cmp_temporal();
 }
 
+longlong Type_handler_timestamp_common::
+           Item_func_between_val_int(Item_func_between *func) const
+{
+  return func->val_int_cmp_raw();  // BETWEEN is evaluated on raw values
+}
+
 longlong Type_handler_int_result::
            Item_func_between_val_int(Item_func_between *func) const
 {
@@ -3777,6 +3898,17 @@ String *Type_handler_temporal_result::
 }
 
 
+String *Type_handler_timestamp_common::
+          Item_func_min_max_val_str(Item_func_min_max *func, String *str) const
+{
+  MYSQL_TIME tm;
+  // Either a NULL result or a failed formatting gives a NULL pointer
+  const bool failed= get_date_from_val_raw_native(func, &tm) ||
+                     my_TIME_to_str(&tm, str, func->decimals);
+  return failed ? (String *) NULL : str;
+}
+
+
 String *Type_handler_int_result::
           Item_func_min_max_val_str(Item_func_min_max *func, String *str) const
 {
@@ -3815,6 +3947,16 @@ double Type_handler_temporal_result::
 }
 
 
+double Type_handler_timestamp_common::
+         Item_func_min_max_val_real(Item_func_min_max *func) const
+{
+  MYSQL_TIME tm;
+  // 0 is returned when the helper reports NULL
+  return get_date_from_val_raw_native(func, &tm) ? 0 :
+                                                   TIME_to_double(&tm);
+}
+
+
 double Type_handler_numeric::
          Item_func_min_max_val_real(Item_func_min_max *func) const
 {
@@ -3839,6 +3981,16 @@ longlong Type_handler_temporal_result::
 }
 
 
+longlong Type_handler_timestamp_common::
+         Item_func_min_max_val_int(Item_func_min_max *func) const
+{
+  MYSQL_TIME tm;
+  if (!get_date_from_val_raw_native(func, &tm))
+    return TIME_to_ulonglong(&tm);
+  return 0;  // the helper reported NULL
+}
+
+
 longlong Type_handler_numeric::
          Item_func_min_max_val_int(Item_func_min_max *func) const
 {
@@ -3873,6 +4025,17 @@ my_decimal *Type_handler_temporal_result::
 }
 
 
+my_decimal *Type_handler_timestamp_common::
+            Item_func_min_max_val_decimal(Item_func_min_max *func,
+                                          my_decimal *dec) const
+{
+  MYSQL_TIME tm;
+  if (!get_date_from_val_raw_native(func, &tm))
+    return date2my_decimal(&tm, dec);
+  return NULL;  // the helper reported NULL
+}
+
+
 bool Type_handler_string_result::
        Item_func_min_max_get_date(Item_func_min_max *func,
                                   MYSQL_TIME *ltime, ulonglong fuzzydate) const
@@ -3902,6 +4065,13 @@ bool Type_handler_temporal_result::
   return func->get_date_native(ltime, fuzzydate);
 }
 
+bool Type_handler_timestamp_common::
+       Item_func_min_max_get_date(Item_func_min_max *func,
+                                  MYSQL_TIME *ltime, ulonglong fuzzydate) const
+{ // NOTE(review): fuzzydate is not forwarded to the helper - confirm intent
+  return get_date_from_val_raw_native(func, ltime);
+}
+
 /***************************************************************************/
 
 /**
@@ -5170,6 +5340,20 @@ bool Type_handler::
 }
 
 
+bool Type_handler::Item_send_timestamp(Item *item,
+                                       Protocol *protocol,
+                                       st_value *buf) const
+{
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> raw;
+  MYSQL_TIME ltime;
+  // NULL is sent both for a NULL item and for a failed MYSQL_TIME conversion
+  if (item->val_raw_native(&raw) ||
+      Timestamp_or_zero_datetime(&raw).to_TIME(current_thd, &ltime, 0))
+    return protocol->store_null();
+  return protocol->store(&ltime, item->decimals);
+}
+
+
 bool Type_handler::
        Item_send_datetime(Item *item, Protocol *protocol, st_value *buf) const
 {
@@ -5606,3 +5790,101 @@ void Type_handler_geometry::Item_param_set_param_func(Item_param *param,
 #endif
 
 /***************************************************************************/
+
+
+bool Type_handler_timestamp_common::TIME_to_raw(THD *thd,
+                                                const MYSQL_TIME *ltime,
+                                                String *raw,
+                                                uint decimals) const
+{
+  uint err;
+  Timestamp_or_zero_datetime ts(thd, ltime, &err);
+  if (err != 0)  // an error code that the constructor did not clear
+    return true;
+  ts.trunc(decimals);
+  return ts.to_raw(raw, decimals);
+}
+
+
+bool
+Type_handler_timestamp_common::Item_val_raw_with_conversion(Item *item,
+                                                            String *raw)
+                                                            const
+{
+  if (item->type_handler()->is_timestamp_type())
+    return item->val_raw_native(raw);  // already TIMESTAMP: no conversion
+  MYSQL_TIME tm;
+  if (item->get_date(&tm, TIME_NO_ZERO_IN_DATE))
+    return true;
+  return TIME_to_raw(current_thd, &tm, raw, item->datetime_precision());
+}
+
+
+bool
+Type_handler_timestamp_common::Item_val_raw_with_conversion_result(Item *item,
+                                                                   String *raw)
+                                                                   const
+{
+  if (item->type_handler()->is_timestamp_type())
+    return item->val_raw_native_result(raw);  // already TIMESTAMP
+  MYSQL_TIME tm;
+  if (item->get_date_result(&tm, TIME_NO_ZERO_IN_DATE))
+    return true;
+  return TIME_to_raw(current_thd, &tm, raw, item->datetime_precision());
+}
+
+
+int Type_handler_timestamp_common::cmp_raw(const String *a,
+                                           const String *b) const
+{
+  /*
+    Values of different lengths are decoded first. Otherwise optimize a simple
+    case: either both timestamp values have the same fractional precision,
+    or both values are zero datetime '0000-00-00 00:00:00.000000'.
+  */
+  if (a->length() != b->length())
+    return Timestamp_or_zero_datetime(a).cmp(Timestamp_or_zero_datetime(b));
+  return memcmp(a->ptr(), b->ptr(), a->length());
+}
+
+
+bool
+Type_handler_timestamp_common::get_date_from_val_raw_native(Item *item,
+                                                            MYSQL_TIME *ltime)
+                                                            const
+{
+  StringBuffer<STRING_BUFFER_TIMESTAMP_BINARY_SIZE> buf;
+  if (!item->val_raw_native(&buf))
+  {
+    const Timestamp_or_zero_datetime ts(&buf);
+    return item->null_value= ts.to_TIME(current_thd, ltime, 0);
+  }
+  DBUG_ASSERT(item->null_value);  // the item must have flagged NULL itself
+  return true;
+}
+
+
+bool
+Type_handler::Item_param_val_raw_native(Item_param *item, String *raw) const
+{
+  DBUG_ASSERT(0);  // this default is not expected to be reached
+  return item->null_value= true;  // flags the parameter as NULL
+}
+
+
+bool
+Type_handler_timestamp_common::Item_param_val_raw_native(Item_param *item,
+                                                         String *raw) const
+{
+  /*
+    The below code may not run well in corner cases.
+    This will be fixed under terms of MDEV-14271.
+    Item_param should:
+    - either remember @@time_zone at bind time
+    - or store TIMESTAMP in my_time_t format, rather than in MYSQL_TIME format.
+  */
+  MYSQL_TIME tm;
+  if (item->get_date(&tm, TIME_NO_ZERO_IN_DATE))
+    return true;
+  return TIME_to_raw(current_thd, &tm, raw, item->datetime_precision());
+}
diff --git a/sql/sql_type.h b/sql/sql_type.h
index 6dbd51a1ebd..fdc3caf900b 100644
--- a/sql/sql_type.h
+++ b/sql/sql_type.h
@@ -26,6 +26,87 @@
 #include "sql_const.h"
 #include "my_time.h"
 
+
+class Timestamp  // Seconds and microseconds since epoch, kept in a timeval
+{
+  static uint binary_length_to_precision(uint length);  // uses no object state
+protected:
+  struct timeval m_tm;
+public:
+  int cmp(const Timestamp &other) const  // -1, 0 or +1; seconds compared first
+  {
+    return m_tm.tv_sec < other.m_tm.tv_sec   ? -1 :
+           m_tm.tv_sec > other.m_tm.tv_sec   ? +1 :
+           m_tm.tv_usec < other.m_tm.tv_usec ? -1 :
+           m_tm.tv_usec > other.m_tm.tv_usec ? +1 : 0;
+  }
+  const struct timeval &tm() const { return m_tm; }
+  void trunc(uint decimals) { my_timeval_trunc(&m_tm, decimals); }
+  bool to_TIME(THD *thd, MYSQL_TIME *ltime, ulonglong fuzzydate) const;
+  bool to_raw(String *to, uint decimals) const;  // true on failure
+  void from_raw(const char *str, uint length);   // length must be 4..7
+};
+
+
+/*
+  A helper class to store MariaDB TIMESTAMP values, which can be:
+  - real TIMESTAMP (seconds and microseconds since epoch), or
+  - zero datetime '0000-00-00 00:00:00.000000'
+*/
+class Timestamp_or_zero_datetime: public Timestamp
+{
+  bool m_is_zero_datetime;
+public:
+  Timestamp_or_zero_datetime(const char *rawstr, uint length)
+  {
+    from_raw(rawstr, length);
+  }
+  Timestamp_or_zero_datetime(const String *raw)
+  {
+    from_raw(raw->ptr(), raw->length());
+  }
+  Timestamp_or_zero_datetime(bool is_zero_datetime, const struct timeval tm)
+   :m_is_zero_datetime(is_zero_datetime)
+  {
+    m_tm= tm;
+  }
+  Timestamp_or_zero_datetime(THD *thd, const MYSQL_TIME *ltime, uint *err_code);
+  bool is_zero_datetime() const { return m_is_zero_datetime; }
+  const struct timeval &tm() const
+  {
+    DBUG_ASSERT(!m_is_zero_datetime);
+    return Timestamp::tm();
+  }
+  void trunc(uint decimals)
+  {
+    if (!m_is_zero_datetime)
+      Timestamp::trunc(decimals);
+  }
+  int cmp(const Timestamp_or_zero_datetime &other) const  // zero sorts first
+  {
+    const bool zero= m_is_zero_datetime, other_zero= other.m_is_zero_datetime;
+    if (zero || other_zero)
+      return zero == other_zero ? 0 : zero ? -1 : 1;
+    return Timestamp::cmp(other);
+  }
+  bool to_TIME(THD *thd, MYSQL_TIME *to, ulonglong fuzzydate) const;
+  /*
+    Convert to raw format:
+    - Real timestamps are encoded in the same way how Field_timestamp2 stores
+      values (big endian seconds followed by big endian microseconds)
+    - Zero datetime '0000-00-00 00:00:00.000000' is encoded as empty string.
+    Two raw values are binary comparable.
+  */
+  bool to_raw(String *to, uint decimals) const;
+  void from_raw(const char *str, uint length)
+  {
+    m_is_zero_datetime= (length == 0);
+    if (!m_is_zero_datetime)
+      Timestamp::from_raw(str, length);
+  }
+};
+
+
 class Field;
 class Column_definition;
 class Item;
@@ -590,6 +671,7 @@ class Type_handler
   bool Item_send_double(Item *item, Protocol *protocol, st_value *buf) const;
   bool Item_send_time(Item *item, Protocol *protocol, st_value *buf) const;
   bool Item_send_date(Item *item, Protocol *protocol, st_value *buf) const;
+  bool Item_send_timestamp(Item *item, Protocol *protocol, st_value *buf) const;
   bool Item_send_datetime(Item *item, Protocol *protocol, st_value *buf) const;
   bool Column_definition_prepare_stage2_legacy(Column_definition *c,
                                                enum_field_types type)
@@ -812,6 +894,7 @@ class Type_handler
                                          Item_param *param,
                                          const Type_all_attributes *attr,
                                          const st_value *value) const= 0;
+  virtual bool Item_param_val_raw_native(Item_param *item, String *raw) const;
   virtual bool Item_send(Item *item, Protocol *p, st_value *buf) const= 0;
   virtual int Item_save_in_field(Item *item, Field *field,
                                  bool no_conversions) const= 0;
@@ -888,6 +971,11 @@ class Type_handler
     DBUG_ASSERT(0);
     return NULL;
   }
+  virtual int cmp_raw(const String *a, const String *b) const
+  {
+    DBUG_ASSERT(0);  // this default is not expected to be called
+    return 0;
+  }
   virtual bool set_comparator_func(Arg_comparator *cmp) const= 0;
   virtual bool Item_hybrid_func_fix_attributes(THD *thd,
                                                const char *name,
@@ -905,6 +993,15 @@ class Type_handler
   virtual
   bool Item_sum_variance_fix_length_and_dec(Item_sum_variance *) const= 0;
 
+  virtual bool Item_val_raw_with_conversion(Item *item, String *raw) const
+  {
+    return true;  // default: always returns true, "raw" is left untouched
+  }
+  virtual bool Item_val_raw_with_conversion_result(Item *item,
+                                                   String *raw) const
+  {
+    return true;  // default: always returns true, "raw" is left untouched
+  }
   virtual bool Item_val_bool(Item *item) const= 0;
   virtual longlong Item_val_int_signed_typecast(Item *item) const= 0;
   virtual longlong Item_val_int_unsigned_typecast(Item *item) const= 0;
@@ -2299,6 +2396,9 @@ class Type_handler_datetime2: public Type_handler_datetime_common
 class Type_handler_timestamp_common: public Type_handler_temporal_with_date
 {
   static const Name m_name_timestamp;
+protected:
+  bool TIME_to_raw(THD *, const MYSQL_TIME *ltime, String *raw, uint dec) const;
+  bool get_date_from_val_raw_native(Item *item, MYSQL_TIME *ltime) const;
 public:
   virtual ~Type_handler_timestamp_common() {}
   const Name name() const { return m_name_timestamp; }
@@ -2312,6 +2412,16 @@ class Type_handler_timestamp_common: public Type_handler_temporal_with_date
   {
     return true;
   }
+  bool Item_val_raw_with_conversion(Item *item, String *raw) const;
+  bool Item_val_raw_with_conversion_result(Item *item, String *raw) const;
+  bool Item_param_val_raw_native(Item_param *item, String *raw) const;
+  int cmp_raw(const String *a, const String *b) const;
+  longlong Item_func_between_val_int(Item_func_between *func) const;
+  void make_sort_key(uchar *to, Item *item, const SORT_FIELD_ATTR *sort_field,
+                     Sort_param *param) const;
+  void sortlength(THD *thd,
+                  const Type_std_attributes *item,
+                  SORT_FIELD_ATTR *attr) const;
   bool Column_definition_fix_attributes(Column_definition *c) const;
   uint Item_decimal_scale(const Item *item) const
   {
@@ -2324,15 +2434,24 @@ class Type_handler_timestamp_common: public Type_handler_temporal_with_date
   }
   bool Item_send(Item *item, Protocol *protocol, st_value *buf) const
   {
-    return Item_send_datetime(item, protocol, buf);
+    return Item_send_timestamp(item, protocol, buf);  // goes via raw format
   }
+  int Item_save_in_field(Item *item, Field *field, bool no_conversions) const;
   String *print_item_value(THD *thd, Item *item, String *str) const;
   Item_cache *Item_get_cache(THD *thd, const Item *item) const;
+  bool set_comparator_func(Arg_comparator *cmp) const;
   bool Item_hybrid_func_fix_attributes(THD *thd,
                                        const char *name,
                                        Type_handler_hybrid_field_type *,
                                        Type_all_attributes *atrr,
                                        Item **items, uint nitems) const;
+  String *Item_func_min_max_val_str(Item_func_min_max *, String *) const;
+  double Item_func_min_max_val_real(Item_func_min_max *) const;
+  longlong Item_func_min_max_val_int(Item_func_min_max *) const;
+  my_decimal *Item_func_min_max_val_decimal(Item_func_min_max *,
+                                            my_decimal *) const;
+  bool Item_func_min_max_get_date(Item_func_min_max*,
+                                  MYSQL_TIME *, ulonglong fuzzydate) const;
   void Item_param_set_param_func(Item_param *param,
                                  uchar **pos, ulong len) const;
 };