跳转至

Semisync 详解

Semisync 是 MySQL 的自带插件,用于实现半同步功能

源码解析

class Trace

所在文件: semisync.h

这个类就是简单的打印进入函数和退出函数的事件,允许设置 trace level

成员函数

  • void function_enter(const char *func_name)
  • int function_exit(const char *func_name, int exit_code)
  • bool function_exit(const char *func_name, bool exit_code)
  • void function_exit(const char *func_name)

增量知识

  • MySQL api sql_print_information(format, args...) 用于打印日志

class ReplSemiSyncBase

所在文件: semisync.h 继承自: Trace

这个类扩展了三个成员变量,用于主从通信时的协议标记

const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
  {ReplSemiSyncBase::kPacketMagicNum, 0};

class ReplSemiSyncSlave

所在文件: semisync_slave.h 继承自: ReplSemiSyncBase

从库类要比主库类简单的多: 首先,从逻辑上,一个 MySQL 实例可以是多个从库的主库,但只能是一个主库的从库,也就是说从库类是单例的; 也因此,主库要处理多个从库 ack 的返回逻辑,而从库只需要处理单一 ack 逻辑。

成员函数

  • int initObject() 初始化 trace level, slave enabled 等信息

  • int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply, const char **payload, unsigned long *payload_len) 处理 master 传来的数据,注意注释里的参数描述有点问题, 其中 need_reply, payload, payload_len 三个参数应该是 [OUT] ,为解析后的结果。 如果 header 的第一个字节不是 MagicNumber,那么半同步插件不会处理这个数据包,否则,读取 total_len - 2 的长度的 payload。 header 的第二个字节是 flag,如果标记了 kPacketFlagSync,表明数据包需要回复

    int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
                                          unsigned long total_len,
                                          bool  *need_reply,
                                          const char **payload,
                                          unsigned long *payload_len)
    {
      const char *kWho = "ReplSemiSyncSlave::slaveReadSyncHeader";
      int read_res = 0;
      function_enter(kWho);
    
      if ((unsigned char)(header[0]) == kPacketMagicNum)
      {
        *need_reply  = (header[1] & kPacketFlagSync);
        *payload_len = total_len - 2;
        *payload     = header + 2;
    
        if (trace_level_ & kTraceDetail)
          sql_print_information("%s: reply - %d", kWho, *need_reply);
      }
      else
      {
        sql_print_error("Missing magic number for semi-sync packet, packet "
                        "len: %lu", total_len);
        read_res = -1;
      }
    
      return function_exit(kWho, read_res);
    }
    
  • int slaveReply(MYSQL *mysql, const char *binlog_filename, my_off_t binlog_filepos) 对于第一个参数,主要使用其 net 属性,这是一个和 master 实例的连接。 后两个参数 binlog_filenamebinlog_filepos 用于构造相应的数据包。 数据包的格式为 |kPacketMagicNum(1byte)|binlog_filepos(8byte)|binlog_filename| 。 这个函数会把构造好的数据包发给 master。

  • int slaveStart(Binlog_relay_IO_param *param)

  • int slaveStop(Binlog_relay_IO_param *param) 这两个函数主要是为了维护 rpl_semi_sync_slave_status 状态

增量知识

  • 开发插件时,如果 include 了 mysql.h 且需要使用 extern "C" 可以直接用 C_MODE_STARTC_MODE_END

struct Slave

所在文件: semisync_master_ack_receiver.h

成员变量:

  • uint32_t thread_id 对应的 session id,仅用于记录
  • VIO vio 和从库的连接
  • uint server_id 从库的 id
  • bool net_compress

这个结构体的大部分字段是为了 show slave info 时的信息展示,只有 vio 字段是为了和从库通信

class Pollsocketlistener 和 class Selectsocketlistener

所在文件: semisync_master_socket_listener.h

对于支持 poll 的环境,使用 Pollsocketlistener,否则用 Selectsocketlistener, 两者实现了相同的接口。

顾名思义,两个函数就是用来 poll / select 多个 fds 的

成员函数

  • bool init_slave_sockets(Slave_vector slaves) 根据 slaves 初始化 fd 等信息
  • bool listen_on_sockets() 等待 fds 上的出现事件,注意,最长只等待 1s
  • bool is_socket_active(int index) 用于收到事件后判断某个 fd(用 index 索引)是否收到事件
  • void clear_socket_info(int index) 清除指定 fd 收到事件的标记
  • uint number_of_slave_sockets() 查询在监听的 fd 数量
  • Slave get_slave_obj(int index) 根据索引获取 slave 对象

class Ackreceiver

所在文件: semisync_master_ack_receiver.h 继承自: ReplSemiSyncBase

首先不得不吐槽一句,这种类名的命名方式真是不容易接受啊!

这个类是处理 ack 逻辑信息的核心类

成员变量

  • uint8 m_statusST_UP, ST_DOWN, ST_STOPPING 三种状态
  • Slave_vector m_slaves 一个 slave vector
  • bool m_slaves_changed 每次 slave vector 有所变化都会设置这个标记
  • mysql_mutex_t m_mutex 上面三个变量的修改都应在锁内完成,因为处理 slave 增删和 ack 是在不同线程中完成的
  • mysql_cond_t m_cond master 主线程在等待 slave 的变化时会等在这个 cond 上
  • my_thread_handle m_pid 启动的线程的 pid

成员函数

  • Ack_receiver() 初始化状态、mutex、cond等
  • ~Ack_receiver() 停止线程并销毁 mutex、cond 等
  • bool add_slave(THD *thd) 在锁内处理 mslaves, mslaveschanged 等状态
  • void remove_slave(THD *thd) 同上,但是要注意的是 mslaves 是 vector,所以是列表遍历操作,根据 threadid 对比
  • void wait_for_slave_connection() 私有方法,实际就是等在 cond 上,会在 slave 增删时唤醒
  • bool start() 修改 mstatus 状态,启动 ack 线程
  • bool stop() 根据 mstatus 状态,关闭 ack 线程,并重置状态
  • void run() ack 线程核心运行逻辑,也非常简单: 如果 mslaves 为空,就等在 cond 上; 如果不为空,就复制一份给 listener 监听 fd,再处理有事件的 fd,读出来 pos 信息提交给 master 对象。

struct TranxNode

所在文件: semisync_master.h

这个结构体是记录事务的链表/哈希表结点

成员变量

  • char log_name_[FN_REFLEN], my_off_t log_pos_ 没什么好说的
  • mysql_cond_t cond, int n_waiters TODO: 这俩是如何维护的?
  • struct TranxNode *next_ 链表指针
  • struct TranxNode *hash_next_ 哈希表指针

class TranxNodeAllocator

所在文件: semisync_master.h

这个类就是一个无锁 Queue 的实现,多个(默认16个)元素结点组成的一段连续内存称为 block, 此类维护了一个 block 链表,支持申请结点,释放某个结点之前的所有结点(实际操作的是block)等操作, 明显减少了内存的申请释放等系统调用。

成员函数

  • TranxNode *allocate_node() 申请结点
  • int free_nodes_before(TranxNode* node) 释放 node 之前的所有结点
  • int free_all_nodes() 释放所有结点

class ActiveTranx

所在文件: semisync_master.h

管理 active transactions,每个二元组 (filename, position) 会对应一个TranxNode,由这个类进行管理。

成员变量

  • TranxNode *trx_front_, *trx_rear_ 维护了一个 TranxNode 的有序列表
  • TranxNode **trx_htb_ 维护了一个 TranxNode 的哈希表,用于快速判定结点的存在性
  • int num_entries_ 哈希表最大节点数,由于除了 session 还会有定时任务,所以这个数字大于最大连接数,实际取 最大连接数 * 2

成员函数

  • int signal_waiting_sessions_all() 遍历链表上的所有结点,并唤醒等待者
  • int signal_waiting_sessions_up_to(const char *log_file_name, my_off_t log_file_pos) 遍历这个有序列表,唤醒此位置前(包含)的等待者
  • TranxNode* find_active_tranx_node(const char *log_file_name, my_off_t log_file_pos) 遍历这个有序列表,找到地位置前(包含)的第一个结点
  • int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos) 创建 TranxNode 结点并插入到链表和哈希表
  • int clear_active_tranx_nodes(const char *log_file_name, my_off_t log_file_pos) 清理这个位置之前(不包含)的 TranxNode, 注意,如果有结点上还有等待者,那么不会继续清理
  • bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos) 判断二元组的存在性,直接在哈希表中查询即可

struct AckInfo

所在文件: semisync_master.h

就是一个简单的 Ack 相关信息的封装类,允许修改复用

成员变量

  • int server_id
  • char binlog_name[FN_REFLEN]
  • unsigned long long binlog_pos

class AckContainer

所在文件: semisync_master.h 继承自: Trace

这个类记录和维护了所有从库的 ack 进度

成员变量

  • AckInfo m_greatest_ack=小= 的 ack 进度,注意,这里和变量名的含义不一致!
  • AckInfo *m_ack_array 存储所有 ack 进度的数组
  • unsigned int m_size ack 进度数组的大小,注意,这里 m_size = 真正的size - 1 ,最后一个到达时直接把最小值 pop 出去,所以不需要给它留位置。
  • unsigned int m_empty_slot 一个缓存值,记录了一个空闲元素地址; 注意,如果这个值等于 msize 的时候可能并不精确,即数组中仍然可能存在空元素,这是 update 过程导致的。

成员函数

  • unsigned int updateIfExist(int server_id, const char *log_file_name, my_off_t log_file_pos) 根据输入参数更新 array,返回被更新的元素的 index; 这一过程中也会尝试更新 memptyslot,但由于 AckInfo 更新会截断遍历,memptyslot 的值并不精确
  • const AckInfo* insert(int server_id, const char *log_file_name, my_off_t log_file_pos) 这是唯一一个关键逻辑函数了,如果有满足要求的 ack 信息,会直接 report 给 semisyncmaster 实例。 如果新到的 ack 进度比 mgreatestack 记录的还小或相等,那么没有记录价值,丢掉就好了(包括初始化时的边界情况); 尝试更新 array,如果更新成功了肯定也不会触发 ack 进度最小值的变化,直接返回 NULL 了; 如果没更新成功,说明这是可能最后一块拼读; 先检查下有没有空白值,如果没有,说明这就是最后一块拼图了,计算所有进度里面的最小值,有必要的话从数组里删除,更新到 mgreatestack 并返回 当然,如果有空白值或最小值在数组里,还要把对应位置设置为新收到的 ack 进度。

class ReplSemiSyncMaster

所在文件: semisync_master.h 继承自: ReplSemiSyncBase

好像还有很多逻辑,又好像没有什么逻辑只是一些简单的状态处理了

先到这吧\~