gemini-cli/packages/cli/src/ui/hooks/useGeminiStream.ts

741 lines
23 KiB
TypeScript

/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import { useState, useRef, useCallback, useEffect, useMemo } from 'react';
import { useInput } from 'ink';
import {
Config,
GeminiClient,
GeminiEventType as ServerGeminiEventType,
ServerGeminiStreamEvent as GeminiEvent,
ServerGeminiContentEvent as ContentEvent,
ServerGeminiErrorEvent as ErrorEvent,
ServerGeminiChatCompressedEvent,
getErrorMessage,
isNodeError,
MessageSenderType,
ToolCallRequestInfo,
logUserPrompt,
GitService,
EditorType,
ThoughtSummary,
isAuthError,
} from '@gemini-cli/core';
import { type Part, type PartListUnion } from '@google/genai';
import {
StreamingState,
HistoryItem,
HistoryItemWithoutId,
HistoryItemToolGroup,
MessageType,
ToolCallStatus,
} from '../types.js';
import { isAtCommand } from '../utils/commandUtils.js';
import { parseAndFormatApiError } from '../utils/errorParsing.js';
import { useShellCommandProcessor } from './shellCommandProcessor.js';
import { handleAtCommand } from './atCommandProcessor.js';
import { findLastSafeSplitPoint } from '../utils/markdownUtilities.js';
import { useStateAndRef } from './useStateAndRef.js';
import { UseHistoryManagerReturn } from './useHistoryManager.js';
import { useLogger } from './useLogger.js';
import { promises as fs } from 'fs';
import path from 'path';
import {
useReactToolScheduler,
mapToDisplay as mapTrackedToolCallsToDisplay,
TrackedToolCall,
TrackedCompletedToolCall,
TrackedCancelledToolCall,
} from './useReactToolScheduler.js';
import { useSessionStats } from '../contexts/SessionContext.js';
export function mergePartListUnions(list: PartListUnion[]): PartListUnion {
const resultParts: PartListUnion = [];
for (const item of list) {
if (Array.isArray(item)) {
resultParts.push(...item);
} else {
resultParts.push(item);
}
}
return resultParts;
}
enum StreamProcessingStatus {
Completed,
UserCancelled,
Error,
}
/**
* Manages the Gemini stream, including user input, command processing,
* API interaction, and tool call lifecycle.
*/
export const useGeminiStream = (
geminiClient: GeminiClient,
history: HistoryItem[],
addItem: UseHistoryManagerReturn['addItem'],
setShowHelp: React.Dispatch<React.SetStateAction<boolean>>,
config: Config,
onDebugMessage: (message: string) => void,
handleSlashCommand: (
cmd: PartListUnion,
) => Promise<
import('./slashCommandProcessor.js').SlashCommandActionReturn | boolean
>,
shellModeActive: boolean,
getPreferredEditor: () => EditorType | undefined,
onAuthError: () => void,
) => {
const [initError, setInitError] = useState<string | null>(null);
const abortControllerRef = useRef<AbortController | null>(null);
const [isResponding, setIsResponding] = useState<boolean>(false);
const [thought, setThought] = useState<ThoughtSummary | null>(null);
const [pendingHistoryItemRef, setPendingHistoryItem] =
useStateAndRef<HistoryItemWithoutId | null>(null);
const logger = useLogger();
const { startNewTurn, addUsage } = useSessionStats();
const gitService = useMemo(() => {
if (!config.getProjectRoot()) {
return;
}
return new GitService(config.getProjectRoot());
}, [config]);
const [toolCalls, scheduleToolCalls, markToolsAsSubmitted] =
useReactToolScheduler(
(completedToolCallsFromScheduler) => {
// This onComplete is called when ALL scheduled tools for a given batch are done.
if (completedToolCallsFromScheduler.length > 0) {
// Add the final state of these tools to the history for display.
// The new useEffect will handle submitting their responses.
addItem(
mapTrackedToolCallsToDisplay(
completedToolCallsFromScheduler as TrackedToolCall[],
),
Date.now(),
);
}
},
config,
setPendingHistoryItem,
getPreferredEditor,
);
const pendingToolCallGroupDisplay = useMemo(
() =>
toolCalls.length ? mapTrackedToolCallsToDisplay(toolCalls) : undefined,
[toolCalls],
);
const onExec = useCallback(async (done: Promise<void>) => {
setIsResponding(true);
await done;
setIsResponding(false);
}, []);
const { handleShellCommand } = useShellCommandProcessor(
addItem,
setPendingHistoryItem,
onExec,
onDebugMessage,
config,
geminiClient,
);
const streamingState = useMemo(() => {
if (toolCalls.some((tc) => tc.status === 'awaiting_approval')) {
return StreamingState.WaitingForConfirmation;
}
if (
isResponding ||
toolCalls.some(
(tc) =>
tc.status === 'executing' ||
tc.status === 'scheduled' ||
tc.status === 'validating',
)
) {
return StreamingState.Responding;
}
return StreamingState.Idle;
}, [isResponding, toolCalls]);
useEffect(() => {
if (streamingState === StreamingState.Idle) {
abortControllerRef.current = null;
}
}, [streamingState]);
useInput((_input, key) => {
if (streamingState !== StreamingState.Idle && key.escape) {
abortControllerRef.current?.abort();
}
});
const prepareQueryForGemini = useCallback(
async (
query: PartListUnion,
userMessageTimestamp: number,
abortSignal: AbortSignal,
): Promise<{
queryToSend: PartListUnion | null;
shouldProceed: boolean;
}> => {
if (typeof query === 'string' && query.trim().length === 0) {
return { queryToSend: null, shouldProceed: false };
}
let localQueryToSendToGemini: PartListUnion | null = null;
if (typeof query === 'string') {
const trimmedQuery = query.trim();
logUserPrompt(config, {
prompt: trimmedQuery,
prompt_length: trimmedQuery.length,
});
onDebugMessage(`User query: '${trimmedQuery}'`);
await logger?.logMessage(MessageSenderType.USER, trimmedQuery);
// Handle UI-only commands first
const slashCommandResult = await handleSlashCommand(trimmedQuery);
if (typeof slashCommandResult === 'boolean' && slashCommandResult) {
// Command was handled, and it doesn't require a tool call from here
return { queryToSend: null, shouldProceed: false };
} else if (
typeof slashCommandResult === 'object' &&
slashCommandResult.shouldScheduleTool
) {
// Slash command wants to schedule a tool call (e.g., /memory add)
const { toolName, toolArgs } = slashCommandResult;
if (toolName && toolArgs) {
const toolCallRequest: ToolCallRequestInfo = {
callId: `${toolName}-${Date.now()}-${Math.random().toString(16).slice(2)}`,
name: toolName,
args: toolArgs,
};
scheduleToolCalls([toolCallRequest], abortSignal);
}
return { queryToSend: null, shouldProceed: false }; // Handled by scheduling the tool
}
if (shellModeActive && handleShellCommand(trimmedQuery, abortSignal)) {
return { queryToSend: null, shouldProceed: false };
}
// Handle @-commands (which might involve tool calls)
if (isAtCommand(trimmedQuery)) {
const atCommandResult = await handleAtCommand({
query: trimmedQuery,
config,
addItem,
onDebugMessage,
messageId: userMessageTimestamp,
signal: abortSignal,
});
if (!atCommandResult.shouldProceed) {
return { queryToSend: null, shouldProceed: false };
}
localQueryToSendToGemini = atCommandResult.processedQuery;
} else {
// Normal query for Gemini
addItem(
{ type: MessageType.USER, text: trimmedQuery },
userMessageTimestamp,
);
localQueryToSendToGemini = trimmedQuery;
}
} else {
// It's a function response (PartListUnion that isn't a string)
localQueryToSendToGemini = query;
}
if (localQueryToSendToGemini === null) {
onDebugMessage(
'Query processing resulted in null, not sending to Gemini.',
);
return { queryToSend: null, shouldProceed: false };
}
return { queryToSend: localQueryToSendToGemini, shouldProceed: true };
},
[
config,
addItem,
onDebugMessage,
handleShellCommand,
handleSlashCommand,
logger,
shellModeActive,
scheduleToolCalls,
],
);
// --- Stream Event Handlers ---
const handleContentEvent = useCallback(
(
eventValue: ContentEvent['value'],
currentGeminiMessageBuffer: string,
userMessageTimestamp: number,
): string => {
let newGeminiMessageBuffer = currentGeminiMessageBuffer + eventValue;
if (
pendingHistoryItemRef.current?.type !== 'gemini' &&
pendingHistoryItemRef.current?.type !== 'gemini_content'
) {
if (pendingHistoryItemRef.current) {
addItem(pendingHistoryItemRef.current, userMessageTimestamp);
}
setPendingHistoryItem({ type: 'gemini', text: '' });
newGeminiMessageBuffer = eventValue;
}
// Split large messages for better rendering performance. Ideally,
// we should maximize the amount of output sent to <Static />.
const splitPoint = findLastSafeSplitPoint(newGeminiMessageBuffer);
if (splitPoint === newGeminiMessageBuffer.length) {
// Update the existing message with accumulated content
setPendingHistoryItem((item) => ({
type: item?.type as 'gemini' | 'gemini_content',
text: newGeminiMessageBuffer,
}));
} else {
// This indicates that we need to split up this Gemini Message.
// Splitting a message is primarily a performance consideration. There is a
// <Static> component at the root of App.tsx which takes care of rendering
// content statically or dynamically. Everything but the last message is
// treated as static in order to prevent re-rendering an entire message history
// multiple times per-second (as streaming occurs). Prior to this change you'd
// see heavy flickering of the terminal. This ensures that larger messages get
// broken up so that there are more "statically" rendered.
const beforeText = newGeminiMessageBuffer.substring(0, splitPoint);
const afterText = newGeminiMessageBuffer.substring(splitPoint);
addItem(
{
type: pendingHistoryItemRef.current?.type as
| 'gemini'
| 'gemini_content',
text: beforeText,
},
userMessageTimestamp,
);
setPendingHistoryItem({ type: 'gemini_content', text: afterText });
newGeminiMessageBuffer = afterText;
}
return newGeminiMessageBuffer;
},
[addItem, pendingHistoryItemRef, setPendingHistoryItem],
);
const handleUserCancelledEvent = useCallback(
(userMessageTimestamp: number) => {
if (pendingHistoryItemRef.current) {
if (pendingHistoryItemRef.current.type === 'tool_group') {
const updatedTools = pendingHistoryItemRef.current.tools.map(
(tool) =>
tool.status === ToolCallStatus.Pending ||
tool.status === ToolCallStatus.Confirming ||
tool.status === ToolCallStatus.Executing
? { ...tool, status: ToolCallStatus.Canceled }
: tool,
);
const pendingItem: HistoryItemToolGroup = {
...pendingHistoryItemRef.current,
tools: updatedTools,
};
addItem(pendingItem, userMessageTimestamp);
} else {
addItem(pendingHistoryItemRef.current, userMessageTimestamp);
}
setPendingHistoryItem(null);
}
addItem(
{ type: MessageType.INFO, text: 'User cancelled the request.' },
userMessageTimestamp,
);
setIsResponding(false);
},
[addItem, pendingHistoryItemRef, setPendingHistoryItem],
);
const handleErrorEvent = useCallback(
(eventValue: ErrorEvent['value'], userMessageTimestamp: number) => {
if (pendingHistoryItemRef.current) {
addItem(pendingHistoryItemRef.current, userMessageTimestamp);
setPendingHistoryItem(null);
}
addItem(
{ type: MessageType.ERROR, text: `[API Error: ${eventValue.message}]` },
userMessageTimestamp,
);
},
[addItem, pendingHistoryItemRef, setPendingHistoryItem],
);
const handleChatCompressionEvent = useCallback(
(eventValue: ServerGeminiChatCompressedEvent['value']) =>
addItem(
{
type: 'info',
text:
`IMPORTANT: This conversation approached the input token limit for ${config.getModel()}. ` +
`A compressed context will be sent for future messages (compressed from: ` +
`${eventValue?.originalTokenCount ?? 'unknown'} to ` +
`${eventValue?.newTokenCount ?? 'unknown'} tokens).`,
},
Date.now(),
),
[addItem, config],
);
const processGeminiStreamEvents = useCallback(
async (
stream: AsyncIterable<GeminiEvent>,
userMessageTimestamp: number,
signal: AbortSignal,
): Promise<StreamProcessingStatus> => {
let geminiMessageBuffer = '';
const toolCallRequests: ToolCallRequestInfo[] = [];
for await (const event of stream) {
switch (event.type) {
case ServerGeminiEventType.Thought:
setThought(event.value);
break;
case ServerGeminiEventType.Content:
geminiMessageBuffer = handleContentEvent(
event.value,
geminiMessageBuffer,
userMessageTimestamp,
);
break;
case ServerGeminiEventType.ToolCallRequest:
toolCallRequests.push(event.value);
break;
case ServerGeminiEventType.UserCancelled:
handleUserCancelledEvent(userMessageTimestamp);
break;
case ServerGeminiEventType.Error:
handleErrorEvent(event.value, userMessageTimestamp);
break;
case ServerGeminiEventType.ChatCompressed:
handleChatCompressionEvent(event.value);
break;
case ServerGeminiEventType.UsageMetadata:
addUsage(event.value);
break;
case ServerGeminiEventType.ToolCallConfirmation:
case ServerGeminiEventType.ToolCallResponse:
// do nothing
break;
default: {
// enforces exhaustive switch-case
const unreachable: never = event;
return unreachable;
}
}
}
if (toolCallRequests.length > 0) {
scheduleToolCalls(toolCallRequests, signal);
}
return StreamProcessingStatus.Completed;
},
[
handleContentEvent,
handleUserCancelledEvent,
handleErrorEvent,
scheduleToolCalls,
handleChatCompressionEvent,
addUsage,
],
);
const submitQuery = useCallback(
async (query: PartListUnion, options?: { isContinuation: boolean }) => {
if (
streamingState === StreamingState.Responding ||
streamingState === StreamingState.WaitingForConfirmation
)
return;
const userMessageTimestamp = Date.now();
setShowHelp(false);
abortControllerRef.current = new AbortController();
const abortSignal = abortControllerRef.current.signal;
const { queryToSend, shouldProceed } = await prepareQueryForGemini(
query,
userMessageTimestamp,
abortSignal,
);
if (!shouldProceed || queryToSend === null) {
return;
}
if (!options?.isContinuation) {
startNewTurn();
}
setIsResponding(true);
setInitError(null);
try {
const stream = geminiClient.sendMessageStream(queryToSend, abortSignal);
const processingStatus = await processGeminiStreamEvents(
stream,
userMessageTimestamp,
abortSignal,
);
if (processingStatus === StreamProcessingStatus.UserCancelled) {
return;
}
if (pendingHistoryItemRef.current) {
addItem(pendingHistoryItemRef.current, userMessageTimestamp);
setPendingHistoryItem(null);
}
} catch (error: unknown) {
if (isAuthError(error)) {
onAuthError();
} else if (!isNodeError(error) || error.name !== 'AbortError') {
addItem(
{
type: MessageType.ERROR,
text: parseAndFormatApiError(
getErrorMessage(error) || 'Unknown error',
),
},
userMessageTimestamp,
);
}
} finally {
setIsResponding(false);
}
},
[
streamingState,
setShowHelp,
prepareQueryForGemini,
processGeminiStreamEvents,
pendingHistoryItemRef,
addItem,
setPendingHistoryItem,
setInitError,
geminiClient,
startNewTurn,
onAuthError,
],
);
/**
* Automatically submits responses for completed tool calls.
* This effect runs when `toolCalls` or `isResponding` changes.
* It ensures that tool responses are sent back to Gemini only when
* all processing for a given set of tools is finished and Gemini
* is not already generating a response.
*/
useEffect(() => {
if (isResponding) {
return;
}
const completedAndReadyToSubmitTools = toolCalls.filter(
(
tc: TrackedToolCall,
): tc is TrackedCompletedToolCall | TrackedCancelledToolCall => {
const isTerminalState =
tc.status === 'success' ||
tc.status === 'error' ||
tc.status === 'cancelled';
if (isTerminalState) {
const completedOrCancelledCall = tc as
| TrackedCompletedToolCall
| TrackedCancelledToolCall;
return (
!completedOrCancelledCall.responseSubmittedToGemini &&
completedOrCancelledCall.response?.responseParts !== undefined
);
}
return false;
},
);
if (
completedAndReadyToSubmitTools.length > 0 &&
completedAndReadyToSubmitTools.length === toolCalls.length
) {
// If all the tools were cancelled, don't submit a response to Gemini.
const allToolsCancelled = completedAndReadyToSubmitTools.every(
(tc) => tc.status === 'cancelled',
);
if (allToolsCancelled) {
if (geminiClient) {
// We need to manually add the function responses to the history
// so the model knows the tools were cancelled.
const responsesToAdd = completedAndReadyToSubmitTools.flatMap(
(toolCall) => toolCall.response.responseParts,
);
for (const response of responsesToAdd) {
let parts: Part[];
if (Array.isArray(response)) {
parts = response;
} else if (typeof response === 'string') {
parts = [{ text: response }];
} else {
parts = [response];
}
geminiClient.addHistory({
role: 'user',
parts,
});
}
}
const callIdsToMarkAsSubmitted = completedAndReadyToSubmitTools.map(
(toolCall) => toolCall.request.callId,
);
markToolsAsSubmitted(callIdsToMarkAsSubmitted);
return;
}
const responsesToSend: PartListUnion[] =
completedAndReadyToSubmitTools.map(
(toolCall) => toolCall.response.responseParts,
);
const callIdsToMarkAsSubmitted = completedAndReadyToSubmitTools.map(
(toolCall) => toolCall.request.callId,
);
markToolsAsSubmitted(callIdsToMarkAsSubmitted);
submitQuery(mergePartListUnions(responsesToSend), {
isContinuation: true,
});
}
}, [
toolCalls,
isResponding,
submitQuery,
markToolsAsSubmitted,
addItem,
geminiClient,
]);
const pendingHistoryItems = [
pendingHistoryItemRef.current,
pendingToolCallGroupDisplay,
].filter((i) => i !== undefined && i !== null);
useEffect(() => {
const saveRestorableToolCalls = async () => {
if (!config.getCheckpointEnabled()) {
return;
}
const restorableToolCalls = toolCalls.filter(
(toolCall) =>
(toolCall.request.name === 'replace' ||
toolCall.request.name === 'write_file') &&
toolCall.status === 'awaiting_approval',
);
if (restorableToolCalls.length > 0) {
const checkpointDir = config.getGeminiDir()
? path.join(config.getGeminiDir(), 'checkpoints')
: undefined;
if (!checkpointDir) {
return;
}
try {
await fs.mkdir(checkpointDir, { recursive: true });
} catch (error) {
if (!isNodeError(error) || error.code !== 'EEXIST') {
onDebugMessage(
`Failed to create checkpoint directory: ${getErrorMessage(error)}`,
);
return;
}
}
for (const toolCall of restorableToolCalls) {
const filePath = toolCall.request.args['file_path'] as string;
if (!filePath) {
onDebugMessage(
`Skipping restorable tool call due to missing file_path: ${toolCall.request.name}`,
);
continue;
}
try {
let commitHash = await gitService?.createFileSnapshot(
`Snapshot for ${toolCall.request.name}`,
);
if (!commitHash) {
commitHash = await gitService?.getCurrentCommitHash();
}
if (!commitHash) {
onDebugMessage(
`Failed to create snapshot for ${filePath}. Skipping restorable tool call.`,
);
continue;
}
const timestamp = new Date()
.toISOString()
.replace(/:/g, '-')
.replace(/\./g, '_');
const toolName = toolCall.request.name;
const fileName = path.basename(filePath);
const toolCallWithSnapshotFileName = `${timestamp}-${fileName}-${toolName}.json`;
const clientHistory = await geminiClient?.getHistory();
const toolCallWithSnapshotFilePath = path.join(
checkpointDir,
toolCallWithSnapshotFileName,
);
await fs.writeFile(
toolCallWithSnapshotFilePath,
JSON.stringify(
{
history,
clientHistory,
toolCall: {
name: toolCall.request.name,
args: toolCall.request.args,
},
commitHash,
filePath,
},
null,
2,
),
);
} catch (error) {
onDebugMessage(
`Failed to write restorable tool call file: ${getErrorMessage(
error,
)}`,
);
}
}
}
};
saveRestorableToolCalls();
}, [toolCalls, config, onDebugMessage, gitService, history, geminiClient]);
return {
streamingState,
submitQuery,
initError,
pendingHistoryItems,
thought,
};
};