On-Board Datafeed
These are the outline steps to on-board a new datafeed:
- Load Landing Data
- Create Yetl Project
- Create Table Metadata
- Create Pipeline Metadata
- Create Spark Schema
- Develop & Test Pipeline
- Deploy Product
Load Landing Data
This project is about loading data from cloud blob storage into Databricks delta lake tables. Data would normally be orchestrated into landing using some other tool, for example Azure Data Factory, AWS Data Pipeline, or another service that specialises in securely copying datasets wholesale into the cloud or between cloud services.
You may already have this orchestration in place, in which case the data will already be in your landing blob location; otherwise you can mock it by taking a sample of data and manually copying it into the location where it will be landed.
For test-driven development you can include a small, vanilla, hand-crafted dataset in the project itself that can be copied into place automatically from the workspace files, as a way of creating repeatable end-to-end integration tests that can be staged, executed and torn down with simple commands.
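For example, a minimal sketch of staging such a sample dataset from the workspace files into the landing location from a Databricks notebook might look like the following (the paths below are illustrative placeholders, not part of yetl):
# copy a small hand-crafted sample dataset committed with the project
# from the workspace files into the landing location used by the pipeline
# (both paths are placeholders - adjust them to your environment)
sample_data = "file:/Workspace/Repos/me/my_project/test/data/customer_details_1"
landing_path = "/mnt/landing/customer_details_1/2023/01/01"

dbutils.fs.cp(sample_data, landing_path, recurse=True)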
Create Yetl Project
Create a directory, set up a Python virtual environment, and install yetl:
mkdir my_project
cd my_project
python -m venv venv
source venv/bin/activate
pip install yetl-framework
Create yetl project scaffolding.
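The exact scaffolding created is described in the yetl documentation; based on the paths used later in this guide, you should end up with at least the following layout (shown here purely as an illustration):
my_project/
    pipelines/
        tables.yaml        # table metadata imported from the spreadsheet
        my_pipeline.yaml   # pipeline metadata created in the next steps
    schema/                # spark schemas saved here by the Create Spark Schema step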
Create Table Metadata
Fill out the spreadsheet template with the landing and delta lake architecture that you want to load. The Excel file has to be in a specific format, otherwise the import will not work. Use this example as a template.
NOTE:
- Lists are entered using a carriage return between items in an Excel cell.
- Dicts are entered using a : between key-value pairs and carriage returns between items in an Excel cell.
column | required | type | description |
---|---|---|---|
stage | y | [audit_control, landing, raw, base, curated] | The architectural layer of the data lakehouse that you want the database in |
table_type | y | [read, delta_lake] | What type of table to create; read is a spark.read, delta_lake is a delta table. |
catalog | n | str | Name of the catalog. Although you can set it here in the config, the API also allows passing it as a parameter. |
database | y | str | name of the database |
table | y | str | name of the table |
sql | n | [y, n] | whether or not to include a default link to a SQL ddl file for creating the table. |
id | n | str, List[str] | a column name or list of column names that is the primary key of the table. |
depends_on | n | List[str] | list of other tables that the table is loaded from, thus creating a mapping. It requires the yetl index, which is stage.database.table; you can also use stage.database.*, for example, if you want to reference all the tables in a database. |
deltalake.delta_properties | n | Dict[str,str] | key value pairs of databricks delta properties |
deltalake.identity | n | [y, n] | whether or not to include an identity column on the table when a delta table is created implicitly |
deltalake.partition_by | n | str, List[str] | column or list of columns to partition the table by |
deltalake.delta_constraints | n | Dict[str,str] | key value pairs of delta table constraints |
deltalake.z_order_by | n | str, List[str] | column or list of columns to z-order the table by |
deltalake.vacuum | n | int | vacuum threshold in days for a delta table |
warning_thresholds.invalid_ratio | n | float | threshold for the ratio of invalid to valid rows that can be used to raise a warning |
warning_thresholds.invalid_rows | n | int | threshold for the number of invalid rows that can be used to raise a warning |
warning_thresholds.max_rows | n | int | threshold for the maximum number of rows that can be used to raise a warning |
warning_thresholds.mins_rows | n | int | threshold for the minimum number of rows that can be used to raise a warning |
error_thresholds.invalid_ratio | n | float | threshold for the ratio of invalid to valid rows that can be used to raise an exception |
error_thresholds.invalid_rows | n | int | threshold for the number of invalid rows that can be used to raise an exception |
error_thresholds.max_rows | n | int | threshold for the maximum number of rows that can be used to raise an exception |
error_thresholds.mins_rows | n | int | threshold for the minimum number of rows that can be used to raise an exception |
custom_properties.process_group | n | any | custom properties can be whatever you want; yetl is smart enough to build them into the API |
custom_properties.retention_days | n | any | |
custom_properties.anything_you_want | n | any | |
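As an illustration only (the database, table and property values below are made up), a single spreadsheet row for a raw delta table might be filled out like this, using multi-line cells for the list and dict fields as described in the note above:
stage:                        raw
table_type:                   delta_lake
database:                     sales
table:                        orders
id:                           order_id
depends_on:                   landing.sales.orders        (one item per line within the cell)
deltalake.delta_properties:   delta.appendOnly: true      (one key: value pair per line within the cell)
deltalake.partition_by:       order_date
deltalake.vacuum:             30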
Create the tables.yaml file by executing:
Create Pipeline Metadata
In the ./my_project/pipelines folder create a yaml file that contains the metadata specifying how to load the tables defined in ./my_project/pipelines/tables.yaml. You can call it whatever you want and you can create more than one, perhaps one that batch loads and another that loads from an event stream. The yetl API allows you to parameterise which pipeline metadata you want to use. For the purpose of these docs we will refer to this pipeline as my_pipeline.yaml.
The pipeline file my_pipeline.yaml has a relative file reference to tables.yaml, and therefore yetl knows which files to use to stitch the table metadata together.
Please see the pipeline reference documentation for details. Here is an example.
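To illustrate how that parameterisation works, the pipeline is selected by name when constructing the yetl Config, as used in the schema notebook later in this guide; here we assume the name my_pipeline resolves to ./my_project/pipelines/my_pipeline.yaml:
from yetl import Config, Timeslice

# select which pipeline metadata file to use by name;
# "my_pipeline" refers to ./my_project/pipelines/my_pipeline.yaml
config = Config(
    project="my_project",
    pipeline="my_pipeline",
    timeslice=Timeslice(year=2023, month=1, day=1)
)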
Create Spark Schema
Once the yetl metadata is in place we can start using the API. The first task is to create the landing schemas needed to load the data. This can be done using a simple notebook on Databricks.
Using Databricks Repos you can clone your project into Databricks.
This must be in its own cell:
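The contents of that cell are most likely a notebook-scoped install of yetl; a minimal sketch, assuming the same package used in the project setup above:
%pip install yetl-framework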
Executing the following code will load the files and save the spark schemas into the `./my_project/schema` directory in yaml format, making them easy to review and adjust if you wish. There is no reason to move the files anywhere else once created; yetl uses this location as a schema repository. The files will be named after the tables, making it intuitive to understand what the schemas are and how they map.
The ad works example project shows this notebook approach working very well, creating the schemas over a relatively large number of tables.
from yetl import (
    Config, StageType, Read, DeltaLake, Timeslice
)
import yaml


def create_schema(
    source: Read,
    destination: DeltaLake
):
    # read a sample of the landing data with schema inference switched on
    options = source.options
    options["inferSchema"] = True
    options["enforceSchema"] = False

    df = (
        spark.read
        .format(source.format)
        .options(**options)
        .load(source.path)
    )

    # convert the inferred spark schema to yaml and save it into the
    # schema repository location that yetl holds for this source
    schema = yaml.safe_load(df.schema.json())
    schema = yaml.safe_dump(schema, indent=4)
    with open(source.spark_schema, "w", encoding="utf-8") as f:
        f.write(schema)


project = "my_project"
pipeline = "my_pipeline"

# Timeslice may be required depending on how you've configured your landing area.
# Here we just use a single period to define the schema.
# Timeslice(year="*", month="*", day="*") would use all the data you have,
# which could be very inefficient.
# This example uses the data in the landing partition of 2023-01-01;
# how that maps to files and directories is defined in my_pipeline.yaml.
config = Config(
    project=project,
    pipeline=pipeline,
    timeslice=Timeslice(year=2023, month=1, day=1)
)

tables = config.tables.lookup_table(
    stage=StageType.raw,
    first_match=False
)

for t in tables:
    table_mapping = config.get_table_mapping(
        t.stage, t.table, t.database, create_table=False
    )
    create_schema(table_mapping.source, table_mapping.destination)
This approach can also be used for creating tables in a pipeline step prior to any load pipeline by using the create_table parameter. It will either create explicitly defined tables using SQL DDL, if you've configured any, or just register empty delta tables with no schema. This may be required if you have multiple sources flowing into a single table (fan-in), to avoid transaction isolation errors when the tables are created the first time that the pipeline runs.
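A minimal sketch of such a table-creation step, reusing the table lookup from the notebook above but passing create_table=True (assuming the same Config and tables variables):
# create the raw tables up front, before any load pipeline runs;
# explicitly defined tables use their SQL DDL, otherwise empty
# delta tables are registered with no schema
for t in tables:
    config.get_table_mapping(
        t.stage, t.table, t.database, create_table=True
    )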
Develop & Test Pipeline
TODO
Deploy Product
TODO