Home » Oracle DBA » Oracle Streams Step by Step Replication Example

Oracle Streams Step by Step Replication Example

What is Oracle real-time backup system?

Oracle Streams replication is the process which enables Oracle database schema to replicate to another Oracle database, which can be located to any location. Whenever a transaction occurs in the primary database DML or DDL, it will execute immediately to the target database to maintain a live backup copy. Below I am explaining and providing scripts for Oracle Streams step by step replication example.

Oracle Streams Replication Setup

In this post, I am describing steps to create an Oracle database replication through Oracle Streams feature, for standby database or database which can be used for reporting and analysis purpose or any other purposes.
This Oracle Streams feature is supported in Oracle 11g Enterprise Edition and 10gR2 and 9iR2. Oracle Streams is no longer supported in 12c; now Oracle is using Oracle Golden Gate for this feature.
I described below simple steps to create a replication to another Oracle database, for every step I am providing SQL script which you can copy and save as step-1, step-2 and so on, then run the scripts one by one to set up the replication.
But before running the scripts, change the values to your values which are in square brackets [ ] and replace it with carefully, all values in square brackets having meaning full names so change accordingly.

Important! Create an entry in tnsname.ora for both source and target databases for each other connection.

Step-1 Configuring Target or Secondary Database

CONNECT [TARGET_SYS_CONNECTION] AS SYSDBA;
ALTER SYSTEM SET AQ_TM_PROCESSES=3 SCOPE=BOTH;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set streams_pool_size=100M scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;
shutdown immediate;
startup mount;
alter database open;

Step-2 Configuring Source or Primary Database

CONNECT [SOURCE_SYS_CONNECTION1] AS SYSDBA;
ALTER SYSTEM SET AQ_TM_PROCESSES=3 SCOPE=BOTH;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set streams_pool_size=48M scope=BOTH;
ALTER SYSTEM SET SMTP_OUT_SERVER='[MAIL_SERVER] [MAIL_PORT]' SCOPE=BOTH;
--- UNDO_RETENTION=86400
alter system set undo_retention=3600 scope=BOTH;
--- UNDO_MANAGEMENT=AUTO
Alter System Set LOG_ARCHIVE_DEST_1 = '[ARCHIVELOG_DESTINATION]' Scope=Spfile;
Alter System Set LOG_ARCHIVE_DEST_STATE_1=enable Scope=Spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
Alter Database Add Supplemental Log Data
(Primary Key, Unique, Foreign Key) Columns;
Alter Database Force Logging;
Alter System Archive Log Current;

Step-3 Create Tablespace for Source Database

connect [source_sys_connection1] as sysdba;
create tablespace [logminer_ts_name]
datafile '[logminer_datafile]'
size 100m
reuse autoextend on maxsize unlimited;
execute dbms_logmnr_d.set_tablespace('[logminer_ts_name]');
create tablespace [stream_ts_name]
datafile '[stream_datafile]'
size 100m
reuse autoextend on maxsize unlimited;

Step-4 Create Oracle Streams Admin User for Source Database

connect [source_sys_connection1] as sysdba;

create user [source_stream_admin]
  identified by [source_stream_psw]
  default tablespace [stream_ts_name]
  temporary tablespace temp;
  -- 9 roles for [source_stream_admin]
  grant resource to [source_stream_admin];
  grant datapump_imp_full_database to [source_stream_admin];
  grant imp_full_database to [source_stream_admin];
  grant datapump_exp_full_database to [source_stream_admin];
  grant exp_full_database to [source_stream_admin];
  grant dba to [source_stream_admin];
  grant connect to [source_stream_admin];
  grant aq_administrator_role to [source_stream_admin];
  grant select_catalog_role to [source_stream_admin];
  alter user [source_stream_admin] default role resource, imp_full_database, exp_full_database, dba, connect, aq_administrator_role, select_catalog_role;
  -- 15 system privileges for [source_stream_admin]
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.alter_any_rule_set,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'dequeue_any',
  grantee      => '[source_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.alter_any_rule,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_rule_obj,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  grant unlimited tablespace to [source_stream_admin];
  grant restricted session to [source_stream_admin];
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.execute_any_rule_set,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_any_rule_set,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  grant create session to [source_stream_admin];
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'enqueue_any',
  grantee      => '[source_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.execute_any_rule,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_any_rule,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_evaluation_context_obj,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'manage_any',
  grantee      => '[source_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_rule_set_obj,
  grantee      => '[source_stream_admin]',
  grant_option => true);
end;
/

Step-5 Creating Streams Admin for the target database

connect [target_sys_connection] as sysdba;

--- run as sys user at the target
create user [target_stream_admin]
  identified by [target_stream_psw];
  -- 9 roles for strmadmin
  grant resource to [target_stream_admin];
  grant datapump_imp_full_database to [target_stream_admin];
  grant imp_full_database to [target_stream_admin];
  grant datapump_exp_full_database to [target_stream_admin];
  grant exp_full_database to [target_stream_admin];
  grant dba to [target_stream_admin];
  grant connect to [target_stream_admin];
  grant aq_administrator_role to [target_stream_admin];
  grant select_catalog_role to [target_stream_admin];
  alter user [target_stream_admin] default role resource, imp_full_database, exp_full_database, dba, connect, aq_administrator_role, select_catalog_role;
  -- 15 system privileges for [target_stream_admin]
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.alter_any_rule_set,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'dequeue_any',
  grantee      => '[target_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.alter_any_rule,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_rule_obj,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  grant unlimited tablespace to [target_stream_admin];
  grant restricted session to [target_stream_admin];
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.execute_any_rule_set,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_any_rule_set,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  grant create session to [target_stream_admin];
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'enqueue_any',
  grantee      => '[target_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.execute_any_rule,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_any_rule,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_evaluation_context_obj,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/
  begin
sys.dbms_aqadm.grant_system_privilege (
  privilege    => 'manage_any',
  grantee      => '[target_stream_admin]',
  admin_option => true);
end;
/
  begin
sys.dbms_rule_adm.grant_system_privilege(
  privilege    => sys.dbms_rule_adm.create_rule_set_obj,
  grantee      => '[target_stream_admin]',
  grant_option => true);
end;
/

Step-6 Creating Database Link for Source Database

connect [source_stream_connection]

--- link for target database...
create database link [source_dblink] connect to [target_stream_admin] identified by [target_stream_psw] using '[target_connect_string]';

Step-7 Creating Database Link for Target Database

connect [target_stream_connection]

create database link [target_dblink] connect to [source_stream_admin] identified by [source_stream_psw] using '[source_connect_string]';

Step-8 Creating Datapumb Directory on Target database

connect [target_stream_connection]

create or replace directory
[target_datapump_dir] as
'[target_dir_location]';

Step-9 Creating Capture process at the source database

set echo off;
 ---accept strm_pwd_src prompt 'enter password of streams admin "strmadmin" at source : ' hide
 ---accept strm_pwd_dest prompt 'enter password of streams admin "strmadmin" at destination : ' hide
connect   [source_stream_connection];
declare
  cursor vin is
    select ruleset_name from dba_rulesets where owner = '[source_stream_admin]';
begin
 for i in vin loop
 begin
  dbms_rule_adm.drop_rule_set(
    rule_set_name => '[source_stream_admin].'||i.ruleset_name,
    delete_rules  => true);
  exception
    when others then
 null;
  end;
  end loop;
end;
/
begin
  dbms_streams_adm.set_up_queue(
    queue_table => '"streams_capture_qt"',
    queue_name  => '"streams_capture_q"',
    queue_user  => '"[source_stream_admin]"');
end;
/
begin
  dbms_streams_adm.add_schema_propagation_rules(
   schema_name            => '"[source_schema]"',
   streams_name           => '"streams_propagation1"',
    source_queue_name      => '"strmadmin"."streams_capture_q"',
    destination_queue_name => '"strmadmin"."streams_apply_q"@[source_dblink]',
    include_dml            => true,
    include_ddl            => true,
    source_database        => '[source_database]',
    inclusion_rule           => true);
end;
/
commit;
begin
  dbms_streams_adm.add_schema_rules(
    schema_name        => '"[source_schema]"',
    streams_type       => 'capture',
    streams_name       => '"streams_capture"',
    queue_name         => '"[source_stream_admin]"."streams_capture_q"',
    include_dml        => true,
    include_ddl        => true,
    include_tagged_lcr => false,
    inclusion_rule     => true);
end;
/
-- repeat above 2 steps for each schema
commit;
declare
 cursor vin is
    select table_name
from dba_streams_unsupported
where owner = '[source_schema]';
begin
for c in vin loop
 dbms_streams_adm.add_table_rules (
 table_name => '[source_schema].'||c.table_name,
 streams_type => 'capture',
 streams_name => 'streams_capture',
 queue_name => '[source_stream_admin].streams_capture_q',
 include_dml => true,
 include_ddl => false,
 inclusion_rule => false,
 source_database => '[source_database]'
 );
end loop;
commit;
end;
/

Step-10 Creating Apply process at the target database

connect [target_stream_connection]

begin
  dbms_streams_adm.set_up_queue(
    queue_table => '"streams_apply_qt"',
    queue_name  => '"streams_apply_q"',
    queue_user  => '"[target_stream_admin]"');
end;
/
begin
  dbms_streams_adm.add_schema_rules(
    schema_name        => '"[source_schema]"',
    streams_type       => 'apply',
    streams_name       => '"streams_apply"',
    queue_name         => '"[target_stream_admin]"."streams_apply_q"',
    include_dml        => true,
    include_ddl        => true,
    include_tagged_lcr => false,
    inclusion_rule     => true);
end;
/
commit;
begin
  dbms_apply_adm.set_parameter(
    apply_name  => 'streams_apply',
    parameter   => 'disable_on_error',
    value       => 'n');
end;
/
begin
  dbms_apply_adm.set_parameter(
    apply_name  => 'streams_apply',
    parameter   => 'allow_duplicate_rows',
    value       => 'y');
end;
/

Step-11 Instantiating Target Database

connect [target_stream_connection]

set serverout on;
execute dbms_output.enable(50000);
declare
  handle1 number;
  ind number;
  percent_done number;
  job_state varchar2(30);
  le ku$_logentry;
  js ku$_jobstatus;
  jd ku$_jobdesc;
  sts ku$_status;
begin
  begin
  execute immediate 'drop user '||'[source_schema]'||' cascade';
  exception
   when others then
     null;
  end;
  handle1 := dbms_datapump.open('import','schema', '[target_dblink]');
  dbms_datapump.add_file(handle1, 'streamimport_1349243553109.log', '[target_datapump_dir]', '',  dbms_datapump.ku$_file_type_log_file);
  dbms_datapump.metadata_filter(handle1, 'schema_expr', 'in (''[source_schema]'')');
   --'in (''cdsl11'',''kra'')' for multiple
  dbms_datapump.set_parameter(handle1, 'include_metadata', 1);
  dbms_datapump.start_job(handle1);
  percent_done :=0;
  job_state := 'undefined';
  while (job_state != 'completed') and (job_state != 'stopped') loop
  dbms_datapump.get_status(handle1, dbms_datapump.ku$_status_job_error + dbms_datapump.ku$_status_job_status + dbms_datapump.ku$_status_wip,-1,job_state,sts);
  js := sts.job_status;
  if js.percent_done != percent_done
  then
     dbms_output.put_line('*** job percent done = ' || to_char(js.percent_done));
     percent_done := js.percent_done;
  end if;
  if(bitand(sts.mask, dbms_datapump.ku$_status_wip) != 0)
  then
    le := sts.wip;
  else
     if(bitand(sts.mask,dbms_datapump.ku$_status_job_error) != 0)
     then
       le := sts.error;
     else
       le := null;
     end if;
  end if;
  if le is not null
  then
    ind := le.first;
    while ind is not null loop
      dbms_output.put_line(le(ind).logtext);
      ind := le.next(ind);
    end loop;
  end if;
  end loop;
  dbms_output.put_line('job has completed');
  dbms_output.put_line('final job state = ' || job_state);
  dbms_datapump.detach(handle1);
end;
/
connect [source_stream_connection]
create or replace trigger ddltrigger
   after create on database
declare
   n   number;
begin
   if ora_dict_obj_type = 'table' and ora_dict_obj_owner = '[source_schema]'
   then
      begin
         select 1
           into n
           from dba_streams_unsupported
          where owner = '[source_schema]' and table_name = ora_dict_obj_name;
         if n = 1
         then
            dbms_streams_adm.add_table_rules
                                          (table_name           => '[source_schema].'
                                                                   || ora_dict_obj_name,
                                           streams_type         => 'capture',
                                           streams_name         => 'streams_capture',
                                           queue_name           => '[source_stream_admin].streams_capture_q',
                                           include_dml          => true,
                                           include_ddl          => false,
                                           inclusion_rule       => false,
                                           source_database      => '[source_database]'
                                          );
         end if;
      exception
         when others
         then
            null;
      end;
   end if;
end;
/

Step-12 Starting Apply process at the arget database

connect [target_stream_connection]

set serverout on;
-- get instantiation number
declare
 iscn number;
 begin
iscn := dbms_flashback.get_system_change_number();
dbms_apply_adm.set_schema_instantiation_scn(
    source_schema_name   => '"[source_schema]"',
source_database_name => '[source_database]',
instantiation_scn    => iscn,
recursive            => true);
end;
/
declare
   v_started number;
begin
select decode(status, 'enabled', 1, 0) into v_started
 from dba_apply where apply_name = 'streams_apply';
 if (v_started = 0) then
  dbms_apply_adm.start_apply(apply_name => '"streams_apply"');
 end if;
end;
/

Step-13 Starting Source Capture process

connect [source_stream_connection]

set serverout on;
declare
   v_started number;
begin
select decode(status, 'enabled', 1, 0) into v_started
 from dba_capture where capture_name = 'streams_capture';
 if (v_started = 0) then
  dbms_capture_adm.start_capture(capture_name => '"streams_capture"');
 end if;
end;
/
begin
dbms_output.put_line('*** progress message ===> started the capture process streams_capture at source database estock and the apply process streams_apply at the destination database successfully. ***');
end;
/
Your setup is now complete, to check that your steams setup is working properly or not use the following commands by logging in with streams admin credentials you created for the source database.

Select * From V$STREAMS_CAPTURE;

Select * From DBA_CAPTURE;
Select * From DBA_PROPAGATION;
Select * From DBA_CAPTURE_PREPARED_TABLES;
And for target database use the following queries by logging in with streams user credentials on the target database.

Select * From DBA_APPLY;

Select * From DBA_APPLY_CONFLICT_COLUMNS;
Select * From DBA_APPLY_EXECUTE;
Select * From DBA_APPLY_ERROR;
Select * From DBA_APPLY_PROGRESS;
Select * From DBA_APPLY_INSTANTIATED_OBJECTS;
Select * From V$STREAMS_TRANSACTION;
Select *
From V$PROPAGATION_RECEIVER;
In case you want to stop the target apply process use the following script:

connect [target_stream_connection]

set serverout on;
declare
   v_started number;
begin
select decode(status, 'enabled', 1, 0) into v_started
 from dba_apply where apply_name = 'streams_apply';
 if (v_started = 1) then
  dbms_apply_adm.stop_apply(apply_name => '"streams_apply"');
 end if;
end;
/
In case you want to stop the source database capture process use the following process:

connect [source_stream_connection]

set serverout on;
declare
   v_started number;
begin
select decode(status, 'enabled', 1, 0) into v_started
 from dba_capture where capture_name = 'streams_capture';
 if (v_started = 0) then
  dbms_capture_adm.start_capture(capture_name => '"streams_capture"');
 end if;
end;
/
begin
dbms_output.put_line('*** progress message ===> started the capture process streams_capture at source database estock and the apply process streams_apply at the destination database successfully. ***');
end;
/

If you are not able to modify these scripts then you can buy this utility, based on vb.net check this link Oracle Replication Software.

Oracle Streams step by step replication example