Skip to main content

server-core/task

packages/server/src/task

Purpose

Task-domain service barrel.

Public surface

TaskAuthorizationService

Class
export class TaskAuthorizationService {
  constructor(private readonly apps: AppEndpointRegistry) {}

  authorizeCreate(
    appId: AppId,
    ctx: ParamsOf<typeof TaskCreate>,
  ): Effect.Effect<TaskCreateVerdict, never> {
    const entry = this.apps.lookupApp(appId);
    if (entry === undefined) {
      return Effect.succeed({
        decision: "reject",
        reason: "app_unreachable",
      });
    }
    const policy = entry.manifest.hooks.task_create;
    switch (policy.kind) {
      case "accept":
        return Effect.succeed({ decision: "accept" });
      case "reject":
        return Effect.succeed({
          decision: "reject",
          reason: policy.reason,
        });
      case "hook": {
        const timeoutMs = policy.timeoutMs;
        return wrapHookEffectWithEnvelope({
          raw: callAppRpc(entry, {
            definition: TaskCreate,
            params: ctx,
          }).pipe(Effect.map((envelope) => envelope.verdict)),
          timeoutMs,
          timeoutLogMessage: "app/task/create timed out",
          timeoutLogContext: { taskId: ctx.taskId, appId, timeoutMs },
          errorLogMessage: "app/task/create error",
          errorLogContext: { taskId: ctx.taskId, appId },
          onTimeout: () => ({
            decision: "reject",
            reason: "timeout",
          }),
          onError: () => ({
            decision: "reject",
            reason: "app_unreachable",
          }),
        });
      }
    }
  }
}

TaskAuthorizationServiceLive

Variable
export const TaskAuthorizationServiceLive = Layer.effect(
  TaskAuthorizationServiceTag,
  Effect.gen(function* () {
    const appEndpointRegistry = yield* AppEndpointRegistryTag;
    return new TaskAuthorizationService(appEndpointRegistry);
  }).pipe(Effect.withSpan("TaskAuthorizationServiceLive")),
)

TaskAuthorizationServiceTag

Class
export class TaskAuthorizationServiceTag extends Context.Tag(
  "moltzap/TaskAuthorizationService",
)<TaskAuthorizationServiceTag, TaskAuthorizationService>() {}

TaskCreateVerdict

TypeAlias
export type TaskCreateVerdict = ResultOf<typeof TaskCreate>["verdict"];

taskLeave

Variable
export const taskLeave: ServerHandler<typeof TaskLeave> = (params)

taskList

Variable
export const taskList: ServerHandler<typeof TaskList> = (params)

taskRequest

Variable
export const taskRequest: ServerHandler<typeof TaskRequest> = (params)

TaskService

Class
export class TaskService {
  constructor(
    private readonly db: Db,
    private readonly conversations: ConversationService,
    private readonly messages: MessageService,
  ) {}

  create(
    initiator: AgentId,
    input: TaskCreateInput,
  ): Effect.Effect<Task, never> {
    return catchSqlErrorAsDefect(
      transaction(this.db, (trx) =>
        Effect.gen(function* () {
          const row = yield* takeFirstOrFail(
            trx
              .insertInto("tasks")
              .values({
                app_id: input.appId,
                initiator_agent_id: initiator,
                status: "waiting",
              })
              .returningAll(),
          );
          // Auto-admit every invited participant at create time. Read
          // paths (`loadTaskWithReadAccess`,
          // `assertAgentInTaskParticipants`, task list scope) gate on
          // `WHERE admitted_at IS NOT NULL`, so a row written with
          // `admitted_at: null` is a pending invite that grants no read
          // access until admitted.
          const admittedAt = new Date();
          yield* trx.insertInto("task_participants").values({
            task_id: row.id,
            agent_id: initiator,
            admitted_at: admittedAt,
          });
          const invited = input.invitedAgentIds ?? [];
          for (const agentId of invited) {
            yield* trx
              .insertInto("task_participants")
              .values({
                task_id: row.id,
                agent_id: agentId,
                admitted_at: admittedAt,
              })
              .onConflict((oc) => oc.doNothing());
          }
          return rowToTask(row);
        }),
      ),
    );
  }

  /**
   * Transition a task from `waiting` to `active` or `failed`. The state
   * machine is `waiting → active | failed`, one-way.
   *
   * The `WHERE status = 'waiting'` guard SQL-enforces the one-way
   * invariant: an UPDATE against an already-transitioned task matches
   * zero rows and `takeFirstOrFail` raises (caught as a defect),
   * rather than silently re-writing a terminal `active`/`failed`/
   * `closed` row. The single guarded UPDATE also means a racing read
   * never observes a stale `waiting` row after the verdict resolves.
   *
   * Returns the updated row so the handler can fan out
   * `agent/task/created { task }` or `task/failed { taskId, reason }`
   * without a second SELECT.
   */
  setStatus(
    id: TaskId,
    status: "active" | "failed",
  ): Effect.Effect<Task, never> {
    return catchSqlErrorAsDefect(
      Effect.gen(this, function* () {
        const row = yield* takeFirstOrFail(
          this.db
            .updateTable("tasks")
            .set({ status })
            .where("id", "=", id)
            .where("status", "=", "waiting")
            .returningAll(),
        );
        return rowToTask(row);
      }),
    );
  }

  get(
    id: TaskId,
    caller: AgentId,
  ): Effect.Effect<
    { task: Task; participants: TaskParticipant[] },
    TaskNotFoundError | ForbiddenError
  > {
    return Effect.gen(this, function* () {
      const task = yield* this.loadTaskWithReadAccess(id, caller);
      const rows = yield* catchSqlErrorAsDefect(
        this.db
          .selectFrom("task_participants")
          .selectAll()
          .where("task_id", "=", id),
      );
      return {
        task,
        participants: rows.map(rowToParticipant),
      };
    });
  }

  list(
    caller: AgentId,
    input: TaskListInput,
  ): Effect.Effect<TaskListPage, InvalidCursorError> {
    const limit = input.limit ?? DEFAULT_PAGE_LIMIT;
    return Effect.gen(this, function* () {
      const pos =
        input.cursor === undefined
          ? undefined
          : yield* decodeListCursor(input.cursor);
      return yield* catchSqlErrorAsDefect(

TaskServiceLive

Variable
export const TaskServiceLive = Layer.effect(
  TaskServiceTag,
  Effect.gen(function* () {
    const db = yield* DbTag;
    const conversations = yield* ConversationServiceTag;
    const messages = yield* MessageServiceTag;
    return new TaskService(db, conversations, messages);
  }).pipe(Effect.withSpan("TaskServiceLive")),
)

TaskServiceTag

Class
export class TaskServiceTag extends Context.Tag("moltzap/TaskService")<
  TaskServiceTag,
  TaskService
>() {}

taskUpdate

Variable
export const taskUpdate: ServerHandler<typeof TaskUpdate> = (params)

Files

  • authorization.ts
  • handlers.ts
  • layer.ts
  • task.service.ts