Best SQL-on-Hadoop tool?

SQL on Hadoop: Drill, Impala or Spark SQL

Drill, Impala and Spark SQL all fit into the SQL-on-Hadoop category. Apache Drill and Spark are both top-level Apache projects. Impala is developed by Cloudera and shipped by Cloudera, MapR, Oracle and Amazon. Spark SQL is part of the Spark project and is mainly supported by the company Databricks. Drill is mainly pushed by the Hadoop distributor MapR. Drill, Impala and Spark SQL all show better performance than Hive and aim at interactive queries, whereas Hive was designed for batch jobs. On July 1st 2014, it was announced that development on Shark (also known as Hive on Spark) was ending and that the focus would shift to Spark SQL. For those familiar with Shark, Spark SQL provides similar features to Shark, and more. According to Databricks, Shark faced too many limitations inherent to the MapReduce paradigm and was difficult to improve and maintain. For a benchmark of Shark vs Spark SQL, please see this.

Since Drill, Impala and Spark SQL all run on YARN and can all query Hive metastore tables, one may wonder which one to use. Let’s look at some of the features of each of these tools.

Drill

Drill entered the Apache Incubator in August 2012 and was first released to the public in August 2013. Drill presents very interesting features:

  • it can query data from multiple sources: text files, JSON files, Parquet files, Avro files and other formats, the Hive metastore, or other databases such as MongoDB, HBase, etc.;
  • it is built to work with dynamic schemas and complex data;
  • it can work with data without having to define a schema upfront;
  • it has been optimized for interactive queries, for both performance and SLAs;
  • it is an in-memory execution engine, for better performance;
  • it allows flexible memory allocation (how much memory Drill utilizes);
  • it allows security through views;
  • it provides industry-standard APIs: ANSI SQL, ODBC/JDBC, RESTful APIs, and can be accessed by most BI tools (e.g., Tableau, MicroStrategy, QlikView, Spotfire, Pentaho, Excel, etc.);
  • it supports multiple cloud storage systems (Amazon S3, Google Cloud Storage, Azure Blob Storage, Swift) and others can be added by developing a storage plugin;
  • it supports custom applications via the REST API and Java and C applications via the dedicated Java and C libraries;
  • it allows reusing UDFs and UDAFs built in Hive without modification;
  • it allows joins between tables from different sources (example taken from here):
-- Hive table 'orders', HBase view 'custview' and JSON file 'clicks.json' are joined together
select custview.membership, sum(orders.order_total) as sales
from hive.orders, custview, dfs.`/mapr/demo.mapr.com/data/nested/clicks/clicks.json` c
where orders.cust_id = custview.cust_id and orders.cust_id = c.user_info.cust_id
group by custview.membership
order by 2;
  • it supports large datasets (it is very scalable): it tries to use memory when possible and spills to disk only if the working dataset does not fit in memory.

Impala

Impala was the first of the three to be released to the public, in April 2013.

Impala comes with a bunch of interesting features:

  • it can query many file formats such as Parquet, Avro, Text, RCFile, and SequenceFile;
  • it supports data stored in HDFS, Apache HBase and Amazon S3;
  • it supports multiple compression codecs: Snappy (recommended for its effective balance between compression ratio and decompression speed), Gzip (recommended when the highest level of compression is needed), Deflate (not supported for text files), Bzip2, LZO (for text files only);
  • it provides security through:
    • authorization based on Sentry (OS user ID), defining which users are allowed to access which resources and what operations they are allowed to perform;
    • authentication based on Kerberos, plus the ability to specify an Active Directory username/password; this is how Impala verifies the identity of a user to confirm that they are allowed to exercise the privileges assigned to them;
    • auditing, i.e. which operations were attempted and whether they succeeded, allowing you to track down suspicious activity; the audit data are collected by Cloudera Manager;
  • it supports SSL network encryption between Impala and client programs, and between the Impala-related daemons running on different nodes in the cluster;
  • it allows the use of UDFs and UDAFs;
  • it automatically orders joins to be the most efficient;
  • it allows admission control – prioritization and queueing of queries within Impala;
  • it allows multi-user concurrent queries;
  • it caches frequently accessed data in memory;
  • it computes table and column statistics (with COMPUTE STATS; see the sketch after this list);
  • it provides window functions (aggregation OVER PARTITION, RANK, LEAD, LAG, NTILE, and so on) – to provide more advanced SQL analytic capabilities (since version 2.0);
  • it allows external joins and aggregations using disk (since version 2.0) – operations can spill to disk if their internal state exceeds the aggregate memory size;
  • it allows subqueries inside WHERE clauses;
  • it allows incremental statistics – statistics are run only on the new or changed data, for even faster statistics computations;
  • it enables queries on complex nested structures including maps, structs and arrays;
  • it enables merging (MERGE) updates into existing tables;
  • it enables some OLAP functions (ROLLUP, CUBE, GROUPING SET);
  • it allows use of Impala for inserts and updates into HBase.
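To make two of the features above concrete (statistics and window functions), here is a minimal sketch in Impala SQL, assuming a hypothetical Parquet-backed table sales(region, amount):

-- gather table and column statistics used by the query planner
COMPUTE STATS sales;

-- rank sales within each region (analytic/window function, Impala 2.0+)
SELECT region,
       amount,
       RANK() OVER (PARTITION BY region ORDER BY amount DESC) AS rank_in_region
FROM sales;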

Spark SQL

Spark SQL was announced in March 2014. It officially replaces Shark, which had limited integration with Spark programs. "Spark SQL conveniently blurs the lines between RDDs and relational tables." In addition to being part of the Spark platform, which gives it compatibility with the other Spark libraries (MLlib, GraphX, Spark Streaming), Spark SQL offers multiple interesting features:

  • it supports multiple file formats such as Parquet, Avro, Text, JSON, ORC;
  • it supports data stored in HDFS, Apache HBase (see here, showing better performance than Phoenix) and Amazon S3;
  • it supports classical Hadoop codecs such as Snappy, LZO, Gzip;
  • it provides security (see https://spark.apache.org/docs/1.3.1/security.html for more details) through:
    • authentication via the use of a "shared secret" (spark.authenticate=true on YARN, or spark.authenticate.secret on all nodes if not on YARN);
    • encryption: Spark supports SSL for the Akka and HTTP protocols;
    • keeping event logs;
  • it supports UDFs, e.g.:
# PySpark (Spark 1.x); assumes an existing SQLContext named sqlContext
import re

sqlContext.registerFunction("countMatches",
    lambda pattern, text: re.subn(pattern, '', text)[1])
sqlContext.sql("SELECT countMatches('a', text) ...")
  • it supports concurrent queries and manages the allocation of memory to the jobs (it is possible to specify the storage level of an RDD, e.g. in-memory only, disk only, or memory and disk);
  • it supports caching data in memory using a columnar format for SchemaRDDs (via cacheTable(""), backed by ByteBuffers); it can also use plain memory-only caching, which stores user objects (see the sketch after this list);
  • it supports nested structures (see http://www.congiu.com/creating-nested-data-parquet-in-spark-sql/ for an example);
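As a hedged illustration of the caching feature mentioned above, here is a minimal sketch in Spark SQL's SQL dialect, assuming a hypothetical table logs already registered in the Hive metastore (the statements can be submitted through sqlContext.sql() or the Spark SQL CLI):

-- cache the table in Spark SQL's in-memory columnar format
CACHE TABLE logs;

-- subsequent queries are served from the columnar cache
SELECT level, COUNT(*) AS n
FROM logs
GROUP BY level;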

Making a decision

Cloudera’s benchmark in September 2014 showed Impala performing much better than the alternatives (Hive on Tez, Spark SQL, Presto), by a factor of 2x to 13x.
However, Facebook has shown recent improvements in the Presto query engine, making it competitive with Impala on Parquet. Presto is used by large companies such as Airbnb, AWS, Dropbox and Netflix. Facebook uses a new ORC reader for Presto, called DWRF (a fork of ORC).
Cloudera Impala comes with multiple advanced features, such as OLAP functions, and is more mature than the alternatives. A great strength of Spark SQL is its tight integration with Spark, which makes it very easy to use the other Spark libraries (e.g., MLlib for machine learning). Drill can be used outside Hadoop or Spark, which makes it convenient if your data sources come from different environments.

Other SQL-on-Hadoop alternatives

There are other SQL-on-Hadoop alternatives out there:

  • Presto: developed by Facebook, it is open source but (as far as I know) not supported by third-party vendors; it can query data from multiple sources (Hive, Cassandra, relational databases, etc.);
  • Pivotal HAWQ: developed by Pivotal, it has recently been open-sourced and is now available in the Hortonworks Data Platform (HDP); it can query data from multiple sources (Hive, HBase, etc.);
  • Big SQL: developed by IBM and part of its BigInsights platform, it is closed source and can query multiple data sources (Hive, HBase, etc.); it is probably only useful if you already use IBM tools;
  • Apache Phoenix: a top-level Apache project, open source; it can only query HBase, since Phoenix is nothing more than a relational database layer over HBase allowing low-latency queries over HBase data; Phoenix shows much better performance than Hive and Impala over HBase on some benchmarks;
  • Apache Tajo: a top-level Apache project, Tajo is a robust big data relational and distributed data warehouse system for Apache Hadoop, designed for low-latency and scalable ad-hoc queries, online aggregation, and ETL (extract-transform-load) on large datasets stored on HDFS (Hadoop Distributed File System) and other data sources;
  • Teradata SQL-H: developed by Teradata; “with Aster SQL-H™, users of the Teradata Aster Discovery Platform got the ability to issue SQL and SQL-MapReduce® queries directly on Hadoop data as if that data had been in Aster all along”;
  • BlinkDB: developed by people from MIT, UC Berkeley and the University of Michigan, BlinkDB is a massively parallel, approximate query engine for running interactive SQL queries on large volumes of data. “It allows users to trade-off query accuracy for response time, enabling interactive queries over massive data by running queries on data samples and presenting results annotated with meaningful error bars.”

Storm vs Spark

Real-time data processing: Storm vs Spark

Real-time data processing has been made possible with tools like Storm and Spark Streaming (both top-level Apache projects).

You may wonder which one to choose. There are several things you should consider:

  • scale
  • latency
  • iterative processing
  • use what you know
  • code reuse
  • other language compatibility
  • maturity

Batch vs Streaming

Storm and Spark do not do the same thing:

  • Storm is a stream processing framework that also does micro-batching (Trident)
  • Spark is a batch processing framework that also does micro-batching (Spark Streaming)

Stream processing means processing records one at a time, whereas micro-batching means processing small batches of records – small ones, but still not one at a time. For a fair comparison of Storm vs Spark Streaming, one should therefore compare Spark Streaming with Storm Trident. Apache Storm offers two streaming APIs:

  • the Core Storm API (Spouts and Bolts)
    • one at a time,
    • lower latency,
    • operates on tuple streams
  • Trident (Streams and Operations)
    • micro-batch,
    • higher throughput,
    • operates on streams of tuple batches and partitions

As an illustration, a benchmark of Storm on 5 AWS nodes (m1.large) with 1 Zookeeper, 1 Nimbus and 3 Supervisors gave ~150k msg/sec with ~80ms latency with the Core API, and ~300k msg/sec with ~250ms latency with Trident. Higher throughput is possible at the cost of increased latency, and better performance is possible with bigger hardware.

Storm provides a lower-level API than Spark, with no built-in concept of look-back aggregations.

Sources of data

  • Storm can work with an incredibly large variety of sources (from the Twitter Streaming API to Apache Kafka to everything in between).
  • Spark can also work with numerous disparate sources including HDFS, Cassandra, HBase, and S3.

Other language compatibility

  • Storm is mainly written in Clojure; spouts (sources of streams in a computation, e.g. the Twitter API) and bolts (which process input streams and produce output streams) can be written in almost any language, including non-JVM languages like R or Python. Note that Storm Trident is only compatible with Java, Clojure and Scala.
  • Spark is written in Scala and provides API support only for Scala, Java and Python.

How to pick one ?

Choosing between Storm or Spark will probably depend on your use case.

If your processing needs involve substantial graph processing, SQL access or batch processing, Spark is preferable. Spark comes with a series of modules: one for streaming (Spark Streaming), one for machine learning (MLlib), one for graphs (GraphX), and one for SQL queries (Spark SQL).

If you do iterative batch processing (machine learning, drill-down, historical comparisons, etc.), Spark is the better choice, at least if the amount of data to be processed fits in the RAM available on your cluster.

If you need to combine batch with streaming, Spark would spare you much effort compared to Storm.

If you are already working on YARN, Spark is better suited. Storm on YARN is still in its infancy (see here). Note that both Storm and Spark run on Mesos.

If you need to leverage code written in R or any other language not natively supported by Spark then Storm has some advantages.

If latency must be under 1 sec, you should consider Storm Core.

HBase or Impala over HBase ?

Why choose Impala over HBase instead of simply using HBase ?

Your data are stored in HBase but you would like to run SQL queries, which is not possible with HBase alone, or you want to join data from an HBase table with data from a MySQL table. One solution is to use Impala over HBase.

How does it work ?

Impala does not read HFiles (HBase's on-disk format) directly; it uses the HBase client API via the Java Native Interface (JNI) to query data stored in HBase.

What do you need ?

You need to create a Hive table alongside the HBase table and establish a mapping from HBase to Hive; this mapping is actually defined when you create the Hive table. The user also needs permissions on the HBase table.
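As a minimal sketch of this mapping (with hypothetical table and column-family names), the Hive table is declared with the HBase storage handler and can then be queried from Impala:

-- assumes an existing HBase table 'orders' with a column family 'cf'
CREATE EXTERNAL TABLE hbase_orders (
  cust_id STRING,        -- mapped to the HBase row key; keep it a string (see the next section)
  order_total STRING
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:order_total")
TBLPROPERTIES ("hbase.table.name" = "orders");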

What to be careful of ?

For the mapping to work well, the Hive column mapped to the row key should be a string column. Other HBase columns can be used as predicates (in the WHERE clause) and should be string columns as well to get efficient queries. If the row key in HBase is mapped to a non-string column in Hive, say an integer (int), the predicate on the row key does not get transformed into a scan parameter, which means that the whole table is passed to Impala. This should be avoided.

Concurrent queries with Impala and HBase

Since CDH 5, Impala manages concurrent queries by default. A table of the number of nodes needed with respect to the number of queries and the data size is given on the Cloudera website. For large data sizes (>10 TB), the recommended number of nodes increases linearly with the number of concurrent queries. For instance, 100 concurrent queries require 200 nodes on a 15 TB data set, while 20 nodes are enough to deal with 10 concurrent queries on the same data set. The number of nodes also increases linearly with data size. HBase itself is much more suited to concurrent access; its concurrency can be managed by setting the variable hbase.regionserver.handler.count.

Great features of Impala

Impala does not only allow you to query data using SQL-like syntax, it also allows you to query data in different structures. For instance, one can join HBase and non-HBase tables. Impala makes heavy use of RAM and is therefore often much faster than Hive, which is based on the MapReduce paradigm (more CPU-intensive than RAM-intensive).
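As a hedged sketch reusing the hypothetical hbase_orders mapping above, together with a hypothetical Parquet-backed table customers(cust_id, membership), such a cross-source join could look like this in Impala:

SELECT c.membership, SUM(CAST(o.order_total AS DOUBLE)) AS sales
FROM hbase_orders o
JOIN customers c ON o.cust_id = c.cust_id
GROUP BY c.membership;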

Drawbacks of Impala

Impala requires a refresh of the Hive table metadata when new data are inserted or updated outside of Impala.
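Concretely, with a hypothetical table name, this is done with Impala's REFRESH statement (or INVALIDATE METADATA for changes to the table definition itself):

-- after new data files are added to the table's storage outside of Impala
REFRESH orders;

-- after the table was created or its schema changed outside of Impala (e.g. in Hive)
INVALIDATE METADATA orders;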

About Phoenix

Apache Phoenix is a relational database layer over HBase delivered as a client-embedded JDBC driver targeting low-latency queries over HBase data. Phoenix "takes your SQL query, compiles it into a series of HBase scans, and orchestrates the running of those scans to produce regular JDBC result sets." They announce milliseconds for small queries and seconds for tens of millions of rows. Phoenix is not yet supported by Cloudera but can be installed on CDH 4 or 5. Note that Apache Phoenix has joined Cloudera Labs, so Phoenix could soon be integrated by default in upcoming CDH releases.
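As a minimal sketch of Phoenix's SQL dialect (hypothetical table and column names; the statements go through the Phoenix JDBC driver, e.g. via sqlline.py):

CREATE TABLE IF NOT EXISTS metrics (
  host VARCHAR NOT NULL,
  ts   BIGINT NOT NULL,     -- epoch milliseconds
  val  DOUBLE
  CONSTRAINT pk PRIMARY KEY (host, ts)
);

UPSERT INTO metrics VALUES ('web01', 1433116800000, 0.42);

SELECT host, COUNT(*) AS n FROM metrics GROUP BY host;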

HBase, Parquet or Avro ?

How to choose between HBase, Parquet and Avro ?

First, if you need to update your data, go with HBase. If part of the data should be updated and the other part not, you may think of a hybrid solution. With HBase you can keep old versions of a “cell”. Moreover, you can use the time-to-live (TTL) feature to delete old data automatically.

If you need to scan many rows, HBase is not really suitable (e.g. if you do a COUNT DISTINCT). In that case Avro will be faster and Parquet the fastest, especially if you restrict your queries to a few columns. Let’s take an example: you have logs of metrics feeding HBase every second, and you are interested in analytics at the minute level. HBase should handle it, because you restrict your analysis to a limited amount of data. Now suppose you want to run the analysis on a daily or even monthly basis. HBase is no longer suitable, but you still need those analyses. You could use Parquet, but Parquet is not real time and you need real-time queries. Then you need to aggregate your data: either at the frequency (or frequencies) defined by your KPIs, or by partitioning the data and pre-aggregating them with Flume as they come in, before loading the aggregates into HBase.

Avro is a row-based storage format for Hadoop.
Parquet is a column-based storage format for Hadoop.
If your use case typically scans or retrieves all of the fields in a row in each query, Avro is usually the best choice.
If your dataset has many columns, and your use case typically involves working with a subset of those columns rather than entire records, Parquet is optimized for that kind of work.
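As a hedged sketch in Hive SQL, with hypothetical table and column names: the same records can be stored in either format, and a query that touches only a few columns is where Parquet's columnar layout pays off:

CREATE TABLE events_avro (user_id STRING, event_type STRING, payload STRING)
  STORED AS AVRO;

CREATE TABLE events_parquet (user_id STRING, event_type STRING, payload STRING)
  STORED AS PARQUET;

-- reads only the 'event_type' column chunks when the data are stored as Parquet
SELECT event_type, COUNT(*) AS n
FROM events_parquet
GROUP BY event_type;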

Scala vs Python

Scala vs Python

If you are wondering whether you’d better learn Scala or Python… or both, you might want to read this.

Scala is a statically typed language, which means that the type of each variable is known at compile time (even when Scala infers it, the type is fixed and checked by the compiler). Python, on the contrary, is dynamically typed: types are checked at run time and do not have to be declared. So defining a variable is a bit quicker in Python than in Scala.
If you really need to constrain the domain of your variables (in math, for instance), you should probably go for static typing, because type checking, although possible in Python, would make the code more verbose and slower.

Scala is both functional and object-oriented (OO), and Python, often thought of as a procedural and OO language, is also reasonably well equipped for functional programming. Python was not designed for functional programming, but it has tools like lambda, map(), reduce() and filter() that support a functional style.

Scala can also use Java libraries, like many other JVM languages (Clojure, Groovy, etc.). Scala will let you learn more things than Python, but Python has a lot of great libraries maintained by a great community. You can do almost anything in both Python and Scala, but you will probably take more time with Scala than with Python, even if Scala is less verbose than Java.

Scala is very fast, only about 2 to 3 times slower than C, whereas Python is about 50 times slower. Note that these numbers are very general and depend on what you actually do.

Scala does not have as many data science libraries and tools as Python. It is always possible to run R code from Python, which interfaces with many languages; Scala has not gone that far. For exploration purposes, for instance, R libraries can quickly help visualize metrics and dimensions. For production purposes, with needs for concurrent access and computing efficiency, Scala would be the way to go. Keep in mind that even if Python has better libraries for machine learning (ML) and natural language processing (NLP), it is not designed for big data, whereas Scala is big-data oriented. Spark MLlib has fewer algorithms, but they are well suited to big data.

Scala is designed for distributed systems: Apache Spark and Apache Kafka are written in Scala (Storm, by contrast, is mainly written in Clojure). Performance with Scala is usually better than with traditional languages like Python and R. Scala integrates well with the big data ecosystem, which is mostly JVM based. There are frameworks built on top of Java libraries, like Scalding (on Cascading), Summingbird (on Scalding and Storm), Scrunch (on Crunch) and Flink (Java core with a Scala API), and others built from scratch that interface with JVM systems, like Spark and Kafka. The Scala APIs are usually more flexible than Hadoop Streaming with Python, PySpark or Python bolts in Storm, since you have direct access to the underlying API. There is also a wide range of data storage solutions that are built for or work well with the JVM, like Cassandra or HBase. Another great benefit is the functional paradigm, which fits well with the MapReduce and big data model.

What about web frameworks? Scala has Play and Python has Django. Django is the older project and is used by companies like Instagram and Pinterest. LinkedIn has migrated part of its stack to the Play framework, and Twitter also moved from Rails to Scala for its technical stack. Hue, the web user interface for Hadoop developed by Cloudera, is based on Django. Play, for its computing efficiency, is often preferred in the enterprise. If you want more insight about Django vs Play, go here.

Want to know more about Python vs Scala, go here.

HBase response times

HBase response times

There are several causes to latency in HBase:

  • Normal network round-trip time (RTT), internal or external to the Hadoop cluster: order of milliseconds.
  • Disk read and write time: order of milliseconds.
  • Client retries due to moved regions or splits: order of seconds. HBase can move a region if it considers that the cluster is not well balanced, and regions are split when they become too large (these features are built into HBase and called auto-sharding). Note that the time does not depend on the region size, since no data is actually moved; only the ownership is transferred to another region server.
  • Garbage collection (GC): order of 50 ms.
  • Server outages: when a region server dies, it takes 90 s by default to detect it; the server's regions are then reassigned to other servers, which have to replay the write-ahead logs (WAL) that had not yet been persisted. Depending on the amount of uncommitted data, this can take several minutes, so in total it takes a few minutes. Note that logs are replayed automatically in recent versions of HBase (hbase.master.distributed.log.replay set to true).
  • Cluster overload: if too many clients try to write data into HBase (too many user queries, flushes, major compactions, etc.), CPU, RAM or I/O cannot keep up. Major compactions can take minutes or even hours. One can limit the number of concurrent request handlers by setting hbase.regionserver.handler.count. If memstores take too much room in RAM, reduce their size by setting hbase.regionserver.global.memstore.size. Too many flushes (for instance if you have too many column families) may result in too many HFiles (the format in which HBase stores data in HDFS) and extra load from minor compactions.

Typically, latency will be a few ms if data are in the block cache, or <20 ms when disk seeks are needed. It can take 1 or 2 s in cases of GC or region splits/moves. If latency goes higher, it is the result of a major compaction, or a more serious problem may have occurred (server failure, etc.).

HBase on low memory cluster

How to configure HBase on a low-memory cluster

Reduce the number of regions per server

Before getting into the math, let’s briefly recall what the memstore is in HBase. The memstore holds in-memory modifications to a Store before they are flushed to disk as an HFile; in other words, incoming data are first stored in memory, and when their volume reaches a certain size (typically 128 MB on recent HBase versions), they are flushed to disk.

Now, let’s do some computation. You have one memstore per region. Suppose that each memstore is 128 MB. If you have 100 regions per region server, the memstores would use 100 × 128 MB, i.e. about 13 GB of RAM. Keep in mind that by default only 40% of the heap can actually be used for memstores, which means that each region server would need at least 13 GB / 0.4 ≈ 33 GB of RAM. Of course, you might need a bit less, since not all of your memstores will be full at all times.

Suppose you have chosen 100 regions because you need to store 1 TB with 10 GB regions. What can you do if you only have 32 GB of RAM and do not want to monitor all your nodes 24/7? (Note that to store 1 TB you need 3 TB of disk space with the default replication factor of 3, and remember that even if the data are compressed on disk, they are not compressed in memory.)

  • you can increase your region size, say to 20 GB each (hbase.hregion.max.filesize);
  • you can decrease your memstore size, say to 64 MB (hbase.hregion.memstore.flush.size);
  • you can increase the heap fraction used for the memstores; if your load is write-heavy, maybe up to 50% of the heap (hbase.regionserver.global.memstore.upperLimit, hbase.regionserver.global.memstore.lowerLimit).

Steal memory from other services

A typical configuration calls for 1 GB of RAM for a datanode, which is often not needed. You can cut the datanode heap down to 400 MB, giving 624 MB extra to HBase. This will probably not save you much.

Tune or disable MSLAB

The MSLAB feature adds 2MB of heap overhead by default for each region. You can tune this buffer down with hbase.hregion.memstore.mslab.chunksize. The lower you go the less effective it is, but the less memory overhead as well. To disable it completely, set hbase.hregion.memstore.mslab.enabled to false.

Reduce caching

Caching and batching are used to reduce the effect of network latency on large scans, but they also require more memory on both the client and server side.

Limit concurrent mappers

Lowering the hbase.regionserver.handler.count helps to limit the number of active connections taking memory.