Copying Big Oracle Tables into Iceberg

While piloting the Trino query engine (formerly PrestoSQL), I tried several data warehouse destination options. The first option uses Trino's Hive connector, with the data stored in Minio storage accessed through the S3 API. The Minio services run on IBM hardware (ppc64le architecture), but that's another story for another blog post. The table metadata is served by a Hive metastore, which took some effort to set up because at some point the metastore needs to access the S3 storage itself (I don't understand why) and therefore needs the proper Hadoop AWS jars. The second option uses Trino's Iceberg connector to store the data in the same Minio storage and Hive metastore, but in the Iceberg table format.

For reference's sake, I will note the version of the software being used in this experiment. 

  • Trino version 442, deployed on OpenShift OKD 4.13 using Pulumi, with the Trino Helm template as a starting point. It uses the pristine Trino image from Docker Hub (docker.io/trinodb/trino:442). Trino's query.max-memory is set to 8GB and query.max-memory-per-node to 2GB; there are 5 worker pods, each with a 6GB memory limit.
  • Hive metastore 3.1.3, built from a custom Dockerfile and deployed on OpenShift OKD 4.13, with hadoop-aws 3.1.0 and aws-java-sdk-bundle 1.11.271. Extra care was taken to find matching versions of hadoop-aws and aws-java-sdk-bundle. The Docker base image is docker.io/apache/hive:3.1.3.
  • Spark 3.3.4 and Zeppelin 0.10.1, using hadoop-aws-3.3.2.jar, aws-java-sdk-bundle-1.11.1026.jar, spark-hadoop-cloud_2.12-3.3.4.jar and iceberg-spark-runtime-3.3_2.12-1.5.0.jar.

Using Trino


My attempt to copy a sizable Oracle table using Trino didn't go as planned: the query failed with an 'Insufficient Resources' error, and on other occasions the worker pods crashed without a single warning. I blame this on my not understanding how memory limits work in Trino, which I understood better after reading the ebook "Trino: The Definitive Guide". There is a concept of system memory and user memory, and the limits set by default in the config.properties file only cover the user part. When the system memory part pushes total usage past the pod's memory limit, the pod crashes (imagine: the memory limit is set to 6GB, Trino's limit of 2GB is not exceeded, yet the pod crashes silently).
OK, we need some results, so here is the result of copying part of the table (only 3 financial periods, i.e. 3 months of data, and only a 1/5 portion of the customers) :
Partitioning : none
Running time : 4.89 minutes,  Elapsed 2.79 minutes
Rows / s : 29.8 K
Row : 4.99 million
Logical Written data : 8.52 GB
Physical Written data : 291 MB
Peak Memory : Stage 1 : 792 MB
Cumulative Memory : Stage 1 : 78.9 GB.
Average speed : 14.9 K row / s
Yes, the cumulative memory is 78.9 GB for logical data of 8.52 GB.

When I used partitioning and bucketing together, the process failed :
Partitioning : financial period and bucket from customer ids(50 buckets)
Running time : 3.06 minutes , Elapsed : 44.81s
Rows / s : 56.4 K
Row : only 2.49 M before error
Logical written data : 4.25 GB
Physical written data : 3.85 GB
Peak memory: Stage 1: 5.69 GB
Cumulative Memory : Stage 1 : 112 GB.

With the Hive destination (ORC format) and not using Iceberg :
Partitioning : financial period and bucket from customer ids(50 buckets)
Running time : 4.65 minutes, Elapsed Time : 37 s
Rows / s : 776 K (I think this is inaccurate)
Row : 4.99 M 
Logical written data : 8.54 GB
Physical written data : 376 MB
Peak memory: Stage 1: 5.4 GB
Cumulative Memory : Stage 1 : 116 GB.
Average Speed : 134 K row/s

To prevent the failure in the Iceberg bucketed insert, I increased query.max-memory-per-node to 4000 MB, and for the Iceberg bucketed case only I set some additional session properties :

set SESSION spill_enabled=true;
set SESSION use_preferred_write_partitioning=false;
set SESSION scale_writers=false;
set SESSION writer_scaling_min_data_processed='250MB';
set SESSION task_scale_writers_enabled=false;
set SESSION task_concurrency=2;
set SESSION task_max_writer_count=1;

Results using Iceberg partitioned by period, for 12 financial periods (1 year of data) and all customers :
Wall clock time : 1340.8 seconds
Rows : 153.14 million
Rows / s : 114.2 K row/s

Results using Iceberg partitioned by period and bucketed by customer id into 50 buckets, for 12 financial periods and all customers :
Wall clock time : 1058.7 seconds
Rows : 153.14 million
Rows /s  : 144.6 K row/s
Peak Memory : 6.1 GB
Note : without the session parameters that reduce concurrency, most of the queries would fail in this partitioned and bucketed case. It seems that the ideal memory size for a Trino cluster in this case (12 periods, 50 buckets) is much larger than the resource limits given to the OKD project that runs the Trino cluster.

The memory measurements were taken from the Trino dashboard, so some memory usage is not accounted for there (Trino's rule of thumb is that about 30% of total memory is reserved for allocations outside Trino's memory accounting).
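
To make the 30% rule of thumb concrete, here is a small arithmetic sketch. It assumes the headroom is a flat 30% of the memory available to the Trino process and treats 1 GB as 1024 MB. The function name is mine and is not a Trino API.

```python
# Rough arithmetic for the 30% rule of thumb mentioned above.
# Assumption: 30% of the process memory is used outside Trino's accounting,
# so only the remaining 70% can back query.max-memory-per-node.

HEADROOM_FRACTION = 0.30  # rule-of-thumb value from the text


def min_process_memory_mb(per_node_limit_mb: float,
                          headroom: float = HEADROOM_FRACTION) -> float:
    """Smallest process memory (MB) that leaves room for the per-node limit."""
    return per_node_limit_mb / (1.0 - headroom)


POD_LIMIT_MB = 6 * 1024  # the 6 GB worker pod limit, assuming 1 GB = 1024 MB

for limit_mb in (2048, 4000):  # the two per-node limits used in this post
    needed = min_process_memory_mb(limit_mb)
    print(f"limit {limit_mb} MB -> needs about {needed:.0f} MB, "
          f"pod limit leaves {POD_LIMIT_MB - needed:.0f} MB spare")
```

Under these assumptions, the 4000 MB per-node limit leaves only a few hundred MB of the 6 GB pod limit for everything else running in the pod.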

Using Spark

For this one I ran all 12 periods of one year and all customers.

Partitioning : financial period and bucket from customer ids(50 buckets)
Running time : 18 min + 9.2 min (total 27.2 min)
Rows : 89 M
Shuffle Read Size : 39.9 GB
Output Size : 7 GB
Average Speed : 54.5 K row/s

For Spark, I used only 1 executor process with 12 worker threads and a memory allocation of 8 GB.

Method for Copying Large Oracle Table

Maybe I should describe the method I used. The basic concept is that, for a table without partitions, we should create a kind of virtual 'partition' of the large table. In this case the existing table was already partitioned by financial period, but I needed to partition the data further so that the data chunks (or splits, in Trino's terminology) fit into memory while running 4 queries in parallel. So I created a ccabuckethelper table in DBT (data build tool) :
with x as (
select cca, ntile(5) over (order by cca) bucketno FROM iceberg.default.originaltablename
where nper='201812'
)
select min(cca) mincca,max(cca) maxcca,bucketno,
count(*) cnt from x group by bucketno order by bucketno

This table helps me partition further using an existing index on cca, dividing the entire customer space into 5 equal parts (this is what ntile(5) means). Actually I cheated here by using data already in Iceberg that had been extracted without further partitioning.
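
As an illustration of what the helper table ends up holding, here is a small Python sketch of ntile followed by min/max per bucket. The key values are made up for the example, and the function name is hypothetical; it only mirrors the standard SQL ntile behaviour, where the first buckets receive any leftover rows.

```python
# Sketch of what ntile(5) over (order by cca) followed by min/max per bucket does.
# The cca values below are made-up sample keys, not real customer ids.

def ntile_ranges(values, buckets):
    """Return (bucketno, min, max, count) per bucket, like the helper table."""
    ordered = sorted(values)
    base, extra = divmod(len(ordered), buckets)
    ranges = []
    start = 0
    for bucketno in range(1, buckets + 1):
        # SQL ntile gives the first `extra` buckets one additional row.
        size = base + (1 if bucketno <= extra else 0)
        chunk = ordered[start:start + size]
        if chunk:
            ranges.append((bucketno, chunk[0], chunk[-1], len(chunk)))
        start += size
    return ranges


sample = [f"888{n:04d}" for n in range(1, 13)]  # 12 hypothetical keys
for row in ntile_ranges(sample, 5):
    print(row)
```

Each printed tuple corresponds to one row of ccabuckethelper: bucketno, mincca, maxcca and cnt.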
Next, I create an empty target table using this SQL :
create table if not exists iceberg.default.targettablename
as select d.*,cast(0 as integer) bucketno
from sourcecatalog.sourceschema.originaltable d where nper ='197000'
This SQL assumes that period '197000' (year 1970, month 0) is empty, so the statement only copies the column definitions.

The basic query is like this :
select p.*, 0 bucketno from "catalog"."schema"."originaltable" p where (nper = '201909') and
( cca between '88800000087000300001' and '88807385714000100001' or cca < '88800000087000300001' )
union all
select p.*, 0 bucketno from "catalog"."schema"."originaltable" p where (nper = '201908') and
( cca between '88800000087000300001' and '88807385714000100001' or cca < '88800000087000300001' )
union all
select p.*, 0 bucketno from "catalog"."schema"."originaltable" p where (nper = '201907') and
( cca between '88800000087000300001' and '88807385714000100001' or cca < '88800000087000300001' )
In Trino, we cannot run JDBC queries in parallel the way Spark does, so the next best thing is to use union all clauses: each select is run in a separate stage, in parallel. There are limits to this, though: by default a query may have about 150 stages, and memory usage grows toward the memory limit. If the query's memory usage exceeds the limit, Trino stops the query (and the transaction is rolled back).
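
The union all pattern above is repetitive enough to generate. Here is a minimal Python sketch of that idea, using the placeholder table names from this post. The function name and its parameters are my own for illustration; the values are interpolated directly into the SQL text, which is only reasonable because they come from our own helper table.

```python
# Sketch: build one INSERT whose branches are joined with UNION ALL,
# one SELECT per financial period. Table names are the placeholders used above.

def build_insert(target, source, periods, cca_low, cca_high):
    """Return an INSERT ... SELECT ... UNION ALL ... statement as a string."""
    branches = []
    for nper in periods:
        branches.append(
            f"select p.*, 0 bucketno from {source} p "
            f"where nper = '{nper}' "
            f"and (cca between '{cca_low}' and '{cca_high}' "
            f"or cca < '{cca_low}')"
        )
    return f"insert into {target}\n" + "\nunion all\n".join(branches)


sql = build_insert(
    target="iceberg.default.targettablename",
    source='"catalog"."schema"."originaltable"',
    periods=["201909", "201908", "201907"],
    cca_low="88800000087000300001",
    cca_high="88807385714000100001",
)
print(sql)
```

Three periods give three select branches, so the number of branches per insert is the knob that trades parallelism against memory usage.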

To skip periods that already have data in them, I use this query to create a summary table :
create table destinationsummarytable as
select nper,count(*) cnt from catalog.schema."destinationtable" group by nper

Using DBT to generate the insert queries, I run this SQL to prepare the where clauses and to skip existing periods :
create or replace table catalog.schema.tmp_ccaquery2 as
with months as (select batchmonth FROM UNNEST(SEQUENCE(1,6)) AS t(batchmonth)),
years as (select batchyear from iceberg.default.batchyears),
q1 as
(
select mincca,maxcca,cast (years.batchyear as int)*100 + months.batchmonth as nper,
first_value(mincca) over w as firstcca,
last_value(maxcca) over w as lastcca, bucketno,
ceil((row_number() over (order by years.batchyear, batchmonth, bucketno) - 1) /5)
            rowno from catalog.schema.ccabuckethelper, months, years
window w as (partition by years.batchyear order by bucketno range between
        unbounded preceding and unbounded following )
),
partbatches as (select mincca,maxcca, nper, firstcca, lastcca, rowno, bucketno from q1)
select mincca,maxcca,t.nper nper_t,p.nper nper_p,firstcca,lastcca, t.rowno, t.bucketno
        from partbatches t
left outer join catalog.schema.destinationsummarytable p on
        p.nper = cast(t.nper as varchar(12)) where p.nper is null
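
The batching in this query can be hard to read, so here is a Python sketch of the same idea: cross the periods with the buckets, number the rows, group every 5 consecutive rows under one rowno, and drop periods that are already loaded. It assumes that (row_number - 1) / 5 is an integer division, which makes the ceil a no-op. The function name and the sample periods are hypothetical.

```python
# Sketch of the batching logic: cross periods with buckets, number the rows,
# group every 5 consecutive rows into one batch, then drop loaded periods.
# Assumes integer division for (row_number - 1) / 5, so ceil() is a no-op.
from itertools import product


def plan_batches(periods, bucket_count, loaded_periods, batch_size=5):
    """Return {rowno: [(nper, bucketno), ...]} for periods not yet loaded."""
    combos = sorted(product(periods, range(1, bucket_count + 1)))
    batches = {}
    for row_number, (nper, bucketno) in enumerate(combos, start=1):
        rowno = (row_number - 1) // batch_size
        if nper in loaded_periods:
            continue  # same effect as the left join ... where p.nper is null
        batches.setdefault(rowno, []).append((nper, bucketno))
    return batches


batches = plan_batches(
    periods=[201901, 201902, 201903],  # hypothetical periods
    bucket_count=5,
    loaded_periods={201902},           # pretend this one is already copied
)
for rowno, rows in batches.items():
    print(rowno, rows)
```

With 5 buckets and a batch size of 5, every rowno ends up covering exactly one period, so each generated insert copies one period in 5 union all branches.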

Then, to generate the insert SQLs, I used quite a bit of DBT macro language :
{%- set queryrowno %}
select rowno from catalog.schema.tmp_ccaquery2 group by rowno order by rowno
{%- endset %}

{%- do run_query(createccaquery) %}
{%- set grouplist = query_to_list(queryrowno) %}

{%- for groupcode in grouplist %}
{%- set ccaquery3 %}
select mincca,maxcca,nper_t,firstcca,lastcca, bucketno from catalog.schema.tmp_ccaquery2
where rowno = {{ groupcode[0] }} order by nper_t,bucketno
{%- endset %}
{%- set ccaprefixlist = query_to_list(ccaquery3) %}
{%- set insertquery %}
-- set SESSIONs here
        -- (omitted)
   insert into catalog.schema.destinationtable
{%- for ccaprefix in ccaprefixlist %}
select p.* from catalog.sourceschema.originaltable p where (nper = '{{ ccaprefix[2] }}') and
( cca between '{{ ccaprefix[0] }}' and '{{ ccaprefix[1]}}'
{%- if ccaprefix[0] == ccaprefix[3] %}
or cca < '{{ ccaprefix[0] }}'
{%- endif %}
{%- if ccaprefix[1] == ccaprefix[4] %}
or cca > '{{ ccaprefix[1] }}'
{%- endif %}
)
{%- if not loop.last %} union all {%- endif %}
{%- endfor %}
{%- endset %}
{%- if ccaprefixlist %}
{%- do run_query(insertquery) %}
{%- endif %}
{%- endfor %}
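
The two if blocks in the macro are the subtle part: the first bucket also takes keys below its minimum and the last bucket takes keys above its maximum, presumably because the bucket boundaries were computed from a single period and other periods may contain keys outside that range. A minimal Python sketch of the same predicate logic, with a hypothetical function name and shortened made-up boundaries:

```python
# Sketch of the WHERE-clause logic in the macro above: the first bucket also
# takes keys below its minimum and the last bucket takes keys above its maximum,
# so rows outside the sampled min/max range are not lost.

def cca_predicate(mincca, maxcca, firstcca, lastcca):
    """Return the cca filter for one bucket as a SQL fragment."""
    parts = [f"cca between '{mincca}' and '{maxcca}'"]
    if mincca == firstcca:      # lowest bucket: open the lower edge
        parts.append(f"cca < '{mincca}'")
    if maxcca == lastcca:       # highest bucket: open the upper edge
        parts.append(f"cca > '{maxcca}'")
    return "( " + " or ".join(parts) + " )"


# Hypothetical bucket boundaries, shortened for readability.
buckets = [("100", "199"), ("200", "299"), ("300", "399")]
first, last = buckets[0][0], buckets[-1][1]
for low, high in buckets:
    print(cca_predicate(low, high, first, last))
```

Only the outermost buckets get the extra conditions; the middle buckets keep the plain between filter.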

Conclusion

I concluded that Trino's Iceberg connector is noticeably faster than its Hive connector, but it requires a much larger peak memory. The ranking is like this (from slowest to fastest) :
a. Spark writing to Iceberg partitioned and bucketed table, small memory usage, lowest speed.
b. Trino with Hive connector, writing to partitioned and bucketed table, medium memory usage, medium speed
c. Trino with Iceberg connector, writing to partitioned table, medium memory usage, faster speed
d. Trino with Iceberg connector, writing to partitioned and bucketed table, largest memory usage, fastest speed.
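
For the three full-year runs, the speeds behind this ranking can be recomputed from the reported row counts and wall clock times. The sketch below does only that arithmetic; the Hive run is left out because it was measured on the smaller 3-period subset, so its number is not directly comparable.

```python
# Recompute rows per second for the three full-year runs from the reported
# row counts and wall clock times. The Hive run is left out because it was
# measured on the smaller 3-period subset.

runs = {
    "Spark, Iceberg partitioned + bucketed": (89e6, 27.2 * 60),
    "Trino, Iceberg partitioned": (153.14e6, 1340.8),
    "Trino, Iceberg partitioned + bucketed": (153.14e6, 1058.7),
}


def rows_per_second(rows, seconds):
    """Plain throughput: rows divided by elapsed seconds."""
    return rows / seconds


ranking = sorted(runs, key=lambda name: rows_per_second(*runs[name]))
for name in ranking:
    print(f"{name}: {rows_per_second(*runs[name]) / 1000:.1f} K rows/s")
```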

Addendum

Hive table structure using partition and bucketing : 

CREATE TABLE (hive-catalog).(schema).(tablename)
WITH ( bucket_count = 50,
    bucketed_by = ARRAY['cca'],
    partitioned_by = ARRAY['partnper']
)
AS ...


DBT model syntax :
{{ config(materialized='incremental',
properties={"partitioned_by": "ARRAY['partnper']",
"bucketed_by": "ARRAY['cca']", "bucket_count": 50}) }}

Iceberg table structure using partition :

CREATE TABLE (iceberg-catalog).(schema).(tablename)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['nper']
)
AS ...

DBT model syntax : 
{{ config(materialized='table',
properties={"partitioning": "ARRAY['nper']"}) }}

Iceberg table structure using partition and bucketing :

CREATE TABLE (iceberg-catalog).(schema).(tablename)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['nper','bucket(cca,50)']
)
AS ...

DBT model syntax to enable creation of such table :
{{ config(materialized='table',
properties={"partitioning": "ARRAY['nper','bucket(cca, 50)']"}) }}
