← Back to team overview

maria-developers team mailing list archive

Please review: sql_yacc.yy and HA_CREATE_INFO refactoring (pre-requisite for MDEV-5359 CREATE OR REPLACE, CREATE IF NOT EXISTS, DROP IF EXISTS)

 

Hi Sergei,

The patch is attached.

Thanks.

diff --git a/sql/handler.h b/sql/handler.h
index 0044556..7b6a5f4 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -376,10 +376,7 @@ enum enum_alter_inplace_result {
 #define HA_KEY_BLOB_LENGTH	2
 
 #define HA_LEX_CREATE_TMP_TABLE	1
-#define HA_LEX_CREATE_IF_NOT_EXISTS 2
-#define HA_LEX_CREATE_TABLE_LIKE 4
 #define HA_CREATE_TMP_ALTER     8
-#define HA_LEX_CREATE_REPLACE   16
 #define HA_MAX_REC_LENGTH	65535
 
 /* Table caching type */
@@ -1571,9 +1568,9 @@ enum enum_stats_auto_recalc { HA_STATS_AUTO_RECALC_DEFAULT= 0,
                               HA_STATS_AUTO_RECALC_ON,
                               HA_STATS_AUTO_RECALC_OFF };
 
-struct HA_CREATE_INFO
+struct Table_contents_source_st
 {
-  CHARSET_INFO *table_charset, *default_table_charset;
+  CHARSET_INFO *table_charset;
   LEX_CUSTRING tabledef_version;
   LEX_STRING connect_string;
   const char *password, *tablespace;
@@ -1593,7 +1590,6 @@ struct HA_CREATE_INFO
   uint stats_sample_pages;
   uint null_bits;                       /* NULL bits at start of record */
   uint options;				/* OR of HA_CREATE_ options */
-  uint org_options;                     /* original options from query */
   uint merge_insert_method;
   uint extra_size;                      /* length of extra data segment */
   SQL_I_List<TABLE_LIST> merge_list;
@@ -1626,7 +1622,11 @@ struct HA_CREATE_INFO
   MDL_ticket *mdl_ticket;
   bool table_was_deleted;
 
-  bool tmp_table() { return options & HA_LEX_CREATE_TMP_TABLE; }
+  void init()
+  {
+    bzero(this, sizeof(*this));
+  }
+  bool tmp_table() const { return options & HA_LEX_CREATE_TMP_TABLE; }
   void use_default_db_type(THD *thd)
   {
     db_type= tmp_table() ? ha_default_tmp_handlerton(thd)
@@ -1635,6 +1635,55 @@ struct HA_CREATE_INFO
 };
 
 
+struct Schema_specification_st
+{
+  CHARSET_INFO *default_table_charset;
+  void init()
+  {
+    bzero(this, sizeof(*this));
+  }
+};
+
+
+struct HA_CREATE_INFO: public Table_contents_source_st,
+                       public Schema_specification_st
+{
+  void init()
+  {
+    Table_contents_source_st::init();
+    Schema_specification_st::init();
+  }
+};
+
+
+struct Table_specification_st: public HA_CREATE_INFO,
+                               public DDL_options_st
+{
+  // Deep initialization
+  void init()
+  {
+    HA_CREATE_INFO::init();
+    DDL_options_st::init();
+  }
+  void init(DDL_options_st::Options options)
+  {
+    HA_CREATE_INFO::init();
+    DDL_options_st::init(options);
+  }
+  /*
+    Quick initialization, for parser.
+    Most of the HA_CREATE_INFO is left uninitialized.
+    It gets fully initialized in sql_yacc.yy, only when the parser
+    scans a related keyword (e.g. CREATE, ALTER).
+  */
+  void lex_start()
+  {
+    HA_CREATE_INFO::options= 0;
+    DDL_options_st::init();
+  }
+};
+
+
 /**
   In-place alter handler context.
 
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 7617077..57623a7 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3204,13 +3204,13 @@ bool Query_log_event::write(IO_CACHE* file)
   switch (lex->sql_command)
   {
     case SQLCOM_DROP_TABLE:
-      use_cache= (lex->drop_temporary && thd->in_multi_stmt_transaction_mode());
+      use_cache= (lex->tmp_table() && thd->in_multi_stmt_transaction_mode());
     break;
 
     case SQLCOM_CREATE_TABLE:
       trx_cache= (lex->select_lex.item_list.elements &&
                   thd->is_current_stmt_binlog_format_row());
-      use_cache= (lex->create_info.tmp_table() &&
+      use_cache= (lex->tmp_table() &&
                    thd->in_multi_stmt_transaction_mode()) || trx_cache;
       break;
     case SQLCOM_SET_OPTION:
@@ -4348,7 +4348,8 @@ Query partially completed on the master (error on master: %d) \
       has already been dropped. To ignore such irrelevant "table does
       not exist errors", we silently clear the error if TEMPORARY was used.
     */
-    if (thd->lex->sql_command == SQLCOM_DROP_TABLE && thd->lex->drop_temporary &&
+    if (thd->lex->sql_command == SQLCOM_DROP_TABLE &&
+        thd->lex->tmp_table() &&
         thd->is_error() && thd->get_stmt_da()->sql_errno() == ER_BAD_TABLE_ERROR &&
         !expected_error)
       thd->get_stmt_da()->reset_diagnostics_area();
diff --git a/sql/sp_head.cc b/sql/sp_head.cc
index 296135c..f67d4c6 100644
--- a/sql/sp_head.cc
+++ b/sql/sp_head.cc
@@ -281,13 +281,13 @@ bool Item_splocal::append_for_log(THD *thd, String *str)
     flags= sp_head::CONTAINS_DYNAMIC_SQL;
     break;
   case SQLCOM_CREATE_TABLE:
-    if (lex->create_info.tmp_table())
+    if (lex->tmp_table())
       flags= 0;
     else
       flags= sp_head::HAS_COMMIT_OR_ROLLBACK;
     break;
   case SQLCOM_DROP_TABLE:
-    if (lex->drop_temporary)
+    if (lex->tmp_table())
       flags= 0;
     else
       flags= sp_head::HAS_COMMIT_OR_ROLLBACK;
@@ -4040,7 +4040,7 @@ uchar *sp_table_key(const uchar *ptr, size_t *plen, my_bool first)
   SP_TABLE *tab;
 
   if (lex_for_tmp_check->sql_command == SQLCOM_DROP_TABLE &&
-      lex_for_tmp_check->drop_temporary)
+      lex_for_tmp_check->tmp_table())
     return TRUE;
 
   for (uint i= 0 ; i < m_sptabs.records ; i++)
@@ -4105,7 +4105,7 @@ uchar *sp_table_key(const uchar *ptr, size_t *plen, my_bool first)
           return FALSE;
         if (lex_for_tmp_check->sql_command == SQLCOM_CREATE_TABLE &&
             lex_for_tmp_check->query_tables == table &&
-            lex_for_tmp_check->create_info.tmp_table())
+            lex_for_tmp_check->tmp_table())
         {
           tab->temp= TRUE;
           tab->qname.length= temp_table_key_length;
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 0bbcca5..0edda65 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -1923,7 +1923,9 @@ bool MDL_deadlock_handler::handle_condition(THD *,
 */
 
 static bool
-open_table_get_mdl_lock(THD *thd, Open_table_context *ot_ctx,
+open_table_get_mdl_lock(THD *thd,
+                        const DDL_options_st &options,
+                        Open_table_context *ot_ctx,
                         MDL_request *mdl_request,
                         uint flags,
                         MDL_ticket **mdl_ticket)
@@ -2059,7 +2061,8 @@ bool MDL_deadlock_handler::handle_condition(THD *,
                 TABLE_LIST::view is set for views).
 */
 
-bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
+bool open_table(THD *thd, const DDL_options_st &options,
+                TABLE_LIST *table_list, MEM_ROOT *mem_root,
                 Open_table_context *ot_ctx)
 {
   TABLE *table;
@@ -2277,7 +2280,7 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
       ot_ctx->set_has_protection_against_grl();
     }
 
-    if (open_table_get_mdl_lock(thd, ot_ctx, &table_list->mdl_request,
+    if (open_table_get_mdl_lock(thd, options, ot_ctx, &table_list->mdl_request,
                                 flags, &mdl_ticket) ||
         mdl_ticket == NULL)
     {
@@ -3453,7 +3456,8 @@ static bool auto_repair_table(THD *thd, TABLE_LIST *table_list)
       break;
     case OT_DISCOVER:
       {
-        if ((result= lock_table_names(m_thd, m_failed_table, NULL,
+        if ((result= lock_table_names(m_thd, m_thd->lex->create_info,
+                                      m_failed_table, NULL,
                                       get_timeout(), 0)))
           break;
 
@@ -3484,7 +3488,8 @@ static bool auto_repair_table(THD *thd, TABLE_LIST *table_list)
       }
     case OT_REPAIR:
       {
-        if ((result= lock_table_names(m_thd, m_failed_table, NULL,
+        if ((result= lock_table_names(m_thd, m_thd->lex->create_info,
+                                      m_failed_table, NULL,
                                       get_timeout(), 0)))
           break;
 
@@ -3760,7 +3765,8 @@ thr_lock_type read_lock_type_for_table(THD *thd,
 */
 
 static bool
-open_and_process_table(THD *thd, LEX *lex, TABLE_LIST *tables,
+open_and_process_table(THD *thd, LEX *lex,
+                       const DDL_options_st &options, TABLE_LIST *tables,
                        uint *counter, uint flags,
                        Prelocking_strategy *prelocking_strategy,
                        bool has_prelocking_list,
@@ -3813,7 +3819,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
         We still need to take a MDL lock on the merged view to protect
         it from concurrent changes.
       */
-      if (!open_table_get_mdl_lock(thd, ot_ctx, &tables->mdl_request,
+      if (!open_table_get_mdl_lock(thd, options, ot_ctx, &tables->mdl_request,
                                    flags, &mdl_ticket) &&
           mdl_ticket != NULL)
         goto process_view_routines;
@@ -3897,7 +3903,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
     error= open_temporary_table(thd, tables);
 
     if (!error && !tables->table)
-      error= open_table(thd, tables, new_frm_mem, ot_ctx);
+      error= open_table(thd, options, tables, new_frm_mem, ot_ctx);
 
     thd->pop_internal_handler();
     safe_to_ignore_table= no_such_table_handler.safely_trapped_errors();
@@ -3915,7 +3921,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
 
     error= open_temporary_table(thd, tables);
     if (!error && !tables->table)
-      error= open_table(thd, tables, new_frm_mem, ot_ctx);
+      error= open_table(thd, options, tables, new_frm_mem, ot_ctx);
 
     thd->pop_internal_handler();
     safe_to_ignore_table= repair_mrg_table_handler.safely_trapped_errors();
@@ -3933,7 +3939,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
     }
 
     if (!error && !tables->table)
-      error= open_table(thd, tables, new_frm_mem, ot_ctx);
+      error= open_table(thd, options, tables, new_frm_mem, ot_ctx);
   }
 
   free_root(new_frm_mem, MYF(MY_KEEP_PREALLOC));
@@ -4132,7 +4138,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
 */
 
 bool
-lock_table_names(THD *thd,
+lock_table_names(THD *thd, const DDL_options_st &options,
                  TABLE_LIST *tables_start, TABLE_LIST *tables_end,
                  ulong lock_wait_timeout, uint flags)
 {
@@ -4176,8 +4182,8 @@ thr_lock_type read_lock_type_for_table(THD *thd,
     DBUG_RETURN(FALSE);
 
   /* Check if CREATE TABLE without REPLACE was used */
-  create_table= (thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
-                 !(thd->lex->create_info.options & HA_LEX_CREATE_REPLACE));
+  create_table= thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
+                !options.or_replace();
 
   if (!(flags & MYSQL_OPEN_SKIP_SCOPED_MDL_LOCK))
   {
@@ -4231,7 +4237,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
     */
     if (ha_table_exists(thd, tables_start->db, tables_start->table_name))
     {
-      if (thd->lex->create_info.options & HA_LEX_CREATE_IF_NOT_EXISTS)
+      if (options.if_not_exists())
       {
         push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
                             ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
@@ -4344,8 +4350,9 @@ thr_lock_type read_lock_type_for_table(THD *thd,
   @retval  TRUE   Error, reported.
 */
 
-bool open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags,
-                Prelocking_strategy *prelocking_strategy)
+bool open_tables(THD *thd, const DDL_options_st &options,
+                 TABLE_LIST **start, uint *counter, uint flags,
+                 Prelocking_strategy *prelocking_strategy)
 {
   /*
     We use pointers to "next_global" member in the last processed
@@ -4434,7 +4441,8 @@ bool open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags,
     else
     {
       TABLE_LIST *table;
-      if (lock_table_names(thd, *start, thd->lex->first_not_own_table(),
+      if (lock_table_names(thd, options, *start,
+                           thd->lex->first_not_own_table(),
                            ot_ctx.get_timeout(), flags))
       {
         error= TRUE;
@@ -4464,7 +4472,7 @@ bool open_tables(THD *thd, TABLE_LIST **start, uint *counter, uint flags,
     for (tables= *table_to_open; tables;
          table_to_open= &tables->next_global, tables= tables->next_global)
     {
-      error= open_and_process_table(thd, thd->lex, tables, counter,
+      error= open_and_process_table(thd, thd->lex, options, tables, counter,
                                     flags, prelocking_strategy,
                                     has_prelocking_list, &ot_ctx,
                                     &new_frm_mem);
@@ -4971,8 +4979,7 @@ TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l,
   table_l->required_type= FRMTYPE_TABLE;
 
   /* Open the table. */
-  if (open_and_lock_tables(thd, table_l, FALSE, flags,
-                           prelocking_strategy))
+  if (open_and_lock_tables(thd, table_l, FALSE, flags, prelocking_strategy))
     table_l->table= NULL; /* Just to be sure. */
 
   /* Restore list. */
@@ -5117,7 +5124,8 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type lock_type,
   @retval TRUE   Error
 */
 
-bool open_and_lock_tables(THD *thd, TABLE_LIST *tables,
+bool open_and_lock_tables(THD *thd, const DDL_options_st &options,
+                          TABLE_LIST *tables,
                           bool derived, uint flags,
                           Prelocking_strategy *prelocking_strategy)
 {
@@ -5126,7 +5134,8 @@ bool open_and_lock_tables(THD *thd, TABLE_LIST *tables,
   DBUG_ENTER("open_and_lock_tables");
   DBUG_PRINT("enter", ("derived handling: %d", derived));
 
-  if (open_tables(thd, &tables, &counter, flags, prelocking_strategy))
+  if (open_tables(thd, options,
+                  &tables, &counter, flags, prelocking_strategy))
     goto err;
 
   DBUG_EXECUTE_IF("sleep_open_and_lock_after_open", {
@@ -9314,8 +9323,8 @@ bool is_equal(const LEX_STRING *a, const LEX_STRING *b)
   thd->reset_n_backup_open_tables_state(backup);
 
   if (open_and_lock_tables(thd, table_list, FALSE,
-                           MYSQL_OPEN_IGNORE_FLUSH |
-                           MYSQL_LOCK_IGNORE_TIMEOUT))
+                            MYSQL_OPEN_IGNORE_FLUSH |
+                            MYSQL_LOCK_IGNORE_TIMEOUT))
   {
     lex->restore_backup_query_tables_list(&query_tables_list_backup);
     thd->restore_backup_open_tables_state(backup);
diff --git a/sql/sql_base.h b/sql/sql_base.h
index e39ec16..9d60400 100644
--- a/sql/sql_base.h
+++ b/sql/sql_base.h
@@ -117,9 +117,15 @@ TABLE *open_ltable(THD *thd, TABLE_LIST *table_list, thr_lock_type update,
                             MYSQL_OPEN_GET_NEW_TABLE |\
                             MYSQL_OPEN_HAS_MDL_LOCK)
 
-bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
+bool open_table(THD *thd, const DDL_options_st &options,
+                TABLE_LIST *table_list, MEM_ROOT *mem_root,
                 Open_table_context *ot_ctx);
-
+static inline bool
+open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
+           Open_table_context *ot_ctx)
+{
+  return open_table(thd, thd->lex->create_info, table_list, mem_root, ot_ctx);
+}
 bool open_new_frm(THD *thd, TABLE_SHARE *share, const char *alias,
                   uint db_stat, uint prgflag,
                   uint ha_open_flags, TABLE *outparam, TABLE_LIST *table_desc,
@@ -226,15 +232,41 @@ int setup_conds(THD *thd, TABLE_LIST *tables, List<TABLE_LIST> &leaves,
 void wrap_ident(THD *thd, Item **conds);
 int setup_ftfuncs(SELECT_LEX* select);
 int init_ftfuncs(THD *thd, SELECT_LEX* select, bool no_order);
-bool lock_table_names(THD *thd, TABLE_LIST *table_list,
+bool lock_table_names(THD *thd, const DDL_options_st &options,
+                      TABLE_LIST *table_list,
                       TABLE_LIST *table_list_end, ulong lock_wait_timeout,
                       uint flags);
-bool open_tables(THD *thd, TABLE_LIST **tables, uint *counter, uint flags,
+static inline bool
+lock_table_names(THD *thd, TABLE_LIST *table_list,
+                 TABLE_LIST *table_list_end, ulong lock_wait_timeout,
+                 uint flags)
+{
+  return lock_table_names(thd, thd->lex->create_info, table_list,
+                          table_list_end, lock_wait_timeout, flags);
+}
+bool open_tables(THD *thd, const DDL_options_st &options,
+                 TABLE_LIST **tables, uint *counter, uint flags,
                  Prelocking_strategy *prelocking_strategy);
+static inline bool
+open_tables(THD *thd, TABLE_LIST **tables, uint *counter, uint flags,
+            Prelocking_strategy *prelocking_strategy)
+{
+  return open_tables(thd, thd->lex->create_info, tables, counter, flags,
+                     prelocking_strategy);
+}
 /* open_and_lock_tables with optional derived handling */
-bool open_and_lock_tables(THD *thd, TABLE_LIST *tables,
+bool open_and_lock_tables(THD *thd, const DDL_options_st &options,
+                          TABLE_LIST *tables,
                           bool derived, uint flags,
                           Prelocking_strategy *prelocking_strategy);
+static inline bool
+open_and_lock_tables(THD *thd, TABLE_LIST *tables,
+                     bool derived, uint flags,
+                     Prelocking_strategy *prelocking_strategy)
+{
+  return open_and_lock_tables(thd, thd->lex->create_info,
+                              tables, derived, flags, prelocking_strategy);
+}
 /* simple open_and_lock_tables without derived handling for single table */
 TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l,
                                 thr_lock_type lock_type, uint flags,
@@ -460,13 +492,24 @@ class Alter_table_prelocking_strategy : public Prelocking_strategy
 
 
 inline bool
+open_tables(THD *thd, const DDL_options_st &options,
+            TABLE_LIST **tables, uint *counter, uint flags)
+{
+  DML_prelocking_strategy prelocking_strategy;
+
+  return open_tables(thd, options, tables, counter, flags,
+                     &prelocking_strategy);
+}
+inline bool
 open_tables(THD *thd, TABLE_LIST **tables, uint *counter, uint flags)
 {
   DML_prelocking_strategy prelocking_strategy;
 
-  return open_tables(thd, tables, counter, flags, &prelocking_strategy);
+  return open_tables(thd, thd->lex->create_info, tables, counter, flags,
+                     &prelocking_strategy);
 }
 
+
 inline TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l,
                                        thr_lock_type lock_type, uint flags)
 {
@@ -478,12 +521,23 @@ inline TABLE *open_n_lock_single_table(THD *thd, TABLE_LIST *table_l,
 
 
 /* open_and_lock_tables with derived handling */
-inline bool open_and_lock_tables(THD *thd, TABLE_LIST *tables,
+inline bool open_and_lock_tables(THD *thd,
+                                 const DDL_options_st &options,
+                                 TABLE_LIST *tables,
                                  bool derived, uint flags)
 {
   DML_prelocking_strategy prelocking_strategy;
 
-  return open_and_lock_tables(thd, tables, derived, flags,
+  return open_and_lock_tables(thd, options, tables, derived, flags,
+                              &prelocking_strategy);
+}
+inline bool open_and_lock_tables(THD *thd, TABLE_LIST *tables,
+                                  bool derived, uint flags)
+{
+  DML_prelocking_strategy prelocking_strategy;
+
+  return open_and_lock_tables(thd, thd->lex->create_info,
+                              tables, derived, flags,
                               &prelocking_strategy);
 }
 
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 34fa030..5e26468 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5448,8 +5448,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
       flags_access_some_set |= flags;
 
       if (lex->sql_command != SQLCOM_CREATE_TABLE ||
-          (lex->sql_command == SQLCOM_CREATE_TABLE &&
-          lex->create_info.tmp_table()))
+          (lex->sql_command == SQLCOM_CREATE_TABLE && lex->tmp_table()))
       {
         my_bool trans= table->table->file->has_transactions();
 
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 4198402..f3307ef 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4202,7 +4202,7 @@ class select_insert :public select_result_interceptor {
 class select_create: public select_insert {
   ORDER *group;
   TABLE_LIST *create_table;
-  HA_CREATE_INFO *create_info;
+  Table_specification_st *create_info;
   TABLE_LIST *select_tables;
   Alter_info *alter_info;
   Field **field;
@@ -4214,7 +4214,7 @@ class select_create: public select_insert {
 
 public:
   select_create (TABLE_LIST *table_arg,
-		 HA_CREATE_INFO *create_info_par,
+		 Table_specification_st *create_info_par,
                  Alter_info *alter_info_arg,
 		 List<Item> &select_fields,enum_duplicates duplic, bool ignore,
                  TABLE_LIST *select_tables_arg)
diff --git a/sql/sql_db.cc b/sql/sql_db.cc
index a930cb0..7cffb8b 100644
--- a/sql/sql_db.cc
+++ b/sql/sql_db.cc
@@ -39,6 +39,9 @@
 #include "events.h"
 #include "sql_handler.h"
 #include "sql_statistics.h"
+#include "rpl_filter.h"
+#include "rpl_rli.h"
+
 #include <my_dir.h>
 #include <m_ctype.h>
 #include "log.h"
@@ -233,7 +236,7 @@ void my_dbopt_cleanup(void)
     1 on error.
 */
 
-static my_bool get_dbopt(const char *dbname, HA_CREATE_INFO *create)
+static my_bool get_dbopt(const char *dbname, Schema_specification_st *create)
 {
   my_dbopt_t *opt;
   uint length;
@@ -264,7 +267,7 @@ static my_bool get_dbopt(const char *dbname, HA_CREATE_INFO *create)
     1 on error.
 */
 
-static my_bool put_dbopt(const char *dbname, HA_CREATE_INFO *create)
+static my_bool put_dbopt(const char *dbname, Schema_specification_st *create)
 {
   my_dbopt_t *opt;
   uint length;
@@ -333,7 +336,8 @@ static void del_dbopt(const char *path)
   1	Could not create file or write to it.  Error sent through my_error()
 */
 
-static bool write_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create)
+static bool write_db_opt(THD *thd, const char *path,
+                         Schema_specification_st *create)
 {
   register File file;
   char buf[256]; // Should be enough for one option
@@ -379,7 +383,7 @@ static bool write_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create)
 
 */
 
-bool load_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create)
+bool load_db_opt(THD *thd, const char *path, Schema_specification_st *create)
 {
   File file;
   char buf[256];
@@ -491,7 +495,7 @@ bool load_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create)
 */
 
 bool load_db_opt_by_name(THD *thd, const char *db_name,
-                         HA_CREATE_INFO *db_create_info)
+                         Schema_specification_st *db_create_info)
 {
   char db_opt_path[FN_REFLEN + 1];
 
@@ -518,7 +522,7 @@ bool load_db_opt_by_name(THD *thd, const char *db_name,
 
 CHARSET_INFO *get_default_db_collation(THD *thd, const char *db_name)
 {
-  HA_CREATE_INFO db_info;
+  Schema_specification_st db_info;
 
   if (thd->db != NULL && strcmp(db_name, thd->db) == 0)
     return thd->db_charset;
@@ -537,14 +541,28 @@ CHARSET_INFO *get_default_db_collation(THD *thd, const char *db_name)
 }
 
 
+SQL::Statement::Schema::DDL_options::DDL_options(const LEX *lex)
+{
+  *this= lex->create_info;
+}
+
+
+SQL::Statement::Schema::Specification::Specification(const LEX *lex)
+{
+  *this= lex->create_info;
+}
+
+
+
 /*
   Create a database
 
   SYNOPSIS
-  mysql_create_db()
+  create_db()
   thd		Thread handler
   db		Name of database to create
 		Function assumes that this is already validated.
+  options       DDL options, e.g. IF NOT EXISTS
   create_info	Database create options (like character set)
   silent	Used by replication when internally creating a database.
 		In this case the entry should not be logged.
@@ -560,16 +578,24 @@ CHARSET_INFO *get_default_db_collation(THD *thd, const char *db_name)
   TRUE  Error
 
 */
-
-int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
-                     bool silent)
+int
+SQL::Statement::Schema::DDL::create_db(THD *thd,
+                                       const Name &name,
+                                       const DDL_options_st options,
+                                       const Schema_specification_st &spec,
+                                       bool silent) const
 {
   char	 path[FN_REFLEN+16];
   long result= 1;
   int error= 0;
   MY_STAT stat_info;
-  uint create_options= create_info ? create_info->options : 0;
   uint path_len;
+  const char *db= name.name();
+  /*
+    write_db_opt() can modify Schema_specification_st structure passed to it,
+    so make a copy.
+  */
+  Schema_specification_st create_info(spec);
   DBUG_ENTER("mysql_create_db");
 
   /* do not create 'information_schema' db */
@@ -579,7 +605,8 @@ int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
     DBUG_RETURN(-1);
   }
 
-  char db_tmp[SAFE_NAME_LEN], *dbnorm;
+  char db_tmp[SAFE_NAME_LEN];
+  const char *dbnorm;
   if (lower_case_table_names)
   {
     strmake_buf(db_tmp, db);
@@ -598,7 +625,7 @@ int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
 
   if (mysql_file_stat(key_file_misc, path, &stat_info, MYF(0)))
   {
-    if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
+    if (!options.if_not_exists())
     {
       my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
       error= -1;
@@ -626,7 +653,7 @@ int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
 
   path[path_len-1]= FN_LIBCHAR;
   strmake(path+path_len, MY_DB_OPT_FILE, sizeof(path)-path_len-1);
-  if (write_db_opt(thd, path, create_info))
+  if (write_db_opt(thd, path, &create_info))
   {
     /*
       Could not create options file.
@@ -702,14 +729,19 @@ int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info,
 
 /* db-name is already validated when we come here */
 
-bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
+bool SQL::Statement::Schema::Alter::exec(THD *thd) const
 {
   char path[FN_REFLEN+16];
   long result=1;
   int error= 0;
+  /*
+    write_db_opt() can modify Schema_specification_st structure passed to it,
+    so make a copy.
+  */
+  Schema_specification_st create_info(*this);
   DBUG_ENTER("mysql_alter_db");
 
-  if (lock_schema_name(thd, db))
+  if (lock_schema_name(thd, name()))
     DBUG_RETURN(TRUE);
 
   /* 
@@ -717,17 +749,15 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
      We pass MY_DB_OPT_FILE as "extension" to avoid
      "table name to file name" encoding.
   */
-  build_table_filename(path, sizeof(path) - 1, db, "", MY_DB_OPT_FILE, 0);
-  if ((error=write_db_opt(thd, path, create_info)))
+  build_table_filename(path, sizeof(path) - 1, name(), "", MY_DB_OPT_FILE, 0);
+  if ((error=write_db_opt(thd, path, &create_info)))
     goto exit;
 
   /* Change options if current database is being altered. */
 
-  if (thd->db && !strcmp(thd->db,db))
+  if (thd->db && !strcmp(thd->db, name()))
   {
-    thd->db_charset= create_info->default_table_charset ?
-		     create_info->default_table_charset :
-		     thd->variables.collation_server;
+    thd->db_charset= create_info.default_table_charset;
     thd->variables.collation_database= thd->db_charset;
   }
 
@@ -741,8 +771,8 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
       database" and not the threads current database, which is the
       default.
     */
-    qinfo.db     = db;
-    qinfo.db_len = strlen(db);
+    qinfo.db     = name();
+    qinfo.db_len = strlen(name());
 
     /*
       These DDL methods and logging are protected with the exclusive
@@ -773,7 +803,8 @@ bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
   @retval  true   Error
 */
 
-bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
+bool SQL::Statement::Schema::DDL::rm_db(THD *thd, const Name &name,
+                                        bool if_exists, bool silent) const
 {
   ulong deleted_tables= 0;
   bool error= true;
@@ -783,6 +814,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
   TABLE_LIST *tables= NULL;
   TABLE_LIST *table;
   Drop_table_error_handler err_handler;
+  const char *db= name.name();
   DBUG_ENTER("mysql_rm_db");
 
   char db_tmp[SAFE_NAME_LEN], *dbnorm;
@@ -793,7 +825,7 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
     dbnorm= db_tmp;
   }
   else
-    dbnorm= db;
+    dbnorm= (char *) db;
 
   if (lock_schema_name(thd, dbnorm))
     DBUG_RETURN(true);
@@ -960,7 +992,8 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
           These DDL methods and logging are protected with the exclusive
           metadata lock on the schema.
         */
-        if (write_to_binlog(thd, query, query_pos -1 - query, db, db_len))
+        if (write_to_binlog(thd, query, query_pos -1 - query,
+                            (char *) db, db_len))
         {
           error= true;
           goto exit;
@@ -978,7 +1011,8 @@ bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
         These DDL methods and logging are protected with the exclusive
         metadata lock on the schema.
       */
-      if (write_to_binlog(thd, query, query_pos -1 - query, db, db_len))
+      if (write_to_binlog(thd, query, query_pos -1 - query,
+                          (char *) db, db_len))
       {
         error= true;
         goto exit;
@@ -1597,20 +1631,20 @@ bool mysql_opt_change_db(THD *thd,
   @param old_db 5.0 database name, in #mysql50#name format
   @return 0 on success, 1 on error
 */
-bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
+bool SQL::Statement::Schema::Upgrade::exec(THD *thd) const
 {
   int error= 0, change_to_newdb= 0;
   char path[FN_REFLEN+16];
   uint length;
-  HA_CREATE_INFO create_info;
+  Schema_specification_st create_info;
   MY_DIR *dirp;
   TABLE_LIST *table_list;
   SELECT_LEX *sl= thd->lex->current_select;
   LEX_STRING new_db;
   DBUG_ENTER("mysql_upgrade_db");
 
-  if ((old_db->length <= MYSQL50_TABLE_NAME_PREFIX_LENGTH) ||
-      (strncmp(old_db->str,
+  if ((name_length() <= MYSQL50_TABLE_NAME_PREFIX_LENGTH) ||
+      (strncmp(name(),
               MYSQL50_TABLE_NAME_PREFIX,
               MYSQL50_TABLE_NAME_PREFIX_LENGTH) != 0))
   {
@@ -1621,36 +1655,35 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
   }
 
   /* `#mysql50#<name>` converted to encoded `<name>` */
-  new_db.str= old_db->str + MYSQL50_TABLE_NAME_PREFIX_LENGTH;
-  new_db.length= old_db->length - MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+  new_db.str= (char *) name() + MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+  new_db.length= name_length() - MYSQL50_TABLE_NAME_PREFIX_LENGTH;
 
   /* Lock the old name, the new name will be locked by mysql_create_db().*/
-  if (lock_schema_name(thd, old_db->str))
+  if (lock_schema_name(thd, name()))
     DBUG_RETURN(1);
 
   /*
     Let's remember if we should do "USE newdb" afterwards.
     thd->db will be cleared in mysql_rename_db()
   */
-  if (thd->db && !strcmp(thd->db, old_db->str))
+  if (thd->db && !strcmp(thd->db, name()))
     change_to_newdb= 1;
 
-  build_table_filename(path, sizeof(path)-1,
-                       old_db->str, "", MY_DB_OPT_FILE, 0);
+  build_table_filename(path, sizeof(path)-1, name(), "", MY_DB_OPT_FILE, 0);
   if ((load_db_opt(thd, path, &create_info)))
     create_info.default_table_charset= thd->variables.collation_server;
 
-  length= build_table_filename(path, sizeof(path)-1, old_db->str, "", "", 0);
+  length= build_table_filename(path, sizeof(path)-1, name(), "", "", 0);
   if (length && path[length-1] == FN_LIBCHAR)
     path[length-1]=0;                            // remove ending '\'
   if ((error= my_access(path,F_OK)))
   {
-    my_error(ER_BAD_DB_ERROR, MYF(0), old_db->str);
+    my_error(ER_BAD_DB_ERROR, MYF(0), name());
     goto exit;
   }
 
   /* Step1: Create the new database */
-  if ((error= mysql_create_db(thd, new_db.str, &create_info, 1)))
+  if ((error= create_db(thd, new_db, DDL_options(), create_info, true)))
     goto exit;
 
   /* Step2: Move tables to the new database */
@@ -1675,7 +1708,10 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
       table_str.length= filename_to_tablename(file->name,
                                               tname, sizeof(tname)-1);
       table_str.str= (char*) sql_memdup(tname, table_str.length + 1);
-      Table_ident *old_ident= new Table_ident(thd, *old_db, table_str, 0);
+      LEX_STRING old_db;
+      old_db.str= (char *) name();
+      old_db.length= name_length();
+      Table_ident *old_ident= new Table_ident(thd, old_db, table_str, 0);
       Table_ident *new_ident= new Table_ident(thd, new_db, table_str, 0);
       if (!old_ident || !new_ident ||
           !sl->add_table_to_list(thd, old_ident, NULL,
@@ -1760,7 +1796,7 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
 
       /* pass empty file name, and file->name as extension to avoid encoding */
       build_table_filename(oldname, sizeof(oldname)-1,
-                           old_db->str, "", file->name, 0);
+                           name(), "", file->name, 0);
       build_table_filename(newname, sizeof(newname)-1,
                            new_db.str, "", file->name, 0);
       mysql_file_rename(key_file_misc, oldname, newname, MYF(MY_WME));
@@ -1774,7 +1810,7 @@ bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db)
     to execute them again.
     mysql_rm_db() also "unuses" if we drop the current database.
   */
-  error= mysql_rm_db(thd, old_db->str, 0, 1);
+  error= rm_db(thd, *this, false, true);
 
   /* Step8: logging */
   if (mysql_bin_log.is_open())
@@ -1823,3 +1859,104 @@ bool check_db_dir_existence(const char *db_name)
 
   return my_access(db_dir_path, F_OK);
 }
+
+
+bool SQL::Statement::Schema::Name::check() const
+{
+  LEX_CSTRING tmp= *this;
+  bool check_path_chars;
+  if ((check_path_chars= check_mysql50_prefix(name())))
+  {
+    DBUG_ASSERT(tmp.length >= MYSQL50_TABLE_NAME_PREFIX_LENGTH);
+    tmp.str+= MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+    tmp.length-= MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+  }
+  if (!tmp.length || tmp.length > NAME_LEN)
+    return true;
+
+  if (db_name_is_in_ignore_db_dirs_list(tmp.str))
+    return true;
+
+  return check_table_name(tmp.str, tmp.length, check_path_chars);
+}
+
+
+bool SQL::Statement::Schema::Name::check_with_error() const
+{
+  if (Schema::Name::check())
+  {
+    my_error(ER_WRONG_DB_NAME, MYF(0), name());
+    return true;
+  }
+  return false;
+}
+
+
+#ifdef HAVE_REPLICATION
+bool SQL::Statement::Schema::Name::check_if_slave_ignored(THD *thd) const
+{
+  if (thd->slave_thread)
+  {
+    Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+    if (!rpl_filter->db_ok(name()) ||
+        !rpl_filter->db_ok_with_wild_table(name()))
+    {
+      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+      return true;
+    }
+  }
+  return false;
+}
+#endif
+
+
+int SQL::Statement::Schema::DDL_with_name::exec_direct(THD *thd) const
+{
+  if (Name::check_with_error())
+    return 0;
+#ifdef HAVE_REPLICATION
+  /*
+    If in a slave thread :
+    DROP DATABASE DB may not be preceded by USE DB.
+    For that reason, maybe db_ok() in sql/slave.cc did not check the 
+    do_db/ignore_db. And as this query involves no tables, tables_ok()
+    above was not called. So we have to check rules again here.
+  */
+  if (check_if_slave_ignored(thd))
+    return 0;
+#endif
+  if (check_access(thd))
+    return 0;
+  WSREP_TO_ISOLATION_BEGIN((char *) name(), NULL, NULL)
+  return exec(thd);
+
+error:
+  return -1;
+}
+
+
+/**
+  Upgrade slightly differs from the other DDL statements:
+  - returns a different value on slave when the database is ignored
+  - the order of Rpl_filter and Name valididy tests
+  - it additionally performs my_ok() on failure
+  QQ: Perhaps some of these differences are not really necessary
+*/
+int SQL::Statement::Schema::Upgrade::exec_direct(THD *thd) const
+{
+#ifdef HAVE_REPLICATION
+  if (check_if_slave_ignored(thd))
+    return 1;
+#endif
+  if (Name::check_with_error())
+    return 0;
+  if (check_access(thd))
+    return 1;
+  WSREP_TO_ISOLATION_BEGIN((char *) name(), NULL, NULL)
+  int res;
+  if (!(res= exec(thd)))
+    my_ok(thd);
+  return res;
+error:
+  return -1;
+}
diff --git a/sql/sql_db.h b/sql/sql_db.h
index 62d379c..dc62e6b 100644
--- a/sql/sql_db.h
+++ b/sql/sql_db.h
@@ -16,14 +16,12 @@
 #ifndef SQL_DB_INCLUDED
 #define SQL_DB_INCLUDED
 
+#include "sql_lang.h"
+#include "sql_acl.h"
 #include "hash.h"                               /* HASH */
 
 class THD;
 
-int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create, bool silent);
-bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create);
-bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent);
-bool mysql_upgrade_db(THD *thd, LEX_STRING *old_db);
 bool mysql_change_db(THD *thd, const LEX_STRING *new_db_name,
                      bool force_switch);
 
@@ -35,13 +33,178 @@ bool mysql_opt_change_db(THD *thd,
 bool my_dboptions_cache_init(void);
 void my_dboptions_cache_free(void);
 bool check_db_dir_existence(const char *db_name);
-bool load_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create);
+bool load_db_opt(THD *thd, const char *path, Schema_specification_st *create);
 bool load_db_opt_by_name(THD *thd, const char *db_name,
-                         HA_CREATE_INFO *db_create_info);
+                         Schema_specification_st *db_create_info);
 CHARSET_INFO *get_default_db_collation(THD *thd, const char *db_name);
 bool my_dbopt_init(void);
 void my_dbopt_cleanup(void);
 
+
+namespace SQL {
+
+
+namespace Statement {
+
+
+namespace Schema {
+
+
+// Schema name
+class Name: public Identifier_normalized
+{
+public:
+  Name(const LEX_STRING &other) :Identifier_normalized(other) { }
+  Name(const LEX *lex) :Identifier_normalized(lex) { }
+#ifdef HAVE_REPLICATION
+  bool check_if_slave_ignored(THD *thd) const;
+#endif
+  bool check() const;
+  bool check_with_error() const;
+};
+
+
+// Schema definition options: IF EXISTS, IF NOT EXISTS
+class DDL_options: public DDL_options_st
+{
+public:
+  DDL_options() { init(); }
+  DDL_options(Options options)
+  {
+    set(options);
+  }
+  DDL_options(const DDL_options_st options)
+  {
+    set(options);
+  }
+  DDL_options(const LEX *lex);
+};
+
+
+// Schema specification, e.g. CHARACTER SET xxx
+class Specification: public Schema_specification_st
+{
+public:
+  Specification(const Schema_specification_st &info)
+   :Schema_specification_st(info)
+  { }
+  Specification(const LEX *lex);
+};
+
+
+/**
+  A common class for all Schema DDL options:
+  - CREATE SCHEMA
+  - DROP SCHEMA
+  - ALTER SCHEMA
+  - ALTER SCHEMA UPGRADE DATA DIRECTORY NAME
+*/
+class DDL: public Directly_executable
+{
+protected:
+  bool rm_db(THD *thd, const Name &name, bool if_exists, bool silent) const;
+
+  int create_db(THD *thd,
+                const Name &name,
+                const DDL_options_st options,
+                const Schema_specification_st &info,
+                bool silent) const;
+};
+
+
+class DDL_with_name: public DDL, public Name
+{
+  /*
+    Execute a DDL statement.
+    Assumes that Name has already been checked for validity.
+  */
+  virtual bool exec(THD *thd) const = 0;
+  virtual bool check_access(THD *) const = 0;
+protected:
+  DDL_with_name(const Name &name): Name(name) { }
+  DDL_with_name(const LEX *lex): Name(lex) { }
+public:
+  int exec_direct(THD *thd) const;
+};
+
+
+class DDL_with_name_and_options: public DDL_with_name, public DDL_options
+{
+protected:
+  DDL_with_name_and_options(const LEX *lex)
+    :DDL_with_name(lex), DDL_options(lex) { }
+  DDL_with_name_and_options(const Name &name)
+    :DDL_with_name(name), DDL_options() { }
+};
+
+
+class Create: public DDL_with_name_and_options, public Specification
+{
+  bool exec(THD *thd) const
+  {
+    return DDL::create_db(thd, *this, *this, *this, false);
+  }
+  bool check_access(THD *thd) const
+  {
+    return ::check_access(thd, CREATE_ACL, name(), NULL, NULL, 1, 0);
+  }
+public:
+  Create(const LEX *lex)
+    :DDL_with_name_and_options(lex), Specification(lex)
+  { }
+  Create(const Name &name, const Schema_specification_st &info)
+   :DDL_with_name_and_options(name), Specification(info)
+  { }
+};
+
+
+class Drop: public DDL_with_name_and_options
+{
+  bool exec(THD *thd) const
+  {
+    return rm_db(thd, *this, if_exists(), false);
+  }
+  bool check_access(THD *thd) const
+  {
+    return ::check_access(thd, DROP_ACL, name(), NULL, NULL, 1, 0);
+  }
+public:
+  Drop(const Name &name) :DDL_with_name_and_options(name) { }
+  Drop(const LEX *lex) :DDL_with_name_and_options(lex) { }
+};
+
+
+class Alter: public DDL_with_name, public Specification
+{
+  bool exec(THD *thd) const;
+  bool check_access(THD *thd) const
+  {
+    return ::check_access(thd, ALTER_ACL, name(), NULL, NULL, 1, 0);
+  }
+public:
+  Alter(const LEX *lex) :DDL_with_name(lex), Specification(lex) { }
+};
+
+
+class Upgrade: public DDL_with_name
+{
+  bool exec(THD *thd) const;
+  bool check_access(THD *thd) const
+  {
+    return ::check_access(thd, ALTER_ACL, name(),  NULL, NULL, 1, 0) ||
+           ::check_access(thd, DROP_ACL,  name(),  NULL, NULL, 1, 0)  ||
+           ::check_access(thd, CREATE_ACL, name(), NULL, NULL, 1, 0);
+  }
+public:
+  Upgrade(const LEX *lex) :DDL_with_name(lex) { }
+  int exec_direct(THD *thd) const;
+};
+
+
+} // End of namespace SQL::Statement::Schema
+} // End of namespace SQL::Statement
+} // End of namespace SQL
+
 #define MY_DB_OPT_FILE "db.opt"
 
 #endif /* SQL_DB_INCLUDED */
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 92f5f52..54a1e70 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -3841,7 +3841,8 @@ void select_insert::abort_result_set() {
   @retval 0         Error
 */
 
-static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
+static TABLE *create_table_from_items(THD *thd,
+                                      Table_specification_st *create_info,
                                       TABLE_LIST *create_table,
                                       Alter_info *alter_info,
                                       List<Item> *items,
@@ -3924,7 +3925,7 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
 
   if (!mysql_create_table_no_lock(thd, create_table->db,
                                   create_table->table_name,
-                                  create_info, alter_info, NULL,
+                                  *create_info, alter_info, NULL,
                                   select_field_count))
   {
     DEBUG_SYNC(thd,"create_table_select_before_open");
@@ -4086,7 +4087,7 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
     row-based replication for the statement.  If we are creating a
     temporary table, we need to start a statement transaction.
   */
-  if (!thd->lex->create_info.tmp_table() &&
+  if (!thd->lex->tmp_table() &&
       thd->is_current_stmt_binlog_format_row() &&
       mysql_bin_log.is_open())
   {
@@ -4095,7 +4096,8 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
 
   DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
 
-  if (!(table= create_table_from_items(thd, create_info, create_table,
+  if (!(table= create_table_from_items(thd, create_info,
+                                       create_table,
                                        alter_info, &values,
                                        &extra_lock, hook_ptr)))
     /* abort() deletes table */
@@ -4178,8 +4180,8 @@ static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
   tmp_table_list.table = *tables;
   query.length(0);      // Have to zero it since constructor doesn't
 
-  result= show_create_table(thd, &tmp_table_list, &query, create_info,
-                            WITH_DB_NAME);
+  result= show_create_table(thd, &tmp_table_list, &query,
+                            create_info, WITH_DB_NAME);
   DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
 
   if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
diff --git a/sql/sql_lang.h b/sql/sql_lang.h
new file mode 100644
index 0000000..76a1340
--- /dev/null
+++ b/sql/sql_lang.h
@@ -0,0 +1,79 @@
+/* Copyright (c) 2014, MariaDB Foundation
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
+
+#ifndef SQL_LANG_INCLUDED
+#define SQL_LANG_INCLUDED
+
+namespace SQL {
+
+class Identifier: protected LEX_CSTRING
+{
+  void set(const LEX_STRING &other)
+  {
+    str= other.str;
+    length= other.length;
+    DBUG_ASSERT(str[length] == 0);
+  }
+public:
+  Identifier(const LEX_STRING &other)
+  {
+    set(other);
+  }
+  Identifier(const LEX *);
+  const char *name() const { return str; }
+  const size_t name_length() const { return length; }
+};
+
+
+class Identifier_normalized: public Identifier
+{
+  char m_buf[NAME_LEN + 1 + MYSQL50_TABLE_NAME_PREFIX_LENGTH + 10];
+  void normalize();
+public:
+  Identifier_normalized(const LEX_STRING &other)
+    :Identifier(other) { normalize(); }
+  Identifier_normalized(const LEX *lex)
+    :Identifier(lex) { normalize(); }
+};
+
+
+namespace Statement {
+
+
+/*
+  All SQL statements that can be executed directly.
+  Statement instances are created using the data in thd->lex,
+  previously populated by the SQL parser.
+*/
+class Directly_executable
+{
+public:
+  /*
+    Perform all stages of a direct SQL execution:
+    - Check for validity (e.g. name in "CREATE SCHEMA name" is valid).
+    - Check access
+    - Apply filters (e.g. Rpl_filters on slave)
+    - Perform WSREP_TO_ISOLATION
+    - Execute the actual action
+  */
+  virtual int exec_direct(THD *thd) const = 0;
+};
+
+} // End of namescpace Statement
+
+
+} // End of namespace SQL
+
+#endif /* SQL_LANG_INCLUDED */
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index 7eb38b1..45ba8be 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -122,6 +122,13 @@ inline int lex_casecmp(const char *s, const char *t, uint len)
   return (int) len+1;
 }
 
+
+SQL::Identifier::Identifier(const LEX *lex)
+{
+  set(lex->name);
+}
+
+
 #include <lex_hash.h>
 
 
@@ -512,6 +519,7 @@ void lex_start(THD *thd)
   lex->use_only_table_context= FALSE;
   lex->parse_vcol_expr= FALSE;
   lex->check_exists= FALSE;
+  lex->create_info.lex_start();
   lex->verbose= 0;
 
   lex->name= null_lex_str;
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index b18cff1..fc61ac1 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -21,6 +21,7 @@
 #ifndef SQL_LEX_INCLUDED
 #define SQL_LEX_INCLUDED
 
+#include "sql_lang.h"
 #include "violite.h"                            /* SSL_type */
 #include "sql_trigger.h"
 #include "item.h"               /* From item_subselect.h: subselect_union_engine */
@@ -2427,7 +2428,7 @@ struct LEX: public Query_tables_list
   Item_sum *in_sum_func;
   udf_func udf;
   HA_CHECK_OPT   check_opt;			// check/repair options
-  HA_CREATE_INFO create_info;
+  Table_specification_st create_info;
   KEY_CREATE_INFO key_create_info;
   LEX_MASTER_INFO mi;				// used by CHANGE MASTER
   LEX_SERVER_OPTIONS server_options;
@@ -2507,7 +2508,7 @@ struct LEX: public Query_tables_list
   uint16 create_view_algorithm;
   uint8 create_view_check;
   uint8 context_analysis_only;
-  bool drop_temporary, local_file;
+  bool local_file;
   bool check_exists;
   bool autocommit;
   bool verbose, no_write_to_binlog;
@@ -2775,6 +2776,40 @@ struct LEX: public Query_tables_list
 
   int print_explain(select_result_sink *output, uint8 explain_flags,
                     bool is_analyze, bool *printed_anything);
+
+  void set_command(enum_sql_command command,
+                   DDL_options_st options)
+  {
+    sql_command= command;
+    create_info.set(options);
+  }
+  void set_command(enum_sql_command command,
+                   uint scope,
+                   DDL_options_st options)
+  {
+    set_command(command, options);
+    create_info.options|= scope; // HA_LEX_CREATE_TMP_TABLE or 0
+  }
+  bool set_command_with_check(enum_sql_command command,
+                              uint scope,
+                              DDL_options_st options)
+  {
+    if (options.or_replace() && options.if_not_exists())
+    {
+      my_error(ER_WRONG_USAGE, MYF(0), "OR REPLACE", "IF NOT EXISTS");
+      return true;
+    }
+    set_command(command, scope, options);
+    return false;
+  }
+  /*
+    DROP shares lex->create_info to store TEMPORARY and IF EXISTS options
+    to save on extra initialization in lex_start().
+    Add some wrappers, to avoid direct use of lex->create_info in the
+    caller code processing DROP statements (which might look confusing).
+  */
+  bool tmp_table() const { return create_info.tmp_table(); }
+  bool if_exists() const { return create_info.if_exists(); }
 };
 
 
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index b905d8b..c59332e 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -216,12 +216,12 @@ static bool stmt_causes_implicit_commit(THD *thd, uint mask)
 
   switch (lex->sql_command) {
   case SQLCOM_DROP_TABLE:
-    skip= (lex->drop_temporary ||
+    skip= (lex->tmp_table() ||
            (thd->variables.option_bits & OPTION_GTID_BEGIN));
     break;
   case SQLCOM_ALTER_TABLE:
     /* If ALTER TABLE of non-temporary table, do implicit commit */
-    skip= (lex->create_info.tmp_table());
+    skip= (lex->tmp_table());
     break;
   case SQLCOM_CREATE_TABLE:
     /*
@@ -230,7 +230,7 @@ static bool stmt_causes_implicit_commit(THD *thd, uint mask)
       This ensures that CREATE ... SELECT will in the same GTID group on the
       master and slave.
     */
-    skip= (lex->create_info.tmp_table() ||
+    skip= (lex->tmp_table() ||
            (thd->variables.option_bits & OPTION_GTID_BEGIN));
     break;
   case SQLCOM_SET_OPTION:
@@ -1171,12 +1171,10 @@ static my_bool deny_updates_if_read_only_option(THD *thd,
     DBUG_RETURN(FALSE);
 
   const my_bool create_temp_tables= 
-    (lex->sql_command == SQLCOM_CREATE_TABLE) &&
-    lex->create_info.tmp_table();
+    (lex->sql_command == SQLCOM_CREATE_TABLE) && lex->tmp_table();
 
   const my_bool drop_temp_tables= 
-    (lex->sql_command == SQLCOM_DROP_TABLE) &&
-    lex->drop_temporary;
+    (lex->sql_command == SQLCOM_DROP_TABLE) && lex->tmp_table();
 
   const my_bool update_real_tables=
     some_non_temp_table_to_be_updated(thd, all_tables) &&
@@ -2422,8 +2420,6 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
 #ifdef HAVE_REPLICATION
   /* have table map for update for multi-update statement (BUG#37051) */
   bool have_table_map_for_update= FALSE;
-  /* */
-  Rpl_filter *rpl_filter;
 #endif
   DBUG_ENTER("mysql_execute_command");
 
@@ -2559,7 +2555,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     if (!(lex->sql_command == SQLCOM_UPDATE_MULTI) &&
 	!(lex->sql_command == SQLCOM_SET_OPTION) &&
 	!(lex->sql_command == SQLCOM_DROP_TABLE &&
-          lex->drop_temporary && lex->check_exists) &&
+          lex->tmp_table() && lex->if_exists()) &&
         all_tables_not_ok(thd, all_tables))
     {
       /* we warn the slave SQL thread */
@@ -3048,7 +3044,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
       safe. A shallow copy is enough as this code won't modify any memory
       referenced from this structure.
     */
-    HA_CREATE_INFO create_info(lex->create_info);
+    Table_specification_st create_info(lex->create_info);
     /*
       We need to copy alter_info for the same reasons of re-execution
       safety, only in case of Alter_info we have to do (almost) a deep
@@ -3109,11 +3105,13 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
       CREATE TABLE OR EXISTS failures by dropping the table and
       retrying the create.
     */
-    create_info.org_options= create_info.options;
     if (thd->slave_thread &&
         slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT &&
-        !(lex->create_info.options & HA_LEX_CREATE_IF_NOT_EXISTS))
-      create_info.options|= HA_LEX_CREATE_REPLACE;
+        !lex->create_info.if_not_exists())
+    {
+      create_info.add(DDL_options_st::OPT_OR_REPLACE);
+      create_info.add(DDL_options_st::OPT_OR_REPLACE_SLAVE_GENERATED);
+    }
 
 #ifdef WITH_PARTITION_STORAGE_ENGINE
     {
@@ -3199,7 +3197,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
       /* Copy temporarily the statement flags to thd for lock_table_names() */
       uint save_thd_create_info_options= thd->lex->create_info.options;
       thd->lex->create_info.options|= create_info.options;
-      res= open_and_lock_tables(thd, lex->query_tables, TRUE, 0);
+      res= open_and_lock_tables(thd, create_info, lex->query_tables, TRUE, 0);
       thd->lex->create_info.options= save_thd_create_info_options;
       if (res)
       {
@@ -3210,8 +3208,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
       }
 
       /* Ensure we don't try to create something from which we select from */
-      if ((create_info.options & HA_LEX_CREATE_REPLACE) &&
-          !create_info.tmp_table())
+      if (create_info.or_replace() && !create_info.tmp_table())
       {
         TABLE_LIST *duplicate;
         if ((duplicate= unique_table(thd, lex->query_tables,
@@ -3239,8 +3236,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
           select_create is currently not re-execution friendly and
           needs to be created for every execution of a PS/SP.
         */
-        if ((result= new select_create(create_table,
-                                       &create_info,
+        if ((result= new select_create(create_table, &create_info,
                                        &alter_info,
                                        select_lex->item_list,
                                        lex->duplicates,
@@ -3264,11 +3260,11 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     else
     {
       /* regular create */
-      if (create_info.options & HA_LEX_CREATE_TABLE_LIKE)
+      if (create_info.like())
       {
         /* CREATE TABLE ... LIKE ... */
         res= mysql_create_like_table(thd, create_table, select_tables,
-                                     &create_info);
+                                     create_info);
       }
       else
       {
@@ -3276,13 +3272,12 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
            tables, like mysql replication does
         */
         if (WSREP(thd) && (!thd->is_current_stmt_binlog_format_row() ||
-            !(create_info.options & HA_LEX_CREATE_TMP_TABLE)))
+            !create_info.tmp_table()))
         {
 	  WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name, NULL)
         }
         /* Regular CREATE TABLE */
-        res= mysql_create_table(thd, create_table,
-                                &create_info, &alter_info);
+        res= mysql_create_table(thd, create_table, create_info, &alter_info);
       }
       if (!res)
       {
@@ -3987,7 +3982,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
   case SQLCOM_DROP_TABLE:
   {
     DBUG_ASSERT(first_table == all_tables && first_table != 0);
-    if (!lex->drop_temporary)
+    if (!lex->tmp_table())
     {
       if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE))
 	goto error;				/* purecov: inspected */
@@ -4001,7 +3996,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
    {
      for (TABLE_LIST *table= all_tables; table; table= table->next_global)
      {
-       if (!lex->drop_temporary                       &&
+       if (!lex->tmp_table() &&
           (!thd->is_current_stmt_binlog_format_row() ||
 	   !find_temporary_table(thd, table)))
        {
@@ -4018,11 +4013,11 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     */
     if (thd->slave_thread && !thd->slave_expected_error &&
         slave_ddl_exec_mode_options == SLAVE_EXEC_MODE_IDEMPOTENT)
-      lex->check_exists= 1;
+      lex->create_info.set(DDL_options_st::OPT_IF_EXISTS);
+    
 
     /* DDL and binlog write order are protected by metadata locks. */
-    res= mysql_rm_table(thd, first_table, lex->check_exists,
-			lex->drop_temporary);
+    res= mysql_rm_table(thd, first_table, lex->if_exists(), lex->tmp_table());
     break;
   }
   case SQLCOM_SHOW_PROCESSLIST:
@@ -4181,143 +4176,21 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     }
     break;
   case SQLCOM_CREATE_DB:
-  {
-    /*
-      As mysql_create_db() may modify HA_CREATE_INFO structure passed to
-      it, we need to use a copy of LEX::create_info to make execution
-      prepared statement- safe.
-    */
-    HA_CREATE_INFO create_info(lex->create_info);
-    if (check_db_name(&lex->name))
-    {
-      my_error(ER_WRONG_DB_NAME, MYF(0), lex->name.str);
-      break;
-    }
-    /*
-      If in a slave thread :
-      CREATE DATABASE DB was certainly not preceded by USE DB.
-      For that reason, db_ok() in sql/slave.cc did not check the
-      do_db/ignore_db. And as this query involves no tables, tables_ok()
-      above was not called. So we have to check rules again here.
-    */
-#ifdef HAVE_REPLICATION
-    if (thd->slave_thread)
-    {
-      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
-      if (!rpl_filter->db_ok(lex->name.str) ||
-          !rpl_filter->db_ok_with_wild_table(lex->name.str))
-      {
-        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-        break;
-      }
-    }
-#endif
-    if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
-      break;
-    WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL)
-    res= mysql_create_db(thd, lex->name.str, &create_info, 0);
+    if ((res= SQL::Statement::Schema::Create(lex).exec_direct(thd)) < 0)
+      goto error;
     break;
-  }
   case SQLCOM_DROP_DB:
-  {
-    if (check_db_name(&lex->name))
-    {
-      my_error(ER_WRONG_DB_NAME, MYF(0), lex->name.str);
-      break;
-    }
-    /*
-      If in a slave thread :
-      DROP DATABASE DB may not be preceded by USE DB.
-      For that reason, maybe db_ok() in sql/slave.cc did not check the 
-      do_db/ignore_db. And as this query involves no tables, tables_ok()
-      above was not called. So we have to check rules again here.
-    */
-#ifdef HAVE_REPLICATION
-    if (thd->slave_thread)
-    {
-      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
-      if (!rpl_filter->db_ok(lex->name.str) ||
-          !rpl_filter->db_ok_with_wild_table(lex->name.str))
-      {
-        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-        break;
-      }
-    }
-#endif
-    if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
-      break;
-    WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL)
-    res= mysql_rm_db(thd, lex->name.str, lex->check_exists, 0);
+    if ((res= SQL::Statement::Schema::Drop(lex).exec_direct(thd)) < 0)
+      goto error;
     break;
-  }
   case SQLCOM_ALTER_DB_UPGRADE:
-  {
-    LEX_STRING *db= & lex->name;
-#ifdef HAVE_REPLICATION
-    if (thd->slave_thread)
-    {
-      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
-      if (!rpl_filter->db_ok(db->str) ||
-          !rpl_filter->db_ok_with_wild_table(db->str))
-      {
-        res= 1;
-        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-        break;
-      }
-    }
-#endif
-    if (check_db_name(db))
-    {
-      my_error(ER_WRONG_DB_NAME, MYF(0), db->str);
-      break;
-    }
-    if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0) ||
-        check_access(thd, DROP_ACL, db->str, NULL, NULL, 1, 0) ||
-        check_access(thd, CREATE_ACL, db->str, NULL, NULL, 1, 0))
-    {
-      res= 1;
-      break;
-    }
-    WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL)
-    res= mysql_upgrade_db(thd, db);
-    if (!res)
-      my_ok(thd);
+    if ((res= SQL::Statement::Schema::Upgrade(lex).exec_direct(thd)) < 0)
+      goto error;
     break;
-  }
   case SQLCOM_ALTER_DB:
-  {
-    LEX_STRING *db= &lex->name;
-    HA_CREATE_INFO create_info(lex->create_info);
-    if (check_db_name(db))
-    {
-      my_error(ER_WRONG_DB_NAME, MYF(0), db->str);
-      break;
-    }
-    /*
-      If in a slave thread :
-      ALTER DATABASE DB may not be preceded by USE DB.
-      For that reason, maybe db_ok() in sql/slave.cc did not check the
-      do_db/ignore_db. And as this query involves no tables, tables_ok()
-      above was not called. So we have to check rules again here.
-    */
-#ifdef HAVE_REPLICATION
-    if (thd->slave_thread)
-    {
-      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
-      if (!rpl_filter->db_ok(db->str) ||
-          !rpl_filter->db_ok_with_wild_table(db->str))
-      {
-        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-        break;
-      }
-    }
-#endif
-    if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
-      break;
-    WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL)
-    res= mysql_alter_db(thd, db->str, &create_info);
+    if ((res= SQL::Statement::Schema::Alter(lex).exec_direct(thd)) < 0)
+      goto error;
     break;
-  }
   case SQLCOM_SHOW_CREATE_DB:
   {
     char db_name_buff[NAME_LEN+1];
@@ -4333,7 +4206,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
       my_error(ER_WRONG_DB_NAME, MYF(0), db_name.str);
       break;
     }
-    res= mysqld_show_create_db(thd, &db_name, &lex->name, &lex->create_info);
+    res= mysqld_show_create_db(thd, &db_name, &lex->name, lex->create_info);
     break;
   }
   case SQLCOM_CREATE_EVENT:
@@ -4357,8 +4230,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     switch (lex->sql_command) {
     case SQLCOM_CREATE_EVENT:
     {
-      bool if_not_exists= (lex->create_info.options &
-                           HA_LEX_CREATE_IF_NOT_EXISTS);
+      bool if_not_exists= lex->create_info.if_not_exists();
       res= Events::create_event(thd, lex->event_parse_data, if_not_exists);
       break;
     }
@@ -4391,7 +4263,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
     WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
     if (!(res= Events::drop_event(thd,
                                   lex->spname->m_db, lex->spname->m_name,
-                                  lex->check_exists)))
+                                  lex->if_exists())))
       my_ok(thd);
     break;
 #else
@@ -5127,7 +4999,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
 
         if (lex->spname->m_db.str == NULL)
         {
-          if (lex->check_exists)
+          if (lex->if_exists())
           {
             push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
                                 ER_SP_DOES_NOT_EXIST, ER(ER_SP_DOES_NOT_EXIST),
@@ -5197,7 +5069,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
 	my_ok(thd);
 	break;
       case SP_KEY_NOT_FOUND:
-	if (lex->check_exists)
+	if (lex->if_exists())
 	{
           res= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
 	  push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
@@ -5430,7 +5302,7 @@ static bool do_execute_sp(THD *thd, sp_head *sp)
 
     if ((err_code= drop_server(thd, &lex->server_options)))
     {
-      if (! lex->check_exists && err_code == ER_FOREIGN_SERVER_DOESNT_EXIST)
+      if (! lex->if_exists() && err_code == ER_FOREIGN_SERVER_DOESNT_EXIST)
       {
         DBUG_PRINT("info", ("problem dropping server %s",
                             lex->server_options.server_name));
@@ -7039,7 +6911,8 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type,
                        List<String> *interval_list, CHARSET_INFO *cs,
 		       uint uint_geom_type,
 		       Virtual_column_info *vcol_info,
-                       engine_option_value *create_options)
+                       engine_option_value *create_options,
+                       bool check_exists)
 {
   register Create_field *new_field;
   LEX  *lex= thd->lex;
@@ -7058,7 +6931,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type,
     lex->col_list.push_back(new Key_part_spec(*field_name, 0));
     key= new Key(Key::PRIMARY, null_lex_str,
                       &default_key_create_info,
-                      0, lex->col_list, NULL, lex->check_exists);
+                      0, lex->col_list, NULL, check_exists);
     lex->alter_info.key_list.push_back(key);
     lex->col_list.empty();
   }
@@ -7068,7 +6941,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type,
     lex->col_list.push_back(new Key_part_spec(*field_name, 0));
     key= new Key(Key::UNIQUE, null_lex_str,
                  &default_key_create_info, 0,
-                 lex->col_list, NULL, lex->check_exists);
+                 lex->col_list, NULL, check_exists);
     lex->alter_info.key_list.push_back(key);
     lex->col_list.empty();
   }
@@ -7120,7 +6993,7 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum_field_types type,
       new_field->init(thd, field_name->str, type, length, decimals, type_modifier,
                       default_value, on_update_value, comment, change,
                       interval_list, cs, uint_geom_type, vcol_info,
-                      create_options, lex->check_exists))
+                      create_options, check_exists))
     DBUG_RETURN(1);
 
   lex->alter_info.create_list.push_back(new_field);
@@ -8476,7 +8349,7 @@ void create_table_set_open_action_and_adjust_tables(LEX *lex)
 {
   TABLE_LIST *create_table= lex->query_tables;
 
-  if (lex->create_info.tmp_table())
+  if (lex->tmp_table())
     create_table->open_type= OT_TEMPORARY_ONLY;
   else
     create_table->open_type= OT_BASE_ONLY;
@@ -8522,12 +8395,11 @@ bool create_table_precheck(THD *thd, TABLE_LIST *tables,
     CREATE TABLE ... SELECT, also require INSERT.
   */
 
-  want_priv= lex->create_info.tmp_table() ?  CREATE_TMP_ACL :
+  want_priv= lex->tmp_table() ?  CREATE_TMP_ACL :
              (CREATE_ACL | (select_lex->item_list.elements ? INSERT_ACL : 0));
 
   /* CREATE OR REPLACE on not temporary tables require DROP_ACL */
-  if ((lex->create_info.options & HA_LEX_CREATE_REPLACE) &&
-      !lex->create_info.tmp_table())
+  if (lex->create_info.or_replace() && !lex->tmp_table())
     want_priv|= DROP_ACL;
                           
   if (check_access(thd, want_priv, create_table->db,
@@ -8591,7 +8463,7 @@ bool create_table_precheck(THD *thd, TABLE_LIST *tables,
                                      UINT_MAX, FALSE))
       goto err;
   }
-  else if (lex->create_info.options & HA_LEX_CREATE_TABLE_LIKE)
+  else if (lex->create_info.like())
   {
     if (check_table_access(thd, SELECT_ACL, tables, FALSE, UINT_MAX, FALSE))
       goto err;
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index da024a5..a419841 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -113,7 +113,8 @@ bool add_field_to_list(THD *thd, LEX_STRING *field_name, enum enum_field_types t
 		       CHARSET_INFO *cs,
 		       uint uint_geom_type,
                        Virtual_column_info *vcol_info,
-                       engine_option_value *create_options);
+                       engine_option_value *create_options,
+                       bool check_exists);
 bool add_to_list(THD *thd, SQL_I_List<ORDER> &list, Item *group, bool asc);
 void add_join_on(TABLE_LIST *b,Item *expr);
 void add_join_natural(TABLE_LIST *a,TABLE_LIST *b,List<String> *using_fields,
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 0541cbc..2536255 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -1187,7 +1187,7 @@ class Show_create_error_handler : public Internal_error_handler {
 
 bool mysqld_show_create_db(THD *thd, LEX_STRING *dbname,
                            LEX_STRING *orig_dbname,
-                           HA_CREATE_INFO *create_info)
+                           const DDL_options_st &options)
 {
   char buff[2048];
   String buffer(buff, sizeof(buff), system_charset_info);
@@ -1196,7 +1196,6 @@ bool mysqld_show_create_db(THD *thd, LEX_STRING *dbname,
   uint db_access;
 #endif
   HA_CREATE_INFO create;
-  uint create_options = create_info ? create_info->options : 0;
   Protocol *protocol=thd->protocol;
   DBUG_ENTER("mysql_show_create_db");
 
@@ -1243,7 +1242,7 @@ bool mysqld_show_create_db(THD *thd, LEX_STRING *dbname,
   protocol->store(orig_dbname->str, orig_dbname->length, system_charset_info);
   buffer.length(0);
   buffer.append(STRING_WITH_LEN("CREATE DATABASE "));
-  if (create_options & HA_LEX_CREATE_IF_NOT_EXISTS)
+  if (options.if_not_exists())
     buffer.append(STRING_WITH_LEN("/*!32312 IF NOT EXISTS*/ "));
   append_identifier(thd, &buffer, dbname->str, dbname->length);
 
@@ -1654,7 +1653,7 @@ static void append_create_options(THD *thd, String *packet,
  */
 
 int show_create_table(THD *thd, TABLE_LIST *table_list, String *packet,
-                      HA_CREATE_INFO *create_info_arg,
+                      Table_specification_st *create_info_arg,
                       enum_with_db_name with_db_name)
 {
   List<Item> field_list;
@@ -1696,14 +1695,14 @@ int show_create_table(THD *thd, TABLE_LIST *table_list, String *packet,
 
   packet->append(STRING_WITH_LEN("CREATE "));
   if (create_info_arg &&
-      (create_info_arg->org_options & HA_LEX_CREATE_REPLACE ||
+      ((create_info_arg->or_replace() &&
+        !create_info_arg->or_replace_slave_generated()) ||
        create_info_arg->table_was_deleted))
     packet->append(STRING_WITH_LEN("OR REPLACE "));
   if (share->tmp_table)
     packet->append(STRING_WITH_LEN("TEMPORARY "));
   packet->append(STRING_WITH_LEN("TABLE "));
-  if (create_info_arg &&
-      (create_info_arg->options & HA_LEX_CREATE_IF_NOT_EXISTS))
+  if (create_info_arg && create_info_arg->if_not_exists())
     packet->append(STRING_WITH_LEN("IF NOT EXISTS "));
   if (table_list->schema_table)
     alias= table_list->schema_table->table_name;
diff --git a/sql/sql_show.h b/sql/sql_show.h
index bad2b41..4bdfe95 100644
--- a/sql/sql_show.h
+++ b/sql/sql_show.h
@@ -76,7 +76,7 @@
 
 typedef enum { WITHOUT_DB_NAME, WITH_DB_NAME } enum_with_db_name;
 int show_create_table(THD *thd, TABLE_LIST *table_list, String *packet,
-                      HA_CREATE_INFO  *create_info_arg,
+                      Table_specification_st *create_info_arg,
                       enum_with_db_name with_db_name);
 
 int copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table);
@@ -88,7 +88,7 @@ bool append_identifier(THD *thd, String *packet, const char *name,
 bool mysqld_show_create(THD *thd, TABLE_LIST *table_list);
 bool mysqld_show_create_db(THD *thd, LEX_STRING *db_name,
                            LEX_STRING *orig_db_name,
-                           HA_CREATE_INFO *create);
+                           const DDL_options_st &options);
 
 void mysqld_list_processes(THD *thd,const char *user,bool verbose);
 int mysqld_show_status(THD *thd);
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index b991215..7bb293b 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -4628,7 +4628,8 @@ int create_table_impl(THD *thd,
                        const char *orig_db, const char *orig_table_name,
                        const char *db, const char *table_name,
                        const char *path,
-                       HA_CREATE_INFO *create_info,
+                       const DDL_options_st options,
+                       HA_CREATE_INFO &create_info,
                        Alter_info *alter_info,
                        int create_table_mode,
                        bool *is_trans,
@@ -4647,32 +4648,32 @@ int create_table_impl(THD *thd,
 
   if (thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE)
   {
-    if (create_info->data_file_name)
+    if (create_info.data_file_name)
       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                           WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                           "DATA DIRECTORY");
-    if (create_info->index_file_name)
+    if (create_info.index_file_name)
       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                           WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                           "INDEX DIRECTORY");
-    create_info->data_file_name= create_info->index_file_name= 0;
+    create_info.data_file_name= create_info.index_file_name= 0;
   }
   else
-  if (error_if_data_home_dir(create_info->data_file_name,  "DATA DIRECTORY") ||
-      error_if_data_home_dir(create_info->index_file_name, "INDEX DIRECTORY")||
+  if (error_if_data_home_dir(create_info.data_file_name,  "DATA DIRECTORY") ||
+      error_if_data_home_dir(create_info.index_file_name, "INDEX DIRECTORY")||
       check_partition_dirs(thd->lex->part_info))
     goto err;
 
-  alias= table_case_name(create_info, table_name);
+  alias= table_case_name(&create_info, table_name);
 
   /* Check if table exists */
-  if (create_info->tmp_table())
+  if (create_info.tmp_table())
   {
     TABLE *tmp_table;
     if ((tmp_table= find_temporary_table(thd, db, table_name)))
     {
       bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
-      if (create_info->options & HA_LEX_CREATE_REPLACE)
+      if (options.or_replace())
       {
         bool is_trans;
         /*
@@ -4682,7 +4683,7 @@ int create_table_impl(THD *thd,
         if (drop_temporary_table(thd, tmp_table, &is_trans))
           goto err;
       }
-      else if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
+      else if (options.if_not_exists())
         goto warn;
       else
       {
@@ -4700,7 +4701,7 @@ int create_table_impl(THD *thd,
       {
         thd->variables.option_bits|= OPTION_KEEP_LOG;
         thd->log_current_statement= 1;
-        create_info->table_was_deleted= 1;
+        create_info.table_was_deleted= 1;
       }
     }
   }
@@ -4708,13 +4709,13 @@ int create_table_impl(THD *thd,
   {
     if (!internal_tmp_table && ha_table_exists(thd, db, table_name))
     {
-      if (create_info->options & HA_LEX_CREATE_REPLACE)
+      if (options.or_replace())
       {
         TABLE_LIST table_list;
         table_list.init_one_table(db, strlen(db), table_name,
                                   strlen(table_name), table_name,
                                   TL_WRITE_ALLOW_WRITE);
-        table_list.table= create_info->table;
+        table_list.table= create_info.table;
 
         if (check_if_log_table(&table_list, TRUE, "CREATE OR REPLACE"))
           goto err;
@@ -4734,7 +4735,7 @@ int create_table_impl(THD *thd,
         */
         thd->variables.option_bits|= OPTION_KEEP_LOG;
         thd->log_current_statement= 1;
-        create_info->table_was_deleted= 1;
+        create_info.table_was_deleted= 1;
         DBUG_EXECUTE_IF("send_kill_after_delete", thd->killed= KILL_QUERY; );
 
         /*
@@ -4744,7 +4745,7 @@ int create_table_impl(THD *thd,
             restart_trans_for_tables(thd, thd->lex->query_tables))
           goto err;
       }
-      else if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
+      else if (options.if_not_exists())
         goto warn;
       else
       {
@@ -4756,7 +4757,7 @@ int create_table_impl(THD *thd,
 
   THD_STAGE_INFO(thd, stage_creating_table);
 
-  if (check_engine(thd, orig_db, orig_table_name, create_info))
+  if (check_engine(thd, orig_db, orig_table_name, &create_info))
     goto err;
 
   if (create_table_mode == C_ASSISTED_DISCOVERY)
@@ -4766,7 +4767,7 @@ int create_table_impl(THD *thd,
     DBUG_ASSERT(alter_info->key_list.elements == 0);
 
     TABLE_SHARE share;
-    handlerton *hton= create_info->db_type;
+    handlerton *hton= create_info.db_type;
     int ha_err;
     Field *no_fields= 0;
 
@@ -4781,13 +4782,13 @@ int create_table_impl(THD *thd,
     /* prepare everything for discovery */
     share.field= &no_fields;
     share.db_plugin= ha_lock_engine(thd, hton);
-    share.option_list= create_info->option_list;
-    share.connect_string= create_info->connect_string;
+    share.option_list= create_info.option_list;
+    share.connect_string= create_info.connect_string;
 
     if (parse_engine_table_options(thd, hton, &share))
       goto err;
 
-    ha_err= hton->discover_table_structure(hton, thd, &share, create_info);
+    ha_err= hton->discover_table_structure(hton, thd, &share, &create_info);
 
     /*
       if discovery failed, the plugin will be auto-unlocked, as it
@@ -4808,30 +4809,30 @@ int create_table_impl(THD *thd,
   }
   else
   {
-    file= mysql_create_frm_image(thd, orig_db, orig_table_name, create_info,
+    file= mysql_create_frm_image(thd, orig_db, orig_table_name, &create_info,
                                  alter_info, create_table_mode, key_info,
                                  key_count, frm);
     if (!file)
       goto err;
-    if (rea_create_table(thd, frm, path, db, table_name, create_info,
+    if (rea_create_table(thd, frm, path, db, table_name, &create_info,
                          file, frm_only))
       goto err;
   }
 
-  create_info->table= 0;
-  if (!frm_only && create_info->tmp_table())
+  create_info.table= 0;
+  if (!frm_only && create_info.tmp_table())
   {
     /*
       Open a table (skipping table cache) and add it into
       THD::temporary_tables list.
     */
 
-    TABLE *table= open_table_uncached(thd, create_info->db_type, frm, path,
+    TABLE *table= open_table_uncached(thd, create_info.db_type, frm, path,
                                       db, table_name, true, true);
 
     if (!table)
     {
-      (void) rm_temporary_table(create_info->db_type, path);
+      (void) rm_temporary_table(create_info.db_type, path);
       goto err;
     }
 
@@ -4839,7 +4840,7 @@ int create_table_impl(THD *thd,
       *is_trans= table->file->has_transactions();
 
     thd->thread_specific_used= TRUE;
-    create_info->table= table;                  // Store pointer to table
+    create_info.table= table;                  // Store pointer to table
   }
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   else if (thd->work_part_info && frm_only)
@@ -4900,7 +4901,7 @@ int create_table_impl(THD *thd,
 
 int mysql_create_table_no_lock(THD *thd,
                                 const char *db, const char *table_name,
-                                HA_CREATE_INFO *create_info,
+                                Table_specification_st &create_info,
                                 Alter_info *alter_info, bool *is_trans,
                                 int create_table_mode)
 {
@@ -4910,12 +4911,12 @@ int mysql_create_table_no_lock(THD *thd,
   char path[FN_REFLEN + 1];
   LEX_CUSTRING frm= {0,0};
 
-  if (create_info->tmp_table())
+  if (create_info.tmp_table())
     build_tmptable_filename(thd, path, sizeof(path));
   else
   {
     int length;
-    const char *alias= table_case_name(create_info, table_name);
+    const char *alias= table_case_name(&create_info, table_name);
     length= build_table_filename(path, sizeof(path) - 1, db, alias,
                                  "", 0);
     // Check if we hit FN_REFLEN bytes along with file extension.
@@ -4927,7 +4928,8 @@ int mysql_create_table_no_lock(THD *thd,
   }
 
   res= create_table_impl(thd, db, table_name, db, table_name, path,
-                         create_info, alter_info, create_table_mode,
+                         create_info, create_info,
+                         alter_info, create_table_mode,
                          is_trans, &not_used_1, &not_used_2, &frm);
   my_free(const_cast<uchar*>(frm.str));
   return res;
@@ -4944,7 +4946,7 @@ int mysql_create_table_no_lock(THD *thd,
 */
 
 bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
-                        HA_CREATE_INFO *create_info,
+                        Table_specification_st &create_info,
                         Alter_info *alter_info)
 {
   const char *db= create_table->db;
@@ -4960,10 +4962,10 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
 
   /* Copy temporarily the statement flags to thd for lock_table_names() */
   uint save_thd_create_info_options= thd->lex->create_info.options;
-  thd->lex->create_info.options|= create_info->options;
+  thd->lex->create_info.options|= create_info.options;
 
   /* Open or obtain an exclusive metadata lock on table being created  */
-  result= open_and_lock_tables(thd, create_table, FALSE, 0);
+  result= open_and_lock_tables(thd, create_info, create_table, FALSE, 0);
 
   thd->lex->create_info.options= save_thd_create_info_options;
 
@@ -4973,9 +4975,9 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
     DBUG_RETURN(thd->is_error());
   }
   /* The following is needed only in case of lock tables */
-  if ((create_info->table= create_table->table))
+  if ((create_info.table= create_table->table))
   {
-    pos_in_locked_tables= create_info->table->pos_in_locked_tables;
+    pos_in_locked_tables= create_info.table->pos_in_locked_tables;
     mdl_ticket= create_table->table->mdl_ticket;
   }
   
@@ -5000,7 +5002,7 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
     on a non temporary table
   */
   if (thd->locked_tables_mode && pos_in_locked_tables &&
-      (create_info->options & HA_LEX_CREATE_REPLACE))
+      create_info.or_replace())
   {
     /*
       Add back the deleted table and re-created table as a locked table
@@ -5021,13 +5023,13 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
 
 err:
   /* In RBR we don't need to log CREATE TEMPORARY TABLE */
-  if (thd->is_current_stmt_binlog_format_row() && create_info->tmp_table())
+  if (thd->is_current_stmt_binlog_format_row() && create_info.tmp_table())
     DBUG_RETURN(result);
 
   /* Write log if no error or if we already deleted a table */
   if (!result || thd->log_current_statement)
   {
-    if (result && create_info->table_was_deleted)
+    if (result && create_info.table_was_deleted)
     {
       /*
         Possible locked table was dropped. We should remove meta data locks
@@ -5035,13 +5037,13 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
       */
       thd->locked_tables_list.unlock_locked_table(thd, mdl_ticket);
     }
-    else if (!result && create_info->tmp_table() && create_info->table)
+    else if (!result && create_info.tmp_table() && create_info.table)
     {
       /*
         Remember that tmp table creation was logged so that we know if
         we should log a delete of it.
       */
-      create_info->table->s->table_creation_was_logged= 1;
+      create_info.table->s->table_creation_was_logged= 1;
     }
     if (write_bin_log(thd, result ? FALSE : TRUE, thd->query(),
                       thd->query_length(), is_trans))
@@ -5241,9 +5243,9 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
 
 bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
                              TABLE_LIST* src_table,
-                             HA_CREATE_INFO *create_info)
+                             Table_specification_st &create_info)
 {
-  HA_CREATE_INFO local_create_info;
+  Table_specification_st local_create_info;
   TABLE_LIST *pos_in_locked_tables= 0;
   Alter_info local_alter_info;
   Alter_table_ctx local_alter_ctx; // Not used
@@ -5256,7 +5258,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
 
 #ifdef WITH_WSREP
   if (WSREP_ON && !thd->wsrep_applier &&
-      wsrep_create_like_table(thd, table, src_table, create_info))
+      wsrep_create_like_table(thd, table, src_table, &create_info))
     DBUG_RETURN(res);
 #endif
 
@@ -5273,9 +5275,10 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
   */
 
   /* Copy temporarily the statement flags to thd for lock_table_names() */
+  // QQ: is this really needed???
   uint save_thd_create_info_options= thd->lex->create_info.options;
-  thd->lex->create_info.options|= create_info->options;
-  res= open_tables(thd, &thd->lex->query_tables, &not_used, 0);
+  thd->lex->create_info.options|= create_info.options;
+  res= open_tables(thd, create_info, &thd->lex->query_tables, &not_used, 0);
   thd->lex->create_info.options= save_thd_create_info_options;
 
   if (res)
@@ -5285,8 +5288,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
     goto err;
   }
   /* Ensure we don't try to create something from which we select from */
-  if ((create_info->options & HA_LEX_CREATE_REPLACE) &&
-      !create_info->tmp_table())
+  if (create_info.or_replace() && !create_info.tmp_table())
   {
     TABLE_LIST *duplicate;
     if ((duplicate= unique_table(thd, table, src_table, 0)))
@@ -5300,8 +5302,12 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
 
   DEBUG_SYNC(thd, "create_table_like_after_open");
 
-  /* Fill HA_CREATE_INFO and Alter_info with description of source table. */
-  bzero((char*) &local_create_info, sizeof(local_create_info));
+  /*
+    Fill Table_specification_st and Alter_info with the source table description.
+    Set OR REPLACE and IF NOT EXISTS option as in the CREATE TABLE LIKE
+    statement.
+  */
+  local_create_info.init(create_info.create_like_options());
   local_create_info.db_type= src_table->table->s->db_type();
   local_create_info.row_type= src_table->table->s->row_type;
   if (mysql_prepare_alter_table(thd, src_table->table, &local_create_info,
@@ -5322,13 +5328,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
   */
   if (src_table->schema_table)
     local_create_info.max_rows= 0;
-  /* Set IF NOT EXISTS option as in the CREATE TABLE LIKE statement. */
-  local_create_info.options|= (create_info->options &
-                               (HA_LEX_CREATE_IF_NOT_EXISTS | 
-                                HA_LEX_CREATE_REPLACE));
   /* Replace type of source table with one specified in the statement. */
   local_create_info.options&= ~HA_LEX_CREATE_TMP_TABLE;
-  local_create_info.options|= create_info->tmp_table();
+  local_create_info.options|= create_info.tmp_table();
   /* Reset auto-increment counter for the new table. */
   local_create_info.auto_increment_value= 0;
   /*
@@ -5343,7 +5345,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
 
   res= ((create_res=
          mysql_create_table_no_lock(thd, table->db, table->table_name,
-                                    &local_create_info, &local_alter_info,
+                                    local_create_info, &local_alter_info,
                                     &is_trans, C_ORDINARY_CREATE)) > 0);
   /* Remember to log if we deleted something */
   do_logging= thd->log_current_statement;
@@ -5355,7 +5357,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
     on a non temporary table
   */
   if (thd->locked_tables_mode && pos_in_locked_tables &&
-      (create_info->options & HA_LEX_CREATE_REPLACE))
+      create_info.or_replace())
   {
     /*
       Add back the deleted table and re-created table as a locked table
@@ -5383,7 +5385,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
       Ensure that we have an exclusive lock on target table if we are creating
       non-temporary table.
     */
-    DBUG_ASSERT((create_info->tmp_table()) ||
+    DBUG_ASSERT((create_info.tmp_table()) ||
                 thd->mdl_context.is_lock_owner(MDL_key::TABLE, table->db,
                                                table->table_name,
                                                MDL_EXCLUSIVE));
@@ -5412,7 +5414,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
            4    temporary temporary Nothing
            ==== ========= ========= ==============================
     */
-    if (!(create_info->tmp_table()))
+    if (!(create_info.tmp_table()))
     {
       if (src_table->table->s->tmp_table)               // Case 2
       {
@@ -5465,7 +5467,8 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
         if (!table->view)
         {
           int result __attribute__((unused))=
-            show_create_table(thd, table, &query, create_info, WITHOUT_DB_NAME);
+            show_create_table(thd, table, &query,
+                              &create_info, WITHOUT_DB_NAME);
 
           DBUG_ASSERT(result == 0); // show_create_table() always return 0
           do_logging= FALSE;
@@ -5499,8 +5502,8 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
   {
     DBUG_PRINT("info",
                ("res: %d  tmp_table: %d  create_info->table: %p",
-                res, create_info->tmp_table(), local_create_info.table));
-    if (!res && create_info->tmp_table() && local_create_info.table)
+                res, create_info.tmp_table(), local_create_info.table));
+    if (!res && create_info.tmp_table() && local_create_info.table)
     {
       /*
         Remember that tmp table creation was logged so that we know if
@@ -5514,7 +5517,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
 err:
   if (do_logging)
   {
-    if (res && create_info->table_was_deleted)
+    if (res && create_info.table_was_deleted)
     {
       /*
         Table was not deleted. Original table was deleted.
@@ -5522,7 +5525,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
       */
       log_drop_table(thd, table->db, table->db_length,
                      table->table_name, table->table_name_length,
-                     create_info->tmp_table());
+                     create_info.tmp_table());
     }
     else if (write_bin_log(thd, res ? FALSE : TRUE, thd->query(),
                            thd->query_length(), is_trans))
@@ -5918,7 +5921,7 @@ static bool is_candidate_key(KEY *key)
   
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   partition_info *tab_part_info= table->part_info;
-  if (tab_part_info && thd->lex->check_exists)
+  if (tab_part_info && thd->lex->if_exists())
   {
     /* ALTER TABLE ADD PARTITION IF NOT EXISTS */
     if (alter_info->flags & Alter_info::ALTER_ADD_PARTITION)
@@ -8645,7 +8648,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
                            alter_ctx.db, alter_ctx.table_name,
                            alter_ctx.new_db, alter_ctx.tmp_name,
                            alter_ctx.get_tmp_path(),
-                           create_info, alter_info,
+                           thd->lex->create_info, *create_info, alter_info,
                            C_ALTER_TABLE_FRM_ONLY, NULL,
                            &key_info, &key_count, &frm);
   reenable_binlog(thd);
diff --git a/sql/sql_table.h b/sql/sql_table.h
index c3e903a..83a2e1a 100644
--- a/sql/sql_table.h
+++ b/sql/sql_table.h
@@ -31,6 +31,7 @@
 class handler;
 typedef struct st_ha_check_opt HA_CHECK_OPT;
 struct HA_CREATE_INFO;
+struct Table_specification_st;
 typedef struct st_key KEY;
 typedef struct st_key_cache KEY_CACHE;
 typedef struct st_lock_param_type ALTER_PARTITION_PARAM_TYPE;
@@ -151,7 +152,7 @@ uint build_table_shadow_filename(char *buff, size_t bufflen,
                                  ALTER_PARTITION_PARAM_TYPE *lpt);
 uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen);
 bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
-                        HA_CREATE_INFO *create_info,
+                        Table_specification_st &create_info,
                         Alter_info *alter_info);
 
 /*
@@ -192,7 +193,7 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
 
 int mysql_create_table_no_lock(THD *thd, const char *db,
                                const char *table_name,
-                               HA_CREATE_INFO *create_info,
+                               Table_specification_st &create_info,
                                Alter_info *alter_info, bool *is_trans,
                                int create_table_mode);
 
@@ -227,7 +228,7 @@ bool mysql_compare_tables(TABLE *table,
 bool mysql_recreate_table(THD *thd, TABLE_LIST *table_list, bool table_copy);
 bool mysql_create_like_table(THD *thd, TABLE_LIST *table,
                              TABLE_LIST *src_table,
-                             HA_CREATE_INFO *create_info);
+                             Table_specification_st &create_info);
 bool mysql_rename_table(handlerton *base, const char *old_db,
                         const char * old_name, const char *new_db,
                         const char * new_name, uint flags);
diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc
index 443a82a..f404daa 100644
--- a/sql/sql_trigger.cc
+++ b/sql/sql_trigger.cc
@@ -444,7 +444,7 @@ bool mysql_create_or_drop_trigger(THD *thd, TABLE_LIST *tables, bool create)
 
   if (!create)
   {
-    bool if_exists= thd->lex->check_exists;
+    bool if_exists= thd->lex->if_exists();
 
     /*
       Protect the query table list from the temporary and potentially
diff --git a/sql/sql_view.cc b/sql/sql_view.cc
index 07169f2..b064546 100644
--- a/sql/sql_view.cc
+++ b/sql/sql_view.cc
@@ -1640,7 +1640,7 @@ bool mysql_drop_view(THD *thd, TABLE_LIST *views, enum_drop_mode drop_mode)
     {
       char name[FN_REFLEN];
       my_snprintf(name, sizeof(name), "%s.%s", view->db, view->table_name);
-      if (thd->lex->check_exists)
+      if (thd->lex->if_exists())
       {
 	push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
 			    ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 632bdd2..0f2c0e0 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -739,11 +739,12 @@ static bool add_create_index_prepare (LEX *lex, Table_ident *table)
 
 static bool add_create_index (LEX *lex, Key::Keytype type,
                               const LEX_STRING &name,
+                              bool check_exists,
                               KEY_CREATE_INFO *info= NULL, bool generated= 0)
 {
   Key *key;
   key= new Key(type, name, info ? info : &lex->key_create_info, generated, 
-               lex->col_list, lex->option_list, lex->check_exists);
+               lex->col_list, lex->option_list, check_exists);
   if (key == NULL)
     return TRUE;
 
@@ -922,6 +923,7 @@ static bool sp_create_assignment_instr(THD *thd, bool no_lookahead)
   List<Condition_information_item> *cond_info_list;
   DYNCALL_CREATE_DEF *dyncol_def;
   List<DYNCALL_CREATE_DEF> *dyncol_def_list;
+  DDL_options_st object_ddl_options;
 }
 
 %{
@@ -1621,7 +1623,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
         IDENT_sys TEXT_STRING_sys TEXT_STRING_literal
         NCHAR_STRING opt_component key_cache_name
         sp_opt_label BIN_NUM label_ident TEXT_STRING_filesystem ident_or_empty
-        opt_constraint constraint opt_ident opt_if_not_exists_ident
+        opt_constraint constraint opt_ident
+        opt_if_not_exists_opt_table_element_name
 
 %type <lex_str_ptr>
         opt_table_alias
@@ -1639,8 +1642,8 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
 
 %type <num>
         type type_with_opt_collate int_type real_type order_dir lock_option
-        udf_type opt_if_exists opt_local opt_table_options table_options
-        table_option opt_if_not_exists create_or_replace opt_no_write_to_binlog
+        udf_type opt_local opt_table_options table_options
+        table_option opt_no_write_to_binlog
         opt_temporary all_or_any opt_distinct
         opt_ignore_leaves fulltext_options spatial_type union_option
         field_def opt_not opt_union_order_or_limit
@@ -1652,6 +1655,12 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
         opt_time_precision kill_type kill_option int_num
         opt_default_time_precision
         case_stmt_body
+        opt_if_exists_table_element opt_if_not_exists_table_element
+
+%type <object_ddl_options>
+        create_or_replace 
+        opt_if_not_exists
+        opt_if_exists
 
 /*
   Bit field of MYSQL_START_TRANS_OPT_* flags.
@@ -2326,12 +2335,9 @@ create:
           create_or_replace opt_table_options TABLE_SYM opt_if_not_exists table_ident
           {
             LEX *lex= thd->lex;
-            lex->sql_command= SQLCOM_CREATE_TABLE;
-            if ($1 && $4)
-            {
-               my_error(ER_WRONG_USAGE, MYF(0), "OR REPLACE", "IF NOT EXISTS");
+            lex->create_info.init();
+            if (lex->set_command_with_check(SQLCOM_CREATE_TABLE, $2, $1 | $4))
                MYSQL_YYABORT;
-            }
             if (!lex->select_lex.add_table_to_list(thd, $5, NULL,
                                                    TL_OPTION_UPDATING,
                                                    TL_WRITE, MDL_EXCLUSIVE))
@@ -2339,13 +2345,11 @@ create:
             lex->alter_info.reset();
             lex->col_list.empty();
             lex->change=NullS;
-            bzero((char*) &lex->create_info,sizeof(lex->create_info));
             /*
               For CREATE TABLE we should not open the table even if it exists.
               If the table exists, we should either not create it or replace it
             */
             lex->query_tables->open_strategy= TABLE_LIST::OPEN_STUB;
-            lex->create_info.options= ($1 | $2 | $4);
             lex->create_info.default_table_charset= NULL;
             lex->name= null_lex_str;
             lex->create_last_non_select_table= lex->last_table();
@@ -2366,18 +2370,20 @@ create:
             }
             create_table_set_open_action_and_adjust_tables(lex);
           }
-        | CREATE opt_unique INDEX_SYM opt_if_not_exists ident key_alg ON table_ident
+        | CREATE opt_unique INDEX_SYM opt_if_not_exists ident
+          key_alg ON table_ident
           {
             if (add_create_index_prepare(Lex, $8))
               MYSQL_YYABORT;
           }
           '(' key_list ')' normal_key_options
           {
-            if (add_create_index(Lex, $2, $5))
+            if (add_create_index(Lex, $2, $5, $4.if_not_exists()))
               MYSQL_YYABORT;
           }
           opt_index_lock_algorithm { }
-        | CREATE fulltext INDEX_SYM opt_if_not_exists ident init_key_options ON
+        | CREATE fulltext INDEX_SYM opt_if_not_exists ident
+          init_key_options ON
           table_ident
           {
             if (add_create_index_prepare(Lex, $8))
@@ -2385,11 +2391,12 @@ create:
           }
           '(' key_list ')' fulltext_key_options
           {
-            if (add_create_index(Lex, $2, $5))
+            if (add_create_index(Lex, $2, $5, $4.if_not_exists()))
               MYSQL_YYABORT;
           }
           opt_index_lock_algorithm { }
-        | CREATE spatial INDEX_SYM opt_if_not_exists ident init_key_options ON
+        | CREATE spatial INDEX_SYM opt_if_not_exists ident
+          init_key_options ON
           table_ident
           {
             if (add_create_index_prepare(Lex, $8))
@@ -2397,7 +2404,7 @@ create:
           }
           '(' key_list ')' spatial_key_options
           {
-            if (add_create_index(Lex, $2, $5))
+            if (add_create_index(Lex, $2, $5, $4.if_not_exists()))
               MYSQL_YYABORT;
           }
           opt_index_lock_algorithm { }
@@ -2409,20 +2416,19 @@ create:
           opt_create_database_options
           {
             LEX *lex=Lex;
-            lex->sql_command=SQLCOM_CREATE_DB;
+            lex->set_command(SQLCOM_CREATE_DB, $3);
             lex->name= $4;
-            lex->create_info.options=$3;
           }
         | create_or_replace
           {
-            Lex->create_view_mode= ($1 == 0 ? VIEW_CREATE_NEW :
-                                    VIEW_CREATE_OR_REPLACE);
+            Lex->create_view_mode= ($1.or_replace() ? VIEW_CREATE_OR_REPLACE :
+                                                      VIEW_CREATE_NEW);
             Lex->create_view_algorithm= DTYPE_ALGORITHM_UNDEFINED;
             Lex->create_view_suid= TRUE;
           }
           view_or_trigger_or_sp_or_event
           {
-            if ($1 && Lex->sql_command != SQLCOM_CREATE_VIEW)
+            if ($1.or_replace() && Lex->sql_command != SQLCOM_CREATE_VIEW)
             {
                my_error(ER_WRONG_USAGE, MYF(0), "OR REPLACE",
                        "TRIGGERS / SP / EVENT");
@@ -2506,7 +2512,7 @@ event_tail:
             LEX *lex=Lex;
 
             lex->stmt_definition_begin= $1;
-            lex->create_info.options= $3;
+            lex->create_info.set($3);
             if (!(lex->event_parse_data= Event_parse_data::new_instance(thd)))
               MYSQL_YYABORT;
             lex->event_parse_data->identifier= $4;
@@ -4718,7 +4724,7 @@ create_body:
         | create_like
           {
 
-            Lex->create_info.options|= HA_LEX_CREATE_TABLE_LIKE;
+            Lex->create_info.add(DDL_options_st::OPT_LIKE);
             TABLE_LIST *src_table= Lex->select_lex.add_table_to_list(thd,
                                         $1, NULL, 0, TL_READ, MDL_SHARED_READ);
             if (! src_table)
@@ -5511,27 +5517,36 @@ table_option:
           TEMPORARY { $$=HA_LEX_CREATE_TMP_TABLE; }
         ;
 
-opt_if_not_exists:
+opt_if_not_exists_table_element:
           /* empty */
           {
             Lex->check_exists= FALSE;
-            $$= 0;
           }
         | IF_SYM not EXISTS
           {
             Lex->check_exists= TRUE;
-            $$=HA_LEX_CREATE_IF_NOT_EXISTS;
+          }
+         ;
+
+opt_if_not_exists:
+          /* empty */
+          {
+            $$.init();
+          }
+        | IF_SYM not EXISTS
+          {
+            $$.set(DDL_options_st::OPT_IF_NOT_EXISTS);
           }
          ;
 
 create_or_replace:
           CREATE /* empty */
           {
-            $$= 0;
+            $$.init();
           }
         | CREATE OR_SYM REPLACE
           {
-            $$= HA_LEX_CREATE_REPLACE;
+            $$.set(DDL_options_st::OPT_OR_REPLACE);
           }
          ;
 
@@ -5916,38 +5931,40 @@ column_def:
         ;
 
 key_def:
-          normal_key_type opt_if_not_exists_ident key_alg '(' key_list ')'
+          normal_key_type opt_if_not_exists_opt_table_element_name
+          key_alg '(' key_list ')'
           { Lex->option_list= NULL; }
           normal_key_options
           {
-            if (add_create_index (Lex, $1, $2))
+            if (add_create_index (Lex, $1, $2, Lex->check_exists))
               MYSQL_YYABORT;
           }
-        | fulltext opt_key_or_index opt_if_not_exists_ident init_key_options 
-            '(' key_list ')'
+        | fulltext opt_key_or_index opt_if_not_exists_opt_table_element_name
+          init_key_options '(' key_list ')'
           { Lex->option_list= NULL; }
             fulltext_key_options
           {
-            if (add_create_index (Lex, $1, $3))
+            if (add_create_index (Lex, $1, $3, Lex->check_exists))
               MYSQL_YYABORT;
           }
-        | spatial opt_key_or_index opt_if_not_exists_ident init_key_options 
-            '(' key_list ')'
+        | spatial opt_key_or_index opt_if_not_exists_opt_table_element_name
+          init_key_options '(' key_list ')'
           { Lex->option_list= NULL; }
             spatial_key_options
           {
-            if (add_create_index (Lex, $1, $3))
+            if (add_create_index (Lex, $1, $3, Lex->check_exists))
               MYSQL_YYABORT;
           }
-        | opt_constraint constraint_key_type opt_if_not_exists_ident key_alg
-          '(' key_list ')'
+        | opt_constraint constraint_key_type
+          opt_if_not_exists_opt_table_element_name key_alg '(' key_list ')'
           { Lex->option_list= NULL; }
           normal_key_options
           {
-            if (add_create_index (Lex, $2, $3.str ? $3 : $1))
+            if (add_create_index (Lex, $2, $3.str ? $3 : $1, Lex->check_exists))
               MYSQL_YYABORT;
           }
-        | opt_constraint FOREIGN KEY_SYM opt_if_not_exists_ident '(' key_list ')' references
+        | opt_constraint FOREIGN KEY_SYM opt_if_not_exists_opt_table_element_name
+          '(' key_list ')' references
           {
             LEX *lex=Lex;
             Key *key= new Foreign_key($4.str ? $4 : $1, lex->col_list,
@@ -5963,6 +5980,7 @@ key_def:
             lex->alter_info.key_list.push_back(key);
             lex->option_list= NULL;
             if (add_create_index (lex, Key::MULTIPLE, $1.str ? $1 : $4,
+                                  Lex->check_exists,
                                   &default_key_create_info, 1))
               MYSQL_YYABORT;
             /* Only used for ALTER TABLE. Ignored otherwise. */
@@ -6013,7 +6031,8 @@ field_spec:
                                   &lex->comment,
                                   lex->change,&lex->interval_list,lex->charset,
                                   lex->uint_geom_type,
-                                  lex->vcol_info, lex->option_list))
+                                  lex->vcol_info, lex->option_list,
+                                  lex->check_exists))
               MYSQL_YYABORT;
           }
         ;
@@ -6989,8 +7008,8 @@ opt_ident:
         | field_ident { $$= $1; }
         ;
 
-opt_if_not_exists_ident:
-        opt_if_not_exists opt_ident
+opt_if_not_exists_opt_table_element_name:
+        opt_if_not_exists_table_element opt_ident
         {
           LEX *lex= Lex;
           if (lex->check_exists && lex->sql_command != SQLCOM_ALTER_TABLE)
@@ -7022,9 +7041,7 @@ alter:
             Lex->duplicates= DUP_ERROR; 
             Lex->col_list.empty();
             Lex->select_lex.init_order();
-            bzero(&Lex->create_info, sizeof(Lex->create_info));
-            Lex->create_info.db_type= 0;
-            Lex->create_info.default_table_charset= NULL;
+            Lex->create_info.init();
             Lex->create_info.row_type= ROW_TYPE_NOT_USED;
             Lex->alter_info.reset();
             Lex->no_write_to_binlog= 0;
@@ -7273,6 +7290,8 @@ alter_commands:
         | DROP PARTITION_SYM opt_if_exists alt_part_name_list
           {
             Lex->alter_info.flags|= Alter_info::ALTER_DROP_PARTITION;
+            DBUG_ASSERT(!Lex->if_exists());
+            Lex->create_info.add(DDL_options_st::OPT_IF_EXISTS);
           }
         | REBUILD_SYM PARTITION_SYM opt_no_write_to_binlog
           all_or_alt_part_name_list
@@ -7390,7 +7409,8 @@ all_or_alt_part_name_list:
         ;
 
 add_partition_rule:
-          ADD PARTITION_SYM opt_if_not_exists opt_no_write_to_binlog
+          ADD PARTITION_SYM opt_if_not_exists_table_element
+          opt_no_write_to_binlog
           {
             LEX *lex= Lex;
             lex->part_info= new partition_info();
@@ -7476,7 +7496,7 @@ alter_list:
         ;
 
 add_column:
-          ADD opt_column opt_if_not_exists
+          ADD opt_column opt_if_not_exists_table_element
           {
             LEX *lex=Lex;
             lex->change=0;
@@ -7499,7 +7519,7 @@ alter_list_item:
             Lex->alter_info.flags|= Alter_info::ALTER_ADD_COLUMN |
                                     Alter_info::ALTER_ADD_INDEX;
           }
-        | CHANGE opt_column opt_if_exists field_ident
+        | CHANGE opt_column opt_if_exists_table_element field_ident
           {
             LEX *lex=Lex;
             lex->change= $4.str;
@@ -7510,7 +7530,7 @@ alter_list_item:
           {
             Lex->create_last_non_select_table= Lex->last_table();
           }
-        | MODIFY_SYM opt_column opt_if_exists field_ident
+        | MODIFY_SYM opt_column opt_if_exists_table_element field_ident
           {
             LEX *lex=Lex;
             lex->length=lex->dec=0; lex->type=0;
@@ -7531,14 +7551,14 @@ alter_list_item:
                                   &lex->comment,
                                   $4.str, &lex->interval_list, lex->charset,
                                   lex->uint_geom_type,
-                                  lex->vcol_info, lex->option_list))
+                                  lex->vcol_info, lex->option_list, $3))
               MYSQL_YYABORT;
           }
           opt_place
           {
             Lex->create_last_non_select_table= Lex->last_table();
           }
-        | DROP opt_column opt_if_exists field_ident opt_restrict
+        | DROP opt_column opt_if_exists_table_element field_ident opt_restrict
           {
             LEX *lex=Lex;
             Alter_drop *ad= new Alter_drop(Alter_drop::COLUMN, $4.str, $3);
@@ -7547,7 +7567,7 @@ alter_list_item:
             lex->alter_info.drop_list.push_back(ad);
             lex->alter_info.flags|= Alter_info::ALTER_DROP_COLUMN;
           }
-        | DROP FOREIGN KEY_SYM opt_if_exists field_ident
+        | DROP FOREIGN KEY_SYM opt_if_exists_table_element field_ident
           {
             LEX *lex=Lex;
             Alter_drop *ad= new Alter_drop(Alter_drop::FOREIGN_KEY, $5.str, $4);
@@ -7566,7 +7586,7 @@ alter_list_item:
             lex->alter_info.drop_list.push_back(ad);
             lex->alter_info.flags|= Alter_info::ALTER_DROP_INDEX;
           }
-        | DROP key_or_index opt_if_exists field_ident
+        | DROP key_or_index opt_if_exists_table_element field_ident
           {
             LEX *lex=Lex;
             Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $4.str, $3);
@@ -11676,15 +11696,13 @@ drop:
           DROP opt_temporary table_or_tables opt_if_exists
           {
             LEX *lex=Lex;
-            lex->sql_command = SQLCOM_DROP_TABLE;
-            lex->drop_temporary= $2;
-            lex->check_exists= $4;
+            lex->set_command(SQLCOM_DROP_TABLE, $2, $4);
             YYPS->m_lock_type= TL_UNLOCK;
             YYPS->m_mdl_type= MDL_EXCLUSIVE;
           }
           table_list opt_restrict
           {}
-        | DROP INDEX_SYM opt_if_exists ident ON table_ident {}
+        | DROP INDEX_SYM opt_if_exists_table_element ident ON table_ident {}
           {
             LEX *lex=Lex;
             Alter_drop *ad= new Alter_drop(Alter_drop::KEY, $4.str, $3);
@@ -11703,8 +11721,7 @@ drop:
         | DROP DATABASE opt_if_exists ident
           {
             LEX *lex=Lex;
-            lex->sql_command= SQLCOM_DROP_DB;
-            lex->check_exists=$3;
+            lex->set_command(SQLCOM_DROP_DB, $3);
             lex->name= $4;
           }
         | DROP FUNCTION_SYM opt_if_exists ident '.' ident
@@ -11721,8 +11738,7 @@ drop:
               my_error(ER_SP_NO_DROP_SP, MYF(0), "FUNCTION");
               MYSQL_YYABORT;
             }
-            lex->sql_command = SQLCOM_DROP_FUNCTION;
-            lex->check_exists= $3;
+            lex->set_command(SQLCOM_DROP_FUNCTION, $3);
             spname= new sp_name($4, $6, true);
             if (spname == NULL)
               MYSQL_YYABORT;
@@ -11741,8 +11757,7 @@ drop:
             }
             if (thd->db && lex->copy_db_to(&db.str, &db.length))
               MYSQL_YYABORT;
-            lex->sql_command = SQLCOM_DROP_FUNCTION;
-            lex->check_exists= $3;
+            lex->set_command(SQLCOM_DROP_FUNCTION, $3);
             spname= new sp_name(db, $4, false);
             if (spname == NULL)
               MYSQL_YYABORT;
@@ -11757,8 +11772,7 @@ drop:
               my_error(ER_SP_NO_DROP_SP, MYF(0), "PROCEDURE");
               MYSQL_YYABORT;
             }
-            lex->sql_command = SQLCOM_DROP_PROCEDURE;
-            lex->check_exists= $3;
+            lex->set_command(SQLCOM_DROP_PROCEDURE, $3);
             lex->spname= $4;
           }
         | DROP USER clear_privileges user_list
@@ -11772,8 +11786,7 @@ drop:
         | DROP VIEW_SYM opt_if_exists
           {
             LEX *lex= Lex;
-            lex->sql_command= SQLCOM_DROP_VIEW;
-            lex->check_exists= $3;
+            lex->set_command(SQLCOM_DROP_VIEW, $3);
             YYPS->m_lock_type= TL_UNLOCK;
             YYPS->m_mdl_type= MDL_EXCLUSIVE;
           }
@@ -11781,15 +11794,13 @@ drop:
           {}
         | DROP EVENT_SYM opt_if_exists sp_name
           {
-            Lex->check_exists= $3;
             Lex->spname= $4;
-            Lex->sql_command = SQLCOM_DROP_EVENT;
+            Lex->set_command(SQLCOM_DROP_EVENT, $3);
           }
         | DROP TRIGGER_SYM opt_if_exists sp_name
           {
             LEX *lex= Lex;
-            lex->sql_command= SQLCOM_DROP_TRIGGER;
-            lex->check_exists= $3;
+            lex->set_command(SQLCOM_DROP_TRIGGER, $3);
             lex->spname= $4;
           }
         | DROP TABLESPACE tablespace_name opt_ts_engine opt_ts_wait
@@ -11804,8 +11815,7 @@ drop:
           }
         | DROP SERVER_SYM opt_if_exists ident_or_text
           {
-            Lex->sql_command = SQLCOM_DROP_SERVER;
-            Lex->check_exists= $3;
+            Lex->set_command(SQLCOM_DROP_SERVER, $3);
             Lex->server_options.server_name= $4.str;
             Lex->server_options.server_name_length= $4.length;
           }
@@ -11856,7 +11866,7 @@ table_alias_ref:
           }
         ;
 
-opt_if_exists:
+opt_if_exists_table_element:
           /* empty */
         {
           Lex->check_exists= FALSE;
@@ -11869,9 +11879,20 @@ opt_if_exists:
         }
         ;
 
+opt_if_exists:
+          /* empty */
+        {
+          $$.set(DDL_options_st::OPT_NONE);
+        }
+        | IF_SYM EXISTS
+        {
+          $$.set(DDL_options_st::OPT_IF_EXISTS);
+        }
+        ;
+
 opt_temporary:
           /* empty */ { $$= 0; }
-        | TEMPORARY { $$= 1; }
+        | TEMPORARY { $$= HA_LEX_CREATE_TMP_TABLE;; }
         ;
 /*
 ** Insert : add new data to table
@@ -12340,7 +12361,7 @@ show:
             lex->ident=null_lex_str;
             mysql_init_select(lex);
             lex->current_select->parsing_place= SELECT_LIST;
-            bzero((char*) &lex->create_info,sizeof(lex->create_info));
+            lex->create_info.init();
           }
           show_param
           {
@@ -12542,8 +12563,7 @@ show_param:
           }
         | CREATE DATABASE opt_if_not_exists ident
           {
-            Lex->sql_command=SQLCOM_SHOW_CREATE_DB;
-            Lex->create_info.options=$3;
+            Lex->set_command(SQLCOM_SHOW_CREATE_DB, $3);
             Lex->name= $4;
           }
         | CREATE TABLE_SYM table_ident
diff --git a/sql/structs.h b/sql/structs.h
index 99561c5..b38685c 100644
--- a/sql/structs.h
+++ b/sql/structs.h
@@ -471,4 +471,67 @@ class Discrete_intervals_list {
   Discrete_interval* get_current() const { return current; };
 };
 
+
+struct DDL_options_st
+{
+public:
+  enum Options
+  {
+    OPT_NONE= 0,
+    OPT_IF_NOT_EXISTS= 2,              // CREATE TABLE IF NOT EXISTS
+    OPT_LIKE= 4,                       // CREATE TABLE LIKE
+    OPT_OR_REPLACE= 16,                // CREATE OR REPLACE TABLE
+    OPT_OR_REPLACE_SLAVE_GENERATED= 32,// REPLACE was added on slave, it was
+                                       // not in the original query on master.
+    OPT_IF_EXISTS= 64
+  };
+
+private:
+  Options m_options;
+
+protected:
+
+public:
+  Options create_like_options() const
+  {
+    return (DDL_options_st::Options)
+           (((uint) m_options) & (OPT_IF_NOT_EXISTS | OPT_OR_REPLACE));
+  }
+  void init() { m_options= OPT_NONE; }
+  void init(Options options) { m_options= options; }
+  void set(Options other)
+  {
+    m_options= other;
+  }
+  void set(const DDL_options_st other)
+  {
+    m_options= other.m_options;
+  }
+  bool if_not_exists() const { return m_options & OPT_IF_NOT_EXISTS; }
+  bool or_replace() const { return m_options & OPT_OR_REPLACE; }
+  bool or_replace_slave_generated() const
+  { return m_options & OPT_OR_REPLACE_SLAVE_GENERATED; }
+  bool like() const { return m_options & OPT_LIKE; }
+  bool if_exists() const { return m_options & OPT_IF_EXISTS; }
+  void add(const DDL_options_st::Options other)
+  {
+    m_options= (Options) ((uint) m_options | (uint) other);
+  }
+  void add(const DDL_options_st &other)
+  {
+    add(other.m_options);
+  }
+  DDL_options_st operator|(const DDL_options_st &other)
+  {
+    add(other.m_options);
+    return *this;
+  }
+  DDL_options_st operator|=(DDL_options_st::Options other)
+  {
+    add(other);
+    return *this;
+  }
+};
+
+
 #endif /* STRUCTS_INCLUDED */
diff --git a/sql/table.cc b/sql/table.cc
index 6ac4544..1e6f3ff 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -2061,7 +2061,7 @@ static bool sql_unusable_for_discovery(THD *thd, handlerton *engine,
   if (lex->sql_command != SQLCOM_CREATE_TABLE)
     return 1;
   // ... create like
-  if (create_info->options & HA_LEX_CREATE_TABLE_LIKE)
+  if (lex->create_info.like())
     return 1;
   // ... create select
   if (lex->select_lex.item_list.elements)
@@ -2070,7 +2070,7 @@ static bool sql_unusable_for_discovery(THD *thd, handlerton *engine,
   if (create_info->tmp_table())
     return 1;
   // ... if exists
-  if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
+  if (lex->create_info.if_not_exists())
     return 1;
 
   // XXX error out or rather ignore the following:
@@ -3510,6 +3510,35 @@ bool check_db_name(LEX_STRING *org_name)
 }
 
 
+void SQL::Identifier_normalized::normalize()
+{
+  if (lower_case_table_names == 1 && name() != any_db)
+  {
+    CHARSET_INFO *cs= files_charset_info;
+    if (!check_mysql50_prefix(name()))
+    {
+      LEX_CSTRING::length= cs->cset->casedn(cs,
+                                            (char *) LEX_CSTRING::str,
+                                            LEX_CSTRING::length,
+                                            m_buf, sizeof(m_buf) - 1);
+    }
+    else
+    {
+      memcpy(m_buf, LEX_CSTRING::str, MYSQL50_TABLE_NAME_PREFIX_LENGTH);
+      LEX_CSTRING src;
+      src.str= name() + MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+      src.length= name_length() - MYSQL50_TABLE_NAME_PREFIX_LENGTH;
+      LEX_CSTRING::length= MYSQL50_TABLE_NAME_PREFIX_LENGTH +
+        cs->cset->casedn(cs, (char *) src.str, src.length,
+                         m_buf + MYSQL50_TABLE_NAME_PREFIX_LENGTH,
+                         sizeof(m_buf) - MYSQL50_TABLE_NAME_PREFIX_LENGTH - 1);
+    }
+    LEX_CSTRING::str= m_buf;
+    m_buf[LEX_CSTRING::length]= '\0';
+  }
+}
+
+
 /*
   Allow anything as a table name, as long as it doesn't contain an
   ' ' at the end
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 81b1e18..6a2d394 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2364,7 +2364,7 @@ bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
         break;
       }
   }
-  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+  if (create_info->tmp_table())
   {
 
     /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 8154c0f..7598da2 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -10713,7 +10713,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	/* Do not use DATA DIRECTORY with TEMPORARY TABLE. */
 	if (create_info->data_file_name
-	    && create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	    && create_info->tmp_table()) {
 		push_warning(
 			thd, Sql_condition::WARN_LEVEL_WARN,
 			ER_ILLEGAL_HA_CREATE_OPTION,
@@ -10808,7 +10808,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	if (use_tablespace
 	    && !mysqld_embedded
-	    && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)) {
+	    && !create_info->tmp_table()) {
 
 		if ((name[1] == ':')
 		    || (name[0] == '\\' && name[1] == '\\')) {
@@ -10826,7 +10826,7 @@ static __attribute__((nonnull, warn_unused_result))
 	In the case of;
 	  CREATE TEMPORARY TABLE ... DATA DIRECTORY={path} ... ;
 	We ignore the DATA DIRECTORY. */
-	if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	if (create_info->tmp_table()) {
 		strncpy(temp_path, name, FN_REFLEN - 1);
 	}
 
@@ -10844,7 +10844,7 @@ static __attribute__((nonnull, warn_unused_result))
 		}
 
 		/* Do not use DATA DIRECTORY with TEMPORARY TABLE. */
-		if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+		if (create_info->tmp_table()) {
 			push_warning(
 				thd, Sql_condition::WARN_LEVEL_WARN,
 				ER_ILLEGAL_HA_CREATE_OPTION,
@@ -10916,7 +10916,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 			/* We don't support FTS indexes in temporary
 			tables. */
-			if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+			if (create_info->tmp_table()) {
 
 				my_error(ER_INNODB_NO_FT_TEMP_TABLE, MYF(0));
 				DBUG_RETURN(false);
@@ -11095,7 +11095,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	use_data_dir = use_tablespace
 		       && ((create_info->data_file_name != NULL)
-		       && !(create_info->options & HA_LEX_CREATE_TMP_TABLE));
+		       && !create_info->tmp_table());
 
 	/* Set up table dictionary flags */
 	dict_tf_set(flags,
@@ -11107,7 +11107,7 @@ static __attribute__((nonnull, warn_unused_result))
 		        default_compression_level : options->page_compression_level,
 		    options->atomic_writes);
 
-	if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	if (create_info->tmp_table()) {
 		*flags2 |= DICT_TF2_TEMPORARY;
 	}
 
@@ -11466,7 +11466,7 @@ static __attribute__((nonnull, warn_unused_result))
 	if (stmt) {
 		dberr_t	err = row_table_add_foreign_constraints(
 			trx, stmt, stmt_len, norm_name,
-			create_info->options & HA_LEX_CREATE_TMP_TABLE);
+			create_info->tmp_table());
 
 		switch (err) {
 
diff --git a/storage/spider/ha_spider.cc b/storage/spider/ha_spider.cc
index 03fa644..ccfb04f 100644
--- a/storage/spider/ha_spider.cc
+++ b/storage/spider/ha_spider.cc
@@ -10493,8 +10493,7 @@ int ha_spider::create(
     goto error;
   DBUG_PRINT("info",("spider tmp_table=%d", form->s->tmp_table));
   if (
-    (sql_command == SQLCOM_CREATE_TABLE &&
-      !(info->options & HA_LEX_CREATE_TMP_TABLE))
+    (sql_command == SQLCOM_CREATE_TABLE && !info->tmp_table())
   ) {
     if (
       !(table_tables = spider_open_sys_table(
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index f3b1167..6b1942e 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -11349,7 +11349,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	/* Do not use DATA DIRECTORY with TEMPORARY TABLE. */
 	if (create_info->data_file_name
-	    && create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	    && create_info->tmp_table()) {
 		push_warning(
 			thd, Sql_condition::WARN_LEVEL_WARN,
 			ER_ILLEGAL_HA_CREATE_OPTION,
@@ -11444,7 +11444,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	if (use_tablespace
 	    && !mysqld_embedded
-	    && !(create_info->options & HA_LEX_CREATE_TMP_TABLE)) {
+	    && !create_info->tmp_table()) {
 
 		if ((name[1] == ':')
 		    || (name[0] == '\\' && name[1] == '\\')) {
@@ -11462,7 +11462,7 @@ static __attribute__((nonnull, warn_unused_result))
 	In the case of;
 	  CREATE TEMPORARY TABLE ... DATA DIRECTORY={path} ... ;
 	We ignore the DATA DIRECTORY. */
-	if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	if (create_info->tmp_table()) {
 		strncpy(temp_path, name, FN_REFLEN - 1);
 	}
 
@@ -11480,7 +11480,7 @@ static __attribute__((nonnull, warn_unused_result))
 		}
 
 		/* Do not use DATA DIRECTORY with TEMPORARY TABLE. */
-		if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+		if (create_info->tmp_table()) {
 			push_warning(
 				thd, Sql_condition::WARN_LEVEL_WARN,
 				ER_ILLEGAL_HA_CREATE_OPTION,
@@ -11552,7 +11552,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 			/* We don't support FTS indexes in temporary
 			tables. */
-			if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+			if (create_info->tmp_table()) {
 
 				my_error(ER_INNODB_NO_FT_TEMP_TABLE, MYF(0));
 				DBUG_RETURN(false);
@@ -11728,7 +11728,7 @@ static __attribute__((nonnull, warn_unused_result))
 
 	use_data_dir = use_tablespace
 		       && ((create_info->data_file_name != NULL)
-		       && !(create_info->options & HA_LEX_CREATE_TMP_TABLE));
+		       && !create_info->tmp_table());
 
 	/* Set up table dictionary flags */
 	dict_tf_set(flags,
@@ -11740,7 +11740,7 @@ static __attribute__((nonnull, warn_unused_result))
 		        default_compression_level : options->page_compression_level,
 		    options->atomic_writes);
 
-	if (create_info->options & HA_LEX_CREATE_TMP_TABLE) {
+	if (create_info->tmp_table()) {
 		*flags2 |= DICT_TF2_TEMPORARY;
 	}
 
@@ -12104,7 +12104,7 @@ static __attribute__((nonnull, warn_unused_result))
 	if (stmt) {
 		dberr_t	err = row_table_add_foreign_constraints(
 			trx, stmt, stmt_len, norm_name,
-			create_info->options & HA_LEX_CREATE_TMP_TABLE);
+			create_info->tmp_table());
 
 		switch (err) {
 

Follow ups