Sunday, January 11, 2015

Processing CSV Files using Hive / Hadoop / HDFS


When there is a need to process large-sized CSV files, Apache Hive became a good option since it allow us to directly query these files. I will try to describe my recent experiences in using Apache Hive. In this case I need to group the rows and count the rows for each group. I will compare to my existing systems using MySQL database, one built using PHP and other built using combination of Pentaho and PHP.

Installation & Configuration

We have many components of Hadoop-Hive-HDFS ecosystem :
  • HDFS : Namenode service, Datanode services. 
  • MapReduce : ResourceManager service, NodeManager services
  • Hive 
  • ZooKeeper
Each component have their own configuration file (or files), and their own log files. 
For simplicity, in my opinion nothing beats the Apache-MySQL-PHP stack. Minus points for Hadoop-Hive-HDFS in complexity standpoint. I think we need additional management layer to be able to cope with complexity, maybe like Cloudera Manager or Apache Ambari, which I haven't explored yet.

My experiences here are the result of using Vagrant to provision 4 VMs like I have described in my previous post.
The Hadoop ecosystem consists of many Java components. Be prepared to review the configuration of each one. For example, the Zookeeper in my server is configured for maximum 50 connections.

The load process

The CSV files to be processed are stored in a directory. For my existing systems that uses MySQL database, the first processing to be done is parsing the CSV files and performing inserts into MySQL database table. But using Hive, we only need to upload the files to HDFS :

hdfs$ hadoop -put filename2014????.log /user/myuser

And then load the file into the 'staging' table :

hive> LOAD DATA INPATH '/user/myuser/filename20140101.log' INTO TABLE my_staging_table PARTITION (batch_id=20140101);

Due to limited time I haven't explored Hive automation from PHP or Java. So I need to run the statements for each file that need to be processed. But the statements get executed quickly, since there are no row processing done here.

My existing systems need to perform row field splitting just to be able to create insert statements. So: plus points for Hive in the efficency part, being able to quickly load the files into the Hive tables.

Hive  is able to ingest CSV files that ends with normal line endings very quickly 

The map and reduce processing

First I tried to follow the same process as my PHP-based system, that is creating additional field to the existing rows that are derived from existing field. This additional field becames one of the important grouping key. Because Hive tables are can only by appended, and somewhat immutable for all other purposes, I created a new table to contain the augmented rows.

hive> CREATE TABLE my_stage2_table (... ) PARTITION BY (batch_id) STORED AS SEQUENCEFILE;
hive>set hive.exec.dynamic.partition.mode=nonstrict;
hive> INSERT INTO TABLE my_stage2_table PARTITION (batch_id)  SELECT ...with additional column using UDF functions.. from my_staging_table;

The process is quite a long time, but definitely several times faster that my existing PHP process. It is about 40 minutes, vs 8 hours for my existing MySQL-PHP system in the same server. I found that the ResourceManager web app is very useful to check running processes (containers). I need to code a lot in PHP to get similar process visibility. But error cases are not uncommon, when processes hung or get terminated and I were unable to quickly check the  reason for failures (it is not shown in the Hue UI). In some cases I need to hunt for clues in the log files (there are many, because there is multiple VMs and components). Maybe installing ELK stack or Splunk for log file analysis would help, which I haven't done yet. Maybe my install is not a good one - because clicking in some log links in the web consoles (such as Resource Manager's web in port 8088) result in error messages.
I cannot determine my opinion on this. It is either difficulty of configuring Hive stack caused me trouble in this part, or that there are many edge cases where the Hadoop ecosystem will not became user friendly. 

If there is trouble with your MapReduce process, and the Hue UI gives very little clue, you might want to check other Hadoop web consoles, and maybe you need to check all of the log files existing in the system (hive-server2, yarn and zookeeper comes to mind)

I also find very peculiar that Hive's compression setting is stored in the session and not the metadata :

SET hive.exec.compress.output=true;
SET io.seqfile.compression.type=BLOCK;

But I have trouble using the compression options above, I restarted the VMs and skip the compression enabling step to be able to run the INSERT INTO above correctly. 

The summarizing part is done on the stage2 table, creating a new summary table :

hive> CREATE TABLE summary_table AS select count(*) cnt, key1, key2,.... from my_stage2_table group by key1,key2,..

The summarizing part runs quickly, about 5 mins using few key fields, near 10 mins using the real set of keys we need in production.

In the second iteration, I remembered that the smaller data we need to write to hard disk, the faster any process will perform. So I forgo the staging table and go direct to summarizing using expressions for the group by clause, and still get 10 minutes of running time.

hive> CREATE TABLE summary_table AS select count(*) cnt, key1, key2,..,key expression 1 as keyname1,key expression 2 as keyname2 from my_stage2_table group by key1,key2,..,key expression 1, key expression 2

Hive can be very fast in a single server hardware if we manage to use processes that writes small amount of output
I noted that memory capacity of Hadoop nodes are a bit tricky to set up. First, missing entry in the configuration file about the memory results in 8 GB default, which will cause strange behaviors when we only have 2 GB VM RAM and the volume of data that need to be processed are quite large. Allocating too small memory for Hadoop MapReduce/Yarn will make the CPU cores underutilized. However, because of single HDD in the test server (which makes the disk I/O capability not in the same level with enterprise servers ), underutilization of the CPU cores doesn't have much difference.

 Post processing

After the summary table gets created, the table is exported to CSV and processed using Excel Pivot table function for analysis. Because the million rows is now summarized into hundreds of rows, Excel could easily process them.


Apache Hive's performance for the CSV data processing is several times that of custom PHP application using MySQL. However the user need to aware that the complex configuration might cause difficulties, namely in troubleshooting for errors and configuring optimal RAM/CPU core to be used by the VM Hadoop nodes.

Reference :

Friday, January 9, 2015

Openshift Log Aggregation And Analysis using Splunk

Splunk is one of popular tools we use to analyze log files. In this post I would describe how to configure an openshift cluster to send all of the platform log files (mind that this excludes gear log files) to Splunk.

Configure Splunk to listen on TCP port

From splunk web console home, choose 'Add Data', 'monitor', 'TCP/UDP', fill in port 10514 (TCP), click 'Next', select sourcetype Operating System - linux_messages_syslog.

Configure Rsyslog Forwarding

These steps should be done in every openshift node, openshift broker and console.
As root, create an /etc/rsyslog.d/forward.conf file  as follows (change splunkserver to your splunk server IP, and the @@ means TCP, instead of @ for UDP)

$WorkDirectory /var/lib/rsyslog # where to place spool files
$ActionQueueFileName fwdRule1 # unique name prefix for spool files
$ActionQueueMaxDiskSpace 1g   # 1gb space limit (use as much as possible)
$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
$ActionQueueType LinkedList   # run asynchronously
$ActionResumeRetryCount -1    # infinite retries if host is down
*.* @@splunkserver:10514

And I don't want watchman metrics to fill up local log files, so I discard the metrics before writing to local file. Edit /etc/rsyslog.conf :

#kern.*                                                 /dev/console
:msg, contains, "type=metric"                           ~
# Log anything (except mail) of level info or higher.
# Don't log private authentication messages!
*.info;mail.none;authpriv.none;cron.none                /var/log/messages

And in my system SElinux block the rsyslog remote connect activity, so we need to enable allow_ypbind :

setsebool -P allow_ypbind 1

Restart the rsyslog system

service rsyslog stop
service rsyslog start

Configure Node Logging

Edit the /etc/openshift/node.conf and add these lines if they haven't already existed:

# enable metrics, beware of log volume increase
# select fewer metrics

Restart the mcollective service.

service ruby193-mcollective stop
service ruby193-mcollective start

Configure Broker Logging

Edit the /etc/openshift/broker.conf and add this line :


Restart broker 

service openshift-broker stop
service openshift-broker start

Configure Web Console Logging

Edit the /etc/openshift/console.conf and add this line :


Restart the web console

service openshift-console stop
service openshift-console start


We could analyze, for example, gear memory usage :

And gear create/destroy activities :