The function* declaration creates a binding of a new generator function to a given name. A generator function can be exited and later re-entered, with its context (variable bindings) saved across re-entrances.
/** * Create an async iterator that yields messages as they become available. * Uses atomic claim-and-delete to prevent duplicates. * The queue is a pure buffer: claim it, delete it, process in memory. * Waits for 'message' event when queue is empty. */async *createIterator(sessionDbId: number, signal: AbortSignal): AsyncIterableIterator<PendingMessageWithId> { while (!signal.aborted) { try { // Atomically claim AND DELETE next message from DB // Message is now in memory only - no "processing" state tracking needed const persistentMessage = this.store.claimAndDelete(sessionDbId); if (persistentMessage) { // Yield the message for processing (it's already deleted from queue) yield this.toPendingMessageWithId(persistentMessage); } else { // Queue empty - wait for wake-up event await this.waitForMessage(signal); } } catch (error) { if (signal.aborted) return; logger.error('SESSION', 'Error in queue processor loop', { sessionDbId }, error as Error); // Small backoff to prevent tight loop on DB error await new Promise(resolve => setTimeout(resolve, 1000)); } }}
This createIterator yields this.toPendingMessageWithId, this function is defined in the same file, under this function.
/** * Get message iterator for SDKAgent to consume (event-driven, no polling) * Auto-initializes session if not in memory but exists in database * * CRITICAL: Uses PendingMessageStore for crash-safe message persistence. * Messages are marked as 'processing' when yielded and must be marked 'processed' * by the SDK agent after successful completion. */async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessageWithId> { // Auto-initialize from database if needed (handles worker restarts) let session = this.sessions.get(sessionDbId); if (!session) { session = this.initializeSession(sessionDbId); } const emitter = this.sessionQueues.get(sessionDbId); if (!emitter) { throw new Error(`No emitter for session ${sessionDbId}`); } const processor = new SessionQueueProcessor(this.getPendingStore(), emitter); // Use the robust iterator - messages are deleted on claim (no tracking needed) for await (const message of processor.createIterator(sessionDbId, session.abortController.signal)) { // Track earliest timestamp for accurate observation timestamps // This ensures backlog messages get their original timestamps, not current time if (session.earliestPendingTimestamp === null) { session.earliestPendingTimestamp = message._originalTimestamp; } else { session.earliestPendingTimestamp = Math.min(session.earliestPendingTimestamp, message._originalTimestamp); } yield message; }}
I spent 200+ hours analyzing Supabase, shadcn/ui, LobeChat. Found the patterns that separate AI slop from production code. Stop refactoring AI slop. Start with proven patterns. Check out production-grade projects atthinkthroo.com