/**
 * Drives the server side of a single WebSocket connection: builds a session
 * for the socket, notifies the configured lifecycle hooks, builds the RPC
 * layer for that connection and runs the reader until the socket ends or the
 * session asks to be closed.
 *
 * The type parameters are forwarded unchanged to {@link MoltZapServerOptions}
 * and to the requirement types of the returned effects.
 */
export class MoltZapServer<
  AuthRequires,
  ConnectionProvides,
  ConnectionRequires,
  HookRequires = never,
> {
  /**
   * @param options Hooks (`onOpen`, `onClose`), RPC `handlers`, and the
   *   per-connection `authLayer` / `connectionLayer` factories.
   */
  constructor(
    private readonly options: MoltZapServerOptions<
      AuthRequires,
      ConnectionProvides,
      ConnectionRequires,
      HookRequires
    >,
  ) {}

  /**
   * Serves one accepted socket to completion.
   *
   * The session runs inside its own scope (`Effect.scoped`), so everything the
   * session acquires in that scope is released when the returned effect ends.
   *
   * @param socket The socket to serve.
   * @returns An effect that completes when the session is over and fails with
   *   `Socket.SocketError` if the session failed with one.
   */
  handleSocket(
    socket: Socket.Socket,
  ): Effect.Effect<
    void,
    Socket.SocketError,
    ServerSocketRequirements<AuthRequires, ConnectionRequires, HookRequires>
  > {
    return Effect.scoped(this.openSocketSession(socket));
  }

  /**
   * Builds the session for `socket`, calls `onOpen`, then wires up the RPC
   * layer and runs the reader.
   *
   * Everything that runs after `onOpen` is wrapped in a single exit handler,
   * so `onClose` is also invoked when building the RPC layer or waiting for
   * the server sink fails, dies or is interrupted - not only when the reader
   * itself terminates.
   */
  private openSocketSession(
    socket: Socket.Socket,
  ): Effect.Effect<
    void,
    Socket.SocketError,
    ScopedServerSocketRequirements<
      AuthRequires,
      ConnectionRequires,
      HookRequires
    >
  > {
    return Effect.gen(this, function* () {
      const accepted = yield* makeAcceptedSocketSession(socket);
      const scope = yield* Effect.scope;
      const originator = yield* buildReverseClient({
        write: accepted.write,
        scope,
      });
      const session = makeMoltZapServerSession(accepted, originator);

      yield* this.options.onOpen(session);

      // From here on the session counts as open: every way out of `serve`
      // must be reported through `notifyClosed`.
      const serve = Effect.gen(this, function* () {
        yield* Effect.logInfo("WebSocket connected").pipe(
          Effect.annotateLogs({ connId: session.connId }),
        );
        const disconnects = yield* Mailbox.make<number>();
        const sinkReady = yield* Deferred.make<ChannelSink>();
        yield* Layer.build(
          makeSocketRpcLayer({
            write: session.write,
            disconnects,
            sinkReady,
            handlers: this.options.handlers,
            authLayer: this.options.authLayer(session.connId),
            connectionLayer: this.options.connectionLayer(session.connId),
          }),
        );
        // NOTE(review): presumably `sinkReady` is completed by the RPC layer
        // while it is being built - confirm against makeSocketRpcLayer.
        const serverSink = yield* Deferred.await(sinkReady);
        const reader = runMuxReader(
          socket,
          { server: serverSink, client: session.originator.sink },
          disconnects,
          session.write,
        );
        yield* this.runSocketReader(reader, session);
      });

      yield* serve.pipe(
        Effect.onExit((exit) => this.notifyClosed(exit, session)),
      );
    }).pipe(Effect.withSpan("MoltZapServer.openSocketSession"));
  }

  /**
   * Runs `reader` until it finishes or until `session.closeRequested` is
   * completed, whichever happens first.
   *
   * @param reader The effect that reads from the socket.
   * @param session The session whose `closeRequested` deferred ends the race.
   */
  private runSocketReader(
    reader: Effect.Effect<
      void,
      Socket.SocketError,
      ServerSocketRequirements<AuthRequires, ConnectionRequires, HookRequires>
    >,
    session: MoltZapServerSession,
  ): Effect.Effect<
    void,
    Socket.SocketError,
    ServerSocketRequirements<AuthRequires, ConnectionRequires, HookRequires>
  > {
    return Effect.raceFirst(reader, Deferred.await(session.closeRequested));
  }

  /**
   * Reports the end of a session: calls the `onClose` hook with the exit,
   * logs a warning carrying the pretty-printed cause when the exit is a
   * failure, and always logs the disconnect.
   *
   * @param exit How the opened session ended.
   * @param session The session that ended.
   */
  private notifyClosed(
    exit: Exit.Exit<void, Socket.SocketError>,
    session: MoltZapServerSession,
  ): Effect.Effect<
    void,
    never,
    ServerSocketRequirements<AuthRequires, ConnectionRequires, HookRequires>
  > {
    return Effect.gen(this, function* () {
      yield* this.options.onClose(exit, session);
      if (Exit.isFailure(exit)) {
        yield* Effect.logWarning("WebSocket error").pipe(
          Effect.annotateLogs({
            connId: session.connId,
            cause: Cause.pretty(exit.cause),
          }),
        );
      }
      yield* Effect.logInfo("WebSocket disconnected").pipe(
        Effect.annotateLogs({ connId: session.connId }),
      );
    });
  }
}