Skip to main content

server-core/identity/contacts

packages/server/src/identity/contacts

Purpose

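Server-side internals for contact identity: the contacts service (list, add, accept), its request handlers, and the webhook-backed check for whether two users are in contact.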

Public surface

ContactAcceptResult

Interface
/**
 * Outcome of `ContactsService.accept`.
 */
export interface ContactAcceptResult {
  /** The contact, converted from its stored row. */
  readonly contact: Contact;
  /**
   * Owner of the contact row. `add` stores the adding user as the row
   * owner, so this is the user who originally sent the request.
   */
  readonly requesterUserId: UserId;
  /**
   * `true` when this call moved the contact from "pending" to "accepted".
   * NOTE(review): presumably `false` when the contact had already been
   * accepted — that code path is not fully visible here; confirm.
   */
  readonly transitioned: boolean;
}

ContactCreateInput

Interface
/**
 * Input for `ContactsService.add`.
 */
export interface ContactCreateInput {
  /**
   * User to add as a contact. `add` fails with `ForbiddenError` when this
   * equals the owner (a user cannot add themselves).
   */
  readonly contactUserId: UserId;
  /** Optional relationship value; stored as `null` when omitted. */
  readonly relationship?: string;
}

contactsAccept

Variable
export const contactsAccept: ServerHandler<typeof ContactsAccept> = (params)

contactsAdd

Variable
export const contactsAdd: ServerHandler<typeof ContactsAdd> = (params)

ContactService

Interface
/**
 * Check for whether two users are in contact with each other.
 */
export interface ContactService {
  /**
   * Resolves to whether `userIdA` and `userIdB` are in contact.
   *
   * The error channel is `never`, so implementations have to turn their own
   * failures into a boolean answer rather than failing the effect.
   */
  areInContact(userIdA: UserId, userIdB: UserId): Effect.Effect<boolean, never>;
}

contactsList

Variable
export const contactsList: ServerHandler<typeof ContactsList> = (params)

ContactsListInput

Interface
/**
 * Paging options for `ContactsService.list`.
 */
export interface ContactsListInput {
  /** Requested page size; `list` falls back to `DEFAULT_PAGE_LIMIT` when omitted. */
  readonly limit?: number;
  /**
   * Encoded position to resume from. When omitted, `list` applies no
   * position filter; otherwise the value is decoded with `decodeListCursor`.
   */
  readonly cursor?: string;
}

ContactsListPage

Interface
/**
 * One page of results from `ContactsService.list`.
 */
export interface ContactsListPage {
  /** Contacts on this page, converted from their stored rows. */
  readonly contacts: readonly Contact[];
  /**
   * Set only when pagination produced a cursor; the key is left out entirely
   * otherwise.
   * NOTE(review): presumably this is what a caller passes back as
   * `ContactsListInput.cursor` (typed `string` there) — confirm the encoding.
   */
  readonly nextCursor?: ListCursor;
}

ContactsService

Class
export class ContactsService {
  /** Stores the database handle that every query in this service is built from. */
  constructor(private readonly db: Db) {}

  /**
   * Lists one page of the contacts owned by `owner`.
   *
   * Rows are ordered by the `created_at` sort key descending, then by `id`
   * ascending. When `input.cursor` is given it is decoded and used as a
   * keyset filter on that same (sort key, id) pair.
   *
   * @param owner - User whose contact rows are listed.
   * @param input - Optional page size (defaults to `DEFAULT_PAGE_LIMIT`) and cursor.
   * @returns The page of contacts, plus `nextCursor` only when pagination
   *   produced one. Cursor decoding failures surface as `InvalidCursorError`.
   */
  list(
    owner: UserId,
    input: ContactsListInput,
  ): Effect.Effect<ContactsListPage, InvalidCursorError> {
    const pageSize = input.limit ?? DEFAULT_PAGE_LIMIT;
    return Effect.gen(this, function* () {
      // The cursor is decoded outside the SQL wrapper below, so a decode
      // failure stays in the typed error channel.
      const after =
        input.cursor !== undefined
          ? yield* decodeListCursor(input.cursor)
          : undefined;

      const fetchPage = Effect.gen(this, function* () {
        let ownedContacts = this.db
          .selectFrom("contacts")
          .selectAll()
          .where("owner_user_id", "=", owner);
        if (after !== undefined) {
          ownedContacts = ownedContacts.where((eb) =>
            keysetWhere(
              eb,
              { sortKey: sortKeyExpr(eb, "created_at"), id: "id" },
              after,
            ),
          );
        }

        // One row more than the page size is requested; presumably this is
        // how `paginate` detects that a further page exists — confirm.
        const fetched = yield* ownedContacts
          .orderBy((eb) => sortKeyExpr(eb, "created_at"), "desc")
          .orderBy("id", "asc")
          .limit(pageSize + 1);

        const { page, nextCursor } = paginate(
          fetched,
          pageSize,
          positionOfContactRow,
        );
        const contacts = page.map(rowToContact);
        // Leave the `nextCursor` key out entirely when there is no cursor.
        return nextCursor === undefined
          ? { contacts }
          : { contacts, nextCursor };
      });

      return yield* catchSqlErrorAsDefect(fetchPage);
    });
  }

  /**
   * Creates a "pending" contact row from `owner` to `input.contactUserId`.
   *
   * @param owner - User adding the contact; stored as the row owner.
   * @param input - Target user and optional relationship (stored as `null`
   *   when omitted).
   * @returns The created contact. Fails with `ForbiddenError` when the owner
   *   tries to add themselves, and with `ConflictError` when a row for the
   *   same (owner, contact) pair already exists.
   */
  add(
    owner: UserId,
    input: ContactCreateInput,
  ): Effect.Effect<Contact, ForbiddenError | ConflictError> {
    const { contactUserId, relationship } = input;
    if (owner === contactUserId) {
      return Effect.fail(new ForbiddenError({ message: ERR_SELF_ADD }));
    }

    const insertPending = Effect.gen(this, function* () {
      // On a conflicting (owner, contact) pair the insert does nothing and
      // therefore returns no row.
      const [created] = yield* this.db
        .insertInto("contacts")
        .values({
          owner_user_id: owner,
          contact_user_id: contactUserId,
          relationship: relationship ?? null,
          status: "pending",
        })
        .onConflict((oc) =>
          oc.columns(["owner_user_id", "contact_user_id"]).doNothing(),
        )
        .returningAll();

      if (created === undefined) {
        return yield* Effect.fail(
          new ConflictError({ message: ERR_DUPLICATE }),
        );
      }
      return rowToContact(created);
    });

    return catchSqlErrorAsDefect(insertPending);
  }

  /**
   * Accepts the contact request `id` on behalf of `owner`.
   *
   * Note that `owner` here is the user the request was sent to: the update
   * only matches rows whose `contact_user_id` equals `owner`.
   *
   * @param owner - User accepting the request.
   * @param id - Identifier of the contact row to accept.
   * @returns The accepted contact, the requester (the row's owner), and
   *   `transitioned: true` when this call performed the pending → accepted
   *   change. When no pending row matched, the outcome is delegated to
   *   `resolveAlreadyAcceptedContact`.
   */
  accept(
    owner: UserId,
    id: ContactId,
  ): Effect.Effect<ContactAcceptResult, ContactNotFoundError | ForbiddenError> {
    const acceptFlow = Effect.gen(this, function* () {
      const [accepted] = yield* this.markPendingContactAccepted(owner, id);
      if (accepted === undefined) {
        // No pending row was updated: the contact is missing, not pending,
        // or not addressed to this user. Let the fallback decide which.
        return yield* this.resolveAlreadyAcceptedContact(owner, id);
      }

      // Record the accepted relationship in the other direction as well.
      yield* this.upsertMirroredAcceptedContact(accepted);
      return {
        contact: rowToContact(accepted),
        requesterUserId: accepted.owner_user_id,
        transitioned: true,
      };
    });

    return catchSqlErrorAsDefect(acceptFlow);
  }

  /**
   * Builds the update that sets a contact row's status to "accepted".
   *
   * The row must have the given `id`, be addressed to `owner`
   * (`contact_user_id`), and currently be "pending"; any other row is left
   * untouched. The query returns all columns of the rows it updated.
   */
  private markPendingContactAccepted(owner: UserId, id: ContactId) {
    const pendingRequestToOwner = this.db
      .updateTable("contacts")
      .set({ status: "accepted" })
      .where("id", "=", id)
      .where("contact_user_id", "=", owner)
      .where("status", "=", "pending");
    return pendingRequestToOwner.returningAll();
  }

  private resolveAlreadyAcceptedContact(owner: UserId, id: ContactId) {
    return Effect.gen(this, function* () {
      const existing = yield* this.db
        .selectFrom("contacts")
        .selectAll()
        .where("id", "=", id);
      if (existing.length === 0) {
        return yield* Effect.fail(
          new ContactNotFoundError({ message: ERR_NOT_FOUND }),
        );
      }

ContactsServiceLive

Variable
/**
 * Layer that provides `ContactsServiceTag` by building a `ContactsService`
 * from the database obtained via `DbTag`. Construction runs inside a span
 * named "ContactsServiceLive".
 */
export const ContactsServiceLive = Layer.effect(
  ContactsServiceTag,
  Effect.map(DbTag, (db) => new ContactsService(db)).pipe(
    Effect.withSpan("ContactsServiceLive"),
  ),
)

ContactsServiceTag

Class
/**
 * Context tag, registered under the key "moltzap/ContactsService", whose
 * service type is `ContactsService`.
 */
export class ContactsServiceTag extends Context.Tag("moltzap/ContactsService")<
  ContactsServiceTag,
  ContactsService
>() {}

WebhookContactService

Class
/**
 * `ContactService` that answers contact checks by calling an HTTP webhook.
 *
 * Any failure of the call (transport error, non-2xx status, undecodable
 * body, timeout) is logged and answered with `false`.
 */
export class WebhookContactService implements ContactService {
  /**
   * @param httpClient - Client used to execute the webhook request.
   * @param url - Webhook endpoint that receives the POST.
   * @param timeoutMs - Time budget, in milliseconds, for the whole check.
   */
  constructor(
    private readonly httpClient: HttpClient.HttpClient,
    private readonly url: string,
    private readonly timeoutMs: number,
  ) {}

  /**
   * POSTs `{ userIdA, userIdB }` as JSON to the webhook, with the
   * `X-MoltZap-Event` header set to `EVENT_NAME`, and returns the
   * `inContact` field of the decoded `ContactsCheckResponse`.
   *
   * @returns The webhook's answer, or `false` when the check fails for any
   *   reason; the effect itself never fails.
   */
  areInContact(
    userIdA: UserId,
    userIdB: UserId,
  ): Effect.Effect<boolean, never> {
    const request = HttpClientRequest.post(this.url).pipe(
      HttpClientRequest.setHeader("X-MoltZap-Event", EVENT_NAME),
      HttpClientRequest.bodyUnsafeJson({ userIdA, userIdB }),
    );

    const check = this.httpClient.execute(request).pipe(
      // Always read the body before checking the status. `response.text` is
      // `Effect.cached`, so on 2xx the JSON decoding below reuses the same
      // buffer instead of re-reading the socket; on non-2xx the body is not
      // left behind for the FinalizationRegistry to reap.
      Effect.tap((response) => response.text),
      Effect.flatMap(HttpClientResponse.filterStatusOk),
      Effect.flatMap(HttpClientResponse.schemaBodyJson(ContactsCheckResponse)),
      Effect.timeout(Duration.millis(this.timeoutMs)),
      Effect.map(({ inContact }) => inContact),
    );

    // Every failure ends here: log it with context and answer `false`.
    return Effect.catchAll(check, (err) =>
      Effect.logError("Contact check webhook failed, rejecting contact").pipe(
        Effect.annotateLogs({ err, userIdA, userIdB, url: this.url }),
        Effect.as(false),
      ),
    );
  }
}

Files

  • contact-policy.ts
  • contact.service.ts
  • handlers.ts
  • layer.ts
  • webhook-contact-service.ts