Agentic ETL: Stop Writing Boring Spark Scripts and Build "Smart" Pipelines with LangGraph
Written by Joseph on January 19, 2026
Let’s be real for a second: Traditional ETL (Extract, Transform, Load) is a massive headache.
You know the drill. You spend days writing a perfect PySpark job. It runs great for a week. Then, at 3:00 AM on a Saturday, a data vendor decides to change a column name from user_id to customerId, or suddenly changes their date format from YYYY-MM-DD to DD/MM/YYYY.
Boom. Your pipeline crashes. Your dashboard breaks. You wake up to angry Slack messages.
In 2026, we don’t do that anymore.
We are entering the era of Agentic Engineering. By combining the raw, distributed computing muscle of Apache Spark with the reasoning brain of LangGraph, you can build pipelines that don’t just move data—they think about it.
At Tribe of Programmers, we believe the future belongs to engineers who build systems that can manage themselves. Here is how you can build your first “Self-Healing” ETL pipeline.
The Dynamic Duo: Why Spark + LangGraph?
If you are building a modern data platform, you need two things: scale and intelligence.
The Muscle (Apache Spark): When you need to process terabytes of data, you can’t use a simple Python script. Spark is the industry standard for distributed data processing. It handles the heavy lifting.
The Brain (LangGraph): This is where the magic happens. Unlike a standard script that runs from top to bottom, LangGraph lets you model your program as a state machine: a graph of nodes that read and update a shared state. That allows your program to have memory, loop back on itself, try different strategies, and make decisions based on output.
When you put them together, you get a pipeline that can say: “Hey, this Spark job failed because of a schema mismatch. I’ll just rewrite the transformation logic on the fly and try again.”
The Use Case: Handling “Schema Drift” on Autopilot
Let’s take a real-world scenario. You have a daily job that ingests CSV files from a partner.
Yesterday’s format: id, name, join_date (YYYY-MM-DD)
Today’s format: id, full_name, join_date (DD-Mon-YY)
A standard Airflow DAG would fail instantly. An Agentic Pipeline will see the error, analyze the new file header, rewrite the PySpark schema definition, and successfully load the data—all without waking you up.
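Before any LLM gets involved, the "analyze the new file header" step can be plain Python. Here is a minimal sketch of that check, using the column names from the scenario above. The function name detect_header_drift and the sample row are made up for illustration; they are not part of Spark or LangGraph.

```python
import csv
import io


def detect_header_drift(expected_columns, csv_text):
    """Compare the expected column names against the header row of csv_text."""
    header = next(csv.reader(io.StringIO(csv_text)))
    actual = [name.strip() for name in header]
    return {
        "missing": [c for c in expected_columns if c not in actual],
        "unexpected": [c for c in actual if c not in expected_columns],
    }


# Yesterday's contract versus today's file (the data row is invented sample data).
expected = ["id", "name", "join_date"]
todays_file = "id,full_name,join_date\n1,Ada Lovelace,15-Jan-26\n"

drift = detect_header_drift(expected, todays_file)
print(drift)  # {'missing': ['name'], 'unexpected': ['full_name']}
```

A report like this could be passed to the agent as context, so it starts from "name is gone, full_name is new" instead of a raw stack trace.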
Here is the step-by-step blueprint to build it.
The Build Guide: Your First Agentic ETL Pipeline
We’ll assume you have a standard AI dev environment set up (like WSL2 with Docker) and access to an LLM API (like OpenAI or Anthropic).
Step 1: Install the Essentials
We need the core libraries for our brain and muscle.
pip install langgraph pyspark langchain-openai pandas
Step 2: Define the “Shared Memory” (The State)
In LangGraph, the agents need a shared notebook to keep track of what’s happening. We call this the State. It will hold our goal, the code the agent writes, the results of running that code, and a log of any errors.
from typing import TypedDict, Optional, Any
from langchain_openai import ChatOpenAI
from pyspark.sql import SparkSession

# --- Configuration ---
# Initialize Spark (The Muscle)
spark = SparkSession.builder \
    .appName("AgenticETLDemo") \
    .getOrCreate()

# Initialize the LLM (The Brain)
# Note: Make sure OPENAI_API_KEY is in your environment variables
llm = ChatOpenAI(model="gpt-5.2-pro", temperature=0)

# --- The State Definition ---
class ETLState(TypedDict):
    """The shared memory for our agentic workflow."""
    goal: str                    # What are we trying to do? (e.g., "Load bad_dates.csv")
    schema_hint: str             # Information about the data structure
    pyspark_code: Optional[str]  # The code generated by the LLM
    execution_result: Any        # The output data or success message
    error_log: Optional[str]     # Track failures to learn from them
    attempts: int                # To prevent infinite loops
Step 3: Build the “Thinking” Node (The Generator)
This node is the architect. It looks at the goal and any previous errors, then prompts the LLM to write the correct PySpark code.
Notice how the prompt changes if there was a previous error. This is the key to self-healing.
def code_generator_node(state: ETLState):
    """Node 1: The Brain writes the code."""
    print(f"\n--- [Brain] Generating Code (Attempt {state['attempts'] + 1}) ---")

    # Base prompt
    prompt = f"""
    You are an expert PySpark data engineer.
    Your Goal: {state['goal']}
    Schema Hint: {state['schema_hint']}

    Write a complete, runnable block of PySpark code that accomplishes this goal.
    The code must result in a Spark DataFrame named 'final_df'.
    ONLY return the executable Python code. No markdown formatting.
    """

    # If we failed before, add the error to the prompt so the LLM can fix it.
    if state['error_log']:
        prompt += f"\n\nIMPORTANT: The previous code failed with this error:\n{state['error_log']}\nFix the code to handle this error."

    # Call the LLM
    response = llm.invoke(prompt)

    # Strip Markdown code fences if the model adds them despite the instructions.
    # Removing the bare word "python" would also corrupt code that contains it.
    fence = "`" * 3  # three backticks
    cleaned_code = response.content.replace(fence + "python", "").replace(fence, "").strip()

    return {
        "pyspark_code": cleaned_code,
        "attempts": state['attempts'] + 1
    }
Step 4: Build the “Doing” Node (The Executor)
This node takes the code from the generator and tries to run it on the Spark cluster.
Warning: In a real production environment, running exec() on AI-generated code is risky. You would add guardrails here, like using a separate “Judge” agent to review the code before execution. Let’s keep it simple for this demo.
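To make the guardrail idea a bit more concrete, here is a minimal sketch of a static pre-check that uses Python's built-in ast module to flag obviously dangerous code before it ever reaches exec(). The blocklists and the find_violations name are assumptions for illustration. A denylist like this is easy to bypass, so treat it as a first filter and not as a sandbox.

```python
import ast

# Hypothetical policy for this sketch: modules and calls we refuse to run.
BLOCKED_MODULES = {"os", "subprocess", "shutil", "socket"}
BLOCKED_CALLS = {"eval", "exec", "open", "__import__"}


def find_violations(code):
    """Return a list of reasons to reject the code; empty means nothing was flagged."""
    try:
        tree = ast.parse(code)
    except SyntaxError as err:
        return [f"syntax error: {err.msg}"]

    violations = []
    for node in ast.walk(tree):
        # Collect top-level module names from 'import x' and 'from x import y'.
        if isinstance(node, ast.Import):
            modules = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            modules = [(node.module or "").split(".")[0]]
        else:
            modules = []
        violations += [f"blocked import: {m}" for m in modules if m in BLOCKED_MODULES]

        # Flag direct calls to blocked built-ins such as eval(...) or open(...).
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in BLOCKED_CALLS
        ):
            violations.append(f"blocked call: {node.func.id}")
    return violations


safe = "final_df = spark.createDataFrame([(1, 'a')], ['id', 'value'])"
unsafe = "import os\nos.system('rm -rf /tmp/data')"

print(find_violations(safe))    # []
print(find_violations(unsafe))  # ['blocked import: os']
```

In the executor, a non-empty result could be written to error_log instead of calling exec(), so the generator gets a chance to rewrite the code.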
import pandas as pd
from io import StringIO

def execution_node(state: ETLState):
    """Node 2: The Muscle runs the code."""
    print("\n--- [Muscle] Executing Spark Code ---")
    try:
        # Create dummy data for this demo to simulate a messy CSV
        csv_data = """id,date_string,value
1,2023-01-15,100
2,15/01/2023,200
3,Jan 15 2023,300"""

        # We define the execution context, giving the code access to 'spark' and our data
        local_vars = {"spark": spark, "csv_data": csv_data}

        # DANGER ZONE: Executing generated code.
        # The generated code should create a 'final_df' variable.
        exec(state['pyspark_code'], globals(), local_vars)

        result_df = local_vars.get("final_df")

        if result_df is None:
            raise ValueError("Code ran but did not generate 'final_df'")

        print("SUCCESS: Data transformed successfully!")
        result_df.show()

        return {"execution_result": "Success", "error_log": None}

    except Exception as e:
        print(f"FAILURE: Spark job failed with error: {e}")
        # Pass the error back to the state so the generator can fix it
        return {"error_log": str(e), "execution_result": None}
Step 5: Wire the Graph (The Logic Loop)
Now we connect the nodes. This is where we define the conditional logic.
Standard Path: Generator -> Executor.
Conditional Path: After execution, check the status. If it failed, loop back to the Generator. If it succeeded, end.
from langgraph.graph import StateGraph, END

# Initialize the graph
workflow = StateGraph(ETLState)

# Add nodes
workflow.add_node("generator", code_generator_node)
workflow.add_node("executor", execution_node)

# Define flow
workflow.set_entry_point("generator")
workflow.add_edge("generator", "executor")

# Define the conditional router
def router(state: ETLState):
    # Check for success first, so a successful final attempt
    # is not reported as "max attempts reached"
    if not state["error_log"]:
        # If success, we are done
        return END

    # Stop after 3 attempts to prevent infinite loops costing you API money
    if state["attempts"] >= 3:
        print("--- Max attempts reached. Stopping. ---")
        return END

    # If there is an error, go back to the drawing board
    return "generator"

workflow.add_conditional_edges("executor", router)

# Compile the graph
app = workflow.compile()
Step 6: Run the Pipeline!
Let’s kick it off. We’ll give it a tricky goal: parsing a CSV that has mixed date formats. A normal Spark job would choke on this. Let’s see our agent handle it.
# Initial State definition
initial_state = {
    "goal": (
        "Read the raw CSV data provided in the variable 'csv_data'. "
        "It contains a column 'date_string' with mixed date formats. "
        "Standardize this column into a proper Spark DateType column named 'clean_date' "
        "and return the dataframe as 'final_df'."
    ),
    "schema_hint": "CSV with headers: id,date_string,value",
    "error_log": None,
    "attempts": 0,
    "pyspark_code": None,
    "execution_result": None
}

print("Starting Agentic ETL Workflow...")
# Run the graph
final_state = app.invoke(initial_state)

print("\n--- Workflow Finished ---")
if final_state["execution_result"] == "Success":
    print("Final Status: SUCCESS")
    # In a real app, you'd now write this data to your data lake (Delta Lake, Snowflake, etc.)
else:
    print("Final Status: FAILED after max attempts.")
When you run this, you may see the agent fail on the first attempt as it tries a simple date parse. The executor catches the error and the generator sees it in its next prompt. It can then rewrite the code using a more robust coalesce or when/otherwise strategy in Spark, and ideally succeed on the second or third try, with no manual intervention.
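If the coalesce strategy sounds abstract, the idea is simply "try each known format in order and keep the first one that parses." Here is a plain-Python sketch of that logic on the demo data. It is an analogue for illustration, not the Spark code the agent would generate. The format list and the parse_mixed_date name are assumptions, and %b expects English month abbreviations.

```python
from datetime import date, datetime

# Candidate formats, tried in order; this list is an assumption for the demo data.
DATE_FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%b %d %Y"]


def parse_mixed_date(raw):
    """Return the first successful parse as a date, or None if no format matches."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(raw.strip(), fmt).date()
        except ValueError:
            continue  # This format did not match; try the next one.
    return None


for raw in ["2023-01-15", "15/01/2023", "Jan 15 2023", "not a date"]:
    print(raw, "->", parse_mixed_date(raw))
```

Order matters here: a value like 01/02/2023 is ambiguous between day-first and month-first, so whichever format comes first in the list wins. That is a decision you may want to pin down in the goal or schema hint instead of leaving it to the model.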
The Verdict: The Future is Agentic
What we just built wasn’t just a cool Python script. It’s a fundamental shift in how we approach data engineering.
Traditional coding is about telling a computer exactly what to do. Agentic ETL is about telling the computer what outcome you want and giving it the autonomy to figure out the “how,” and more importantly, fix itself when the “how” changes.
By mastering tools like Spark and LangGraph, you aren’t just building pipelines that break less. You are freeing yourself from the 3 AM PagerDuty calls so you can focus on building the next big thing.