Thursday, April 14, 2016

Apache Sqoop


Sqoop is a tool designed to transfer data between Hadoop and relational databases or mainframes. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle or a mainframe into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

Sqoop automates most of this process, relying on the database to describe the schema for the data to be imported. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance.

Importing a Table from RDBMS to HDFS:

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1

Here we are connecting to MySQL through JDBC connectors and using the database TestDb. Here it is necessary to specify the MySQL ‘s username and password and the table name.

Here ‘-m’ specifies the number of map task that can be run simultaneously and ‘m1’ means that only one map task can run.

If we do not use -m1 at the end of the statement, for each record in the MySQL table we will get separate files in the HDFS.

Now the data in RDBMS has been successfully imported into HDFS. By default, the files will be stored here: /user/$user_name/table_name/part-m-00000 file.

Importing all the Tables in a Database to HDFS

Sqoop: sqoop import-all-tables --connect jdbc:mysql://localhost/TestDb --username root --password root -m1

Importing table data into a Specific Directory in HDFS

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/

Importing Table as a Sequence File into HDFS

Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/squence/ --as-sequencefile

As the sequence file stores the contents in binary format, we will get the binary output.

Importing Table as a Avro File into HDFS


Sqoop: sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --table employee -m1 --target-dir /sqoop_data/employee/avro/ --as-avrodatafile

When storing a file using Avro file format, we will get the output file with .avro extension and the contents inside the file will be in binary format. 

Export Data from HDFS to RDBMS

First create a table in MySQL named as employee1
Sqoop: sqoop export --connect jdbc:mysql://localhost/TestDb --table employee1 --export-dir /employee1.csv

Incremental Import in Sqoop To Load Data From Mysql To HDFS

Sqoop supports two types of incremental imports: append and lastmodified. You can use the –incremental argument to specify the type of incremental import to perform.

You should specify the append mode when importing a table, where new rows are continually added with increasing row id values. You must specify the column containing the row’s id with –check-column. Sqoop imports rows where the check column has a value greater than the one specified with –last-value.

An alternate table update strategy supported by Sqoop is called lastmodified mode. This should be used when rows of the source table is updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with –last-value are imported.

At the end of an incremental import, the value which should be specified as –last-value for a subsequent import is printed to the screen. When running a subsequent import, you should specify –last-value in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import.

Sqoop: Sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --sales_table -m1 --tagret-dir /sqoopout

Currently 2 records imported successfully from MySQL to HDFS
1, table, 1000
2, sofa, 5000

Insert 3 more records through mysql
mysql> insert into sales_table values(3, 'window', 3000),(4, 'fan', 2000),(5, 'tv',8000);

now 5 records available in sales_table

The following syntax is used for the incremental option in Sqoop import command.

Sqoop: Sqoop import --connect jdbc:mysql://localhost/TestDb --username root --password root --sales_table -m1 --tagret-dir /sqoopout/ --incremental append --check-column ord_id --last-value 2

3 more records have been retrieved and the incremental import is now complete. 

Wednesday, April 13, 2016

Hive Indexing

Index:

An Index acts as a reference to the records. Instead of searching all the records, we can refer to the index to search for a particular record. Indexes maintain the reference of the records. So that it is easy to search for a record with minimum overhead. Indexes also speed up the searching of data.

Indexing in Hive:

Hive is a data warehousing tool present on the top of Hadoop, which provides the SQL kind of interface to perform queries on large data sets. Since Hive deals with Big Data, the size of files is naturally large and can span up to Terabytes and Petabytes. Now if we want to perform any operation or a query on this huge amount of data it will take large amount of time.

In a Hive table, there are many numbers of rows and columns. If we want to perform queries only on some columns without indexing, it will take large amount of time because queries will be executed on all the columns present in the table.

The major advantage of using indexing is; whenever we perform a query on a table that has an index, there is no need for the query to scan all the rows in the table. Further, it checks the index first and then goes to the particular column and performs the operation.

So if we maintain indexes, it will be easier for Hive query to look into the indexes first and then perform the needed operations within less amount of time.

Eventually, time is the only factor that everyone focuses on.

When to use Indexing:

Indexing can be use under the following circumstances:

If the dataset is very large.
If the query execution is more amount of time than you expected.
If a speedy query execution is required.
When building a data model.
Indexes are maintained in a separate table in Hive so that it won’t affect the data inside the table, which contains the data. Another major advantage for indexing in Hive is that indexes can also be partitioned depending on the size of the data we have.

Types of Indexes in Hive:

Compact Indexing
Bitmap Indexing

Differences between Compact and Bitmap Indexing:

The main difference is the storing of the mapped values of the rows in the different blocks. When the data inside a Hive table is stored by default in the HDFS, they are distributed across the nodes in a cluster. There needs to be a proper identification of the data, like the data in block indexing. This data will be able to identity which row is present in which block, so that when a query is triggered it can go directly into that block. So, while performing a query, it will first check the index and then go directly into that block.

Compact indexing stores the pair of indexed column’s value and its blockid.

Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.

Bitmap Indexing:

A bitmap is is a type of memory organization or image file format used to store digital images so with this meaning of bitmap, we can redefine bitmap indexing as given below.

“Bitmap index stores the combination of value and list of rows as a digital image.”

The following are the different operations that can be performed on Hive indexes:

Creating index
Showing index
Alter index
Dropping index

Creating index:

CREATE INDEX index_name 
ON TABLE table_name (columns,....) 
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
WITH DEFERRED REBUILD;

The org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler’ line specifies that a built in CompactIndexHandler will act on the created index, which means we are creating a compact index for the table.
The WITH DEFERRED REBUILD statement should be present in the created index because we need to alter the index in later stages using this statement.

This syntax will create an index for our table, but to complete the creation, we need to complete the REBUILD statement. For this to happen, we need to add one more alter statement. A MapReduce job will be launched and the index creation is now completed.

ALTER INDEX index_nam on table_name REBUILD;

This ALTER statement will complete our REBUILDED index creation for the table.

hive> create table employee(ename STRING,eage INT,country STRING,year STRING,edept STRING) row format delimited fields terminated by '\t' stored as textfile;
hive> load data local inpath ‘path of your file’ into table employee;

Let’s perform an Average operation on this ‘employee’ data. Let’s calculate the average age of the employees using the following command:

hive> SELECT AVG(eage) from employee;

hive> CREATE INDEX employee_index ON TABLE employee (eage) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' WITH DEFERRED REBUILD;
hive> ALTER INDEX employee_index on employee REBUILD;
hive> show formatted index on employee;

hive> SELECT AVG(eage) from employee;

create a Bitmap index for the same table:

hive> CREATE INDEX employee_index_bitmap ON TABLE employee (eage) AS 'BITMAP' WITH DEFERRED REBUILD; 
hive> ALTER INDEX employee_index_bitmap on employee REBUILD;

hive> DROP INDEX IF EXISTS employee_index ON employee;

Indexes decrease the time for executing the query.
We can have any number of indexes on the same table.
We can use the type of index depending on the data we have.
In some cases, Bitmap indexes work faster than the Compact indexes and vice versa.

When not to use indexing?

It is essential to know when and where indexing shouldn’t be used. They should not be used in the following scenarios:

Indexes are advised to build on the columns on which you frequently perform operations.
Building more number of indexes also degrade the performance of your query.
Type of index to be created should be identified prior to its creation (if your data requires bitmap you should not create compact).This leads to increase in time for executing your query.





Transactions in Hive

Transactions in Hive are introduced in Hive 0.13, but they only partially fulfill the ACID properties like atomicity, consistency, durability, at the partition level. Here, Isolation can be provided by turning on one of the locking mechanisms available with zookeeper or in memory.

But in Hive 0.14, new API’s have been added to completely fulfill the ACID properties while performing any transaction.

Transactions are provided at the row-level in Hive 0.14. The different row level transactions available in Hive 0.14 are as follows:

Insert
Delete
Update

There are numerous limitations with the present transactions available in Hive 0.14. ORC is the file format supported by Hive transaction. It is now essential to have ORC file format for performing transactions in Hive. The table needs to be bucketed in order to support transactions.

Row-level Transactions Available in Hive 0.14

Before creating a Hive table that supports transactions, the transaction features present in Hive needs to be turned on, as by default they are turned off.

hive> set hive.support.concurrency = true;
hive>set hive.enforce.bucketing = true; 
hive>set hive.exec.dynamic.partition.mode = nonstrict; 
hive>set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; 
hive>set hive.compactor.initiator.on = true; 
hive>set hive.compactor.worker.threads = a positive number on at least one instance of the Thrift metastore service;

If the above properties are not set properly, the ‘Insert’ operation will work but ‘Update’ and ‘Delete’ will not work

hive> CREATE TABLE emp(emp_id int,emp_name string,emp_loc string) clustered by (emp_id) into 5 buckets stored as orc TBLPROPERTIES('transactional'='true');

We are bucketing the table by ‘emp_id’ and the table format is ‘orc’, also we are enabling the transactions in the table by specifying it inside the TBLPROPERTIES as ‘transactional’=’true’

hive> INSERT INTO table emp values(1,'raju','hyd'),(2,'ramu','che'),(3,'venkat','che'),(4,'reddy','del'),(5,'stanford','uk'),(6,'JNTUA','atp'),(7,'cambridge','us');

The above command is used to insert row wise data into the Hive table. Here, each row is seperated by ‘( )’ brackets.

hive> select * from emp

hive> UPDATE emp set emp_id = 8 where emp_id = 7;

In this table, we have bucketed the ‘emp_id’ column and performing the Update operation on the same column, so we have go the error

hive> UPDATE emp set emp_name = 'Reddy' where emp_id = 6;

hive> select * from emp


hive> delete from emp where emp_id=5;



MySQL Metastore and Apache Derby

Hive stores the metadata related to tables and databases into the external RDBMS like Apache Derby or MYSQL and metadata.

Metastore and database: 

The metastore service provides the interface to the Hive.
The database stores the data definitions and mappings to the data.
The metastore (which consists of services and database) can be configured in different ways. Embedded Apache Derby is used as the default Hive metastore in the Hive configuration. This configuration is called embedded metastore and is good for the sake of development and unit testing, but won’t scale to a production environment as only a single user can connect to the derby database at any instant of time. Starting second instance of the Hive driver will throw an error message.

Apache Derby:

Apache Derby, an Apache DB subproject, is an open source relational database implemented entirely in Java. Some key features include:

Derby is based on the Java, JDBC, and SQL standards.
Derby provides an embedded JDBC driver that lets you embed Derby in any Java-based solution.
Derby also supports the more familiar client/server mode with the Derby Network Client JDBC driver and Derby Network Server.
Derby is easy to install, deploy, and use.
Most importantly Derby is single instance database, which means only one user can access the derby instance at one time and this had been a motivational factor to include Mysql as the default metastore.

Advantages Of using Mysql as a metastore in Hive-

It is Stable
It keeps a track of metadata.

It can support multiple instances of Hive.

In order to change the default metastore from Derby to Mysql we need to change the property in Hive-site.xml.


Since Hive-0.10, we get only hive-default.xml. We need to explicitly create Hive-site.xml to override the default property containing the configuration of Apache Derby.

Bucketing in Hive


 Partition helps in increasing the efficiency when performing a query on a table. Instead of scanning the whole table, it will only scan for the partitioned set and does not scan or operate on the unpartitioned sets, which helps us to provide results in lesser time and the details will be displayed very quickly because of Hive Partition.

Now, let’s assume a condition that there is a huge dataset. At times, even after partitioning on a particular field or fields, the partitioned file size doesn’t match with the actual expectation and remains huge and we want to manage the partition results into different parts. To overcome this problem of partitioning, Hive provides Bucketing concept, which allows user to divide table data sets into more manageable parts.

Thus, Bucketing helps user to maintain parts that are more manageable and user can set the size of the manageable parts or Buckets too.

Bucketing Features in Hive:

Hive partition divides table into number of partitions and these partitions can be further subdivided into more manageable parts known as Buckets or Clusters. The Bucketing concept is based on Hash function, which depends on the type of the bucketing column. Records which are bucketed by the same column will always be saved in the same bucket.

Here, CLUSTERED BY clause is used to divide the table into buckets.

In Hive Partition, each partition will be created as directory. But in Hive Buckets, each bucket will be created as file.

Bucketing can also be done even without partitioning on Hive tables.

Advantages of Bucketing:

Bucketed tables allows much more efficient sampling than the non-bucketed tables. With sampling, we can try out queries on a section of data for testing and debugging purpose when the original data sets are very huge. Here, the user can fix the size of buckets according to the need.

Bucketing concept also provides the flexibility to keep the records in each bucket to be sorted by one or more columns. Since the data files are equal sized parts, map-side joins will be faster on the bucketed tables.

hive> create input_table (street string, city string, Zip int, state string, sq_feet int, price int);
hive> load data inpath '/real_state.csv' into table input_table;

To populate the bucketed table, we have to set hive.enforce.bucketing property to ‘true’, so that the Hive knows to create the number of buckets declared in the table definition.

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions=200000;
hive> set hive.exec.max.dynamic.partitions.pernode=200000;
hive> set hive.enforce.bucketing=true;

The property hive.enforce.bucketing = true is similar to hive.exec.dynamic.partition = true, in Hive partitioning. By setting this property, we will enable dynamic bucketing while loading data into the Hive table.

The above hive.enforce.bucketing = true property sets the number of reduce tasks to be equal to the number of buckets mentioned in the table definition (Which is ‘4’ in our case) and automatically selects the clustered by column from table definition.

hive> create bucket_table (street string, city string, Zip int, state string, sq_feet int, price int) partitioned by (city string) clustered by (street) into 4 buckets row format delimited fields terminated by ',';

we can see that we have created a new bucket table with name ‘bucket_table’, which is partitioned by ‘city’ and clustered by ‘street’ field with the bucket size of ‘4’.

After successfully inserting the contents of input_table to bucket_table, we can see the bucketed output result in the browser and can also download the required city partitioned bucketed files in our local file system.



Hive Partitioning:

partitioning:

Hive has been one of the preferred tool for performing queries on large datasets, especially when full table scan is done on the datasets.

In the case of tables which are not partitioned, all the files in a table’s data directory is read and then filters are applied on it as a subsequent phase. This becomes a slow and expensive affair especially in cases of large tables.

Without partitioning Hive reads all the data in the directory and applies the query filters on it. This is slow and expensive since all data has to be read.

Very often users need to filter the data on specific column values. To apply the partitioning in hive , users need to understand the domain of the data on which they are doing analysis.

With this knowledge, identification of the frequently queried or accessesd columns becomes easy and then partitioning feature of Hive can be applied on the selected columns.

Owing the fact that Partitions are horizontal slices of data,larger sets of data can be separated into more manageable chunks.

The problem with hive is that when we apply where clause then even a simple query in Hive reads the entire dataset.

This decreases the efficiency and becomes a bottleneck when we are required to run the queries on large tables.

This issue can be overcome by implementing partitions in hive.

When to use Hive Partitioning:

When any user wants data contained within a table to be split across multiple sections in hive table, use of partition is suggested.

The entries for the various columns of dataset are segregated and stored in their respective partition. When we write the query to fetch the values from table , only the required partitions of the table are queried, which reduces the time taken by query to yield the result.

Let’s take a scenario:

Data is present in hdfs coming in from various ecommerce companies.
We need to run the HiveQl queries on user buying pattern.
We need to analyse data coming in from last 10 days.
In the above scenario instead of running the queries which involves scanning of entire table, an approach should be followed where query runs on only last 10 days of data.

hive> create table user_partition(fname varchar(50), lname varchar(50), id varchar(50)) partitioned by (country varchar(50), state varchar(50)) row format delimited fields terminated by ',' stored as textfile;

Classification of partitioning:

Static partitioning
Dynamic partitioning

When to use static partitioning:

Static partitioning needs to be applied when we know data(supposed to be inserted) belongs to which partition.

hive> load data local inpath <path:file1> into table user_partition partition (country='us', state='ca');

hive> load data local inpath <path:file2> into table user_partition partition (country='ind', state='ap');

Retrieving the user information:

If anyone wants to retrieve the user information belonging to partition with country  us and state ca, query needs to be written as shown below.

hive> select * from user_partition where user_partition.country='us' and user_partition.state='ca';
hive> select * from user_partition where user_partition.country='ind' and user_partition.state='ap';

When to use dynamic partitioning:

In static partitioning every partitioning needs to be backed with individual hive statement which is not feasible for large number of partitions as it will require writing of lot of hive statements.

In that scenario dynamic partitioning is suggested as we can create as many number of partitions with single hive statement.

Dynamic partitioning:

We need to create the partitioned table user_partition

hive> create table user_partition(fname varchar(50), lname varchar(50), id varchar(50)) partitioned by (country varchar(50), state varchar(50)) row format delimited fields terminated by ',' stored as textfile;

In next phase user1 table needs to be created with all the columns including country and state.

hive> create table user_partition1(fname varchar(50), lname varchar(50), id varchar(50), country string, state string) row format delimited fields terminated by ',' stored as textfile;

venkat, reddy, 100, ind, ap
raju, reddy, 101, ind, ap
kiran, rao, 102, us, ca
siva, r, 103, us, ca

The above dataset needs to be copied into table user_partition1.

hive> load data local inpath <path:file1> into table user_partition1;

Setting of parameters for dynamic partitioning:

To use the dynamic partitioning in hive we need to set the below parameters in hive shell or in hive-site.xml file.

hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;

Retrieving data from partitioned table:

We need to copy the file from user_partition1 to partitioned table user_partition and then retriving the data from it all together using insert and select statement in one hive statement.

hive> insert into table user_partition (country,state) select fname, lname, id, country, state from user_partition1;   

Type of tables in Hive

Hive has two types of tables:

Managed table
External table

Managed table:
Managed table is also called as Internal table. This is the default table in Hive. When we create a table in Hive without specifying it as a managed or external, by default we will get a Managed table.

If we create a table as a managed table, the table will be created in a specific location in HDFS.
By default the table data will be created in /usr/hive/warehouse directory of HDFS.
If we delete a Managed table, both the table data and meta data for that table will be deleted from the HDFS.

hive>create table employee(ename String, esal Int) row format delimited fields terminated by ',';

hive>load data local inpath <path> into table employee;

Check the contents of the table in HDFS by using the below command:

>hadoop dfs -ls hdfs://localhost:9000/user/hive/warehouse/employee

EXTERNAL TABLE:

External table is created for external use as when the data is used outside Hive. Whenever we want to delete the table’s metadata and want to keep the table’s data as it is, we use External table. External table only deletes the schema of the table.

hive> create external table employee_ext(ename String, esal Int) row format delimited fields terminated by ',';

hive>load data local inpath <hive> into table employee_ext;

check the HDFS location of the table using the below command:

>hadoop dfs -ls hdfs://localhost:9000/user/hive/warehouse/employee_ext

When to use External and Managed table

Managed table

Data is temporary

Hive to Manage the table data completely not allowing any external source to use the table

Don’t want data after deletion

External table

The data is also used outside of Hive. For example, the data files are read and processed by an existing program that doesn’t lock the files

Hive should not own data and control settings, dirs, etc., you have another program or process that will do those things

You are not creating table based on existing table (AS SELECT)


Can create table back and with the same schema and point the location of the data