Durable Objects で同期待機を実装する — クロージャと alarm の組み合わせ
A2H の core は「agent からのリクエストを最大 60 秒ブロックし、human が回答したら unblock する」という同期待機の仕組みだ。Cloudflare Workers の Durable Objects(DO)で実装した。
構成
agent → Worker → InboxItemDO.waitForAnswer() → (blocking)
↑ takeru が回答
takeru → Worker → InboxItemDO.setAnswered() → resolve
Worker は DO の RPC waitForAnswer を呼んだあと、その Promise が resolve されるまでブロックする。takeru が回答すると別の Worker リクエストが setAnswered を呼び、DO 内部で resolve を実行する。
DO 内部の仕組み
export class InboxItemDO extends DurableObject<Env> {
private resolve: ((r: AnswerResult | null) => void) | null = null;
async waitForAnswer(timeout_ms = 60_000): Promise<AnswerResult | null> {
return new Promise((resolve) => {
this.resolve = resolve;
setTimeout(() => resolve(null), timeout_ms);
});
}
async setAnswered(content: string, answer_seconds: number) {
const answered_at = Math.floor(Date.now() / 1000);
// まず status をストレージに永続化してから resolve する。
await this.ctx.storage.put({
status: 'answered', answer: content, answered_at,
human_seconds: answer_seconds,
});
if (this.resolve) {
this.resolve({ answer: content, answered_at, human_seconds: answer_seconds });
this.resolve = null;
}
}
}
this.resolve はメモリ上にしか存在しない。storage.put() で永続化されるデータではなく、DO がメモリに載っている間だけ有効なクロージャだ。DO が evict されてから別リクエストで復活したとき、この値は null に戻っている。ただし waitForAnswer の呼び出し自体が DO をメモリに留めるため、v1 の用途(60s の待機中に evict される)では実用上問題が起きない。
回答が先に来るレース
「1 問い合わせ=1 リクエスト」ではない。ひとつの問い合わせには 2 つのリクエストが関わる。agent の waitForAnswer と、takeru の setAnswered だ。両方が同じ InboxItemDO を叩く。
DO はシングルスレッドで動くので、この 2 つの RPC は同時には走らない。必ずどちらかが先に完走する。問題は順序が保証されないことだ。ふつうは waitForAnswer が先に待機に入り、あとから setAnswered が this.resolve を呼ぶ。だが takeru の回答がうんと速いと、waitForAnswer がまだ this.resolve をセットする前に setAnswered が走り切ることがある。このとき this.resolve は null なので setAnswered の if (this.resolve) は素通りし、回答はメモリ上のクロージャには届かない。
これを取りこぼさないために、setAnswered は this.resolve を呼ぶ前に status をストレージに書く。そして waitForAnswer は待機に入る前に status を読む。
async waitForAnswer(timeout_ms = 60_000): Promise<AnswerResult | null> {
const status = await this.ctx.storage.get('status');
// 待機に入る前に「もう回答済み」なら、ストレージから直接返す。
if (status === 'answered') return this.readAnswerFromStorage();
if (status !== 'queued') return null; // expired 済み
// ここで初めて Promise の待機に入る
return new Promise((resolve) => { this.resolve = resolve; /* ... */ });
}
メモリ上のクロージャ(速いが揮発する経路)とストレージ(遅いが確実な経路)の二段構え。setAnswered が先でも後でも、どちらかの経路で回答が拾える。
alarm でタイムアウトを保証する
当初の設計ドキュメントは「alarm は不要」としていた。waitForAnswer の setTimeout でタイムアウトを処理すれば十分と考えていたためだ。
しかし Cloudflare Workers にはランタイム更新時に Worker プロセスを 30 秒で強制終了する仕組みがある(週数回の頻度)。このとき waitForAnswer を待っていた fetch も切れ、InboxItemDO の setTimeout も発火しない。DO は status: "queued" のまま残り、HumanInboxDO.queue からも消えない。takeru の受信箱に、agent がとっくに諦めたゾンビ item が残り続けることになる。
Cloudflare の公式ドキュメントを確認したところ、放置 DO の掃除は「DO が自分で alarm をセットして自己掃除する」のが推奨パターンと明記されていた。この設計を採用して、init 時に ~70s の alarm を 1 回だけセットする形にした。
async init(id: string, ...): Promise<{ created_at: number }> {
const created_at = Math.floor(Date.now() / 1000);
await this.ctx.storage.setAlarm((created_at + 70) * 1000);
// ...
}
async alarm(): Promise<void> {
const status = await this.ctx.storage.get('status');
if (status === 'queued') {
await this.ctx.storage.put('status', 'expired');
// id はストレージから読む(この記事の最後の節を参照)。
const id = await this.ctx.storage.get('id');
if (id) await this.env.HUMAN_INBOX.getByName('takeru').remove(id);
}
await this.cleanup(); // deleteAlarm + deleteAll
}
setTimeout と alarm の役割は異なる。setTimeout は「agent に素早く 408 を返す」ための Worker 側のタイムアウトだ。alarm は「Worker が死んでいても DO を確実に掃除する」ためのセーフネットで、DO ランタイムが独立して発火する。両方を持つことで、正常系は setTimeout が先に発火し、Worker が強制終了された場合は alarm が後から回収する。
deleteAll() を使う理由
Cloudflare のドキュメントは「ストレージを使ったなら、個別キーを削除するだけでは不十分。deleteAll() と deleteAlarm() を呼ぶこと」と明記している。個別キーを全部消しても、DO の裏にある SQLite データベースは残る。空の状態でも約 12KB の床があり、これが課金対象であり続ける。deleteAll() を呼んで初めてストレージが完全に消え、DO が存在を止める。
A2H の InboxItemDO は 1 問い合わせにつき 1 個生成される短命 DO だ。掃除を忘れると、問い合わせの数だけ 12KB の DO が残り積もっていく。
private async cleanup(): Promise<void> {
await this.ctx.storage.deleteAlarm();
await this.ctx.storage.deleteAll();
}
deleteAll() は init 以前のメタデータも含めてすべてを消す。タイムアウト・回答済み・いずれの終端でも必ずこの cleanup() を呼ぶ。
init(id, ...) に変更した経緯
設計ドキュメント(durable-objects.md)は「InboxItemDO.init に id を渡さない」としていた。id は Worker が init 前に生成して DO のアドレスに使うもので、init が受け取るのは循環に見えたためだ。
実装中に問題が出た。alarm が発火したとき、DO は HumanInboxDO.remove(id) を呼んで自分をキューから除去する必要がある。しかし DO には自分の名前(idFromName で使った文字列)を知る手段がない。ctx.id は DO の内部 DurableObjectId であり、Worker が idFromName("chatcmpl-01abc...") に渡した文字列とは別物だ。
この問題を解消するため、init(id, model, messages, ...) と id を第 1 引数に追加して、ストレージに保存する設計に変更した。