Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions test/ux/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

import os
import uuid
import pytest
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

import pytest
from omegaconf import OmegaConf


Expand Down Expand Up @@ -41,6 +41,7 @@ def _load_config(rootdir=None):
if rootdir:
candidates.append(os.path.join(str(rootdir), "ux_test_config.yaml"))
candidates.append(os.path.join(os.path.dirname(__file__), "ux_test_config.yaml"))

for path in candidates:
if os.path.exists(path):
return OmegaConf.load(path)
Expand All @@ -57,10 +58,12 @@ def _enabled_backends(cfg):
backend_name = compute.get("backend") or None
image = compute.get("image") or None
decospec = None

if backend_name and image:
decospec = "%s:image=%s" % (backend_name, image)
decospec = f"{backend_name}:image={image}"
elif backend_name:
decospec = backend_name

return [
{
"name": "default",
Expand Down Expand Up @@ -258,7 +261,7 @@ def pytest_generate_tests(metafunc):
params.append(pytest.param(mode, marks=marks))
else:
params.append(pytest.param(b, marks=marks))
ids.append("%s-%s" % (b["name"], mode))
ids.append(f"{b['name']}-{mode}")

if needs_exec and needs_backend:
metafunc.parametrize(["exec_mode", "backend"], params, ids=ids)
Expand Down
1 change: 1 addition & 0 deletions test/ux/core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _set_devstack_env():
os.environ.setdefault("AWS_ENDPOINT_URL_BATCH", "http://localhost:8000")
os.environ.setdefault("AWS_ENDPOINT_URL_SFN", "http://localhost:8082")
os.environ.setdefault("AWS_ENDPOINT_URL_DYNAMODB", "http://localhost:8765")

# EventBridge stub: handles the schedule() call from the SFN deployer.
# The stub returns ResourceNotFoundException for DisableRule (ignored by
# EventBridgeClient._disable) so that deploying unscheduled flows works.
Expand Down
163 changes: 66 additions & 97 deletions test/ux/core/test_airflow_compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,18 @@
"""

import ast
import json
import os
import subprocess
import sys
import tempfile
import os

import pytest

pytestmark = [pytest.mark.airflow_compilation]


def _get_compile_env():
    """Return the environment used for compilation-only subprocesses.

    The result is a copy of the current process environment with
    ``METAFLOW_DEFAULT_METADATA`` forced to ``"local"``. ``os.environ``
    itself is not modified.
    """
    env = os.environ.copy()
    env["METAFLOW_DEFAULT_METADATA"] = "local"
    return env


def _compile_flow_to_dag(flow_path, **extra_tl_args):
    """Compile a flow to an Airflow DAG Python file.

    Runs ``<python> <flow> --no-pylint [top-level options] airflow create
    <tempfile>`` in a subprocess and reads the generated DAG back.

    Args:
        flow_path: Flow location, resolved through ``_resolve_flow_path``.
        **extra_tl_args: Extra top-level CLI options. ``some_key=value``
            becomes ``--some-key value``; entries whose value is ``None``
            are omitted.

    Returns:
        A ``(dag_source, dag_file_path)`` tuple. The temporary file is left
        on disk on success, so the caller is responsible for deleting it.

    The test is skipped when the subprocess output indicates the airflow CLI
    is unavailable, the backend connection is refused, or the feature is not
    supported; any other non-zero exit fails the test. In every non-success
    path the temporary file is removed before the outcome propagates.
    """
    from .test_utils import _resolve_flow_path

    full_path = _resolve_flow_path(flow_path)

    # Only the unique path is needed; the subprocess writes the file itself.
    with tempfile.NamedTemporaryFile(suffix=".py", delete=False, mode="w") as f:
        dag_file_path = f.name

    cmd = [sys.executable, full_path, "--no-pylint"]
    for k, v in extra_tl_args.items():
        if v is not None:
            cmd.extend([f"--{k.replace('_', '-')}", str(v)])
    cmd.extend(["airflow", "create", dag_file_path])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
            env=_get_compile_env(),
        )
        if result.returncode != 0:
            stderr = result.stderr or ""
            stdout = result.stdout or ""
            if "No such command" in stderr or "No such command" in stdout:
                pytest.skip(
                    "airflow CLI not available (extension may override plugins)"
                )
            if "ConnectionRefusedError" in stderr or "ConnectionError" in stderr:
                pytest.skip("Airflow backend not configured (connection refused)")
            if "is not supported" in stderr:
                pytest.skip(f"Feature not supported by Airflow: {stderr.strip()}")
            pytest.fail(f"Compilation failed:\nstderr: {stderr}\nstdout: {stdout}")

        with open(dag_file_path, "r") as f:
            dag_source = f.read()

        return dag_source, dag_file_path
    except BaseException:
        # BaseException (not Exception) on purpose: pytest's skip/fail
        # outcomes and KeyboardInterrupt do not derive from Exception, and
        # the temporary file must not leak on any of those paths.
        try:
            os.unlink(dag_file_path)
        except OSError:
            pass
        raise
# ---------------------------------------------------------------------------
# Core Validation Logic
# ---------------------------------------------------------------------------


def _validate_dag_source(dag_source, dag_file_path=None):
Expand Down Expand Up @@ -132,59 +76,83 @@ def _validate_dag_source(dag_source, dag_file_path=None):


# ---------------------------------------------------------------------------
# Tests
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def compile_and_validate():
"""Compile a flow to an Airflow DAG, validate it, and clean up the tempfile."""
def compile_and_validate(tmp_path, monkeypatch):
"""
Factory fixture to compile a flow to an Airflow DAG and validate it.
Automatically handles environment variables and temporary file cleanup.
"""
# Ensure compilation-only environment variables are set safely
monkeypatch.setenv("METAFLOW_DEFAULT_METADATA", "local")

def _impl(flow_path, **extra_tl_args):
dag_source, dag_file_path = _compile_flow_to_dag(flow_path, **extra_tl_args)
try:
result = _validate_dag_source(dag_source, dag_file_path)
assert (
result["result"] == "OK"
), f"Validation failed: {result.get('diagnostics')}"
return dag_source
finally:
try:
os.unlink(dag_file_path)
except OSError:
pass

return _impl

from .test_utils import _resolve_flow_path

def test_linear_flow(compile_and_validate):
"""Simple start->step->end flow compiles to valid Airflow DAG."""
compile_and_validate("basic/helloworld.py")
full_path = _resolve_flow_path(flow_path)
dag_file_path = tmp_path / "compiled_dag.py"

cmd = [sys.executable, full_path, "--no-pylint"]
for k, v in extra_tl_args.items():
if v is not None:
cmd.extend([f"--{k.replace('_', '-')}", str(v)])
cmd.extend(["airflow", "create", str(dag_file_path)])

def test_branch_flow(compile_and_validate):
"""Parallel branch flow compiles to valid Airflow DAG."""
compile_and_validate("dag/branch_flow.py")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
)

if result.returncode != 0:
stderr = result.stderr or ""
stdout = result.stdout or ""
if "No such command" in stderr or "No such command" in stdout:
pytest.skip(
"airflow CLI not available (extension may override plugins)"
)
if "ConnectionRefusedError" in stderr or "ConnectionError" in stderr:
pytest.skip("Airflow backend not configured (connection refused)")
if "is not supported" in stderr:
pytest.skip(f"Feature not supported by Airflow: {stderr.strip()}")
pytest.fail(f"Compilation failed:\nstderr: {stderr}\nstdout: {stdout}")

def test_foreach_flow(compile_and_validate):
"""Foreach flow compiles to valid Airflow DAG."""
compile_and_validate("dag/foreach_flow.py")
# Read the generated DAG and validate it
dag_source = dag_file_path.read_text()
validation = _validate_dag_source(dag_source, str(dag_file_path))

assert (
validation["result"] == "OK"
), f"Validation failed: {validation.get('diagnostics')}"
return dag_source

def test_retry_flow(compile_and_validate):
"""Flow with @retry compiles to valid Airflow DAG."""
compile_and_validate("basic/retry_flow.py")
return _impl


def test_resources_flow(compile_and_validate):
"""Flow with @resources compiles to valid Airflow DAG."""
compile_and_validate("basic/resources_flow.py")
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


def test_schedule_flow(compile_and_validate):
"""Flow with @schedule compiles to valid Airflow DAG."""
compile_and_validate("lifecycle/schedule_flow.py")
@pytest.mark.parametrize(
    "flow_path",
    [
        # Each id is attached to its own value so the two cannot drift apart
        # when a flow is added, removed, or reordered.
        pytest.param("basic/helloworld.py", id="linear"),
        pytest.param("dag/branch_flow.py", id="branch"),
        pytest.param("dag/foreach_flow.py", id="foreach"),
        pytest.param("basic/retry_flow.py", id="retry"),
        pytest.param("basic/resources_flow.py", id="resources"),
        pytest.param("lifecycle/schedule_flow.py", id="schedule"),
    ],
)
def test_airflow_dag_compilation(compile_and_validate, flow_path):
    """Core Metaflow flow patterns compile to structurally valid Airflow DAGs."""
    compile_and_validate(flow_path)


# ---------------------------------------------------------------------------
Expand All @@ -195,6 +163,7 @@ def test_schedule_flow(compile_and_validate):
def test_tags_are_list_not_tuple(compile_and_validate):
"""DAG tags must be a list, not a tuple (Airflow rejects tuples)."""
dag_source = compile_and_validate("basic/helloworld.py")

# Check that tags assignment uses list syntax
tree = ast.parse(dag_source)
for node in ast.walk(tree):
Expand Down
76 changes: 47 additions & 29 deletions test/ux/core/test_argo_compilation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import pytest
from metaflow import Deployer

from .test_utils import _resolve_flow_path, prepare_runner_deployer_args

pytestmark = [pytest.mark.argo_compilation, pytest.mark.scheduler_only]


# ---------------------------------------------------------------------------
# Helpers and Assertion Callbacks
# ---------------------------------------------------------------------------


def _find_duplicate_task_names(workflow_template):
duplicates = {}
for template in workflow_template.get("spec", {}).get("templates", []):
Expand All @@ -18,6 +26,14 @@ def _find_duplicate_task_names(workflow_template):
return duplicates


def _assert_only_json_structure(workflow_template, deployed_flow_name):
    """Verify the foundational structure of the generated Argo WorkflowTemplate.

    Args:
        workflow_template: The compiled template as a mapping.
        deployed_flow_name: Name expected under ``metadata.name``.

    Raises:
        AssertionError: If the template is ``None``, its ``kind`` is not
            ``"WorkflowTemplate"``, its ``metadata.name`` differs from
            ``deployed_flow_name``, or ``spec.templates`` is empty.
    """
    assert workflow_template is not None, "workflow_template is None"
    assert (
        workflow_template["kind"] == "WorkflowTemplate"
    ), f"unexpected kind: {workflow_template['kind']!r}"
    assert (
        workflow_template["metadata"]["name"] == deployed_flow_name
    ), f"unexpected template name: {workflow_template['metadata']['name']!r}"
    assert workflow_template["spec"]["templates"], "spec.templates is empty"


def _container_template_for_step(workflow_template, step_name):
for template in workflow_template.get("spec", {}).get("templates", []):
annotations = template.get("metadata", {}).get("annotations", {})
Expand All @@ -39,57 +55,59 @@ def test_argo_only_json_exposes_workflow_template(
if scheduler_config.scheduler_type != "argo-workflows":
pytest.skip("Argo compilation tests require the argo-workflows scheduler")
Comment thread
agsaru marked this conversation as resolved.
Outdated

from metaflow import Deployer

from .test_utils import _resolve_flow_path, prepare_runner_deployer_args

deployed_flow = (
Deployer(
flow_file=_resolve_flow_path("basic/helloworld.py"),
show_output=False,
**prepare_runner_deployer_args({"decospecs": decospecs}),
)
.argo_workflows()
.create(
only_json=True,
tags=tag + ["test_argo_only_json_exposes_workflow_template"],
**(scheduler_config.deploy_args or {}),
)
)

workflow_template = deployed_flow.workflow_template
def _assert_deduplicated_task_names(workflow_template, deployed_flow_name):
"""Verify that complex DAG topologies do not produce duplicate task names in Argo."""
assert workflow_template is not None
assert workflow_template["kind"] == "WorkflowTemplate"
assert workflow_template["metadata"]["name"] == deployed_flow.name
assert workflow_template["spec"]["templates"]
assert _find_duplicate_task_names(workflow_template) == {}


def test_foreach_split_switch_join_task_names_are_deduplicated(
exec_mode, decospecs, tag, scheduler_config
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


@pytest.mark.parametrize(
"flow_name, test_suffix, assertion_fn",
[
pytest.param(
"basic/helloworld.py",
"only_json_exposes_workflow_template",
_assert_only_json_structure,
id="only_json_structure",
),
pytest.param(
"dag/foreach_split_switch_dedup_flow.py",
"foreach_split_switch_dedup",
_assert_deduplicated_task_names,
id="task_name_deduplication",
),
],
)
def test_argo_compilation_behaviors(
exec_mode, decospecs, tag, scheduler_config, flow_name, test_suffix, assertion_fn
):
"""Parametrized test covering Argo JSON compilation outputs and structural integrity."""
if exec_mode != "deployer":
pytest.skip("Argo compilation tests require deployer mode")
if scheduler_config.scheduler_type != "argo-workflows":
pytest.skip("Argo compilation tests require the argo-workflows scheduler")

from metaflow import Deployer

from .test_utils import _resolve_flow_path, prepare_runner_deployer_args

deployed_flow = (
Deployer(
flow_file=_resolve_flow_path("dag/foreach_split_switch_dedup_flow.py"),
flow_file=_resolve_flow_path(flow_name),
show_output=False,
**prepare_runner_deployer_args({"decospecs": decospecs}),
)
.argo_workflows()
.create(
only_json=True,
tags=tag + ["test_argo_foreach_split_switch_dedup"],
tags=tag + [f"test_argo_{test_suffix}"],
**(scheduler_config.deploy_args or {}),
)
)

assertion_fn(deployed_flow.workflow_template, deployed_flow.name)
workflow_template = deployed_flow.workflow_template
assert workflow_template is not None
assert _find_duplicate_task_names(workflow_template) == {}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
Expand Down
Loading
Loading