-- 创建消息表,用于记录变化,同步到私有库里 drop table if exists sync$message; create table sync$message( me_id bigint not null primary key auto_increment, me_table_name varchar(64) not null, me_method_type varchar(6) not null, me_data_key text not null, me_data text, me_priority int default 0, me_retry_count int default 0, me_source_app varchar(64), me_batch_code varchar(64), me_batch_size int, me_batch_detno int, me_create_time datetime not null ); -- 创建消息历史表 drop table if exists sync$message_history; create table sync$message_history( mh_id bigint not null primary key auto_increment, mh_dequeue_time datetime not null, me_id bigint not null, me_table_name varchar(64) not null, me_method_type varchar(6) not null, me_data_key text not null, me_data text, me_priority int default 0, me_source_app varchar(64), me_batch_code varchar(64), me_batch_size int, me_batch_detno int, me_create_time datetime not null ); -- 创建存储过程 入队消息 drop procedure if exists sync$enqueue_message; delimiter $$ create procedure sync$enqueue_message(p_table_name varchar(64), p_method_type varchar(6), p_data_key text, p_data text, p_priority int) begin if @source_app is not null then if @batch_detno is null then set @batch_detno = 0; end if; set @batch_detno = @batch_detno + 1; insert into sync$message (me_table_name, me_method_type, me_data_key, me_data, me_priority, me_create_time, me_source_app, me_batch_code, me_batch_size, me_batch_detno) values(p_table_name, p_method_type, p_data_key, p_data, p_priority, sysdate(), @source_app, @batch_code, @batch_detno, @batch_detno); -- 动态计算批次数量,在本次 session 中每累加一次,就更新一次数量 update sync$message set me_batch_size = @batch_detno where me_batch_code = @batch_code; else insert into sync$message (me_table_name, me_method_type, me_data_key, me_data, me_priority, me_create_time) values(p_table_name, p_method_type, p_data_key, p_data, p_priority, sysdate()); end if; end;$$ delimiter ; -- 创建存储过程 出队消息 drop procedure if exists sync$dequeue_message; delimiter $$ create procedure sync$dequeue_message(p_id bigint) begin insert into sync$message_history (mh_dequeue_time, me_id, me_table_name, me_method_type, me_data_key, me_data, me_priority, me_create_time, me_source_app, me_batch_code, me_batch_size, me_batch_detno) select sysdate(), me_id, me_table_name, me_method_type, me_data_key, me_data, me_priority, me_create_time, me_source_app, me_batch_code, me_batch_size, me_batch_detno from sync$message where me_id = p_id; delete from sync$message where me_id = p_id; end;$$ delimiter ; -- 创建存储过程 设置 session variable drop procedure if exists sync$set_session_variable; delimiter $$ create procedure sync$set_session_variable(p_source_app varchar(64), p_batch_code varchar(64)) begin set @source_app = p_source_app; set @batch_code = p_batch_code; set @batch_detno = 0; end;$$ delimiter ; -- 创建存储过程 取消设置 session variable drop procedure if exists sync$unset_session_variable; delimiter $$ create procedure sync$unset_session_variable() begin set @source_app = null; set @batch_code = null; set @batch_detno = null; end;$$ delimiter ; -- 创建存储过程 获取批次大小 drop procedure if exists sync$get_batch_size; delimiter $$ create procedure sync$get_batch_size(out p_batch_size int) begin set p_batch_size = @batch_detno; end;$$ delimiter ;