Hive for Unstructured Data

By: | Jul 10, 2014

The Hadoop ecosystem today is rich and growing. A technology that I use and enjoy quite a bit in that ecosystem is Hive. According to the Hive wiki, Hive is “designed to enable easy data summarization, ad-hoc querying and analysis of large volumes of data”. To add to that statement, Hive is also an abstraction built on top of MapReduce that lets you express data processing in a SQL-like language, HiveQL. Hive reduces the need to deeply understand the MapReduce paradigm and allows developers and analysts to apply their existing knowledge of SQL to big data processing. It also makes expressing MapReduce jobs more declarative.

One thing I hear a lot from folks is that Hive, being schema driven and having typed columns, is only fit for processing structured, row-oriented tabular data. Although that seems like a logical conclusion, Hive is also very good at turning unstructured data into a structured form. Better still, it lets you express that processing at a higher level of abstraction than MapReduce.

So before we get into the details of processing unstructured data in Hive, I’ll mention some other features and concepts of Hive.

Hive Metastore

Hive uses a metastore to store metadata about the data; in production it is usually backed by a relational database such as MySQL. The metastore holds table metadata such as table names, column names and types, and so on.

User Defined Functions

Hive provides many User Defined Functions (UDFs) out of the box and makes it easy to write custom ones. There are three main kinds of UDFs in Hive:

  1. Regular UDFs operate on a single row, taking a column value and returning a single value. For example, lower() converts a string value to lower case.
  2. User Defined Aggregate Functions (UDAFs) aggregate a column over a set of rows, optionally grouped by some columns. For example, sum() returns the sum of a column, with or without a GROUP BY clause.
  3. User Defined Table-Generating Functions (UDTFs) generate multiple rows from a single input. For example, explode() takes an array or map as input and returns one row per element (or per key/value pair, for a map).
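The three kinds differ mainly in how many rows go in and how many come out. The Python sketch below models that difference on a small in-memory table; it only illustrates the semantics, not how Hive runs UDFs (custom Hive UDFs are usually written in Java), and the table and its columns are hypothetical.

```python
from collections import defaultdict

# A toy table; the column names (course, student, minutes, tags) are hypothetical.
rows = [
    {"course": "Math", "student": "ANA", "minutes": 30, "tags": ["quiz", "video", "forum"]},
    {"course": "Math", "student": "BEN", "minutes": 45, "tags": ["video"]},
    {"course": "Art", "student": "CAL", "minutes": 20, "tags": []},
]


def lower_udf(value):
    """Like lower(): one value in, one value out, evaluated once per row."""
    return value.lower()


def sum_udaf(rows, group_col, value_col):
    """Like SELECT group_col, sum(value_col) ... GROUP BY group_col:
    many rows in, one result per group out."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[group_col]] += row[value_col]
    return dict(totals)


def explode_udtf(values):
    """Like explode(): one array in, one output row per element
    (an empty array produces no rows)."""
    for value in values:
        yield value


lowered = [lower_udf(r["student"]) for r in rows]               # 3 rows in, 3 values out
totals = sum_udaf(rows, "course", "minutes")                    # 3 rows in, 2 groups out
exploded = [t for r in rows for t in explode_udtf(r["tags"])]   # 3 rows in, 4 rows out
print(lowered, totals, exploded)
```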

 

Tables and Partitions

Hive supports two main kinds of tables: managed (sometimes called internal) and external. A managed table’s data lives under Hive’s warehouse directory, set by the configuration variable “hive.metastore.warehouse.dir”, unless you give a LOCATION in the CREATE statement, and Hive owns that data: when you drop a managed table, its data is deleted along with its metadata. An external table points at data in whichever folder you specify in the LOCATION clause of the CREATE statement; when you drop it, only the metadata is removed and the data stays put. For partitioned tables of either kind, data can be attached one partition at a time with an ALTER TABLE ... ADD PARTITION statement. You can think of the data in Hive tables as giant CSVs with a delimiter chosen when the table is created.
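The practical difference between managed (non-external) and external tables shows up when a table is dropped. This Python toy model, with a hypothetical ToyMetastore class that is not any real Hive API, uses a temporary directory in place of HDFS: dropping either kind of table removes its metadata, but only the managed table’s files are deleted.

```python
import os
import shutil
import tempfile


class ToyMetastore:
    """Hypothetical model of table ownership; not Hive's real metastore API."""

    def __init__(self, warehouse_dir):
        self.warehouse_dir = warehouse_dir  # plays the role of hive.metastore.warehouse.dir
        self.tables = {}                    # table name -> (data directory, is_external)

    def create_table(self, name, external=False, location=None):
        # With no LOCATION, the data directory defaults to <warehouse>/<table name>.
        path = location or os.path.join(self.warehouse_dir, name)
        os.makedirs(path, exist_ok=True)
        self.tables[name] = (path, external)
        return path

    def drop_table(self, name):
        path, external = self.tables.pop(name)  # metadata is always removed
        if not external:
            shutil.rmtree(path)  # managed table: Hive owns the files and deletes them


root = tempfile.mkdtemp()
store = ToyMetastore(os.path.join(root, "warehouse"))
managed_dir = store.create_table("managed_logs")
external_dir = store.create_table("external_logs", external=True,
                                  location=os.path.join(root, "access_logs"))
for d in (managed_dir, external_dir):
    with open(os.path.join(d, "part-00000"), "w") as f:
        f.write("a log line\n")

store.drop_table("managed_logs")
store.drop_table("external_logs")
print(os.path.exists(managed_dir), os.path.exists(external_dir))  # False True
```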

Since big data can be, well, big, it is not always optimal to scan an entire folder or table in Hive. For example, if you are only interested in data for a specific date range, limiting the data to that range before processing can be much more efficient than a full table scan. For this reason Hive supports partitions. Partition columns are virtual columns: they are not part of the data itself but are derived on load, and each partition value maps to its own directory. Partition columns don’t have to be dates, but at least one of them often represents a date or timestamp.
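Conceptually, a filter on a partition column lets Hive pick the matching partition directories from the metastore before reading anything, so files in other partitions are never opened. This Python sketch models that pruning step over a hypothetical set of hourly partitions, named like the ones used later in this post; it illustrates the idea and is not Hive’s query planner.

```python
# Hypothetical partitions of access_log: partition value -> data directory.
partitions = {
    "2014-07-03T00:00Z": "/user/demo/access_logs/2014/07/03/00/00",
    "2014-07-04T00:00Z": "/user/demo/access_logs/2014/07/04/00/00",
    "2014-07-04T01:00Z": "/user/demo/access_logs/2014/07/04/01/00",
    "2014-07-05T00:00Z": "/user/demo/access_logs/2014/07/05/00/00",
}


def prune(partitions, start, end):
    """Return only the directories whose partition value lies in [start, end].

    Fixed-width ISO-8601 strings sort lexicographically in time order,
    so plain string comparison works for this value format.
    """
    return [path for value, path in sorted(partitions.items()) if start <= value <= end]


# Roughly: WHERE hive_entry_timestamp BETWEEN '2014-07-04T00:00Z' AND '2014-07-04T23:59Z'
to_scan = prune(partitions, "2014-07-04T00:00Z", "2014-07-04T23:59Z")
print(to_scan)
```

Only two of the four directories would be read; the other partitions are skipped without opening their files.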

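Here is how to create a partitioned external table that keeps each raw log line in a single STRING column. Rows are delimited by newlines, and the field delimiter is set to '\001' (Ctrl-A), a character that does not occur in the logs, so each whole line, commas included, lands in log_line: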

CREATE EXTERNAL TABLE IF NOT EXISTS access_log (log_line STRING)
PARTITIONED BY (hive_entry_timestamp STRING)
ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\001'
STORED AS TEXTFILE
LOCATION '/user/demo/access_logs';
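The choice of field delimiter matters even for a one-column table, because a delimited text table splits every line on the field delimiter and maps the resulting fields to columns by position. This Python sketch models that behavior, assuming (as I understand Hive’s default text SerDe to behave) that fields beyond the declared columns are ignored and missing ones read as NULL. With a comma, one of the sample log lines shown in the next section would be cut off at the comma inside its timestamp; with Ctrl-A ('\001'), which never appears in the logs, the whole line survives.

```python
SAMPLE = ('10.181.145.106 - - [Mon, 19 May 2014 16:33:03 GMT] "GET /api/loggedInUser HTTP/1.1" 304 - '
          '"https://my.analytics.app/dashboard/courses/1291726" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) '
          'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"')


def read_row(line, delimiter, num_columns):
    """Model of a delimited text row: split on the field delimiter, keep the first
    num_columns fields, and fill any missing trailing columns with None (NULL)."""
    fields = line.split(delimiter)[:num_columns]
    return fields + [None] * (num_columns - len(fields))


print(read_row(SAMPLE, ",", 1))                 # ['10.181.145.106 - - [Mon']
print(read_row(SAMPLE, "\x01", 1) == [SAMPLE])  # True
```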

Processing Unstructured Data Using Hive

One of the popular use cases for Hadoop is processing large volumes of unstructured logs. I’ll use this as an example to illustrate using Hive to parse unstructured data and store it in a queryable, structured form. Here are a few lines of what a typical access log file might look like:

10.236.133.247 - - [Mon, 19 May 2014 16:31:33 GMT] "GET /api/admin/job/aggregator/status HTTP/1.1" 200 1847 "https://my.analytics.app/admin" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"
10.236.133.247 - - [Mon, 19 May 2014 16:31:43 GMT] "GET /api/admin/job/aggregator/status HTTP/1.1" 200 1984 "https://my.analytics.app/admin" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"
10.236.133.247 - - [Mon, 19 May 2014 16:33:02 GMT] "GET /dashboard/courses/1291726 HTTP/1.1" 304 - "https://my.analytics.app/admin" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"
10.181.145.106 - - [Mon, 19 May 2014 16:33:03 GMT] "GET /api/loggedInUser HTTP/1.1" 304 - "https://my.analytics.app/dashboard/courses/1291726" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"
10.181.145.106 - - [Mon, 19 May 2014 16:33:03 GMT] "POST /api/instrumentation/events/new HTTP/1.1" 200 2 "https://my.analytics.app/dashboard/courses/1291726" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"

Using the “access_log” table defined in the CREATE statement above, we can load this data into that table. Let’s assume the access log data is stored in the following HDFS location:

/user/demo/access_logs/2014/07/04/00/00

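The following ALTER TABLE statements take advantage of the table’s partitioning to attach that directory as a new partition of the table (no files are copied or moved). The first statement drops the partition if it already exists, so the script can safely be re-run; the second adds the partition and points it at the directory above: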

ALTER TABLE access_log
  DROP IF EXISTS PARTITION (hive_entry_timestamp='2014-07-04T00:00Z');

ALTER TABLE access_log
  ADD PARTITION (hive_entry_timestamp='2014-07-04T00:00Z')
  LOCATION '/user/demo/access_logs/2014/07/04/00/00';
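The DROP/ADD pair above follows a fixed pattern, so it is easy to generate for each new batch of logs. The hypothetical Python helper below builds both statements from a timestamp, assuming the <base_dir>/YYYY/MM/DD/HH/MM directory layout used in this post and a timestamp that is already in UTC (hence the literal Z).

```python
from datetime import datetime


def partition_statements(table, base_dir, ts):
    """Build the DROP/ADD PARTITION pair for one time-bucketed log directory.

    Hypothetical helper: assumes directories follow <base_dir>/YYYY/MM/DD/HH/MM
    and that ts is already in UTC.
    """
    value = ts.strftime("%Y-%m-%dT%H:%MZ")
    location = f"{base_dir}/{ts:%Y/%m/%d/%H/%M}"
    return [
        f"ALTER TABLE {table}\n  DROP IF EXISTS PARTITION (hive_entry_timestamp='{value}');",
        f"ALTER TABLE {table}\n  ADD PARTITION (hive_entry_timestamp='{value}')\n  LOCATION '{location}';",
    ]


for stmt in partition_statements("access_log", "/user/demo/access_logs", datetime(2014, 7, 4, 0, 0)):
    print(stmt + "\n")
```

The same helper, called with "parsed_access_log" and its base directory, would produce the matching statements for the output table.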

Next, set up the table that will store the parsed log data:

CREATE EXTERNAL TABLE IF NOT EXISTS parsed_access_log (
  log_date STRING,
  ip STRING,
  http_method STRING,
  uri STRING,
  protocol STRING,
  user_agent STRING,
  url STRING
)
PARTITIONED BY (hive_entry_timestamp STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\001'
STORED AS TEXTFILE
LOCATION '/user/demo/parsed_access_log';
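With a delimited text table, Hive matches fields to columns by position, not by name. INSERT OVERWRITE DIRECTORY writes plain text with Ctrl-A ('\001') between fields by default, so when those files are read through this table only the order of the SELECT expressions matters, not their aliases. This Python sketch, with a hypothetical (and shortened) row of values, shows what happens when two columns are emitted in the wrong order.

```python
COLUMNS = ["log_date", "ip", "http_method", "uri", "protocol", "user_agent", "url"]


def write_row(values):
    """Serialize one output row as text with a Ctrl-A (\\x01) between fields."""
    return "\x01".join(values)


def parse_row(line, columns=COLUMNS):
    """Map fields onto column names purely by position."""
    return dict(zip(columns, line.split("\x01")))


# Hypothetical values; the user agent is shortened for readability.
values = ["Mon, 19 May 2014 16:33:03 GMT", "10.181.145.106", "GET", "/api/loggedInUser",
          "HTTP/1.1", "Mozilla/5.0", "https://my.analytics.app/dashboard/courses/1291726"]

row = parse_row(write_row(values))
# ip and log_date emitted in the wrong order: the values silently swap columns.
swapped = parse_row(write_row([values[1], values[0]] + values[2:]))
print(row["ip"], "|", swapped["ip"])
```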

Once the raw table and the output table are in place, the parsing can be scripted in HiveQL, taking advantage of Hive’s many built-in string UDFs. As before, first make sure the destination partition for the parsed data exists:

ALTER TABLE parsed_access_log
  DROP IF EXISTS PARTITION (hive_entry_timestamp='2014-07-04T00:00Z');

ALTER TABLE parsed_access_log
  ADD PARTITION (hive_entry_timestamp='2014-07-04T00:00Z')
  LOCATION '/user/demo/parsed_access_log/2014/07/04/00/00';

Finally, parse the data using HiveQL and the built-in string UDFs, writing the results straight into the new partition’s directory:

INSERT OVERWRITE DIRECTORY '/user/demo/parsed_access_log/2014/07/04/00/00'
SELECT
  -- timestamp: text between the square brackets
  TRIM(SUBSTRING(log_line, INSTR(log_line, "[") + 1, INSTR(log_line, "]") - (INSTR(log_line, "[") + 1))) AS log_date,
  -- client IP: everything before the first dash, minus the separating space
  TRIM(SUBSTRING(log_line, 1, INSTR(log_line, "-") - 2)) AS ip,

  -- request line: begins after the first space-quote pair, take the rest of the line and split it on spaces
  SPLIT(TRIM(SUBSTRING(log_line, INSTR(log_line, " \"") + 2)), " ")[0] AS http_method,
  SPLIT(TRIM(SUBSTRING(log_line, INSTR(log_line, " \"") + 2)), " ")[1] AS uri,
  REGEXP_REPLACE(SPLIT(TRIM(SUBSTRING(log_line, INSTR(log_line, " \"") + 2)), " ")[2], "\"", "") AS protocol,

  -- user agent: after the quote-space-quote sequence that ends the referrer, minus the closing quote
  TRIM(SUBSTRING(log_line, INSTR(log_line, "\" \"") + 3, (LENGTH(log_line) - 1) - (INSTR(log_line, "\" \"") + 2))) AS user_agent,

  -- referrer URL, or N/A when the line has none
  CASE
    WHEN INSTR(log_line, "\"http") > 0
      THEN TRIM(SUBSTRING(log_line, INSTR(log_line, "\"http") + 1, INSTR(log_line, "\" \"") - (INSTR(log_line, "\"http") + 1)))
    ELSE "N/A"
  END AS url
FROM access_log
WHERE hive_entry_timestamp = '2014-07-04T00:00Z';
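To check what each expression extracts without a cluster, the Python sketch below re-implements the two Hive string functions the query leans on, INSTR (1-based position, 0 when not found) and SUBSTRING (1-based start, optional length), and applies the query’s extraction logic to one of the sample lines. For the request line, the sketch takes everything after the opening quote to the end of the line before splitting on spaces. It is an approximation for illustration only: Python’s strip() trims all whitespace where Hive’s TRIM trims spaces, and only positive start positions are handled.

```python
from typing import Optional


def instr(s: str, sub: str) -> int:
    """Hive INSTR: 1-based position of the first occurrence of sub, or 0 if absent."""
    return s.find(sub) + 1


def substring(s: str, pos: int, length: Optional[int] = None) -> str:
    """Hive SUBSTRING for pos >= 1: 1-based start, optional length (to end of string if omitted)."""
    start = pos - 1
    return s[start:] if length is None else s[start:start + max(length, 0)]


def parse(line: str) -> dict:
    """Extract the seven parsed_access_log columns from one raw log line."""
    request = substring(line, instr(line, ' "') + 2).strip().split(" ")
    sep = instr(line, '" "')  # quote-space-quote between the referrer and the user agent
    url = "N/A"
    if instr(line, '"http') > 0:
        url_start = instr(line, '"http') + 1
        url = substring(line, url_start, sep - url_start).strip()
    return {
        "log_date": substring(line, instr(line, "[") + 1, instr(line, "]") - (instr(line, "[") + 1)).strip(),
        "ip": substring(line, 1, instr(line, "-") - 2).strip(),
        "http_method": request[0],
        "uri": request[1],
        "protocol": request[2].replace('"', ""),
        "user_agent": substring(line, sep + 3, (len(line) - 1) - (sep + 2)).strip(),
        "url": url,
    }


line = ('10.236.133.247 - - [Mon, 19 May 2014 16:33:02 GMT] "GET /dashboard/courses/1291726 HTTP/1.1" 304 - '
        '"https://my.analytics.app/admin" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"')
for column, value in parse(line).items():
    print(f"{column}: {value}")
```

Running it prints the seven columns for that line, which is a quick way to try out a change to one of the expressions before running the full Hive job.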

Once the query completes, you can SELECT * from the “parsed_access_log” table to get the processed, structured version of the log data in tabular form:

[Figure: parsed_access_logs (query results)]

So there you have it: Hive can be used to process unstructured data effectively. For more complex processing needs, you can fall back on writing custom UDFs. There are many benefits to working at a higher level of abstraction than low-level MapReduce code. One that makes Hive appealing to me is the absence of boilerplate code for mappers, reducers, and drivers. The biggest benefit, though, is the declarative SQL-like syntax: it’s easy to follow, and it lets developers and non-developers alike take full advantage of big data using an existing skill set.
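For log layouts that chained INSTR/SUBSTRING calls handle awkwardly, one alternative, swapped in here and not the approach this post uses, is a single regular expression that captures every field. Hive’s built-in regexp_extract(subject, pattern, index) takes a Java regex, so a pattern like the one in this Python sketch could be used there (with its backslashes doubled inside a HiveQL string literal) or inside a custom UDF. The pattern is an assumption inferred from the five sample lines above, not a specification of the log format.

```python
import re

# Assumed layout, inferred from the sample lines: ip, two dash fields, [timestamp],
# "METHOD URI PROTOCOL", status, size (or -), "referrer", "user agent".
# Only plain groups and character classes are used, so the same pattern should
# also be valid as a Java regex.
LOG_PATTERN = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$'
)
# "url" holds the referrer, matching the url column of parsed_access_log.
FIELDS = ["ip", "log_date", "http_method", "uri", "protocol", "status", "size", "url", "user_agent"]


def parse_with_regex(line):
    """Return a dict of fields, or None when the line does not match the assumed layout."""
    match = LOG_PATTERN.match(line)
    return dict(zip(FIELDS, match.groups())) if match else None


sample = ('10.181.145.106 - - [Mon, 19 May 2014 16:33:03 GMT] "POST /api/instrumentation/events/new HTTP/1.1" '
          '200 2 "https://my.analytics.app/dashboard/courses/1291726" "Mozilla/5.0 (Macintosh; Intel Mac OS X '
          '10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/537.36"')
print(parse_with_regex(sample))
```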