diff --git a/.github/instructions/ziginit.instructions.md b/.github/instructions/ziginit.instructions.md index a3a9dd9..c11efbb 100644 --- a/.github/instructions/ziginit.instructions.md +++ b/.github/instructions/ziginit.instructions.md @@ -14,6 +14,8 @@ applyTo: "tools/ziginit/**/*.zig" ## 错误处理 - **系统调用返回值必须检查**,不得在业务模块中用 `_ =` 静默丢弃 +- **CLI 命令失败必须 `exit(1)`**:`sendCommand` 返回 `null`(连接失败)或 `result.isOk() == false` 时,CLI 必须展示 `result.msgSlice()` 错误消息并 `std.process.exit(1)`——脚本和 CI 依赖退出码判断成功/失败 +- **IPC 响应必须携带错误原因**:`handleCommand` 返回 `CommandResult`,失败路径使用 `R.fail(code, fmt, args)` 附带可读错误消息,客户端直接展示给用户。响应线格式:`[2B status LE][2B msg_len LE][msg]` - 所有"关闭/释放/best-effort"类操作已封装在 `posix.zig` 的深封装函数中(如 `closeFd`、`unlinkFile`、`writeOnce`、`syncFd`、`reapChildren` 等),业务模块直接调用即可,无需 `_ =` - 需要新增 `_ =` 的场景应优先在 `posix.zig` 中封装为语义明确的函数 - 子进程(fork 后 child)中的关键操作(`setsid`、`dup2`、`chdir`、`open` pid 文件)失败时,必须写 stderr 错误信息(会进入 journal pipe)然后 `_exit(126)` @@ -72,3 +74,15 @@ applyTo: "tools/ziginit/**/*.zig" 5. 确认无孤儿进程、无残留 socket/lock 文件 - **改名/重构类任务**:必须全局搜索旧名称(`grep -r "旧名"`),确认注释、日志、错误提示、argv 显示全部替换完毕 - **信号处理类变更**:手动发送 SIGINT/SIGTERM/SIGKILL 验证,不能只依赖 `quit` 命令的测试路径 + +## 安全与防御性编程 + +- **用户输入必须校验后再使用**:CLI 参数、IPC 消息中的服务名等外部输入,必须先通过 `cfg.findService()` 或长度校验,才能传入 `buildServicePaths` 等内部函数。禁止将未校验的输入直接用于 `@memcpy` 到固定大小缓冲区 +- **固定缓冲区写入前必须做边界检查**:`@memcpy` 到 `[MAX_PATH]u8` 等固定数组前,计算 `total` 并断言 `total < capacity`。路径拼接溢出属于不可恢复的配置/系统错误,应 `@panic` 而非静默截断——静默截断会导致多个路径指向同一文件,引发更隐蔽的故障 +- **禁止静默丢弃数据**:缓冲区满时必须有明确的降级策略(flush 后重试、报错退出、或截断警告),不能 `@min(data.len, avail)` 后默默丢弃超出部分 +- **自引用结构体禁用裸指针**:结构体中不得保存指向外部栈变量的指针(如 `path: [*:0]const u8`),应使用自持缓冲区(如 `path_buf: [MAX_PATH+1]u8`)并用 `@memcpy` 拷贝数据,避免生命周期不匹配导致悬空指针 + +## 资源与流式数据 + +- **轮询循环中避免重复获取/释放资源**:follow 模式等 poll 循环中,如果文件使用 truncate 轮转(而非 rename),FD 应在循环外打开、循环结束后统一关闭,循环内仅用 `fstat` 检查大小变化。文件可能延迟创建时,用 `-1` 标记并每轮重试 open +- **跨 chunk 数据拼接**:流式读取场景(follow 模式、pipe drain)中,`read()` 返回的 chunk 不保证在行边界切分。必须维护 carry buffer,仅在遇到 `\n` 时输出完整行,残留数据保留到下次 read 后拼接 diff --git a/.github/workflows/release-ziginit.yml b/.github/workflows/release-ziginit.yml new file mode 100644 index 0000000..1e9c5a6 --- /dev/null +++ b/.github/workflows/release-ziginit.yml @@ -0,0 +1,112 @@ +name: Release ziginit + +on: + push: + tags: + - "ziginit/v*" + +permissions: + contents: write + +jobs: + release: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install Zig + uses: mlugg/setup-zig@v2 + with: + version: 0.16.0 + + - name: Cache Zig build artifacts + uses: actions/cache@v4 + with: + path: | + ~/.cache/zig + tools/ziginit/.zig-cache + key: ziginit-${{ runner.os }}-zig-${{ hashFiles('tools/ziginit/main.zig', 'tools/ziginit/build.zig') }} + restore-keys: | + ziginit-${{ runner.os }}-zig- + + - name: Parse version and channel from tag + id: version + run: | + version="${GITHUB_REF#refs/tags/ziginit/}" + echo "version=${version}" >> "$GITHUB_OUTPUT" + if [[ "$version" =~ -alpha ]]; then + echo "channel=dev" >> "$GITHUB_OUTPUT" + elif [[ "$version" =~ -beta ]]; then + echo "channel=beta" >> "$GITHUB_OUTPUT" + else + echo "channel=prod" >> "$GITHUB_OUTPUT" + fi + + - name: Install UPX (production only) + if: steps.version.outputs.channel == 'prod' + run: sudo apt-get update && sudo apt-get install -y upx + + - name: Cross-compile ziginit + run: | + cd tools/ziginit + channel="${{ steps.version.outputs.channel }}" + if [[ "$channel" == "prod" ]]; then + optimize="ReleaseSmall" + else + optimize="ReleaseSafe" + fi + echo "Channel: ${channel}, Optimize: ${optimize}" + + declare -A targets=( + ["x86_64-linux"]="linux_amd64" + ["aarch64-linux"]="linux_arm64" + ["arm-linux"]="linux_armv7" + ["x86_64-macos"]="darwin_amd64" + ["aarch64-macos"]="darwin_arm64" + ) + mkdir -p ../../release-artifacts + for zig_target in "${!targets[@]}"; do + label="${targets[$zig_target]}" + echo "=== Building ziginit for ${zig_target} (${optimize}) ===" + zig build -Doptimize="${optimize}" -Dtarget="${zig_target}" + if [[ "$channel" == "prod" && "$zig_target" == *-linux ]]; then + upx --best --lzma zig-out/bin/ziginit || true + fi + archive="ziginit_${{ steps.version.outputs.version }}_${label}.tar.gz" + tar -czf "../../release-artifacts/${archive}" -C zig-out/bin ziginit + rm -rf zig-out + done + + - name: Generate checksums + run: | + cd release-artifacts + sha256sum *.tar.gz > checksums.txt + cat checksums.txt + + - name: Generate changelog (production only) + if: steps.version.outputs.channel == 'prod' + uses: orhun/git-cliff-action@v4 + with: + config: cliff.toml + args: --latest --strip header + env: + OUTPUT: CHANGES.md + + - name: Create GitHub Release + env: + GH_TOKEN: ${{ secrets.RELEASER_TOKEN }} + run: | + prerelease="" + notes_arg="--generate-notes" + if [[ "${{ steps.version.outputs.channel }}" != "prod" ]]; then + prerelease="--prerelease" + else + notes_arg="--notes-file CHANGES.md" + fi + gh release create "${{ github.ref_name }}" \ + --title "ziginit ${{ steps.version.outputs.version }}" \ + $notes_arg \ + $prerelease \ + release-artifacts/* diff --git a/tools/ziginit/README.md b/tools/ziginit/README.md index 9d5cd36..74ddd14 100644 --- a/tools/ziginit/README.md +++ b/tools/ziginit/README.md @@ -1,18 +1,18 @@ # ziginit — 轻量级服务管理器 -嵌入式 Linux 设备的简化版 systemd,用 Zig 编写,~2900 行(9 个模块),纯 POSIX API,零外部依赖。 +嵌入式 Linux 设备的简化版 systemd,用 Zig 编写,~3600 行(8 个模块),纯 POSIX API,零外部依赖。 ## 核心特性 -| 特性 | 说明 | -| --------------------- | ----------------------------------------------------- | -| **多服务管理** | JSON 配置文件定义多个服务,统一管理生命周期 | -| **动态增删改查** | `list/reload/add/rm` 运行时管理 service,无需停机 | -| **幂等启动** | flock 文件锁保证单实例,重复启动安全退出 | -| **看门狗** | 自动检测崩溃服务并重启,可配置检查间隔 | -| **Supervisor 自保护** | daemon 崩溃后自动重启,指数退避(1s→30s),稳定后重置 | -| **日志托管** | 统一 journal 日志:所有服务 stdout/stderr 收集到一个文件,带时间戳和服务名前缀,10MB 自动轮转 | -| **进程隔离** | fork/setsid/exec,每服务独立会话 | +| 特性 | 说明 | +| --------------------- | ------------------------------------------------------------------------------------------- | +| **多服务管理** | JSON 配置文件定义多个服务,统一管理生命周期 | +| **动态增删改查** | `list/reload/add/rm` 运行时管理 service,无需停机 | +| **幂等启动** | flock 文件锁保证单实例,重复启动安全退出 | +| **看门狗** | 自动检测崩溃服务并重启,可配置检查间隔 | +| **Supervisor 自保护** | daemon 崩溃后自动重启,指数退避(1s→30s),稳定后重置 | +| **日志托管** | 每服务独立日志文件(`{name}.log`),stdout/stderr 通过 dup2 直写,daemon 自动 truncate 轮转 | +| **进程隔离** | fork/setsid/exec,每服务独立会话 | ## 模块结构 @@ -25,16 +25,16 @@ tools/ziginit/ ├── service.zig — 服务生命周期:start/stop/restart、flock 存活检测 ├── server.zig — Daemon 事件循环、Supervisor 双向守护、看门狗 ├── protocol.zig — IPC 协议:action 常量、socket 地址、命令收发 -├── journal.zig — Pipe 收集、journal 写入/轮转/tail 读取 +├── journal.zig — 日志读取:tail/follow/轮转(per-service log file) └── build.zig — 构建脚本 ``` **posix.zig 职责**:所有 C 函数调用和 POSIX 类型的唯一出口。其他模块通过 `const c = @import("posix.zig")` 引入,不直接接触 `std.c`。提供两层 API: -| 层级 | 示例 | 说明 | -|------|------|------| -| 原始转发 | `c.fork()`, `c.execve()`, `c.kill()` | 调用者需要完整控制返回值 | -| 深封装 | `c.openRead()→?fd`, `c.closeFd()→void`, `c.writeOnce()→void`, `c.readLoop()→usize` | 吸收返回值检查 / 循环读写语义,消除样板代码 | +| 层级 | 示例 | 说明 | +| -------- | ---------------------------------------------------------------------------------- | ------------------------------------------- | +| 原始转发 | `c.fork()`, `c.execve()`, `c.kill()` | 调用者需要完整控制返回值 | +| 深封装 | `c.openRead()→?fd`, `c.closeFd()→void`, `c.writeOnce()→void`, `c.readLoop()→usize` | 吸收返回值检查 / 循环读写语义,消除样板代码 | ## 架构 @@ -80,8 +80,8 @@ graph TD subgraph 子进程 PM -->|fork+exec| SVC1[服务 A] PM -->|fork+exec| SVC2[服务 B] - SVC1 -->|pipe| JOURNAL[ziginit.journal
统一日志] - SVC2 -->|pipe| JOURNAL + SVC1 -->|dup2 stdout/stderr| LOG1[svc-a.log] + SVC2 -->|dup2 stdout/stderr| LOG2[svc-b.log] end style CLI fill:#e0f2fe,stroke:#0284c7 @@ -91,7 +91,7 @@ graph TD style DLOCK fill:#fee2e2,stroke:#dc2626 ``` -`service` 命令启动时先进入 Supervisor 层(父进程),fork 出 Daemon 子进程运行实际事件循环。Daemon 崩溃后 Supervisor 自动重启,正常退出(`quit` 命令)则 Supervisor 也停止。 +`supervisor` 命令启动时先进入 Supervisor 层(父进程),fork 出 Daemon 子进程运行实际事件循环。Daemon 崩溃后 Supervisor 自动重启,正常退出(`quit` 命令)则 Supervisor 也停止。 Daemon 采用 `poll()` 事件循环:接收到 socket 连接时处理命令,超时时执行看门狗巡检。所有服务操作通过 flock 互斥锁序列化,避免竞态。 @@ -105,9 +105,7 @@ sequenceDiagram C->>D: connect(ziginit.sock) C->>D: [1B cmd][63B name] = 64 bytes D->>D: 执行操作 - D->>C: [2B status LE] - Note over D,C: log 命令额外流式传输日志内容 - D-->>C: [日志 payload...] + D->>C: [2B status LE][2B msg_len LE][msg...] ``` **请求格式**:固定 64 字节 @@ -130,13 +128,17 @@ sequenceDiagram | `R` | reload | 重载配置并应用变更 | | `a` | add | 增加或更新一个服务定义 | | `d` | rm | 删除一个服务定义 | -| `l` | log | 查看日志 | | `v` | version | 查询版本 | | `q` | quit | 退出 daemon(协议保留,CLI 未暴露) | -**响应格式**:2 字节 Little-Endian +**响应格式**:`[2B status LE][2B msg_len LE][0~64KB message]` + +| 偏移 | 大小 | 说明 | +| ---- | ------- | --------------------------------------------------- | +| 0-1 | 2 bytes | 状态码(LE):`byte[0] == 0` 成功,`!= 0` 失败 | +| 2-3 | 2 bytes | 消息长度(LE):成功时通常为 0,失败时携带错误原因 | +| 4+ | N bytes | 可选消息体:UTF-8 文本,如 `"unknown service: foo"` | -- `byte[0] == 0` → 成功,`!= 0` → 失败 - status 命令:`byte[1] bit0` = enabled,`bit1` = running - version 命令:`byte[1]` = 版本号 @@ -147,12 +149,15 @@ sequenceDiagram ``` {workdir}/ ├── ziginit.lock # daemon 单实例锁(flock) +├── ziginit.sup.lock # supervisor 单实例锁(flock) ├── ziginit.sock # Unix socket IPC -├── ziginit.journal # 统一日志(所有服务 stdout/stderr,带时间戳) -├── ziginit.journal.1 # 轮转后的旧日志(最多保留 1 份) +├── ziginit.quit # 干净退出标记(临时) +├── daemon.log # daemon 自身日志 +├── svc-a.log # 服务 A 的 stdout/stderr 日志 ├── svc-a.pid # 服务 A 的 PID(子进程持有 flock) ├── svc-a.lock # 服务 A 操作互斥锁 ├── svc-a.check # 存在=禁用看门狗,不存在=启用 +├── svc-b.log ├── svc-b.pid ├── svc-b.lock └── svc-b.check @@ -171,14 +176,12 @@ sequenceDiagram "exec": "/usr/bin/myapp", "args": ["-c", "/etc/myapp.json"], "dir": "/opt/myapp", - "log_file": "/var/log/myapp.log", "enabled": true, "watchdog": 5 }, { "name": "worker", "exec": "/usr/bin/worker", - "log_file": "/var/log/worker.log", "enabled": true, "watchdog": 10 } @@ -186,16 +189,15 @@ sequenceDiagram } ``` -| 字段 | 必需 | 默认值 | 说明 | -| ------------------- | ---- | ------------------ | ----------------------------------------- | -| `workdir` | ❌ | `/var/lib/ziginit` | 运行时文件目录(pid/lock/socket) | -| `name` | ✅ | — | 服务名称,最长 31 字符 | -| `exec` | ✅ | — | 可执行文件路径 | -| `args` | ❌ | `[]` | 命令行参数数组,最多 16 个 | -| `dir` | ❌ | 继承 | 工作目录 | -| `log_file` | ❌ | 无 | 业务程序自管日志字段(init 不依赖该字段) | -| `enabled` | ❌ | `true` | 是否启用该服务 | -| `watchdog` | ❌ | `5` | 看门狗检查间隔(秒),范围 1-3600 | +| 字段 | 必需 | 默认值 | 说明 | +| ---------- | ---- | ------------------ | ------------------------------------- | +| `workdir` | ❌ | `/var/lib/ziginit` | 运行时文件目录(pid/lock/log/socket) | +| `name` | ✅ | — | 服务名称,最长 31 字符 | +| `exec` | ✅ | — | 可执行文件路径 | +| `args` | ❌ | `[]` | 命令行参数数组,最多 16 个 | +| `dir` | ❌ | 继承 | 工作目录 | +| `enabled` | ❌ | `true` | 是否启用该服务 | +| `watchdog` | ❌ | `5` | 看门狗检查间隔(秒),范围 1-3600 | **限制**:最多 16 个服务,路径最长 255 字符。 @@ -223,17 +225,22 @@ zig build -Dtarget=aarch64-linux-musl -Doptimize=ReleaseSmall # ARM64 ### 启动 daemon ```bash -# 使用默认配置 /etc/ziginit.json(推荐) -ziginit service +# 推荐方式:supervisor 管理 daemon(双向守护) +ziginit supervisor --init + +# 仅启动 daemon(无 supervisor 保护,调试用) +ziginit daemon # 指定配置文件 -ziginit -c /path/to/config.json service +ziginit -c /path/to/config.json supervisor --init # 幂等启动,多次调用安全 -ziginit service -# → "daemon already running, exiting" +ziginit supervisor --init +# → "supervisor already running, exiting" ``` +> 配置文件自动搜索顺序:`/etc/ziginit.json` → `~/.config/ziginit/config.json` → 当前目录 `config.json`。`-c` 可覆盖。 + ### 管理服务 ```bash @@ -261,14 +268,28 @@ ziginit start all # 强制杀死 ziginit kill myapp -# 查看日志(从 journal 过滤该服务,最近 64KB) +# 查看日志(直接读取 per-service 日志文件尾部,默认 100 行) ziginit log myapp +# 查看 daemon 自身日志 +ziginit log daemon + +# 查看所有服务 + daemon 日志(带 [name] 前缀) +ziginit log --all + +# 指定行数 +ziginit log -n 50 myapp + +# 实时跟踪(类似 tail -f) +ziginit log -f myapp +ziginit log -f --all + # 可选:临时调大 tail 大小(单位 KB,默认 64,最大 4096) ZIGINIT_LOG_TAIL_KB=256 ziginit log myapp -# 动态新增(同名则更新) +# 动态新增(同名则更新),支持两种输入方式 ziginit add '{"name":"worker-b","exec":"/usr/bin/worker","args":["--mode","b"],"enabled":true,"watchdog":5}' +cat service.json | ziginit add # 动态删除 ziginit rm worker-b @@ -281,14 +302,13 @@ ziginit reload ### 动态变更语义 -- `add `:解析单个 service JSON,**同名即更新(upsert)**,写回配置文件后自动向 daemon 发送 `reload`。 +- `add `:解析单个 service JSON(支持 CLI 参数或 stdin),**同名即更新(upsert)**,写回配置文件后自动向 daemon 发送 `reload`。 - `rm `:从配置中删除 service,写回配置并自动 `reload`;daemon 会停止并清理该 service 的运行时文件。 - `reload`:daemon 重新读取配置并做差异应用: - 新增 service → 若 `enabled=true` 则自动拉起 - 更新 service → 按新配置重启或停用 - 删除 service → 停止并清理 `.pid/.lock/.check` - 运行时限制:`workdir` 不支持热更;若重载时 `workdir` 变化,daemon 会拒绝本次重载。 -- 安全限制:拒绝删除最后一个 service(避免配置被清空导致不可管理)。 ### 版本查询 @@ -306,11 +326,11 @@ graph TD CHECK -->|未运行| DEL[删除 name.check] DEL --> FORK[fork] FORK --> CHILD[子进程] - FORK --> PARENT[父进程:存 pipe 读端] + FORK --> PARENT[父进程] CHILD --> SETSID[setsid 新会话] SETSID --> WPID[写 PID + flock] - WPID --> REDIR[dup2 pipe 写端→stdout/stderr] + WPID --> REDIR[dup2 stdout/stderr→name.log] REDIR --> CHDIR[切换工作目录] CHDIR --> EXEC[execve 执行] @@ -328,7 +348,7 @@ graph TD style EXEC fill:#e0f2fe,stroke:#0284c7 ``` -- **启动**:先获取操作锁,检查是否已运行,然后 fork/setsid/exec 创建独立会话 +- **启动**:先获取操作锁,检查是否已运行,然后 fork/setsid/exec 创建独立会话,stdout/stderr 重定向到 `{name}.log` - **停止**:先创建 `.check` 文件禁用看门狗自动重启,再发送 SIGTERM,等待 5 秒后强制 SIGKILL ## Supervisor 互守护 @@ -418,51 +438,52 @@ Daemon 的 `poll()` 使用所有服务中最小的 `watchdog` 作为超时。每 ## 设计决策 -| 决策 | 理由 | -| ----------------------- | ---------------------------------------------------------- | -| 纯 POSIX C API | 嵌入式设备兼容性,避免 Zig std 高级抽象的平台差异 | -| `posix.zig` 集中封装 | 所有 C 函数调用集中在一个模块,其他模块 `@import("posix.zig")`;方便移植审计,消除 `_ = c.xxx()` 样板代码 | -| flock 存活检测 | 进程崩溃时 OS 自动释放锁,比轮询 PID 更可靠 | -| 固定 64 字节协议 | 简单高效,避免长度前缀/分隔符解析复杂度 | -| 单线程 poll 循环 | 嵌入式场景无需并发,简化状态管理 | -| `.check` 文件禁用看门狗 | 手动 stop 后不会被看门狗重启,start 时自动删除 | -| link_libc + musl | 获取 fork/exec/flock/setsid;musl 目标自动静态链接,零依赖 | -| Supervisor 双进程 | daemon 崩溃时由 supervisor 父进程自动重启,指数退避防风暴 | -| CLOEXEC on lock fd | 防止服务子进程继承 daemon lock,确保 supervisor 重启成功 | -| `c_allocator` | Zig 0.16 下最可靠的分配器,仅配置解析时使用 | -| 配置原子写入 | `tmp + fsync + rename`,避免并发读到半写入 JSON | -| DTO + JSON 序列化写回 | 避免手工字符串拼接,字段演进更安全、可维护 | -| 配置文件锁(`.lock`) | `add/rm` 在锁内重读+写回,避免并发命令丢失更新 | -| socket 连接短重试 | daemon 启停瞬间降低 `cannot connect` 误报 | -| 缩短配置锁持有时间 | 锁仅覆盖读改写,不覆盖 `reload` 网络等待,提升并发吞吐 | -| 统一 journal 日志 | 所有服务 stdout/stderr 通过 pipe 收集到 `ziginit.journal`,带时间戳和服务名前缀,10MB 自动轮转。daemon 重启时已有服务 pipe 断开,watchdog 自动重建 | - -## 日志托管:设计边界与风险 +| 决策 | 理由 | +| ----------------------- | ----------------------------------------------------------------------------------------------------------------- | +| 纯 POSIX C API | 嵌入式设备兼容性,避免 Zig std 高级抽象的平台差异 | +| `posix.zig` 集中封装 | 所有 C 函数调用集中在一个模块,其他模块 `@import("posix.zig")`;方便移植审计,消除 `_ = c.xxx()` 样板代码 | +| flock 存活检测 | 进程崩溃时 OS 自动释放锁,比轮询 PID 更可靠 | +| 固定 64 字节协议 | 简单高效,避免长度前缀/分隔符解析复杂度 | +| 单线程 poll 循环 | 嵌入式场景无需并发,简化状态管理 | +| `.check` 文件禁用看门狗 | 手动 stop 后不会被看门狗重启,start 时自动删除 | +| link_libc + musl | 获取 fork/exec/flock/setsid;musl 目标自动静态链接,零依赖 | +| Supervisor 双进程 | daemon 崩溃时由 supervisor 父进程自动重启,指数退避防风暴 | +| CLOEXEC on lock fd | 防止服务子进程继承 daemon lock,确保 supervisor 重启成功 | +| `c_allocator` | Zig 0.16 下最可靠的分配器,仅配置解析时使用 | +| 配置原子写入 | `tmp + fsync + rename`,避免并发读到半写入 JSON | +| DTO + JSON 序列化写回 | 避免手工字符串拼接,字段演进更安全、可维护 | +| 配置文件锁(`.lock`) | `add/rm` 在锁内重读+写回,避免并发命令丢失更新 | +| socket 连接短重试 | daemon 启停瞬间降低 `cannot connect` 误报 | +| 缩短配置锁持有时间 | 锁仅覆盖读改写,不覆盖 `reload` 网络等待,提升并发吞吐 | +| Per-service 日志文件 | 每服务独立 `{name}.log`,stdout/stderr 通过 `dup2` 直写,daemon 按大小 truncate 轮转。避免 pipe 背压/SIGPIPE 问题 | + +## 日志托管:设计边界 ### 定位 -journal 是**兜底机制**,收集服务的 stdout/stderr 异常输出(崩溃信息、未捕获异常、启动错误)。**不是业务日志通道**——生产环境中服务应通过自己的日志库写文件或发送到远程收集器,不依赖 stdout 输出业务日志。 +per-service 日志是**兜底机制**,收集服务的 stdout/stderr 输出(崩溃信息、未捕获异常、启动错误)。**不是业务日志通道**——生产环境中服务应通过自己的日志库写文件或发送到远程收集器,不依赖 stdout 输出业务日志。 + +### 实现方式 + +每服务独立日志文件(`{workdir}/{name}.log`),子进程启动时通过 `dup2` 将 stdout/stderr 重定向到该文件(`O_APPEND` 模式)。 -### 为什么用 pipe 而不是所有服务直接 append 同一个文件 +与 pipe 集中写入方案相比: -| 对比项 | 直接 append 共享文件 | pipe + 父进程写入(当前方案) | -|--------|----------------------|------------------------------| -| 时间戳/服务名前缀 | ❌ 子进程无法添加元数据 | ✅ 父进程拼装 `YYYY-MM-DDTHH:MM:SS svc[out/err]:` | -| 日志轮转 | ❌ rename 后子进程仍持旧 fd,写入已删除 inode | ✅ 父进程独占 journal fd,rotate 无感知 | -| 大小控制 | ❌ 多进程并发写入,检查与 rotate 之间有竞态窗口 | ✅ 每行写入前精确检查 | -| 长行原子性 | ❌ `O_APPEND` 仅保证 ≤ PIPE_BUF(4KB)原子,超长行交错 | ✅ 行缓冲保证完整行写入 | -| 复杂度 | 低 | 中(pipe 管理 + drain 逻辑) | +| 对比项 | pipe + 父进程写入 | per-service 文件直写(当前方案) | +| --------------- | ------------------------------- | -------------------------------------- | +| daemon 重启影响 | ❌ pipe 读端丢失,SIGPIPE 杀进程 | ✅ 文件 fd 与 daemon 无关,服务不受影响 | +| 背压阻塞 | ❌ pipe 缓冲区满时服务阻塞 | ✅ 文件写入不阻塞 | +| 时间戳/前缀 | ✅ 父进程可添加元数据 | ❌ 纯原始输出,无元数据 | +| 复杂度 | 高(pipe 管理 + drain 循环) | 低(dup2 + truncate 轮转) | ### 已知风险与缓解 -| 风险 | 影响 | 严重性 | 缓解措施 | -|------|------|--------|----------| -| **pipe 背压阻塞** | 内核 pipe 缓冲区(默认 64KB)满时,服务 `write(stdout)` 阻塞 | 低 | 生产环境服务不走 stdout 输出业务日志,仅异常输出,64KB 绑绑有余。如需扩容可 `fcntl(F_SETPIPE_SZ)` | -| **daemon 重启 pipe 断开** | daemon 崩溃重启后旧 pipe 读端丢失,服务写 stdout 收到 SIGPIPE(默认终止进程) | 中 | watchdog 自动重启被 SIGPIPE 终止的服务并重建 pipe。服务可自行忽略 SIGPIPE(`signal(SIGPIPE, SIG_IGN)`)以增强鲁棒性 | -| **单线程 drain 瓶颈** | 多服务同时高频输出时,单线程 poll+drain 可能跟不上 | 低 | 嵌入式场景服务数少(≤16)且 stdout 输出极少。极端场景可考虑独立 log 进程(s6-log 模式) | -| **journal 写入 I/O 阻塞** | 慢存储(SD 卡/NFS)上 journal 写入延迟高,拖慢整个事件循环 | 低 | 单次写入量小(一行日志),嵌入式场景可接受。如磁盘极慢,可将 workdir 放 tmpfs | -| **轮转期间短暂丢失** | rename + reopen 之间的微秒窗口内写入可能失败 | 极低 | 窗口极短且仅影响单行,兜底场景可接受 | -| **时间戳为 UTC** | 嵌入式设备可能无 RTC 或时区配置,显示时间与本地时间不一致 | 低 | UTC 是嵌入式日志通用实践,避免时区歧义 | +| 风险 | 影响 | 严重性 | 缓解措施 | +| ----------------------------- | ------------------------------------------- | ------ | ---------------------------------------------- | +| **日志文件持续增长** | 慢存储上磁盘空间耗尽 | 中 | daemon 定期 truncate 轮转(默认 10MB 阈值) | +| **truncate 轮转丢数据** | truncate 时服务正在写入,尾部内容被截断 | 低 | 仅影响截断时刻的单次 write,兜底场景可接受 | +| **follow 模式 truncate 检测** | `log -f` 跟踪时文件被 truncate,offset 失效 | 低 | `fstat` 检测文件缩小后自动重置 offset | +| **慢存储 I/O** | SD 卡/NFS 上写入延迟高 | 低 | 服务直写不经过 daemon 事件循环,不影响命令响应 | ### 适用场景与不适用场景 @@ -475,8 +496,8 @@ journal 是**兜底机制**,收集服务的 stdout/stderr 异常输出(崩 ❌ 不适用: - 高吞吐业务日志收集(应使用专用日志库 → 文件/syslog/远程) - - 结构化日志分析(journal 是纯文本,无 JSON/索引) - - 长期日志归档(仅保留当前 + 1 份轮转,最多 20MB) + - 结构化日志分析(日志是纯文本,无 JSON/索引) + - 长期日志归档(truncate 轮转仅保留最近数据) ``` ## 二进制大小 diff --git a/tools/ziginit/config.zig b/tools/ziginit/config.zig index ff76215..be55901 100644 --- a/tools/ziginit/config.zig +++ b/tools/ziginit/config.zig @@ -75,11 +75,19 @@ pub const MAX_LOG_TAIL_SIZE: usize = 4 * 1024 * 1024; pub const LOG_MAX_SIZE: usize = 10 * 1024 * 1024; // ZIGINIT_LOG_MAX_KB 环境变量允许的最大值 pub const LOG_MAX_SIZE_UPPER: usize = 100 * 1024 * 1024; +// copy-truncate 轮转保留的备份文件数(.1 .2 .3) +pub const LOG_ROTATE_KEEP: u32 = 3; +// copy-truncate 流式复制缓冲区大小 +pub const LOG_ROTATE_BUF_SIZE: usize = 64 * 1024; +// `log -n` 默认显示行数 +pub const DEFAULT_LOG_LINES: u32 = 100; +// `log -f` follow 模式轮询间隔 +pub const LOG_FOLLOW_POLL_NS: i64 = 200_000_000; // 200ms // ── 默认路径 ── pub const DEFAULT_WORKDIR = "/var/lib/ziginit"; -pub const DEFAULT_CONFIG_PATHS = [_][*:0]const u8{ +pub const DEFAULT_CONFIG_PATHS = [_]c.CStr{ ".config/ziginit/config.json", // $HOME-relative ".ziginit.json", // $HOME-relative "/usr/local/etc/ziginit.json", @@ -129,13 +137,13 @@ pub const ServiceConfig = struct { pub fn nameSlice(self: *const ServiceConfig) []const u8 { return self.name[0..self.name_len]; } - pub fn nameZ(self: *const ServiceConfig) [*:0]const u8 { + pub fn nameZ(self: *const ServiceConfig) c.CStr { return @ptrCast(self.name[0..self.name_len]); } - pub fn execZ(self: *const ServiceConfig) [*:0]const u8 { + pub fn execZ(self: *const ServiceConfig) c.CStr { return @ptrCast(self.exec[0..self.exec_len]); } - pub fn dirZ(self: *const ServiceConfig) ?[*:0]const u8 { + pub fn dirZ(self: *const ServiceConfig) ?c.CStr { if (!self.has_dir) return null; return @ptrCast(self.dir[0..self.dir_len]); } @@ -191,16 +199,16 @@ pub const ServicePaths = struct { log_file: [MAX_PATH]u8, log_len: usize, - pub fn lockZ(self: *const ServicePaths) [*:0]const u8 { + pub fn lockZ(self: *const ServicePaths) c.CStr { return @ptrCast(self.lock_file[0..self.lock_len]); } - pub fn pidZ(self: *const ServicePaths) [*:0]const u8 { + pub fn pidZ(self: *const ServicePaths) c.CStr { return @ptrCast(self.pid_file[0..self.pid_len]); } - pub fn checkZ(self: *const ServicePaths) [*:0]const u8 { + pub fn checkZ(self: *const ServicePaths) c.CStr { return @ptrCast(self.check_file[0..self.check_len]); } - pub fn logZ(self: *const ServicePaths) [*:0]const u8 { + pub fn logZ(self: *const ServicePaths) c.CStr { return @ptrCast(self.log_file[0..self.log_len]); } }; @@ -217,19 +225,19 @@ pub const DaemonPaths = struct { daemon_log: [MAX_PATH]u8, daemon_log_len: usize, - pub fn daemonLockZ(self: *const DaemonPaths) [*:0]const u8 { + pub fn daemonLockZ(self: *const DaemonPaths) c.CStr { return @ptrCast(self.daemon_lock[0..self.daemon_lock_len]); } - pub fn supLockZ(self: *const DaemonPaths) [*:0]const u8 { + pub fn supLockZ(self: *const DaemonPaths) c.CStr { return @ptrCast(self.sup_lock[0..self.sup_lock_len]); } - pub fn sockZ(self: *const DaemonPaths) [*:0]const u8 { + pub fn sockZ(self: *const DaemonPaths) c.CStr { return @ptrCast(self.sock_file[0..self.sock_len]); } - pub fn quitZ(self: *const DaemonPaths) [*:0]const u8 { + pub fn quitZ(self: *const DaemonPaths) c.CStr { return @ptrCast(self.quit_marker[0..self.quit_len]); } - pub fn daemonLogZ(self: *const DaemonPaths) [*:0]const u8 { + pub fn daemonLogZ(self: *const DaemonPaths) c.CStr { return @ptrCast(self.daemon_log[0..self.daemon_log_len]); } }; @@ -244,6 +252,9 @@ pub fn buildServicePaths(workdir: []const u8, name: []const u8) ServicePaths { }; for (pairs) |p| { const total = workdir.len + 1 + name.len + p.suffix.len; + if (total >= MAX_PATH) { + @panic("constructed service path exceeds MAX_PATH"); + } @memcpy(p.dst[0..workdir.len], workdir); p.dst[workdir.len] = '/'; @memcpy(p.dst[workdir.len + 1 ..][0..name.len], name); @@ -265,6 +276,9 @@ pub fn buildDaemonPaths(workdir: []const u8) DaemonPaths { }; for (pairs) |p| { const total = workdir.len + p.suffix.len; + if (total >= MAX_PATH) { + @panic("constructed daemon path exceeds MAX_PATH"); + } @memcpy(p.dst[0..workdir.len], workdir); @memcpy(p.dst[workdir.len..total], p.suffix); p.dst[total] = 0; diff --git a/tools/ziginit/docs/log-improvements.md b/tools/ziginit/docs/log-improvements.md new file mode 100644 index 0000000..a2de745 --- /dev/null +++ b/tools/ziginit/docs/log-improvements.md @@ -0,0 +1,131 @@ +# Log 子命令改进 + +## 1. 动机 + +原有 `log` 命令通过 Unix Socket 向 daemon 请求日志数据,daemon 读取日志文件后流式写回客户端 socket。这种架构存在以下限制: + +- **无法 follow**:socket 是请求-响应模型,无法持续推送新数据 +- **增加延迟**:CLI → daemon → 读文件 → 回传 socket → CLI,多一跳中转 +- **daemon 耦合**:daemon 停机时无法查看日志 +- **按字节截断**:尾部读 64KB 字节,不保证从完整行开始 + +## 2. 新架构 + +CLI 直接读取日志文件,绕过 daemon socket。 + +```mermaid +graph LR + subgraph "旧架构" + A1[CLI] -->|socket| B1[daemon] + B1 -->|read| C1[log files] + B1 -->|stream| A1 + end + + subgraph "新架构" + A2[CLI] -->|direct read| C2[log files] + end + + style A1 fill:#fee2e2,stroke:#dc2626 + style A2 fill:#dcfce7,stroke:#16a34a +``` + +日志文件位于 `{workdir}/{service}.log` 和 `{workdir}/daemon.log`,由 daemon 进程通过 `O_APPEND` 模式写入。CLI 直读不会与 daemon 写入冲突。 + +## 3. CLI 用法 + +``` +ziginit log # 显示最近 100 行 +ziginit log -n # 显示最近 N 行 +ziginit log -f # 实时跟踪(类似 tail -f) +ziginit log daemon # 查看 daemon 日志 +ziginit log --all # 所有服务 + daemon 日志(带 [name] 前缀) +ziginit log -f --all # follow 所有服务 +``` + +过滤日志使用 Unix 管道:`ziginit log -f svc | grep error` + +### 参数说明 + +| 参数 | 默认值 | 说明 | +| -------- | ------ | ------------------------------------- | +| `-n ` | 100 | 显示尾部 N 行 | +| `-f` | off | follow 模式,持续输出新行直到 Ctrl-C | +| `--all` | off | 显示所有服务 + daemon 日志 | +| `daemon` | — | 伪服务名,指向 `{workdir}/daemon.log` | + +### `--all` 输出格式 + +每行加 `[name] ` 前缀(去掉了旧版的读取时间戳,因为它不是写入时间,会产生误导): + +``` +[daemon] daemon started, 2 services +[svc-a] svc-a-out +[svc-a] svc-a-err +[svc-b] listening on :8080 +``` + +## 4. 实现细节 + +### 核心函数(journal.zig) + +| 函数 | 职责 | +| -------------------------------- | ---------------------------------------------------------------------- | +| `tailLastLines(path, n, prefix)` | 从文件尾部读取,反向扫描定位最后 N 行,应用 prefix 后输出到 stdout | +| `followFiles(targets)` | 轮询文件变化(200ms 间隔),输出新增行,检测 `sigpending` 实现优雅退出 | +| `outputLines(data, prefix)` | 逐行处理:前缀添加 + stdout 写入(过滤交给外部 `grep` 管道) | + +### FollowTarget 结构 + +```zig +pub const FollowTarget = struct { + path_buf: [MAX_PATH + 1]u8, // 自持路径数据,避免悬空指针 + path_len: usize, + offset: usize, // 当前读取位置(= tailLastLines 返回的文件大小) + prefix: [MAX_NAME + 4]u8, // "[name] " 前缀 + prefix_len: usize, +}; +``` + +> **设计决策**:`FollowTarget` 拷贝路径到内部缓冲区而非存储外部指针。原因是 `ServicePaths` 是栈上临时值,循环迭代结束后失效。存裸指针会导致 Release 模式悬空指针崩溃(与 `ConfigDtoBuild` 同类问题)。 + +### Follow 模式实现 + +```mermaid +graph TD + A[tailLastLines 初始输出] --> B[blockTermInt 阻塞信号] + B --> C{sleep 200ms} + C --> D{sigpending?} + D -->|是| E[恢复信号 & 退出] + D -->|否| F[遍历 targets] + F --> G{fstat: 文件增长?} + G -->|否| C + G -->|文件缩小| H[offset = 0 重置] + H --> C + G -->|增长| I[seekTo + read 新数据] + I --> J[outputLines 过滤输出] + J --> K[更新 offset] + K --> C +``` + +- 使用 `fstat` 检测文件大小变化,而非 `poll()`(`poll` 对普通文件总是立即返回 `POLLIN`) +- 文件大小减小时判定为日志轮转,重置 offset 到 0 重新读取 +- 信号检查使用 `sigpending`,不依赖信号 handler 全局状态 + +## 5. 删除的代码 + +| 位置 | 删除内容 | +| -------------- | -------------------------------------------------------------------- | +| `journal.zig` | `readServiceLogTail`、`readServiceLogTailPrefixed`(旧 socket 模式) | +| `server.zig` | `ACTION_LOG` 流式转发逻辑(serverLoop + handleCommand) | +| `protocol.zig` | `ACTION_LOG` 常量、`sendCommand` 中的 log 流式读取 | + +## 6. 配置常量(config.zig) + +```zig +pub const DEFAULT_LOG_LINES: u32 = 100; // -n 默认值 +pub const LOG_FOLLOW_POLL_NS: i64 = 200_000_000; // follow 轮询间隔 200ms +``` + +沿用现有环境变量: +- `ZIGINIT_LOG_TAIL_KB`:控制 tail 读取的最大字节数(默认 64KB,上限 4MB) +- `ZIGINIT_LOG_MAX_KB`:控制日志轮转阈值(默认 10MB) diff --git a/tools/ziginit/helpers.zig b/tools/ziginit/helpers.zig index 51aef34..8c1c54d 100644 --- a/tools/ziginit/helpers.zig +++ b/tools/ziginit/helpers.zig @@ -24,9 +24,9 @@ pub fn fdWriteAll(fd: c_int, s: []const u8) bool { return c.writeAll(fd, s); } -/// 将 Zig slice 转成 C 的 [*:0]const u8。如果 slice 本身已经 null-terminated 则零拷贝。 +/// 将 Zig slice 转成 C 的 c.CStr。如果 slice 本身已经 null-terminated 则零拷贝。 /// 否则用 TOZ_POOL_SIZE 个静态 buffer 轮转复制——所以同时持有超过此数的结果指针会出问题。 -pub fn toZ(s: []const u8) ?[*:0]const u8 { +pub fn toZ(s: []const u8) ?c.CStr { if (s.len > 0 and s.ptr[s.len] == 0) return @ptrCast(s.ptr); const Static = struct { var bufs: [config.TOZ_POOL_SIZE][config.MAX_PATH]u8 = undefined; diff --git a/tools/ziginit/journal.zig b/tools/ziginit/journal.zig index f87003f..13f7076 100644 --- a/tools/ziginit/journal.zig +++ b/tools/ziginit/journal.zig @@ -51,98 +51,230 @@ pub fn formatTimestamp(buf: []u8) usize { // ─── Log Tail Reading ──────────────────────────────────────────────────────── -/// 读取指定服务的日志文件尾部,写入 out_fd。 -/// 日志文件是服务进程直接 append 的 stdout/stderr 混合输出。 -pub fn readServiceLogTail(log_path: [*:0]const u8, out_fd: c_int) bool { +/// 逐行输出 data 中的内容,可选 grep 过滤和行前缀。 +fn outputLines(data: []const u8, prefix: []const u8) void { + var pos: usize = 0; + while (pos < data.len) { + var end = pos; + while (end < data.len and data[end] != '\n') end += 1; + const line = data[pos..end]; + // 保留空行(end < data.len 排除缓冲区末尾的假空行) + if (line.len > 0 or end < data.len) { + if (prefix.len > 0) c.writeOnce(1, prefix); + c.writeOnce(1, line); + c.writeOnce(1, "\n"); + } + pos = end + 1; + } +} + +/// CLI 直读日志文件尾部,显示最后 n 行(可选 grep 过滤和行前缀)。 +/// 返回当前文件大小供 follow 模式使用,失败返回 0。 +pub fn tailLastLines(log_path: c.CStr, n: u32, prefix: []const u8) usize { const fd = c.openRead(log_path) orelse { - c.writeOnce(out_fd, "no log available\n"); - return false; + helpers.logErr("no log: {s}\n", .{std.mem.sliceTo(log_path, 0)}); + return 0; }; defer c.closeFd(fd); - const file_size = c.fileSize(fd) orelse return false; - if (file_size <= 0) return false; + const file_size_off = c.fileSize(fd) orelse return 0; + if (file_size_off <= 0) return 0; + const file_size: usize = @intCast(file_size_off); - const ts: usize = logTailSize(); - const fs: usize = @intCast(file_size); - const read_size = @min(ts, fs); - const tail_start: c.off_t = @intCast(fs - read_size); + if (n == 0) return file_size; + + const max_read = logTailSize(); + const read_size = @min(file_size, max_read); + const tail_start: c.off_t = @intCast(file_size - read_size); c.seekTo(fd, tail_start); - const data = std.heap.c_allocator.alloc(u8, read_size) catch return false; + const data = std.heap.c_allocator.alloc(u8, read_size) catch return 0; defer std.heap.c_allocator.free(data); const total = c.readLoop(fd, data.ptr, read_size); - if (total == 0) return false; + if (total == 0) return file_size; // Skip first partial line if we started mid-file - var start: usize = 0; + var content_start: usize = 0; if (tail_start > 0) { - while (start < total and data[start] != '\n') start += 1; - if (start < total) start += 1; + while (content_start < total and data[content_start] != '\n') content_start += 1; + if (content_start < total) content_start += 1; } - // Output all lines (no filtering — log file is per-service) - if (start < total) { - c.writeOnce(out_fd, data[start..total]); + // Scan backwards to find start of last N lines + var pos = total; + var lines_found: u32 = 0; + if (pos > content_start and data[pos - 1] == '\n') pos -= 1; // skip trailing newline + while (pos > content_start) { + pos -= 1; + if (data[pos] == '\n') { + lines_found += 1; + if (lines_found >= n) { + pos += 1; + break; + } + } } - return true; + if (pos <= content_start) pos = content_start; + + outputLines(data[pos..total], prefix); + return file_size; } -/// 读取日志尾部,每行加 [name timestamp] 前缀后写入 out_fd。 -/// 用于 `log --all` 合并多个服务的日志输出。 -pub fn readServiceLogTailPrefixed(log_path: [*:0]const u8, out_fd: c_int, name: []const u8) bool { - const fd = c.openRead(log_path) orelse return false; - defer c.closeFd(fd); +/// follow 模式跟踪目标(自持路径数据,避免悬空指针) +pub const FollowTarget = struct { + path_buf: [config.MAX_PATH + 1]u8, + path_len: usize, + offset: usize, + prefix: [config.MAX_NAME + 4]u8, + prefix_len: usize, + // 跨 chunk 残留行缓冲 + carry: [config.PIPE_BUF_SIZE]u8, + carry_len: usize, + + pub fn pathZ(self: *const FollowTarget) c.CStr { + return @ptrCast(self.path_buf[0..self.path_len]); + } + pub fn prefixSlice(self: *const FollowTarget) []const u8 { + return self.prefix[0..self.prefix_len]; + } + /// 从 null-terminated 源拷贝路径到内部缓冲区。 + pub fn setPath(self: *FollowTarget, src: c.CStr) void { + const s = std.mem.sliceTo(src, 0); + @memcpy(self.path_buf[0..s.len], s); + self.path_buf[s.len] = 0; + self.path_len = s.len; + } +}; - const file_size = c.fileSize(fd) orelse return false; - if (file_size <= 0) return false; +/// follow 模式处理一次 read 结果,维护 carry 缓冲实现跨 chunk 行拼接。 +/// 仅输出完整行(以 \n 结尾),未结束的残留保存在 target.carry 中。 +fn followOutput(target: *FollowTarget, data: []const u8, prefix: []const u8) void { + if (data.len == 0) return; + var start: usize = 0; - const ts: usize = logTailSize(); - const fs: usize = @intCast(file_size); - const read_size = @min(ts, fs); - const tail_start: c.off_t = @intCast(fs - read_size); - c.seekTo(fd, tail_start); + // 拼接 carry 残留 + 新数据的第一行 + if (target.carry_len > 0) { + var first_nl: usize = 0; + while (first_nl < data.len and data[first_nl] != '\n') first_nl += 1; + if (first_nl < data.len) { + // 找到换行:carry + data[0..first_nl] 输出完整行 + if (prefix.len > 0) c.writeOnce(1, prefix); + c.writeOnce(1, target.carry[0..target.carry_len]); + c.writeOnce(1, data[0..first_nl]); + c.writeOnce(1, "\n"); + target.carry_len = 0; + start = first_nl + 1; + } else { + // 仍无换行,追加到 carry;循环消费确保 data 全量处理 + var pos: usize = 0; + while (pos < data.len) { + var avail = target.carry.len - target.carry_len; + if (avail == 0) { + // carry 满,先 flush 再继续 + if (prefix.len > 0) c.writeOnce(1, prefix); + c.writeOnce(1, target.carry[0..target.carry_len]); + target.carry_len = 0; + avail = target.carry.len; + } + const n = @min(data.len - pos, avail); + @memcpy(target.carry[target.carry_len..][0..n], data[pos..][0..n]); + target.carry_len += n; + pos += n; + } + return; + } + } - const data = std.heap.c_allocator.alloc(u8, read_size) catch return false; - defer std.heap.c_allocator.free(data); + if (start >= data.len) return; + const remaining = data[start..]; - const total = c.readLoop(fd, data.ptr, read_size); - if (total == 0) return false; + // 查找最后一个换行 + var last_nl: usize = remaining.len; + while (last_nl > 0) { + last_nl -= 1; + if (remaining[last_nl] == '\n') break; + } - // Skip first partial line if we started mid-file - var start: usize = 0; - if (tail_start > 0) { - while (start < total and data[start] != '\n') start += 1; - if (start < total) start += 1; + if (remaining[last_nl] == '\n') { + // 输出完整行部分 + outputLines(remaining[0 .. last_nl + 1], prefix); + // 保存尾部残留 + const tail = remaining[last_nl + 1 ..]; + if (tail.len > 0) { + const n = @min(tail.len, target.carry.len); + @memcpy(target.carry[0..n], tail[0..n]); + target.carry_len = n; + } + } else { + // 整个 chunk 无换行,保存到 carry + const n = @min(remaining.len, target.carry.len); + @memcpy(target.carry[0..n], remaining[0..n]); + target.carry_len = n; } +} - // Build prefix: [name YYYY-MM-DDTHH:MM:SS] - var prefix_buf: [MAX_NAME + TIMESTAMP_LEN + 4]u8 = undefined; // [name ts] + space - var ts_buf: [TIMESTAMP_LEN]u8 = undefined; - const ts_len = formatTimestamp(&ts_buf); - const prefix = std.fmt.bufPrint(&prefix_buf, "[{s} {s}] ", .{ name, ts_buf[0..ts_len] }) catch return false; +/// follow 模式:轮询文件变化,输出新增行直到收到 SIGTERM/SIGINT。 +/// 调用前应先 tailLastLines 完成初始输出。 +/// 持久化 FD 避免每轮 open/close——ziginit 用 truncate 而非 rename 做轮转, +/// 同一 FD 上 fstat 始终反映当前大小。 +pub fn followFiles(targets: []FollowTarget) void { + // 阻塞 SIGTERM/SIGINT,用 sigpending 同步检查 + const old_mask = c.blockTermInt(); + defer c.restoreSigmask(&old_mask); + + var buf: [config.PIPE_BUF_SIZE]u8 = undefined; + + // 持久化 FD——文件可能尚未创建,-1 表示待重试 + var fds: [config.MAX_SERVICES + 1]c_int = undefined; + for (0..targets.len) |i| { + fds[i] = c.openRead(targets[i].pathZ()) orelse -1; + } + defer for (0..targets.len) |i| { + if (fds[i] >= 0) c.closeFd(fds[i]); + }; - // Write each line with prefix - var pos = start; - while (pos < total) { - var end = pos; - while (end < total and data[end] != '\n') end += 1; - if (end > pos) { - c.writeOnce(out_fd, prefix); - c.writeOnce(out_fd, data[pos..end]); - c.writeOnce(out_fd, "\n"); + while (true) { + c.sleepNs(config.LOG_FOLLOW_POLL_NS); + if (c.pendingTermInt() != 0) break; + + for (targets, 0..) |*target, i| { + // 文件可能延迟创建,每轮重试 + if (fds[i] < 0) { + fds[i] = c.openRead(target.pathZ()) orelse -1; + if (fds[i] < 0) continue; + } + + const file_size_off = c.fileSize(fds[i]) orelse continue; + if (file_size_off <= 0) continue; + const fs: usize = @intCast(file_size_off); + + if (fs < target.offset) { + // File was truncated (rotation), reset to start + target.offset = 0; + target.carry_len = 0; + } + if (fs <= target.offset) continue; + + c.seekTo(fds[i], @intCast(target.offset)); + const pfx = target.prefixSlice(); + + while (true) { + const n = c.read(fds[i], &buf, buf.len); + if (n <= 0) break; + const bytes: usize = @intCast(n); + followOutput(target, buf[0..bytes], pfx); + target.offset += bytes; + } } - pos = end + 1; } - return true; } // ─── Log Rotation ──────────────────────────────────────────────────────────── /// Truncate 日志文件到 0——服务进程用 O_APPEND 写入,truncate 后下次 write /// 自动从文件头开始,对服务透明,不会引发 SIGPIPE。 -pub fn rotateServiceLog(log_path: [*:0]const u8) void { +pub fn rotateServiceLog(log_path: c.CStr) void { const max = logMaxSize(); if (max == 0) return; // rotation disabled diff --git a/tools/ziginit/logrotate.zig b/tools/ziginit/logrotate.zig new file mode 100644 index 0000000..6acab2b --- /dev/null +++ b/tools/ziginit/logrotate.zig @@ -0,0 +1,199 @@ +// logrotate.zig — Copy-truncate log rotation with numbered backups + gzip compression +// +// copytruncate 策略:先将日志内容复制到 .1 备份,gzip 压缩后 truncate 原文件到 0。 +// 服务进程用 O_APPEND 持有原 fd,truncate 后自动从头写入,无需重启。 +// 旧备份按编号后移:.1.gz→.2.gz→.3.gz,超出保留数量的自动删除。 +// +// 此模块可选替代 journal.zig 中的纯 truncate 轮转,调用方按需切换: +// journal.rotateServiceLog → 纯 truncate,历史全丢 +// logrotate.rotateServiceLog → 先备份再 truncate,gzip 压缩,保留最近 N 份 +// +// 压缩依赖系统 gzip 命令(busybox/coreutils 均可),通过 fork+exec 同步执行。 + +const std = @import("std"); +const c = @import("posix.zig"); +const helpers = @import("helpers.zig"); +const config = @import("config.zig"); + +const MAX_PATH = config.MAX_PATH; + +/// 备份路径后缀最大长度:".{d}.gz" 最多 ".99.gz" = 6 字节 + 1 sentinel +const BACKUP_SUFFIX_MAX = 7; + +// ─── Public API ────────────────────────────────────────────────────────────── + +/// Copy-truncate 轮转:文件超过 max_size 时,先备份到 .1 再 truncate 原文件。 +/// keep: 保留的备份份数(.1.gz ~ .{keep}.gz),超出部分自动删除,最大 99。 +/// max_size: 触发轮转的文件大小阈值(字节),0 表示禁用。 +/// +/// 备份保留的是 truncate 前的完整文件内容(从头到尾),gzip 压缩存储。 +/// 轮转顺序:shift .gz 备份 → copy current→.1 → gzip .1→.1.gz → truncate。 +pub fn rotateServiceLog(log_path: c.CStr, max_size: usize, keep: u32) void { + if (max_size == 0 or keep == 0) return; + + const fd = c.openRead(log_path) orelse return; + const file_size = c.fileSize(fd) orelse { + c.closeFd(fd); + return; + }; + c.closeFd(fd); + + if (file_size <= 0) return; + const fs: usize = @intCast(file_size); + if (fs <= max_size) return; + + const path_s = std.mem.sliceTo(log_path, 0); + + // 1. 后移旧备份:删除 .{keep}.gz,.{keep-1}.gz→.{keep}.gz, ..., .1.gz→.2.gz + shiftBackups(log_path, keep); + + // 2. 复制当前日志内容 → .1(未压缩临时文件) + if (!copyToBackup(log_path, 1)) { + helpers.logErr("log rotate: copy failed for {s}\n", .{path_s}); + return; + } + + // 3. gzip .1 → .1.gz(同步等待,10MB 仅需几十 ms) + var tmp_buf: [MAX_PATH + BACKUP_SUFFIX_MAX]u8 = undefined; + if (backupPath(&tmp_buf, log_path, 1, false)) |tmp_path| { + if (!gzipFile(tmp_path)) { + helpers.logErr("log rotate: gzip failed for {s}.1\n", .{path_s}); + // gzip 失败时保留未压缩的 .1,不影响 truncate + } + } + + // 4. Truncate 原文件到 0——O_APPEND 语义保证服务下次 write 从头开始 + const trunc_fd = c.openAppend(log_path) orelse return; + defer c.closeFd(trunc_fd); + if (c.truncateFile(trunc_fd, 0)) { + helpers.logInfo("log rotated: {s} ({d} -> .1.gz, truncated)\n", .{ path_s, fs }); + } +} + +/// 便捷入口:使用 config 中的默认阈值和保留数。 +pub fn rotateWithDefaults(log_path: c.CStr) void { + const max_size = maxSize(); + rotateServiceLog(log_path, max_size, config.LOG_ROTATE_KEEP); +} + +/// 读取 ZIGINIT_LOG_MAX_KB 环境变量控制单文件上限,默认 10MB,最大 100MB。 +/// 设为 0 则禁用轮转。与 journal.logMaxSize() 逻辑一致。 +pub fn maxSize() usize { + const env_z = c.getenv("ZIGINIT_LOG_MAX_KB") orelse return config.LOG_MAX_SIZE; + const env_s = std.mem.sliceTo(env_z, 0); + const kb = std.fmt.parseInt(usize, env_s, 10) catch return config.LOG_MAX_SIZE; + if (kb == 0) return 0; + + const max_kb = config.LOG_MAX_SIZE_UPPER / 1024; + if (kb >= max_kb) return config.LOG_MAX_SIZE_UPPER; + return kb * 1024; +} + +/// 检查系统是否有 gzip 命令可用(fork+exec `gzip --version`)。 +/// daemon 启动时调用,找不到则应 fatal 退出,避免轮转时丢备份。 +pub fn checkGzipAvailable() bool { + const pid = c.fork(); + if (pid < 0) return false; + + if (pid == 0) { + // 子进程:关闭 stdio,静默执行 + c.closeFd(0); + c.closeFd(1); + c.closeFd(2); + const argv_ = [_:null]?c.CStr{ "gzip", "--version" }; + _ = c.execvpSearch("gzip", &argv_); + c._exit(127); + } + + const wstatus = c.waitChildBlock(pid); + const exit_code = (wstatus >> 8) & 0xFF; + return exit_code == 0; +} + +// ─── Internal ──────────────────────────────────────────────────────────────── + +/// 构建备份路径:在 base 后追加 ".{n}" 或 ".{n}.gz",写入 buf 并 null-terminate。 +fn backupPath(buf: *[MAX_PATH + BACKUP_SUFFIX_MAX]u8, base: c.CStr, n: u32, gz: bool) ?c.CStr { + const s = std.mem.sliceTo(base, 0); + const result = if (gz) + std.fmt.bufPrint(buf, "{s}.{d}.gz", .{ s, n }) catch return null + else + std.fmt.bufPrint(buf, "{s}.{d}", .{ s, n }) catch return null; + buf[result.len] = 0; + return @ptrCast(buf[0..result.len]); +} + +/// 后移 .gz 备份文件:删除 .{keep}.gz,.{keep-1}.gz→.{keep}.gz, ..., .1.gz→.2.gz。 +/// rename 失败(备份不存在)时静默跳过。 +fn shiftBackups(base: c.CStr, keep: u32) void { + // 删除最旧的备份 + var oldest_buf: [MAX_PATH + BACKUP_SUFFIX_MAX]u8 = undefined; + if (backupPath(&oldest_buf, base, keep, true)) |oldest| { + c.unlinkFile(oldest); + } + + // 从后往前依次 rename + var i: u32 = keep - 1; + while (i >= 1) : (i -= 1) { + var src_buf: [MAX_PATH + BACKUP_SUFFIX_MAX]u8 = undefined; + var dst_buf: [MAX_PATH + BACKUP_SUFFIX_MAX]u8 = undefined; + const src = backupPath(&src_buf, base, i, true) orelse continue; + const dst = backupPath(&dst_buf, base, i + 1, true) orelse continue; + _ = c.renameFile(src, dst); + } +} + +/// 流式复制源文件内容到 .{n} 备份文件(未压缩)。 +/// 使用栈上 64KB 缓冲区分块读写,不受文件大小限制。 +fn copyToBackup(base: c.CStr, n: u32) bool { + var dst_buf: [MAX_PATH + BACKUP_SUFFIX_MAX]u8 = undefined; + const dst_path = backupPath(&dst_buf, base, n, false) orelse return false; + + const src_fd = c.openRead(base) orelse return false; + defer c.closeFd(src_fd); + + const dst_fd = c.openWriteTrunc(dst_path) orelse return false; + defer c.closeFd(dst_fd); + + // 从文件头开始读——openRead 后 offset 已在 0 + var buf: [config.LOG_ROTATE_BUF_SIZE]u8 = undefined; + while (true) { + const n_read = c.readLoop(src_fd, &buf, config.LOG_ROTATE_BUF_SIZE); + if (n_read == 0) break; + if (!c.writeAll(dst_fd, buf[0..n_read])) return false; + } + + c.syncFd(dst_fd); + return true; +} + +// ─── Gzip Compression ─────────────────────────────────────────────────────── + +/// fork+exec gzip 压缩文件(同步阻塞等待)。 +/// 通过 execvp 按 PATH 搜索 gzip,不硬编码路径。 +/// gzip 会将 path 替换为 path.gz,原文件自动删除。 +/// 10MB 文件约几十 ms,不会阻塞主循环。 +fn gzipFile(path: c.CStr) bool { + const pid = c.fork(); + if (pid < 0) return false; + + if (pid == 0) { + // ── 子进程 ── + // 关闭 stdin/stdout/stderr,gzip 不需要交互 + c.closeFd(0); + c.closeFd(1); + c.closeFd(2); + + // execvp 按 PATH 搜索 gzip(coreutils / busybox 均可) + const argv_ = [_:null]?c.CStr{ "gzip", "-f", path }; + _ = c.execvpSearch("gzip", &argv_); + + c._exit(127); + } + + // ── 父进程:阻塞等待 gzip 完成 ── + const wstatus = c.waitChildBlock(pid); + // wstatus 低 8 位为信号,高 8 位为退出码 + const exit_code = (wstatus >> 8) & 0xFF; + return exit_code == 0; +} diff --git a/tools/ziginit/main.zig b/tools/ziginit/main.zig index 70c97c2..137b29e 100644 --- a/tools/ziginit/main.zig +++ b/tools/ziginit/main.zig @@ -39,6 +39,7 @@ const c = @import("posix.zig"); const helpers = @import("helpers.zig"); const config = @import("config.zig"); const journal = @import("journal.zig"); +const logrotate = @import("logrotate.zig"); const service = @import("service.zig"); const protocol = @import("protocol.zig"); const server = @import("server.zig"); @@ -70,7 +71,10 @@ const usage = \\ restart Restart service(s) \\ kill Force-kill service(s) \\ status [name] Query service status (omit name for all) - \\ log View recent log output + \\ log View recent log output + \\ log -f Follow log output (live tail) + \\ log -n Show last N lines (default 100) + \\ log daemon View daemon log \\ version Print version \\ \\Options: @@ -128,6 +132,9 @@ pub fn main(init: std.process.Init.Minimal) !void { var cmd_str: ?[]const u8 = null; var svc_name: []const u8 = ""; var is_init: bool = false; + // log 子命令专用标志 + var log_lines: u32 = config.DEFAULT_LOG_LINES; + var log_follow: bool = false; // Collect all args first var arg_list: [config.MAX_CLI_ARGS][]const u8 = undefined; @@ -156,6 +163,19 @@ pub fn main(init: std.process.Init.Minimal) !void { is_init = true; } else if (std.mem.eql(u8, arg, "--all")) { svc_name = "all"; + } else if (std.mem.eql(u8, arg, "-f")) { + log_follow = true; + } else if (std.mem.eql(u8, arg, "-n")) { + i += 1; + if (i < arg_count) { + log_lines = std.fmt.parseInt(u32, arg_list[i], 10) catch { + logErr("invalid line count: {s}\n", .{arg_list[i]}); + std.process.exit(1); + }; + } else { + logErr("-n requires a number\n", .{}); + std.process.exit(1); + } } else if (cmd_str == null) { cmd_str = arg; } else if (svc_name.len == 0) { @@ -212,7 +232,7 @@ pub fn main(init: std.process.Init.Minimal) !void { buf[h.len] = '/'; @memcpy(buf[h.len + 1 ..][0..s.len], s); buf[h.len + 1 + s.len] = 0; - const full: [*:0]const u8 = @ptrCast(buf[0 .. h.len + 1 + s.len + 1].ptr); + const full: c.CStr = @ptrCast(buf[0 .. h.len + 1 + s.len + 1].ptr); if (c.openRead(full)) |fd| { c.closeFd(fd); // Allocate a persistent copy @@ -251,12 +271,19 @@ pub fn main(init: std.process.Init.Minimal) !void { var cfg = config.parseConfig(allocator, cfg_path) catch std.process.exit(1); if (command == .check) { - if (config.validateConfig(&cfg)) { - logInfo("config OK: {d} service(s)\n", .{cfg.service_count}); - } else { + if (!config.validateConfig(&cfg)) { logErr("config check FAILED\n", .{}); std.process.exit(1); } + logInfo("config: {s}\n", .{cfg_path}); + logInfo("config OK: {d} service(s)\n", .{cfg.service_count}); + + // 检查系统依赖 + if (!logrotate.checkGzipAvailable()) { + logErr("gzip not found in PATH\n", .{}); + std.process.exit(1); + } + logInfo("gzip: OK\n", .{}); return; } @@ -268,9 +295,9 @@ pub fn main(init: std.process.Init.Minimal) !void { .check => unreachable, .list => { for (cfg.services[0..cfg.service_count]) |*svc| { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, svc.nameSlice())) |ret| { - const enabled = (ret & protocol.STATUS_ENABLED) != 0; - const running = (ret & protocol.STATUS_RUNNING) != 0; + if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, svc.nameSlice())) |result| { + const enabled = (result.status & protocol.STATUS_ENABLED) != 0; + const running = (result.status & protocol.STATUS_RUNNING) != 0; logSvc(svc.nameSlice(), "enabled={s} running={s}\n", .{ if (enabled) "yes" else "no", if (running) "yes" else "no", @@ -287,11 +314,11 @@ pub fn main(init: std.process.Init.Minimal) !void { } }, .reload => { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |ret| { - if (ret & 0xFF == 0) { + if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |result| { + if (result.isOk()) { logInfo("reload ok\n", .{}); } else { - logErr("reload failed\n", .{}); + logErr("reload failed: {s}\n", .{result.msgSlice()}); std.process.exit(1); } } else { @@ -361,9 +388,9 @@ pub fn main(init: std.process.Init.Minimal) !void { .full => unreachable, } - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |ret| { - if (ret & 0xFF != 0) { - logErr("reload failed after add/update\n", .{}); + if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |result| { + if (!result.isOk()) { + logErr("reload failed after add/update: {s}\n", .{result.msgSlice()}); std.process.exit(1); } } else { @@ -405,9 +432,9 @@ pub fn main(init: std.process.Init.Minimal) !void { } logInfo("service removed: {s}\n", .{svc_name}); - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |ret| { - if (ret & 0xFF != 0) { - logErr("reload failed after remove\n", .{}); + if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RELOAD, "")) |result| { + if (!result.isOk()) { + logErr("reload failed after remove: {s}\n", .{result.msgSlice()}); std.process.exit(1); } } else { @@ -420,10 +447,12 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } if (std.mem.eql(u8, svc_name, "all")) { - protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_START); + if (!protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_START)) std.process.exit(1); } else { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_START, svc_name)) |ret| { - if (ret & 0xFF == 0) logSvc(svc_name, "started\n", .{}) else logSvc(svc_name, "start failed\n", .{}); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_START, svc_name) orelse std.process.exit(1); + if (result.isOk()) logSvc(svc_name, "started\n", .{}) else { + logSvc(svc_name, "{s}\n", .{result.msgSlice()}); + std.process.exit(1); } } }, @@ -433,10 +462,12 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } if (std.mem.eql(u8, svc_name, "all")) { - protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_STOP); + if (!protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_STOP)) std.process.exit(1); } else { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_STOP, svc_name)) |ret| { - if (ret & 0xFF == 0) logSvc(svc_name, "stopped\n", .{}) else logSvc(svc_name, "stop failed\n", .{}); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_STOP, svc_name) orelse std.process.exit(1); + if (result.isOk()) logSvc(svc_name, "stopped\n", .{}) else { + logSvc(svc_name, "{s}\n", .{result.msgSlice()}); + std.process.exit(1); } } }, @@ -446,10 +477,12 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } if (std.mem.eql(u8, svc_name, "all")) { - protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_RESTART); + if (!protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_RESTART)) std.process.exit(1); } else { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_RESTART, svc_name)) |ret| { - if (ret & 0xFF == 0) logSvc(svc_name, "restarted\n", .{}) else logSvc(svc_name, "restart failed\n", .{}); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_RESTART, svc_name) orelse std.process.exit(1); + if (result.isOk()) logSvc(svc_name, "restarted\n", .{}) else { + logSvc(svc_name, "{s}\n", .{result.msgSlice()}); + std.process.exit(1); } } }, @@ -459,37 +492,121 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } if (std.mem.eql(u8, svc_name, "all")) { - protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_KILL); + if (!protocol.sendCommandAll(&cfg, &dp, protocol.ACTION_KILL)) std.process.exit(1); } else { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_KILL, svc_name)) |ret| { - if (ret & 0xFF == 0) logSvc(svc_name, "killed\n", .{}) else logSvc(svc_name, "kill failed\n", .{}); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_KILL, svc_name) orelse std.process.exit(1); + if (result.isOk()) logSvc(svc_name, "killed\n", .{}) else { + logSvc(svc_name, "{s}\n", .{result.msgSlice()}); + std.process.exit(1); } } }, .status => { if (svc_name.len == 0) { // Status all - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, "")) |ret| { - if (ret & 0xFF != 0) logErr("status query failed\n", .{}); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, "") orelse std.process.exit(1); + if (!result.isOk()) { + logErr("status query failed: {s}\n", .{result.msgSlice()}); + std.process.exit(1); } } else { - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, svc_name)) |ret| { - const enabled = (ret & protocol.STATUS_ENABLED) != 0; - const running = (ret & protocol.STATUS_RUNNING) != 0; - logSvc(svc_name, "enabled={s} running={s}\n", .{ - if (enabled) "yes" else "no", - if (running) "yes" else "no", - }); + const result = protocol.sendCommand(dp.sockZ(), protocol.ACTION_STATUS, svc_name) orelse std.process.exit(1); + if (!result.isOk()) { + logSvc(svc_name, "{s}\n", .{result.msgSlice()}); + std.process.exit(1); } + const enabled = (result.status & protocol.STATUS_ENABLED) != 0; + const running = (result.status & protocol.STATUS_RUNNING) != 0; + logSvc(svc_name, "enabled={s} running={s}\n", .{ + if (enabled) "yes" else "no", + if (running) "yes" else "no", + }); } }, .log => { if (svc_name.len == 0) { - logErr("log requires a service name or --all\n", .{}); + logErr("log requires a service name, 'daemon', or --all\n", .{}); std.process.exit(1); } - if (protocol.sendCommand(dp.sockZ(), protocol.ACTION_LOG, svc_name)) |ret| { - if (ret & 0xFF != 0) logErr("log not available for '{s}'\n", .{svc_name}); + + const show_all = std.mem.eql(u8, svc_name, "all"); + const show_daemon = show_all or std.mem.eql(u8, svc_name, "daemon"); + const show_prefix = show_all; + + // Build follow targets (also used for initial tail) + var targets: [config.MAX_SERVICES + 1]journal.FollowTarget = undefined; + var target_count: usize = 0; + + // Daemon log + if (show_daemon) { + targets[target_count] = .{ + .path_buf = undefined, + .path_len = 0, + .offset = 0, + .prefix = undefined, + .prefix_len = 0, + .carry = undefined, + .carry_len = 0, + }; + targets[target_count].setPath(dp.daemonLogZ()); + if (show_prefix) { + const pfx = std.fmt.bufPrint(&targets[target_count].prefix, "[daemon] ", .{}) catch ""; + targets[target_count].prefix_len = pfx.len; + } + target_count += 1; + } + + // Service logs + if (show_all) { + for (cfg.services[0..cfg.service_count]) |*svc| { + const sp = config.buildServicePaths(cfg.workdirSlice(), svc.nameSlice()); + targets[target_count] = .{ + .path_buf = undefined, + .path_len = 0, + .offset = 0, + .prefix = undefined, + .prefix_len = 0, + .carry = undefined, + .carry_len = 0, + }; + targets[target_count].setPath(sp.logZ()); + const pfx = std.fmt.bufPrint(&targets[target_count].prefix, "[{s}] ", .{svc.nameSlice()}) catch ""; + targets[target_count].prefix_len = pfx.len; + target_count += 1; + } + } else if (!show_daemon) { + // Single service — 先校验服务存在,防止未知名称越界写 [MAX_PATH] 缓冲 + if (cfg.findService(svc_name) == null) { + logErr("unknown service: {s}\n", .{svc_name}); + std.process.exit(1); + } + const sp = config.buildServicePaths(cfg.workdirSlice(), svc_name); + targets[target_count] = .{ + .path_buf = undefined, + .path_len = 0, + .offset = 0, + .prefix = undefined, + .prefix_len = 0, + .carry = undefined, + .carry_len = 0, + }; + targets[target_count].setPath(sp.logZ()); + target_count += 1; + } + + if (target_count == 0) { + logErr("no log targets\n", .{}); + std.process.exit(1); + } + + // Initial tail + for (targets[0..target_count]) |*t| { + t.offset = journal.tailLastLines(t.pathZ(), log_lines, t.prefixSlice()); + } + + // Follow mode + if (log_follow) { + journal.followFiles(targets[0..target_count]); } }, .version => unreachable, diff --git a/tools/ziginit/posix.zig b/tools/ziginit/posix.zig index ce109a6..67c5165 100644 --- a/tools/ziginit/posix.zig +++ b/tools/ziginit/posix.zig @@ -9,6 +9,11 @@ const native = std.c; // ── Types ──────────────────────────────────────────────────────────────────── +/// C 风格 null 结尾字符串 (const char*) +pub const CStr = [*:0]const u8; +/// C 风格 null 结尾字符串数组 (const char*[]),用于 argv/envp +pub const CStrArray = [*:null]const ?[*:0]const u8; + pub const pid_t = native.pid_t; pub const mode_t = native.mode_t; pub const off_t = native.off_t; @@ -31,7 +36,7 @@ pub const CLOCK = native.CLOCK; pub const SEEK = native.SEEK; /// environ 是运行时指针,不能作为 comptime 常量,需要通过函数访问。 -pub fn getEnviron() [*:null]const ?[*:0]const u8 { +pub fn getEnviron() CStrArray { return @ptrCast(native.environ); } @@ -80,10 +85,17 @@ pub fn sendSignalChecked(pid: pid_t, sig: SIG) bool { } /// 替换当前进程映像。成功不返回,失败返回 -1。 -pub fn execve(path: [*:0]const u8, argv: [*:null]const ?[*:0]const u8, envp: [*:null]const ?[*:0]const u8) c_int { +pub fn execve(path: CStr, argv: CStrArray, envp: CStrArray) c_int { return native.execve(path, argv, envp); } +extern "c" fn execvp(file: CStr, argv: CStrArray) c_int; + +/// 按 PATH 搜索并替换当前进程映像。成功不返回,失败返回 -1。 +pub fn execvpSearch(file: CStr, argv: CStrArray) c_int { + return execvp(file, argv); +} + // ── Network ────────────────────────────────────────────────────────────────── /// bind socket 到地址。成功返回 0,失败返回 -1。 @@ -117,7 +129,7 @@ pub fn poll(fds: [*]pollfd, nfds: u32, timeout: c_int) c_int { } /// 获取环境变量。未找到返回 null。 -pub fn getenv(name: [*:0]const u8) ?[*:0]const u8 { +pub fn getenv(name: CStr) ?CStr { return native.getenv(name); } @@ -199,7 +211,7 @@ pub const MODE_NONE: mode_t = 0; // ── 深封装:File I/O ───────────────────────────────────────────────────────── /// 只读打开。成功返回 fd,失败返回 null。 -pub fn openRead(path: [*:0]const u8) ?c_int { +pub fn openRead(path: CStr) ?c_int { const fd = native.open(path, .{}, MODE_NONE); return if (fd >= 0) fd else null; } @@ -210,7 +222,7 @@ pub fn closeFd(fd: c_int) void { } /// 删除文件,忽略返回值(文件可能已不存在)。 -pub fn unlinkFile(path: [*:0]const u8) void { +pub fn unlinkFile(path: CStr) void { _ = native.unlink(path); } @@ -266,7 +278,7 @@ pub fn sleepSec(sec: u32) void { /// 睡眠指定纳秒数。 pub fn sleepNs(nsec: i64) void { - const ts: timespec = .{ .sec = 0, .nsec = nsec }; + const ts: timespec = .{ .sec = 0, .nsec = @intCast(nsec) }; _ = native.nanosleep(&ts, null); } @@ -303,35 +315,35 @@ pub fn setSocketTimeouts(fd: c_int, sec: c_long) void { /// 以 WRONLY|CREAT|TRUNC 打开文件,mode 644。成功返回 fd,失败返回 null。 /// 用于写入新文件(覆盖已有内容)。 -pub fn openWriteTrunc(path: [*:0]const u8) ?c_int { +pub fn openWriteTrunc(path: CStr) ?c_int { const fd = native.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true, .TRUNC = true }, MODE_RW); return if (fd >= 0) fd else null; } /// 以 WRONLY|CREAT|APPEND|CLOEXEC 打开文件,mode 644。成功返回 fd,失败返回 null。 /// 用于追加写入(journal 等)。 -pub fn openAppend(path: [*:0]const u8) ?c_int { +pub fn openAppend(path: CStr) ?c_int { const fd = native.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true, .APPEND = true, .CLOEXEC = true }, MODE_RW); return if (fd >= 0) fd else null; } /// 以 WRONLY|CREAT|APPEND 打开文件(无 CLOEXEC),mode 644。 /// 用于服务日志——fd 必须在 exec 后继续存活。 -pub fn openAppendLeaky(path: [*:0]const u8) ?c_int { +pub fn openAppendLeaky(path: CStr) ?c_int { const fd = native.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true, .APPEND = true }, MODE_RW); return if (fd >= 0) fd else null; } /// 以 WRONLY|CREAT 打开文件,mode 644。成功返回 fd,失败返回 null。 /// 用于创建标记文件(如 .check)。 -pub fn openCreate(path: [*:0]const u8) ?c_int { +pub fn openCreate(path: CStr) ?c_int { const fd = native.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true }, MODE_RW); return if (fd >= 0) fd else null; } /// 以 WRONLY|CREAT|CLOEXEC 打开文件,mode 644。成功返回 fd,失败返回 null。 /// 用于锁文件(exec 后自动关闭)。 -pub fn openCreateCloexec(path: [*:0]const u8) ?c_int { +pub fn openCreateCloexec(path: CStr) ?c_int { const fd = native.open(path, .{ .ACCMODE = .WRONLY, .CREAT = true, .CLOEXEC = true }, MODE_RW); return if (fd >= 0) fd else null; } @@ -351,7 +363,7 @@ pub fn syncFd(fd: c_int) void { } /// 原子重命名。成功返回 true。 -pub fn renameFile(old_path: [*:0]const u8, new_path: [*:0]const u8) bool { +pub fn renameFile(old_path: CStr, new_path: CStr) bool { return native.rename(old_path, new_path) == 0; } @@ -361,7 +373,7 @@ pub fn truncateFile(fd: c_int, size: i64) bool { } /// 检查文件是否存在(使用 open+close)。 -pub fn fileExists(path: [*:0]const u8) bool { +pub fn fileExists(path: CStr) bool { if (openRead(path)) |fd| { closeFd(fd); return true; @@ -419,7 +431,7 @@ pub fn newSession() bool { } /// 切换工作目录。成功返回 true。 -pub fn chdirTo(path: [*:0]const u8) bool { +pub fn chdirTo(path: CStr) bool { return native.chdir(path) == 0; } diff --git a/tools/ziginit/protocol.zig b/tools/ziginit/protocol.zig index f0c78a8..be2de17 100644 --- a/tools/ziginit/protocol.zig +++ b/tools/ziginit/protocol.zig @@ -20,7 +20,6 @@ pub const ACTION_KILL: u8 = 'k'; pub const ACTION_RESTART: u8 = 'r'; pub const ACTION_STATUS: u8 = 'p'; // 'p' for probe pub const ACTION_VERSION: u8 = 'v'; -pub const ACTION_LOG: u8 = 'l'; pub const ACTION_QUIT: u8 = 'q'; pub const ACTION_LIST: u8 = 'L'; // 大写区分于 log pub const ACTION_RELOAD: u8 = 'R'; // 大写区分于 restart @@ -28,28 +27,67 @@ pub const ACTION_ADD: u8 = 'a'; pub const ACTION_REMOVE: u8 = 'd'; // 'd' for delete // ─── Status Response Bitmask ───────────────────────────────────────────────── -// 2 字节 LE 响应中的标志位,低字节为错误码(0=成功),高字节为状态标志 +// 响应格式:[2B status LE][2B msg_len LE][0~65535B message] +// 低字节为错误码(0=成功),高字节为状态标志/附加数据 // 服务启用(watchdog 会自动拉起) pub const STATUS_ENABLED: u16 = 0x0100; // 服务正在运行 pub const STATUS_RUNNING: u16 = 0x0200; +// ─── Command Result ────────────────────────────────────────────────────────── +// 服务端处理结果:状态码 + 可选错误消息 +// 响应线格式:[2B status LE][2B msg_len LE][0~65535B message] + +pub const MAX_MSG_LEN = 256; + +pub const CommandResult = struct { + status: u16 = 0, + msg: [MAX_MSG_LEN]u8 = undefined, + msg_len: u16 = 0, + + /// 成功,可附带高字节标志 + pub fn ok(status: u16) CommandResult { + return .{ .status = status }; + } + + /// 失败,带格式化错误消息 + pub fn fail(code: u8, comptime fmt: []const u8, args: anytype) CommandResult { + var r = CommandResult{ .status = code }; + if (std.fmt.bufPrint(&r.msg, fmt, args)) |s| { + r.msg_len = @intCast(s.len); + } else |_| { + r.msg_len = MAX_MSG_LEN; + } + return r; + } + + pub fn msgSlice(self: *const CommandResult) []const u8 { + return self.msg[0..self.msg_len]; + } + + pub fn isOk(self: *const CommandResult) bool { + return self.status & 0xFF == 0; + } +}; + // ─── Socket Helpers ────────────────────────────────────────────────────────── -pub fn sockaddrUn(path: [*:0]const u8) c.sockaddr.un { +pub fn sockaddrUn(path: c.CStr) c.sockaddr.un { var addr: c.sockaddr.un = .{ .path = undefined }; const path_slice = std.mem.sliceTo(path, 0); - const len = @min(path_slice.len, addr.path.len - 1); - @memcpy(addr.path[0..len], path_slice[0..len]); - addr.path[len] = 0; + if (path_slice.len >= addr.path.len) { + @panic("socket path exceeds sun_path capacity"); + } + @memcpy(addr.path[0..path_slice.len], path_slice); + addr.path[path_slice.len] = 0; return addr; } -/// Send a command to the daemon. Returns 2-byte status and optionally reads payload. +/// Send a command to the daemon. Returns CommandResult with status + error message. /// CLI 进程通过此函数与 daemon 通信。 /// 内置 2s 重试(100ms × 20)——daemon 可能正在启动中。 -pub fn sendCommand(sock_path: [*:0]const u8, command: u8, svc_name: []const u8) ?u16 { +pub fn sendCommand(sock_path: c.CStr, command: u8, svc_name: []const u8) ?CommandResult { const fd = c.unixSocket() orelse { logErr("cannot create socket\n", .{}); return null; @@ -78,8 +116,12 @@ pub fn sendCommand(sock_path: [*:0]const u8, command: u8, svc_name: []const u8) // Build 64-byte request var req: [REQUEST_SIZE]u8 = std.mem.zeroes([REQUEST_SIZE]u8); req[0] = command; - const name_len = @min(svc_name.len, REQUEST_SIZE - 2); - @memcpy(req[1..][0..name_len], svc_name[0..name_len]); + const max_name = REQUEST_SIZE - 2; + if (svc_name.len > max_name) { + logErr("service name too long (max {d} bytes)\n", .{max_name}); + return null; + } + @memcpy(req[1..][0..svc_name.len], svc_name); if (c.write(fd, &req, REQUEST_SIZE) != REQUEST_SIZE) { logErr("send command failed\n", .{}); @@ -92,31 +134,43 @@ pub fn sendCommand(sock_path: [*:0]const u8, command: u8, svc_name: []const u8) logErr("recv response failed\n", .{}); return null; } - const result = @as(u16, resp[1]) << 8 | resp[0]; - - // For log command, stream remaining data to stdout - if (command == ACTION_LOG and (result & 0xFF) == 0) { - var buf: [config.PIPE_BUF_SIZE]u8 = undefined; - while (true) { - const n = c.read(fd, &buf, buf.len); - if (n <= 0) break; - c.writeOnce(1, buf[0..@intCast(n)]); + + var result = CommandResult{ + .status = @as(u16, resp[1]) << 8 | resp[0], + }; + + // Read 2-byte message length (LE) + message body + var len_buf: [2]u8 = undefined; + if (c.readLoop(fd, &len_buf, 2) == 2) { + const msg_len = @as(u16, len_buf[1]) << 8 | len_buf[0]; + if (msg_len > 0) { + const to_read: usize = @min(msg_len, MAX_MSG_LEN); + const n = c.readLoop(fd, &result.msg, to_read); + result.msg_len = @intCast(n); } } return result; } -pub fn sendCommandAll(cfg: *const config.Config, dp: *const config.DaemonPaths, command: u8) void { +/// Send command to all services. Returns true only if every service succeeded. +pub fn sendCommandAll(cfg: *const config.Config, dp: *const config.DaemonPaths, command: u8) bool { + var all_ok = true; for (cfg.services[0..cfg.service_count]) |*svc| { - if (sendCommand(dp.sockZ(), command, svc.nameSlice())) |ret| { - if (ret & 0xFF == 0) { + if (sendCommand(dp.sockZ(), command, svc.nameSlice())) |result| { + if (result.isOk()) { logSvc(svc.nameSlice(), "ok\n", .{}); + } else if (result.msg_len > 0) { + logSvc(svc.nameSlice(), "{s}\n", .{result.msgSlice()}); + all_ok = false; } else { logSvc(svc.nameSlice(), "failed\n", .{}); + all_ok = false; } } else { logSvc(svc.nameSlice(), "connection failed\n", .{}); + all_ok = false; } } + return all_ok; } diff --git a/tools/ziginit/server.zig b/tools/ziginit/server.zig index 0c87643..62bd301 100644 --- a/tools/ziginit/server.zig +++ b/tools/ziginit/server.zig @@ -10,6 +10,7 @@ const config = @import("config.zig"); const service = @import("service.zig"); const journal = @import("journal.zig"); const protocol = @import("protocol.zig"); +const logrotate = @import("logrotate.zig"); const logInfo = helpers.logInfo; const logErr = helpers.logErr; @@ -72,7 +73,7 @@ fn readSupPid(dp: *const config.DaemonPaths) ?c.pid_t { /// 5. daemon 退出后检查 quit marker: /// - 存在 → 正常关停,supervisor 退出 /// - 不存在 → 崩溃,指数回退重启 -pub fn supervisedServerLoop(cfg: *const config.Config, dp: *const config.DaemonPaths, config_path: []const u8, is_init: bool, argv: []const [*:0]const u8, argv_cmd_ptr: ?[*]u8) void { +pub fn supervisedServerLoop(cfg: *const config.Config, dp: *const config.DaemonPaths, config_path: []const u8, is_init: bool, argv: []const c.CStr, argv_cmd_ptr: ?[*]u8) void { // Reset signal disposition to default, then block SIGTERM + SIGINT. // Replacement supervisors inherit daemon's blocked mask; resetting first // ensures a clean state before re-blocking. @@ -492,28 +493,19 @@ pub fn serverLoop(cfg: *config.Config, dp: *const config.DaemonPaths, config_pat const result = handleCommand(cmd, svc_name, cfg, config_path); - // Write 2-byte response - const resp = [2]u8{ @intCast(result & 0xFF), @intCast((result >> 8) & 0xFF) }; + // Write response: [2B status LE][2B msg_len LE][msg] + const resp = [2]u8{ @intCast(result.status & 0xFF), @intCast((result.status >> 8) & 0xFF) }; c.writeOnce(client_fd, &resp); - - // For log, stream service log file(s) - if (cmd == protocol.ACTION_LOG and (result & 0xFF) == 0) { - const is_log_all = svc_name.len == 0 or std.mem.eql(u8, svc_name, "all"); - if (is_log_all) { - for (cfg.services[0..cfg.service_count]) |*svc| { - const sp = config.buildServicePaths(cfg.workdirSlice(), svc.nameSlice()); - _ = journal.readServiceLogTailPrefixed(sp.logZ(), client_fd, svc.nameSlice()); - } - } else if (cfg.findService(svc_name)) |_| { - const sp = config.buildServicePaths(cfg.workdirSlice(), svc_name); - _ = journal.readServiceLogTail(sp.logZ(), client_fd); - } + const msg_len_bytes = [2]u8{ @intCast(result.msg_len & 0xFF), @intCast((result.msg_len >> 8) & 0xFF) }; + c.writeOnce(client_fd, &msg_len_bytes); + if (result.msg_len > 0) { + c.writeOnce(client_fd, result.msgSlice()); } c.closeFd(client_fd); // Quit: write marker then exit - if (cmd == protocol.ACTION_QUIT and (result & 0xFF) == 0) { + if (cmd == protocol.ACTION_QUIT and result.isOk()) { logInfo("daemon exiting (quit command)\n", .{}); if (c.openCreate(dp.quitZ())) |qfd| c.closeFd(qfd); break; @@ -600,13 +592,18 @@ fn spawnSupervisor(cfg: *const config.Config, dp: *const config.DaemonPaths, con // ─── Command Handling ──────────────────────────────────────────────────────── -fn handleCommand(cmd: u8, svc_name: []const u8, cfg: *config.Config, config_path: []const u8) u16 { +fn handleCommand(cmd: u8, svc_name: []const u8, cfg: *config.Config, config_path: []const u8) protocol.CommandResult { + const R = protocol.CommandResult; + // Global commands (no service name needed) switch (cmd) { - protocol.ACTION_VERSION => return @as(u16, @intCast(VERSION)) << 8, - protocol.ACTION_QUIT => return 0, - protocol.ACTION_LIST => return handleStatusAll(cfg), - protocol.ACTION_RELOAD => return if (reloadDaemonConfig(cfg, config_path)) 0 else 1, + protocol.ACTION_VERSION => return R.ok(@as(u16, @intCast(VERSION)) << 8), + protocol.ACTION_QUIT => return R.ok(0), + protocol.ACTION_LIST => return R.ok(handleStatusAll(cfg)), + protocol.ACTION_RELOAD => return if (reloadDaemonConfig(cfg, config_path)) + R.ok(0) + else + R.fail(1, "reload config failed", .{}), else => {}, } @@ -614,12 +611,7 @@ fn handleCommand(cmd: u8, svc_name: []const u8, cfg: *config.Config, config_path const is_all = svc_name.len == 0 or std.mem.eql(u8, svc_name, "all"); if (cmd == protocol.ACTION_STATUS and is_all) { - return handleStatusAll(cfg); - } - - // log all: return 0 so serverLoop streams all service logs - if (cmd == protocol.ACTION_LOG and is_all) { - return 0; + return R.ok(handleStatusAll(cfg)); } if (is_all) { @@ -636,32 +628,41 @@ fn handleCommand(cmd: u8, svc_name: []const u8, cfg: *config.Config, config_path }; if (!ok) any_fail = true; } - return if (any_fail) 1 else 0; + return if (any_fail) R.fail(1, "some services failed", .{}) else R.ok(0); } // Single service const svc = cfg.findService(svc_name) orelse { logErr("unknown service: {s}\n", .{svc_name}); - return 0xFF; + return R.fail(0xFF, "unknown service: {s}", .{svc_name}); }; const sp = config.buildServicePaths(cfg.workdirSlice(), svc.nameSlice()); switch (cmd) { - protocol.ACTION_START => return if (service.startService(svc, &sp, cfg.workdirSlice())) 0 else 1, - protocol.ACTION_STOP => return if (service.stopService(svc, &sp, false)) 0 else 1, - protocol.ACTION_KILL => return if (service.stopService(svc, &sp, true)) 0 else 1, - protocol.ACTION_RESTART => return if (service.restartService(svc, &sp, cfg.workdirSlice())) 0 else 1, + protocol.ACTION_START => return if (service.startService(svc, &sp, cfg.workdirSlice())) + R.ok(0) + else + R.fail(1, "start failed (check daemon log)", .{}), + protocol.ACTION_STOP => return if (service.stopService(svc, &sp, false)) + R.ok(0) + else + R.fail(1, "stop failed (check daemon log)", .{}), + protocol.ACTION_KILL => return if (service.stopService(svc, &sp, true)) + R.ok(0) + else + R.fail(1, "kill failed (check daemon log)", .{}), + protocol.ACTION_RESTART => return if (service.restartService(svc, &sp, cfg.workdirSlice())) + R.ok(0) + else + R.fail(1, "restart failed (check daemon log)", .{}), protocol.ACTION_STATUS => { const st = service.getServiceStatus(&sp); var result: u16 = 0; if (st.enabled) result |= protocol.STATUS_ENABLED; if (st.running) result |= protocol.STATUS_RUNNING; - return result; - }, - protocol.ACTION_LOG => { - return 0; + return R.ok(result); }, - else => return 0xFF, + else => return R.fail(0xFF, "unknown command: {c}", .{cmd}), } } @@ -746,11 +747,11 @@ pub fn watchdogCheckAll(cfg: *const config.Config) void { } } -/// 检查所有服务日志文件大小,超限则 truncate。 +/// 检查所有服务日志文件大小,超限则 copy-truncate 轮转(带 gzip 备份)。 pub fn rotateAllLogs(cfg: *const config.Config) void { for (cfg.services[0..cfg.service_count]) |*svc| { const sp = config.buildServicePaths(cfg.workdirSlice(), svc.nameSlice()); - journal.rotateServiceLog(sp.logZ()); + logrotate.rotateWithDefaults(sp.logZ()); } } diff --git a/tools/ziginit/service.zig b/tools/ziginit/service.zig index 42a75ac..64b3adf 100644 --- a/tools/ziginit/service.zig +++ b/tools/ziginit/service.zig @@ -11,7 +11,7 @@ const EXIT_EXEC = config.EXIT_EXEC_FAILURE; // ─── Lock & PID Helpers ────────────────────────────────────────────────────── -pub fn tryLock(path: [*:0]const u8, blocking: bool) c_int { +pub fn tryLock(path: c.CStr, blocking: bool) c_int { const fd = c.openCreateCloexec(path) orelse return -1; if (!c.flockEx(fd, blocking)) { c.closeFd(fd); @@ -23,7 +23,7 @@ pub fn tryLock(path: [*:0]const u8, blocking: bool) c_int { /// flock 作为存活探测:子进程持有 pid 文件的 LOCK_EX, /// 进程死亡时内核自动释放锁。所以能用非阻塞 flock 探测。 /// 比读 pid + kill -0 更可靠——没有 PID 回收竞争问题。 -pub fn isRunning(pid_path: [*:0]const u8) bool { +pub fn isRunning(pid_path: c.CStr) bool { const fd = c.openRead(pid_path) orelse return false; defer c.closeFd(fd); if (c.flockEx(fd, false)) { @@ -33,7 +33,7 @@ pub fn isRunning(pid_path: [*:0]const u8) bool { return true; } -pub fn readPid(pid_path: [*:0]const u8) ?c.pid_t { +pub fn readPid(pid_path: c.CStr) ?c.pid_t { const fd = c.openRead(pid_path) orelse return null; defer c.closeFd(fd); var buf: [32]u8 = undefined; @@ -127,7 +127,7 @@ pub fn startService(svc: *const config.ServiceConfig, sp: *const config.ServiceP } // Build argv: [exec, args..., null] - var argv_buf: [config.MAX_ARGS + 2]?[*:0]const u8 = undefined; + var argv_buf: [config.MAX_ARGS + 2]?c.CStr = undefined; argv_buf[0] = svc.execZ(); const max_a = @min(svc.args_count, config.MAX_ARGS); for (0..max_a) |i| { diff --git a/tools/ziginit/tests/e2e_test.go b/tools/ziginit/tests/e2e_test.go index 800b396..a56b751 100644 --- a/tools/ziginit/tests/e2e_test.go +++ b/tools/ziginit/tests/e2e_test.go @@ -1,6 +1,8 @@ package ziginit_test import ( + "bytes" + "context" "encoding/json" "fmt" "os" @@ -141,6 +143,35 @@ func TestStatusUnknownService(t *testing.T) { if r.Status != 255 { t.Fatalf("expected status 255, got %d", r.Status) } + if !strings.Contains(r.Msg, "unknown service") { + t.Fatalf("expected error message containing 'unknown service', got %q", r.Msg) + } +} + +func TestErrorMessagePropagation(t *testing.T) { + bin := buildOnce(t) + e := newEnv(t, bin, svcA()) + e.StartSupervisor("--init") + e.WaitForSocket(8 * time.Second) + time.Sleep(500 * time.Millisecond) + + // start unknown service via CLI — should exit non-zero with error message + out, err := e.RunCmd("start", "no-such-svc") + if err == nil { + t.Fatal("expected non-zero exit for unknown service start") + } + if !strings.Contains(out, "unknown service") { + t.Fatalf("expected 'unknown service' in output, got %q", out) + } + + // stop unknown service via CLI + out, err = e.RunCmd("stop", "no-such-svc") + if err == nil { + t.Fatal("expected non-zero exit for unknown service stop") + } + if !strings.Contains(out, "unknown service") { + t.Fatalf("expected 'unknown service' in output, got %q", out) + } } // ─── Add service ──────────────────────────────────────────────────────────── @@ -289,11 +320,39 @@ func TestLog(t *testing.T) { t.Run("log-all", func(t *testing.T) { out, _ := e.RunCmd("log", "--all") - if !strings.Contains(out, "[svc-a ") || !strings.Contains(out, "svc-a-out") { + if !strings.Contains(out, "[svc-a]") || !strings.Contains(out, "svc-a-out") { t.Fatalf("expected log-all content, got: %s", out) } }) + t.Run("log-n-lines", func(t *testing.T) { + out, _ := e.RunCmd("log", "-n", "3", "svc-a") + lines := strings.Split(strings.TrimSpace(out), "\n") + if len(lines) > 3 { + t.Fatalf("expected at most 3 lines, got %d:\n%s", len(lines), out) + } + if len(lines) == 0 || lines[0] == "" { + t.Fatal("expected some log output") + } + }) + + t.Run("log-daemon", func(t *testing.T) { + out, _ := e.RunCmd("log", "daemon") + if len(strings.TrimSpace(out)) == 0 { + t.Fatal("expected daemon log output") + } + }) + + t.Run("log-all-includes-daemon", func(t *testing.T) { + out, _ := e.RunCmd("log", "--all") + if !strings.Contains(out, "[daemon]") { + t.Fatalf("expected [daemon] prefix in --all output, got: %s", out) + } + if !strings.Contains(out, "[svc-a]") { + t.Fatalf("expected [svc-a] prefix in --all output, got: %s", out) + } + }) + t.Run("per-service-log-file", func(t *testing.T) { logPath := filepath.Join(e.Workdir, "svc-a.log") data, err := os.ReadFile(logPath) @@ -319,6 +378,63 @@ func TestLog(t *testing.T) { t.Fatalf("expected MARKER in log, got: %s", data) } }) + + t.Run("log-unknown-service-exits-nonzero", func(t *testing.T) { + cmd := exec.Command(e.Bin, "-c", e.CfgPath, "log", "no-such-svc") + out, err := cmd.CombinedOutput() + if err == nil { + t.Fatal("expected non-zero exit for unknown service") + } + if !strings.Contains(string(out), "unknown service") { + t.Fatalf("expected 'unknown service' error, got: %s", out) + } + }) + + t.Run("log-follow-long-line-intact", func(t *testing.T) { + // 写一条超过 PIPE_BUF (4096) 的长行到日志文件,验证 follow 模式完整输出 + e2 := newEnv(t, bin, ServiceConfig{ + Name: "long-svc", Exec: "/bin/sh", + Args: []string{"-c", "exec sleep 3600"}, + Enabled: true, Watchdog: 30, + }) + e2.StartDaemon() + e2.WaitForSocket(8 * time.Second) + time.Sleep(500 * time.Millisecond) + + logPath := filepath.Join(e2.Workdir, "long-svc.log") + // 5000 字节的长行 = "LONG:" + 4992*"X" + "END\n" + longLine := "LONG:" + strings.Repeat("X", 4992) + "END" + os.WriteFile(logPath, []byte(longLine+"\n"), 0o644) + + // -n 0 -f: 不输出旧内容,进入 follow 模式 + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + cmd := exec.CommandContext(ctx, e2.Bin, "-c", e2.CfgPath, "-n", "0", "-f", "log", "long-svc") + var stdout bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stdout + cmd.Start() + + // 等 follow 模式就绪后追加新长行 + time.Sleep(500 * time.Millisecond) + f, _ := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY, 0o644) + marker := "FOLLOW:" + strings.Repeat("Y", 4990) + "OK" + f.WriteString(marker + "\n") + f.Close() + + // 等待输出 + time.Sleep(1 * time.Second) + cancel() + cmd.Wait() + + got := stdout.String() + if !strings.Contains(got, "FOLLOW:") || !strings.Contains(got, "OK") { + t.Fatalf("expected complete long line in follow output, got %d bytes: %s", len(got), got[:min(200, len(got))]) + } + if !strings.Contains(got, marker) { + t.Fatalf("long line was split or truncated: expected %d bytes marker, got: %s", len(marker), got[:min(200, len(got))]) + } + }) } // ─── Reload ───────────────────────────────────────────────────────────────── @@ -975,10 +1091,28 @@ func TestLogRotation(t *testing.T) { e.StartDaemon("ZIGINIT_LOG_MAX_KB=4") e.WaitForSocket(8 * time.Second) + // 等待轮转发生 e.WaitForCondition(10*time.Second, "log rotation", func() bool { log := e.DaemonLog() return strings.Contains(log, "log rotated") }) + + // 验证 gzip 备份文件存在 + gzPath := filepath.Join(e.Workdir, "noisy.log.1.gz") + e.WaitForCondition(5*time.Second, "gzip backup", func() bool { + info, err := os.Stat(gzPath) + return err == nil && info.Size() > 0 + }) + + // 验证 .gz 文件是有效的 gzip 格式(magic number: 1f 8b) + data, err := os.ReadFile(gzPath) + if err != nil { + t.Fatalf("read gzip backup: %v", err) + } + if len(data) < 2 || data[0] != 0x1f || data[1] != 0x8b { + t.Fatalf("not a valid gzip file: first 2 bytes = %x %x", data[0], data[1]) + } + t.Logf("gzip backup OK: %s (%d bytes)", gzPath, len(data)) } // ─── Stop all cleanup ─────────────────────────────────────────────────────── diff --git a/tools/ziginit/tests/helpers_test.go b/tools/ziginit/tests/helpers_test.go index 7e587f0..fe3dc7c 100644 --- a/tools/ziginit/tests/helpers_test.go +++ b/tools/ziginit/tests/helpers_test.go @@ -2,6 +2,7 @@ package ziginit_test import ( "encoding/json" + "io" "net" "os" "os/exec" @@ -153,6 +154,7 @@ func (e *Env) RunCmd(args ...string) (string, error) { type CmdResult struct { Status byte Extra byte + Msg string } func (e *Env) SendCmdRaw(cmd byte, name string) (CmdResult, error) { @@ -169,11 +171,24 @@ func (e *Env) SendCmdRaw(cmd byte, name string) (CmdResult, error) { if _, err := conn.Write(req); err != nil { return CmdResult{}, err } + // Read 2-byte status resp := make([]byte, 2) - if _, err := conn.Read(resp); err != nil { + if _, err := io.ReadFull(conn, resp); err != nil { return CmdResult{}, err } - return CmdResult{Status: resp[0], Extra: resp[1]}, nil + result := CmdResult{Status: resp[0], Extra: resp[1]} + // Read 2-byte msg_len (LE) + lenBuf := make([]byte, 2) + if _, err := io.ReadFull(conn, lenBuf); err != nil { + return result, nil // graceful: old protocol or partial read + } + msgLen := int(lenBuf[0]) | int(lenBuf[1])<<8 + if msgLen > 0 { + msgBuf := make([]byte, msgLen) + n, _ := io.ReadFull(conn, msgBuf) + result.Msg = string(msgBuf[:n]) + } + return result, nil } func (e *Env) SendCmd(cmd byte, name string) CmdResult {