Introduction

Especially in data science projects, Python code is often peppered with functions generating SQL queries as strings like:

def get_sku_ts(entity_id: str, sku: Union[str, list]) -> str:
    """
    Compose query to retrieve timeseries for SKU or list of SKUs
    """
    query = f"""
    SELECT *
    FROM `sku-table`
    WHERE entity_id = \"{entity_id}\"
    """
    if isinstance(sku, str):
        query += f" AND sku = \"{sku}\" "
    if isinstance(sku, list):
        list_str = ", ".join([f"\"{s}\"" for s in sku])
        query += f" AND sku IN ({list_str}) "
    query += f"""
    ORDER BY date ASC
    """
    return query

And this is even only a simple example. Using dynamic joins and sub-queries, things can get even more complicated. When the Python on-board resources are no longer sufficient, some developers start using templating engines like Jinja to handle the complexity of generating a query string dynamically.

But is string generation really the right way to solve a problem like that? It’s definitely easy as most developers in the field of data science know their SQL and dealing with strings, so it’s a start with low entry hurdle. Let’s shed some light on the downsides of this approach. The first thing to note is that we have a language break as we use SQL and Python side-by-side in the same program. This causes us some trouble as we never know if the queries we generate are even syntactically valid. Only at execution time we will see if the SQL parsers swallows what we generated which makes unit testing such functions quite hard. Without an SQL parser, we can only test in the function above if different kinds of parameters lead to some string, which is trivial. Another impact of the language break is that structuring and decoupling the code becomes harder. Imagine you want to write one method that takes another query and adds time-based slicing by specifying two dates. If the query is a string this will be pretty hard as you need to add some DATE(timestamp) BETWEEN FROM_DATE AND TO_DATE-clause and there might be already some other where-clauses specified. A last downside is that specifying SQL queries like this is quite prone to SQL injection attacks. So if you compose your queries based on user-generated data, e.g. user input from some e-commerce website, a malicious user might inject sub-queries in a smart way to steal your data. In many data science related projects this might be a rather academic vector of attack, but especially in learning-to-rank use-cases it might not be that unrealistic.

To summarize these downsides:

  1. language gap between Python and SQL, i.e. no IDE-support to find errors before execution,
  2. hard to write clean, decoupled, well-structured code and meaningful unit tests,
  3. the possibility of SQL injection attacks.

Programmatic SQL

But SQL is the right tool for this task! So what’s the solution then?”, I hear you mutter. Welcome to programmatic SQL! And it might not even be new to you if you have ever used Apache Spark. In Spark you have many frontends besides the SQL API, like PySpark for Python and others for Java, Scala, R, etc. Most data scientists naturally use a programmatic approach in their programs avoiding the downsides of SQL string generation without even noticing.

But what if we are dealing not with Spark but another database or warehouse like BigQuery? In this case we can use SQLAlchemy! SQLAlchemy comes with two abstraction layers, a Core layer and a high-level ORM layer. ORM stands for Object-Relational Mapping and is a common technique to define objects that are automatically mapped to a relational database. For most data science projects it’s not so useful as typical use-cases are analytical and are thus not focused on single instances/objects. The underrated Core layer of SQLAlchemy, on the other hand, is really useful as it provides us with a way of generating queries programmatically similar to PySpark. SQLAlchemy is independent of the actual SQL dialect and by installing a corresponding dialect it can deal with all popular databases and data warehouses, for instance MySQL, PostgreSQL, BigQuery, etc. So how does this work? Let’s go through this with a BigQuery example as it is often used in data science projects.

SQLAlchemy with BigQuery

Let’s say we want to find out how many monthly downloads some project on PyPI has. If you want to follow along and try it out yourself, a Jupyter notebook is available on my Github repository.

With plain SQL, we would solve this task the following way:

from google.cloud import bigquery

bqclient = bigquery.Client()

query_string = """
SELECT 
  DATE_TRUNC(DATE(timestamp), MONTH) AS `month`,
  COUNT(*) AS num_downloads,
FROM `bigquery-public-data.pypi.file_downloads`
WHERE file.project = 'pyscaffold'
    AND details.installer.name = 'pip'
    AND DATE(timestamp) BETWEEN DATE('2021-01-01') AND CURRENT_DATE()
GROUP BY `month`
ORDER BY `month`
"""

df = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(create_bqstorage_client=True)
)

Quite easy, but note that to keep this example focused, the query string is static, meaning that in order to dynamically change the time slicing, the project name etc., we would have to introduce string substitutions at least. This would result in the aforementioned downsides.

In contrast to that, after having installed SQLAlchemy and the BigQuery dialect pybigquery, we can define a dynamic query object instead of query_string:

from sqlalchemy import *
from sqlalchemy.sql import func as F
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *

engine = create_engine('bigquery://my_gcp_project')

table = Table('bigquery-public-data.pypi.file_downloads', MetaData(bind=engine), autoload=True)
query = (
    select([F.date_trunc(F.date(column("timestamp")), text("month")).label("month"),
            F.count("*").label("num_downloads")],
           from_obj=table)
    .where(
        and_(
            F.date(column("timestamp")).between("2021-01-01", F.current_date()),
            column("file.project") == "pyscaffold",
            column("details.installer.name") == "pip"
        )
    )
    .group_by("month")
    .order_by("month")
)

At first sight, it looks somewhat more verbose and unfamiliar, but you will get used to the SQLAlchemy API, even if it doesn’t look as concise as the one of PySpark, I have to admit. More or less, it’s just SQL but embedded nicely as functions and objects in Python.

So far, we have only generated the query and SQLAlchemy would have directly informed us if this was invalid. To see how the prepared SQL statement looks like, maybe for debugging, we can just print(query) to get:

SELECT date_trunc(date(`timestamp`), month) AS `month`, count(%(count_1:STRING)s) AS `num_downloads` 
FROM `bigquery-public-data.pypi.file_downloads` 
WHERE date(`timestamp`) BETWEEN %(date_1:STRING)s AND CURRENT_DATE 
  AND `file`.`project` = %(file.project_1:STRING)s 
  AND `details`.`installer`.`name` = %(details.installer.name_1:STRING)s 
GROUP BY `month` 
ORDER BY `month`

To see the actual query with all literals bound to their final values, we need to compile the query first before printing:

print(query.compile(engine, compile_kwargs={"literal_binds": True}))

to get:

SELECT date_trunc(date(`timestamp`), month) AS `month`, count('*') AS `num_downloads` 
FROM `bigquery-public-data.pypi.file_downloads` 
WHERE date(`timestamp`) BETWEEN '2021-01-01' AND CURRENT_DATE 
  AND `file`.`project` = 'pyscaffold' 
  AND `details`.`installer`.`name` = 'pip' 
GROUP BY `month` 
ORDER BY `month`

But this is only interesting for debugging and also for learning SQLAlchemy. In order to execute the query object and transform its result set into a dataframe, all we have to do is:

df = pd.read_sql(query, engine)

Quite easy, right? Pandas understands and works nicely with SQLAlchemy query objects and engines.

Conclusion

We have seen how SQL string generation in Python programs can be replaced by a programmatic approach and that this approach has several advantages over the usage of strings. But it also comes with some small downsides. First of all, one has to learn SQLAlchemy, and its API as well as documentation is surely not as nice as PySpark. So when switching, you will become slower for a while until you get the hang of SQLAlchemy. The second downside is that another level of abstraction, naturally comes with less control over the final SQL statement, but this is also true for PySpark and a trade-off every developer always needs to consider. Having mentioned Spark again, my 5 cents are that the PySpark API is much better than SQLAlchemy and if you are using Spark anyway, you should rather use Google’s Spark Bigquery Connector. For rather small-data projects, where your Python program just uses BigQuery as data storage, using SQLAlchemy might up your Python code quite a bit though.


REMARK: Spark BigQuery Connector and Pushdowns to BigQuery

Right now the Spark Bigquery Connector has no full pushdown capabilities regarding BigQuery. If pushAllFilters is set to true in spark.conf, which is the default, the connector pushes all the filters Spark can delegate to the BigQuery Storage API. Since GroupBy is a more complex operation, the following query:

spark.read.format("bigquery").load("bigquery-public-data.pypi.file_downloads").select(...).groupby(...)

will first fetch all data from the table into Spark. As this might be quite inefficient it’s better to directly provide an SQL query that filters and groups direclty in BigQuery like:

spark.read.format("bigquery").load(sql_string)

In this case, the programmatic SQLAlchemy approach as described above, can also be applied to generate sql_string with the advantages described above.


Feedback is always welcome! Please let me know if this was helpful and also if you think otherwise in the comments below.


Comments

comments powered by Disqus