diff --git a/rpl/remote_event_stream.cc b/rpl/remote_event_stream.cc new file mode 100644 index 0000000000000..b0d05f1538e36 --- /dev/null +++ b/rpl/remote_event_stream.cc @@ -0,0 +1,169 @@ +/* + Copyright (c) 2026 MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA. +*/ + +#include "remote_event_stream.h" +#include // required for `sql_common.h`; don't know which part +#include // required for `sql_common.h` to fix "C++ in C linkage" +#include // `mysql.h`, mysql_reconnect() +#include // vio_close(), vio_shutdown() + + +void Remote_event_stream::Connection_options::operator()(MYSQL *connector) const +{ + //TODO: Connector/C deprecated mysql_options() in favor of `mysql_optionsv()`. + /* + mysql_optionsv(connector, MARIADB_OPT_HOST, host); + mysql_optionsv(connector, MARIADB_OPT_USER, user); + mysql_optionsv(connector, MARIADB_OPT_PASSWORD, password); + mysql_optionsv(connector, MARIADB_OPT_PORT, &port); + */ + connector->host= host; connector->user= user; connector->passwd= password; + connector->port= port; + if (ssl_options) + { + mysql_ssl_set(connector, + ssl_options->ssl_key, ssl_options->ssl_cert, + ssl_options->ssl_ca, ssl_options->ssl_capath, ssl_options->ssl_cipher); + mysql_options(connector, MYSQL_OPT_SSL_CRL, ssl_options->ssl_crl); + mysql_options(connector, MYSQL_OPT_SSL_CRLPATH, ssl_options->ssl_crlpath); + mysql_options(connector, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &ssl_options->ssl_verify_server_cert); + } + else + //mysql_optionsv(connector, MYSQL_OPT_SSL_ENFORCE, &use_ssl); + connector->options.use_ssl= false; + mysql_options(connector, MYSQL_SET_CHARSET_NAME, charset_name); + // In case the master asks for an external authentication plugin + mysql_options(connector, MYSQL_PLUGIN_DIR, plugin_dir); +} + + +Rpl_slave_connection::Rpl_slave_connection( + const Connection_options &options, uint32_t timeout) +{ + connector= mysql_init(nullptr); + if (!connector) + return; + mysql_options(connector, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(connector, MYSQL_OPT_READ_TIMEOUT, &timeout); + options(connector); +} + +Remote_event_stream::Remote_event_stream( + const Connection_options &options, uint32_t timeout, + unsigned long max_allowed_packet): Rpl_slave_connection(options, timeout) +{ + if (!connector) + return; + unsigned char yes= true; + //@deprecated not applicable with Connector/C + mysql_options(connector, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, &yes); + //mysql_options(connector, MYSQL_OPT_MAX_ALLOWED_PACKET, &max_allowed_packet); + connector->options.max_allowed_packet= max_allowed_packet; + mysql_options(connector, MYSQL_OPT_RECONNECT, &yes); +} + +Semi_sync_graceful_killer::Semi_sync_graceful_killer( + const Connection_options &options, uint32_t timeout): + Rpl_slave_connection(options, timeout) +{ + if (connector) + mysql_options(connector, MYSQL_OPT_WRITE_TIMEOUT, &timeout); +} + +Rpl_slave_connection::~Rpl_slave_connection() +{ + if (connector) + { + vio_close(connector->net.vio); //@deprecated already covered by Connector/C + mysql_close(connector); + } +} + + +unsigned int Remote_event_stream::errnum() { return mysql_errno(connector); } +const char *Remote_event_stream::errmsg() { return mysql_error(connector); } + +/** Call mysql_real_connect() with matching properties in the struct + TODO: Refactor Connector/C to avoid re-passing these parameters +*/ +bool Rpl_slave_connection::connect(unsigned long flags) +{ + return !mysql_real_connect(connector, + connector->host, connector->user, connector->passwd, connector->db, + connector->port, connector->unix_socket, flags); +} + +bool Remote_event_stream::connect(bool compress) +{ + if (do_reconnect) + return mysql_reconnect(connector); + do_reconnect= true; + return Rpl_slave_connection::connect( + CLIENT_REMEMBER_OPTIONS | (compress ? CLIENT_COMPRESS : 0)); +} + + +bool Rpl_slave_connection::real_query(const char *query, size_t strlen) +{ return mysql_real_query(connector, query, strlen); } + +unsigned long Remote_event_stream::master_version() +{ return mysql_get_server_version(connector); } + +MYSQL_RES_P Remote_event_stream::store_result() +{ return mysql_store_result(connector); } +char **Remote_event_stream::fetch_row(MYSQL_RES_P query_result) +{ return mysql_fetch_row(query_result); } +void Remote_event_stream::free_result(MYSQL_RES_P query_result) +{ return mysql_free_result(query_result); } + +bool Remote_event_stream::send_command( + int command, const unsigned char *args, size_t strlen, bool skip_check) +{ + return simple_command(connector, static_cast(command), + args, static_cast(strlen), false); +} + +unsigned long Remote_event_stream::thread_id() +{ return connector->thread_id; } + + +///TODO: Split this part from Connector/C's mariadb_rpl_fetch() +std::basic_string_view Remote_event_stream::next() +{ + auto strlen= static_cast(cli_safe_read(connector)); + return {connector->net.read_pos, strlen}; +} + +bool Remote_event_stream::semisync_ack( + const std::string_view log_name, uint64_t next_pos) +{ + constexpr size_t HEAD_SIZE= /* Semi-Sync Header */1 + sizeof(next_pos); + char payload[HEAD_SIZE + (FN_REFLEN+1)]= {'\xEF'}; + int8store(&(payload[1]), next_pos); + log_name.copy(&(payload[HEAD_SIZE]), sizeof(payload)-HEAD_SIZE); + NET *net= &connector->net; + //@deprecated: not required in Connector/C + net->pkt_nr_can_be_reset= true; + // Connector/C might not require resetting; better be safe until confirmed. + net_clear(net, false); + return my_net_write(net, reinterpret_cast(payload), + HEAD_SIZE + (log_name.size()+1)) || net_flush(net); +} + +void Remote_event_stream::abort() +{ vio_shutdown(connector->net.vio, SHUT_RDWR); } diff --git a/rpl/remote_event_stream.h b/rpl/remote_event_stream.h new file mode 100644 index 0000000000000..2861193635a24 --- /dev/null +++ b/rpl/remote_event_stream.h @@ -0,0 +1,130 @@ +/* + Copyright (c) 2026 MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA. +*/ + +#include +#include // Remote_event_stream::Connection_options::ssl_options +#include // Return type of Remote_event_stream::next() + +using MYSQL= struct st_mysql; +using MYSQL_RES_P= struct st_mysql_res *; + + +struct Rpl_slave_connection +{ + struct Connection_options + { + char *host, *user, *password; + unsigned int port; + struct SSL_options + { + const char + *ssl_ca, *ssl_capath, + *ssl_cert, + *ssl_crl, *ssl_crlpath, + *ssl_key, + *ssl_cipher; + bool ssl_verify_server_cert; + }; + std::optional ssl_options; ///< Disable SSL with std::nullopt + const char *charset_name; + /// In case the master asks for an external authentication plugin + const char *plugin_dir; + void operator ()(MYSQL *connector) const; + }; + + operator bool() { return connector; } + ///@deprecated TODO: hide this behind reusable helpers @{ + ///@return `false` on success, or `true` on error + bool connect(unsigned long flags= 0); + bool real_query(const char *query, size_t strlen); + /// }@ +protected: + /**TODO: + Replace this opaque pointer with `std::optional` + when `../sql-common` is completely phased out. + */ + MYSQL *connector; + /** + @note This does not invoke the setup methods; do so before calling next(). + (TODO: clean up the rest of `sql/slave.cc`'s `handle_slave_io()` & + sub-procedures so they don't have to be manual.) + @post operator bool() is `true` on success or `false` on OOM. + */ + Rpl_slave_connection(const Connection_options &options, uint32_t timeout); + ~Rpl_slave_connection(); +}; + + +struct Remote_event_stream: Rpl_slave_connection +{ + bool do_reconnect= false; + Remote_event_stream(const Connection_options &options, + uint32_t timeout, unsigned long max_allowed_packet); + + /** Ordered setup commands and error status getters + @deprecated TODO: hide these behind helper + methods of protected Chain of Command classes @{ + */ + unsigned int errnum(); + const char *errmsg(); + /** + (Re)connect to the remote + @return `false` on success, or `true` on error + */ + bool connect(bool compress= false); + ///@deprecated MySQL 4.x and before are long EOL. + unsigned long master_version(); + MYSQL_RES_P store_result(); + static char **fetch_row(MYSQL_RES_P query_result); + static void free_result(MYSQL_RES_P query_result); + /**TODO: + merge `register_slave_on_master()` & `request_dump()` + in `sql/slave.cc` and `dump_remote_log_entries()` in + `client/mysqlbinlog.cc` with Connector/C's mariadb_rpl_open() + @return `false` on success, or `true` on error + */ + bool send_command( + int command, const unsigned char *args, size_t strlen, bool skip_check); + /// }@ + + unsigned long thread_id(); /// for Semi_sync_graceful_killer + /** @return a connector-managed string, which can be + * the next event + * an EOF packet + * an empty string, which represents error + */ + std::basic_string_view next(); + /** + @return `false` on success, or `true` on error + @note Acknowledgement comes after the caller has safely recorded the + event from next(); this method is therefore separate from next(). + */ + bool semisync_ack(const std::string_view log_name, uint64_t next_pos); + /** + Force-close the stream. It remains existing for recovery, + but all ongoing and subsequent operations will error. + @note Call this from another thread _with additional mutexing_ + */ + void abort(); +}; + + +struct Semi_sync_graceful_killer: Rpl_slave_connection +{ + Semi_sync_graceful_killer( + const Connection_options &options, uint32_t timeout); +}; diff --git a/sql-common/client.c b/sql-common/client.c index 469bfdc8ae920..c84decaa9e43b 100644 --- a/sql-common/client.c +++ b/sql-common/client.c @@ -731,9 +731,6 @@ void end_server(MYSQL *mysql) mysql->connector_fd = 0; DBUG_PRINT("info",("Net: %s", vio_description(mysql->net.vio))); -#ifdef MYSQL_SERVER - slave_io_thread_detach_vio(); -#endif vio_delete(mysql->net.vio); mysql->net.vio= 0; /* Marker */ mysql_prune_stmt_list(mysql); diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e54e894e1d0fc..86b8b412cbe9f 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -124,7 +124,7 @@ SET (SQL_SOURCE protocol.cc records.cc repl_failsafe.cc rpl_filter.cc session_tracker.cc sql_path.cc - set_var.cc + set_var.cc ../rpl/remote_event_stream.cc slave.cc sp.cc sp_cache.cc sp_head.cc sp_pcontext.cc sp_rcontext.cc sp_cursor.cc spatial.cc sql_acl.cc sql_analyse.cc sql_base.cc diff --git a/sql/client_settings.h b/sql/client_settings.h index 93aa7f115cad5..6e2be3bf6e69f 100644 --- a/sql/client_settings.h +++ b/sql/client_settings.h @@ -42,11 +42,3 @@ #define mysql_server_init(a,b,c) mysql_client_plugin_init() #define mysql_server_end() mysql_client_plugin_deinit() -#ifdef HAVE_REPLICATION -C_MODE_START -void slave_io_thread_detach_vio(); -C_MODE_END -#else -#define slave_io_thread_detach_vio() -#endif - diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index ecc666c6ddd30..bdb492207b118 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -297,7 +297,6 @@ int init_master_info(Master_info* mi, const char* master_info_fname, DBUG_RETURN(0); } - mi->mysql=0; mi->file_id=1; fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); @@ -1637,35 +1636,13 @@ bool Master_info_index::flush_all_relay_logs() DBUG_RETURN(result); } -void setup_mysql_connection_for_master(MYSQL *mysql, Master_info *mi, - uint timeout) +Remote_event_stream::Connection_options + setup_mysql_connection_for_master(Master_info *mi) { DBUG_ASSERT(mi); - DBUG_ASSERT(mi->mysql); - mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &timeout); - mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &timeout); - -#ifdef HAVE_OPENSSL - if (mi->master_ssl) - { - mysql_ssl_set(mysql, - mi->master_ssl_key, mi->master_ssl_cert, mi->master_ssl_ca, - mi->master_ssl_capath, mi->master_ssl_cipher); - mysql_options(mysql, MYSQL_OPT_SSL_CRL, mi->master_ssl_crl); - mysql_options(mysql, MYSQL_OPT_SSL_CRLPATH, mi->master_ssl_crlpath); - mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, - &mi->master_ssl_verify_server_cert); - } - else -#endif - mysql->options.use_ssl= 0; - - /* - If server's default charset is not supported (like utf16, utf32) as client - charset, then set client charset to 'latin1' (default client charset). - */ + const char *charset_name; if (is_supported_parser_charset(default_charset_info)) - mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->cs_name.str); + charset_name= default_charset_info->cs_name.str; else { sql_print_information("'%s' can not be used as client character set. " @@ -1673,13 +1650,26 @@ void setup_mysql_connection_for_master(MYSQL *mysql, Master_info *mi, "while connecting to master.", default_charset_info->cs_name.str, default_client_charset_info->cs_name.str); - mysql_options(mysql, MYSQL_SET_CHARSET_NAME, - default_client_charset_info->cs_name.str); + charset_name= default_client_charset_info->cs_name.str; } - - /* Set MYSQL_PLUGIN_DIR in case master asks for an external authentication plugin */ - if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr) - mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr); + return { + mi->master_host.buf, mi->master_user.buf, mi->master_password.buf, + mi->master_port, +#ifdef HAVE_OPENSSL + mi->master_ssl ? + std::optional({ + mi->master_ssl_ca, mi->master_ssl_capath, + mi->master_ssl_cert, + mi->master_ssl_crl, mi->master_ssl_crlpath, + mi->master_ssl_key, + mi->master_ssl_cipher, + mi->master_ssl_verify_server_cert + }) : +#endif + std::nullopt, + charset_name, + opt_plugin_dir_ptr + }; } #endif /* HAVE_REPLICATION */ diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index a3e9c553fe83c..7a25d278aeb63 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -25,8 +25,6 @@ #include "rpl_filter.h" #include "keycaches.h" -typedef struct st_mysql MYSQL; - /** Domain id based filter to handle DO_DOMAIN_IDS and IGNORE_DOMAIN_IDS used to set filtering on replication slave based on event's GTID domain_id. @@ -208,7 +206,6 @@ class Master_info: public Master_info_file, public Slave_reporting_capability mysql_mutex_t data_lock, run_lock, sleep_lock, start_stop_lock, start_alter_lock, start_alter_list_lock; mysql_cond_t data_cond, start_cond, stop_cond, sleep_cond; THD *io_thd; - MYSQL* mysql; uint32 file_id; /* for 3.23 load data infile */ uint mysql_version; Relay_log_info rli; @@ -462,15 +459,11 @@ uint any_slave_sql_running(bool already_locked); bool give_error_if_slave_running(bool already_lock); /* - Sets up the basic options for a MYSQL connection, mysql, to connect to the - primary server described by the Master_info parameter, mi. The timeout must - be passed explicitly, as different types of connections created by the slave - will use different values. - - Assumes mysql_init() has already been called on the mysql connection object. + Sets up the basic options for a remote connection to connect to the + primary server described by the Master_info parameter, mi. */ -void setup_mysql_connection_for_master(MYSQL *mysql, Master_info *mi, - uint timeout); +Remote_event_stream::Connection_options + setup_mysql_connection_for_master(Master_info *mi); #endif /* HAVE_REPLICATION */ #endif /* RPL_MI_H */ diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc index 32e53dab7580d..32802750d05ac 100644 --- a/sql/semisync_slave.cc +++ b/sql/semisync_slave.cc @@ -125,7 +125,8 @@ void Repl_semi_sync_slave::slave_start(Master_info *mi) rpl_semi_sync_slave_send_ack= 0; } -void Repl_semi_sync_slave::slave_stop(Master_info *mi) +void Repl_semi_sync_slave:: + slave_stop(Master_info *mi, Remote_event_stream &mysql) { if (get_slave_enabled()) { @@ -141,7 +142,7 @@ void Repl_semi_sync_slave::slave_stop(Master_info *mi) DBUG_ASSERT(!debug_sync_set_action(mi->io_thd, STRING_WITH_LEN(act))); };); #endif - kill_connection(mi); + kill_connection(mi, mysql); } set_slave_enabled(0); @@ -158,59 +159,40 @@ void Repl_semi_sync_slave::slave_reconnect(Master_info *mi) } -void Repl_semi_sync_slave::kill_connection(Master_info *mi) +void Repl_semi_sync_slave:: + kill_connection(Master_info *mi, Remote_event_stream &mysql) { - MYSQL *mysql= mi->mysql; if (!mysql) return; char kill_buffer[30]; - MYSQL *kill_mysql = NULL; + bool ret; size_t kill_buffer_length; - kill_mysql = mysql_init(kill_mysql); - - setup_mysql_connection_for_master(kill_mysql, mi, m_kill_conn_timeout); - mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout); + auto kill_mysql= Semi_sync_graceful_killer( + setup_mysql_connection_for_master(mi), m_kill_conn_timeout); + if (!kill_mysql) + return; - bool ret= (!mysql_real_connect(kill_mysql, mysql->host, - mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0)); + ret= kill_mysql.connect(); if (DBUG_IF("semisync_slave_failed_kill") || ret) { sql_print_information("cannot connect to master to kill slave io_thread's " "connection"); - goto failed_graceful_kill; + return; } kill_buffer_length= my_snprintf(kill_buffer, 30, "KILL %lu", - mysql->thread_id); - if (mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length)) - { + mysql.thread_id()); + if (kill_mysql.real_query(kill_buffer, kill_buffer_length)) sql_print_information( "Failed to gracefully kill our active semi-sync connection with " "primary. Silently closing the connection."); - goto failed_graceful_kill; - } - -end: - mysql_close(kill_mysql); - return; - -failed_graceful_kill: - /* - If we fail to issue `KILL` on the primary to kill the active semi-sync - connection; we need to locally clean up our side of the connection. This - is because mysql_close will send COM_QUIT on the active semi-sync - connection, causing the primary to error. - */ - net_clear(&(mysql->net), 0); - end_server(mysql); - goto end; } -int Repl_semi_sync_slave::request_transmit(Master_info *mi) +int Repl_semi_sync_slave:: + request_transmit(Master_info *mi, Remote_event_stream &mysql) { - MYSQL *mysql= mi->mysql; MYSQL_RES *res= 0; MYSQL_ROW row; const char *query; @@ -219,10 +201,11 @@ int Repl_semi_sync_slave::request_transmit(Master_info *mi) return 0; query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'"; - if (mysql_real_query(mysql, query, (ulong)strlen(query)) || - !(res= mysql_store_result(mysql))) + if (mysql.real_query(query, strlen(query)) || + !(res= mysql.store_result())) { - sql_print_error("Execution failed on master: %s, error :%s", query, mysql_error(mysql)); + sql_print_error("Execution failed on master: %s, error :%s", + query, mysql.errmsg()); set_slave_enabled(0); return 1; } @@ -250,59 +233,33 @@ int Repl_semi_sync_slave::request_transmit(Master_info *mi) the master connection. */ query= "SET @rpl_semi_sync_slave= 1"; - if (mysql_real_query(mysql, query, (ulong)strlen(query))) + if (mysql.real_query(query, strlen(query))) { sql_print_error("%s on master failed", query); set_slave_enabled(0); return 1; } mi->semi_sync_reply_enabled= 1; - /* Inform net_server that pkt_nr can come out of order */ - mi->mysql->net.pkt_nr_can_be_reset= 1; - mysql_free_result(mysql_store_result(mysql)); + Remote_event_stream::free_result(mysql.store_result()); return 0; } -int Repl_semi_sync_slave::slave_reply(Master_info *mi) +bool Repl_semi_sync_slave:: + slave_reply(Master_info *mi, Remote_event_stream &mysql) { - MYSQL* mysql= mi->mysql; const char *binlog_filename= const_cast(mi->master_log_name); my_off_t binlog_filepos= mi->master_log_pos; - NET *net= &mysql->net; - uchar reply_buffer[REPLY_MAGIC_NUM_LEN - + REPLY_BINLOG_POS_LEN - + REPLY_BINLOG_NAME_LEN]; - int reply_res = 0; - size_t name_len = strlen(binlog_filename); DBUG_ENTER("Repl_semi_sync_slave::slave_reply"); DBUG_ASSERT(get_slave_enabled() && mi->semi_sync_reply_enabled); - /* Prepare the buffer of the reply. */ - reply_buffer[REPLY_MAGIC_NUM_OFFSET] = k_packet_magic_num; - int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos); - memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET, - binlog_filename, - name_len + 1 /* including trailing '\0' */); - DBUG_PRINT("semisync", ("%s: reply (%s, %lu)", "Repl_semi_sync_slave::slave_reply", binlog_filename, (ulong)binlog_filepos)); - /* - We have to do a net_clear() as with semi-sync the slave_reply's are - interleaved with data from the master and then the net->pkt_nr - cannot be kept in sync. Better to start pkt_nr from 0 again. - */ - net_clear(net, 0); - /* Send the reply. */ - reply_res = my_net_write(net, reply_buffer, - name_len + REPLY_BINLOG_NAME_OFFSET); - if (!reply_res) - { - reply_res= DBUG_IF("semislave_failed_net_flush") || net_flush(net); + bool reply_res= mysql.semisync_ack(binlog_filename, binlog_filepos) || + DBUG_IF("semislave_failed_net_flush"); if (!reply_res) rpl_semi_sync_slave_send_ack++; - } DBUG_RETURN(reply_res); } diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h index 79949067de8c3..b13e442a82110 100644 --- a/sql/semisync_slave.h +++ b/sql/semisync_slave.h @@ -22,7 +22,6 @@ #include "my_global.h" #include "sql_priv.h" #include "rpl_mi.h" -#include "mysql.h" #include class Master_info; @@ -87,12 +86,12 @@ class Repl_semi_sync_slave * indicates that the slave has received all events before the specified * binlog position. */ - int slave_reply(Master_info* mi); + bool slave_reply(Master_info* mi, Remote_event_stream &mysql); void slave_start(Master_info *mi); - void slave_stop(Master_info *mi); + void slave_stop(Master_info *mi, Remote_event_stream &stream); void slave_reconnect(Master_info *mi); - int request_transmit(Master_info *mi); - void kill_connection(Master_info *mi); + int request_transmit(Master_info *mi, Remote_event_stream &stream); + void kill_connection(Master_info *mi, Remote_event_stream &stream); private: /* True when init_object has been called */ diff --git a/sql/slave.cc b/sql/slave.cc index 82d265ab327a8..e91b94a907262 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -43,7 +43,6 @@ #include "unireg.h" #include #include -#include #include #include "debug_sync.h" // debug_sync_set_action #include "sql_base.h" // close_thread_tables @@ -154,15 +153,33 @@ static bool sql_slave_killed(rpl_group_info *rgi); static int init_slave_thread(THD*, Master_info *, SLAVE_THD_TYPE); static void make_slave_skip_errors_printable(void); static void make_slave_transaction_retry_errors_printable(void); -static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi); -static int safe_reconnect(THD*, MYSQL*, Master_info*, bool); -static int connect_to_master(THD*, MYSQL*, Master_info*, bool, bool); +static int safe_connect(THD* thd, Remote_event_stream &, Master_info* mi); +static int safe_reconnect(THD* thd, Remote_event_stream &, Master_info*, bool); +static int connect_to_master(THD*, Remote_event_stream &, Master_info*, bool, bool); static Log_event* next_event(rpl_group_info* rgi, ulonglong *event_size); static int queue_event(Master_info *mi,const uchar *buf, ulong event_len); static int terminate_slave_thread(THD *, mysql_mutex_t *, mysql_cond_t *, volatile uint *, bool); static bool check_io_slave_killed(Master_info *mi, const char *info); +unsigned int server_mysql_errno(Remote_event_stream &stream) +{ return stream.errnum(); } +const char *server_mysql_error(Remote_event_stream &stream) +{ return stream.errmsg(); } +bool server_mysql_real_query + (Remote_event_stream &stream, const char *query, size_t strlen) +{ return stream.real_query(query, strlen); } +MYSQL_RES_P server_mysql_store_result(Remote_event_stream &stream) +{ return stream.store_result(); } +#define server_mysql_fetch_row(query_result) \ + Remote_event_stream::fetch_row(query_result) +#define server_mysql_free_result(query) Remote_event_stream::free_result(query); +#undef simple_command +#define simple_command(stream, command, args, strlen, skip_check) \ + stream.send_command(command, args, strlen, skip_check) +#undef packet_error +#define packet_error std::basic_string_view() + /* Function to set the slave's max_allowed_packet based on the value of slave_max_allowed_packet. @@ -171,12 +188,9 @@ static bool check_io_slave_killed(Master_info *mi, const char *info); @in_param mysql MySQL connection handle */ -static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql) +static unsigned long set_slave_max_allowed_packet(THD *thd) { DBUG_ENTER("set_slave_max_allowed_packet"); - // thd and mysql must be valid - DBUG_ASSERT(thd && mysql); - thd->variables.max_allowed_packet= slave_max_allowed_packet; thd->net.max_packet_size= slave_max_allowed_packet; /* @@ -185,14 +199,7 @@ static void set_slave_max_allowed_packet(THD *thd, MYSQL *mysql) replication event can become this much larger than the corresponding packet (query) sent from client to master. */ - thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER; - /* - Skipping the setting of mysql->net.max_packet size to slave - max_allowed_packet since this is done during mysql_real_connect. - */ - mysql->options.max_allowed_packet= - slave_max_allowed_packet+MAX_LOG_EVENT_HEADER; - DBUG_VOID_RETURN; + DBUG_RETURN(thd->net.max_packet_size+= MAX_LOG_EVENT_HEADER); } /* @@ -1640,14 +1647,15 @@ bool is_network_error(uint errorno) 2 transient network problem, the caller should try to reconnect */ -static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) +static int + get_master_version_and_clock(Remote_event_stream &mysql, Master_info* mi) { char err_buff[MAX_SLAVE_ERRMSG], err_buff2[MAX_SLAVE_ERRMSG]; const char* errmsg= 0; int err_code= 0; MYSQL_RES *master_res= 0; MYSQL_ROW master_row; - uint full_version= mysql_get_server_version(mysql); + uint full_version= mysql.master_version(); uint version= full_version/ 10000; uint32_t heartbeat_period= mi->master_heartbeat_period; DBUG_ENTER("get_master_version_and_clock"); @@ -1660,16 +1668,6 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) mi->rli.relay_log.description_event_for_queue= 0; mi->mysql_version= full_version; - if (!my_isdigit(&my_charset_bin,*mysql->server_version)) - { - errmsg= err_buff2; - snprintf(err_buff2, sizeof(err_buff2), - "Master reported unrecognized MariaDB version: %s", - mysql->server_version); - err_code= ER_SLAVE_FATAL_ERROR; - sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); - } - else { DBUG_EXECUTE_IF("mock_mariadb_primary_v5_in_get_master_version", version= 5;); @@ -1685,8 +1683,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) case 4: errmsg= err_buff2; snprintf(err_buff2, sizeof(err_buff2), - "Master reported unrecognized MariaDB version: %s", - mysql->server_version); + "Master reported unrecognized MariaDB version: %u.x", version); err_code= ER_SLAVE_FATAL_ERROR; sprintf(err_buff, ER_DEFAULT(err_code), err_buff2); break; @@ -1700,7 +1697,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi) master is 3.23, 4.0, etc. */ mi->rli.relay_log.description_event_for_queue= new - Format_description_log_event(4, mysql->server_version, + Format_description_log_event(4, /* unused */ nullptr, mi->rli.relay_log.relay_log_checksum_alg); break; } @@ -2785,7 +2782,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) } -int register_slave_on_master(MYSQL* mysql, Master_info *mi, +int register_slave_on_master(Remote_event_stream &mysql, Master_info *mi, bool *suppress_warnings) { uchar buf[1024], *pos= buf; @@ -2840,7 +2837,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi, /* The master will fill in master_id */ int4store(pos, 0); pos+= 4; - if (simple_command(mysql, COM_REGISTER_SLAVE, buf, (ulong) (pos- buf), 0)) + if (simple_command(mysql, COM_REGISTER_SLAVE, buf, pos-buf, false)) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) { @@ -3308,7 +3305,7 @@ static bool slave_sleep(THD *thd, time_t seconds, } -static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, +static int request_dump(Remote_event_stream &mysql, Master_info* mi, bool *suppress_warnings) { uchar buf[FN_REFLEN + 10]; @@ -3322,7 +3319,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, if (opt_log_slave_updates && opt_replicate_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; - if (repl_semisync_slave.request_transmit(mi)) + if (repl_semisync_slave.request_transmit(mi, mysql)) DBUG_RETURN(1); // TODO if big log files: Change next to int8store() @@ -3367,13 +3364,13 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, RETURN VALUES 'packet_error' Error - number Length of packet + std::string_view Packet */ -static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, - ulong* network_read_len) +static std::basic_string_view read_event( + Remote_event_stream &mysql, Master_info *mi, + bool* suppress_warnings, ulong* network_read_len) { - ulong len; DBUG_ENTER("read_event"); *suppress_warnings= FALSE; @@ -3386,8 +3383,15 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, DBUG_RETURN(packet_error); #endif - len = cli_safe_read_reallen(mysql, network_read_len); - if (unlikely(len == packet_error || (long) len < 1)) + std::basic_string_view packet= mysql.next(); + size_t len= packet.size(); + /*FIXME: + In `@@slave_compressed_protocol`, `@@read_binlog_speed_limit` should + measure pre-decompress (network) byte count, not post-decompress count, + but this statistic is not exported by Connector/C. + */ + *network_read_len= len; + if (unlikely(!len)) { if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) { @@ -3411,7 +3415,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, } /* Check if eof packet */ - if (len < 8 && mysql->net.read_pos[0] == 254) + if (len < 8 && packet[0] == 254) { sql_print_information("Slave: received end packet from server, apparent " "master shutdown: %s", @@ -3419,9 +3423,9 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, DBUG_RETURN(packet_error); } - DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", - len, mysql->net.read_pos[4])); - DBUG_RETURN(len - 1); + DBUG_PRINT("exit", ("len: %zu net->read_pos[4]: %d", + len, packet[4])); + DBUG_RETURN(packet); } @@ -4425,7 +4429,7 @@ static bool check_io_slave_killed(Master_info *mi, const char *info) @retval 1 There was an error. */ -static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, +static int try_to_reconnect(THD *thd, Remote_event_stream &mysql, Master_info *mi, bool suppress_warnings, const char *messages[SLAVE_RECON_MSG_MAX]) { @@ -4433,7 +4437,6 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, #ifdef SIGNAL_WITH_VIO_CLOSE thd->clear_active_vio(); #endif - end_server(mysql); thd->proc_info = messages[SLAVE_RECON_MSG_AFTER]; if (!suppress_warnings) { @@ -4490,7 +4493,6 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, pthread_handler_t handle_slave_io(void *arg) { THD *thd; // needs to be first for thread_stack - MYSQL *mysql; Master_info *mi = (Master_info*)arg; Relay_log_info *rli= &mi->rli; bool suppress_warnings; @@ -4505,7 +4507,6 @@ pthread_handler_t handle_slave_io(void *arg) DBUG_ENTER("handle_slave_io"); DBUG_ASSERT(mi->inited); - mysql= NULL ; thd= new THD(next_thread_id()); // note that contructor of THD uses DBUG_ ! @@ -4546,6 +4547,11 @@ pthread_handler_t handle_slave_io(void *arg) DBUG_PRINT("master_info",("log_file_name: '%s' position: %llu", mi->master_log_name, mi->master_log_pos)); +{ + Remote_event_stream mysql( + setup_mysql_connection_for_master(mi), + slave_net_timeout, + set_slave_max_allowed_packet(thd)); /* Load the set of seen GTIDs, if we did not already. */ if (rpl_load_gtid_slave_state(thd)) @@ -4566,7 +4572,7 @@ pthread_handler_t handle_slave_io(void *arg) thd->variables.wsrep_on= 0; repl_semisync_slave.slave_start(mi); - if (!(mi->mysql = mysql = mysql_init(NULL))) + if (!mysql) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, ER_THD(thd, ER_SLAVE_FATAL_ERROR), "error in mysql_init()"); @@ -4638,7 +4644,6 @@ pthread_handler_t handle_slave_io(void *arg) mi->slave_running= MYSQL_SLAVE_RUN_CONNECT; mysql_mutex_unlock(&mi->run_lock); - thd->slave_net = &mysql->net; THD_STAGE_INFO(thd, stage_checking_master_version); ret= get_master_version_and_clock(mysql, mi); if (ret == 1) @@ -4695,7 +4700,7 @@ pthread_handler_t handle_slave_io(void *arg) const uchar *event_buf; THD_STAGE_INFO(thd, stage_requesting_binlog_dump); - if (request_dump(thd, mysql, mi, &suppress_warnings)) + if (request_dump(mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (check_io_slave_killed(mi, NullS) || @@ -4729,11 +4734,13 @@ pthread_handler_t handle_slave_io(void *arg) DBUG_SET("-d,pause_before_io_read_event"); };); #endif - event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len); + std::basic_string_view packet= + read_event(mysql, mi, &suppress_warnings, &network_read_len); + event_len= packet.size(); if (check_io_slave_killed(mi, NullS)) goto err; - if (unlikely(event_len == packet_error)) + if (unlikely(!event_len)) { uint mysql_error_number= mysql_errno(mysql); switch (mysql_error_number) { @@ -4762,14 +4769,14 @@ Stopping slave I/O thread due to out-of-memory error from master"); reconnect_messages[SLAVE_RECON_ACT_EVENT])) goto err; goto connected; - } // if (event_len == packet_error) + } // if (!event_len) thd->set_time_for_next_stage(); THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log); - event_buf= mysql->net.read_pos + 1; + event_buf= packet.data(); mi->semi_ack= 0; if (repl_semisync_slave. - slave_read_sync_header((const uchar*) mysql->net.read_pos + 1, + slave_read_sync_header(event_buf, event_len, &(mi->semi_ack), &event_buf, &event_len)) { @@ -4842,7 +4849,7 @@ Stopping slave I/O thread due to out-of-memory error from master"); !debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); };); #endif - if (repl_semisync_slave.slave_reply(mi)) + if (repl_semisync_slave.slave_reply(mi, mysql)) { /* Master is not responding (gone away?) or it has turned semi sync @@ -4861,8 +4868,8 @@ Stopping slave I/O thread due to out-of-memory error from master"); sql_print_error("Master server does not read semi-sync messages " "last_error: %s (%d). " "Fallback to asynchronous replication", - mi->mysql->net.last_error, - mi->mysql->net.last_errno); + mysql_error(mysql), + mysql_errno(mysql)); mi->semi_sync_reply_enabled= 0; } } @@ -4932,7 +4939,7 @@ log space"); IO_RPL_LOG_NAME, mi->master_log_pos, tmp.c_ptr_safe(), mi->host, mi->port); } - repl_semisync_slave.slave_stop(mi); + repl_semisync_slave.slave_stop(mi, mysql); thd->reset_query(); thd->reset_db(&null_clex_str); if (mysql) @@ -4941,16 +4948,15 @@ log space"); Here we need to clear the active VIO before closing the connection with the master. The reason is that THD::awake() might be called from terminate_slave_thread() because somebody - issued a STOP SLAVE. If that happends, the close_active_vio() + issued a STOP SLAVE. If that happends, Remote_event_stream::abort() can be called in the middle of closing the VIO associated with the 'mysql' object, causing a crash. */ #ifdef SIGNAL_WITH_VIO_CLOSE thd->clear_active_vio(); #endif - mysql_close(mysql); - mi->mysql=0; } +} write_ignored_events_info_to_relay_log(thd, mi); if (mi->using_gtid != Master_info::USE_GTID_NO) flush_master_info(mi, TRUE, TRUE); @@ -6939,30 +6945,6 @@ void end_relay_log_info(Relay_log_info* rli) } -/** - Hook to detach the active VIO before closing a connection handle. - - The client API might close the connection (and associated data) - in case it encounters a unrecoverable (network) error. This hook - is called from the client code before the VIO handle is deleted - allows the thread to detach the active vio so it does not point - to freed memory. - - Other calls to THD::clear_active_vio throughout this module are - redundant due to the hook but are left in place for illustrative - purposes. -*/ - -extern "C" void slave_io_thread_detach_vio() -{ -#ifdef SIGNAL_WITH_VIO_CLOSE - THD *thd= current_thd; - if (thd && thd->slave_thread) - thd->clear_active_vio(); -#endif -} - - /* Try to connect until successful or slave killed @@ -6977,7 +6959,7 @@ extern "C" void slave_io_thread_detach_vio() # Error */ -static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi) +static int safe_connect(THD* thd, Remote_event_stream &mysql, Master_info* mi) { DBUG_ENTER("safe_connect"); @@ -6995,25 +6977,17 @@ static int safe_connect(THD* thd, MYSQL* mysql, Master_info* mi) whether this connection is a new first-time or reconnects an existing one @return errno: 1 if error or 0 if successful */ -static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, +static int connect_to_master(THD* thd, Remote_event_stream &mysql, Master_info* mi, bool reconnect, bool suppress_warnings) { int slave_was_killed; unsigned int last_errno= 0; // initialize with not-error mi->connects_tried= 0; // reset retry counter DBUG_EXECUTE_IF("set_slave_err_count_near_overflow", mi->connects_tried = ULONG_MAX - 2;); - my_bool my_true= 1; DBUG_ENTER("connect_to_master"); - set_slave_max_allowed_packet(thd, mysql); #ifndef DBUG_OFF mi->events_till_disconnect = disconnect_slave_event_count; #endif - ulong client_flag= CLIENT_REMEMBER_OPTIONS; - if (opt_slave_compressed_protocol) - client_flag|= CLIENT_COMPRESS; /* We will use compression */ - - setup_mysql_connection_for_master(mi->mysql, mi, slave_net_timeout); - mysql_options(mysql, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY, &my_true); /* we disallow empty users */ if (mi->user[0] == 0) @@ -7026,9 +7000,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, DBUG_RETURN(1); } while (!(slave_was_killed= io_slave_killed(mi)) && - (reconnect ? mysql_reconnect(mysql) : - !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, - mi->port, 0, client_flag))) + (mysql.connect(opt_slave_compressed_protocol))) { /* Don't repeat last error and don't report killed error */ if (mysql_errno(mysql) != last_errno && !io_slave_killed(mi)) @@ -7080,10 +7052,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, } ++(mi->connects_tried); // count the final success in addition to failures #ifdef SIGNAL_WITH_VIO_CLOSE - thd->set_active_vio(mysql->net.vio); + thd->set_active_vio(&mysql); #endif } - mysql->reconnect= 1; DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed)); DBUG_RETURN(slave_was_killed); } @@ -7097,8 +7068,8 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi, mi->retry_count times */ -static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi, - bool suppress_warnings) +static int safe_reconnect(THD* thd, Remote_event_stream &mysql, + Master_info* mi, bool suppress_warnings) { DBUG_ENTER("safe_reconnect"); DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings)); diff --git a/sql/slave.h b/sql/slave.h index 6b22048a226f0..2b23ac45fa63e 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -175,7 +175,6 @@ int init_slave(); int init_recovery(Master_info* mi, const char** errmsg); bool init_slave_skip_errors(const char* arg); bool init_slave_transaction_retry_errors(const char* arg); -int register_slave_on_master(MYSQL* mysql); int terminate_slave_threads(Master_info* mi, int thread_mask, bool skip_lock = 0); int start_slave_threads(THD *thd, @@ -204,10 +203,6 @@ int start_slave_thread( int mysql_table_dump(THD* thd, const char* db, const char* tbl_name, int fd = -1); -/* retrieve table from master and copy to slave*/ -int fetch_master_table(THD* thd, const char* db_name, const char* table_name, - Master_info* mi, MYSQL* mysql, bool overwrite); - void show_master_info_get_fields(THD *thd, List *field_list, bool full, size_t gtid_pos_length); bool show_master_info(THD* thd, Master_info* mi, bool full); diff --git a/sql/sql_class.h b/sql/sql_class.h index 03394f46307c0..d4eb005c89912 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -74,6 +74,7 @@ void set_thd_stage_info(void *thd, #include "my_apc.h" #include "rpl_gtid_base.h" +#include "../rpl/remote_event_stream.h" #include "wsrep.h" #include "wsrep_on.h" @@ -4350,7 +4351,11 @@ class THD: public THD_count, /* this must be first */ /** number of name_const() substitutions, see sp_head.cc:subst_spvars() */ uint query_name_consts; - NET* slave_net; // network connection from slave -> m. + /** + network connection from slave -> master, + for Remote_event_stream::abort() callback (requires @ref LOCK_thd_data) + */ + Remote_event_stream *slave_net; /* Used to update global user stats. The global user stats are updated @@ -4479,10 +4484,17 @@ class THD: public THD_count, /* this must be first */ active_vio = vio; mysql_mutex_unlock(&LOCK_thd_data); } + void set_active_vio(Remote_event_stream *stream) + { + mysql_mutex_lock(&LOCK_thd_data); + slave_net= stream; + mysql_mutex_unlock(&LOCK_thd_data); + } inline void clear_active_vio() { mysql_mutex_lock(&LOCK_thd_data); active_vio = 0; + slave_net= nullptr; mysql_mutex_unlock(&LOCK_thd_data); } void close_active_vio();