Copying Big Oracle Tables Using Apache Spark
Background
Sometimes we need to copy table data from one database to another. Logically, the best way to do this is a database-specific export (expdp in Oracle lingo) followed by an import into the destination database (impdp in Oracle). But this method has its shortcomings, such as being unable to process a single table in parallel and requiring DBA access to the database. This post shows how to copy a table using Apache Spark and Apache Zeppelin.
Preparations
To allow Apache Spark to open Oracle JDBC connections, we need to add a dependency on ojdbc6.jar. To do this, run this paragraph in Zeppelin (before the Spark interpreter is started):
%dep
z.reset()
z.load("/path/to/ojdbc6.jar")
Basic Approach
The most basic approach to copying table data is to retrieve the data in a single query and save the resulting records into the target database.
%spark
import java.util.Properties
// Source and target table names
val tableNameSrc = "TABLENAME"
val tableNameTrg = "TABLENAME"
// Explicitly register the Oracle JDBC driver (optional when the "driver" property is set)
Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
// Reader credentials for the source database
val connectionProperties = new Properties()
connectionProperties.put("user", "reader")
connectionProperties.put("password", "reader_password")
connectionProperties.put("driver", "oracle.jdbc.driver.OracleDriver")
val jdbc_url = "jdbc:oracle:thin:@//sourceIP:1521/sourceSID"
val targetUrl = "jdbc:oracle:thin:@//targetIP:1521/targetSID"
// Read the whole source table as a single-partition DataFrame
val dataTable = spark.read.jdbc(jdbc_url, tableNameSrc, connectionProperties)
// Writer credentials for the target database
val props = Map(
  "user" -> "writer",
  "password" -> "writer_password",
  "batchsize" -> "5000",
  "driver" -> "oracle.jdbc.driver.OracleDriver")
val propsP = new java.util.Properties()
props foreach { case (key, value) => propsP.put(key, value) }
// Append all rows into the target table
dataTable.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc(targetUrl, tableNameTrg, propsP)
This method queries the entire table in one go. It works well as long as the JVM's memory limit is larger than the memory needed to hold the entire table, plus some overhead required by Spark itself. If the data is large, we will find the JVM hanging while it tries to garbage-collect memory, or crashing with an OutOfMemoryError.
Partitioning By Primary Key Range
Another approach makes use of the fact that access to a table row by primary key is quite fast, and that Spark can partition a JDBC query on a given column. However, there is a caveat: the primary key (or any indexed column) must be numeric, and we must know its lower and upper bounds. This is normally the case for MySQL tables with a single numeric primary key. In the author's case, however, the table is an already large Oracle table with no numeric indexed field, and we are not allowed to add an arbitrary column to it, so this approach cannot be used.
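For reference, this is roughly what such a partitioned read would look like if the table did have a suitable numeric column; a minimal sketch, where the column name ID, its bounds and the partition count are made-up placeholders, and jdbc_url, connectionProperties, targetUrl, tableNameTrg and propsP are the values defined in the Basic Approach paragraph:
%spark
// Hypothetical sketch: partitioned read on an assumed numeric indexed column "ID".
// Spark generates one query per partition, each with its own range predicate on ID.
val partitionedRead = spark.read.jdbc(
  jdbc_url,            // source URL from the Basic Approach paragraph
  "owner.tablename",
  "ID",                // assumed numeric, indexed column
  1L,                  // assumed lower bound of ID
  10000000L,           // assumed upper bound of ID
  32,                  // number of partitions
  connectionProperties)
partitionedRead.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc(targetUrl, tableNameTrg, propsP)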
Partitioning By ROWID
In an Oracle database, ROWID is a pseudo-column that describes a row's location in the physical datafile.
Excerpt from https://www.orafaq.com/wiki/ROWID:
The Oracle 8 format is on 10 bytes:
- bits 1 to 32 (bytes 1 to 4): data object id (0-4294967295)
- bits 33 to 44 (byte 5 and half byte 6): file number inside the tablespace (0-4095)
- bits 45 to 64 (half byte 6 and bytes 7 and 8): block number inside the file (0-1048575)
- bits 65 to 80 (bytes 9 and 10): row number inside the block (0-65535)
The key takeaway is that querying a table by ROWID is just as fast as querying it by primary key, regardless of whether the table has no primary key at all (you would be surprised) or has a composite primary key. Experiment with EXPLAIN PLAN to check the validity of this conjecture :).
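One way to run that experiment straight from Zeppelin is sketched below; it reuses the reader connection from the Basic Approach paragraph, the ROWID literals are dummy placeholders, and for a ROWID range predicate the plan should show a ROWID-based access path rather than a full table scan:
%spark
// Sketch only: run EXPLAIN PLAN over plain JDBC and print the resulting plan.
import java.sql.DriverManager
val conn = DriverManager.getConnection(jdbc_url, connectionProperties)
val stmt = conn.createStatement()
stmt.execute("EXPLAIN PLAN FOR SELECT * FROM owner.tablename " +
  "WHERE ROWID BETWEEN 'AAAAAAAAAAAAAAAAAA' AND 'AAAAAAAAAAAAAAAAZZ'")
val rs = stmt.executeQuery("SELECT plan_table_output FROM TABLE(DBMS_XPLAN.DISPLAY())")
while (rs.next()) println(rs.getString(1))
rs.close(); stmt.close(); conn.close()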
In this approach, the table is first queried in its entirety to get the block numbers of its rows (note that the base-64 display format of a ROWID is OOOOOOFFFBBBBBBRRR: six characters of data object id, three of relative file number, six of block number and three of row number, so the first 15 characters identify a data block):
%spark
val tableName = "owner.tablename"
// One row per data block: group ROWIDs by their first 15 characters (data object + file + block),
// keeping the minimum and maximum ROWID of each block. SUBSTR() returns VARCHAR2, so Spark
// reads the ROWIDs as plain strings.
val theTable = spark.read.jdbc(jdbc_url,
  "( select /*+ PARALLEL(8) */ SUBSTR(ROWID,1,15) PREFIX, SUBSTR(MIN(ROWID),1,64) MINR, SUBSTR(MAX(ROWID),1,64) MAXR from " +
  tableName + " GROUP BY SUBSTR(ROWID,1,15) )",
  connectionProperties)
var rekap1 = theTable.rdd.collect()
rekap1.length
Alternatively, we may want to group neighboring blocks together, because we don't want too much per-partition overhead from Spark processing:
%spark
val tableName = "owner.tablename"
// Group by only 13 characters of the ROWID so that neighboring blocks fall into the same range.
val theTable = spark.read.jdbc(jdbc_url,
  "( select /*+ PARALLEL(8) */ SUBSTR(ROWID,1,13) PREFIX, SUBSTR(MIN(ROWID),1,64) MINR, SUBSTR(MAX(ROWID),1,64) MAXR from " +
  tableName + " GROUP BY SUBSTR(ROWID,1,13) )",
  connectionProperties)
var rekap1 = theTable.rdd.collect()
rekap1.length
The length value shows how many partitions the table will be split into. It is better to keep this number under 1000 by adjusting how many characters of the ROWID we group by.
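To get a feel for the ranges before copying anything, we can peek at a few of the collected rows:
%spark
// Each element of rekap1 is a Row of (PREFIX, MINR, MAXR); print a few ranges.
rekap1.take(5).foreach(println)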
Afterwards, define which columns we want to copy (just use tablename.* instead of COL1, COL2, etc. if we want all columns):
%spark
// Build a BETWEEN predicate per ROWID range; take only the first two as a quick preview.
var clauses10 = rekap1.map( x => "ROWID BETWEEN '" + x(1) + "' AND '" + x(2) + "' " ).take(2)
// Inline view with the columns to copy plus the ROWID (aliased ROWID1, dropped after the read).
val tableNameXt = "( SELECT COL1, COL2, COL3, ROWID ROWID1 FROM owner.tablename ) T"
val dataTable = spark.read.jdbc(jdbc_url, tableNameXt, clauses10, connectionProperties).drop("ROWID1")
dataTable.show()
After confirming that the preview above returns all the columns we expect, proceed with copying the whole table:
%spark
// One predicate per ROWID range; Spark reads each range as a separate partition in parallel.
var clauses = rekap1.map( x => "ROWID BETWEEN '" + x(1) + "' AND '" + x(2) + "' " )
spark.read.jdbc(jdbc_url, tableNameXt, clauses, connectionProperties)
  .drop("ROWID1")
  .write.mode(org.apache.spark.sql.SaveMode.Append)
  .jdbc(targetUrl, tableNameTrg, propsP)
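As a quick sanity check after the copy, we can compare row counts between source and target; a minimal sketch reusing the connection details defined earlier:
%spark
// Sanity check: compare source and target row counts.
val srcCount = spark.read.jdbc(jdbc_url, "( SELECT COUNT(*) CNT FROM owner.tablename )", connectionProperties).first().get(0)
val trgCount = spark.read.jdbc(targetUrl, "( SELECT COUNT(*) CNT FROM " + tableNameTrg + " )", propsP).first().get(0)
println(s"source rows: $srcCount, target rows: $trgCount")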
Alternatively, the copy paragraph can be altered to use a Spark warehouse table as the destination:
%spark
var clauses = rekap1.map( x => "ROWID BETWEEN '" + x(1) + "' AND '" + x(2) + "' " )
spark.read.jdbc(jdbc_url, tableNameXt, clauses, connectionProperties)
  .drop("ROWID1")
  .write.saveAsTable("LOCALTABLENAME")
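The copied data can then be queried directly with Spark SQL, for example:
%spark
// Query the warehouse copy directly.
spark.sql("SELECT COUNT(*) FROM LOCALTABLENAME").show()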
So this approach can be used both for copying a table from one Oracle database to another, and for extracting an Oracle table into Spark's internal warehouse.