protocol/testing/conformance/task

packages/protocol/src/testing/conformance/task

Purpose

Public barrel for task-layer conformance properties. These cover task, conversation, and message invariants: fan-out cardinality, store-and-replay, payload opacity, task-boundary isolation, conversation lifecycle, archive lifecycle, model equivalence, and task-close lifecycle. Each register* function lives in its own file. This barrel re-exports them by name and aggregates them into TASK_PROPERTIES for the _shared/suite.ts aggregator.

Public surface

acquireClient

Function
export function acquireClient(
  ctx: ConformanceRunContext,
  name: string,
): Effect.Effect<ConversationActor, string, Scope.Scope>

acquireConversation

Function
export function acquireConversation(
  ctx: ConformanceRunContext,
  n: number,
  namePrefix: string,
): Effect.Effect<ConversationFixture, string, Scope.Scope>

acquirePropertyConversation

Function
export function acquirePropertyConversation(
  ctx: ConformanceRunContext,
  propertyName: string,
  namePrefix: string,
): Effect.Effect<ConversationFixture, PropertyInvariantViolation, Scope.Scope>

agent

Property
  readonly agent: TestAgent;

archiveConversation

Function
export function archiveConversation(
  actor: ConversationActor,
  taskId: TaskId,
  conversationId: ConversationId,
)

assertConversationRejectsMessages

Function
export function assertConversationRejectsMessages(
  input: AssertConversationRejectsMessagesInput,
): Effect.Effect<void, PropertyInvariantViolation>

AssertConversationRejectsMessagesInput

Interface
export interface AssertConversationRejectsMessagesInput {
  readonly actor: ConversationActor;
  readonly taskId: TaskId;
  readonly conversationId: ConversationId;
  readonly propertyName: string;
  readonly expectedError?: { readonly code: number; readonly label: string };
}

awaitOneNotification

Function
export function awaitOneNotification<D extends AnyNotificationDefinition>(
  buffer: NotificationBuffer,
  definition: D,
  timeoutMs: number,
): Effect.Effect<DecodedNotification<D>, string>
Stream-based one-shot waiter for protocol-side conformance helpers. Replaces the deleted TestClient.waitForNotification(def, timeoutMs) polling shape (#645). Consumes the per-client historical NotificationBuffer populated by the subscribeAll() pump installed at acquireClient time, so sequential send → awaitOneNotification patterns observe frames that arrived between the triggering RPC and the wait. This preserves the legacy polling semantics without resurrecting the deleted per-definition dedup ring. Mirrors @moltzap/server-core/test-utils → awaitOneNotification. Surfaces a single string message on either timeout or stream exhaustion, so call sites keep the legacy e.message-style error mapper without re-deriving a tagged error type per definition.
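
The buffered-wait behaviour described above can be illustrated with a minimal sketch. It is not the package's implementation: it polls a plain array instead of consuming an Effect Stream, and Frame, SketchBuffer, pollOnce, and awaitOne are hypothetical names introduced only for this example. It also assumes that a matched frame is removed from the buffer.

```typescript
// Hypothetical stand-ins for illustration; these are not the package's types.
type Frame = { readonly method: string; readonly params: unknown };

interface SketchBuffer {
  snapshot: ReadonlyArray<Frame>; // frames received so far, oldest first
  closed: boolean; // true once the transport stream has ended
}

type PollResult =
  | { readonly kind: "found"; readonly frame: Frame }
  | { readonly kind: "closed" }
  | { readonly kind: "pending" };

// One non-blocking pass over the buffer. Assumption: a matched frame is
// removed so that a later wait does not observe it again.
function pollOnce(buffer: SketchBuffer, method: string): PollResult {
  const frame = buffer.snapshot.find((f) => f.method === method);
  if (frame !== undefined) {
    buffer.snapshot = buffer.snapshot.filter((f) => f !== frame);
    return { kind: "found", frame };
  }
  return buffer.closed ? { kind: "closed" } : { kind: "pending" };
}

// Repeats pollOnce until a frame matches, the stream closes, or the
// deadline passes. Both failures reject with an Error carrying a plain message.
async function awaitOne(
  buffer: SketchBuffer,
  method: string,
  timeoutMs: number,
  intervalMs = 5,
): Promise<Frame> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const result = pollOnce(buffer, method);
    if (result.kind === "found") return result.frame;
    if (result.kind === "closed") throw new Error("Connection closed");
    if (Date.now() >= deadline) {
      throw new Error(`Timed out after ${timeoutMs}ms waiting for ${method}`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

Because the buffer already holds frames that arrived before the wait began, a frame delivered between the triggering call and the wait is found on the first pass. The closed check runs before the deadline check, so a terminated stream is reported as "Connection closed" and not as a timeout.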

client

Property
  readonly client: TestClient;

ConversationActor

TypeAlias
export type ConversationActor = {
  readonly agent: TestAgent;
  readonly client: TestClient;

  /**
   * Per-client historical notification buffer (#645): the consolidated
   * `TestClient.subscribe` only emits frames arriving AFTER
   * materialisation, so a sequential `send → awaitOneNotification` races
   * the response frame. The buffer is fed by a long-lived
   * `subscribeAll()` pump installed at `acquireClient` time;
   * `awaitOneNotification` consumes the buffer so frames that arrived
   * between the triggering RPC and the wait are still observable. This
   * mirrors `@moltzap/server-core/test-utils → connectTestClient` (the
   * `makeNotificationBuffer` JSDoc below covers the design).
   */
  readonly notifications: NotificationBuffer;
};

ConversationFixture

Interface
export interface ConversationFixture {
  readonly owner: ConversationActor;
  readonly participants: ReadonlyArray<ConversationActor>;
  readonly taskId: TaskId;
  readonly conversationId: ConversationId;
}

DELIVERY_CATEGORY

Variable
export const DELIVERY_CATEGORY = "delivery" as const

DELIVERY_DEFAULT_CAPTURE_CAPACITY

Variable
export const DELIVERY_DEFAULT_CAPTURE_CAPACITY = 256

DELIVERY_DEFAULT_PROPERTY_NUM_RUNS

Variable
export const DELIVERY_DEFAULT_PROPERTY_NUM_RUNS = 3

DELIVERY_DEFAULT_TIMEOUT_MS

Variable
export const DELIVERY_DEFAULT_TIMEOUT_MS = 5000

deliveryViolation

Function
export function deliveryViolation(
  name: string,
  reason: string,
): PropertyInvariantViolation

firstParticipant

Function
export function firstParticipant(
  fixture: ConversationFixture,
  propertyName: string,
): Effect.Effect<ConversationActor, PropertyInvariantViolation>

fixtureN

Function
export function fixtureN(requested: number): number

moderateAs

Function
export function moderateAs(
  owner: ConversationActor,
  namePrefix: string,
): Effect.Effect<ModeratedHandle, string, Scope.Scope>

ModeratedHandle

Interface
export interface ModeratedHandle {
  readonly appId: Static<typeof AppIdSchema>;

  /**
   * Block until the moderator has observed `expectedAgentIds` as
   * participants of `conversationId` via
   * `task/conversation/participants/added` notifications. Bridges
   * the gap between the create RPC returning and the notification
   * arriving on the moderator's subscriber.
   */
  readonly awaitConversationReady: (
    conversationId: ConversationId,
    expectedAgentIds: ReadonlyArray<Static<typeof AgentId>>,
  ) => Effect.Effect<void, string>;
}
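
The readiness condition described in the awaitConversationReady comment can be sketched as simple bookkeeping. This is an illustration only: ParticipantTracker, recordAdded, and isReady are hypothetical names, identifiers are modelled as plain strings, and the sketch covers only the "all expected agents observed" check, not the blocking or the notification subscription.

```typescript
// Hypothetical tracker; identifiers are plain strings in this sketch.
class ParticipantTracker {
  // conversationId -> agent ids observed as added participants
  private readonly seen = new Map<string, Set<string>>();

  // Call once per observed "participants added" notification entry.
  recordAdded(conversationId: string, agentId: string): void {
    const agents = this.seen.get(conversationId) ?? new Set<string>();
    agents.add(agentId);
    this.seen.set(conversationId, agents);
  }

  // True once every expected agent has been observed for the conversation.
  isReady(conversationId: string, expectedAgentIds: ReadonlyArray<string>): boolean {
    const agents = this.seen.get(conversationId);
    return expectedAgentIds.every((id) => agents !== undefined && agents.has(id));
  }
}
```

A waiter built on this check would re-evaluate isReady after each notification, which is what bridges the gap between the create RPC returning and the notifications arriving.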

NotificationBuffer

Interface
export interface NotificationBuffer {
  readonly snapshot: Ref.Ref<
    ReadonlyArray<DecodedNotification<AnyNotificationDefinition>>
  >;
  readonly closed: Ref.Ref<boolean>;
}
Historical notification buffer used by awaitOneNotification. Holds every inbound notification arriving on a single TestClient’s subscribeAll() Stream until a consumer pulls a matching frame. The snapshot and closed fields are the only public surfaces; the pump fiber that feeds them is interrupted by the enclosing Scope finalizer installed by makeNotificationBuffer. closed is set to true when the transport-side stream terminates (either via TransportClosedError or normal exhaustion); awaitOneNotification consumes it to surface “Connection closed” rather than masquerading a missing notification as a timeout.
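
As a rough model of the pump and the closed flag described above, the sketch below wires a buffer to a callback-based transport. It makes several assumptions: the real buffer uses Effect Ref values and a Scope finalizer, whereas here SketchTransport, makeSketchBuffer, makeFakeTransport, and the release function are hypothetical stand-ins built on plain fields and an unsubscribe callback.

```typescript
type Frame = { readonly method: string };

// Hypothetical transport: subscribeAll returns an unsubscribe function.
interface SketchTransport {
  subscribeAll(onFrame: (frame: Frame) => void, onEnd: () => void): () => void;
}

interface SketchBuffer {
  snapshot: ReadonlyArray<Frame>;
  closed: boolean;
}

// Installs the pump immediately, so frames are retained even when no
// consumer is waiting yet. `release` plays the role of the finalizer.
function makeSketchBuffer(transport: SketchTransport): {
  buffer: SketchBuffer;
  release: () => void;
} {
  const buffer: SketchBuffer = { snapshot: [], closed: false };
  const unsubscribe = transport.subscribeAll(
    (frame) => {
      buffer.snapshot = [...buffer.snapshot, frame];
    },
    () => {
      buffer.closed = true;
    },
  );
  return { buffer, release: unsubscribe };
}

// In-memory transport used only to exercise the sketch.
function makeFakeTransport(): {
  transport: SketchTransport;
  emit: (frame: Frame) => void;
  end: () => void;
} {
  let listener:
    | { onFrame: (frame: Frame) => void; onEnd: () => void }
    | undefined;
  const transport: SketchTransport = {
    subscribeAll(onFrame, onEnd) {
      listener = { onFrame, onEnd };
      return () => {
        listener = undefined;
      };
    },
  };
  return {
    transport,
    emit: (frame) => listener?.onFrame(frame),
    end: () => listener?.onEnd(),
  };
}
```

With this shape, a frame emitted before any waiter exists is still in snapshot when the waiter starts, and a waiter that finds no match can consult closed to tell a terminated stream apart from a slow one.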

notifications

Property
  readonly notifications: NotificationBuffer;
Per-client historical notification buffer (#645): the consolidated TestClient.subscribe only emits frames arriving AFTER materialisation, so a sequential send → awaitOneNotification races the response frame. The buffer is fed by a long-lived subscribeAll() pump installed at acquireClient time; awaitOneNotification consumes the buffer so frames that arrived between the triggering RPC and the wait are still observable. This mirrors @moltzap/server-core/test-utils → connectTestClient (the makeNotificationBuffer JSDoc below covers the design).

registerArchiveLifecycle

Function
export function registerArchiveLifecycle(ctx: ConformanceRunContext): void

registerConversationLifecycle

Function
export function registerConversationLifecycle(
  ctx: ConformanceRunContext,
): void

registerFanOutCardinality

Function
export function registerFanOutCardinality(ctx: ConformanceRunContext): void

registerModelEquivalence

Function
export function registerModelEquivalence(ctx: ConformanceRunContext): void

registerPayloadOpacity

Function
export function registerPayloadOpacity(ctx: ConformanceRunContext): void

registerStoreAndReplay

Function
export function registerStoreAndReplay(ctx: ConformanceRunContext): void

registerTaskBoundaryIsolation

Function
export function registerTaskBoundaryIsolation(
  ctx: ConformanceRunContext,
): void

registerTaskCloseLifecycle

Function
export function registerTaskCloseLifecycle(ctx: ConformanceRunContext): void

registerTaskConversationAddParticipant

Function
export function registerTaskConversationAddParticipant(
  ctx: ConformanceRunContext,
): void

registerTaskConversationArchiveDenied

Function
export function registerTaskConversationArchiveDenied(
  ctx: ConformanceRunContext,
): void

registerTaskConversationCreateAndList

Function
export function registerTaskConversationCreateAndList(
  ctx: ConformanceRunContext,
): void

registerTaskConversationCreateDenied

Function
export function registerTaskConversationCreateDenied(
  ctx: ConformanceRunContext,
): void

registerTaskConversationRemoveParticipant

Function
export function registerTaskConversationRemoveParticipant(
  ctx: ConformanceRunContext,
): void

registerTaskCreate

Function
export function registerTaskCreate(ctx: ConformanceRunContext): void

registerTaskLeave

Function
export function registerTaskLeave(ctx: ConformanceRunContext): void

registerTaskRequestReject

Function
export function registerTaskRequestReject(ctx: ConformanceRunContext): void

sendText

Function
export function sendText(
  actor: ConversationActor,
  taskId: TaskId,
  conversationId: ConversationId,
  text: string,
)

TASK_CONVERSATION_FAMILY_PROPERTIES

Variable
export const TASK_CONVERSATION_FAMILY_PROPERTIES: ReadonlyArray<
  (ctx: ConformanceRunContext) => void
> = [
  registerTaskCreate,
  registerTaskRequestReject,
  registerTaskLeave,
  registerTaskConversationCreateAndList,
  registerTaskConversationCreateDenied,
  registerTaskConversationArchiveDenied,
  registerTaskConversationAddParticipant,
  registerTaskConversationRemoveParticipant,
]

TASK_PROPERTIES

Variable
export const TASK_PROPERTIES: ReadonlyArray<
  (ctx: ConformanceRunContext) => void
> = [
  registerFanOutCardinality,
  registerStoreAndReplay,
  registerPayloadOpacity,
  registerTaskBoundaryIsolation,
  registerConversationLifecycle,
  registerTaskCloseLifecycle,
  registerArchiveLifecycle,
  ...TASK_CONVERSATION_FAMILY_PROPERTIES,
  registerModelEquivalence,
]
All task-layer property registrars, ordered per architect plan §2 (delivery subset first, then model-equivalence from rpc-semantics). Spec D1 additions append to the delivery subset.
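
The aggregation pattern used by TASK_PROPERTIES (an ordered array of registrars, with a sub-family spread into the middle) can be sketched as follows. SketchContext, Registrar, registerAll, and the four placeholder registrars are hypothetical; how _shared/suite.ts actually consumes the array is not shown in this page, so the loop is an assumption.

```typescript
// Hypothetical minimal context that just records registration order.
interface SketchContext {
  readonly registered: string[];
}

type Registrar = (ctx: SketchContext) => void;

// Placeholder registrars standing in for the real register* functions.
const registerFirst: Registrar = (ctx) => {
  ctx.registered.push("first");
};
const registerFamilyA: Registrar = (ctx) => {
  ctx.registered.push("family-a");
};
const registerFamilyB: Registrar = (ctx) => {
  ctx.registered.push("family-b");
};
const registerLast: Registrar = (ctx) => {
  ctx.registered.push("last");
};

// A family is itself an ordered array that can be spread into a larger one.
const FAMILY: ReadonlyArray<Registrar> = [registerFamilyA, registerFamilyB];
const ALL: ReadonlyArray<Registrar> = [registerFirst, ...FAMILY, registerLast];

// Assumed consumer: invoke every registrar in array order with one context.
function registerAll(
  ctx: SketchContext,
  registrars: ReadonlyArray<Registrar>,
): void {
  for (const register of registrars) {
    register(ctx);
  }
}
```

Keeping the list as data means the registration order is visible in one place, and adding a property amounts to adding one array entry.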

unarchiveConversation

Function
export function unarchiveConversation(
  actor: ConversationActor,
  taskId: TaskId,
  conversationId: ConversationId,
)

waitForArchivedEvent

Function
export function waitForArchivedEvent(
  observer: ConversationActor,
  conversationId: ConversationId,
  _byAgentId: AgentId,
  propertyName: string,
): Effect.Effect<void, PropertyInvariantViolation>

waitForConversationCreatedNotification

Function
export function waitForConversationCreatedNotification(
  observer: ConversationActor,
  conversationId: ConversationId,
  propertyName: string,
): Effect.Effect<void, PropertyInvariantViolation>

waitForMessageReceivedNotification

Function
export function waitForMessageReceivedNotification(
  observer: ConversationActor,
  conversationId: ConversationId,
  propertyName: string,
): Effect.Effect<void, PropertyInvariantViolation>

waitForUnarchivedEvent

Function
export function waitForUnarchivedEvent(
  observer: ConversationActor,
  conversationId: ConversationId,
  _byAgentId: AgentId,
  propertyName: string,
): Effect.Effect<void, PropertyInvariantViolation>

Files

  • _helpers.ts
  • archive-lifecycle.ts
  • conversation-lifecycle.ts
  • fan-out-cardinality.ts
  • index.ts
  • model-equivalence.ts
  • payload-opacity.ts
  • store-and-replay.ts
  • task-boundary-isolation.ts
  • task-close-lifecycle.ts
  • task-conversation-family.ts