Thursday, February 25, 2016

Read / Write Operations In HDFS

Read Operation: 

A data read request is served by HDFS through the NameNode and DataNodes. Let's call the reader a 'client'. The diagram below depicts the file read operation in Hadoop.


1. The client initiates the read request by calling the 'open()' method of the FileSystem object; in HDFS this is an object of type DistributedFileSystem.
2. This object connects to the NameNode using RPC and gets metadata such as the locations of the blocks of the file. Note that these addresses are for the first few blocks of the file.
3. In response to this metadata request, the addresses of the DataNodes holding a copy of each block are returned.
4. Once the DataNode addresses are received, an object of type FSDataInputStream is returned to the client. FSDataInputStream wraps a DFSInputStream, which takes care of interactions with the DataNodes and the NameNode. In step 4 of the diagram above, the client invokes the 'read()' method, which causes DFSInputStream to establish a connection with the first DataNode holding the first block of the file.
5. Data is read as a stream, with the client invoking the 'read()' method repeatedly. This read() process continues until the end of the block is reached.
6. Once the end of a block is reached, DFSInputStream closes the connection and moves on to locate the next DataNode for the next block.
7. Once the client is done reading, it calls the close() method.
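
The same flow is visible from the client side through the Java FileSystem API. Below is a minimal sketch (the NameNode address is illustrative, and the path reuses the /Data/Sample.txt example from later in this post) that opens a file in HDFS and streams its contents to standard output.

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed NameNode address; adjust to your cluster.
        conf.set("fs.defaultFS", "hdfs://namenode:8020");

        // For an hdfs:// URI this returns a DistributedFileSystem instance (step 1).
        FileSystem fs = FileSystem.get(conf);

        InputStream in = null;
        try {
            // open() asks the NameNode for block locations and returns
            // an FSDataInputStream wrapping a DFSInputStream (steps 2-4).
            in = fs.open(new Path("/Data/Sample.txt"));
            // Repeated read() calls stream the data block by block (steps 5-6).
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            // Step 7: close the stream when done.
            IOUtils.closeStream(in);
        }
    }
}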


Write Operation :

In this section, we will look at how data is written into files in HDFS.




1. The client initiates the write operation by calling the 'create()' method of the DistributedFileSystem object, which creates a new file - step 1 in the diagram above.
2. The DistributedFileSystem object connects to the NameNode using an RPC call and initiates the creation of a new file. However, this file create operation does not associate any blocks with the file. It is the responsibility of the NameNode to verify that the file being created does not already exist and that the client has the correct permissions to create it. If the file already exists or the client does not have sufficient permission, an IOException is thrown to the client. Otherwise, the operation succeeds and a new record for the file is created by the NameNode.
3. Once the new record is created in the NameNode, an object of type FSDataOutputStream is returned to the client. The client uses it to write data into HDFS. The data write method is invoked (step 3 in the diagram).
4. FSDataOutputStream wraps a DFSOutputStream object, which looks after communication with the DataNodes and the NameNode. While the client continues writing data, DFSOutputStream keeps packaging this data into packets. These packets are enqueued into a queue called the DataQueue.
5. There is one more component, called the DataStreamer, which consumes this DataQueue. The DataStreamer also asks the NameNode for the allocation of new blocks, thereby picking suitable DataNodes to be used for replication.
6. Now the process of replication starts by creating a pipeline of DataNodes. In our case, we have chosen a replication factor of 3, and hence there are 3 DataNodes in the pipeline.
7. The DataStreamer pours packets into the first DataNode in the pipeline.
8. Every DataNode in the pipeline stores the packets it receives and forwards them to the next DataNode in the pipeline.
9. Another queue, the 'Ack Queue', is maintained by DFSOutputStream to store packets that are waiting for acknowledgement from the DataNodes.
10. Once acknowledgement for a packet is received from all DataNodes in the pipeline, it is removed from the 'Ack Queue'. In the event of a DataNode failure, packets from this queue are used to reinitiate the operation.
11. After the client is done writing data, it calls the close() method (step 9 in the diagram). The call to close() flushes the remaining data packets to the pipeline and then waits for acknowledgements.
12. Once the final acknowledgement is received, the NameNode is contacted to signal that the file write operation is complete.
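
A corresponding client-side sketch for the write path (again with an illustrative NameNode address and path) creates a file and writes a few bytes; the DataQueue, pipeline, and acknowledgement handling described above happen inside the DFSOutputStream.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative address

        FileSystem fs = FileSystem.get(conf);

        // create() contacts the NameNode to add the file record (steps 1-2)
        // and returns an FSDataOutputStream wrapping a DFSOutputStream.
        FSDataOutputStream out = fs.create(new Path("/Data/Output.txt"));
        try {
            // Written bytes are packaged into packets, placed on the DataQueue,
            // and pushed through the DataNode pipeline by the DataStreamer.
            out.writeBytes("Hello HDFS\n");
        } finally {
            // close() flushes the remaining packets, waits for acks, and then
            // tells the NameNode the write is complete (steps 11-12).
            out.close();
        }
    }
}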

HDFS Shell commands

help: Get help for HDFS shell command

./hadoop dfs -help <<HDFS Shell command>>
./hadoop dfs -help cat


mkdir: Create a directory on HDFS:

./hadoop dfs -mkdir /<<Folder name>>
./hadoop dfs -mkdir /Data


ls: List your directories/Files on HDFS:

./hadoop dfs -ls /<<HDFS Path>>
./hadoop dfs -ls /Data


copyFromLocal: Copy data from Local FS to HDFS

./hadoop dfs -copyFromLocal /<<Local Path>> /<<HDFS Path>>
./hadoop dfs -copyFromLocal /home/username/Sample.txt /Data


put: Copy file from local filesystem to hdfs

./hadoop dfs -put localfile /dir/file1
– Can also use copyFromLocal


copyToLocal: Copy data from HDFS to Local FS

./hadoop dfs -copyToLocal /<<HDFS Path>> /<<Local Path>>
./hadoop dfs -copyToLocal /Data/Sample.txt /home/username


get: Copy file from HDFS to the local filesystem

./hadoop dfs -get /dir/file localfile
– Can also use copyToLocal


cp: Copy data from HDFS to HDFS

./hadoop dfs -cp /<<HDFS Path>> /<<HDFS Path>>
./hadoop dfs -cp /Data/Sample.txt /Data1


mv: Move data from one folder to another in HDFS

./hadoop dfs -mv /<<HDFS Path>> /<<HDFS Path>>
./hadoop dfs -mv /Data/Sample.txt /Data1


cat: View content of a file on HDFS

./hadoop dfs -cat /<<HDFS Path>>
./hadoop dfs -cat /Data/Sample.txt


rm: Delete file from HDFS

./hadoop dfs -rm /<<HDFS Path>>
./hadoop dfs -rm /Data/Sample.txt


du: Displays length for each file/dir

./hadoop dfs -du /someDir/


getmerge: Merge files under an HDFS path into a single local file

./hadoop dfs -getmerge <<HDFS Path>> <<Local Path>>
./hadoop dfs -getmerge /Data/Sample.txt /home/user/test.txt 

Thursday, February 18, 2016

HBase Architecture

HBase is composed of three types of servers in a master/slave architecture. Region servers serve data for reads and writes. When accessing data, clients communicate with HBase RegionServers directly. Region assignment and DDL (create, delete tables) operations are handled by the HBase Master process. ZooKeeper, which is part of the Hadoop ecosystem, maintains the live cluster state.
The Hadoop DataNode stores the data that the Region Server is managing. All HBase data is stored in HDFS files. Region Servers are collocated with the HDFS DataNodes, which enables data locality (putting the data close to where it is needed) for the data served by the RegionServers. HBase data is local when it is written, but when a region is moved, it is not local until compaction.
The NameNode maintains metadata information for all the physical data blocks that comprise the files.

Regions
HBase Tables are divided horizontally by row key range into “Regions.” A region contains all rows in the table between the region’s start key and end key. Regions are assigned to the nodes in the cluster, called “Region Servers,” and these serve data for reads and writes. A region server can serve about 1,000 regions.

HMaster
Region assignment, DDL (create, delete tables) operations are handled by the HBase Master.
A master is responsible for:
·         Coordinating the region servers
- Assigning regions on startup , re-assigning regions for recovery or load balancing
- Monitoring all RegionServer instances in the cluster (listens for notifications from zookeeper)
·         Admin functions
- Interface for creating, deleting, updating tables

ZooKeeper: The Coordinator
HBase uses ZooKeeper as a distributed coordination service to maintain server state in the cluster. Zookeeper maintains which servers are alive and available, and provides server failure notification. Zookeeper uses consensus to guarantee common shared state. Note that there should be three or five machines for consensus.

How the Components Work Together
Zookeeper is used to coordinate shared state information for members of distributed systems. Region servers and the active HMaster connect to ZooKeeper with a session. ZooKeeper maintains ephemeral nodes for active sessions via heartbeats.

Each Region Server creates an ephemeral node. The HMaster monitors these nodes to discover available region servers, and it also monitors these nodes for server failures. HMasters vie to create an ephemeral node. Zookeeper determines the first one and uses it to make sure that only one master is active. The active HMaster sends heartbeats to Zookeeper, and the inactive HMaster listens for notifications of the active HMaster failure.
If a region server or the active HMaster fails to send a heartbeat, the session is expired and the corresponding ephemeral node is deleted. Listeners for updates will be notified of the deleted nodes. The active HMaster listens for region servers, and will recover region servers on failure. The inactive HMaster listens for active HMaster failure, and if an active HMaster fails, the inactive HMaster becomes active.

HBase First Read or Write
There is a special HBase Catalog table called the META table, which holds the location of the regions in the cluster. ZooKeeper stores the location of the META table.
This is what happens the first time a client reads or writes to HBase:
1.    The client gets the Region server that hosts the META table from ZooKeeper.
2.    The client will query the .META. server to get the region server corresponding to the row key it wants to access. The client caches this information along with the META table location.
3.    It will get the Row from the corresponding Region Server.
For future reads, the client uses the cache to retrieve the META location and previously read row keys. Over time, it does not need to query the META table, unless there is a miss because a region has moved; then it will re-query and update the cache.
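
From the application's point of view, all of this META lookup and caching is hidden behind the client API. A minimal sketch, assuming the HBase 1.x Java client (the table, column family, column, and row key names are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseGetExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The connection locates the META table via ZooKeeper (step 1).
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {
            // The client resolves the region server for this row key from META,
            // caches that location, and fetches the row (steps 2-3).
            Get get = new Get(Bytes.toBytes("row1"));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"));
            System.out.println(Bytes.toString(value));
        }
    }
}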

HBase Meta Table
·         This META table is an HBase table that keeps a list of all regions in the system.
·         The .META. table is like a B-tree.
·         The .META. table structure is as follows:
- Key: region start key, region id
- Values: RegionServer

Region Server Components
A Region Server runs on an HDFS data node and has the following components:
·         WAL: Write Ahead Log is a file on the distributed file system. The WAL is used to store new data that hasn't yet been persisted to permanent storage; it is used for recovery in the case of failure.
·         BlockCache: is the read cache. It stores frequently read data in memory. Least Recently Used data is evicted when full.
·         MemStore: is the write cache. It stores new data which has not yet been written to disk. It is sorted before writing to disk. There is one MemStore per column family per region.
·         HFiles store the rows as sorted KeyValues on disk.

HBase Write Steps (1)
When the client issues a Put request, the first step is to write the data to the write-ahead log, the WAL:
- Edits are appended to the end of the WAL file that is stored on disk.
- The WAL is used to recover not-yet-persisted data in case a server crashes.

HBase Write Steps (2)
Once the data is written to the WAL, it is placed in the MemStore. Then, the put request acknowledgement returns to the client.
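
In client code, the WAL append and the MemStore update are both triggered by a single Put call. A short sketch, again assuming the HBase 1.x client API with an illustrative table and column family:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value1"));
            // The region server appends the edit to the WAL, places it in the
            // MemStore, and only then acknowledges the Put back to the client.
            table.put(put);
        }
    }
}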

HBase MemStore
The MemStore stores updates in memory as sorted KeyValues, the same as it would be stored in an HFile. There is one MemStore per column family. The updates are sorted per column family.

HBase Region Flush
When the MemStore accumulates enough data, the entire sorted set is written to a new HFile in HDFS. HBase uses multiple HFiles per column family, which contain the actual cells, or KeyValue instances. These files are created over time as KeyValue edits sorted in the MemStores are flushed as files to disk.
Note that this is one reason why there is a limit to the number of column families in HBase. There is one MemStore per CF; when one is full, they all flush. It also saves the last written sequence number so the system knows what was persisted so far.
The highest sequence number is stored as a meta field in each HFile, to reflect where persisting has ended and where to continue. On region startup, the sequence number is read, and the highest is used as the sequence number for new edits.

HBase HFile
Data is stored in an HFile which contains sorted key/values. When the MemStore accumulates enough data, the entire sorted KeyValue set is written to a new HFile in HDFS. This is a sequential write. It is very fast, as it avoids moving the disk drive head.

HBase HFile Structure
An HFile contains a multi-layered index which allows HBase to seek to the data without having to read the whole file. The multi-level index is like a b+tree:
·      Key value pairs are stored in increasing order
·      Indexes point by row key to the key value data in 64KB “blocks”
·      Each block has its own leaf-index
·      The last key of each block is put in the intermediate index
·      The root index points to the intermediate index
The trailer points to the meta blocks, and is written at the end of persisting the data to the file. The trailer also has information like bloom filters and time range info. Bloom filters help to skip files that do not contain a certain row key. The time range info is useful for skipping the file if it is not in the time range the read is looking for.

HFile Index
The index, which we just discussed, is loaded when the HFile is opened and kept in memory. This allows lookups to be performed with a single disk seek.

HBase Read Merge
We have seen that the KeyValue cells corresponding to one row can be in multiple places: row cells already persisted are in HFiles, recently updated cells are in the MemStore, and recently read cells are in the BlockCache. So when you read a row, how does the system get the corresponding cells to return? A read merges KeyValues from the BlockCache, MemStore, and HFiles in the following steps:
1.  First, the scanner looks for the Row cells in the Block cache - the read cache. Recently Read Key Values are cached here, and Least Recently Used are evicted when memory is needed.
2.  Next, the scanner looks in the MemStore, the write cache in memory containing the most recent writes.
3.  If the scanner does not find all of the row cells in the MemStore and Block Cache, then HBase will use the Block Cache indexes and bloom filters to load HFiles into memory, which may contain the target row cells.

HBase Read Merge
As discussed earlier, there may be many HFiles per MemStore, which means for a read, multiple files may have to be examined, which can affect the performance. This is called read amplification.

HBase Minor Compaction
HBase will automatically pick some smaller HFiles and rewrite them into fewer, bigger HFiles. This process is called minor compaction. Minor compaction reduces the number of storage files by rewriting smaller files into fewer but larger ones, performing a merge sort.

HBase Major Compaction
Major compaction merges and rewrites all the HFiles in a region to one HFile per column family, and in the process, drops deleted or expired cells. This improves read performance; however, since major compaction rewrites all of the files, lots of disk I/O and network traffic might occur during the process. This is called write amplification.
Major compactions can be scheduled to run automatically. Due to write amplification, major compactions are usually scheduled for weekends or evenings. Note that MapR-DB has made improvements and does not need to do compactions. A major compaction also makes any data files that were remote, due to server failure or load balancing, local to the region server.
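
Operators often trigger major compactions explicitly during off-peak hours rather than relying on the automatic schedule. A hedged sketch using the HBase 1.x Admin API (the table name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MajorCompactExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Request a major compaction for all regions of the table. The request
            // is asynchronous; the region servers perform the rewrite (and incur
            // the resulting disk and network I/O) in the background.
            admin.majorCompact(TableName.valueOf("mytable"));
        }
    }
}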

Region = Contiguous Keys
Let’s do a quick review of regions:
·      A table can be divided horizontally into one or more regions. A region contains a contiguous, sorted range of rows between a start key and an end key
·      Each region is 1GB in size (default)
·      A region of a table is served to the client by a RegionServer
·      A region server can serve about 1,000 regions (which may belong to the same table or different tables)

Region Split
Initially there is one region per table. When a region grows too large, it splits into two child regions. Both child regions, representing one-half of the original region, are opened in parallel on the same Region server, and then the split is reported to the HMaster. For load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers.
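
Because a new table starts with a single region, write-heavy tables are often created pre-split so that several regions (and therefore several region servers) share the load from the start. A hedged sketch using the HBase 1.x Admin API; the table name, column family, and split keys are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTableExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("mytable"));
            desc.addFamily(new HColumnDescriptor("cf"));
            // The split points divide the row key space, so the table starts
            // with three regions instead of one.
            byte[][] splitKeys = { Bytes.toBytes("g"), Bytes.toBytes("n") };
            admin.createTable(desc, splitKeys);
        }
    }
}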

Read Load Balancing
Splitting happens initially on the same region server, but for load balancing reasons, the HMaster may schedule for new regions to be moved off to other servers. This results in the new Region server serving data from a remote HDFS node until a major compaction moves the data files to the Region server's local node. HBase data is local when it is written, but when a region is moved (for load balancing or recovery), it is not local until major compaction.

HDFS Data Replication
All writes and reads are to/from the primary node. HDFS replicates the WAL and HFile blocks. HFile block replication happens automatically. HBase relies on HDFS to provide data safety as it stores its files. When data is written in HDFS, one copy is written locally, then it is replicated to a secondary node, and a third copy is written to a tertiary node.

HDFS Data Replication (2)
The WAL file and the HFiles are persisted on disk and replicated, so how does HBase recover the MemStore updates not persisted to HFiles? See the next section for the answer.

HBase Crash Recovery
When a RegionServer fails, crashed regions are unavailable until detection and recovery steps have happened. ZooKeeper will determine node failure when it loses the region server heartbeats. The HMaster will then be notified that the Region Server has failed.
When the HMaster detects that a region server has crashed, it reassigns the regions from the crashed server to active Region Servers. In order to recover the crashed region server's MemStore edits that were not flushed to disk, the HMaster splits the WAL belonging to the crashed region server into separate files and stores these files on the new region servers' data nodes. Each Region Server then replays its respective split WAL to rebuild the MemStore for that region.

Data Recovery
WAL files contain a list of edits, with one edit representing a single put or delete. Edits are written chronologically, so, for persistence, additions are appended to the end of the WAL file that is stored on disk.
What happens if there is a failure while the data is still in memory and not persisted to an HFile? The WAL is replayed. Replaying a WAL is done by reading the WAL and adding and sorting the contained edits into the current MemStore. At the end, the MemStore is flushed to write the changes to an HFile.

Apache HBase Architecture Benefits
HBase provides the following benefits:
·      Strong consistency model
- When a write returns, all readers will see same value
·      Scales automatically
- Regions split when data grows too large
- Uses HDFS to spread and replicate data
·      Built-in recovery
- Using Write Ahead Log (similar to journaling on file system)
·      Integrated with Hadoop
- MapReduce on HBase is straightforward

Apache HBase Has Problems Too…
·      Business continuity reliability:
- WAL replay slow
- Slow complex crash recovery
- Major Compaction I/O storms


Wednesday, February 17, 2016

HDFS Features :


HDFS is a filesystem designed for storing huge amounts of data.

Support very large files:

This is the major feature of HDFS. Hadoop clusters running today are able to store huge amounts of data, even petabytes and zettabytes.

Commodity Hardware:

HDFS requires only commodity hardware, that is, hardware that is readily available from most vendors. It does not need high-end, specially configured hardware.

Write once - Read many times:

Once a file is stored on an HDFS cluster, we cannot modify the data in that file; multiple writers are not supported in HDFS. If we want to change a file, we must delete it from HDFS, update it in the local file system, and then place it back in the HDFS cluster. Hadoop follows a write once - read many times model.

High Latency:

Latency is the amount of time taken to fetch the data. Latency in HDFS can be high because a cluster may contain a large number of nodes (1,000+ for very large amounts of data). For example, an RDBMS may answer a query in 0.01 seconds, whereas Hadoop may take 0.11 seconds. This higher latency is one of the limitations of Hadoop.

Streaming data access:

Suppose we want to access the data at the 1500th line; we still have to process the data from line one to 1500 unnecessarily, because HDFS does not allow random access to data. This is another drawback of Hadoop. HBase was introduced to overcome this.

HDFS Replication Factor

Replication factor dictates how many copies of a block should be kept in your cluster. The replication factor is 3 by default and hence any file you create in HDFS will have a replication factor of 3 and each block from the file will be copied to 3 different nodes in your cluster.

Data Replication

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time.
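
The per-file nature of the replication factor is visible in the Java API: create() has an overload that takes a replication factor and block size, and setReplication() changes the factor for an existing file. A minimal sketch (the NameNode address, path, and sizes are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative address
        FileSystem fs = FileSystem.get(conf);

        // Create a file with a replication factor of 2 and a 128 MB block size.
        Path path = new Path("/Data/Replicated.txt");
        FSDataOutputStream out =
                fs.create(path, true, 4096, (short) 2, 128L * 1024 * 1024);
        out.writeBytes("replicated data\n");
        out.close();

        // Change the replication factor of the existing file to 3.
        fs.setReplication(path, (short) 3);
    }
}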


The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.

Replica Placement:

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

HDFS Architecture

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.


NameNode Functions  : 

  • The NameNode maintains the file system namespace and executes namespace operations. If there are any modifications in the file system namespace or in its properties, these are tracked by the NameNode.
  • It directs the Datanodes (Slave nodes) to execute the low-level I/O operations.
  • It keeps a record of how the files in HDFS are divided into blocks, in which nodes these blocks are stored and by and large the NameNode manages cluster configuration.
  • It maps a file name to a set of blocks and maps a block to the DataNodes where it is located.
  • It records the metadata of all the files stored in the cluster, e.g. the location, the size of the files, permissions, hierarchy, etc.
  • With the help of a transactional log, that is, the EditLog, the NameNode records each and every change that takes place to the file system metadata. For example, if a file is deleted in HDFS, the NameNode will immediately record this in the EditLog.
  • The NameNode is also responsible to take care of the replication factor of all the blocks. If there is a change in the replication factor of any of the blocks, the NameNode will record this in the EditLog.
  • NameNode regularly receives a Heartbeat and a Blockreport from all the DataNodes in the cluster to make sure that the datanodes are working properly. A Block Report contains a list of all blocks on a DataNode.
  • In case of a datanode failure, the Namenode chooses new datanodes for new replicas, balances disk usage and also manages the communication traffic to the datanodes.

DataNode Functions  : 

    Datanodes are the slave nodes in HDFS. A Datanode is a block server that stores the data in the local file system (e.g., ext3 or ext4).
    • Datanodes perform the low-level read and write requests from the file system’s clients.
    • They are also responsible for creating blocks, deleting blocks and replicating the same based on the decisions taken by the NameNode.
    • They regularly send a report on all the blocks present in the cluster to the NameNode.
    • Datanodes also enable pipelining of data.
    • They forward data to other specified DataNodes.
    • Datanodes send heartbeats to the NameNode once every 3 seconds, to report the overall health of HDFS.
    • The DataNode stores each block of HDFS data in separate files in its local file system.
    •  When Datanodes start up, they scan through their local file systems, create a list of all HDFS data blocks that correspond to these local files, and send a Blockreport to the NameNode.

    Safemode  : 

    On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.
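
    The safemode state can be checked from the command line with 'hdfs dfsadmin -safemode get', or programmatically. Below is a hedged sketch, assuming Hadoop 2.x's DistributedFileSystem API and an illustrative NameNode address:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class SafemodeCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative address
        FileSystem fs = FileSystem.get(conf);

        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // SAFEMODE_GET only queries the current state; it does not change it.
            boolean inSafeMode =
                    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
            System.out.println("NameNode in safemode: " + inSafeMode);
        }
    }
}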

    Secondary NameNode: 

    In the HDFS Architecture, the name 'Secondary NameNode' gives the impression that it is a substitute for the NameNode. Alas! It is not!

    Now, at this point, we know that the NameNode stores vital information related to the metadata of all the blocks stored in HDFS. This data is stored not only in main memory, but also on disk.
    The two associated files are:

    Fsimage: An image of the file system on starting the NameNode.
    EditLogs: A series of modifications done to the file system after starting the NameNode.

    The Secondary NameNode constantly reads the file system state and metadata from the RAM of the NameNode and writes it to the hard disk or the file system. It is responsible for combining the EditLogs with the fsimage from the NameNode. It downloads the EditLogs from the NameNode at regular intervals and applies them to the fsimage. The new fsimage is copied back to the NameNode and is used the next time the NameNode is started.

    However, since the Secondary NameNode cannot serve requests in place of the NameNode, it is not a substitute for it. So if the NameNode fails, the entire Hadoop HDFS goes down and the metadata held only in the NameNode's RAM is lost. The Secondary NameNode just performs regular checkpoints in HDFS. Just a helper, a checkpoint node!