Исходники.Ру - Программирование
Исходники
Статьи
Книги и учебники
Скрипты
Новости RSS
Магазин программиста

Главная » Статьи по Базам данных » Interbase - Статьи »

Обсудить на форуме Обсудить на форуме

Реализация off-line репликации в Interbase (Firebird)

Часто в задачи автоматизации управления предприятиями входит задача синхронизации состояния информации баз данных в головном офисе и региональных офисах. Примерами могут служить управление распределенным складом или филиалами банка.

В системе без выделенного головного офиса задача может состоять в передаче изменений заданных таблиц БД всем или выделенному списку территориальных управлений(ТУ), работающих с БД той же структуры.

Для решения этой задачи в СУБД FireBird можно поступать следующим образом.

Для всех таблиц БД использовать суррогатные первичные ключи представляющие счетики, задаваемые генераторами, а ограничение уникальности сочетаний полей задавать дополнительно. Для каждого ТУ задавать такие значения генераторов, чтобы гарантировать отсутствие пересечений множеств значений генераторов разных ТУ, например все генераторы ТУ1 имеют значения в отрезке целых чисел [1, N] , для ТУ2 [N+1, 2N] и т.д.

Для отслеживания изменений в базе данных воспользуемся менеджером протоколов данных IBExpress. На Рис 1. показано каким образом можно поставить на логирование таблицы базы данных ROffice.gdb.

Реализация off-line репликации в Interbase (Firebird)
Рис 1.

Учет изменений ведется в четырех системных таблицах изображенных на Рис 2.

Реализация off-line репликации в Interbase (Firebird)
Рис 2.

Например , при внесении изменений в таблице CITY , в таблицах IBE$Log_Tables, IBE$Log_Keys, IBE$Log_Fields, IBE$Log_Blob_Fields появятся записи за счет срабатывания триггеров логирования. На Рис. 3 показаны данные таблицы IBE$Log_Tables, где видно, что 09.09.2005 выполнялись операции INSERT и UPDATE с таблицей CITY. Соответствующие этим операциям записи будут присутствовать и в таблицах IBE$Log_Keys, IBE$Log_Fields, IBE$Log_Blob_Fields.

Реализация off-line репликации в Interbase (Firebird)
Рис 3.

Информацию об измененных данных одной БД за определенный промежуток времени для заданного набора таблиц будем формировать в файл данных off-line репликации, который будем называть D - файл.

Имена файлов D имеют структуру:

  • D_CODSENDER_CODREC_TIMESTAMP_IDPredFileD_IDFileD
  • CODSENDER - код территориального управления отправителя
  • CODREC - код территориального управления получателя
  • IDPredFileD номер ID последнего сквитованного без ошибок файла
  • IDFileD номер ID отправляемого файла

При приеме файла D происходит проверка синтаксиса файла, номера сеанса и даты в имени файла, номера сеансов должны удовлетворять своей последовательности от каждого ТУ (см. формирование имени файла D). По результату контороля формируется файл квитанции S.

Имя файла квитанции формируется из имени файла добавлением слева буквы S. Для обмена D и S файлами создаюся каталоги.

.. ROOT IN
.. ROOT OUT, где ROOT - корневая директория 

Для управления сеансами репликации нужно создать БД , содержащую по крайней мере три таблицы, подобных изображенным на Рис. 4.

Реализация off-line репликации в Interbase (Firebird)
Рис 4.

и таблицы Project и Tables, содержащие информацию о базах и таблицах , подлежащих репликации (Рис 5).

Реализация off-line репликации в Interbase (Firebird)
Рис 5.

Здесь AliasName имя файла базы данных. Replication = 1 означает, что объект подлежит репликации а Replication = 0 нет.

Очередной файл репликации D может быть построен только при наличии сквитованного без ошибок предыдущего файла D, или если он является первым.

При успешном формировании файла он помещается в Connections.Files, каталог "..ROOT " и копируется в ".. ROOT OUT" для почтовой программы. В Connections.Errors пишется протокол. При этом выполняются преобразования в таблице сеансов Connections.

При обнаружении ошибки при приеме D файла формируется квитанция с кодом ошибки , а Connections.State := stateReceived , Connections.Errid := <> 0.

При обнаружении ошибки при приеме S, квитанция бракуется Connections.State := stateReceived , Connections.Errid := <> 0.

При нормальном приеме S Connections.State := stateTICKED (Сквитован) для файла D и stateRECEIVE (принята) для квитанции и Connections.Errid := 0 для обоих.

Если для принимаемого файла существует запись Connections.NameFile = NameFile, Connections.State = stateReceive, Connections.Errid = 0 , файл удаляется как повторный.

Файл D содержит следующие данные.

  • Ключ проекта
  • Время предыдущего сеанса
  • Время текущего сеанса
  • Разность начальных значений генераторов
  • Разделитель в виде строки GUID
  • Строка в которой сохранен файл ClientDataSet с данными , соответствующими SQL запросу SIBT в функции CreateContentDViacds.
  • Разделитель в виде строки GUID
  • Строка в которой сохранен файл ClientDataSet с данными, соответствующими запросу
  • SIBUI в функции CreateContentDViacds , выполняющей построение файла репликации.

Далее аналогично помещаются данные, соответствующие запросам SIBKUI, SIBKD, SIBFU, SIBBU с тем же разделителем между ними.

Формат передачи данных ClientDataset используется для удобства передачи значений BLOB полей чего не позволяет наиболее популярный для этих целей XML формат.

// функция построения файла D для заданного отрезка времени
// ProjectID - номер проекта
// NKeyMin - минимальное значение ключей БД отправителя
// NKeyMax - максимальное значение ключей БД отправителя
// TablesList - список таблиц, разделенных запятой, подлежащих репликации

function TfmSkladTun.CreateContentDViacds( DateB, DateE: TDateTime;
  NameDataBase: string; ProjectID, NKeyMin, NKeyMax: integer;
  TablesList: string ): string;
Const // для последовательности операций
SIBT =
' select IBT.table_name, IBT.ID, IBT.OPERATION from ibe$log_tables '+
' IBT inner join ibe$log_keys IBK on IBT.ID = IBK.log_tables_id '+
' where DATE_TIME >= :D1 and DATE_TIME < :D2 '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
' and Cast(IBK.KEY_VALUE as numeric) < %d';

SIBKUI = // для удаленных записей
' select IBT.table_name, IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID '+
' from ibe$log_keys IBK, ibe$log_tables IBT where log_tables_id in '+
' (select id from ibe$log_tables where DATE_TIME >= :D1 and '+
' DATE_TIME < :D2 '+
' and OPERATION <> 'D' ) '+
' and IBK.log_tables_id = IBT.id '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
' and Cast(IBK.KEY_VALUE as numeric) < %d';

SIBKD = // для удаленных записей
' select IBT.table_name, IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID '+
' from ibe$log_keys IBK, ibe$log_tables IBT where log_tables_id in '+
' (select id from ibe$log_tables where DATE_TIME >= :D1 and '+
' DATE_TIME < :D2 '+
' and OPERATION = 'D' ) '+
' and IBK.log_tables_id = IBT.id '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
' and Cast(IBK.KEY_VALUE as numeric) < %d';

// для измененных значений полей operation <>'D'
// для вставленных значений полей

SIBFU =
' select IBT.table_name, IBF.FIELD_NAME, IBF.OLD_VALUE, IBF.NEW_VALUE, '+
; IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_tables IBT, '+
' ibe$log_fields IBF, ibe$log_keys IBK '+
' where IBF.log_tables_id in '+
' (select id from ibe$log_tables where DATE_TIME >= :D1 and '+
' DATE_TIME < :D2 '+
' and OPERATION <> 'D' ) '+
' and IBT.ID = IBK.log_tables_id and IBT.ID = IBF.log_tables_id '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
' and Cast(IBK.KEY_VALUE as numeric) < %d';

// для измененных значений Blob полей operation <>'D'
// для вставленных значений Blob полей
SIBBU =
' select IBT.table_name, IBB.FIELD_NAME, IBB.OLD_CHAR_VALUE, '+
' IBB.NEW_CHAR_VALUE,'+
' IBB.OLD_BLOB_VALUE, IBB.NEW_BLOB_VALUE,'+
' IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_tables IBT, '+
' ibe$log_blob_fields IBB, ibe$log_keys IBK '+
' where IBB.log_tables_id in (select id from ibe$log_tables '+
' where DATE_TIME >= :D1 and DATE_TIME < :D2 and OPERATION <> 'D')'+
' and IBT.ID = IBK.log_tables_id and IBT.ID = IBB.log_tables_id '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
' and Cast(IBK.KEY_VALUE as numeric) < %d';

var
  i,j: integer;
  StrTmpAll: string;
  ListSql: TStringList;
  TableList: TStringList;
  DBNameSaved: string;
  FDelimiter, FDelimiter1: string;
  guid: TGUID;
  StrStream: TStringStream;
  sTemp: string;
begin
 Result := ';
 try
  try
   // Номер реплицируемого проекта
   StrTmpAll := Padl(Trim(IntToStr(ProjectID)), 10)+#13#10+ 
   FormatDateTime('DD.MM.YYYY HH:NN:SS',DateB)+#13#10+
   FormatDateTime('DD.MM.YYYY HH:NN:SS',DateE)+#13#10+
   IntToStr(DeltaGen) +#13#10;
   // - DeltaGen отрицательное или положительное целое число,
   // представляющее разность значений генераторов в БД отправителя
   // и БД получателя.
   if CreateGuid(guid) = S_OK then
    FDelimiter := GuidToString(guid) 
   else
    begin
     MessageDlg('Не могу получить строку разделителя', mtError, [mbOk], []);
     Exit;
    end; //
   StrTmpAll := StrTmpAll + FDelimiter+#13#10;
   TableList := TStringList.Create();
   TableList.CommaText := TablesList;
   TableList.CaseSensitive := False;
   ListSql := TStringList.Create();

   ListSql.Add(Format(SIBT, [ NKeyMin, NKeyMax]));
   ListSql.Add(Format(SIBKUI, [ NKeyMin, NKeyMax]));
   ListSql.Add(Format(SIBKD, [ NKeyMin, NKeyMax]) );
   ListSql.Add(Format(SIBFU, [ NKeyMin, NKeyMax]));
   ListSql.Add(Format(SIBBU, [ NKeyMin, NKeyMax]));

   if dmReplConn.fbConstr.Connected then
    dmReplConn.fbConstr.Connected := False;

   DBNameSaved := dmReplConn.fbConstr.DBName;
   dmReplConn.fbConstr.DBName := NameDataBase;
   dmReplConn.fbConstr.Connected := True;
   for j := 0 to ListSql.Count-1 do
    begin
     sTemp := ';
     StrStream := TStringStream.Create(sTemp);
     Createcds(j, cdsReplik);
     if dmReplConn.fdCommon.Active then
      dmReplConn.fdCommon.Close;
     dmReplConn.fdCommon.SelectSQL.Clear;
     dmReplConn.fdCommon.SelectSQL.Add(ListSql[j]);
     dmReplConn.fdCommon.OpenWP([DateB, DateE]);
     with (dmReplConn.fdCommon) Do
      begin
       First;
       while not Eof Do
        begin
         cdsReplik.Append;

         // включаем только
         // таблицы подлежащие репликации
         if TableList.IndexOf(Fields[0].AsString) <> -1 then 
          For i:=0 to Fields.Count-1 Do
           Begin
            cdsReplik.Fields[i].AsString := Fields[i].AsString;
           End;
       Next;
      end;
     end;
    cdsReplik.SaveToStream(StrStream, dfBinary);
    cdsReplik.Close;
    StrTmpAll := StrTmpAll+StrStream.DataString+FDelimiter+#13#10;
    StrStream.Free;
    // конец DEL части
   end;
  Result := StrTmpAll;
 except
 on e: exception do
 ErrorDlg(e);
end;
finally
 ListSql.Free;
 TableList.Free;
 if dmReplConn.fdCommon.Active then
  dmReplConn.fdCommon.Close;
 if dmReplConn.fbConstr.Connected then
  dmReplConn.fbConstr.Connected := False;
 dmReplConn.fbConstr.DBName := DBNameSaved;
 end;
end;

// построение ClientDataSet для форматирования данных
procedure TfmSkladTun.Createcds(TipLog: smallint; cds: TClientDataSet);
var
i: integer;
begin
with cds do
begin
if Active then
Close;
FieldDefs.Clear;
case TipLog of
0: // для ID
begin
 FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
 FieldDefs.Add( 'OPERATION', ftString, 1 );
 FieldDefs.Add( 'ID', ftLargeint);
end;

1, 2: // для insert, update и delete ключей

begin
 FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
 FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
 FieldDefs.Add( 'KEY_VALUE', ftString, 255);
 FieldDefs.Add( 'ID', ftLargeint);
end;

3: // для insert и Update полей

begin
 FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
 FieldDefs.Add( 'FIELD_NAME', ftString, 70 );
 FieldDefs.Add( 'OLD_VALUE', ftMemo);
 FieldDefs.Add( 'NEW_VALUE', ftMemo);
 FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
 FieldDefs.Add( 'KEY_VALUE', ftString, 255);
 FieldDefs.Add( 'ID', ftLargeint);
end;

4: // для insert и Update Blob полей

begin
 FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
 FieldDefs.Add( 'FIELD_NAME', ftString, 70 );
 FieldDefs.Add( 'OLD_CHAR_VALUE', ftMemo);
 FieldDefs.Add( 'NEW_CHAR_VALUE', ftMemo);
 FieldDefs.Add( 'OLD_BLOB_VALUE', ftBlob);
 FieldDefs.Add( 'NEW_BLOB_VALUE', ftBlob);
 FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
 FieldDefs.Add( 'KEY_VALUE', ftString, 255);
 FieldDefs.Add( 'ID', ftLargeint);
end;
end;
CreateDataSet;
IndexFieldNames := 'ID';
end;
end;

Файл квитанции S содержит следующие данные

  • Код ошибки (Целое число 10 позиций дополненное слева нулями)
  • Описание ошибки (Строка до 256 символов)

Очередной файл D, кроме первого, может быть принят только при наличии сквитованного без ошибок предыдущего принятого файла D.

Функция реализации изменений при приеме файла D AllUpdateDatabaseGetContentD() генерирует и выполняет необходимые SQL команды в БД получателя, в одной транзакции, и в точно такой же последовательности, как они выполнялись в базе ТУ отправителя .

Процедура контроля файла D , которая выполняет проверку имени и содержимого файла, последовательности сеансов и т.п. ( в статье не приведена ) получает переменные приведенные ниже из строки содержимого файла D FileString .

ProjectID := StrToInt(Trim(Copy(FileString, 1, 10)));
  • StringID - строка , соответствующая файлу ClientDataSet для SIBUI
  • FStringForInsUpd - строка , соответствующая файлу ClientDataSet для SIBKUI
  • FStringForDel - строка , соответствующая файлу ClientDataSet для SIBKD
  • FStringForField - строка , соответствующая файлу ClientDataSet для SIBFU.
  • FStringForBlob - строка , соответствующая файлу ClientDataSet для SIBBU
  • DeltaGen - отрицательное или положительное целое число, представляющее разность значений генераторов в БД отправителя и БД получателя.
function TFileHandling.AllUpdateDatabaseGetContentD(): boolean;
var
 Sds: TSqlDataSet;
 TD: TTransactionDesc;
 OldDatabase,NameDatabase: string;
 cdsID: TClientDataSet;
 sTemp: string;
 StrStream: TStringStream;
 ProjectID: integer;
begin
 Result := true;
 // получаем имя файла базы данных получателя по номеру проекта
 NameDataBase := fmSkladTun.GetNameDatabase(ProjectID); 
try
try
with dmReplConn do
begin
Sds := TSqlDataSet.Create(dmReplConn);
if cnRep.Connected then
cnRep.Close;
OldDatabase := cnRep.Params.Values['DataBase'];
cnRep.Params.Values['DataBase'] := NameDatabase;
if not cnRep.Connected then
cnRep.Open;
While cnRep.InTransaction do
Continue;
TD.TransactionID := 1;
TD.IsolationLevel := xilREADCOMMITTED;
cnRep.StartTransaction(TD);
Sds.SqlConnection := cnRep;
Sds.CommandType := ctQuery;
cdsID := TClientdataset.Create(fmSkladTun);
fmSkladTun.Createcds(0, cdsID);
sTemp := StringID;
StrStream := TStringStream.Create(sTemp);
cdsID.LoadFromStream(StrStream);
FreeAndNil(StrStream);
cdsID.First;
While not cdsID.Eof do
begin
UpdateDatabaseGetContentD(cdsID.Fields[0].AsInteger, cdsID.Fields[2].AsString[1], Sds);
cdsID.Next;
end;
end;
except
on E: Exception do
begin
if dmReplConn.cnRep.InTransaction then
dmReplConn.cnRep.Rollback(TD);
result := false;
ErrorDlg(E);
exit;
end;
end;
dmReplConn.cnRep.Commit(TD);
finally
if dmReplConn.cnRep.Connected then
dmReplConn.cnRep.Close;
dmReplConn.cnRep.Params.Values['DataBase'] := OldDatabase;
Screen.Cursor := crDefault;
if Sds.Active then
Sds.Close;
if Assigned(Sds) then
FreeAndNil(Sds);
if Assigned(cdsID) then
FreeAndNil(cdsID);
end;
end;

procedure TFileHandling.UpdateDatabaseGetContent(IDCUR: integer;
                             Operation: char; Sds: TSqlDataSet);
var
i: integer;
NBLOBVALUE: string;
LOG_TABLES_ID: integer;
TABLE_NAME,KEY_FIELD,KEY_VALUE: string;
FIELD_NAME, OLD_VALUE, NEW_VALUE: string;
OLD_CHAR_VALUE, NEW_CHAR_VALUE, OLD_BLOB_VALUE, NEW_BLOB_VALUE: string;
pos1, pos2: integer;
sTemp, sTemp1: string;
FieldType: TFieldType;
StrStream, StrStream1: TStringStream;
StrValues: string;
TN: string;
cdsKeys, cdsFieldInsert, cdsBlobInsert: TClientDataset;
TempParams: TParams;
sTempNames, sTempValues, sTempNamesB, sTempValuesB: string;
sTempNamesValues, sTempNamesValuesB: string;

function GetPartInsertStatement(ForBlob: boolean; Id: integer; Tip: smallint ): string;
var
sPart: string;
i: integer;
TABLE_NAME,FIELD_NAME,NEW_VALUE, KEY_FIELD,KEY_VALUE: string;
begin
Result:= ';
sPart := ';
if ForBlob then
with cdsBlobInsert do
begin
TempParams := TParams.Create;
Sds.Params.Clear;
First;
while not Eof do
begin
if Id = Fields[8].AsInteger then
if Tip = 0 then
begin // получить строку для значений вставляемых BLOB полей
Sds.Params.Add;
if sPart = ' then
sPart := sPart+' :p'+Trim(IntToStr(Sds.Params.Count))
else
sPart := sPart+', :p'+Trim(IntToStr(Sds.Params.Count));
if Fields[3].AsString <> ' then
NBLOBVALUE := Fields[3].AsString
else
NBLOBVALUE := Fields[5].AsString;
Sds.Params[Sds.Params.Count-1].AsBlob := NBLOBVALUE;
end
else // получить строку для имен вставляемых BLOB полей
if TIP = 1 then
if sPart = ' then
sPart := sPart+ Fields[1].AsString
else
sPart := sPart+','+ Fields[1].AsString
else // для update
begin
Sds.Params.Add;
if Trim(sPart) = ' then
sPart := sPart+Fields[1].AsString+'= :p'+Trim(IntToStr(Sds.Params.Count-1))
else
sPart := sPart+','+Fields[1].AsString+'= :p'+Trim(IntToStr(Sds.Params.Count));
if Fields[3].AsString <> ' then
NBLOBVALUE := Fields[3].AsString
else
NBLOBVALUE := Fields[5].AsString;
Sds.Params[Sds.Params.Count-1].AsBlob := NBLOBVALUE;
end;
Next;
end;
end
else //для обычных полей
with cdsFieldInsert do
begin
First;
while not Eof do
begin
if (Id = Fields[6].AsInteger) then
if Tip = 0 then
// получить строку для значений вставляемых полей
begin
TABLE_NAME := Fields[0].AsString;
FIELD_NAME := Fields[1].AsString;
NEW_VALUE := Fields[3].AsString;
If FIELD_NAME = Fields[4].AsString then // если это ключевое поле
NEW_VALUE := IntToStr(Fields[3].AsInteger + DeltaGen);


if not GetFieldType(TABLE_NAME, FIELD_NAME, FieldType) then
raise Exception.Create(Format('Невозможно получить тип поля %s для таблицы %s ',
[FIELD_NAME, TABLE_NAME]));
if FieldType in [ftString, ftWideString,ftDate, ftDateTime, ftTimeStamp] then
NEW_VALUE := QuotedStr(NEW_VALUE);
if sPart = ' then
sPart := sPart + NEW_VALUE
else
sPart := sPart +','+ NEW_VALUE;
end
else // получить строку для имен вставляемых полей
if TIP = 1 then
if sPart = ' then
sPart := sPart+ Fields[1].AsString
else
sPart := sPart+','+ Fields[1].AsString
else // Tip = 2 для update
begin
TABLE_NAME := Fields[0].AsString;
FIELD_NAME := Fields[1].AsString;
NEW_VALUE := Fields[3].AsString;
if not GetFieldType(TABLE_NAME, FIELD_NAME, FieldType) then
raise Exception.Create(Format('Невозможно получить тип поля %s для таблицы %s ',
[FIELD_NAME, TABLE_NAME]));
if FieldType in [ftString, ftWideString,ftDate, ftDateTime, ftTimeStamp] then
NEW_VALUE := QuotedStr(NEW_VALUE);
if sPart = ' then
sPart := sPart + Fields[1].AsString+' = '+ NEW_VALUE
else
sPart := sPart+','+ Fields[1].AsString+' = '+ NEW_VALUE;
end;
Next;
end;
end;
for i := 0 to Sds.Params.Count-1 do
TempParams.AddParam (Sds.Params[i]);
Result := sPart;
end;
begin
try
// получение и выполнение запросов на вставку и
// изменение записей для обычных и BLOB полей
cdsKeys := TClientdataset.Create(fmSkladTun);
cdsFieldInsert:= TClientdataset.Create(fmSkladTun);
cdsBlobInsert := TClientdataset.Create(fmSkladTun);

sTemp := StringForInsUpd;
StrStream := TStringStream.Create(sTemp);
fmSkladTun.Createcds(1, cdsKeys);
cdsKeys.LoadFromStream(StrStream);
FreeAndNil(StrStream);
sTemp := StringForField;
sTemp1 := StringForBlob;
StrStream := TStringStream.Create(sTemp);
StrStream1 := TStringStream.Create(sTemp1);
fmSkladTun.Createcds(3, cdsFieldInsert);
cdsFieldInsert.LoadFromStream(StrStream);
fmSkladTun.Createcds(4, cdsBlobInsert ); // для BLOBOV
cdsBlobInsert.LoadFromStream(StrStream1);
cdsKeys.First;
with cdsKeys do
while not Eof do
begin
if Fields[3].AsInteger = IDCUR then
begin
TABLE_NAME := Fields[0].AsString;
KEY_FIELD := Fields[1].AsString;
KEY_VALUE := Fields[2].AsString;
LOG_TABLES_ID := Fields[3].AsInteger;
case Operation of
'I':
begin
sTempNames := GetPartInsertStatement(false, LOG_TABLES_ID, 1 );
sTempValues := GetPartInsertStatement(false, LOG_TABLES_ID, 0 );
sTempNamesB := GetPartInsertStatement(true, LOG_TABLES_ID, 1 );
sTempValuesB := GetPartInsertStatement(true, LOG_TABLES_ID, 0 );

Sds.CommandText := 'Insert INTO ' + TABLE_NAME + '( ' +sTempNames+
IIf_Str(sTempNames=',',IIf_Str(sTempNamesB=',',','))+
{строка наименований вставляемых Не BLOB полей}
sTempNamesB+{строка наименований вставляемых BLOB полей}
') VALUES ('+ sTempValues+
IIf_Str(sTempValues=',',IIf_Str(sTempValuesB=',',','))+
{строка значений вставляемых Не BLOB полей}
sTempValuesB+{строка значений вставляемых BLOB полей}
')';
end;
'U':
begin
sTempNamesValues := GetPartInsertStatement(false, LOG_TABLES_ID, 2 );
sTempNamesValuesB := GetPartInsertStatement(true, LOG_TABLES_ID, 2 );
Sds.CommandText := 'UPDATE ' + TABLE_NAME + ' SET ' + sTempNamesValues+
IIf_Str(sTempNamesValues=',',IIf_Str(sTempNamesValuesB=',',','))+
{строка наименований = значение изменяемых Не BLOB полей}
sTempNamesValuesB+ {строка наименований = значение изменяемых BLOB полей}
' WHERE '+ Fields[1].AsString+' = '+IntToStr(Fields[2].AsInteger+ DeltaGen);
end;
end;
for i := 0 to Sds.Params.Count-1 do
Sds.Params[i]:= TempParams[i];
Sds.ExecSql;
if Assigned(TempParams) then
TempParams.Free;
end;
Next;
end;
FreeAndNil(StrStream);
FreeAndNil(StrStream1);

// получение запросов на удаление записей
FreeAndNil(cdsKeys);
cdsKeys := TClientdataset.Create(fmSkladTun);
sTemp := StringForDel;
StrStream := TStringStream.Create(sTemp);
fmSkladTun.Createcds(0, cdsKeys);
cdsKeys.LoadFromStream(StrStream);
cdsKeys.First;
with cdsKeys do
// if FindKey([IDCUR]) then
while not Eof do
begin
if Fields[3].AsInteger = IDCUR then
begin 
TABLE_NAME := Fields[0].AsString;
KEY_FIELD := Fields[1].AsString;
KEY_VALUE := Fields[2].AsString;
LOG_TABLES_ID := Fields[3].AsInteger;
Sds.CommandText := 'delete from '+TABLE_NAME+ ' where '+KEY_FIELD+' = ' + KEY_VALUE;
Sds.ExecSql;
end;
Next;
end;
FreeAndNil(StrStream);
finally
if Assigned(cdsKeys) then
FreeAndNil(cdsKeys);
if Assigned(cdsFieldInsert) then
FreeAndNil(cdsFieldInsert);
if Assigned(cdsBlobInsert) then
FreeAndNil(cdsBlobInsert);
end;
end;

Сервер off - line репликации имеет возможность выполнять настройки режимов репликации, выбирать списки ТУ, на которые выполняется рассылка TU.Replication = True, списки проектов в каждом ТУ, и списки таблиц баз данных проектов, подлежащих репликации. При выполнении пункта меню 'Создание файла репликации' выполняется построение и отправка файла D за период от введенного времени до текущего. При запуске автоматического режима выполняются циклы приема и передачи данных и квитанций с периодичностью TU. PeriodConnect. В любой момент можно остановить сервер off - line репликации.

На рис. 6. изображено состояние после отправки на Склад 2 очередного файла D. Файл сформировался после изменения данных в одной из таблиц проекта 3, поставленных на репликацию. На Рис. 7. показано, что следующий файл не формируется до прихода квитанции S от склада 2. На Рис. 8. показано, что все таблицы проекта 3 поставлены на репликацию по передаче для склада 1 . После приема файла D на стороне склада 2 и формирования квитанции Рис. 9. сервер склада 1 готов снова формировать сеанс D (Рис. 10.) , при возникновении изменений в данных одной из таблиц, поставленных на репликацию . В соответствующей БД и таблицах на стороне склада 2 выполнится та же транзакция, но со значениями ключей , отличающимися на DeltaGen.

Реализация off-line репликации в Interbase (Firebird)
Рис 6.


Реализация off-line репликации в Interbase (Firebird)
Рис 7.


Реализация off-line репликации в Interbase (Firebird)
Рис 8.


Реализация off-line репликации в Interbase (Firebird)
Рис 9.


Реализация off-line репликации в Interbase (Firebird)
Рис 10.


Пример программы, реализующей данный механизм Вы можете найти на сайте: http://nerusoft.com


Может пригодится:


Автор: Морянцев М.А.
Прочитано: 8323
Рейтинг:
Оценить: 1 2 3 4 5

Комментарии: (0)

Добавить комментарий
Ваше имя*:
Ваш email:
URL Вашего сайта:
Ваш комментарий*:
Код безопастности*:

Рассылка новостей
Рейтинги
© 2007, Программирование Исходники.Ру