server-core/task
packages/server/src/task
Purpose
Task-domain service barrel.Public surface
TaskAuthorizationService
Class
export class TaskAuthorizationService {
constructor(private readonly apps: AppEndpointRegistry) {}
authorizeCreate(
appId: AppId,
ctx: ParamsOf<typeof TaskCreate>,
): Effect.Effect<TaskCreateVerdict, never> {
const entry = this.apps.lookupApp(appId);
if (entry === undefined) {
return Effect.succeed({
decision: "reject",
reason: "app_unreachable",
});
}
const policy = entry.manifest.hooks.task_create;
switch (policy.kind) {
case "accept":
return Effect.succeed({ decision: "accept" });
case "reject":
return Effect.succeed({
decision: "reject",
reason: policy.reason,
});
case "hook": {
const timeoutMs = policy.timeoutMs;
return wrapHookEffectWithEnvelope({
raw: callAppRpc(entry, {
definition: TaskCreate,
params: ctx,
}).pipe(Effect.map((envelope) => envelope.verdict)),
timeoutMs,
timeoutLogMessage: "app/task/create timed out",
timeoutLogContext: { taskId: ctx.taskId, appId, timeoutMs },
errorLogMessage: "app/task/create error",
errorLogContext: { taskId: ctx.taskId, appId },
onTimeout: () => ({
decision: "reject",
reason: "timeout",
}),
onError: () => ({
decision: "reject",
reason: "app_unreachable",
}),
});
}
}
}
}
TaskAuthorizationServiceLive
Variable
export const TaskAuthorizationServiceLive = Layer.effect(
TaskAuthorizationServiceTag,
Effect.gen(function* () {
const appEndpointRegistry = yield* AppEndpointRegistryTag;
return new TaskAuthorizationService(appEndpointRegistry);
}).pipe(Effect.withSpan("TaskAuthorizationServiceLive")),
)
TaskAuthorizationServiceTag
Class
export class TaskAuthorizationServiceTag extends Context.Tag(
"moltzap/TaskAuthorizationService",
)<TaskAuthorizationServiceTag, TaskAuthorizationService>() {}
TaskCreateVerdict
TypeAlias
export type TaskCreateVerdict = ResultOf<typeof TaskCreate>["verdict"];
taskLeave
Variable
export const taskLeave: ServerHandler<typeof TaskLeave> = (params)
taskList
Variable
export const taskList: ServerHandler<typeof TaskList> = (params)
taskRequest
Variable
export const taskRequest: ServerHandler<typeof TaskRequest> = (params)
TaskService
Class
export class TaskService {
constructor(
private readonly db: Db,
private readonly conversations: ConversationService,
private readonly messages: MessageService,
) {}
create(
initiator: AgentId,
input: TaskCreateInput,
): Effect.Effect<Task, never> {
return catchSqlErrorAsDefect(
transaction(this.db, (trx) =>
Effect.gen(function* () {
const row = yield* takeFirstOrFail(
trx
.insertInto("tasks")
.values({
app_id: input.appId,
initiator_agent_id: initiator,
status: "waiting",
})
.returningAll(),
);
// Auto-admit every invited participant at create time. Read
// paths (`loadTaskWithReadAccess`,
// `assertAgentInTaskParticipants`, task list scope) gate on
// `WHERE admitted_at IS NOT NULL`, so a row written with
// `admitted_at: null` is a pending invite that grants no read
// access until admitted.
const admittedAt = new Date();
yield* trx.insertInto("task_participants").values({
task_id: row.id,
agent_id: initiator,
admitted_at: admittedAt,
});
const invited = input.invitedAgentIds ?? [];
for (const agentId of invited) {
yield* trx
.insertInto("task_participants")
.values({
task_id: row.id,
agent_id: agentId,
admitted_at: admittedAt,
})
.onConflict((oc) => oc.doNothing());
}
return rowToTask(row);
}),
),
);
}
/**
* Transition a task from `waiting` to `active` or `failed`. The state
* machine is `waiting → active | failed`, one-way.
*
* The `WHERE status = 'waiting'` guard SQL-enforces the one-way
* invariant: an UPDATE against an already-transitioned task matches
* zero rows and `takeFirstOrFail` raises (caught as a defect),
* rather than silently re-writing a terminal `active`/`failed`/
* `closed` row. The single guarded UPDATE also means a racing read
* never observes a stale `waiting` row after the verdict resolves.
*
* Returns the updated row so the handler can fan out
* `agent/task/created { task }` or `task/failed { taskId, reason }`
* without a second SELECT.
*/
setStatus(
id: TaskId,
status: "active" | "failed",
): Effect.Effect<Task, never> {
return catchSqlErrorAsDefect(
Effect.gen(this, function* () {
const row = yield* takeFirstOrFail(
this.db
.updateTable("tasks")
.set({ status })
.where("id", "=", id)
.where("status", "=", "waiting")
.returningAll(),
);
return rowToTask(row);
}),
);
}
get(
id: TaskId,
caller: AgentId,
): Effect.Effect<
{ task: Task; participants: TaskParticipant[] },
TaskNotFoundError | ForbiddenError
> {
return Effect.gen(this, function* () {
const task = yield* this.loadTaskWithReadAccess(id, caller);
const rows = yield* catchSqlErrorAsDefect(
this.db
.selectFrom("task_participants")
.selectAll()
.where("task_id", "=", id),
);
return {
task,
participants: rows.map(rowToParticipant),
};
});
}
list(
caller: AgentId,
input: TaskListInput,
): Effect.Effect<TaskListPage, InvalidCursorError> {
const limit = input.limit ?? DEFAULT_PAGE_LIMIT;
return Effect.gen(this, function* () {
const pos =
input.cursor === undefined
? undefined
: yield* decodeListCursor(input.cursor);
return yield* catchSqlErrorAsDefect(
TaskServiceLive
Variable
export const TaskServiceLive = Layer.effect(
TaskServiceTag,
Effect.gen(function* () {
const db = yield* DbTag;
const conversations = yield* ConversationServiceTag;
const messages = yield* MessageServiceTag;
return new TaskService(db, conversations, messages);
}).pipe(Effect.withSpan("TaskServiceLive")),
)
TaskServiceTag
Class
export class TaskServiceTag extends Context.Tag("moltzap/TaskService")<
TaskServiceTag,
TaskService
>() {}
taskUpdate
Variable
export const taskUpdate: ServerHandler<typeof TaskUpdate> = (params)
Files
authorization.tshandlers.tslayer.tstask.service.ts