Skip to main content

Datakvalitet og tester

Tester og verifiseringer er nyttige for å sikre at kvaliteten på data man jobber med er tilstrekkelig. På Saga skiller vi mellom to typer tester:

  • Data assertions. Dette er tester som kjøres på en ferdig tabell i BQ. Bruker reell data. Kan kjøres på datadumper og data utredet av pipelines
  • Logiske tester på kode. Brukes for å verifisere logikken og transformasjonene gir resultatet som er forventet. Bruker testdata. Brukes i pipelines, og er nyttige å ha for å sikre at kodeendringene ikke minimerer datakvalitet.

I noen tilfeller ønsker vi å bruke begge type tester for å sikre datakvalitet.

Verifisere datakvalitet på tabeller

Data assertions er nyttige for å verifisere kvaliteten på rådata vi får inn, eller for å for å forsøke å fange opp caser man ikke har tatt hensyn til i våre pipelines. Eksempler kan være:

  • duplikatsjekk på datasettet
  • null verdier i en kolonne overskrider ikke en terskelverdi
  • differanse på antall rader mellom gårsdagens og dagens import er ikke for stor
  • domenespesifikke sjekker, eks
    • total km riksvei overskrider ikke forventet baseline-verdi
    • det er X antall typer i en gitt kategori
    • varsel hvis X antall sidevisninger avviker fra normalen

Assertions settes opp som en pipeline og kjøres som en vanlig DAG i Airflow.

Sette opp en assertion DAG

Hver test bør kun gjøre en enkelt verifisering. Nærliggende tester bør være seperate tasks i samme DAG, og navngis slik at det er enkelt å få oversikt over hva som verifiseres.

For å sette opp en test trenger du:

  1. En DAG fil for å kjøre tasks
  2. En SQL fil per spørring, med jinja-syntaks. Spørringen må returnere en verdi (eksakt en rad og en kolonne). En slik spørring inneholder aggregatfunksjoner som COUNT(), SUM(), MAX() etc

Yggdrasil har utarbeidet hjelpefunksjoner for å gjøre slike tester. Funksjonene kan importeres fra data_assertions. Funksjonene tar inn en SQL-query fra en ekstern fil og et forventet resultat.

Eksempel på en SQL-spørring lagret i filen 01_assert_point_location_is_not_null.sql:

SELECT COUNT(recordId) FROM `{{dataset}}`.`{{table}}`
WHERE EXISTS (SELECT 1 FROM UNNEST(locations) as locations
WHERE locations.locationType = 'PointLocation'
AND locations.geography IS NULL)

Eksempel på en data assertion under. Testen teller antall forekomster i tabellen med betingelsene i SQL'en, og forventer å finne 0 rader:

from pipeline import SagaContext, make_pipeline
from airflow.decorators import task
from data_assertions import assert_query_value
from includes import include_file
from pathlib import Path

location = "EU"
dataset = "internal"
table = "situations_v2"

"""
This pipeline contains test/assertions on the BQ tables in the Veglogg project
The tasks fails if we do not get the expected result
"""
dag_folder_path = f"{Path(__file__).parent}"


def pipeline(context: SagaContext):
@task
def assert_point_location_is_not_null():
get_query = include_file(
f"{dag_folder_path}/assertion_sql/01_assert_point_location_is_not_null.sql"
)

expected_result = 0
assert_query_value(
context, get_query, {"dataset": dataset, "table": table}, expected_result
)

assert_point_location_is_not_null()


# Nightly at 2
make_pipeline(pipeline, schedule="0 2 * * *")

Et mer komplekst eksempel under på en task. Kodesnutten viser hvordan man kan bruke resultatene fra en SQL-spørring som måling mot en data assert, sette inn mer informasjon i feilmeldingen, samt sette en akseptabelt feilmargin:

def check_number_of_rows_latest_table_is_close_to_avg_past_days(
table_prefix: str, context: SagaContext
):
get_expected_result_query = include_file(
f"{dag_folder_path}/assertion_sql/01_average_row_count_last_6_days.sql"
)
get_assert_query = include_file(
f"{dag_folder_path}/assertion_sql/02_row_count_latest_table.sql"
)
render_expected_result_query = Template(get_expected_result_query).render(
{"dataset": dataset, "table_prefix": table_prefix}
)

expected_result = get_value_from_aggregate_query(
context, get_expected_result_query_render
)

avg_failed_message = """
To check which tables of the vegobject group that differ from the average run
SELECT row_count, size_bytes
FROM <dataset>.__TABLES__
WHERE table_id LIKE '<table_prefix>%'
in BQ"""

assert_query_value(
context=context,
query=get_assert_query,
query_params={"dataset": dataset, "table_prefix": table_prefix},
expected_result=expected_result,
tolerance_percent=0.1,
failed_message=avg_failed_message,
)

Dersom en test feiler, vil tasken vises som feilet i Airflow og teamet vil få varsling på sin Slack kanal.

Teste logikk i pipeline

Denne type tester er nyttig for å verifisere at logikken i en datatransformasjon fungerer som tiltenkt. Testene kjøres på statiske testdata fremfor reell data. Siden testene kjøres på samme datagrunnlag kan vi sikre at vi ikke får uønskede sideeffekter når vi gjør endringer i koden.

Har man en pipeline med mange transformasjoner og operasjoner, bør man dele opp testene til å teste minst mulig (enhetstester). Ved eventuelle feil vil man få varsel på de delene av pipelinen som er endret, og det er enklere og raskere å finne ut hvilke operasjoner som medfører feil.

Sette opp enhetstester

[Dokumentasjonen i denne seksjonen er under utarbeidelse]