Data validation with Airflow

Data validation with Airflow can be a simple way to run data quality checks without adding any new infrastructure. Here I will briefly cover doing data checks, or data quality tests, when importing user-input data (for example when integrating other data management systems such as CRMs, or when taking mapping tables as input).

Data validation methods

1. Data validation

Data validation refers to verifying that your assumptions about the data actually hold: that a cost is a number and not a string, that a date is a valid date, that entries are unique, that required values are not left empty.

Ideally, you not only validate the data itself (that it fits the expected conditions, formats, data types, primary keys, etc.) but also validate its referential integrity.

2. Referential validation

Referential validation means checking that the references to or from the data you imported are valid.

If you import entities (orders, products, users) from a data management system (CRM, CMS), make sure that those entities exist in your database. For example, make sure that the companies you import from Salesforce or another CRM also exist in your company dimension table.

3. Anomaly detection

This is about detecting potential data issues or unexpected events reflected in the data. It often takes the form of a monitoring system for data quality that alerts on unexpected changes.

Data validation with Airflow

Anomaly detection is usually done on streaming data and does not make sense to run from Airflow, which is a workflow engine meant for batch jobs. For data quality checks, however, Airflow can work perfectly.

For data validation, loading your data into a table with constraints (unique, not null, etc.) will produce a fairly meaningful error message on failure. Simply add the data producer to the on-failure emailing, and they will get the message whenever a load fails.
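
As a rough sketch of what this can look like (the table, connection id and producer address here are made-up placeholders, and I am assuming the Airflow Postgres provider; adapt to your own setup):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

# The data producer is on the failure email list, so a constraint violation
# (duplicate key, null in a not-null column) reaches the person who can fix it.
default_args = {
    "email": ["data.producer@example.com"],  # hypothetical address
    "email_on_failure": True,
}

with DAG(
    dag_id="load_crm_mapping",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
) as dag:
    # The constraints on staging.company_mapping (unique, not null) do the
    # validation: a bad row makes the INSERT fail with a readable error.
    load_mapping = PostgresOperator(
        task_id="load_mapping_table",
        postgres_conn_id="dwh",  # hypothetical connection id
        sql="""
            INSERT INTO staging.company_mapping (company_id, company_name)
            SELECT company_id, company_name FROM raw.company_mapping_input;
        """,
    )
```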

For referential validation, an error from the foreign keys would likely not give the data owner enough information to fix the issue. As such, I prefer to use a query that selects the offending records and send the output to the data producer. I wrap the SQL in a Python method that asserts the output is empty, and create a task in the relevant Airflow DAG. If any output exists, Airflow will send the failure message to the data owner with the offending keys.
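
A minimal sketch of that check, continuing the made-up example above (the check query, schema names and connection id are all placeholders):

```python
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hypothetical check: companies imported from the CRM must exist in dim_company.
ORPHAN_COMPANIES_SQL = """
    SELECT c.company_id
    FROM staging.crm_companies c
    LEFT JOIN dwh.dim_company d ON d.company_id = c.company_id
    WHERE d.company_id IS NULL;
"""

def assert_no_orphans():
    """Fail the task if any offending keys are found."""
    hook = PostgresHook(postgres_conn_id="dwh")  # hypothetical connection id
    offending = hook.get_records(ORPHAN_COMPANIES_SQL)
    # The assertion message, including the offending keys, ends up in the
    # task failure that the data owner is emailed about.
    assert not offending, f"Companies missing from dim_company: {offending}"

check_company_refs = PythonOperator(
    task_id="check_company_refs",
    python_callable=assert_no_orphans,
    dag=dag,  # the DAG whose failure emails go to the data owner
)
```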

How do you do your data checks? Bonus points if your solution makes use of existing components without overhead.

Easy ETL, or “friends don’t let friends write loading code”

Easy ETL is what every tool promises, but most do not deliver. Over the last few years there has been a strong shift in BI towards the use of Python. If previously any scripting language would do, nowadays it’s almost impossible to manage without Python specifically.

Take the ever more common open-source tech stack of:
SQL for storage and analysis
Python as glue language
Airflow (Python) as workflow engine
Airflow (Python) + custom utils (any scripting language) as framework.

You can reduce the complexity by deciding to stick with Python as far as possible for the utils. It makes a lot of sense, as you can import the utils directly in the job (DAG) definition file and call and parametrize your methods with ease.

In the BI data warehousing projects I have worked on over the last few years, the most common mistake I see is underestimating the cost of SQL maintenance, and the most common difficulty is finding enough people to maintain and extend the large codebases.

The cost of maintaining SQL throughout its lifecycle is usually estimated at 3-4x the time it takes to create it in the first place. The truth is, for the most vital components of your transformation, the cost of maintenance is closer to 10x the cost of the original development. You will touch this code again and again with every source data change, addition and so on. Add a new column? Easy! Just modify your data loading scripts, your DM creation scripts, your reporting scripts, your data dictionary, and the frontend to include it. Change a column? Not so easy anymore.

So we are already familiar with the hard way: Write code to do the transformations. When the schema changes, add or modify the code.

So what is the easy way? Write code to write the code that does the transformations. Many of the transformation steps are repetitive, so if you standardize things, you can easily re-use the same few parametrized operations. It’s called ‘don’t repeat yourself’, or DRY, and it’s a basic principle for producing decent, maintainable code. You know, what you SHOULD be doing instead of copy-pasting all that SQL. But while many of us know our code is ‘WET’, it is sometimes hard to abstract and standardize. So how do you do it?

Standardize.

When it comes to loading data into a table, incrementally or not, there are only a few ways to do this. Ideally, stick with the basics and re-use the code. If most of your operations are of one type, and a new one might run faster if done differently, consider that writing new loading logic and maintaining it 10 times over is likely more expensive than the drawbacks of your existing solution.

Abstract.

Split your jobs into subroutines. You will quickly see that there are many common parts that can be abstracted out and written as reusable methods in shared utils. You can then construct each job from those blocks, leaving only the job-specific code to manage in any particular job.

Take the following examples:

A job that pulls some data from an API:
1. Download from API to CSV
2. Load CSV into database
3. Merge increment

A job that loads data from an RDBMS to the DWH:
1. Download from DB
2. Load CSV into database
3. Merge increment

A job that creates some reports:
1. Run some SQL
2. Run some code
3. Send report

A job that monitors your data:
1. Run some SQL
2. Check if normal
3. Send report if abnormal

A job that delivers some segments into your CRM tool:
1. Run some SQL
2. Write to file
3. Push to API

Notice how many of the steps are shared with other jobs? What if we abstracted out as much of the common stuff as we could? We would only need to write the components that we do not already have.

A job that pulls some data from an API:
1. Download from API to CSV

A job that loads data from an RDBMS to the DWH:
1. Make a list of tables to copy

A job that creates some reports:
1. Write report SQL

A job that monitors your data:
1. Write some SQL
2. Check if normal

A job that delivers some segments into your CRM tool:
1. Run some SQL
2. Push to API

Notice that we now only have 7 of the original 15 pieces of code to deal with. Reducing complexity in this manner not only cuts development time, but also makes the jobs easier to maintain in the future. Want to stop loading to Postgres and load to Redshift instead? Just change the loading method once. Or want to stop worrying about staging tables? Create them on the fly based on the target table and drop them when you are done.
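
To make the construction concrete, here is a rough skeleton of the first job assembled from shared utils. Everything imported from utils here (download_api_to_csv, copy_csv_to_table, merge_increment) is a hypothetical stand-in for your own shared methods, not an existing library:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical shared utils; the API download is the only job-specific part.
from utils.extract import download_api_to_csv
from utils.loading import copy_csv_to_table, merge_increment

with DAG(
    dag_id="orders_from_api",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    download = PythonOperator(
        task_id="download",
        python_callable=download_api_to_csv,
        op_kwargs={"endpoint": "orders", "path": "/tmp/orders.csv"},
    )
    load = PythonOperator(
        task_id="load",
        # Swapping Postgres for Redshift means changing this one shared method.
        python_callable=copy_csv_to_table,
        op_kwargs={"file": "/tmp/orders.csv", "table": "staging.orders"},
    )
    merge = PythonOperator(
        task_id="merge",
        python_callable=merge_increment,
        op_kwargs={"source": "staging.orders", "target": "dwh.orders",
                   "keys": ["order_id"]},
    )

    download >> load >> merge
```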

Pareto principle, KISS, lean, or whatever you prefer.

You will do 80% of the work with 20% of the code. Do not try to solve every potential problem at once. Start with something that solves the majority of your worries.

You do not need to have the best code running from the start. Solve the problems as well as you can without adding complexity. While your shared methods might not always be the best fit for each case, they are better than maintaining five times the code.

A few examples:

For loading data, if you are using Postgres, MySQL or another RDBMS, the fastest way to load is through the database's bulk copy command. Write a Python method with the file and table as parameters. Now you can call this method as method(file, table, args=defaults) every time you need to load some data, without writing the copy statement each time. For engines like Redshift, your method will additionally need to copy the file to S3 first and parametrize the COPY with S3 credentials.
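
A minimal sketch of such a method for Postgres, using psycopg2 (the connection defaults and CSV options are assumptions to adapt to your setup):

```python
import psycopg2

def copy_csv_to_table(file, table, conn_params=None, delimiter=","):
    """Bulk-load a CSV file into a table via Postgres' COPY command."""
    conn_params = conn_params or {"dbname": "dwh"}  # hypothetical defaults
    copy_sql = (
        f"COPY {table} FROM STDIN "
        f"WITH (FORMAT csv, HEADER true, DELIMITER '{delimiter}')"
    )
    with psycopg2.connect(**conn_params) as conn:
        with conn.cursor() as cur, open(file) as f:
            # copy_expert streams the file through the COPY protocol,
            # which is far faster than row-by-row inserts.
            cur.copy_expert(copy_sql, f)
        conn.commit()
```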

For incremental loading, write methods for your common operations. If you want to do an increment merge (Redshift manual link), it is easy to automate as merge_increment(source, target, keys=[]).
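
A sketch of what that method could look like, following the usual delete-then-insert merge pattern (run_sql is a hypothetical helper that executes SQL against the warehouse):

```python
from utils.db import run_sql  # hypothetical helper

def merge_increment(source, target, keys):
    """Merge an increment (staging) table into its target table.

    Rows in the target that match the increment on the merge keys are deleted,
    then the full increment is inserted -- the classic upsert-by-staging pattern.
    """
    join_condition = " AND ".join(f"{target}.{key} = {source}.{key}" for key in keys)
    run_sql(f"""
        BEGIN;
        DELETE FROM {target} USING {source} WHERE {join_condition};
        INSERT INTO {target} SELECT * FROM {source};
        COMMIT;
    """)

# e.g. merge_increment("staging.orders", "dwh.orders", keys=["order_id"])
```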

For indexing: all your join keys and heavily used columns have pretty standard naming: id, sk, date, from, to, ts, and so on. Why change indices by hand every time you make a change when you can use standardized naming to your advantage? Using the system tables, you can identify the table name, column and data type for the index based on your naming convention, and create the indexes in an automated manner.
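
As an illustration, a sketch against Postgres' information_schema (the naming convention, schema name and the run_sql/get_records helpers are assumptions):

```python
from utils.db import get_records, run_sql  # hypothetical helpers

# Hypothetical convention: columns ending in _id or _date are join/filter keys.
INDEX_CANDIDATES_SQL = """
    SELECT table_schema, table_name, column_name
    FROM information_schema.columns
    WHERE table_schema = 'dwh'
      AND (column_name LIKE '%_id' OR column_name LIKE '%_date');
"""

def index_by_convention():
    """Create any missing indexes implied by the naming convention."""
    for schema, table, column in get_records(INDEX_CANDIDATES_SQL):
        # IF NOT EXISTS keeps the routine idempotent across runs (Postgres 9.5+).
        run_sql(
            f"CREATE INDEX IF NOT EXISTS ix_{table}_{column} "
            f"ON {schema}.{table} ({column});"
        )
```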

I hope this helps some of you in your data delivery journey. How do you keep your complexity low?