-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
120 lines (94 loc) · 4.57 KB
/
cli.py
File metadata and controls
120 lines (94 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
"""
kdp — knowledge-dataplane CLI
Commands:
kdp publish — register a data product in the catalog (used in CI step 7)
kdp dagster sync — validate Dagster asset definitions; Dagster hot-reloads them itself (used in CI step 8)
kdp health — compute and push a health score for a data product
kdp validate — run contract validation (wraps datacontract CLI); not yet available: main() does not register this command
Usage examples:
kdp publish --descriptor data-product-descriptor.yaml --contract datacontract.yaml
kdp dagster sync --assets dagster/assets.yaml
kdp health --product flight-ops-dp
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
def cmd_publish(args: argparse.Namespace) -> int:
    """Register a data product in the catalog.

    Delegates to ``kdp.examples.flight_ops_demo.register.main``, which takes its
    options from ``sys.argv``. The parsed CLI arguments are therefore rendered
    into a synthetic argv for the duration of that call. The caller's
    ``sys.argv`` is restored afterwards, even if registration raises or exits.

    Args:
        args: Parsed ``publish`` arguments: ``descriptor`` and ``catalog_url``,
            plus the optional ``contract`` and ``dbt_manifest``.

    Returns:
        0 once ``register_main`` has returned. Exceptions it raises propagate.
    """
    from kdp.examples.flight_ops_demo.register import main as register_main

    register_argv = [
        "register.py",
        "--descriptor", args.descriptor,
        "--catalog-url", args.catalog_url,
    ]
    # Optional flags are forwarded only when given; otherwise they are omitted entirely.
    if args.contract:
        register_argv += ["--contract", args.contract]
    if args.dbt_manifest:
        register_argv += ["--dbt-manifest", args.dbt_manifest]

    saved_argv = sys.argv
    sys.argv = register_argv
    try:
        register_main()
    finally:
        # sys.argv is process-global: don't leak the synthetic argv to other code
        # (e.g. programmatic callers or tests) once registration is done.
        sys.argv = saved_argv
    return 0
def cmd_dagster_sync(args: argparse.Namespace) -> int:
    """Validate a Dagster asset-definition YAML file and log the sync for audit.

    No request is sent to Dagster. Dagster picks up changes itself when the
    user-code container hot-reloads, so this command only checks that the file
    parses to a YAML mapping and reports what it found.

    Args:
        args: Parsed ``dagster sync`` arguments: ``assets`` (path to the YAML
            file) and the optional ``product_name``, ``dagster_url`` and ``token``.

    Returns:
        0 if the file parsed to a mapping.
        1 if it could not be read, is not valid YAML, or its top level is not
        a mapping.
    """
    import os

    import yaml

    dagster_url = args.dagster_url or os.environ.get("DAGSTER_URL", "http://localhost:3000")
    # NOTE: --token / DAGSTER_TOKEN are accepted for forward compatibility but are
    # unused while this command makes no API calls to Dagster.
    print(f"Syncing assets from {args.assets} to {dagster_url}")

    try:
        with open(args.assets, encoding="utf-8") as f:
            asset_config = yaml.safe_load(f)
    except OSError as exc:
        print(f"  ✗ Cannot read {args.assets}: {exc}", file=sys.stderr)
        return 1
    except yaml.YAMLError as exc:
        print(f"  ✗ {args.assets} is not valid YAML: {exc}", file=sys.stderr)
        return 1

    # safe_load returns None for an empty file and may return a list or scalar;
    # the lookups below require a mapping.
    if not isinstance(asset_config, dict):
        print(f"  ✗ {args.assets} must contain a YAML mapping at the top level", file=sys.stderr)
        return 1

    # Default product name: the grandparent directory of the assets file.
    product_name = args.product_name or Path(args.assets).parent.parent.name
    # `assets: null` (or a missing key) counts as zero assets rather than crashing len().
    assets = asset_config.get("assets") or []
    print(f"  Product: {product_name}")
    print(f"  Assets:  {len(assets)}")
    print("  ✓ Asset definitions are valid YAML — Dagster will reload on next poll.")
    return 0
def cmd_health(args: argparse.Namespace) -> int:
    """Compute a contract-based health score for a data product and push it to the catalog.

    The score is the percentage of checks from linting ``datacontract.yaml``
    that passed, floored to an integer in 0..100. A lint result with no checks
    scores 0. The same value is sent as the ``contract`` entry of the breakdown.

    Args:
        args: Parsed ``health`` arguments. ``product`` is passed to the catalog as the URN.

    Returns:
        0 after the score has been pushed.
    """
    from kdp.core import get_catalog, get_contracts

    print(f"Computing health score for: {args.product}")
    contracts = get_contracts()
    catalog = get_catalog()

    # NOTE(review): the contract path is fixed and resolved against the current
    # working directory, independent of --product — confirm this is intended.
    result = contracts.lint("datacontract.yaml")
    checks = result.checks
    passed = sum(1 for check in checks if check.passed)
    total = max(len(checks), 1)  # avoid division by zero when there are no checks
    # Exact integer arithmetic. int(passed / total * 100) truncates float rounding
    # error, e.g. 29 of 100 passing checks scored 28.
    score = passed * 100 // total

    catalog.update_health_score(
        urn=args.product,
        score=score,
        breakdown={"contract": score},
    )
    print(f"  ✓ Health score: {score}/100")
    return 0
def main() -> None:
    """Build the ``kdp`` argument parser, run the selected command, and exit.

    Each sub-command binds its handler via ``set_defaults(func=...)``. The
    handler's integer return value becomes the process exit status.
    """
    parser = argparse.ArgumentParser(prog="kdp", description="knowledge-dataplane CLI")
    commands = parser.add_subparsers(dest="command", required=True)

    # ── publish ──────────────────────────────────────────────────────────
    publish = commands.add_parser("publish", help="Register a data product in the catalog")
    publish.add_argument("--descriptor", required=True)
    for flag, default in (
        ("--contract", None),
        ("--dbt-manifest", None),
        ("--catalog-url", "http://localhost:8585"),
    ):
        publish.add_argument(flag, default=default)
    publish.set_defaults(func=cmd_publish)

    # ── dagster ──────────────────────────────────────────────────────────
    dagster = commands.add_parser("dagster", help="Dagster operations")
    dagster_commands = dagster.add_subparsers(dest="dagster_command", required=True)
    sync = dagster_commands.add_parser("sync", help="Sync asset definitions")
    sync.add_argument("--assets", required=True)
    for flag in ("--product-name", "--dagster-url", "--token"):
        sync.add_argument(flag, default=None)
    sync.set_defaults(func=cmd_dagster_sync)

    # ── health ───────────────────────────────────────────────────────────
    health = commands.add_parser("health", help="Compute and push health score")
    health.add_argument("--product", required=True, help="Product URN or name")
    health.set_defaults(func=cmd_health)

    namespace = parser.parse_args()
    handler = namespace.func
    sys.exit(handler(namespace))


if __name__ == "__main__":
    main()