HBase response times
There are several causes of latency in HBase:
- Normal network round-trip time (RTT), internal or external to the Hadoop cluster, order of ms
- Disk read and write time, order of ms
- Client retries due to moved or split regions, order of seconds; HBase can move a region if it considers that the cluster is not well balanced, and regions are split when they grow too large (these features are built into HBase and known as auto-sharding). Note that the time does not depend on the region size: no data is actually moved, only the ownership is transferred to another region server.
- Garbage collection (GC) pauses, order of 50 ms.
- Server outages: when a region server dies, it takes 90 s by default to detect the failure; its regions are then reassigned to other servers, which have to replay the write-ahead logs (WAL) for edits that had not yet been persisted. Depending on the amount of uncommitted data, this can take several minutes in total. Note that logs are replayed automatically in recent versions of HBase (hbase.master.distributed.log.replay set to true).
- Cluster overload: if too many clients try to write data into HBase at the same time (too many user queries, flushes, major compactions, etc.), CPU, RAM or I/O cannot keep up. Major compactions can take minutes or even hours. One can limit the number of concurrent client requests (for instance from mappers) by lowering hbase.regionserver.handler.count. If memstores take too much room in RAM, reduce the fraction of the heap they may use with hbase.regionserver.global.memstore.size. Too many flushes (for instance if you have too many column families) may result in too many HFiles (the format in which HBase stores data on HDFS) and create extra load for minor compactions (example hbase-site.xml values are sketched below).
Typically, latency will be a few ms if the data are in the block cache, or below 20 ms when disk seeks are needed. It can reach 1 or 2 s in case of GC pauses or region splits/moves. If latency goes higher than that, it is usually the result of a major compaction, or a more serious problem may have occurred (server failure, etc.).
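The two settings mentioned above live in hbase-site.xml. As a rough sketch for an overloaded cluster (the values below are assumptions meant as a starting point, not tuned recommendations):
<property>
  <name>hbase.regionserver.handler.count</name>
  <value>20</value> <!-- fewer RPC handler threads, hence fewer concurrent client requests -->
</property>
<property>
  <name>hbase.regionserver.global.memstore.size</name>
  <value>0.35</value> <!-- fraction of the region server heap that all memstores together may use -->
</property>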
HBase on low memory cluster
How to configure HBase on low memory cluster
Reduce the number of regions per server
Before getting into the math, let’s briefly recall what the memstore is in HBase. The memstore holds in-memory modifications to a Store before they are flushed to disk as an HFile; in other words, incoming data are first stored in memory, and when their volume reaches a certain size (typically 128 MB on recent HBase versions), they are flushed to disk.
Now, let’s do some computation. You have 1 memstore per region. Suppose that each memstore is 128 MB. If you have 100 regions per region server, you would have 100 * 128 MB, i.e. about 13 GB of RAM. Keep in mind that by default only 40% of the heap can actually be used for memstores, which means that each region server would need at least 13 GB / 0.4, i.e. about 33 GB of RAM. Of course, you might need a bit less RAM since not all of your memstores will be full at all times.
Suppose you have chosen 100 regions because you need to store 1 TB with 10 GB regions. What can you do if you just have 32 GB of RAM and do not want to monitor all your nodes 24/7? (Note that if you need to store 1 TB, you need 3 TB of disk space with the default replication factor of 3, and remember that even if the data are compressed on disk, they are not compressed in memory.)
- you can increase your region size, say to 20 GB each (hbase.hregion.max.filesize)
- you can decrease your memstore size, say to 64 MB (hbase.hregion.memstore.flush.size)
- you can increase the heap fraction used for the memstores; if your load is write-heavy, maybe up to 50% of the heap (hbase.regionserver.global.memstore.upperLimit and hbase.regionserver.global.memstore.lowerLimit); see the hbase-site.xml sketch below
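As a sketch, the corresponding entries in hbase-site.xml could look like this (the values simply translate the numbers used above and should be adapted to your workload):
<property>
  <name>hbase.hregion.max.filesize</name>
  <value>21474836480</value> <!-- 20 GB regions instead of the 10 GB default -->
</property>
<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <value>67108864</value> <!-- flush each memstore at 64 MB instead of 128 MB -->
</property>
<property>
  <name>hbase.regionserver.global.memstore.upperLimit</name>
  <value>0.5</value> <!-- up to 50% of the heap for memstores (write-heavy load) -->
</property>
<property>
  <name>hbase.regionserver.global.memstore.lowerLimit</name>
  <value>0.45</value> <!-- start forcing flushes slightly below the upper limit -->
</property>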
Steal memory from other services
A typical configuration calls for 1 GB of RAM for a DataNode, which is often not needed. You can cut the DataNode heap down to 400 MB, giving 624 MB extra to HBase. This will probably not save you much, though.
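For example, on a stock Hadoop installation the DataNode heap can be capped in hadoop-env.sh; the 400 MB figure is simply the one assumed above:
export HADOOP_DATANODE_OPTS="-Xmx400m $HADOOP_DATANODE_OPTS"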
Tune or disable MSLAB
The MSLAB feature adds 2 MB of heap overhead by default for each region. You can tune this buffer down with hbase.hregion.memstore.mslab.chunksize. The lower you go, the less effective it is, but the lower the memory overhead as well. To disable it completely, set hbase.hregion.memstore.mslab.enabled to false.
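For instance, in hbase-site.xml (the 1 MB chunk size is just an illustration, not a recommendation):
<property>
  <name>hbase.hregion.memstore.mslab.chunksize</name>
  <value>1048576</value> <!-- 1 MB chunks instead of the 2 MB default -->
</property>
<!-- or switch MSLAB off entirely -->
<property>
  <name>hbase.hregion.memstore.mslab.enabled</name>
  <value>false</value>
</property>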
Reduce caching
Caching and batching are used to reduce the effect of network latency on large scans, but they also require more memory on both the client and server side.
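If memory is really tight, one option is to lower the default scanner caching in hbase-site.xml; the value below is only an example and trades memory for more round trips:
<property>
  <name>hbase.client.scanner.caching</name>
  <value>10</value> <!-- rows fetched per scanner round trip -->
</property>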
Limit concurrent mappers
Lowering hbase.regionserver.handler.count helps to limit the number of active connections taking memory.
Optimizing Hive queries
Tips for efficient Hive queries
Hive on Hadoop is a great data processing tool which is easy to use given its SQL-like syntax. Some tips to optimize Hive queries are described in this article.
Typically there are 3 areas where you can optimize your Hive queries:
- data layout (partitions, buckets)
- data sampling
- data processing (map join, parallel execution)
Data layout tips
Partitioning Hive tables
The idea is to partition the data based on some dimension that is often used in the WHERE clause of queries (e.g. select * from user_table where region = 'Europe'). Note that it is also possible to partition data based on several dimensions. Partitioning the data greatly reduces the amount of data read, and therefore the number of mappers, I/O operations and the time to answer the query.
To partition a table, one can just use the following statement:
CREATE TABLE user_table ... PARTITIONED BY (region STRING)
Be careful: do not partition if the cardinality (number of unique values) of the column is too high, as this would result in too many partitions, especially if there is a high risk that the column used for partitioning will not be used as a filter in all queries. Concretely, there is a directory per partition and then subdirectories for subpartitions (if you use more than 1 column for partitioning), which creates a huge overhead if all these directories and files have to be listed and opened. Moreover, HDFS is designed for large blocks (typically 64 MB or more), so a large number of tiny partition files mainly adds metadata pressure on the NameNode and makes reads inefficient.
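As a minimal sketch (table and column names such as user_table_staging are made up for illustration), this is how data can be loaded into partitions and how a query benefits from partition pruning:
CREATE TABLE user_table (user_id BIGINT, country STRING) PARTITIONED BY (region STRING);
-- let Hive derive the partition value from the data itself
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE user_table PARTITION (region) SELECT user_id, country, region FROM user_table_staging;
-- only the region='Europe' directory is read, so fewer mappers and less I/O
SELECT count(*) FROM user_table WHERE region = 'Europe';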
Bucketing Hive tables
Bucketing is useful if there is a column that is frequently used for join operations. Bucketing simply hashes the data on that column and stores rows according to the hash value. To create buckets:
CREATE TABLE user_table ... CLUSTERED BY (country) INTO 64 BUCKETS
and to add data from another existing table
set hive.enforce.bucketing=true; INSERT OVERWRITE TABLE user_table SELECT ... FROM ...
Then to make sure you only join the relevant data
set hive.optimize.bucketmapjoin=true; SELECT /*+ MAPJOIN(b) */ a.*, b.* FROM user_table a JOIN country_attributes b ON a.country = b.country
Sampling
Bucket sampling
In the exploration phase, one may want to work on only part of the data. Sampling can be complicated if you need to join tables: if you select rows randomly from 2 tables, the result of the join may well be empty. That is another case where the bucketing explained above is great:
SELECT a.*, b.* FROM user_table TABLESAMPLE(bucket 30 out of 64 on country) a JOIN country_attributes TABLESAMPLE(bucket 30 out of 64 on country) b ON a.country = b.country
Block or random sampling
If you need to sample data from a single table in a random way, you can use the following:
SELECT * FROM user_table TABLESAMPLE(1 PERCENT)
Parallel processing
The idea is to parallelize stages that Hive runs sequentially by default. This is particularly interesting when you have queries such as:
SELECT a.* FROM ( SELECT ... FROM ... ) a JOIN ( SELECT ... FROM ... ) b ON ( a.country = b.country) GROUP BY a.postcode
We have the following sequential steps:
- Stage 1: a table
- Stage 2: b table
- Stage 3: join
With the option set hive.exec.parallel=true; stages 1 and 2 are executed in parallel, followed by stage 3.
Successfully passed Coursera course
Successfully passed the course Coursera developing data products 2015
Successfully passed course
Stat’Rgy passed the Coursera course entitled ‘Getting and cleaning data’ (see Coursera getdata 2015 for statement).
Successfully passed Coursera course
Stat’Rgy passed the Coursera course entitled ‘Regression model’ (see Coursera regmods 2015 for statement).
Stat’Rgy sponsor of RBelgium for 2015
Stat’Rgy is happy to announce that it will be sponsor of RBelgium in 2015.
Successfully passed Coursera courses
Stat’Rgy counts 2 newly passed Coursera courses (Johns Hopkins University):
– The Data Scientist’s Toolbox 2015
– Practical Machine Learning 2015
Plotly
Nice tool: plotly
Here is one plot generated from their tutorial:
Another plot I made based on Belgian population data (not sure the figures are still up to date).
library(plotly)
library(maps)

py <- plotly(username='myname', key='mykey')

# City/region, population and "latitude / longitude" columns, separated by tab characters
s <- "
Brussels wikipedia article, Brussels Capital	1,019,022	50.85 / 4.349
Antwerp wikipedia article, Flemish Region	459,805	51.22 / 4.403
Ghent wikipedia article, Flemish Region	231,493	51.05 / 3.717
Charleroi wikipedia article, Walloon	200,132	50.411 / 4.444
Liège wikipedia article, Walloon	182,597	50.634 / 5.567
Bruges wikipedia article, Flemish Region	116,709	51.209 / 3.224
Namur wikipedia article, Walloon	106,284	50.467 / 4.867
Leuven, Flemish Region	92,892	50.88 / 4.701
Mons wikipedia article, Walloon	91,277	50.454 / 3.952
Aalst, Flemish Region	77,534	50.936 / 4.035
"

d <- read.delim(textConnection(s), header=FALSE, sep="\t", strip.white=TRUE)

# Keep only the city name (first word, without a trailing comma)
d[,1] <- unlist(lapply(d[,1], function(x){
  sp <- strsplit(as.character(x), ' ')[[1]][1]
  gg <- strsplit(sp, ',')[[1]][1]
}))

# Population: drop the thousands separators and convert to numeric
d[,2] <- unlist(lapply(d[,2], function(x){
  sp <- as.numeric(gsub(',', '', as.character(x)))
}))

# Split "latitude / longitude" into two separate columns
d[,4] <- unlist(lapply(d[,3], function(x){
  sp <- strsplit(as.character(x), ' / ')[[1]][1]
}))
d[,5] <- unlist(lapply(d[,3], function(x){
  sp <- strsplit(as.character(x), ' / ')[[1]][2]
}))

beData <- d[, c(1, 2, 4, 5)]
colnames(beData) <- c("Cities", "Population", "Latitude", "Longitude")

# Create the Belgium border trace
borders <- list(x=map(regions="belgium")$x,
                y=map(regions="belgium")$y,
                text='')

# Create the plottable city data
cities <- list(x=beData$Longitude,
               y=beData$Latitude,
               text=paste(beData$Cities, " - ", beData$Population, " inhab.", sep=""),
               type="scatter",
               mode="markers",
               marker=list("size"=sqrt(beData$Population/max(beData$Population))*100,
                           "opacity"=0.5))

# Plot the two traces
response <- py$plotly(borders, cities)

# Access your plot.ly plot
browseURL(response$url)
Stat’Rgy on Walloon IT platform