Source database: Oracle 11.2.0.4, two-node RAC, service name BDEDW
Downstream server: Oracle 11.2.0.4, single node, service name BDEDWDS
OGG capture side: 12.2.0.1.1
OGG replicat side: OGG for Big Data Version 12.3.1.1.1
Target: Kafka 0.10.0.0
Capture mode: OGG integrated capture through a downstream mining server

I. Downstream server setup
1. Install Oracle (omitted here).

2. Open the database in archivelog mode:

alter system set log_archive_dest_1='LOCATION=/opt/app/oracle/archivelog VALID_FOR=(ONLINE_LOGFILE,PRIMARY_ROLE)';
alter system set log_archive_dest_state_1=enable;
shutdown immediate
startup mount
alter database archivelog;
alter database open;

3. Add standby redo logs. Sizing rules:
1) standby redo log size >= source log file size
2) The number of standby log file groups >= the number of source online log file groups + 1 per thread. So if you have "n" threads at the source database, each having "m" redo log groups, you should configure n*(m+1) redo log groups at the downstream mining database. Eight groups (2 threads x 4) are added here for the two-thread RAC source:

ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 13 ('/opt/app/oracle/oradata/BDEDWDS/standby_13.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 14 ('/opt/app/oracle/oradata/BDEDWDS/standby_14.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 15 ('/opt/app/oracle/oradata/BDEDWDS/standby_15.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 1 GROUP 16 ('/opt/app/oracle/oradata/BDEDWDS/standby_16.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 17 ('/opt/app/oracle/oradata/BDEDWDS/standby_17.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 18 ('/opt/app/oracle/oradata/BDEDWDS/standby_18.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 19 ('/opt/app/oracle/oradata/BDEDWDS/standby_19.log') SIZE 8192M;
ALTER DATABASE ADD STANDBY LOGFILE THREAD 2 GROUP 20 ('/opt/app/oracle/oradata/BDEDWDS/standby_20.log') SIZE 8192M;

4. Designate an archive destination for the standby logs:

alter system set log_archive_dest_2='LOCATION=/opt/app/oracle/standbyarch VALID_FOR=(STANDBY_LOGFILE,PRIMARY_ROLE)';
alter system set log_archive_dest_state_2=enable;

5. Create the OGG user on the downstream server (it will be used to retrieve logical change records from the logmining server at the downstream mining database):

create user ogg identified by xxx;
grant connect,resource,dba to ogg;
exec dbms_goldengate_auth.grant_admin_privilege('OGG');
(This grants CREATE RULE, CREATE RULE SET, SELECT ANY TABLE, ALTER ANY TABLE, SELECT ANY TRANSACTION, CREATE JOB, EXECUTE ANY RULE SET, CREATE EVALUATION CONTEXT, ALTER SESSION, DEQUEUE ANY QUEUE, FLASHBACK ANY TABLE, SELECT_CATALOG_ROLE and several other privileges.)
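If you want to confirm the grant took effect, the dictionary view populated by dbms_goldengate_auth can be queried (a minimal check; view and column names as documented for 11.2):

-- users granted capture/apply privileges via dbms_goldengate_auth
SELECT USERNAME, PRIVILEGE_TYPE FROM DBA_GOLDENGATE_PRIVILEGES;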
6. Add TNS entries for the source and downstream databases (tnsnames.ora on the downstream server):

BDEDWDS =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = downstream ip)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = BDEDWDS)
    )
  )

BDEDW =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = source ip)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = BDEDW)
    )
  )

7. Set the log_archive_config parameter:

ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(BDEDW,BDEDWDS)' SCOPE=BOTH;

8. Set the enable_goldengate_replication parameter:

alter system set enable_goldengate_replication=true;
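Before moving to the source side, a few sanity checks on the downstream database are worth running (a minimal sketch using standard dynamic performance views; the expected values follow from the settings above):

-- archivelog mode should now be enabled
SELECT LOG_MODE FROM V$DATABASE;

-- eight standby log groups of 8192 MB, four per thread
SELECT GROUP#, THREAD#, BYTES/1024/1024 AS MB, STATUS FROM V$STANDBY_LOG ORDER BY GROUP#;

-- both archive destinations should be VALID with an empty ERROR column
SELECT DEST_ID, STATUS, ERROR FROM V$ARCHIVE_DEST_STATUS WHERE DEST_ID IN (1, 2);

-- must be TRUE for integrated capture
SELECT VALUE FROM V$PARAMETER WHERE NAME = 'enable_goldengate_replication';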
II. Source database setup

1. Open the database in archivelog mode:

alter system set log_archive_dest_1='LOCATION=+DATA_SSD VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=BDEDW' sid='*';
alter system set log_archive_dest_state_1=enable;
....

2. Copy the source database's password file to the downstream server, and rename it on the downstream server to match the local instance:

scp ..

3. Add TNS entries for the source and downstream databases (tnsnames.ora on the source server):

BDEDW =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = source ip)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = BDEDW)
    )
  )

BDEDWDS =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = downstream ip)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = BDEDWDS)
    )
  )

4. Create the OGG user (it will be used to fetch data and metadata from BDEDW):

create user ogg identified by xxx;
grant connect,resource,dba to ogg;
exec dbms_goldengate_auth.grant_admin_privilege('OGG');

5. Set the log_archive_config parameter:

ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(BDEDW,BDEDWDS)' SCOPE=BOTH;

6. Set the archive parameters so redo is shipped to the downstream server:

alter system set log_archive_dest_2='SERVICE=BDEDWDS ASYNC NOREGISTER VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=BDEDWDS' sid='*';
alter system set log_archive_dest_state_2=enable;
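To confirm that redo transport works, force a log switch on the source and check the destination status (a quick sketch; new archive files should also appear under /opt/app/oracle/standbyarch on the downstream server):

-- on the source: archive the current logs of all RAC threads
ALTER SYSTEM ARCHIVE LOG CURRENT;

-- the downstream destination should be VALID with an empty ERROR column
SELECT DEST_ID, STATUS, ERROR FROM V$ARCHIVE_DEST_STATUS WHERE DEST_ID = 2;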
III. OGG configuration

Source side:

1. Configure the MGR:

port 3306
dynamicportlist 9001-9500
autostart er *
autorestart er *,retries 4,waitminutes 4
startupvalidationdelay 5
purgeoldextracts ./dirdat/*,usecheckpoints,minkeephours 12

2. Create the credential store and add the users:

add credentialstore
alter credentialstore add user ogg@BDEDW alias BDEDW
alter credentialstore add user ogg@BDEDWDS alias BDEDWDS

3. Register the extract against the downstream mining database:

dblogin useridalias BDEDW
miningdblogin useridalias BDEDWDS
register extract kfk_e01 database

To remove the registration: unregister extract kfk_e01 database
To view the capture process: SELECT CAPTURE_NAME, STATUS FROM DBA_CAPTURE;
To manage the capture process from inside the database:
exec dbms_capture_adm.stop_capture('OGG$CAP_KFK_E01', true);
exec dbms_capture_adm.drop_capture('OGG$CAP_KFK_E01', true);

4. Add the extract:

ADD EXTRACT KFK_E01 INTEGRATED TRANLOG, BEGIN NOW
ADD RMTTRAIL /oggbase/ogg108/OGG_KFK/dirdat/ka, EXTRACT KFK_E01, MEGABYTES 50

Extract parameter file:

EXTRACT KFK_E01
RMTHOST xx.xxx.xxx.xxx, MGRPORT 3326
SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")
USERIDALIAS BDEDW
TRANLOGOPTIONS MININGUSERALIAS BDEDWDS
TRANLOGOPTIONS INTEGRATEDPARAMS (downstream_real_time_mine Y)
GETTRUNCATES
LOGALLSUPCOLS
UPDATERECORDFORMAT FULL
RMTTRAIL /oggbase/ogg108/OGG_KFK/dirdat/ka
TABLE NEWSUSER.CRM_CUSTOMER;
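After the extract is registered and started, the state of the integrated capture (logmining) process can be watched from the downstream database (a sketch; OGG$CAP_KFK_E01 follows the naming shown in step 3, and V$GOLDENGATE_CAPTURE is assumed available, as it is from 11.2.0.4 on):

-- CAPTURE_TYPE should be DOWNSTREAM and STATUS should be ENABLED
SELECT CAPTURE_NAME, CAPTURE_TYPE, STATUS FROM DBA_CAPTURE;

-- live state of the logmining server, e.g. CAPTURING CHANGES or WAITING FOR REDO
SELECT CAPTURE_NAME, STATE FROM V$GOLDENGATE_CAPTURE;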
Replicat side:

OGG for Big Data requires Java; this setup uses Java 1.8.0_161 (upgrade to 1.8 if the installed version is older).

1. Configure the Java environment variables. Add the following to .bash_profile (restart the MGR after changing environment variables):

export JAVA_HOME=/usr/lib/jvm/jre1.8.0_161
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/lib/amd64/server:$JAVA_HOME/lib/amd64:$LD_LIBRARY_PATH

2. Configure the MGR:

port 3326
dynamicportlist 9001-9500
autostart er *
autorestart er *,retries 4,waitminutes 4
startupvalidationdelay 5
purgeoldextracts ./dirdat/*,usecheckpoints,minkeephours 12

3. Copy the sample configuration files into $OGG_HOME/dirprm/, and copy the Kafka client libraries into $OGG_HOME/kafkalib/:
cp $OGG_HOME/AdapterExamples/big-data/kafka_connect/* $OGG_HOME/dirprm/
cp $KAFKA_HOME/libs/* $OGG_HOME/kafkalib/

4. Edit the Kafka Connect handler properties file $OGG_HOME/dirprm/kc.props:

gg.handlerlist=kafkaconnect
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkaconnect.mode=tx
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=update
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=true
gg.handler.kafkaconnect.includeTokens=true
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/oggbase/ogg108/OGG_KFK/kafkalib/*
javawriter.bootoptions=-Xmx2048m -Xms32m -Djava.class.path=ggjava/ggjava.jar

5. Edit the producer properties file $OGG_HOME/dirprm/custom_kafka_producer.properties:
bootstrap.servers=<kafka cluster node IPs>
acks=1
#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
buffer.memory=33554432
batch.size=16384

6. Add the replicat and configure it following the sample parameter file:

add replicat KFK_R01, exttrail /oggbase/ogg108/OGG_KFK/dirdat/ka, begin now

7. Start the extract and the replicat.
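For an end-to-end smoke test, make a change to NEWSUSER.CRM_CUSTOMER on the source and confirm that the capture counters advance on the downstream side before checking the Kafka topic (a sketch; the counter columns mirror V$STREAMS_CAPTURE and are assumed present in V$GOLDENGATE_CAPTURE):

-- on the source: push the current redo to the mining database
ALTER SYSTEM ARCHIVE LOG CURRENT;

-- on the downstream: these counters should grow as changes are mined
SELECT CAPTURE_NAME, STATE, TOTAL_MESSAGES_CAPTURED, TOTAL_MESSAGES_ENQUEUED
FROM V$GOLDENGATE_CAPTURE;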
Problems encountered during setup:

1. With extract version 12.3.0.1.0, the extract failed with ERROR OGG-00662 OCI Error OCI-22053: overflow error (status = 22053). The error was never resolved directly; after switching to 12.2 it no longer occurred.

2. With the extract side installed on ACFS, starting the extract failed with ERROR OGG-02079 Extract failed to login to the database as user ogg specified in the MININGUSER parameter because of error ORA-12154: TNS:could not resolve the connect identifier specified. The listener itself was healthy; the error does not occur as long as OGG is installed somewhere other than ACFS.

3. The replicat failed to start with OGG-15051 Java or JNI exception:...nested exception is java.lang.NoSuchMethodError: oracle.goldengate.handler.kafkaconnect.GGConfig.originalsWithPrefix(Ljava/lang/String;)Ljava/util/Map... Cause: the gg.classpath variable in the kafka.props file is set incorrectly, or the Kafka jars do not exist under the path that gg.classpath points to.

4. ERROR OGG-02171 Error reading LCR from data source. Status 509, data source type 0. ERROR OGG-02191 Incompatible record 101 in /oggbase/ogg108/OGG_KFK/dirdat/ka000000000, rba -2 when getting trail header. The replicat raised this error when using the kafka handler; after switching to the kafka connect handler it stopped. The root cause was not found.

5. ERROR OGG-01816 Partial operations are not supported in this release. Cause: the extract did not capture complete LOB objects. When the target is a non-Oracle database, or in any other case that requires complete LOB data, the extract should use the TRANLOGOPTIONS FETCHPARTIALLOB parameter.