Building a Custom Airflow DAG Scheduler in Pure Python | AI Systems Design From Scratch

Building a Custom Airflow DAG Scheduler in Pure Python

Amin Boulouma, Software Engineer

In data engineering, workflow orchestration is the backbone of reliable data movement. Tools like Apache Airflow manage complex networks of data pipelines by treating them as DAGs (Directed Acyclic Graphs). They ensure tasks run in the correct order, handle retries, and schedule executions precisely based on time configurations.

But underneath the heavy enterprise infrastructure, what does an orchestrator actually do? At its core, it is an infinite control loop built on three mechanisms:

  1. The Graph Structure: A set of task nodes and the dependency relationships between them.
  2. The State Registry: Tracking what needs to run and when.
  3. The Scheduler Heartbeat: A deterministic clock cycle that parses cron-like syntax and triggers task execution.

As part of our zero-dependency mandate, we are shifting away from heavy Celery workers and databases to build a functional data pipeline orchestrator using nothing but the Python standard library.


The First-Principles Orchestration Blueprint

Our minimalist engine breaks the orchestration pattern into three standalone components: AirflowDAG to hold our structural metadata, Task to isolate our execution callbacks, and a clock-driven Scheduler to drive the system forward.

Here is the complete core implementation:

import time

class AirflowDAG:
    def __init__(self, name, default_args=None):
        self.name = name 
        self.default_args = default_args or {}
        self.tasks = []
        self.schedule_interval = None 

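    def add_task(self, task, schedule=None, dependencies=None):
        """Registers a task with the DAG, applying an optional schedule and dependencies."""
        # Keep the task's own schedule unless it has none
        task.schedule = task.schedule or schedule
        # Only override dependencies when the caller actually supplies them
        if dependencies is not None:
            task.dependencies = dependencies
        self.tasks.append(task)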

class Task:
    def __init__(self, name, execute_func, schedule=None, dependencies=None):
        self.name = name
        self.execute_func = execute_func 
        self.schedule = schedule 
        self.dependencies = dependencies or []

    def execute(self):
        """Triggers the task payload function execution wrapper."""
        print(f"Executing Task: {self.name}")
        self.execute_func()
        return True 

class Scheduler:
    def __init__(self, dag):
        self.dag = dag
        self.current_time = 0
        self.task_executions = {}

    def start(self):
        """Ignites the core operational heartbeat engine loop."""
        print("Scheduler started...")
        while True:
            self._check_schedules()
            self._execute_tasks()
            self.current_time += 1 # Simulate deterministic logical time ticks

    def _check_schedules(self):
        """Scans the registered graph nodes to detect due workflows."""
        for task in self.dag.tasks:
            if self._is_due(task):
                self._schedule_task(task)

    def _execute_tasks(self):
        """Runs every task that is due on the current tick."""
        for task in self.dag.tasks:
            if self._is_due(task):
                task.execute()
                # Pause for the task's interval (in seconds) to throttle the loop;
                # note that this blocks the whole scheduler, not just this task
                time.sleep(int(task.schedule.split(' ')[0]))

    def _is_due(self, task):
        """Returns True when the current tick is a multiple of the task's interval."""
        if not task.schedule:
            return False
        # Treat the first space-separated field as an interval in clock ticks
        interval = int(task.schedule.split(' ')[0])
        # Guard against a zero or negative interval, which would break the modulo
        return interval > 0 and self.current_time % interval == 0

    def _schedule_task(self, task):
        """Caches the task payload internally into the state tracker registry."""
        self.task_executions[task.name] = task 
        print(f'Task {task.name} scheduled')


Architectural Deep Dive

1. Simulating Cron Interval Parsing

In standard enterprise architectures, scheduling intervals are parsed from full crontab specifications covering minutes, hours, days, months, and weekdays. Our custom core reduces this to a single step: it splits the schedule string on spaces and reads the first field as an integer:

int(task.schedule.split(' ')[0])

The engine reads the first field of a cron-style string (like "5 * * * *") and converts it to an integer (5). It then checks whether the task is due with a modulus against the advancing logical clock: self.current_time % interval == 0. Every time the clock reaches a multiple of the interval, the scheduler flags the task as due. Note that this is a simplification rather than real cron semantics: in standard cron, "5 * * * *" means minute 5 of every hour, and an every-five-minutes schedule is written "*/5 * * * *". Here the first field is simply an interval in clock ticks, and a non-numeric field such as "*" raises a ValueError.
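The interval check can be tried in isolation. The following is a minimal sketch, assuming the first field is always a plain positive integer; parse_interval and due_ticks are hypothetical helper names and are not part of the implementation above.

```python
def parse_interval(schedule):
    """Return the first space-separated field of a schedule string as an int.

    Assumption: the field is a plain positive integer, as in "5 * * * *".
    Fields such as "*" or "*/5" are not handled by this simplified model.
    """
    first_field = schedule.split(' ')[0]
    interval = int(first_field)  # raises ValueError for non-numeric fields
    if interval <= 0:
        raise ValueError(f"interval must be positive, got {interval}")
    return interval


def due_ticks(schedule, total_ticks):
    """List the logical ticks in [0, total_ticks) on which the task is due."""
    interval = parse_interval(schedule)
    return [tick for tick in range(total_ticks) if tick % interval == 0]


print(due_ticks("5 * * * *", 12))  # prints [0, 5, 10]
```

Tick 0 is always included, because 0 is a multiple of every interval; this is why a task fires as soon as the scheduler starts.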

2. The Heartbeat Engine Loop

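The Scheduler class relies on an infinite while loop driven by a synthetic logical clock (self.current_time += 1). On each clock tick, the orchestrator runs two phases: _check_schedules scans the DAG's tasks and registers each due task in task_executions, then _execute_tasks runs the due tasks and pauses for their interval via time.sleep.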
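The two phases of a tick, checking schedules and then executing due tasks, can be sketched with a bounded clock so the loop terminates and is easy to inspect. This is an illustrative variant, not the Scheduler class itself: run_heartbeat, its intervals mapping, and the max_ticks parameter are assumptions introduced for the example, and "execution" is reduced to recording the task name.

```python
def run_heartbeat(intervals, max_ticks):
    """Advance a logical clock and record which tasks fire on each tick.

    intervals: dict mapping a task name to its interval in ticks
               (a hypothetical input shape chosen for this sketch).
    max_ticks: number of ticks to simulate, so the loop terminates.
    """
    log = []
    for current_time in range(max_ticks):
        # Phase 1: check schedules and collect the tasks due on this tick
        due = [name for name, interval in intervals.items()
               if current_time % interval == 0]
        # Phase 2: execute the due tasks (here we only record them)
        for name in due:
            log.append((current_time, name))
    return log


for tick, name in run_heartbeat({"extract": 2, "load": 3}, max_ticks=6):
    print(f"tick {tick}: {name}")
```

Bounding the loop with a tick limit is a design choice for testing; a long-running scheduler would keep the infinite loop and rely on an external signal to stop.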


Verification & Execution

We can verify our local orchestration layer by establishing a mock task flow, loading it into our core engine, and instantiating the loop handler:

if __name__ == "__main__":
    # 1. Instantiate the logical DAG orchestrator wrapper
    dag = AirflowDAG("Sample_Orchestration_DAG")

    # 2. Construct a functional workload callback payload
    def my_task():
        print("Task executed!")

    # 3. Create a task defined to trigger every 5 clock cycles
    task = Task("My_First_Principles_Task", my_task, schedule="5 * * * *")
    dag.add_task(task)

    # 4. Bootstrap and run the custom processing heartbeat
    scheduler = Scheduler(dag)
    scheduler.start()

Expected Log Output

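When you run this script directly in your terminal, the task is due immediately, because the clock starts at tick 0 and 0 is a multiple of every interval. The last three lines below then repeat on every fifth tick until you interrupt the loop (for example with Ctrl+C). The first cycle prints: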

Scheduler started...
Task My_First_Principles_Task scheduled
Executing Task: My_First_Principles_Task
Task executed!


Future Framework Roadmap

While this pattern establishes the foundational clock mechanics of a data pipeline worker, true production DAG orchestration also requires managing task dependencies, so that a task runs only after the tasks it depends on have completed.
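One way dependency ordering could be approached while staying inside the standard library is graphlib.TopologicalSorter (available from Python 3.9). The sketch below is an assumption about a possible direction, not the project's planned design; the task names and the execution_order helper are hypothetical.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical pipeline: each key maps a task name to the tasks it depends on
dependencies = {
    "extract": [],
    "transform": ["extract"],
    "load": ["transform"],
    "report": ["load"],
}


def execution_order(graph):
    """Return task names ordered so every task follows its dependencies."""
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as err:
        # A cycle means the graph is not a DAG and cannot be scheduled
        raise ValueError(f"dependency cycle detected: {err.args[1]}") from err


print(execution_order(dependencies))
# prints ['extract', 'transform', 'load', 'report']
```

A scheduler built on this ordering would execute a due task only after every task listed before it in the order has finished.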

To expand this framework into a production-grade emulator, our project development registry targets the following architectural additions:
