-- Sync message queue (MySQL): tables and stored procedures that record data changes for synchronisation to a private database.
-- Message table: records data changes so they can be synchronised to the
-- private database. Rows are inserted by sync$enqueue_message and removed by
-- sync$dequeue_message.
-- NOTE: the table is dropped and recreated, so any rows still queued are lost
-- when this script is re-run.
drop table if exists sync$message;
create table sync$message (
    me_id           bigint      not null primary key auto_increment,
    me_table_name   varchar(64) not null,  -- table the change belongs to
    me_method_type  varchar(6)  not null,  -- kind of change; presumably insert/update/delete -- TODO confirm
    me_data_key     text        not null,  -- key of the changed data
    me_data         text,                  -- optional payload
    me_priority     int         default 0,
    me_retry_count  int         default 0, -- not written by the procedures in this script
    me_source_app   varchar(64),           -- session variable @source_app at enqueue time (null when unset)
    me_batch_code   varchar(64),           -- session variable @batch_code at enqueue time
    me_batch_size   int,                   -- running number of messages enqueued for the batch
    me_batch_detno  int,                   -- 1-based sequence number of the message within the session's batch
    me_create_time  datetime    not null,
    -- sync$enqueue_message updates every row of a batch by me_batch_code on
    -- each insert; without this index that update scans the whole queue.
    key sync$message_batch_code_idx (me_batch_code)
);
-- Message history table: archive of messages that have left the queue.
-- sync$dequeue_message copies a sync$message row here together with the time
-- of dequeuing (mh_dequeue_time). me_retry_count is not carried over.
-- NOTE: the table is dropped and recreated, so existing history is lost when
-- this script is re-run.
drop table if exists sync$message_history;
create table sync$message_history (
    mh_id            bigint      not null auto_increment,
    mh_dequeue_time  datetime    not null,
    me_id            bigint      not null,  -- id the message had in sync$message
    me_table_name    varchar(64) not null,
    me_method_type   varchar(6)  not null,
    me_data_key      text        not null,
    me_data          text,
    me_priority      int         default 0,
    me_source_app    varchar(64),
    me_batch_code    varchar(64),
    me_batch_size    int,
    me_batch_detno   int,
    me_create_time   datetime    not null,
    primary key (mh_id)
);
-- Stored procedure: enqueue a message into sync$message.
--
-- Parameters:
--   p_table_name   stored in me_table_name
--   p_method_type  stored in me_method_type
--   p_data_key     stored in me_data_key
--   p_data         stored in me_data
--   p_priority     stored in me_priority; null is stored as 0 so that an
--                  explicit null does not bypass the column default
--
-- Session variables:
--   @source_app   when not null, the message is recorded as part of a batch
--   @batch_code   batch identifier written to me_batch_code
--   @batch_detno  per-session sequence counter; incremented here
--
-- No transaction is opened here; the insert and the batch-size update are
-- atomic only if the caller runs them inside a transaction.
drop procedure if exists sync$enqueue_message;
delimiter $$
create procedure sync$enqueue_message(p_table_name varchar(64), p_method_type varchar(6), p_data_key text, p_data text, p_priority int)
begin
    if @source_app is not null then
        -- Advance the sequence number, starting from 0 when it is unset.
        set @batch_detno = coalesce(@batch_detno, 0) + 1;

        insert into sync$message (
            me_table_name,
            me_method_type,
            me_data_key,
            me_data,
            me_priority,
            me_create_time,
            me_source_app,
            me_batch_code,
            me_batch_size,
            me_batch_detno
        )
        values (
            p_table_name,
            p_method_type,
            p_data_key,
            p_data,
            coalesce(p_priority, 0),
            sysdate(),
            @source_app,
            @batch_code,
            @batch_detno,  -- batch size so far equals the current sequence number
            @batch_detno
        );

        -- The batch size is computed dynamically: every time the counter
        -- advances in this session, all rows of the batch get the new size.
        update sync$message
        set me_batch_size = @batch_detno
        where me_batch_code = @batch_code;
    else
        -- No source application set: enqueue without batch information.
        insert into sync$message (
            me_table_name,
            me_method_type,
            me_data_key,
            me_data,
            me_priority,
            me_create_time
        )
        values (
            p_table_name,
            p_method_type,
            p_data_key,
            p_data,
            coalesce(p_priority, 0),
            sysdate()
        );
    end if;
end$$
delimiter ;
-- Stored procedure: dequeue a message.
-- Copies the sync$message row whose me_id equals p_id into
-- sync$message_history, stamped with the current time, and then deletes it
-- from sync$message. When no row matches p_id, both statements affect no rows.
-- No transaction is opened here; atomicity of the two statements depends on
-- the caller's transaction.
-- NOTE(review): me_retry_count is not archived -- the history table created by
-- this script has no such column. Confirm that this is intended.
drop procedure if exists sync$dequeue_message;
delimiter $$
create procedure sync$dequeue_message(p_id bigint)
begin
    insert into sync$message_history (
        mh_dequeue_time,
        me_id,
        me_table_name,
        me_method_type,
        me_data_key,
        me_data,
        me_priority,
        me_create_time,
        me_source_app,
        me_batch_code,
        me_batch_size,
        me_batch_detno
    )
    select
        sysdate(),
        m.me_id,
        m.me_table_name,
        m.me_method_type,
        m.me_data_key,
        m.me_data,
        m.me_priority,
        m.me_create_time,
        m.me_source_app,
        m.me_batch_code,
        m.me_batch_size,
        m.me_batch_detno
    from sync$message as m
    where m.me_id = p_id;

    delete from sync$message
    where me_id = p_id;
end$$
delimiter ;
-- Stored procedure: set the session variables that mark the start of a batch.
--   p_source_app  assigned to @source_app
--   p_batch_code  assigned to @batch_code
-- The sequence counter @batch_detno is reset to 0.
drop procedure if exists sync$set_session_variable;
delimiter $$
create procedure sync$set_session_variable(p_source_app varchar(64), p_batch_code varchar(64))
begin
    set @source_app  = p_source_app,
        @batch_code  = p_batch_code,
        @batch_detno = 0;
end$$
delimiter ;
-- Stored procedure: clear the batch session variables.
-- Sets @source_app, @batch_code and @batch_detno to null.
drop procedure if exists sync$unset_session_variable;
delimiter $$
create procedure sync$unset_session_variable()
begin
    set @source_app  = null,
        @batch_code  = null,
        @batch_detno = null;
end$$
delimiter ;
-- Stored procedure: get the batch size.
--   p_batch_size (out)  receives the current value of the session counter
--                       @batch_detno; null when the counter is unset.
drop procedure if exists sync$get_batch_size;
delimiter $$
create procedure sync$get_batch_size(out p_batch_size int)
begin
    select @batch_detno into p_batch_size;
end$$
delimiter ;
|