export class DispatchAdmissionService {
constructor(
private readonly db: Db,
private readonly apps: AppEndpointRegistry,
private readonly registry: LeaseRegistry,
private readonly conversations: DispatchAdmissionConversations,
) {}
enqueue(
args: EnqueueDispatchRequestArgs,
): Effect.Effect<
{ readonly leaseId: LeaseId; readonly dispatchId: DispatchId },
never,
NetworkSendServiceTag
> {
return catchSqlErrorAsDefect(this.enqueueEffect(args));
}
private enqueueEffect(
args: EnqueueDispatchRequestArgs,
): Effect.Effect<
{ readonly leaseId: LeaseId; readonly dispatchId: DispatchId },
SqlError,
NetworkSendServiceTag
> {
return Effect.gen(this, function* () {
const lookup = yield* lookupAppBoundForConversation(
this.db,
args.conversationId,
);
const binding = yield* this.dispatchLeaseBindingForLookup(args, lookup);
const minted = yield* this.registry.mint(binding);
yield* this.attachDispatchRoundTripFiber(minted.leaseId, lookup, {
conversationId: args.conversationId,
recipientAgentId: args.recipientAgentId,
messageId: args.messageId,
senderAgentId: args.senderAgentId,
parts: args.parts,
attempt: args.attempt,
receivedAt: args.receivedAt,
pending: args.pending,
});
return minted;
});
}
private dispatchLeaseBindingForLookup(
args: EnqueueDispatchRequestArgs,
lookup: AppBoundConversationLookup,
): Effect.Effect<ModeratorBoundLeaseBinding, never> {
const entry = this.apps.lookupApp(lookup.appId);
if (entry === undefined) {
return Effect.die(
new DispatchAppUnavailableError({
appId: lookup.appId,
conversationId: args.conversationId,
}),
);
}
return Effect.succeed({
_tag: "ModeratorBound",
recipientAgentId: args.recipientAgentId,
recipientConnectionId: args.recipientConnectionId,
conversationId: args.conversationId,
appId: lookup.appId,
taskId: lookup.taskId,
moderatorConnectionId: entry.endpoint.connId,
});
}
private attachDispatchRoundTripFiber(
leaseId: LeaseId,
lookup: AppBoundConversationLookup,
params: DispatchRoundTripParams,
): Effect.Effect<void, never, NetworkSendServiceTag> {
return Effect.gen(this, function* () {
const fiber = yield* Effect.forkDaemon(
this.runForkedDispatchRoundTrip(leaseId, lookup, params),
);
yield* this.registry.attachRoundTripFiber(leaseId, fiber);
});
}
private runForkedDispatchRoundTrip(
leaseId: LeaseId,
lookup: AppBoundConversationLookup,
params: DispatchRoundTripParams,
): Effect.Effect<void, never, NetworkSendServiceTag> {
return catchSqlErrorAsDefect(
this.runAppBoundDispatchRoundTrip(leaseId, lookup, params),
);
}
private runAppBoundDispatchRoundTrip(
leaseId: LeaseId,
lookup: AppBoundConversationLookup,
params: DispatchRoundTripParams,
): Effect.Effect<void, SqlError, NetworkSendServiceTag> {
if (this.apps.lookupApp(lookup.appId) === undefined) {
return this.resolveLease(leaseId, {
_tag: "deny",
reason: "app_unavailable",
});
}
return Effect.gen(this, function* () {
const ctx = yield* this.dispatchAuthorizeContext(lookup, params);
const verdict = yield* this.dispatchAuthorize(lookup.appId, ctx);
yield* this.resolveLease(leaseId, dispatchVerdictToLeaseVerdict(verdict));
yield* this.removeDeniedParticipant(verdict, params);
});
}
private dispatchAuthorizeContext(
lookup: AppBoundConversationLookup,
params: DispatchRoundTripParams,
): Effect.Effect<DispatchAuthorizeContext, SqlError> {
return Effect.gen(this, function* () {