Thursday, April 14, 2016

Apache Sqoop


Sqoop is a tool designed to transfer data between Hadoop and relational databases or mainframes. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle or a mainframe into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

Sqoop automates most of this process, relying on the database to describe the schema for the data to be imported. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance.

Importing a Table from RDBMS to HDFS:

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1

Here we are connecting to MySQL through JDBC connectors and using the database TestDb. Here it is necessary to specify the MySQL ‘s username and password and the table name.

Here ‘-m’ specifies the number of map task that can be run simultaneously and ‘m1’ means that only one map task can run.

If we do not use -m1 at the end of the statement, for each record in the MySQL table we will get separate files in the HDFS.

Now the data in RDBMS has been successfully imported into HDFS. By default, the files will be stored here: /user/$user_name/table_name/part-m-00000 file.

Importing all the Tables in a Database to HDFS

Sqoop: sqoop import-all-tables --connect jdbc:mysql://localhost/TestDb --username root --password root -m1

Importing table data into a Specific Directory in HDFS

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/

Importing Table as a Sequence File into HDFS

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/squence/ --as-sequencefile

As the sequence file stores the contents in binary format, we will get the binary output.

Importing Table as a Avro File into HDFS


Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/avro/ --as-avrodatafile

When storing a file using Avro file format, we will get the output file with .avro extension and the contents inside the file will be in binary format. 

Export Data from HDFS to RDBMS

First create a table in MySQL named as employee1
Sqoop: sqoop export --connect jdbc:mysql://localhost/TestDb --table employee1 --export-dir /employee1.csv

Incremental Import in Sqoop To Load Data From Mysql To HDFS

Sqoop supports two types of incremental imports: append and lastmodified. You can use the –incremental argument to specify the type of incremental import to perform.

You should specify the append mode when importing a table, where new rows are continually added with increasing row id values. You must specify the column containing the row’s id with –check-column. Sqoop imports rows where the check column has a value greater than the one specified with –last-value.

An alternate table update strategy supported by Sqoop is called lastmodified mode. This should be used when rows of the source table is updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with –last-value are imported.

At the end of an incremental import, the value which should be specified as –last-value for a subsequent import is printed to the screen. When running a subsequent import, you should specify –last-value in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import.

Sqoop: Sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --sales_table -m1 --tagret-dir /sqoopout

Currently 2 records imported successfully from MySQL to HDFS
1, table, 1000
2, sofa, 5000

Insert 3 more records through mysql
mysql> insert into sales_table values(3, 'window', 3000),(4, 'fan', 2000),(5, 'tv',8000);

now 5 records available in sales_table

The following syntax is used for the incremental option in Sqoop import command.

Sqoop: Sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --sales_table -m1 --tagret-dir /sqoopout/ --incremental append --check-column ord_id --last-value 2

3 more records have been retrieved and the incremental import is now complete. 

Wednesday, April 13, 2016

Hive Indexing

Index:

An Index acts as a reference to the records. Instead of searching all the records, we can refer to the index to search for a particular record. Indexes maintain the reference of the records. So that it is easy to search for a record with minimum overhead. Indexes also speed up the searching of data.

Indexing in Hive:

Hive is a data warehousing tool present on the top of Hadoop, which provides the SQL kind of interface to perform queries on large data sets. Since Hive deals with Big Data, the size of files is naturally large and can span up to Terabytes and Petabytes. Now if we want to perform any operation or a query on this huge amount of data it will take large amount of time.

In a Hive table, there are many numbers of rows and columns. If we want to perform queries only on some columns without indexing, it will take large amount of time because queries will be executed on all the columns present in the table.

The major advantage of using indexing is; whenever we perform a query on a table that has an index, there is no need for the query to scan all the rows in the table. Further, it checks the index first and then goes to the particular column and performs the operation.

So if we maintain indexes, it will be easier for Hive query to look into the indexes first and then perform the needed operations within less amount of time.

Eventually, time is the only factor that everyone focuses on.

When to use Indexing:

Indexing can be use under the following circumstances:

If the dataset is very large.
If the query execution is more amount of time than you expected.
If a speedy query execution is required.
When building a data model.
Indexes are maintained in a separate table in Hive so that it won’t affect the data inside the table, which contains the data. Another major advantage for indexing in Hive is that indexes can also be partitioned depending on the size of the data we have.

Types of Indexes in Hive:

Compact Indexing
Bitmap Indexing

Differences between Compact and Bitmap Indexing:

The main difference is the storing of the mapped values of the rows in the different blocks. When the data inside a Hive table is stored by default in the HDFS, they are distributed across the nodes in a cluster. There needs to be a proper identification of the data, like the data in block indexing. This data will be able to identity which row is present in which block, so that when a query is triggered it can go directly into that block. So, while performing a query, it will first check the index and then go directly into that block.

Compact indexing stores the pair of indexed column’s value and its blockid.

Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.

Bitmap Indexing:

A bitmap is is a type of memory organization or image file format used to store digital images so with this meaning of bitmap, we can redefine bitmap indexing as given below.

“Bitmap index stores the combination of value and list of rows as a digital image.”

The following are the different operations that can be performed on Hive indexes:

Creating index
Showing index
Alter index
Dropping index

Creating index:

CREATE INDEX index_name 
ON TABLE table_name (columns,....) 
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
WITH DEFERRED REBUILD;

The org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler’ line specifies that a built in CompactIndexHandler will act on the created index, which means we are creating a compact index for the table.
The WITH DEFERRED REBUILD statement should be present in the created index because we need to alter the index in later stages using this statement.

This syntax will create an index for our table, but to complete the creation, we need to complete the REBUILD statement. For this to happen, we need to add one more alter statement. A MapReduce job will be launched and the index creation is now completed.

ALTER INDEX index_nam on table_name REBUILD;

This ALTER statement will complete our REBUILDED index creation for the table.

hive> create table employee(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;
hive> load data local inpath ‘path of your file’ into table employee;

Let’s perform an Average operation on this ‘employee’ data. Let’s calculate the average age of the employees using the following command:

hive> SELECT AVG(eage) from employee;

hive> CREATE INDEX employee_index ON TABLE employee (eage) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' WITH DEFERRED REBUILD;
hive> ALTER INDEX employee_index on employee REBUILD;
hive> show formatted index on employee;

hive> SELECT AVG(eage) from employee;

create a Bitmap index for the same table:

hive> CREATE INDEX employee_index_bitmap ON TABLE employee (eage) AS 'BITMAP' WITH DEFERRED REBUILD; 
hive> ALTER INDEX employee_index_bitmap on employee REBUILD;

hive> DROP INDEX IF EXISTS employee_index ON employee;

Indexes decrease the time for executing the query.
We can have any number of indexes on the same table.
We can use the type of index depending on the data we have.
In some cases, Bitmap indexes work faster than the Compact indexes and vice versa.

When not to use indexing?

It is essential to know when and where indexing shouldn’t be used. They should not be used in the following scenarios:

Indexes are advised to build on the columns on which you frequently perform operations.
Building more number of indexes also degrade the performance of your query.
Type of index to be created should be identified prior to its creation (if your data requires bitmap you should not create compact).This leads to increase in time for executing your query.





Transactions in Hive

Transactions in Hive are introduced in Hive 0.13, but they only partially fulfill the ACID properties like atomicity, consistency, durability, at the partition level. Here, Isolation can be provided by turning on one of the locking mechanisms available with zookeeper or in memory.

But in Hive 0.14, new API’s have been added to completely fulfill the ACID properties while performing any transaction.

Transactions are provided at the row-level in Hive 0.14. The different row level transactions available in Hive 0.14 are as follows:

Insert
Delete
Update

There are numerous limitations with the present transactions available in Hive 0.14. ORC is the file format supported by Hive transaction. It is now essential to have ORC file format for performing transactions in Hive. The table needs to be bucketed in order to support transactions.

Row-level Transactions Available in Hive 0.14

Before creating a Hive table that supports transactions, the transaction features present in Hive needs to be turned on, as by default they are turned off.

hive> set hive.support.concurrency = true;
hive>set hive.enforce.bucketing = true; 
hive>set hive.exec.dynamic.partition.mode = nonstrict; 
hive>set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; 
hive>set hive.compactor.initiator.on = true; 
hive>set hive.compactor.worker.threads = a positive number on at least one instance of the Thrift metastore service;

If the above properties are not set properly, the ‘Insert’ operation will work but ‘Update’ and ‘Delete’ will not work

hive> CREATE TABLE emp(emp_id int,emp_name string,emp_loc string) clustered by (emp_id) into 5 buckets stored as orc TBLPROPERTIES('transactional'='true');

We are bucketing the table by ‘emp_id’ and the table format is ‘orc’, also we are enabling the transactions in the table by specifying it inside the TBLPROPERTIES as ‘transactional’=’true’

hive> INSERT INTO table emp values(1,'raju','hyd'),(2,'ramu','che'),(3,'venkat','che'),(4,'reddy','del'),(5,'stanford','uk'),(6,'JNTUA','atp'),(7,'cambridge','us');

The above command is used to insert row wise data into the Hive table. Here, each row is seperated by ‘( )’ brackets.

hive> select * from emp

hive> UPDATE emp set emp_id = 8 where emp_id = 7;

In this table, we have bucketed the ‘emp_id’ column and performing the Update operation on the same column, so we have go the error

hive> UPDATE emp set emp_name = 'Reddy' where emp_id = 6;

hive> select * from emp


hive> delete from emp where emp_id=5;



MySQL Metastore and Apache Derby

Hive stores the metadata related to tables and databases into the external RDBMS like Apache Derby or MYSQL and metadata.

Metastore and database: 

The metastore service provides the interface to the Hive.
The database stores the data definitions and mappings to the data.
The metastore (which consists of services and database) can be configured in different ways. Embedded Apache Derby is used as the default Hive metastore in the Hive configuration. This configuration is called embedded metastore and is good for the sake of development and unit testing, but won’t scale to a production environment as only a single user can connect to the derby database at any instant of time. Starting second instance of the Hive driver will throw an error message.

Apache Derby:

Apache Derby, an Apache DB subproject, is an open source relational database implemented entirely in Java. Some key features include:

Derby is based on the Java, JDBC, and SQL standards.
Derby provides an embedded JDBC driver that lets you embed Derby in any Java-based solution.
Derby also supports the more familiar client/server mode with the Derby Network Client JDBC driver and Derby Network Server.
Derby is easy to install, deploy, and use.
Most importantly Derby is single instance database, which means only one user can access the derby instance at one time and this had been a motivational factor to include Mysql as the default metastore.

Advantages Of using Mysql as a metastore in Hive-

It is Stable
It keeps a track of metadata.

It can support multiple instances of Hive.

In order to change the default metastore from Derby to Mysql we need to change the property in Hive-site.xml.


Since Hive-0.10, we get only hive-default.xml. We need to explicitly create Hive-site.xml to override the default property containing the configuration of Apache Derby.

Bucketing in Hive


 Partition helps in increasing the efficiency when performing a query on a table. Instead of scanning the whole table, it will only scan for the partitioned set and does not scan or operate on the unpartitioned sets, which helps us to provide results in lesser time and the details will be displayed very quickly because of Hive Partition.

Now, let’s assume a condition that there is a huge dataset. At times, even after partitioning on a particular field or fields, the partitioned file size doesn’t match with the actual expectation and remains huge and we want to manage the partition results into different parts. To overcome this problem of partitioning, Hive provides Bucketing concept, which allows user to divide table data sets into more manageable parts.

Thus, Bucketing helps user to maintain parts that are more manageable and user can set the size of the manageable parts or Buckets too.

Bucketing Features in Hive:

Hive partition divides table into number of partitions and these partitions can be further subdivided into more manageable parts known as Buckets or Clusters. The Bucketing concept is based on Hash function, which depends on the type of the bucketing column. Records which are bucketed by the same column will always be saved in the same bucket.

Here, CLUSTERED BY clause is used to divide the table into buckets.

In Hive Partition, each partition will be created as directory. But in Hive Buckets, each bucket will be created as file.

Bucketing can also be done even without partitioning on Hive tables.

Advantages of Bucketing:

Bucketed tables allows much more efficient sampling than the non-bucketed tables. With sampling, we can try out queries on a section of data for testing and debugging purpose when the original data sets are very huge. Here, the user can fix the size of buckets according to the need.

Bucketing concept also provides the flexibility to keep the records in each bucket to be sorted by one or more columns. Since the data files are equal sized parts, map-side joins will be faster on the bucketed tables.

hive> create input_table (street string, city string, Zip int, state string, sq_feet int, price int);
hive> load data inpath '/real_state.csv' into table input_table;

To populate the bucketed table, we have to set hive.enforce.bucketing property to ‘true’, so that the Hive knows to create the number of buckets declared in the table definition.

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions=200000;
hive> set hive.exec.max.dynamic.partitions.pernode=200000;
hive> set hive.enforce.bucketing=true;

The property hive.enforce.bucketing = true is similar to hive.exec.dynamic.partition = true, in Hive partitioning. By setting this property, we will enable dynamic bucketing while loading data into the Hive table.

The above hive.enforce.bucketing = true property sets the number of reduce tasks to be equal to the number of buckets mentioned in the table definition (Which is ‘4’ in our case) and automatically selects the clustered by column from table definition.

hive> create bucket_table (street string, city string, Zip int, state string, sq_feet int, price int) partitioned by (city string) clustered by (street) into 4 buckets row format delimited fields terminated by ',';

we can see that we have created a new bucket table with name ‘bucket_table’, which is partitioned by ‘city’ and clustered by ‘street’ field with the bucket size of ‘4’.

After successfully inserting the contents of input_table to bucket_table, we can see the bucketed output result in the browser and can also download the required city partitioned bucketed files in our local file system.



Hive Partitioning:

partitioning:

Hive has been one of the preferred tool for performing queries on large datasets, especially when full table scan is done on the datasets.

In the case of tables which are not partitioned, all the files in a table’s data directory is read and then filters are applied on it as a subsequent phase. This becomes a slow and expensive affair especially in cases of large tables.

Without partitioning Hive reads all the data in the directory and applies the query filters on it. This is slow and expensive since all data has to be read.

Very often users need to filter the data on specific column values. To apply the partitioning in hive , users need to understand the domain of the data on which they are doing analysis.

With this knowledge, identification of the frequently queried or accessesd columns becomes easy and then partitioning feature of Hive can be applied on the selected columns.

Owing the fact that Partitions are horizontal slices of data,larger sets of data can be separated into more manageable chunks.

The problem with hive is that when we apply where clause then even a simple query in Hive reads the entire dataset.

This decreases the efficiency and becomes a bottleneck when we are required to run the queries on large tables.

This issue can be overcome by implementing partitions in hive.

When to use Hive Partitioning:

When any user wants data contained within a table to be split across multiple sections in hive table, use of partition is suggested.

The entries for the various columns of dataset are segregated and stored in their respective partition. When we write the query to fetch the values from table , only the required partitions of the table are queried, which reduces the time taken by query to yield the result.

Let’s take a scenario:

Data is present in hdfs coming in from various ecommerce companies.
We need to run the HiveQl queries on user buying pattern.
We need to analyse data coming in from last 10 days.
In the above scenario instead of running the queries which involves scanning of entire table, an approach should be followed where query runs on only last 10 days of data.

hive> create table user_partition(fname varchar(50), lname varchar(50), id varchar(50)) partitioned by (country varchar(50), state varchar(50)) row format delimited fields terminated by ',' stored as textfile;

Classification of partitioning:

Static partitioning
Dynamic partitioning

When to use static partitioning:

Static partitioning needs to be applied when we know data(supposed to be inserted) belongs to which partition.

hive> load data local inpath <path:file1> into table user_partition partition (country='us', state='ca');

hive> load data local inpath <path:file2> into table user_partition partition (country='ind', state='ap');

Retrieving the user information:

If anyone wants to retrieve the user information belonging to partition with country  us and state ca, query needs to be written as shown below.

hive> select * from user_partition where user_partition.country='us' and user_partition.state='ca';
hive> select * from user_partition where user_partition.country='ind' and user_partition.state='ap';

When to use dynamic partitioning:

In static partitioning every partitioning needs to be backed with individual hive statement which is not feasible for large number of partitions as it will require writing of lot of hive statements.

In that scenario dynamic partitioning is suggested as we can create as many number of partitions with single hive statement.

Dynamic partitioning:

We need to create the partitioned table user_partition

hive> create table user_partition(fname varchar(50), lname varchar(50), id varchar(50)) partitioned by (country varchar(50), state varchar(50)) row format delimited fields terminated by ',' stored as textfile;

In next phase user1 table needs to be created with all the columns including country and state.

hive> create table user_partition1(fname varchar(50), lname varchar(50), id varchar(50), country string, state string) row format delimited fields terminated by ',' stored as textfile;

venkat, reddy, 100, ind, ap
raju, reddy, 101, ind, ap
kiran, rao, 102, us, ca
siva, r, 103, us, ca

The above dataset needs to be copied into table user_partition1.

hive> load data local inpath <path:file1> into table user_partition1;

Setting of parameters for dynamic partitioning:

To use the dynamic partitioning in hive we need to set the below parameters in hive shell or in hive-site.xml file.

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;

Retrieving data from partitioned table:

We need to copy the file from user_partition1 to partitioned table user_partition and then retriving the data from it all together using insert and select statement in one hive statement.

hive> insert into table user_partition (country,state) select fname, lname, id, country, state from user_partition1;   

Type of tables in Hive

Hive has two types of tables:

Managed table
External table

Managed table:
Managed table is also called as Internal table. This is the default table in Hive. When we create a table in Hive without specifying it as a managed or external, by default we will get a Managed table.

If we create a table as a managed table, the table will be created in a specific location in HDFS.
By default the table data will be created in /usr/hive/warehouse directory of HDFS.
If we delete a Managed table, both the table data and meta data for that table will be deleted from the HDFS.

hive>create table employee(ename String, esal Int) row format delimited fields terminated by ',';

hive>load data local inpath <path> into table employee;

Check the contents of the table in HDFS by using the below command:

>hadoop dfs -ls hdfs://localhost:9000/user/hive/warehouse/employee

EXTERNAL TABLE:

External table is created for external use as when the data is used outside Hive. Whenever we want to delete the table’s metadata and want to keep the table’s data as it is, we use External table. External table only deletes the schema of the table.

hive> create external table employee_ext(ename String, esal Int) row format delimited fields terminated by ',';

hive>load data local inpath <hive> into table employee_ext;

check the HDFS location of the table using the below command:

>hadoop dfs -ls hdfs://localhost:9000/user/hive/warehouse/employee_ext

When to use External and Managed table

Managed table

Data is temporary

Hive to Manage the table data completely not allowing any external source to use the table

Don’t want data after deletion

External table

The data is also used outside of Hive. For example, the data files are read and processed by an existing program that doesn’t lock the files

Hive should not own data and control settings, dirs, etc., you have another program or process that will do those things

You are not creating table based on existing table (AS SELECT)


Can create table back and with the same schema and point the location of the data



Hive Shell

Hive provides a default interface, where it allows users to run Hive commands. The CLI (Command Line Interface) is the default Hive shell service which allows users to work on Hive programs.

Creating a Database:

hive> create databse sampledb;

Listing Databases:


hive> SHOW DATABASES;

Using a Database:

hive> use sampledb;

Creating a Table:

The create table command allows the user to create a new table with user input attributes/columns
Row format delimited Fields terminated by ‘\t’ – This line informsHive that each column in the file is separated by a tab.

hive> Create table emp(empid int, empname string, empsal float)
        > row format delimited

        > fields terminated by ‘\t’ ;

List Tables:

The ‘show tables’ command displays the list of tables present in a particular database.


Hive> show Tables;

Describe Schema of the Table:

hive>DESCRIBE emp;

Load a File from the Local File System:

hive>load data local inpath<filename> into table emp;

Load File from HDFS:

hive>load data inpath<filename> into table<tablename>

Show Table Contents:

hive>select * from emp;

Alter Commands:

hive> ALTER TABLE EMP RENAME TO EMP_1

Adding New Columns to an Existing Table:

hive> alter table emp_1 add columns (DOB DATE);

Truncating a Table:

hive> truncate table emp_1;

Dropping a Database:


hive> drop databse sampledb;

Tuesday, April 12, 2016

File Formats in Apache Hive

File Format:

A file format is the way in which information is stored or encoded in a computer file. In Hive it refers to how records are stored inside the file. As we are dealing with structured data, each record has to be its own structure. How records are encoded in a file defines a file format.
These file formats mainly varies between data encoding, compression rate, usage of space and disk I/O.

Hive does not verify whether the data that you are loading matches the schema for the table or not. However, it verifies if the file format matches the table definition or not.

There are some specific file formats which Hive can handle such as:

• TEXTFILE
• SEQUENCEFILE
• RCFILE
• ORCFILE

TEXTFILE:

TEXTFILE format is a famous input/output format used in Hadoop. In Hive if we define a table as TEXTFILE it can load data of form CSV (Comma Separated Values), delimited by Tabs, Spaces and JSON data. This means fields in each record should be separated by comma or space or tab or it may be JSON(Java Script Object Notation) data.

By default if we use TEXTFILE format then each line is considered as a record.

The TEXTFILE input and TEXTFILE output format is present in the Hadoop package as shown below:

org.apache.hadoop.mapred.TextInputFormat
org.apache.hadoop.mapred.TextOutputFormat

example in Hive about how to create TEXTFILE table format, how to load data into TEXTFILE format and perform one basic select operation in Hive.

create table employee(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;

We can load data into the created table as follows:

load data local inpath ‘path of your file’ into table employee;



SEQUENCEFILE:

We know that Hadoop’s performance is drawn out when we work with small number of files with big size rather than large number of files with small size. If the size of a file is smaller than the typical block size in Hadoop, we consider it as a small file. Due to this, the amount of metadata increases which will become an overhead to the NameNode. To solve this problem sequence files are introduced in Hadoop. Sequence files acts as a container to store the small files.

Sequence files are flat files consisting of binary key-value pairs. When Hive converts queries to MapReduce jobs, it decides on the appropriate key-value pairs to be used for a given record. Sequence files are in binary format which are able to split and the main use of these files is to club two or more smaller files and make them as a one sequence file.

In Hive we can create a sequence file by specifying STORED AS SEQUENCEFILE in the end of a CREATE TABLE statement.
There are three types of sequence files:
• Uncompressed key/value records.
• Record compressed key/value records – only ‘values’ are compressed here
• Block compressed key/value records – both keys and values are collected in ‘blocks’ separately and compressed. The size of the ‘block’ is configurable.

Hive has its own SEQUENCEFILE reader and SEQUENCEFILE writer for reading and writing through sequence files.

Hive uses the SEQUENCEFILE input and output formats from the following packages:

org.apache.hadoop.mapred.SequenceFileInputFormat
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat


create table employee_sequencefile(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;

Now to load data into this table is somewhat different from loading into the table created using TEXTFILE format. You need to insert the data from another table because this SEQUENCEFILE format is binary format. It compresses the data and then stores it into the table. If you want to load directly as in TEXTFILE format that is not possible because we cannot insert the compressed files into tables.

So to load the data into SEQUENCEFILE we need to use the following approach:

INSERT OVERWRITE TABLE employee_sequencefile SELECT * FROM employee;

RCFILE:

RCFILE stands of Record Columnar File which is another type of binary file format which offers high compression rate on the top of the rows.
RCFILE is used when we want to perform operations on multiple rows at a time.
RCFILEs are flat files consisting of binary key/value pairs, which shares much similarity with SEQUENCEFILE. RCFILE stores columns of a table in form of record in a columnar manner. It first partitions rows horizontally into row splits and then it vertically partitions each row split in a columnar way. RCFILE first stores the metadata of a row split, as the key part of a record, and all the data of a row split as the value part. This means that RCFILE encourages column oriented storage rather than row oriented storage.
This column oriented storage is very useful while performing analytics. It is easy to perform analytics when we “hive’ a column oriented storage type.
Facebook uses RCFILE as its default file format for storing of data in their data warehouse as they perform different types of analytics using Hive.

Hive has its own RCFILE Input format and RCFILE output format in its default package:

org.apache.hadoop.hive.ql.io.RCFileInputFormat
org.apache.hadoop.hive.ql.io.RCFileOutputFormat

create table employee_rcfile(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;

We cannot load data into RCFILE directly. First we need to load data into another table and then we need to overwrite it into our newly created RCFILE as shown below:

INSERT OVERWRITE TABLE employee_rcfile SELECT * FROM employee;

ORCFILE:

ORC stands for Optimized Row Columnar which means it can store data in an optimized way than the other file formats. ORC reduces the size of the original data up to 75%. As a result the speed of data processing also increases. ORC shows better performance than Text, Sequence and RC file formats.
An ORC file contains rows data in groups called as Stripes along with a file footer. ORC format improves the performance when Hive is processing the data.

Hive has its own ORCFILE Input format and ORCFILE output format in its default package:

 org.apache.hadoop.hive.ql.io.orc

create table employee_orcfile(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;

We cannot load data into ORCFILE directly. First we need to load data into another table and then we need to overwrite it into our newly created ORCFILE.

INSERT OVERWRITE TABLE employee_orcfile SELECT * FROM employee;

Thus you can use the above four file formats depending on your data.
For example,
a) If your data is delimited by some parameters then you can use TEXTFILE format.
b) If your data is in small files whose size is less than the block size then you can use SEQUENCEFILE format.
c) If you want to perform analytics on your data and you want to store your data efficiently for that then you can use RCFILE format.
d) If you want to store your data in an optimized way which lessens your storage and increases your performance then you can use ORCFILE format.
Hope with this Blog you now have a clear picture as to which File Format to use in Hive depending on your data.

Apache Hive

Apache Hive is an open source data warehouse software that facilitates querying and managing of large datasets residing in distributed storage. Hive provides a language called HiveQL which allows users to query and is similar to SQL.

Like SQL, HiveQL handles structured data only. By default Hive has derby database to store the data in it. We can configure Hive with MySQL database. As mentioned HiveQL can handle only structured data. Data is eventually stored in files.

Thursday, February 25, 2016

Read / Write Operations In HDFS

Read Operation: 

Data read request is served by HDFS, NameNode and DataNode. Let's call reader as a 'client'. Below diagram depicts file read operation in Hadoop.


1. Client initiates read request by calling 'open()' method of FileSystem object; it is an object of type DistributedFileSystem.

2. This object connects to namenode using RPC and gets metadata information such as the locations of the blocks of the file. Please note that these addresses are of first few block of file.
3. In response to this metadata request, addresses of the DataNodes having copy of that block, is returned back.
4. Once addresses of DataNodes are received, an object of type FSDataInputStream is returned to the client. FSDataInputStream contains DFSInputStream which takes care of interactions with DataNode and NameNode. In step 4 shown in above diagram, client invokes 'read()' method which causes DFSInputStream to establish a connection with the first DataNode with the first block of file.
5. Data is read in the form of streams wherein client invokes 'read()' method repeatedly. This process of read() operation continues till it reaches end of block.
6. Once end of block is reached, DFSInputStream closes the connection and moves on to locate the next DataNode for the next block
7. Once client has done with the reading, it calls close() method.


Write Operation :

In this section, we will understand how data is written into HDFS through files.




1. Client initiates write operation by calling 'create()' method of DistributedFileSystem object which creates a new file - Step no. 1 in above diagram.
2. DistributedFileSystem object connects to the NameNode using RPC call and initiates new file creation. However, this file create operation does not associate any blocks with the file. It is the responsibility of NameNode to verify that the file (which is being created) does not exist already and client has correct permissions to create new file. If file already exists or client does not have sufficient permission to create a new file, then IOException is thrown to client. Otherwise, operation succeeds and a new record for the file is created by the NameNode.
3. Once new record in NameNode is created, an object of type FSDataOutputStream is returned to the client. Client uses it to write data into the HDFS. Data write method is invoked (step 3 in diagram).
4. FSDataOutputStream contains DFSOutputStream object which looks after communication with DataNodes and NameNode. While client continues writing data, DFSOutputStream continues creating packets with this data. These packets are en-queued into a queue which is called as DataQueue.
5. There is one more component called DataStreamer which consumes this DataQueue. DataStreamer also asks NameNode for allocation of new blocks thereby picking desirable DataNodes to be used for replication.
6. Now, the process of replication starts by creating a pipeline using DataNodes. In our case, we have chosen replication level of 3 and hence there are 3 DataNodes in the pipeline.
7. The DataStreamer pours packets into the first DataNode in the pipeline.
8. Every DataNode in a pipeline stores packet received by it and forwards the same to the second DataNode in pipeline.
9. Another queue, 'Ack Queue' is maintained by DFSOutputStream to store packets which are waiting for acknowledgement from DataNodes.
10. Once acknowledgement for a packet in queue is received from all DataNodes in the pipeline, it is removed from the 'Ack Queue'. In the event of any DataNode failure, packets from this queue are used to reinitiate the operation.
11. After client is done with the writing data, it calls close() method (Step 9 in the diagram) Call to close(), results into flushing remaining data packets to the pipeline followed by waiting for acknowledgement.
12. Once final acknowledgement is received, NameNode is contacted to tell it that the file write operation is complete.

HDFS Shell commands

help: Get help for HDFS shell command

./hadoop dfs -help /<<HDFS Shell command>>
./hadoop dfs -help cat


mkdir: Create a directory on HDFS:

./hadoop dfs -mkdir /<<Folder name>>
./hadoop dfs -mkdir /Data


ls: List your directories/Files on HDFS:

./hadoop dfs -ls /<<HDFS Path>>
./hadoop dfs -ls /Data


copyFromLocal: Copy data from Local FS to HDFS

./hadoop dfs -copyFromLocal /<<Local Path>> /<<HDFS Path>>
./hadoop dfs -copyFromLocal /home/username/Sample.txt /Data


put: Copy file from local filesystem to hdfs

./hadoop dfs -put localfile /dir/file1
– Can also use copyFromLocal


copyToLocal: Copy data from HDFS to Local FS

./hadoop dfs -copyToLocal /<<HDFS Path>> /<<Local Path>>
./hadoop dfs -copyToLocal /Data/Sample.txt /home/username


get: Copy file to the local filesystem

./hadoop dfs -get /dir/file localfile
– Can also use copyToLocal


cp: Copy data from HDFS to HDFS

./hadoop dfs -cp /<<HDFS Path>> /<<HDFS Path>>
./hadoop dfs -cp /Data/Sample.txt /Data1


mv: Move data from one folder to another in HDFS

./hadoop dfs -mv /<<HDFS Path>> /<<HDFS Path>>
./hadoop dfs -mv /Data/Sample.txt /Data1


cat: View content of a file on HDFS

./hadoop dfs -cat /<<HDFS Path>>
./hadoop dfs -cat /Data/Sample.txt


rm: Delete file from HDFS

./hadoop dfs -rm /<<HDFS Path>>
./hadoop dfs -rm /Data/Sample.txt


du: Displays length for each file/dir

./hadoop dfs -du /someDir/


getmerge: Merge file in HDFS

./hadoop dfs -getmerge <<HDFS Path>> <<Local Path>>
./hadoop dfs -getmerge /Data/Sample.txt /home/user/test.txt 

Thursday, February 18, 2016

HBase Architecture

HBase is composed of three types of servers in a master slave type of architecture. Region servers serve data for reads and writes. When accessing data, clients communicate with HBase RegionServers directly. Region assignment, DDL (create, delete tables) operations are handled by the HBase Master process. Zookeeper, which is part of HDFS, maintains a live cluster state.
The Hadoop DataNode stores the data that the Region Server is managing. All HBase data is stored in HDFS files. Region Servers are collocated with the HDFS DataNodes, which enable data locality (putting the data close to where it is needed) for the data served by the RegionServers. HBase data is local when it is written, but when a region is moved, it is not local until compaction.
The NameNode maintains metadata information for all the physical data blocks that comprise the files.

Regions
HBase Tables are divided horizontally by row key range into “Regions.” A region contains all rows in the table between the region’s start key and end key. Regions are assigned to the nodes in the cluster, called “Region Servers,” and these serve data for reads and writes. A region server can serve about 1,000 regions.

HMaster
Region assignment, DDL (create, delete tables) operations are handled by the HBase Master.
A master is responsible for:
·         Coordinating the region servers
- Assigning regions on startup , re-assigning regions for recovery or load balancing
- Monitoring all RegionServer instances in the cluster (listens for notifications from zookeeper)
·         Admin functions
- Interface for creating, deleting, updating tables

ZooKeeper: The Coordinator
HBase uses ZooKeeper as a distributed coordination service to maintain server state in the cluster. Zookeeper maintains which servers are alive and available, and provides server failure notification. Zookeeper uses consensus to guarantee common shared state. Note that there should be three or five machines for consensus.

How the Components Work Together
Zookeeper is used to coordinate shared state information for members of distributed systems. Region servers and the active HMaster connect with a session to ZooKeeper. The ZooKeeper maintains ephemeral nodes for active sessions via heartbeats.

Each Region Server creates an ephemeral node. The HMaster monitors these nodes to discover available region servers, and it also monitors these nodes for server failures. HMasters vie to create an ephemeral node. Zookeeper determines the first one and uses it to make sure that only one master is active. The active HMaster sends heartbeats to Zookeeper, and the inactive HMaster listens for notifications of the active HMaster failure.
If a region server or the active HMaster fails to send a heartbeat, the session is expired and the corresponding ephemeral node is deleted. Listeners for updates will be notified of the deleted nodes. The active HMaster listens for region servers, and will recover region servers on failure. The Inactive HMaster listens for active HMaster failure, and if an active HMaster fails, the inactive HMaster becomes active.
HBase First Read or Write
There is a special HBase Catalog table called the META table, which holds the location of the regions in the cluster. ZooKeeper stores the location of the META table.
This is what happens the first time a client reads or writes to HBase:
1.    The client gets the Region server that hosts the META table from ZooKeeper.
2.    The client will query the .META. server to get the region server corresponding to the row key it wants to access. The client caches this information along with the META table location.
3.    It will get the Row from the corresponding Region Server.
For future reads, the client uses the cache to retrieve the META location and previously read row keys. Over time, it does not need to query the META table, unless there is a miss because a region has moved; then it will re-query and update the cache.

HBase Meta Table
·         This META table is an HBase table that keeps a list of all regions in the system.
·         The .META. table is like a b tree.
·         The .META. table structure is as follows:
- Key: region start key,region id
- Values: RegionServer

Region Server Components
A Region Server runs on an HDFS data node and has the following components:
·         WAL: Write Ahead Log is a file on the distributed file system. The WAL is used to store new data that hasn't yet been persisted to permanent storage; it is used for recovery in the case of failure.
·         BlockCache: is the read cache. It stores frequently read data in memory. Least Recently Used data is evicted when full.
·         MemStore: is the write cache. It stores new data which has not yet been written to disk. It is sorted before writing to disk. There is one MemStore per column family per region.
·         Hfiles store the rows as sorted KeyValues on disk.

HBase Write Steps (1)
When the client issues a Put request, the first step is to write the data to the write-ahead log, the WAL:
- Edits are appended to the end of the WAL file that is stored on disk.
- The WAL is used to recover not-yet-persisted data in case a server crashes.

HBase Write Steps (2)
Once the data is written to the WAL, it is placed in the MemStore. Then, the put request acknowledgement returns to the client.

HBase MemStore
The MemStore stores updates in memory as sorted KeyValues, the same as it would be stored in an HFile. There is one MemStore per column family. The updates are sorted per column family.

HBase Region Flush
When the MemStore accumulates enough data, the entire sorted set is written to a new HFile in HDFS. HBase uses multiple HFiles per column family, which contain the actual cells, or KeyValue instances. These files are created over time as KeyValue edits sorted in the MemStores are flushed as files to disk.
Note that this is one reason why there is a limit to the number of column families in HBase. There is one MemStore per CF; when one is full, they all flush. It also saves the last written sequence number so the system knows what was persisted so far.
The highest sequence number is stored as a meta field in each HFile, to reflect where persisting has ended and where to continue. On region startup, the sequence number is read, and the highest is used as the sequence number for new edits.

HBase HFile
Data is stored in an HFile which contains sorted key/values. When the MemStore accumulates enough data, the entire sorted KeyValue set is written to a new HFile in HDFS. This is a sequential write. It is very fast, as it avoids moving the disk drive head.

HBase HFile Structure
An HFile contains a multi-layered index which allows HBase to seek to the data without having to read the whole file. The multi-level index is like a b+tree:
·      Key value pairs are stored in increasing order
·      Indexes point by row key to the key value data in 64KB “blocks”
·      Each block has its own leaf-index
·      The last key of each block is put in the intermediate index
·      The root index points to the intermediate index
The trailer points to the meta blocks, and is written at the end of persisting the data to the file. The trailer also has information like bloom filters and time range info. Bloom filters help to skip files that do not contain a certain row key. The time range info is useful for skipping the file if it is not in the time range the read is looking for.

HFile Index
The index, which we just discussed, is loaded when the HFile is opened and kept in memory. This allows lookups to be performed with a single disk seek.

HBase Read Merge
We have seen that the KeyValue cells corresponding to one row can be in multiple places, row cells already persisted are in Hfiles, recently updated cells are in the MemStore, and recently read cells are in the Block cache. So when you read a row, how does the system get the corresponding cells to return? A Read merges Key Values from the block cache, MemStore, and HFiles in the following steps:
1.  First, the scanner looks for the Row cells in the Block cache - the read cache. Recently Read Key Values are cached here, and Least Recently Used are evicted when memory is needed.
2.  Next, the scanner looks in the MemStore, the write cache in memory containing the most recent writes.
3.  If the scanner does not find all of the row cells in the MemStore and Block Cache, then HBase will use the Block Cache indexes and bloom filters to load HFiles into memory, which may contain the target row cells.

HBase Read Merge
As discussed earlier, there may be many HFiles per MemStore, which means for a read, multiple files may have to be examined, which can affect the performance. This is called read amplification.

HBase Minor Compaction
HBase will automatically pick some smaller HFiles and rewrite them into fewer bigger Hfiles. This process is called minor compaction. Minor compaction reduces the number of storage files by rewriting smaller files into fewer but larger ones, performing a merge sort.

HBase Major Compaction
Major compaction merges and rewrites all the HFiles in a region to one HFile per column family, and in the process, drops deleted or expired cells. This improves read performance; however, since major compaction rewrites all of the files, lots of disk I/O and network traffic might occur during the process. This is called write amplification.
Major compactions can be scheduled to run automatically. Due to write amplification, major compactions are usually scheduled for weekends or evenings. Note that MapR-DB has made improvements and does not need to do compactions. A major compaction also makes any data files that were remote, due to server failure or load balancing, local to the region server.

Region = Contiguous Keys
Let’s do a quick review of regions:
·      A table can be divided horizontally into one or more regions. A region contains a contiguous, sorted range of rows between a start key and an end key
·      Each region is 1GB in size (default)
·      A region of a table is served to the client by a RegionServer
·      A region server can serve about 1,000 regions (which may belong to the same table or different tables)

Region Split
Initially there is one region per table. When a region grows too large, it splits into two child regions. Both child regions, representing one-half of the original region, are opened in parallel on the same Region server, and then the split is reported to the HMaster. For load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers.

Read Load Balancing
Splitting happens initially on the same region server, but for load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers. This results in the new Region server serving data from a remote HDFS node until a major compaction moves the data files to the Regions server’s local node. HBase data is local when it is written, but when a region is moved (for load balancing or recovery), it is not local until major compaction.

HDFS Data Replication
All writes and Reads are to/from the primary node. HDFS replicates the WAL and HFile blocks. HFile block replication happens automatically. HBase relies on HDFS to provide the data safety as it stores its files. When data is written in HDFS, one copy is written locally, and then it is replicated to a secondary node, and a third copy is written to a tertiary node.

HDFS Data Replication (2)
The WAL file and the Hfiles are persisted on disk and replicated, so how does HBase recover the MemStore updates not persisted to HFiles? See the next section for the answer.

HBase Crash Recovery
When a RegionServer fails, Crashed Regions are unavailable until detection and recovery steps have happened. Zookeeper will determine Node failure when it loses region server heart beats. The HMaster will then be notified that the Region Server has failed.
When the HMaster detects that a region server has crashed, the HMaster reassigns the regions from the crashed server to active Region servers. In order to recover the crashed region server’s memstore edits that were not flushed to disk. The HMaster splits the WAL belonging to the crashed region server into separate files and stores these file in the new region servers’ data nodes. Each Region Server then replays the WAL from the respective split WAL, to rebuild the memstore for that region.

Data Recovery
WAL files contain a list of edits, with one edit representing a single put or delete. Edits are written chronologically, so, for persistence, additions are appended to the end of the WAL file that is stored on disk.
What happens if there is a failure when the data is still in memory and not persisted to an HFile? The WAL is replayed. Replaying a WAL is done by reading the WAL, adding and sorting the contained edits to the current MemStore. At the end, the MemStore is flush to write changes to an HFile.

Apache HBase Architecture Benefits
HBase provides the following benefits:
·      Strong consistency model
- When a write returns, all readers will see same value
·      Scales automatically
- Regions split when data grows too large
- Uses HDFS to spread and replicate data
·      Built-in recovery
- Using Write Ahead Log (similar to journaling on file system)
·      Integrated with Hadoop
- MapReduce on HBase is straightforward
Apache HBase Has Problems Too…
·      Business continuity reliability:
- WAL replay slow
- Slow complex crash recovery
- Major Compaction I/O storms