Semisync 详解
Semisync 是 MySQL 的自带插件,用于实现半同步功能
源码解析
class Trace
所在文件: semisync.h
这个类就是简单的打印进入函数和退出函数的事件,允许设置 trace level
成员函数
void function_enter(const char *func_name)int function_exit(const char *func_name, int exit_code)bool function_exit(const char *func_name, bool exit_code)void function_exit(const char *func_name)
增量知识
- MySQL api
sql_print_information(format, args...)用于打印日志
class ReplSemiSyncBase
所在文件: semisync.h 继承自: Trace
这个类扩展了三个成员变量,用于主从通信时的协议标记
const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
{ReplSemiSyncBase::kPacketMagicNum, 0};
class ReplSemiSyncSlave
所在文件: semisync_slave.h 继承自: ReplSemiSyncBase
从库类要比主库类简单的多: 首先,从逻辑上,一个 MySQL 实例可以是多个从库的主库,但只能是一个主库的从库,也就是说从库类是单例的; 也因此,主库要处理多个从库 ack 的返回逻辑,而从库只需要处理单一 ack 逻辑。
成员函数
-
int initObject()初始化 trace level, slave enabled 等信息 -
int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, const char **payload, unsigned long *payload_len)处理 master 传来的数据,注意注释里的参数描述有点问题, 其中need_reply,payload,payload_len三个参数应该是[OUT],为解析后的结果。 如果 header 的第一个字节不是 MagicNumber,那么半同步插件不会处理这个数据包,否则,读取total_len - 2的长度的 payload。 header 的第二个字节是 flag,如果标记了 kPacketFlagSync,表明数据包需要回复int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, const char **payload, unsigned long *payload_len) { const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader"; int read_res = 0; function_enter(kWho); if ((unsigned char)(header[0]) == kPacketMagicNum) { *need_reply = (header[1] & kPacketFlagSync); *payload_len = total_len - 2; *payload = header + 2; if (trace_level_ & kTraceDetail) sql_print_information("%s: reply - %d", kWho, *need_reply); } else { sql_print_error("Missing magic number for semi-sync packet, packet " "len: %lu", total_len); read_res = -1; } return function_exit(kWho, read_res); } -
int slaveReply(MYSQL *mysql, const char *binlog_filename, my_off_t binlog_filepos)对于第一个参数,主要使用其 net 属性,这是一个和 master 实例的连接。 后两个参数binlog_filename和binlog_filepos用于构造相应的数据包。 数据包的格式为|kPacketMagicNum(1byte)|binlog_filepos(8byte)|binlog_filename|。 这个函数会把构造好的数据包发给 master。 -
int slaveStart(Binlog_relay_IO_param *param) -
int slaveStop(Binlog_relay_IO_param *param)这两个函数主要是为了维护rpl_semi_sync_slave_status状态
增量知识
- 开发插件时,如果 include 了
mysql.h且需要使用extern "C"可以直接用C_MODE_START和C_MODE_END
struct Slave
所在文件: semisync_master_ack_receiver.h
成员变量:
uint32_t thread_id对应的 session id,仅用于记录VIO vio和从库的连接uint server_id从库的 idbool net_compress
这个结构体的大部分字段是为了 show slave info 时的信息展示,只有 vio 字段是为了和从库通信
class Pollsocketlistener 和 class Selectsocketlistener
所在文件: semisync_master_socket_listener.h
对于支持 poll 的环境,使用 Pollsocketlistener,否则用
Selectsocketlistener, 两者实现了相同的接口。
顾名思义,两个函数就是用来 poll / select 多个 fds 的
成员函数
bool init_slave_sockets(Slave_vector slaves)根据 slaves 初始化 fd 等信息bool listen_on_sockets()等待 fds 上的出现事件,注意,最长只等待 1sbool is_socket_active(int index)用于收到事件后判断某个 fd(用 index 索引)是否收到事件void clear_socket_info(int index)清除指定 fd 收到事件的标记uint number_of_slave_sockets()查询在监听的 fd 数量Slave get_slave_obj(int index)根据索引获取 slave 对象
class Ackreceiver
所在文件: semisync_master_ack_receiver.h 继承自: ReplSemiSyncBase
首先不得不吐槽一句,这种类名的命名方式真是不容易接受啊!
这个类是处理 ack 逻辑信息的核心类
成员变量
uint8 m_status有ST_UP,ST_DOWN,ST_STOPPING三种状态Slave_vector m_slaves一个 slave vectorbool m_slaves_changed每次 slave vector 有所变化都会设置这个标记mysql_mutex_t m_mutex上面三个变量的修改都应在锁内完成,因为处理 slave 增删和 ack 是在不同线程中完成的mysql_cond_t m_condmaster 主线程在等待 slave 的变化时会等在这个 cond 上my_thread_handle m_pid启动的线程的 pid
成员函数
Ack_receiver()初始化状态、mutex、cond等~Ack_receiver()停止线程并销毁 mutex、cond 等bool add_slave(THD *thd)在锁内处理 mslaves, mslaveschanged 等状态void remove_slave(THD *thd)同上,但是要注意的是 mslaves 是 vector,所以是列表遍历操作,根据 threadid 对比void wait_for_slave_connection()私有方法,实际就是等在 cond 上,会在 slave 增删时唤醒bool start()修改 mstatus 状态,启动 ack 线程bool stop()根据 mstatus 状态,关闭 ack 线程,并重置状态void run()ack 线程核心运行逻辑,也非常简单: 如果 mslaves 为空,就等在 cond 上; 如果不为空,就复制一份给 listener 监听 fd,再处理有事件的 fd,读出来 pos 信息提交给 master 对象。
struct TranxNode
所在文件: semisync_master.h
这个结构体是记录事务的链表/哈希表结点
成员变量
char log_name_[FN_REFLEN],my_off_t log_pos_没什么好说的mysql_cond_t cond,int n_waitersTODO: 这俩是如何维护的?struct TranxNode *next_链表指针struct TranxNode *hash_next_哈希表指针
class TranxNodeAllocator
所在文件: semisync_master.h
这个类就是一个无锁 Queue 的实现,多个(默认16个)元素结点组成的一段连续内存称为 block, 此类维护了一个 block 链表,支持申请结点,释放某个结点之前的所有结点(实际操作的是block)等操作, 明显减少了内存的申请释放等系统调用。
成员函数
TranxNode *allocate_node()申请结点int free_nodes_before(TranxNode* node)释放 node 之前的所有结点int free_all_nodes()释放所有结点
class ActiveTranx
所在文件: semisync_master.h
管理 active transactions,每个二元组 (filename, position)
会对应一个TranxNode,由这个类进行管理。
成员变量
TranxNode *trx_front_, *trx_rear_维护了一个 TranxNode 的有序列表TranxNode **trx_htb_维护了一个 TranxNode 的哈希表,用于快速判定结点的存在性int num_entries_哈希表最大节点数,由于除了 session 还会有定时任务,所以这个数字大于最大连接数,实际取 最大连接数 * 2
成员函数
int signal_waiting_sessions_all()遍历链表上的所有结点,并唤醒等待者int signal_waiting_sessions_up_to(const char *log_file_name, my_off_t log_file_pos)遍历这个有序列表,唤醒此位置前(包含)的等待者TranxNode* find_active_tranx_node(const char *log_file_name, my_off_t log_file_pos)遍历这个有序列表,找到地位置前(包含)的第一个结点int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos)创建 TranxNode 结点并插入到链表和哈希表int clear_active_tranx_nodes(const char *log_file_name, my_off_t log_file_pos)清理这个位置之前(不包含)的 TranxNode, 注意,如果有结点上还有等待者,那么不会继续清理bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos)判断二元组的存在性,直接在哈希表中查询即可
struct AckInfo
所在文件: semisync_master.h
就是一个简单的 Ack 相关信息的封装类,允许修改复用
成员变量
int server_idchar binlog_name[FN_REFLEN]unsigned long long binlog_pos
class AckContainer
所在文件: semisync_master.h 继承自: Trace
这个类记录和维护了所有从库的 ack 进度
成员变量
AckInfo m_greatest_ack最=小=的 ack 进度,注意,这里和变量名的含义不一致!AckInfo *m_ack_array存储所有 ack 进度的数组unsigned int m_sizeack 进度数组的大小,注意,这里m_size = 真正的size - 1,最后一个到达时直接把最小值 pop 出去,所以不需要给它留位置。unsigned int m_empty_slot一个缓存值,记录了一个空闲元素地址; 注意,如果这个值等于 msize 的时候可能并不精确,即数组中仍然可能存在空元素,这是 update 过程导致的。
成员函数
unsigned int updateIfExist(int server_id, const char *log_file_name, my_off_t log_file_pos)根据输入参数更新 array,返回被更新的元素的 index; 这一过程中也会尝试更新 memptyslot,但由于 AckInfo 更新会截断遍历,memptyslot 的值并不精确const AckInfo* insert(int server_id, const char *log_file_name, my_off_t log_file_pos)这是唯一一个关键逻辑函数了,如果有满足要求的 ack 信息,会直接report给 semisyncmaster 实例。 如果新到的 ack 进度比 mgreatestack 记录的还小或相等,那么没有记录价值,丢掉就好了(包括初始化时的边界情况); 尝试更新 array,如果更新成功了肯定也不会触发 ack 进度最小值的变化,直接返回 NULL 了; 如果没更新成功,说明这是可能最后一块拼读; 先检查下有没有空白值,如果没有,说明这就是最后一块拼图了,计算所有进度里面的最小值,有必要的话从数组里删除,更新到 mgreatestack 并返回 当然,如果有空白值或最小值在数组里,还要把对应位置设置为新收到的 ack 进度。
class ReplSemiSyncMaster
所在文件: semisync_master.h 继承自: ReplSemiSyncBase
好像还有很多逻辑,又好像没有什么逻辑只是一些简单的状态处理了
先到这吧\~