server-core/identity/contacts
packages/server/src/identity/contacts
Purpose
Contact identity server internals.Public surface
ContactAcceptResult
Interface
export interface ContactAcceptResult {
readonly contact: Contact;
readonly requesterUserId: UserId;
readonly transitioned: boolean;
}
ContactCreateInput
Interface
export interface ContactCreateInput {
readonly contactUserId: UserId;
readonly relationship?: string;
}
contactsAccept
Variable
export const contactsAccept: ServerHandler<typeof ContactsAccept> = (params)
contactsAdd
Variable
export const contactsAdd: ServerHandler<typeof ContactsAdd> = (params)
ContactService
Interface
export interface ContactService {
areInContact(userIdA: UserId, userIdB: UserId): Effect.Effect<boolean, never>;
}
contactsList
Variable
export const contactsList: ServerHandler<typeof ContactsList> = (params)
ContactsListInput
Interface
export interface ContactsListInput {
readonly limit?: number;
readonly cursor?: string;
}
ContactsListPage
Interface
export interface ContactsListPage {
readonly contacts: readonly Contact[];
readonly nextCursor?: ListCursor;
}
ContactsService
Class
export class ContactsService {
constructor(private readonly db: Db) {}
list(
owner: UserId,
input: ContactsListInput,
): Effect.Effect<ContactsListPage, InvalidCursorError> {
const limit = input.limit ?? DEFAULT_PAGE_LIMIT;
return Effect.gen(this, function* () {
const pos =
input.cursor === undefined
? undefined
: yield* decodeListCursor(input.cursor);
return yield* catchSqlErrorAsDefect(
Effect.gen(this, function* () {
let query = this.db
.selectFrom("contacts")
.selectAll()
.where("owner_user_id", "=", owner);
if (pos !== undefined) {
query = query.where((eb) =>
keysetWhere(
eb,
{ sortKey: sortKeyExpr(eb, "created_at"), id: "id" },
pos,
),
);
}
const rows = yield* query
.orderBy((eb) => sortKeyExpr(eb, "created_at"), "desc")
.orderBy("id", "asc")
.limit(limit + 1);
const { page, nextCursor } = paginate(
rows,
limit,
positionOfContactRow,
);
return {
contacts: page.map(rowToContact),
...(nextCursor !== undefined ? { nextCursor } : {}),
};
}),
);
});
}
add(
owner: UserId,
input: ContactCreateInput,
): Effect.Effect<Contact, ForbiddenError | ConflictError> {
if (input.contactUserId === owner) {
return Effect.fail(new ForbiddenError({ message: ERR_SELF_ADD }));
}
return catchSqlErrorAsDefect(
Effect.gen(this, function* () {
const inserted = yield* this.db
.insertInto("contacts")
.values({
owner_user_id: owner,
contact_user_id: input.contactUserId,
relationship: input.relationship ?? null,
status: "pending",
})
.onConflict((oc) =>
oc.columns(["owner_user_id", "contact_user_id"]).doNothing(),
)
.returningAll();
if (inserted.length === 0) {
return yield* Effect.fail(
new ConflictError({ message: ERR_DUPLICATE }),
);
}
return rowToContact(inserted[0]!);
}),
);
}
accept(
owner: UserId,
id: ContactId,
): Effect.Effect<ContactAcceptResult, ContactNotFoundError | ForbiddenError> {
return catchSqlErrorAsDefect(
Effect.gen(this, function* () {
const updated = yield* this.markPendingContactAccepted(owner, id);
if (updated.length === 0) {
return yield* this.resolveAlreadyAcceptedContact(owner, id);
}
const row = updated[0]!;
yield* this.upsertMirroredAcceptedContact(row);
return {
contact: rowToContact(row),
requesterUserId: row.owner_user_id,
transitioned: true,
};
}),
);
}
private markPendingContactAccepted(owner: UserId, id: ContactId) {
return this.db
.updateTable("contacts")
.set({ status: "accepted" })
.where("id", "=", id)
.where("contact_user_id", "=", owner)
.where("status", "=", "pending")
.returningAll();
}
private resolveAlreadyAcceptedContact(owner: UserId, id: ContactId) {
return Effect.gen(this, function* () {
const existing = yield* this.db
.selectFrom("contacts")
.selectAll()
.where("id", "=", id);
if (existing.length === 0) {
return yield* Effect.fail(
new ContactNotFoundError({ message: ERR_NOT_FOUND }),
);
}
ContactsServiceLive
Variable
export const ContactsServiceLive = Layer.effect(
ContactsServiceTag,
Effect.gen(function* () {
const db = yield* DbTag;
return new ContactsService(db);
}).pipe(Effect.withSpan("ContactsServiceLive")),
)
ContactsServiceTag
Class
export class ContactsServiceTag extends Context.Tag("moltzap/ContactsService")<
ContactsServiceTag,
ContactsService
>() {}
WebhookContactService
Class
export class WebhookContactService implements ContactService {
constructor(
private readonly httpClient: HttpClient.HttpClient,
private readonly url: string,
private readonly timeoutMs: number,
) {}
areInContact(
userIdA: UserId,
userIdB: UserId,
): Effect.Effect<boolean, never> {
return this.httpClient
.execute(
HttpClientRequest.post(this.url).pipe(
HttpClientRequest.setHeader("X-MoltZap-Event", EVENT_NAME),
HttpClientRequest.bodyUnsafeJson({ userIdA, userIdB }),
),
)
.pipe(
// Drain the response body unconditionally before `filterStatusOk`.
// `response.text` is `Effect.cached`, so the subsequent
// `schemaBodyJson` reuses the same buffer on 2xx without
// re-reading the socket; on non-2xx, this prevents the body
// from being left for the FinalizationRegistry to reap.
Effect.tap((response) => response.text),
Effect.flatMap(HttpClientResponse.filterStatusOk),
Effect.flatMap(
HttpClientResponse.schemaBodyJson(ContactsCheckResponse),
),
Effect.timeout(Duration.millis(this.timeoutMs)),
Effect.map((result) => result.inContact),
Effect.catchAll((err) =>
Effect.logError(
"Contact check webhook failed, rejecting contact",
).pipe(
Effect.annotateLogs({ err, userIdA, userIdB, url: this.url }),
Effect.as(false),
),
),
);
}
}
Files
contact-policy.tscontact.service.tshandlers.tslayer.tswebhook-contact-service.ts