Hadoop & Hive Integration Tutorial

Hadoop and Hive with GAUSS

Welcome to the Hadoop and Hive tutorial using GAUSS 16+. This example involves acquiring movie ratings data and determining the average rating each user gave. The tutorial assumes that Hadoop and Hive are both installed and configured. All tests were performed under Ubuntu 14.04.1 with Hadoop 2.7.2 and Hive 1.2.1. References to hduser throughout refer to the local account used for testing.

Data Preparation

These instructions assume your Hadoop cluster is running on Linux. All data preparation commands should be run on the machine where Hadoop is installed (i.e., the machine hosting HDFS).
  • Fetch the sample data:
    $ wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
    $ unzip ml-100k.zip
    Archive:  ml-100k.zip
      Length      Date    Time    Name
    ---------  ---------- -----   ----
            0  2015-03-31 14:33   ml-100k/
          716  2000-07-19 14:09   ml-100k/allbut.pl
          643  2000-07-19 14:09   ml-100k/mku.sh
         6403  2015-03-31 14:33   ml-100k/README
      1979173  2000-07-19 14:09   ml-100k/u.data
          202  2000-07-19 14:09   ml-100k/u.genre
           36  2000-07-19 14:09   ml-100k/u.info
       236344  2000-07-19 14:09   ml-100k/u.item
          193  2000-07-19 14:09   ml-100k/u.occupation
        22628  2000-07-19 14:09   ml-100k/u.user
      1586544  2001-03-08 11:33   ml-100k/u1.base
       392629  2001-03-08 11:32   ml-100k/u1.test
      1583948  2001-03-08 11:33   ml-100k/u2.base
       395225  2001-03-08 11:33   ml-100k/u2.test
      1582546  2001-03-08 11:33   ml-100k/u3.base
       396627  2001-03-08 11:33   ml-100k/u3.test
      1581878  2001-03-08 11:33   ml-100k/u4.base
       397295  2001-03-08 11:33   ml-100k/u4.test
      1581776  2001-03-08 11:34   ml-100k/u5.base
       397397  2001-03-08 11:33   ml-100k/u5.test
      1792501  2001-03-08 11:34   ml-100k/ua.base
       186672  2001-03-08 11:34   ml-100k/ua.test
      1792476  2001-03-08 11:34   ml-100k/ub.base
       186697  2001-03-08 11:34   ml-100k/ub.test
    ---------                     -------
     16100549                     24 files
  • Place the example data file, u.data, directly into HDFS (this copy is used for the non-HiveQL example):
    $ hadoop fs -put ml-100k/u.data
  • Preview the first 20 lines of the data file we just placed in HDFS:
    $ hadoop fs -cat u.data | head -n20
    userid  movieid rating  unixtime   # (Headers added for clarity)
    196	242	3	881250949
    186	302	3	891717742
    22	377	1	878887116
    244	51	2	880606923
    166	346	1	886397596
    298	474	4	884182806
    115	265	2	881171488
    253	465	5	891628467
    305	451	3	886324817
    6	86	3	883603013
    62	257	2	879372434
    286	1014	5	879781125
    200	222	5	876042340
    210	40	3	891035994
    224	29	3	888104457
    303	785	3	879485318
    122	387	5	879270459
    194	274	2	879539794
    291	1042	4	874834944
    234	1184	2	892079237
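  • (Optional) Confirm the full record count before loading the data into Hive; the ml-100k dataset contains 100,000 ratings, so this should report 100000 lines:
    $ hadoop fs -cat u.data | wc -l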
  • Start the Beeline command-line interface (CLI) for Hive to perform the remaining data preparation steps (we omit specifying 'default' as the database since it is selected automatically):
    $ beeline -u jdbc:hive2://localhost:10000 -n hduser
    • Create a table in Hive to hold our example data:
      0: jdbc:hive2://localhost:10000> CREATE TABLE u_data (
        userid INT,
        movieid INT,
        rating INT,
        unixtime STRING)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      STORED AS TEXTFILE;
    • Import our example data into the Hive table we just created:
      0: jdbc:hive2://localhost:10000> LOAD DATA LOCAL INPATH 'ml-100k/u.data' OVERWRITE INTO TABLE u_data;
    • Verify the data was imported by printing the number of records in our table:
      0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM u_data;
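    • (Optional) Spot-check that the tab-delimited fields were parsed into the correct columns:
      0: jdbc:hive2://localhost:10000> SELECT * FROM u_data LIMIT 5;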
We now have identical data loaded into a Hive table (u_data) and as a file on HDFS (u.data), and can start working with it.

Example 1 - Local Hadoop batch job

Since the data in u.data is tab-delimited and we don't want the movieid or unixtime columns, our mapper will remove those fields.
GAUSS Mapper (mapcolumns.gss)
//End immediately if no input was supplied on stdin
if eof(__STDIN);end;endif;

//Read all rows of columns 1:3
stats = csvReadM(__STDIN,1,1|3,"\t");

//Format print output as tab-separated
format /rdt;

//Print the first and third columns
print stats[.,1]~stats[.,3];
The reducer will pull in the mapped data, compute the per-user averages, and print the results.
GAUSS Reducer (reducecolumns.gss)
//End immediately if no input was supplied on stdin
if eof(__STDIN);end;endif;

//Read in the first 2 columns
stats = csvReadM(__STDIN, 1, 1|2, "\t");

all_ids = stats[.,1];
user_ids = unique(all_ids);

//Create matrix to hold user id's and average rating
final = user_ids ~ zeros(rows(user_ids), 1);

for i(1, rows(user_ids), 1);
    //Select i'th unique user id
    uid = user_ids[i];
    //Select all ratings by the i'th unique user
    user_ratings = selif(stats[.,2], all_ids .== uid);
    //Calculate average of user's ratings
    final[i,2] = meanc(user_ratings);
endfor;

format /rdt;
print final;
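Before submitting anything to Hadoop, you can sanity-check both scripts locally by piping a sample of the raw data through them. This is only a rough local test (it skips the sort-and-partition step Hadoop performs between map and reduce, which this particular reducer does not depend on since it reads all of its input at once) and assumes the GAUSS paths used throughout this tutorial:
$ head -n 1000 ml-100k/u.data | /home/hduser/gauss16/tgauss -b /home/hduser/gauss16/mapcolumns.gss | /home/hduser/gauss16/tgauss -b /home/hduser/gauss16/reducecolumns.gss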
With the proper scripts in place (ensure the following command has the appropriate paths for the Hadoop jar and GAUSS scripts), we can run the job on the cluster.
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-x.x.x.jar -input u.data -output GAUSSExample1 -mapper '/home/hduser/gauss16/tgauss -b /home/hduser/gauss16/mapcolumns.gss' -reducer '/home/hduser/gauss16/tgauss -b /home/hduser/gauss16/reducecolumns.gss' -numReduceTasks 2
Check the output directory for the results:
$ hadoop fs -ls GAUSSExample1
Found 3 items
-rw-r--r--   1 hduser supergroup          0 2015-12-10 15:38 GAUSSExample1/_SUCCESS
-rw-r--r--   1 hduser supergroup      16522 2015-12-10 15:38 GAUSSExample1/part-00000
-rw-r--r--   1 hduser supergroup      16522 2015-12-10 15:38 GAUSSExample1/part-00001
$ hadoop fs -cat GAUSSExample1/part-00000 | head -n20
	
      1.00000000	      3.61029412	
      3.00000000	      2.79629630	
      5.00000000	      2.87428571	
      7.00000000	      3.96526055	
      9.00000000	      4.27272727	
     10.00000000	      4.20652174	
     12.00000000	      4.39215686	
     14.00000000	      4.09183673	
     16.00000000	      4.32857143	
     18.00000000	      3.88086643	
     21.00000000	      2.67039106	
     23.00000000	      3.63576159	
     25.00000000	      4.05128205	
     27.00000000	      3.24000000	
     29.00000000	      3.64705882	
     30.00000000	      3.76744186	
     32.00000000	      3.31707317	
     34.00000000	      4.05000000	
     36.00000000	      3.80000000
You can merge the output into a single file after the job is complete:
$ hadoop fs -getmerge GAUSSExample1 /local/output/file

Example 2 - Using HiveQL to perform a remote MapReduce job

  1. This example requires either the HortonWorks Hive ODBC Driver or the Microsoft Hive ODBC Driver.
    Note: The driver architecture (32-bit or 64-bit) you select must match your GAUSS installation.
  2. Ensure you can connect successfully via the ODBC Data Source Administrator. An excellent resource for this process is page 7 of the HortonWorks ODBC Hive User Guide.
  3. Run the following file in GAUSS to perform MapReduce using HiveQL:
GAUSS Program (db_hive.gss)
new;

trap 1;
id = dbAddDatabase("ODBC");
trap 0;
if scalmiss(id);
    print "Driver not found.";
    end;
endif;

gauss_home = "/home/hduser/gauss16";
driver = "HortonWorks Hive ODBC Driver";
//driver = "Microsoft Hive ODBC Driver"; // If using Microsoft Hive ODBC Driver
user = ""; // leave blank if not using
password = ""; // leave blank if not using
host = "192.168.0.25";
port = 10000;
use_db = "default";
use_ssl = 1;

// Choose correct authentication type based on provided values
if strlen(user) > 0 or use_ssl;
    if strlen(password) > 0;
        if use_ssl;
            auth_mech = "4";
        else;
            auth_mech = "3";
        endif;
        opts = "AuthMech="$+auth_mech$+";UID="$+user$+";PWD="$+password$+";";
    else;
        opts = "AuthMech=2;UID="$+user$+";";
    endif;
else;
    opts = "AuthMech=0;"; // Specify authentication (None)
endif;

dbSetDatabaseName(id, "DRIVER={"$+driver$+"};HOST="$+host$+";PORT="$+ntos(port)$+";HiveServerType=2;Schema="$+use_db$+";"$+opts);
dbOpen(id);
// Set number of reduction tasks
rid = dbExecQuery(id, "set mapred.reduce.tasks=2;");
// Process query
qid = dbExecQuery(id, "from (select userid,rating from u_data distribute by userid) t1 insert overwrite directory 'GAUSSExample2' reduce userid,rating using '"$+gauss_home$+"/tgauss -b "$+gauss_home$+"/reducecolumns.gss';");
dbClose(id);
Verify the job ran successfully and inspect the output:
$ hadoop fs -ls GAUSSExample2
Found 2 items
-rwxr-xr-x   1 hduser supergroup      16489 2015-12-10 15:33 GAUSSExample2/000000_0
-rwxr-xr-x   1 hduser supergroup      16524 2015-12-10 15:33 GAUSSExample2/000001_0
$ hadoop fs -cat GAUSSExample2/000000_0 | head -n20
\N
      2.00000000      3.70967742	
      4.00000000      4.33333333	
      6.00000000      3.63507109	
      8.00000000      3.79661017	
     10.00000000      4.20652174	
     12.00000000      4.39215686	
     14.00000000      4.09183673	
     16.00000000      4.32857143	
     18.00000000      3.88086643
     20.00000000      3.10416667	
     22.00000000      3.35156250	
     24.00000000      4.32352941	
     26.00000000      2.94392523	
     28.00000000      3.72151899	
     30.00000000      3.76744186	
     32.00000000      3.31707317	
     34.00000000      4.05000000	
     36.00000000      3.80000000	
     38.00000000      3.71900826
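As in Example 1, you can merge the per-reducer output files into a single local file once the job completes:
$ hadoop fs -getmerge GAUSSExample2 /local/output/file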
The possibilities for MapReduce with GAUSS are endless. The level of integration can be modified to suit your needs, such as only using GAUSS for the Reduce phase.
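For example, here is a minimal sketch of a reduce-only variant of Example 1, replacing the GAUSS mapper with the standard Unix cut command to extract the userid and rating columns (the GAUSSExample3 output directory name is just a placeholder; adjust the jar and GAUSS paths as in Example 1):
$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-x.x.x.jar -input u.data -output GAUSSExample3 -mapper 'cut -f 1,3' -reducer '/home/hduser/gauss16/tgauss -b /home/hduser/gauss16/reducecolumns.gss' -numReduceTasks 2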
