ABC VARCHAR2(10),BCD NUMBER(20,2), CDE DATE . . . |
ABC, BCD, CDE, . . . |
connect system/manager@whatever GRANT EXECUTE_CATALOG_ROLE to boris_publisher; GRANT SELECT_CATALOG_ROLE to boris_publisher; |
connect scott/tiger@whatever GRANT SELECT on emp to boris_publisher; GRANT SELECT on DEPT to boris_publisher; |
| Create Change Tables |
|---|
connect boris_publisher/boris_publisher@whatever
set serveroutput on size 1000000
set linesize 120
DECLARE
  -- Run this while connected as the CDC publisher.
  -- For each source table returned by c_tables, a change table named
  -- CDC_<table> is created in change set SYNC_SET, owned by the publisher.
  v_publisher_id  VARCHAR2(20) := 'boris_publisher';
  v_source_schema VARCHAR2(20) := 'scott';
  v_source_table  VARCHAR2(100);
  v_cdc_table     VARCHAR2(100);
  -- Value passed to CREATE_CHANGE_TABLE as COLUMN_TYPE_LIST.
  work_sql        VARCHAR2(32767);
  -- The source tables to publish.
  CURSOR c_tables IS
    SELECT 'emp' table_name FROM dual
    UNION
    SELECT 'dept' table_name FROM dual;
BEGIN
  FOR src IN c_tables LOOP
    v_source_table := src.table_name;
    v_cdc_table    := 'CDC_' || v_source_table;
    -- Ask the project helper for the column/type list of the source table.
    work_sql := etl_util.get_cols_definition(
                  v_source_schema, v_source_table, '', '');
    DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE(
      OWNER             => v_publisher_id,
      CHANGE_TABLE_NAME => v_cdc_table,
      CHANGE_SET_NAME   => 'SYNC_SET',
      SOURCE_SCHEMA     => v_source_schema,
      SOURCE_TABLE      => v_source_table,
      COLUMN_TYPE_LIST  => work_sql,
      CAPTURE_VALUES    => 'both',
      RS_ID             => 'y',
      ROW_ID            => 'n',
      USER_ID           => 'y',
      TIMESTAMP         => 'y',
      -- Author's warning: keep OBJECT_ID at 'n', otherwise the call
      -- fails with a "table has no columns" error.
      OBJECT_ID         => 'n',
      SOURCE_COLMAP     => 'y',
      TARGET_COLMAP     => 'y',
      OPTIONS_STRING    => NULL);
    DBMS_OUTPUT.PUT_LINE(
      'Change table ' || v_cdc_table || ' was created successfully');
  END LOOP;
EXCEPTION
  -- The handler encloses the whole loop, so the first failure is reported
  -- and the remaining tables are not processed.
  WHEN OTHERS THEN
    DBMS_OUTPUT.PUT_LINE(
      'Error start ********************************************');
    DBMS_OUTPUT.PUT_LINE(
      'Error during change table ' || v_cdc_table || ' creation:');
    DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM);
    DBMS_OUTPUT.PUT_LINE(
      'Error end ********************************************');
END;
/
|
CURSOR c_tables IS SELECT 'emp' table_name FROM dual UNION SELECT 'dept' table_name FROM dual |
work_sql := etl_util.get_cols_definition(v_source_schema,v_source_table, '',''); |
comm number(7,2), deptno number(2), empno number(4), ename varchar2(10) , hiredate date, job varchar2(9), mgr number(4), sal number(7,2) |
work_sql := etl_util.get_cols_definition(v_source_schema,v_source_table, 'hiredate,job',''); |
| describe cdc_emp | describe cdc_dept |
|---|---|
Name Type ----------------- ------------ OPERATION$ CHAR(2) CSCN$ NUMBER COMMIT_TIMESTAMP$ DATE RSID$ NUMBER USERNAME$ VARCHAR2(30) TIMESTAMP$ DATE SOURCE_COLMAP$ RAW(128) TARGET_COLMAP$ RAW(128) COMM NUMBER(7,2) DEPTNO NUMBER(2) EMPNO NUMBER(4) ENAME VARCHAR2(10) HIREDATE DATE JOB VARCHAR2(9) MGR NUMBER(4) SAL NUMBER(7,2) |
Name Type ----------------- ------------ OPERATION$ CHAR(2) CSCN$ NUMBER COMMIT_TIMESTAMP$ DATE RSID$ NUMBER USERNAME$ VARCHAR2(30) TIMESTAMP$ DATE SOURCE_COLMAP$ RAW(128) TARGET_COLMAP$ RAW(128) DEPTNO NUMBER(2) DNAME VARCHAR2(14) LOC VARCHAR2(13) |
CONNECT scott/tiger@whatever GRANT SELECT ON emp TO boris_subscriber; GRANT SELECT ON dept TO boris_subscriber; CONNECT boris_publisher/boris_publisher GRANT SELECT ON cdc_emp TO boris_subscriber; GRANT SELECT ON cdc_dept TO boris_subscriber; |
| Subscribe all the tables, activate subscription, extend CDC window |
|---|
connect boris_subscriber/boris_subscriber@whatever
set serveroutput on size 1000000
set linesize 120
/* Login as a subscriber to run this */
BEGIN
DECLARE
-- Looks up the subscription by its description (creating a handle when none
-- exists), subscribes to every table returned by c_tables, activates the
-- subscription and extends its window. Each step reports failures through
-- DBMS_OUTPUT and sets v_err, which stops all remaining steps.
vSubhandle NUMBER := 0;
v_subscription_description VARCHAR2(40):='scott -> Datawarehouse';
v_source_schema VARCHAR2(20) := 'SCOTT';
v_source_table VARCHAR2(100);
col_names VARCHAR2(32767);
v_err VARCHAR2(10):='OK';
CURSOR c_tables IS
SELECT 'emp' table_name FROM dual
UNION
SELECT 'dept' table_name FROM dual
;
BEGIN
-- find/create a subscription
BEGIN
vSubhandle :=0;
-- get the handle
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
EXCEPTION
WHEN NO_DATA_FOUND THEN
-- Only a genuinely missing row means "create a new subscription".
vSubhandle := 0;
dbms_output.put_line('Subscription '
||v_subscription_description
|| ' does not exist in the database');
WHEN OTHERS THEN
-- Any other failure (e.g. TOO_MANY_ROWS) must not be reported as
-- "does not exist": that would lead to a duplicate subscription.
vSubhandle := 0;
DBMS_output.put_line(
'Error start ***********************************');
DBMS_output.put_line(
'Error looking up the CDC subscription '''
|| v_subscription_description || ''':');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line(
'Error end ***********************************');
v_err :='ERROR';
END;
BEGIN
-- Do not create a handle when the lookup itself failed
IF (v_err = 'ERROR') THEN RETURN; END IF;
IF (vSubhandle = 0) THEN
-- create handle
DBMS_CDC_SUBSCRIBE.GET_SUBSCRIPTION_HANDLE(
CHANGE_SET => 'SYNC_SET',
DESCRIPTION => v_subscription_description,
SUBSCRIPTION_HANDLE => vSubhandle
);
dbms_output.put_line('Obtained handle '
||TO_CHAR(vSubhandle)
|| ' for subscription '||v_subscription_description
);
END IF;
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ***********************************');
DBMS_output.put_line(
'Error getting a handle '
|| ' for a CDC subscription '
|| v_subscription_description|| ':');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm
|| ' Handle = ' ||TO_CHAR(vSubhandle));
DBMS_output.put_line(
'Error end ***********************************');
v_err :='ERROR';
END;
-- Subscribe for all the tables
FOR tables IN c_tables LOOP
v_source_table := UPPER(TRIM(tables.table_name));
BEGIN
-- Stop as soon as any earlier step (or table) has failed
IF (v_err = 'ERROR') THEN RETURN; END IF;
-- Column list for the subscription comes from the project helper
col_names := etl_util.get_col_names(
v_source_schema,v_source_table, '','');
DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE (vSubhandle
, v_source_schema,
v_source_table, col_names);
DBMS_output.put_line('Added table '|| v_source_schema ||'.'
|| v_source_table
|| ' to the CDC subscription '''
|| v_subscription_description
|| ''' successfully. Handle = '||TO_CHAR(vSubhandle));
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ***********************************');
DBMS_output.put_line('Error adding table '
|| v_source_schema||'.'||v_source_table
|| ' to the CDC subscription '''
|| v_subscription_description
|| ''':');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm
|| ' Handle = ' ||TO_CHAR(vSubhandle));
DBMS_output.put_line(
'Error end ***********************************');
v_err :='ERROR';
END;
END LOOP;
BEGIN
-- Activate the subscription
IF (v_err = 'ERROR') THEN RETURN; END IF;
DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(vSubhandle);
DBMS_output.put_line('CDC subscription '''
|| v_subscription_description
||''' was successfully activated. Handle = '
||TO_CHAR(vSubhandle));
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ***********************************');
DBMS_output.put_line(
'Error activating the CDC subscription '''
|| v_subscription_description || ''':');
DBMS_output.put_line(to_char(sqlcode)|| ' '
|| sqlerrm || ' Handle = '
||TO_CHAR(vSubhandle));
DBMS_output.put_line(
'Error end ***********************************');
v_err :='ERROR';
END;
-- Extend the window immediately
BEGIN
-- Extend the window for subscription
IF (v_err = 'ERROR') THEN RETURN; END IF;
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
DBMS_output.put_line('Window to the CDC subscription '''
|| v_subscription_description
|| ''' was successfully extended');
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ***********************************');
DBMS_output.put_line(
'Error during window extention to CDC subscription '''
||v_subscription_description || ''':');
DBMS_output.put_line('Handle # '||TO_CHAR(vSubhandle));
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line(
'Error end ***********************************');
v_err :='ERROR';
END;
END;
END;
/
|
Added table SCOTT.DEPT to the CDC subscription 'scott -> Datawarehouse' successfully. Handle = 87 Added table SCOTT.EMP to the CDC subscription 'scott -> Datawarehouse' successfully. Handle = 87 CDC subscription 'scott -> Datawarehouse' was successfully activated. Handle = 87 Window to the CDC subscription 'scott -> Datawarehouse' was successfully extended |
| Extend subscription window and create CDC views |
|---|
connect boris_subscriber/boris_subscriber@whatever
set serveroutput on size 1000000
set linesize 200
/* Login as a subscriber to run this */
BEGIN
DECLARE
-- Extends the window of the subscription identified by its description,
-- then, for every table returned by c_tables, prepares a subscriber view
-- and points a private synonym CDC_<table>_vw at it. Failures are reported
-- through DBMS_OUTPUT; a table whose view could not be prepared is skipped.
vSubhandle NUMBER;
v_subscription_description VARCHAR2(40):='scott -> Datawarehouse';
v_source_schema VARCHAR2(20) := 'SCOTT';
v_source_table VARCHAR2(100);
v_cdc_table VARCHAR2(100);
v_cdc_view_name VARCHAR2(40);
our_view_name VARCHAR2(200);
vSQL VARCHAR2(1000);
CURSOR c_tables IS
SELECT 'emp' table_name FROM dual
UNION
SELECT 'dept' table_name FROM dual
;
BEGIN
-- extend the window
BEGIN
-- get the handle
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
-- Extend the window for subscription
DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE=>vSubhandle);
DBMS_output.put_line('Window to the CDC subscription '''
|| v_subscription_description
|| ''' was successfully extended');
EXCEPTION WHEN OTHERS THEN
DBMS_output.put_line(
'Error start *************************************');
DBMS_output.put_line(
'Error during window extention to CDC subscription '''
||v_subscription_description || ''':');
DBMS_output.put_line('Handle # '||TO_CHAR(vSubhandle));
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line(
'Error end *************************************');
END;
-- create subscriber views/synonyms
BEGIN
FOR tables IN c_tables LOOP
-- create a view
v_source_table := UPPER(TRIM(tables.table_name));
-- Clear the name so a value left over from the previous table can
-- never be mistaken for this table's view.
our_view_name := NULL;
BEGIN
v_cdc_table := 'CDC_'||v_source_table;
v_cdc_view_name:=v_cdc_table|| '_vw';
DBMS_CDC_SUBSCRIBE.PREPARE_SUBSCRIBER_VIEW(
SUBSCRIPTION_HANDLE=> vSubhandle
,SOURCE_SCHEMA => v_source_schema
,SOURCE_TABLE => v_source_table
,VIEW_NAME => our_view_name);
DBMS_output.put_line('Subscriber view '''
||our_view_name|| ''' was successfully created for table '
|| v_source_schema||'.'||v_source_table);
EXCEPTION
WHEN OTHERS THEN
our_view_name := NULL;
DBMS_output.put_line(
'Error start **********************************');
DBMS_output.put_line(
'Error during creation of subscriber view'
|| ' for source table '
||v_source_table||': ');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm
|| ' Handle #'||TO_CHAR(vSubhandle));
DBMS_output.put_line(
'Error end **********************************');
END;
-- Only touch the synonym when a view was actually prepared; otherwise
-- the existing synonym would be dropped with nothing to replace it.
IF our_view_name IS NOT NULL THEN
-- drop the previous synonym
BEGIN
vSQL := 'DROP SYNONYM ' || v_cdc_view_name;
EXECUTE IMMEDIATE vSQL;
-- Best effort: the synonym may simply not exist yet
EXCEPTION WHEN OTHERS THEN NULL;
END; -- create a private synonym to point to this view:
BEGIN
vSQL := 'CREATE SYNONYM ' || v_cdc_view_name
||' FOR '|| our_view_name;
EXECUTE IMMEDIATE vSQL;
DBMS_output.put_line('Private synonym ''' || v_cdc_view_name
|| ''' for view ''' || our_view_name
|| ''' was successfully created.');
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ******************************');
DBMS_output.put_line(
'Error during creation of the synonym '''
|| v_cdc_view_name || ''' for view '''
|| our_view_name || ''': ');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line(
'Error end ******************************');
END;
END IF;
END LOOP;
END;
END;
END;
/
|
Window to the CDC subscription 'scott -> Datawarehouse' was successfully extended Subscriber view 'CDC#CV$8757846' was successfully created for table SCOTT.DEPT Private synonym 'CDC_DEPT_vw' for view 'CDC#CV$8757846' was successfully created. Subscriber view 'CDC#CV$8757848' was successfully created for table SCOTT.EMP Private synonym 'CDC_EMP_vw' for view 'CDC#CV$8757848' was successfully created. |
CREATE OR REPLACE VIEW CDC#CV$8757846 ( OPERATION$, CSCN$, COMMIT_TIMESTAMP$, TIMESTAMP$, USERNAME$, TARGET_COLMAP$, SOURCE_COLMAP$, RSID$, DEPTNO, DNAME, LOC ) AS SELECT OPERATION$, CSCN$, COMMIT_TIMESTAMP$, TIMESTAMP$, USERNAME$, TARGET_COLMAP$, SOURCE_COLMAP$, RSID$, "DEPTNO", "DNAME", "LOC" FROM "BORIS_PUBLISHER"."CDC_DEPT" WHERE CSCN$ >= 40802127 AND CSCN$ <= 41013754 WITH READ ONLY |
-- Initial-load stand-in for the CDC_EMP_VW subscriber view: exposes every
-- row of emp as an insert ('I') with constant CDC control columns, so the
-- same loading code can read it in place of a real subscriber view.
-- NOTE(review): FEFFFFFF is presumably the "all columns" bitmap described
-- by the column-bit table in the accompanying text; confirm.
CREATE VIEW CDC_EMP_VW AS
SELECT 'I' operation$, 1 cscn$, SYSDATE commit_timestamp$, 1 rsid$
, 'initial_load' username$, SYSDATE timestamp$
, HEXTORAW('FEFFFFFF') SOURCE_COLMAP$
, HEXTORAW('FEFFFFFF') TARGET_COLMAP$ , t.*
FROM emp t;
|
| Column id | Binary Value | Hex Value |
|---|---|---|
| 1 | 10 | 2 |
| 2 | 100 | 4 |
| 3 | 1000 | 8 |
| 4 | 10000 | 10 |
| 5 | 100000 | 20 |
| 6 | 1000000 | 40 |
| 7 | 10000000 | 80 |
| 8 | 00000000 00000001 | 00 01 |
| 9 | 00000000 00000010 | 00 02 |
| . . . | . . . | . . . |
| 16 | 00000000 00000000 00000001 | 00 00 01 |
| Drop CDC Views and Purge CDC Window |
|---|
connect boris_subscriber/boris_subscriber@whatever
set serveroutput on size 1000000
set linesize 200
/* Login as a subscriber to run this */
BEGIN
DECLARE
-- For every table returned by c_tables: drops the private synonym
-- CDC_<table>_vw and the subscriber view of the subscription identified by
-- its description; afterwards purges the subscription window. Every step
-- is best effort: failures are reported through DBMS_OUTPUT and the script
-- carries on with the next step.
vSubhandle NUMBER;
v_subscription_description VARCHAR2(40):=
'scott -> Datawarehouse';
v_source_schema VARCHAR2(20) := 'SCOTT';
v_source_table VARCHAR2(100);
v_cdc_table VARCHAR2(100);
v_cdc_view_name VARCHAR2(40);
vSQL VARCHAR2(1000);
CURSOR c_tables IS
SELECT 'emp' table_name FROM dual
UNION
SELECT 'dept' table_name FROM dual
;
BEGIN
-- find a subscription
BEGIN
vSubhandle :=0;
-- get the handle
SELECT handle INTO vSubhandle FROM all_subscriptions
WHERE description = v_subscription_description;
EXCEPTION
WHEN NO_DATA_FOUND THEN
vSubhandle := 0;
dbms_output.put_line('Subscription '
||v_subscription_description
|| ' does not exist in the database'
);
WHEN OTHERS THEN
-- Report the real cause instead of claiming the subscription is missing
vSubhandle := 0;
DBMS_output.put_line(
'Error start ********************************');
DBMS_output.put_line(
'Error looking up the CDC subscription '''
|| v_subscription_description || ''':');
DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
DBMS_output.put_line(
'Error end ********************************');
END;
FOR tables IN c_tables LOOP
v_source_table := UPPER(TRIM(tables.table_name));
BEGIN
v_cdc_table := 'CDC_'||v_source_table;
v_cdc_view_name:=v_cdc_table || '_vw';
-- drop the synonym
BEGIN
vSQL := 'DROP SYNONYM ' || v_cdc_view_name;
EXECUTE IMMEDIATE vSQL;
-- Best effort: the synonym may not exist
EXCEPTION WHEN OTHERS THEN NULL;
END;
-- drop the subscriber view
DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIBER_VIEW(
SUBSCRIPTION_HANDLE=> vSubhandle
,SOURCE_SCHEMA => v_source_schema
, SOURCE_TABLE => v_source_table);
DBMS_output.put_line('Subscriber View for CDC table '''
|| v_cdc_table
|| ''' was successfully dropped. Handle # '
||TO_CHAR(vSubhandle));
EXCEPTION
WHEN OTHERS THEN
DBMS_output.put_line(
'Error start ********************************');
DBMS_output.put_line(
'Error during dropping the subscriber view for '''
|| v_cdc_table || ''' '
|| ' to the CDC subscription '''
|| v_subscription_description || ''':');
DBMS_output.put_line(to_char(sqlcode)|| ' '
|| sqlerrm
|| ' Handle = '||TO_CHAR(vSubhandle));
DBMS_output.put_line(
'Error end ********************************');
END;
END LOOP;
BEGIN
-- purge window
DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
SUBSCRIPTION_HANDLE=> vSubhandle);
DBMS_output.put_line(
'Subscriber Window for subscription '''
|| v_subscription_description
|| ''' was successfully purged ');
EXCEPTION WHEN OTHERS THEN
-- This handler guards PURGE_WINDOW, so name that step in the message
DBMS_output.put_line(
'Error during subscriber window purge: ');
DBMS_output.put_line(to_char(sqlcode)|| ' '
|| sqlerrm
||' Handle # '||TO_CHAR(vSubhandle));
END;
END;
END;
/
|
Subscriber View for CDC table 'CDC_DEPT' was successfully dropped. Handle # 86 Subscriber View for CDC table 'CDC_EMP' was successfully dropped. Handle # 86 Subscriber Window for subscription 'scott -> Datawarehouse' was successfully purged |
Back to BIK Information Services home page
Go to top