Shell Scripts to Check Data Integrity in Hive

Ian Hellström | 11 February 2017 | 4 min read

Apache Hive does not come with an out-of-the-box way to check tables for duplicate entries or a ready-made method to inspect column contents, such as R’s summary function. In this post I shall show a shell script replete with functions to do exactly that: count duplicates and show basic column statistics. These two functions are ideal when you want to perform a quick sanity check on the data stored in or accessible with Hive.

Apache Hive is a SQL-like interface to HDFS. It is often used as a relational data store, not unlike a traditional data warehouse, and as an ETL engine. Until recently Hive did not support row-level ACID; even now transactions and referential integrity are still not its strong suit. After all, Hive is not designed for online transaction processing.

HiveQL queries that count the number of duplicates are simple thanks to the ROW_NUMBER() window function. What is tedious is that you have to provide a full list of columns over which to create the windows, that is, the columns in the PARTITION BY clause. The outline of this query stays the same for every table, which makes it an excellent candidate for a shell script. Of all the APIs, the CLI is the best documented, which is why I have chosen to stick with SQL via the command line, in particular the beeline shell.
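To make that outline concrete before we script it, here is a sketch of such a duplicate-count query, assembled in the shell with a hypothetical table schema.table and columns col1 to col3; only the PARTITION BY list changes from table to table:

```shell
# Hypothetical table and column list; only these two values vary per table
table="schema.table"
cols="col1, col2, col3"

# The fixed skeleton: number the rows within each window of identical column
# values, then count the rows whose number exceeds one (the duplicates)
sql="SELECT COUNT(*) AS n FROM
( SELECT ROW_NUMBER() OVER (PARTITION BY $cols) AS rn FROM $table ) t
WHERE rn > 1"

echo "$sql"
```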

Enough foreplay, show me the code!

Since we want to reuse what’s already available in the Databaseline repository, our shell script with Hive functions needs to source a couple of files for us:

# Source Hadoop aliases and auxiliary Bash functions
source ../hadoop/
source ../

The first line reads in the hadoop/ file, which contains the Hadoop-specific aliases and variables we need, for example HIVE_SERVER and HIVE_PORT. The ../ file contains generic functions we need in the code below, mainly colEcho. You can replace colEcho with a plain echo, but with colEcho I believe the output to be more legible.
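If you prefer not to source the repository's helpers, a minimal stand-in for colEcho could look as follows; the bold green ANSI colour is my own choice here, not necessarily what the repository uses:

```shell
# Hypothetical stand-in for colEcho: a plain echo wrapped in a bold green
# ANSI colour so that status messages stand out from query output
function colEcho() {
  printf '\033[1;32m%s\033[0m\n' "$*"
}

colEcho "hello from colEcho"
```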

Before we continue let’s create a helper function that executes HiveQL statements with beeline:

function __exec_hive() {
  [ $# -eq 0 ] && echo "$FUNCNAME: at least one argument is required" && return 1

  local stmt="$1"
  local opts="${2:-}"

  # Double quotes around the URL so HIVE_SERVER and HIVE_PORT are expanded;
  # only pass the options argument when it is non-empty
  beeline \
    --fastConnect=true \
    --silent=true \
    ${opts:+"$opts"} \
    -u "jdbc:hive2://$HIVE_SERVER:$HIVE_PORT" \
    -e "$stmt"
}

This function also accepts beeline options, which we shall use in __hive_columns. If you have no need of these options you can also revert to the beehive alias.


The function to calculate the number of duplicates for a given list of columns is as follows:

function countHiveDuplicates() {
  [ $# -eq 0 ] && echo "$FUNCNAME: at least one argument is required" && return 1

  local table="${1,,}"
  local cols=""

  # Use provided list of columns or generate full list with auxiliary function
  if [ $# -gt 1 ]; then
    shift
    cols="$(echo "$@" | sed 's/,* \+/, /g')"
  else
    cols="$(__hive_columns $table)"
  fi

  local sql="SELECT COUNT(*) AS n FROM
  ( SELECT ROW_NUMBER() OVER (PARTITION BY $cols) AS rn FROM $table ) t
  WHERE rn > 1"

  __exec_hive "$sql"
}

The skeleton HiveQL query near the bottom is quite clear: the subquery calculates the row numbers within each partition defined by $cols, our list of columns. The main query computes the number of rows with a number larger than one, that is, the duplicate entries. The if-then-else block deals with potentially absent columns and refers to the following auxiliary function:

function __hive_columns() {
  [ $# -eq 0 ] && echo "$FUNCNAME: at least one argument is required" && return 1

  local table="${1,,}"
  local exclude="${2:-}"
  local tmpFile=desc.$$$(date +%s%N)

  # Generate list of columns and store in temporary file
  __exec_hive "SHOW COLUMNS IN $table" "--outputformat=csv2" > "$tmpFile"

  # Remove first line if it contains the header (field)
  sed -i '1{/^field$/d}' "$tmpFile"

  # Exclude columns that match the pattern
  if [ "$exclude" != "" ]; then
    sed -i "/$exclude/d" "$tmpFile"
  fi

  # Replace newlines with commas, remove additional spaces, and remove final comma
  local cols="$(tr '\n' ',' < "$tmpFile" | sed 's/ \+//g; s/,$//')"

  rm "$tmpFile"

  echo "$cols"
}

It generates a comma-separated list of all columns of a particular table. If a second argument is provided, it is used to filter columns: any column that matches the pattern will not be included in the list. This is not really useful right now, but it will be convenient when we talk about the function analyseHiveTable.

Note that spaces in column names will not be handled correctly. This is a limitation that I can live with because spaces in table or column identifiers are a bad idea anyway.
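The newline-to-CSV step at the heart of __hive_columns can be tried without a Hive connection; in this sketch, printf stands in for the SHOW COLUMNS output that normally lands in the temporary file, and the column names are hypothetical:

```shell
# Simulated SHOW COLUMNS output: one column name per line, as beeline would
# write it with --outputformat=csv2 (header already stripped)
cols="$(printf 'id\nname\ncreated_at\n' | tr '\n' ',' | sed 's/ \+//g; s/,$//')"

echo "$cols"   # id,name,created_at
```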

With these two functions you can now count full-row duplicates or duplicates of specific column values:

countHiveDuplicates schema.table
countHiveDuplicates schema.table col1 col2 col3
countHiveDuplicates "schema.table" "col1, col2, col3" # same as previous line

Basic Statistics

Checking for potential duplicates is a cheap way to verify whether a data transfer is working properly. What it does not tell you is how many distinct entries there are, how often a column is blank or NULL, and what the minimum and maximum values are. That is done with the help of analyseHiveTable:

function analyseHiveTable() {
  [ $# -eq 0 ] && echo "$FUNCNAME: at least one argument is required" && return 1

  # Disable globbing of * (for queries)
  set -f

  local table="$1"
  local limitRows="${2:-}"
  local exclude="${3:-}"
  local limitClause=""

  local sqlStmt="SELECT metric_name, metric_value FROM ( SELECT MAP("

  if [ "$limitRows" != "" ]; then
    # Bash cannot handle floating-point comparisons, hence bc
    limitComparison=$(echo "$limitRows < 1 && $limitRows > 0" | bc)

    # Deal with fraction: if < 1 and > 0, count rows, and limit to % of that
    if [ "$limitComparison" = "1" ]; then
      colEcho "$FUNCNAME: computing the number of rows..."
      limRows=$(__exec_hive "SELECT FLOOR($limitRows*COUNT(*)) FROM $table")
      limitRows=$(echo $limRows | cut -d'|' -f4)
    fi

    # Check that given or computed row limit > 1, otherwise do not use LIMIT clause
    limitComparison=$(echo "$limitRows > 1" | bc)
    if [ "$limitComparison" = "1" ]; then
      limitClause=" LIMIT $limitRows"
    fi
  fi

  local cols="$(__hive_columns $table $exclude)"

  # Replace each column with a MIN, MAX, COUNT-DISTINCT, and COUNT-NULL inside MAP (needed for unpivoting)
  pattern=' *\([[:alnum:]_]*\) *,*'
  replacement="\"min_\1\",CAST(MIN(t.\1) AS STRING), \"max_\1\",CAST(MAX(t.\1) AS STRING), "
  replacement="$replacement""\"max_length_\1\",MAX(LENGTH(CAST(t.\1 AS STRING))), "
  replacement="$replacement""\"distinct_\1_pct\",ROUND(100.0*COUNT(DISTINCT t.\1)\/COUNT(*),2), "
  replacement="$replacement""\"null_\1_pct\",ROUND(100.0*SUM(CASE WHEN t.\1 IS NULL THEN 1 ELSE 0 END)\/COUNT(*),2), "
  cols="$(echo "$cols" | sed "s/$pattern/$replacement/g")"

  # Remove last two characters (i.e. final comma and white space)
  local selectList="${cols%??}"

  selectList="$selectList) AS metrics_map"

  fromClause=" FROM (SELECT * FROM $table $limitClause) t ) exp "
  fromClause="$fromClause""LATERAL VIEW explode(metrics_map) mm AS metric_name, metric_value"

  # Execute query
  colEcho "$FUNCNAME: computing the metrics..."

  __exec_hive "$sqlStmt $selectList $fromClause"

  # Re-enable globbing of * (outside of queries)
  set +f
}

What the script does is as follows:

  1. Compute the LIMIT clause based on the arguments provided. If no row-limiting argument is passed to the function, all rows will be used. This is not ideal when you have a huge table, as it may take a very long time to calculate all metrics. If, on the one hand, the number is between zero and one (exclusive), a fraction of the total number of rows will be used. Let’s say you enter ‘0.6’. The LIMIT clause will read LIMIT FLOOR(0.6 * NUM_ROWS) with NUM_ROWS the output of a COUNT(*) on the table. If, on the other hand, the number is larger than one, the function assumes you want the query limited to that many rows.
  2. Generate a comma-separated list of columns. Here an exclusion pattern can be very useful, which is why I included it in __hive_columns.
  3. Generate the HiveQL statement that will be executed. The bit with sed looks awful, but what it does is build a query that looks roughly like this:
SELECT metric_name, metric_value
FROM (
  SELECT MAP( "key1", value1,
              "key2", value2,
              ... ) AS metrics_map
  FROM ( SELECT * FROM $table LIMIT $rowLimit
  ) t
) exp
LATERAL VIEW explode(metrics_map) mm AS metric_name, metric_value;

This creates key-value pairs with the column names (with appropriate prefixes and matching suffixes) as the keys and their (aggregate) values as the values. The call to explode makes individual rows out of the map entries, which makes it slightly easier to read on a screen: you will get a long, narrow table instead of a short, wide one.

Please observe that analyseHiveTable treats all columns as strings. The advantage is that this always works, even with external tables based on CSV files. The disadvantage is that the function is limited in what it can calculate: means and medians are out of reach.

For a table with columns a, b and c, this is the query that is executed:

SELECT metric_name, metric_value
FROM (
  SELECT MAP(
      "min_a", CAST(MIN(t.a) AS STRING),
      "max_a", CAST(MAX(t.a) AS STRING),
      "max_length_a", MAX(LENGTH(CAST(t.a AS STRING))),
      "distinct_a_pct", ROUND(100.0*COUNT(DISTINCT t.a)/COUNT(*), 2),
      "null_a_pct", ROUND(100.0*SUM(CASE WHEN t.a IS NULL THEN 1 ELSE 0 END)/COUNT(*), 2),

      "min_b", CAST(MIN(t.b) AS STRING),
      "max_b", CAST(MAX(t.b) AS STRING),
      "max_length_b", MAX(LENGTH(CAST(t.b AS STRING))),
      "distinct_b_pct", ROUND(100.0*COUNT(DISTINCT t.b)/COUNT(*), 2),
      "null_b_pct", ROUND(100.0*SUM(CASE WHEN t.b IS NULL THEN 1 ELSE 0 END)/COUNT(*), 2),

      "min_c", CAST(MIN(t.c) AS STRING),
      "max_c", CAST(MAX(t.c) AS STRING),
      "max_length_c", MAX(LENGTH(CAST(t.c AS STRING))),
      "distinct_c_pct", ROUND(100.0*COUNT(DISTINCT t.c)/COUNT(*), 2),
      "null_c_pct", ROUND(100.0*SUM(CASE WHEN t.c IS NULL THEN 1 ELSE 0 END)/COUNT(*), 2)
      ) AS metrics_map
  FROM ( SELECT * FROM schema.table ) t
) exp
LATERAL VIEW explode(metrics_map) mm AS metric_name, metric_value;
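The sed rewrite from step 3 can likewise be inspected offline. The column list a,b,c below is hypothetical, and only the MIN replacement is shown to keep the output short; the real function appends MAX, max-length, distinct and NULL percentages in the same manner:

```shell
# A toy column list standing in for the output of __hive_columns
cols="a,b,c"

# The same pattern as in analyseHiveTable, with only the MIN part of the
# replacement; \1 is the captured column name
pattern=' *\([[:alnum:]_]*\) *,*'
replacement='"min_\1",CAST(MIN(t.\1) AS STRING), '

selectList="$(echo "$cols" | sed "s/$pattern/$replacement/g")"

# Strip the final comma and space
selectList="${selectList%??}"

echo "$selectList"
```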

Instead of having to copy-paste and edit such an elaborate query for each and every table, with a myriad of columns, you can now simply write:

analyseHiveTable schema.table

To see some examples of different command line arguments, have a look at the comments in the code in the repository.

Easy peasy, n’est-ce pas?