====
DAGs
====
Workflows and pipelines that run a series of :ref:`topic_tasks` in a specific order can often be described as a :ref:`dags` (directed acyclic graph).

Because Gluepy is a framework that provides structure to your workflows, the :ref:`dags` is a first-class citizen and a core component of the framework.

Here is an example of a very simple Gluepy DAG:

.. code-block:: python

    from gluepy.exec import DAG
    from .tasks import (
        DataTask, ForecastTrainingTask, ForecastTestTask, OutputFormatTask
    )


    class TrainingDAG(DAG):
        label = "training"
        tasks = [
            DataTask, ForecastTrainingTask, ForecastTestTask, OutputFormatTask
        ]

This DAG can easily be executed using the :ref:`cli_dag`:

.. code-block:: bash

    python manage.py dag training

Orchestrating DAGs
==================
As described in the Overview, Gluepy is **not** an orchestrator, and it takes several steps to decouple your code
from the tools or architecture used to execute it. A Gluepy project should be able to run
on your local machine, Airflow, Dagster or other orchestrators.

This is achieved by keeping the Gluepy :ref:`dags` format agnostic to each orchestrator's own DAG format. You can
create CLI commands such as :ref:`cli_airflow_generate` to translate a Gluepy DAG into an Airflow DAG using Jinja templating.

The benefit is that your broader Data Science or Machine Learning team does not need to be familiar with the architecture
or tools used to run their pipelines in production. Instead, there can be a separation of concerns between the Data Scientists
and the Engineers on topics such as scheduling, orchestration and horizontal scaling.

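
The translation step described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation behind :ref:`cli_airflow_generate`: it uses plain string formatting instead of Jinja so that it runs without extra dependencies, ``TaskSpec`` and ``render_airflow_dag`` are hypothetical names, the ``echo`` command is a placeholder for however your project invokes a single task, and the generated text assumes an Airflow 2 style API (``BashOperator`` and the ``schedule`` argument).

.. code-block:: python

    from dataclasses import dataclass
    from typing import List


    @dataclass
    class TaskSpec:
        """Hypothetical stand-in for a Gluepy task; only the label is used."""

        label: str


    def render_airflow_dag(dag_label: str, tasks: List[TaskSpec]) -> str:
        """Return Airflow DAG source code that runs ``tasks`` sequentially."""
        lines = [
            "from datetime import datetime",
            "from airflow import DAG",
            "from airflow.operators.bash import BashOperator",
            "",
            f'with DAG(dag_id="{dag_label}", start_date=datetime(2024, 1, 1), '
            "schedule=None) as dag:",
        ]
        if not tasks:
            # Keep the generated ``with`` block syntactically valid.
            lines.append("    pass")
        for i, task in enumerate(tasks):
            # Placeholder command (assumption): replace with your own entrypoint.
            lines.append(
                f'    t{i} = BashOperator(task_id="{task.label}", '
                f'bash_command="echo run {task.label}")'
            )
        if len(tasks) > 1:
            # Chain the operators in the same order as the Gluepy task list.
            chain = " >> ".join(f"t{i}" for i in range(len(tasks)))
            lines.append(f"    {chain}")
        return "\n".join(lines) + "\n"


    source = render_airflow_dag(
        "training",
        [TaskSpec("data"), TaskSpec("forecast_training"), TaskSpec("output_format")],
    )
    print(source)

The point of the design is that the task list stays the single source of truth, and the orchestrator-specific file is a generated artifact that can be rebuilt whenever the DAG changes.
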
.. _topic_tasks:

Tasks
=====

A :ref:`tasks` is the class that holds the actual logic of a step in your :ref:`dags`. This is where you read data, write custom code,
transform your dataframes and train your machine learning models.

Because a :ref:`dags` and a :ref:`tasks` are distinct, a :ref:`tasks` can be reused across multiple :ref:`dags`. The task itself is not
aware of, or coupled to, any specific DAG.

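
The reuse of one task across several DAGs can be sketched as follows. To keep the example runnable without Gluepy installed, ``Task`` and ``DAG`` here are minimal stand-ins for the classes in ``gluepy.exec``, and ``run_dag`` is a hypothetical helper that only illustrates sequential execution. It is not how Gluepy itself runs a DAG.

.. code-block:: python

    class Task:
        """Stand-in for ``gluepy.exec.Task`` (assumption, for illustration)."""

        label = ""

        def run(self):
            raise NotImplementedError


    class DAG:
        """Stand-in for ``gluepy.exec.DAG``: a label and an ordered task list."""

        label = ""
        tasks = []


    class DataTask(Task):
        label = "data"

        def run(self):
            print("loading data")


    class TrainTask(Task):
        label = "train"

        def run(self):
            print("training model")


    class PredictTask(Task):
        label = "predict"

        def run(self):
            print("running inference")


    class TrainingDAG(DAG):
        label = "training"
        tasks = [DataTask, TrainTask]


    class InferenceDAG(DAG):
        label = "inference"
        # ``DataTask`` is reused unchanged; it knows nothing about either DAG.
        tasks = [DataTask, PredictTask]


    def run_dag(dag_cls):
        """Hypothetical runner: instantiate and run each task in order."""
        executed = []
        for task_cls in dag_cls.tasks:
            task_cls().run()
            executed.append(task_cls.label)
        return executed


    run_dag(TrainingDAG)
    run_dag(InferenceDAG)
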
Here is an example Task that we wrote in :doc:`/intro/tutorial02`:

.. code-block:: python

    import os
    import io
    import pickle
    import xgboost as xgb
    from gluepy.exec import Task
    from gluepy.conf import default_context
    from gluepy.files.data import data_manager
    from gluepy.files.storages import default_storage
    import pandas as pd


    class TrainingTask(Task):
        label = "training"

        def run(self):
            # Read the training dataset previously generated in
            # ``GenerateTrainingDataTask``. The path is automatically
            # formatted to read from the run_folder to ensure data versioning
            # and isolation of output between executions.
            df: pd.DataFrame = data_manager.read("training.csv")
            df["date"] = df["date"].astype("category")

            # Train our machine learning model.
            model = xgb.XGBRegressor(enable_categorical=True)
            model.fit(df[["date", "article_id"]], df["units"])

            # Store model to disk to later be used when we want
            # to do inference.
            stream = io.BytesIO()
            pickle.dump(model, stream)
            stream.seek(0)
            default_storage.touch(
                os.path.join(default_context.gluepy.run_folder, "model.pkl"), stream
            )

Passing objects between Tasks
-----------------------------
The ``run()`` method is the entrypoint to any :ref:`tasks`, and you may notice that ``run()`` does not accept any keyword arguments.
This is an intentional design choice to avoid data scientists building in-memory dependencies between tasks.
Such dependencies make it very challenging to run a subset of a :ref:`dags`, or to orchestrate your tasks in parallel across a cluster of machines
that do not share memory.

The preferred way to pass data between a series of :ref:`tasks` is to simply write to disk using the ``default_storage`` object as described
in :ref:`topic_storage`, and load it from disk in the next step.

This ensures that if a step fails, it can be retried without re-running the full :ref:`dags`.

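
The hand-off through disk can be sketched with the standard library alone. The example below is an illustration under assumptions, not Gluepy code: ``RUN_FOLDER`` stands in for the run folder that Gluepy manages, plain file reads and writes stand in for ``default_storage``, and the task classes and file names are hypothetical.

.. code-block:: python

    import json
    import tempfile
    from pathlib import Path

    # Stand-in for the per-execution run folder (assumption).
    RUN_FOLDER = Path(tempfile.mkdtemp())


    class PrepareTask:
        label = "prepare"

        def run(self):
            # ``run()`` takes no arguments; the output goes to disk.
            rows = [{"article_id": 1, "units": 3}, {"article_id": 2, "units": 5}]
            (RUN_FOLDER / "prepared.json").write_text(json.dumps(rows))


    class SummarizeTask:
        label = "summarize"

        def run(self):
            # The input is loaded from disk, not from the previous task's memory.
            rows = json.loads((RUN_FOLDER / "prepared.json").read_text())
            total = sum(row["units"] for row in rows)
            summary = {"total_units": total}
            (RUN_FOLDER / "summary.json").write_text(json.dumps(summary))


    PrepareTask().run()
    SummarizeTask().run()

    # Retrying only the second step works because its input is still on disk.
    SummarizeTask().run()
    print((RUN_FOLDER / "summary.json").read_text())

Because each task depends only on files in the run folder, the two steps could also run in separate processes or on separate machines that share the same storage.
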