# Bookstore — Part 12 ch.07 "ML pipelines and workflows": the recommender
# pipeline as an Argo Workflows `WorkflowTemplate` that runs the
# train -> eval -> register -> promote DAG. The first artifact of Part 12
# that *orchestrates* the per-chapter pieces (X3a-c) into ONE pipeline.
#
# !!! CRD-INTRINSIC DRY-RUN (identical precedent to raw-manifests/51-/70-/83-,
#     argocd/, operators/, chaos/, ml/batch/, ml/serve/recommender-inference-
#     service.yaml) !!!
#   `WorkflowTemplate` is an Argo Workflows CRD
#   (argoproj.io/v1alpha1). WITHOUT Argo Workflows installed a client dry-run
#   prints:
#     no matches for kind "WorkflowTemplate" in version "argoproj.io/v1alpha1"
#   EXPECTED and SCHEMA-CORRECT — install Argo Workflows first (Part 12 ch.07
#   Hands-on: pinned Helm `argo/argo-workflows --version 0.42.0` -> ns `argo`).
#   Schema verified against argoproj.io/v1alpha1 (entrypoint / templates /
#   dag / inputs.parameters / podSpecPatch).
#
# WHAT IT DOES (the four steps of the MLOps loop, on kind)
#   1. train    — runs `bookstore/recommender-train:dev` (the SAME image as
#                 ../train/recommender-train-job.yaml) and writes `model.joblib`
#                 to the shared PVC `recommender-model`.
#   2. eval     — runs a tiny Python evaluator INSIDE the same train image
#                 (sklearn + joblib are present) — loads `model.joblib`, checks
#                 a sanity signal (the average top-1 cosine similarity over
#                 the synthetic dataset is non-trivial — proves the model
#                 actually learnt the planted affinities), writes
#                 `metrics.json` next to the model.
#   3. register — runs `kubectl create configmap --dry-run | kubectl apply`
#                 (create-or-update, so a retry re-stamps instead of failing)
#                 under a small RBAC-scoped ServiceAccount to STAMP the model
#                 URI + score + timestamp + lineage into a ConfigMap
#                 `recommender-model-registry-<workflow>`.
#                 KIND-RUNNABLE PROXY for a real model registry (MLflow / KFP
#                 Model Registry). Honestly marked in the README.
#   4. promote  — runs `kubectl annotate / rollout restart` on the recommender
#                 Deployment so the serving Pod re-reads the model PVC. In
#                 PROD this step is a GitOps commit that bumps the
#                 InferenceService `storageUri` (Part 07 ch.04) — also
#                 honestly marked.
#
# WHY a WorkflowTemplate (and not a one-shot Workflow)
#   `WorkflowTemplate` is the REUSABLE shape: install once, run many times via
#   `argo submit --from workflowtemplate/<name>` OR by reference from a
#   `CronWorkflow` (see recommender-cronworkflow.yaml in this dir) OR from an
#   Argo Events `Sensor` (recommender-sensor.yaml). The one-shot `Workflow`
#   would only support a single run. The teaching artifact is the template;
#   running it is one command.
#
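# PSA — every step's Pod carries the restricted SC via the pod-level
# `podSpecPatch` + each step container's own `securityContext`. The
# Argo-injected executor containers (`wait`, and `init` where Argo adds one)
# take their container-level SC from the controller's `executor` settings, so
# that must be restricted-compliant too (confirm in the `argo` install). The
# Argo Workflows controller installs into its OWN namespace (`argo`); the
# workflow Pods this template spawns land in `bookstore-ml` (PSA: enforce
# restricted) under the `argo-workflow` ServiceAccount this file creates.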
---
# Workflow-runner identity for every Pod spawned by the recommender-pipeline
# WorkflowTemplate in bookstore-ml. What it may do is scoped by the
# namespace-local Role/RoleBinding that follow (no cluster-wide rights, no
# Secrets access). The API token is NOT auto-mounted by default; the
# WorkflowTemplate's podSpecPatch opts workflow Pods back in, because Argo's
# wait sidecar and the kubectl-based register/promote steps need it.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-workflow
  namespace: bookstore-ml
  labels:
    app.kubernetes.io/component: ml-pipeline
    app.kubernetes.io/part-of: bookstore-ml
# Secure default: no token unless a Pod explicitly requests one.
automountServiceAccountToken: false
---
# Namespace-scoped permissions for the `argo-workflow` ServiceAccount: what
# Argo's in-Pod executor and the kubectl-based register/promote steps need
# inside bookstore-ml, and nothing cluster-wide.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: argo-workflow
  namespace: bookstore-ml
  labels:
    app.kubernetes.io/part-of: bookstore-ml
rules:
  # Pod access for Argo's executor running INSIDE each workflow Pod under
  # this SA (the controller itself runs under its own SA in `argo`).
  # NOTE(review): newer Argo executors may not need every verb here —
  # confirm against the pinned Argo version before trimming.
  - apiGroups: [""]
    resources: ["pods", "pods/log"]
    verbs: ["get", "list", "watch", "create", "delete", "patch", "update"]
  # register: `kubectl create --dry-run | kubectl apply` + `kubectl label`
  # on the per-run registry ConfigMap (get, then create or patch).
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "create", "patch", "update"]
  # promote: `kubectl get` (get), `annotate` + `rollout restart` (both are
  # PATCH in the API — update is NOT needed), and `rollout status`, which
  # list+WATCHes the Deployment until the rollout completes. Without `watch`
  # the status call is Forbidden and promote fails AFTER the restart has
  # already been committed (a half-promoted state).
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "patch"]
  # Argo Workflows' own CRs used by the executor to report workflow-task
  # results (e.g. eval's `score` output parameter).
  # NOTE(review): verify workflowtasksets is still required for the pinned
  # Argo version.
  - apiGroups: ["argoproj.io"]
    resources: ["workflowtaskresults", "workflowtasksets"]
    verbs: ["get", "list", "watch", "create", "patch", "update"]
---
# Grants the namespace-scoped `argo-workflow` Role to the `argo-workflow`
# ServiceAccount. Both live in bookstore-ml, so nothing here is cluster-wide.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: argo-workflow
  namespace: bookstore-ml
  labels:
    app.kubernetes.io/part-of: bookstore-ml
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: argo-workflow
subjects:
  - kind: ServiceAccount
    namespace: bookstore-ml
    name: argo-workflow
---
# The WorkflowTemplate: train -> eval -> register -> promote
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: recommender-pipeline
  namespace: bookstore-ml
  labels:
    app.kubernetes.io/part-of: bookstore-ml
    app.kubernetes.io/component: ml-pipeline
    ml.bookstore/path: argo-workflows
spec:
  # Argo Workflows runs every step's Pod under this SA in bookstore-ml.
  serviceAccountName: argo-workflow
  entrypoint: main
  # ttlStrategy GCs the finished Workflow object (and, through it, its Pods).
  # This is the same discipline as ml/train/recommender-train-job.yaml's
  # ttlSecondsAfterFinished. Failed runs are kept longer (1800s vs 600s) so
  # they can still be inspected.
  ttlStrategy:
    secondsAfterCompletion: 600
    secondsAfterFailure: 1800
  # Active wall-clock cap so a runaway pipeline doesn't pin the queue.
  activeDeadlineSeconds: 1800
  # Spec-level retry is the DEFAULT for every template that does not declare
  # its own retryStrategy: 1 extra attempt. OnError retries only on
  # infra/transient errors, never on a user-script Failed exit, so the eval
  # gate's exit 2 is final. train/eval/register are retry-safe.
  # A step that must NOT be retried (promote) has to override this on its
  # TEMPLATE. `retryStrategy` is not a DAG-task field, so a task-level key
  # is ignored.
  # NOTE(review): as a spec-wide default this presumably also applies to the
  # `main` DAG template — confirm a DAG-level retry (which would re-run every
  # step) is acceptable for the pinned Argo version.
  retryStrategy:
    limit: 1
    retryPolicy: OnError
    backoff:
      duration: "30s"
      factor: 2
      maxDuration: "5m"
  # Pipeline-level inputs — sensible defaults; overridable via `argo submit -p`.
  arguments:
    parameters:
      - name: train-image
        value: "bookstore/recommender-train:dev"
      - name: model-pvc
        value: "recommender-model"
      - name: seed
        value: "42"
      - name: top-k
        value: "10"
      - name: min-score
        value: "0.05"           # eval gate: avg top-1 cosine must be >= this
      - name: serve-deployment
        value: "recommender"     # the recommender Deployment to rollout-restart
      # Lineage parameters — threaded through to the register step's
      # ConfigMap stamp. Placeholders are honest defaults; pin to a real
      # `s3://…/dataset.parquet` URI and an `@sha256:<digest>` image ref
      # in prod (Part 12 ch.07/ch.08).
      - name: dataset-uri
        value: "pvc://recommender-model/dataset/"
      - name: image-sha
        value: "bookstore/recommender-train:dev"
  # Restricted pod-level SC applied to EVERY workflow Pod (train + eval +
  # register + promote). Argo runs each step's `main` container next to its
  # own executor container(s): `wait`, plus `init` where Argo adds one. The
  # pod-level fields below (non-root ids, fsGroup, seccomp) apply to all of
  # them.
  # PSA `restricted` ALSO requires container-level
  # allowPrivilegeEscalation=false and capabilities.drop=[ALL], which
  # pod-level SC cannot express. Each step pins those on its own container
  # below. The executor containers get theirs from the controller's
  # `executor.securityContext`.
  # NOTE(review): confirm `executor.securityContext` is set in the `argo`
  # install, or the Pods are rejected in bookstore-ml.
  podSpecPatch: |
    # Argo's wait sidecar requires the SA token; the SA-level
    # automountServiceAccountToken: false is the secure default, overridden
    # per-Pod by the workflow's podSpecPatch.
    automountServiceAccountToken: true
    securityContext:
      runAsNonRoot: true
      runAsUser: 65532
      runAsGroup: 65532
      fsGroup: 65532
      seccompProfile:
        type: RuntimeDefault
  # Workflow-level volumes; each step opts in via its own `volumeMounts`.
  #   model   — the shared PVC: train writes model.joblib, eval reads it and
  #             writes metrics.json next to it.
  #   scratch — small writable /tmp for every step (root FS is read-only).
  volumes:
    - name: model
      persistentVolumeClaim:
        claimName: "{{workflow.parameters.model-pvc}}"
    - name: scratch
      emptyDir:
        sizeLimit: 64Mi
  templates:
    # ───────────────────────────────────────────────────────────────────────
    # main — the DAG: train -> eval -> register -> promote.
    # Each step is a template defined below; `depends:` wires the order. Every
    # edge requires the upstream task to have Succeeded, so a failed eval gate
    # stops register and promote.
    # Retry behaviour is NOT configured here. `retryStrategy` is a Template
    # field, not a DAG-task field; a task-level key is ignored by the
    # controller (or rejected by strict validation such as `argo lint`).
    # Per-step retry overrides belong on the referenced template.
    # ───────────────────────────────────────────────────────────────────────
    - name: main
      dag:
        tasks:
          - name: train
            template: train-step
          - name: eval
            template: eval-step
            depends: "train.Succeeded"
            arguments:
              parameters:
                - name: min-score
                  value: "{{workflow.parameters.min-score}}"
          - name: register
            template: register-step
            depends: "eval.Succeeded"
            arguments:
              parameters:
                - name: score
                  value: "{{tasks.eval.outputs.parameters.score}}"
          - name: promote
            template: promote-step
            depends: "register.Succeeded"
    # ───────────────────────────────────────────────────────────────────────
    # train — the SAME image as ml/train/recommender-train-job.yaml, run with
    # its own entrypoint (no command override). Configured purely via env:
    # MODEL_DIR points at the shared `model` PVC, and SEED / TOP_K come from
    # the workflow parameters. N_BOOKS / N_CUSTOMERS / N_ORDERS are pinned
    # here. The model lands at /workspace/model/model.joblib for eval to read.
    # ───────────────────────────────────────────────────────────────────────
    - name: train-step
      container:
        image: "{{workflow.parameters.train-image}}"
        imagePullPolicy: IfNotPresent
        env:
          - name: MODEL_DIR
            value: /workspace/model
          - name: SEED
            value: "{{workflow.parameters.seed}}"
          - name: TOP_K
            value: "{{workflow.parameters.top-k}}"
          - name: N_BOOKS
            value: "200"
          - name: N_CUSTOMERS
            value: "800"
          - name: N_ORDERS
            value: "5000"
        resources:
          requests:
            cpu: "250m"
            memory: "256Mi"
          limits:
            cpu: "1"
            memory: "512Mi"
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop: ["ALL"]
        # Read-write model PVC + writable /tmp (the root FS is read-only).
        volumeMounts:
          - name: model
            mountPath: /workspace/model
          - name: scratch
            mountPath: /tmp
    # ───────────────────────────────────────────────────────────────────────
    # eval — loads model.joblib from the SAME PVC the train step wrote and
    # computes the average top-1 cosine similarity over every book that has
    # at least one neighbour. It writes /workspace/model/metrics.json NEXT TO
    # the model and emits `score` (via /tmp/score) as an output parameter
    # the `register` step reads.
    # Uses the SAME train image (sklearn + joblib baked in — no second image
    # to build). score < `min-score` -> exit 2 -> the DAG stops here (the
    # eval gate).
    # The `model` mount is deliberately read-WRITE: the script opens
    # MODEL_DIR/metrics.json for writing, which fails with a read-only
    # filesystem error on a `readOnly: true` mount and would fail every run.
    # ───────────────────────────────────────────────────────────────────────
    - name: eval-step
      inputs:
        parameters:
          - name: min-score
      script:
        image: "{{workflow.parameters.train-image}}"
        imagePullPolicy: IfNotPresent
        command: ["python"]
        source: |
          # Tiny evaluator — same image, same joblib schema.
          import json, os, statistics, sys
          import joblib

          model_dir = os.environ.get("MODEL_DIR", "/workspace/model")
          min_score = float(os.environ["MIN_SCORE"])
          artifact = joblib.load(os.path.join(model_dir, "model.joblib"))
          neigh = artifact["neighbours"]
          # Average top-1 cosine similarity across all books with at least
          # one neighbour. This proves the model learnt SOMETHING above
          # random — for the planted-affinity synthetic dataset the value
          # is well above 0.0 (typically ~0.4 on the defaults).
          top1 = [float(v[0][1]) for v in neigh.values() if v]
          score = statistics.fmean(top1) if top1 else 0.0
          metrics = {
              "kind": artifact["kind"],
              "n_books": artifact["n_books"],
              "n_customers": artifact["n_customers"],
              "n_orders": artifact["n_orders"],
              "top_k": artifact["top_k"],
              "avg_top1_cosine": round(score, 6),
              "min_score_gate": min_score,
              "passed": score >= min_score,
          }
          with open(os.path.join(model_dir, "metrics.json"), "w") as f:
              json.dump(metrics, f, indent=2)
          # Emit the score for the next step via Argo's output-parameter file.
          with open("/tmp/score", "w") as f:
              f.write(f"{score:.6f}")
          print(json.dumps(metrics, indent=2), flush=True)
          if not metrics["passed"]:
              print(f"[eval] FAIL: {score} < gate {min_score}", file=sys.stderr,
                    flush=True)
              sys.exit(2)
        env:
          - name: MODEL_DIR
            value: /workspace/model
          - name: MIN_SCORE
            value: "{{inputs.parameters.min-score}}"
        resources:
          requests:
            cpu: "100m"
            memory: "256Mi"
          limits:
            cpu: "500m"
            memory: "512Mi"
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop: ["ALL"]
        volumeMounts:
          # read-write: metrics.json is written into MODEL_DIR (see above).
          - name: model
            mountPath: /workspace/model
          # /tmp/score is the output-parameter file Argo collects.
          - name: scratch
            mountPath: /tmp
      outputs:
        parameters:
          - name: score
            valueFrom:
              path: /tmp/score
    # ───────────────────────────────────────────────────────────────────────
    # register — KIND-RUNNABLE PROXY for a real model registry. In PROD this
    # is an MLflow `log_artifact` call or a KFP Model Registry record; see
    # Part 12 ch.07 + ch.08 for the swap.
    # Takes eval's `score` as input and upserts the per-run ConfigMap
    # `recommender-model-registry-<workflow>` in bookstore-ml. It holds the
    # model URI, score, UTC timestamp, workflow name and the dataset/image
    # lineage parameters; the ConfigMap is then labelled (score included).
    # `create --dry-run | apply` makes the write create-or-update, so a retry
    # re-stamps instead of failing on AlreadyExists. Runs kubectl (bitnami
    # image) under the namespace-scoped `argo-workflow` SA.
    # ───────────────────────────────────────────────────────────────────────
    - name: register-step
      inputs:
        parameters:
          - name: score
      script:
        image: bitnami/kubectl:1.31.1
        imagePullPolicy: IfNotPresent
        command: ["/bin/sh"]
        source: |
          set -eu
          CM="recommender-model-registry-{{workflow.name}}"
          MODEL_URI="pvc://{{workflow.parameters.model-pvc}}/model.joblib"
          SCORE="{{inputs.parameters.score}}"
          NOW="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
          # Write the registry stamp as a ConfigMap. Honest proxy — see
          # README + Part 12 ch.07/ch.08 for the real-registry swap.
          kubectl -n bookstore-ml create configmap "$CM" \
            --from-literal=model_uri="$MODEL_URI" \
            --from-literal=score="$SCORE" \
            --from-literal=registered_at="$NOW" \
            --from-literal=workflow="{{workflow.name}}" \
            --from-literal=dataset_uri="{{workflow.parameters.dataset-uri}}" \
            --from-literal=image_sha="{{workflow.parameters.image-sha}}" \
            --dry-run=client -o yaml \
            | kubectl -n bookstore-ml apply -f -
          kubectl -n bookstore-ml label configmap "$CM" \
            app.kubernetes.io/part-of=bookstore-ml \
            app.kubernetes.io/component=recommender-model-registry \
            ml.bookstore/score="$SCORE" --overwrite
          echo "[register] stamped $CM uri=$MODEL_URI score=$SCORE at=$NOW"
        resources:
          requests:
            cpu: "50m"
            memory: "64Mi"
          limits:
            cpu: "200m"
            memory: "128Mi"
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop: ["ALL"]
        # Writable /tmp for the shell; the root FS stays read-only.
        volumeMounts:
          - name: scratch
            mountPath: /tmp
    # ───────────────────────────────────────────────────────────────────────
    # promote — KIND-RUNNABLE PROXY for the real "promote" step. Annotates
    # the recommender Deployment so a rollout-restart picks up the new
    # model.joblib on the SAME PVC (the serving Pod re-loads it on start).
    # In PROD this step is a GitOps commit that bumps the InferenceService
    # `storageUri` to a NEW versioned URI; Argo CD reconciles; KServe
    # shifts traffic via canaryTrafficPercent (Part 12 ch.06). Honestly
    # marked — see README + Part 12 ch.07 + Part 07 ch.04.
    # RBAC: needs get/list/watch/patch on apps/deployments — `rollout status`
    # list+watches the Deployment.
    # ───────────────────────────────────────────────────────────────────────
    - name: promote-step
      # NOT idempotent: `kubectl annotate` / `rollout restart` may commit
      # before a later command fails, leaving a half-promoted state.
      # The opt-out from the spec-level retryStrategy goes HERE: a
      # template-level retryStrategy overrides the spec default, whereas a
      # DAG-task-level key would be ignored.
      retryStrategy:
        limit: 0
      script:
        image: bitnami/kubectl:1.31.1
        imagePullPolicy: IfNotPresent
        command: ["/bin/sh"]
        source: |
          set -eu
          DEPLOY="{{workflow.parameters.serve-deployment}}"
          NOW="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
          # Look the serving Deployment up WITHOUT masking errors:
          # --ignore-not-found prints nothing (exit 0) only when it does not
          # exist; RBAC / API-server failures still exit non-zero and, under
          # `set -e`, fail the step instead of being mistaken for "absent".
          FOUND="$(kubectl -n bookstore-ml get deploy "$DEPLOY" --ignore-not-found -o name)"
          if [ -n "$FOUND" ]; then
            # Annotate with the workflow name + timestamp, then rolling-restart
            # so the serving Pod re-reads model.joblib from the PVC.
            kubectl -n bookstore-ml annotate deploy "$DEPLOY" \
              ml.bookstore/promoted-from="{{workflow.name}}" \
              ml.bookstore/promoted-at="$NOW" --overwrite
            kubectl -n bookstore-ml rollout restart deploy "$DEPLOY"
            kubectl -n bookstore-ml rollout status  deploy "$DEPLOY" --timeout=120s
            echo "[promote] $DEPLOY restarted with new model (workflow {{workflow.name}})"
          else
            # ml/serve/ not applied yet: documentation-friendly no-op, exit 0.
            echo "[promote] deploy/$DEPLOY not found — skipping. In prod this"
            echo "[promote] step is a GitOps commit that bumps the"
            echo "[promote] InferenceService storageUri (Part 07 ch.04 +"
            echo "[promote] Part 12 ch.06)."
          fi
        resources:
          requests:
            cpu: "50m"
            memory: "64Mi"
          limits:
            cpu: "200m"
            memory: "128Mi"
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop: ["ALL"]
        # Writable /tmp for the shell; the root FS stays read-only.
        volumeMounts:
          - name: scratch
            mountPath: /tmp
