Настройка репликации с Oracle на PostgreSQL

Особенности:

репликация между базами данных Oracle и Не-Oracle через Heterogenous Services
поддерживается только для шлюзов “Transparent Gateway”, к которым не относится
Gateway for ODBC. Проблема возникает при попытке применить захваченные
процессом Streams Capture данные к таблице Postgres через Gateway из-за того,
что Gateway for ODBC не поддерживает двухфазный commit.

Чтобы обойти проблему можно извлекать захваченные изменения из очереди в отдельном
job-е и применять к таблице Postgres программно. Однако, чтобы можно было извлекать
изменения непосредственно из очереди, процедура захвата должна быть настроена в
синхронном режиме (SYNC_CAPTURE).

Порядок настройки:

1. Настроить доступ к PostgreSQL через ODBC

1.1. установить Postgres (64-битная версия)

1.2. Установить UnixODBC
(тестировал версию unixODBC-2.3.0)
для сборки необходимо установить параметры окружения:

PATH=/usr/bin:/usr/sfw/bin:/usr/ccs/bin:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/bin:/usr/ccs/bin:/usr/sbin:/usr/openwin/bin:/opt/oracle10/product/11.1.0/Db_1/bin

LD_LIBRARY_PATH=/usr/sfw/lib/64:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/lib

CFLAGS=-m64

запускать autogonfig:

>CFLAGS=-m64 ./configure

для сборки использовать gcc 3.4.3. из стандартной поставки Solaris 10 (версии, доступные для скачивания черех freesolaris,
не содержат 64-битных библиотек)

1.3. Установить Postgres ODBC
(тестировал версию psqlodbc-08.04.0200)
переменные окружения – см. выше.

зарегистрировать Postgres ODBC в odbc.ini:

/usr/local/etc/odbcinst.ini:
[PostgreSQL]
Description     = PostgreSQL driver
Driver          = /usr/local/lib/psqlodbcw.so
FileUsage       = 1

/usr/local/etc/odbc.ini
[pgtest]
Description         = Test to Postgres
Driver              = PostgreSQL
Trace               = Yes
TraceFile           = sql.log
Database            = pgtest
Servername          = localhost
UserName            = myuser
Password            = myuser
Port                = 5432
Protocol            = 6.4
ReadOnly            = No
RowVersioning       = No
ShowSystemTables    = No
ShowOidColumn       = No
FakeOidIndex        = No
QuotedId            = No
ConnSettings        =

протестировать доступность Postgres через ODBC с помощью isql:

isql pgtest

если тест не проходит, более подробно сообщение об ошибке можно посмотреть с помощью
нативного клиента postgres:
/usr/local/pgsql/bin/psql -h <host name> -d <database name> -U <user>

в случае проблем с кодировкой можно поменять кодировку на стороне клиента с помощью команды

alter user <user> SET client_encoding to LATIN1;

эта кодировка должна быть такой же, как указана при настройке Heterogenous Services
в переменной HS_LANGUAGE

нужно иметь в виду, что есть bug:

DG4ODBC Using MySQL ODBC Driver Corrupts Multibyte Characters [ID 1209943.1]
из-за которого невожможно работать через DG4ODBC со сторонними базами в unicode-кодировке.

2. Настроить доступ к PostgreSQL из Oracle через Gateway for ODBC

2.1. Удостовериться, что в RDBMS Oracle установлены параметры:

global_names=true
job_queue_processes > 2
compatible > 10.2.0

2.2. Сконфигурировать Heterogenous Services:

2.2.1. сконфигурировать listener в $ORACLE_HOME/network/admin/listener.ora:

LHS =
(ADDRESS_LIST =
(
ADDRESS= (PROTOCOL=tcp)
(HOST = blader)
(PORT = 1526)
)
)

SID_LIST_LHS =
(SID_LIST =
(SID_DESC =
(GLOBAL_DBNAME = PGTEST.MYDOMAIN.RU)
(SID_NAME = pgtest)
(ORACLE_HOME = /opt/oracle10/product/11.1.0/Db_1)
(PROGRAM = dg4odbc)
(ENVS=”LD_LIBRARY_PATH=/usr/sfw/lib/64:/opt/oracle10/fvb/postgres/postgres/9.0-pgdg/lib/64:/usr/local/lib”)
)
)

2.2.2. сконфигурировать HS в $ORACLE_HOME/hs/admin/initpgtest.ora:

HS_FDS_CONNECT_INFO = pgtest
HS_FDS_TRACE_LEVEL = DEBUG
HS_FDS_SHAREABLE_NAME = /usr/local/lib/psqlodbcw.so
HS_LANGUAGE=american_america.we8iso8859p1
HS_FDS_SUPPORT_STATISTICS=FALSE
HS_DB_NAME=PGTEST
HS_DB_DOMAIN=MYDOMAIN.RU
set ODBCINI=/usr/local/etc/odbc.ini

2.3. создать DB LINK в БД Oracle

2.3.1. добавить запись в tnsnames.ora

pgtest =
(DESCRIPTION =
(ADDRESS =
(PROTOCOL = TCP)(HOST = blader) (PORT = 1526)
)
(CONNECT_DATA =
(SID = pgtest)
) (HS = OK)
)

2.3.2. создать DB_LINK

CREATE PUBLIC DATABASE LINK dblink CONNECT TO 
"user" IDENTIFIED BY "password" USING 'tns_name_entry';

2.4. запустить listener:
lsnrctl start LHS

2.5. протестировать доступность Postgres из Oracle

2.6. понизить уровень логирования в $ORACLE_HOME/hs/admin/initpgtest.ora:

HS_FDS_TRACE_LEVEL = 0

3. Настроить репликацию с использованием Streams

3.1. Если объекты репликации создаются в отдельной схеме, нужно дать
соотв. права:

begin
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(‘SCHEMA_OWNER’);
end;

3.2. Создать очередь anydata

begin
DBMS_STREAMS_ADM.set_up_queue(
‘MY_QUEUE_TABLE’,
‘tablespace NCC storage (initial 124M)’,
‘STREAMS_QUEUE’,
‘SCHEMA_OWNER’);
end;

нужно увеличить количество попыток извлечения из очереди – у нас может быть недоступен канал
и нельзя допустить, чтобы сообщения протухали

begin
dbms_aqadm.alter_queue(
queue_name => ‘STREAMS_QUEUE’,
max_retries => 1000000,
retry_delay => null,
retention_time => null,
auto_commit => false);
end;

если требуется пересоздать таблицу, то вот команда:
BEGIN
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table        => ‘MY_QUEUE_TABLE’);
END;

3.3. Сконфигурировать применение изменений (на живой таблице нужно делать
перед конфигурированием захвата, а то будут ошибки при insert’ах)

BEGIN
DBMS_APPLY_ADM.CREATE_APPLY(
queue_name          => ‘STREAMS_QUEUE’,
apply_name          => ‘TEST_APPLY’,
apply_captured      => false);
END;

если требуется удалить apply, то вот команда:
BEGIN
DBMS_APPLY_ADM.DROP_APPLY(
apply_name          => ‘TEST_APPLY’);
END;

3.4. Сконфигурировать захват изменений на таблице

begin
dbms_streams_adm.add_table_rules(
table_name => ‘myschema.my_replicated_tab’,
streams_type => ‘SYNC_CAPTURE’,
streams_name => ‘TEST_CAPTURE’,
queue_name => ‘STREAMS_QUEUE’);
end;

если требуется удалить правило, нужно сначала посмотреть его имя в DBA_STREAMS_RULES,
а потом удалить:
begin
DBMS_STREAMS_ADM.REMOVE_RULE(‘SOME_RULE_NAME’,
‘CAPTURE’,
‘TEST_CAPTURE’);
end;

3.5. Скомпилировать пакет для применения изменений:

— пакет pkg_apply_pg применяет изменения в автономной транзакции
— Автономная транзакция нужна, чтобы исключить ошибку “ORA-02047 can not join distributed transaction in progress”
create or replace package pkg_apply_pg is

procedure do_insert(p_val varchar2);
procedure do_update(p_key varchar2, p_val varchar2);
procedure do_delete(p_key varchar2);

end pkg_apply_pg;

create or replace package body pkg_apply_pg is

procedure do_insert(p_val varchar2)
is
pragma autonomous_transaction;
begin
insert into “public”.”mytab”@pgtest.mydomain.ru values (p_val);
commit;
end;

procedure do_update(p_key varchar2, p_val varchar2)
is
pragma autonomous_transaction;
begin
update “public”.”mytab”@pgtest.mydomain.ru
set “a” = p_val
where
“a” = p_key;
commit;
end;

procedure do_delete(p_key varchar2)
is
pragma autonomous_transaction;
begin
delete from “public”.”mytab”@pgtest.mydomain.ru
where
“a” = p_key;
commit;
end;

end pkg_apply_pg;

3.6. Скомпилировать процедуру для извлечения изменений из очереди
и поместить ее в периодически запускаемый JOB:

declare
deq_data        ANYDATA;
msgid           RAW(16);
deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T;
mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
proceed         BOOLEAN := true;
next_trans      EXCEPTION;
no_messages     EXCEPTION;
pragma exception_init (next_trans, -25235);
pragma exception_init (no_messages, -25228);
num_var         pls_integer;
lcr SYS.LCR$_ROW_RECORD;
rc PLS_INTEGER;
object_owner VARCHAR2(30);
object_name VARCHAR2(40);
dmlcommand VARCHAR2(10);
old_val VARCHAR2(100);
new_val VARCHAR2(100);
x number;
BEGIN
deqopt.consumer_name := ‘TEST_APPLY’;
deqopt.wait := 1;
WHILE (proceed) LOOP
BEGIN
DBMS_AQ.DEQUEUE(
queue_name          =>  ‘STREAMS_QUEUE’,
dequeue_options     =>  deqopt,
message_properties  =>  mprop,
payload             =>  deq_data,
msgid               =>  msgid);
deqopt.navigation := DBMS_AQ.NEXT;
rc := deq_data.GETOBJECT(lcr);
object_owner := lcr.GET_OBJECT_OWNER();
object_name := lcr.GET_OBJECT_NAME();
dmlcommand := lcr.GET_COMMAND_TYPE();

if (dmlcommand = ‘INSERT’) then
x := lcr.GET_VALUE(‘NEW’, ‘A’).getvarchar2(new_val);
pkg_apply_pg.do_insert(new_val);
elsif (dmlcommand = ‘UPDATE’) then
x := lcr.GET_VALUE(‘OLD’, ‘A’).getvarchar2(old_val);
x := lcr.GET_VALUE(‘NEW’, ‘A’).getvarchar2(new_val);
pkg_apply_pg.do_update(old_val, new_val);
elsif (dmlcommand = ‘DELETE’) then
x := lcr.GET_VALUE(‘OLD’, ‘A’).getvarchar2(old_val);
pkg_apply_pg.do_delete(old_val);
end if;
commit;
EXCEPTION
WHEN next_trans THEN
deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
WHEN no_messages THEN
proceed := FALSE;
DBMS_OUTPUT.PUT_LINE(‘No more messages’);
END;
END LOOP;
END;

3.7. Запустить синхронный захват изменений

!! а это похоже не обязательно !!

BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(‘TEST_CAPTURE’);
END;

Проверить, что все сконфигурировано:

select * from DBA_SYNC_CAPTURE_TABLES
select * from DBA_STREAMS_RULES

3.8. Обеспечить периодическое удаление сообщений в состоянии PROCESSED:
declare
po dbms_aqadm.aq$_purge_options_t;
begin
po.block := FALSE;
dbms_aqadm.purge_queue_table(
queue_table => ‘STREAMS_QUEUE_TABLE’,
purge_condition => ‘msg_state = ”PROCESSED”’,
purge_options => po);
end;

Особенности реализации:

1. Чтобы не съедалось много ресурсов на попытки достучаться до сервера PostgreSQL в случае, когда сервер не доступен, нужно предусмотреть механизм приостановки применения изменений из очереди на определенный период времени с последующим автоматическим возобновлением (retry timeout).

2. Операции по применению изменений полезно не коммитить по отдельности, а объединять в пачки.

Ссылки:

Oracle 11g Streams Implementer’s Guide