Sometimes the analytical power of the built-in Hive functions is just not enough. In that case it is possible to write hand-tailored User-Defined Functions (UDFs) for transformations and even aggregations, which are then called User-Defined Aggregation Functions (UDAFs). In this post we focus on how to write sophisticated UDFs and UDAFs in Python. By sophisticated we mean that our UD(A)Fs should also be able to leverage external libraries like NumPy, SciPy, Pandas etc. This makes things a lot more complicated: we have to provide not only the Python script itself but also a full-blown virtual environment including the external libraries, since they may not be available on the cluster nodes. Therefore, this tutorial only requires that a basic installation of Python is available on the data nodes of the Hive cluster.
General information
To keep the introduction brief, only some general notes on the idea behind UD(A)Fs are given here.
With the help of the Transform/Map-Reduce syntax, i.e. TRANSFORM, it is possible to plug in your own custom mappers and reducers, and this is where we are going to hook in our Python script. A UDF is basically a transformation done by a mapper, meaning that each input row is mapped to exactly one output row. A UDAF, on the other hand, allows us to transform a group of rows into one or more rows, meaning that we can reduce a number of input rows to a single output row by some custom aggregation. We can control whether the script runs in a mapper or in a reducer step by the way we formulate our HiveQL query: the clauses DISTRIBUTE BY and CLUSTER BY allow us to indicate that we actually want to perform an aggregation.
HiveQL feeds the data to the Python script, or any other custom script, via standard input and reads the result from its standard output. Everything written to standard error is ignored and can therefore be used for debugging.
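To make this protocol concrete, here is a minimal sketch of a pass-through transform script (not part of the tutorial itself): Hive sends one row per line on standard input with the columns separated by tabs, and expects the output rows in the same format on standard output.

import sys

# Hive sends one row per line, columns separated by tabs.
for line in sys.stdin:
    columns = line.rstrip('\n').split('\t')
    # A plain UDF maps each input row to exactly one output row.
    print('\t'.join(columns))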
Since a UDAF is more complex than a UDF and actually can be seen as a generalization
of it, the development of a UDAF is demonstrated here.
Overview and a little task
In order to not get lost in the details, here is what we want to achieve from a high-level perspective.
- Set up small example Hive table within some database.
- Create a virtual environment and upload it to Hive’s distributed cache.
- Write the actual UDAF as Python script and a little helper shell script.
- Write a HiveQL query that feeds our example table into the Python script.
Our dummy data consists of different types of vehicles (car or bike) and a price. For each category we want to calculate the mean and the standard deviation with the help of Pandas to keep things simple. It should not be necessary to mention that this task can be handled in HiveQL directly, so this is really only for demonstration.
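For reference, the computation we are after boils down to a few lines of Pandas; the following sketch uses the dummy prices that we will insert into the table in the next step.

import pandas as pd

# The dummy data from step 1; None corresponds to the NULL price of the last row.
df = pd.DataFrame({
    'vtype': ['car', 'car', 'car', 'car', 'bike', 'bike', 'bike', 'bike'],
    'price': [1000., 42., 10000., 69., 1426., 32., 1234., None],
})

# Mean and standard deviation per vehicle type; NaN values are ignored by default.
print(df.groupby('vtype')['price'].agg(['mean', 'std']))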
1. Setting up our dummy table
With the following query we generate our sample data:
CREATE DATABASE tmp;
USE tmp;
CREATE TABLE foo (id INT, vtype STRING, price FLOAT);
INSERT INTO TABLE foo VALUES (1, "car", 1000.);
INSERT INTO TABLE foo VALUES (2, "car", 42.);
INSERT INTO TABLE foo VALUES (3, "car", 10000.);
INSERT INTO TABLE foo VALUES (4, "car", 69.);
INSERT INTO TABLE foo VALUES (5, "bike", 1426.);
INSERT INTO TABLE foo VALUES (6, "bike", 32.);
INSERT INTO TABLE foo VALUES (7, "bike", 1234.);
INSERT INTO TABLE foo VALUES (8, "bike", null);
Note that the last row even contains a null value that we need to handle later.
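To see what this means for our script later on, here is a small sketch of how such a row arrives on the standard input of a TRANSFORM script: the columns are separated by tabs and NULL is encoded as \N, which are exactly the conventions our script in step 3 relies on.

# A row as Hive serializes it for TRANSFORM: tab-separated, NULL encoded as \N.
line = '8\tbike\t\\N\n'

rowid, vtype, price = line.strip().split('\t')
price = None if price == '\\N' else float(price)
print(rowid, vtype, price)  # 8 bike None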
2. Creating and uploading a virtual environment
In order to prepare a proper virtual environment we need to execute the following steps on an OS that is binary compatible with the OS on the Hive cluster. Typically, any recent 64-bit Linux distribution will do.
We start by creating an empty virtual environment with:
virtualenv --no-site-packages -p /usr/bin/python3 venv
assuming that virtualenv was already installed with the help of pip. Note that we explicitly ask for Python 3. Who uses Python 2 these days anyhow?
The problem with the activate script of a virtual environment is that its path is hard-coded. We change that by replacing the line
VIRTUAL_ENV="your/path/to/venv"
with
HERE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
VIRTUAL_ENV="$( readlink -f "${HERE}/../" )"
in ./venv/bin/activate. Additionally, we replace the hard-coded shebang line of pip, i.e. we replace
#!/your/path/to/venv/bin/python3
with
#!/usr/bin/env python
This will help us later when we call pip list for debugging reasons.
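Both replacements can of course be done by hand in an editor; the following is merely a sketch of how they could be scripted, assuming the venv directory created above.

import re

# Make the activate script relocatable by deriving VIRTUAL_ENV from its own location.
relocatable = (
    'HERE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"\n'
    'VIRTUAL_ENV="$( readlink -f "${HERE}/../" )"'
)

with open('venv/bin/activate') as fh:
    activate = fh.read()
activate = re.sub(r'^VIRTUAL_ENV=.*$', relocatable, activate, count=1, flags=re.MULTILINE)
with open('venv/bin/activate', 'w') as fh:
    fh.write(activate)

# Replace the hard-coded shebang of pip with a generic one.
with open('venv/bin/pip') as fh:
    lines = fh.readlines()
lines[0] = '#!/usr/bin/env python\n'
with open('venv/bin/pip', 'w') as fh:
    fh.writelines(lines)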
We activate the virtual environment and install Pandas in it.
source venv/bin/activate
pip install numpy pandas
This should install Pandas and all its dependencies into our virtual environment. Now we package the virtual environment for later deployment in the distributed cache:
cd venv
tar cvfhz ../venv.tgz ./
cd ..
Be aware that the archive was created with the actual content at its root, so when unpacking there will be no directory holding that content. We also used the flag h so that symbolic links are followed and the files they point to end up in the archive.
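Since this layout matters, a quick sanity check of the archive with Python's tarfile module can save a debugging round trip later; this is just a sketch.

import tarfile

# ADD ARCHIVE will unpack the tarball into a directory named after the file,
# so bin/activate has to sit directly at the root of the archive.
with tarfile.open('venv.tgz') as archive:
    names = archive.getnames()

assert './bin/activate' in names or 'bin/activate' in names, 'venv is not at the archive root'
print('\n'.join(names[:10]))  # peek at the first few entries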
Now we push the archive to HDFS so that later Hive’s data nodes will be able to find it:
hdfs dfs -put venv.tgz /tmp
The directory /tmp should be changed according to your setup. One should also note that in principle the same procedure should be possible with conda environments. In practice though, it might be a bit more involved, since the activation of a conda environment (which we need to do later) assumes an installation of at least Miniconda, which might not be available on the data nodes.
3. Writing and uploading the scripts
We start by writing a simple Python script udaf.py:
import sys
import logging
from itertools import groupby
from operator import itemgetter

import numpy as np
import pandas as pd

SEP = '\t'
NULL = '\\N'

_logger = logging.getLogger(__name__)


def read_input(input_data):
    """Yield the input rows line by line, split by the separator."""
    for line in input_data:
        yield line.strip().split(SEP)


def main():
    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
    data = read_input(sys.stdin)
    # CLUSTER BY guarantees that rows of the same vehicle type arrive consecutively.
    for vtype, group in groupby(data, itemgetter(1)):
        _logger.info("Reading group {}...".format(vtype))
        # Convert the columns to their proper types; NULL is encoded as \N.
        group = [(int(rowid), vtype, np.nan if price == NULL else float(price))
                 for rowid, vtype, price in group]
        df = pd.DataFrame(group, columns=('id', 'vtype', 'price'))
        output = [vtype, df['price'].mean(), df['price'].std()]
        print(SEP.join(str(o) for o in output))


if __name__ == '__main__':
    main()
The script should be pretty much self-explanatory. We read from standard input with the help of a generator that strips each line and splits it by the separator \t. At any point we want to avoid having more data in memory than is needed for the actual computation. We use the groupby function shipped with Python to iterate over our two types of vehicles. For each group we convert the read values to their respective data types and at that point also take care of null values, which are encoded as \N. After this preprocessing we finally feed everything into a Pandas dataframe, do our little mean and standard deviation calculations and print everything as a tab-separated list.
It should also be noted that we set up a logger at the beginning which writes everything to standard error. This helps a lot with debugging and should always be done. For demonstration purposes the vehicle type of the group currently being processed is logged.
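Before deploying anything, the script can be tested locally by piping a few rows in exactly this tab-separated format into it. The following sketch mimics what a reducer will receive, i.e. rows already grouped by vtype with NULL encoded as \N.

import subprocess
import sys

# Rows as Hive would feed them to the reducer: tab-separated and clustered by vtype.
sample = '\n'.join([
    '5\tbike\t1426.0',
    '6\tbike\t32.0',
    '7\tbike\t1234.0',
    '8\tbike\t\\N',
    '1\tcar\t1000.0',
    '2\tcar\t42.0',
    '3\tcar\t10000.0',
    '4\tcar\t69.0',
]) + '\n'

result = subprocess.run([sys.executable, 'udaf.py'], input=sample,
                        capture_output=True, text=True)
print(result.stdout)  # one line per vehicle type: vtype, mean, std
print(result.stderr, file=sys.stderr)  # the log messages of udaf.py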
At this point we would actually be done if it weren't for the fact that we are importing external libraries like Pandas. If we ran this Python script directly as a UDAF, we would see import errors wherever Pandas is not installed on the cluster nodes. But in the spirit of David Wheeler's "All problems in computer science can be solved by another level of indirection.", we just write a little helper script called udaf.sh that activates our packaged virtual environment and calls the Python script afterwards.
#!/bin/bash
set -e
(>&2 echo "Begin of script")
source ./venv.tgz/bin/activate
(>&2 echo "Activated venv")
(>&2 pip list --format=columns --no-cache-dir)
python udaf.py
(>&2 echo "End of script")
Again we use standard error to trace what the script is currently doing. Furthermore, we use pip list to output the content of the virtual environment for debugging reasons.
With the help of chmod u+x udaf.sh we make the script executable and now all that's left is to push both files somewhere onto HDFS for the cluster to find:
hdfs dfs -put udaf.py /tmp
hdfs dfs -put udaf.sh /tmp
4. Writing the actual HiveQL query
Now that everything is prepared and set up, we can write the actual HiveQL query:
DELETE ARCHIVE hdfs:///tmp/venv.tgz;
ADD ARCHIVE hdfs:///tmp/venv.tgz;
DELETE FILE hdfs:///tmp/udaf.py;
ADD FILE hdfs:///tmp/udaf.py;
DELETE FILE hdfs:///tmp/udaf.sh;
ADD FILE hdfs:///tmp/udaf.sh;
USE tmp;
SELECT TRANSFORM(id, vtype, price) USING 'udaf.sh' AS (vtype STRING, mean FLOAT, std FLOAT)
FROM (SELECT * FROM foo CLUSTER BY vtype) AS TEMP_TABLE;
At first we add the zipped virtual environment to the distributed cache, where it is automatically unpacked for us thanks to the ADD ARCHIVE command. Then we add the Python script and the helper script to the cache as well. To make sure that the version in the cache is actually the latest one in case changes were made, we prepend a DELETE statement before each ADD.
The actual query now calls TRANSFORM with the three input columns we expect in our Python script. After the USING statement our helper script is provided as the actual UDAF seen by HiveQL. This is followed by AS, which defines the names and types of the output columns.
At this point we need to make sure that the script is executed in a reducer step. We ensure this by defining a subselect that reads from our foo table and clusters it by vtype. CLUSTER BY, which is a shortcut for DISTRIBUTE BY followed by SORT BY, asserts that rows having the same vtype end up on the same reducer. Furthermore, the implicit SORT BY orders the rows within a reducer by the vtype column. The overall result is consecutive partitions of a given vehicle type (car and bike in our case), where each partition resides on a single reducer. Finally, our script is fed all the rows of its reducer and needs to figure out by itself where one partition ends and the next one starts, which is what we did with itertools.groupby.
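The crucial detail here is that itertools.groupby only merges consecutive rows with the same key, which is precisely what CLUSTER BY guarantees; a tiny sketch illustrates the point.

from itertools import groupby
from operator import itemgetter

# Rows as they arrive on one reducer: already clustered by vtype.
rows = [('5', 'bike', '1426.0'), ('6', 'bike', '32.0'),
        ('1', 'car', '1000.0'), ('2', 'car', '42.0')]

for vtype, group in groupby(rows, itemgetter(1)):
    print(vtype, len(list(group)))
# bike 2
# car 2

# Without the clustering, e.g. bike, car, bike, groupby would emit the key 'bike' twice,
# which is why the subselect with CLUSTER BY is essential.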
Finally
Since our little task is now accomplished, it should also be noted that there are some more Python libraries one should know about when working with Hive. To actually execute the HiveQL query we have written from within Python, there is impyla by Cloudera, which supports Python 3 in contrast to PyHive by Dropbox. For working with HDFS the best library around is hdfs3. That would for instance allow us to push changes in udaf.py automatically with a Python script.
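As a rough sketch of how that could look; the hostnames, ports and authentication settings below are placeholders that depend on your cluster setup.

from hdfs3 import HDFileSystem
from impala.dbapi import connect

# Push a new version of the script to HDFS (namenode host and port are placeholders).
hdfs = HDFileSystem(host='namenode.example.com', port=8020)
hdfs.put('udaf.py', '/tmp/udaf.py')

# Run a HiveQL statement through HiveServer2 via impyla (connection details are placeholders).
conn = connect(host='hiveserver2.example.com', port=10000, auth_mechanism='PLAIN')
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM tmp.foo')
print(cursor.fetchall())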
Have fun hacking Hive with the power of Python!