feat(telemetry): Prevent memory leak in ClearcutLogger (#5734)
parent e50d886ba8
commit 86eaa03f8a
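The logger previously buffered telemetry events in an unbounded array; the patch below backs the queue with mnemonist's `FixedDeque` capped at 1,000 entries, evicts the oldest event when the cap is reached, limits re-queued failed events to 100, adds a 30-second request timeout, and guards against concurrent flushes. A minimal standalone sketch of the eviction behaviour the bounded queue provides (not taken from the patch; `MAX_EVENTS` and `enqueue` are illustrative names):

```ts
import { FixedDeque } from 'mnemonist';

// FixedDeque throws once it is full, so the oldest entry is evicted
// before pushing; the queue can never grow past its capacity.
const MAX_EVENTS = 1000; // mirrors max_events in the diff below

const events = new FixedDeque<string>(Array, MAX_EVENTS);

function enqueue(json: string): void {
  if (events.size >= MAX_EVENTS) {
    events.shift(); // drop the oldest event instead of growing without bound
  }
  events.push(json);
}

for (let i = 0; i < MAX_EVENTS + 5; i++) {
  enqueue(JSON.stringify({ event_id: i }));
}

console.log(events.size); // 1000
console.log(JSON.parse(events.toArray()[0]).event_id); // 5 — ids 0–4 were evicted
```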
@@ -35,6 +35,7 @@
        "json": "^11.0.0",
        "lodash": "^4.17.21",
        "memfs": "^4.17.2",
        "mnemonist": "^0.40.3",
        "mock-fs": "^5.5.0",
        "prettier": "^3.5.3",
        "react-devtools-core": "^4.28.5",
@@ -7860,6 +7861,16 @@
        "node": ">=16 || 14 >=14.17"
      }
    },
    "node_modules/mnemonist": {
      "version": "0.40.3",
      "resolved": "https://registry.npmjs.org/mnemonist/-/mnemonist-0.40.3.tgz",
      "integrity": "sha512-Vjyr90sJ23CKKH/qPAgUKicw/v6pRoamxIEDFOF8uSgFME7DqPRpHgRTejWVjkdGg5dXj0/NyxZHZ9bcjH+2uQ==",
      "dev": true,
      "license": "MIT",
      "dependencies": {
        "obliterator": "^2.0.4"
      }
    },
    "node_modules/mock-fs": {
      "version": "5.5.0",
      "resolved": "https://registry.npmjs.org/mock-fs/-/mock-fs-5.5.0.tgz",
@@ -8290,6 +8301,13 @@
        "url": "https://github.com/sponsors/ljharb"
      }
    },
    "node_modules/obliterator": {
      "version": "2.0.5",
      "resolved": "https://registry.npmjs.org/obliterator/-/obliterator-2.0.5.tgz",
      "integrity": "sha512-42CPE9AhahZRsMNslczq0ctAEtqk8Eka26QofnqC346BZdHDySk3LWka23LI7ULIw11NmltpiLagIq8gBozxTw==",
      "dev": true,
      "license": "MIT"
    },
    "node_modules/on-finished": {
      "version": "2.4.1",
      "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.4.1.tgz",
@@ -84,6 +84,7 @@
    "react-devtools-core": "^4.28.5",
    "typescript-eslint": "^8.30.1",
    "vitest": "^3.2.4",
    "yargs": "^17.7.2"
    "yargs": "^17.7.2",
    "mnemonist": "^0.40.3"
  }
}
@@ -0,0 +1,263 @@
/**
 * @license
 * Copyright 2025 Google LLC
 * SPDX-License-Identifier: Apache-2.0
 */

import { vi, describe, it, expect, beforeEach, afterEach } from 'vitest';
import * as https from 'https';
import { ClientRequest, IncomingMessage } from 'http';
import { Readable, Writable } from 'stream';

import {
  ClearcutLogger,
  LogResponse,
  LogEventEntry,
} from './clearcut-logger.js';
import { Config } from '../../config/config.js';
import * as userAccount from '../../utils/user_account.js';
import * as userId from '../../utils/user_id.js';

// Mock dependencies
vi.mock('https-proxy-agent');
vi.mock('https');
vi.mock('../../utils/user_account');
vi.mock('../../utils/user_id');

const mockHttps = vi.mocked(https);
const mockUserAccount = vi.mocked(userAccount);
const mockUserId = vi.mocked(userId);

describe('ClearcutLogger', () => {
  let mockConfig: Config;
  let logger: ClearcutLogger | undefined;

  // A helper to get the internal events array for testing
  const getEvents = (l: ClearcutLogger): LogEventEntry[][] =>
    l['events'].toArray() as LogEventEntry[][];

  const getEventsSize = (l: ClearcutLogger): number => l['events'].size;

  const getMaxEvents = (l: ClearcutLogger): number => l['max_events'];

  const getMaxRetryEvents = (l: ClearcutLogger): number =>
    l['max_retry_events'];

  const requeueFailedEvents = (l: ClearcutLogger, events: LogEventEntry[][]) =>
    l['requeueFailedEvents'](events);

  beforeEach(() => {
    vi.useFakeTimers();
    vi.setSystemTime(new Date());

    mockConfig = {
      getUsageStatisticsEnabled: vi.fn().mockReturnValue(true),
      getDebugMode: vi.fn().mockReturnValue(false),
      getSessionId: vi.fn().mockReturnValue('test-session-id'),
      getProxy: vi.fn().mockReturnValue(undefined),
    } as unknown as Config;

    mockUserAccount.getCachedGoogleAccount.mockReturnValue('test@google.com');
    mockUserAccount.getLifetimeGoogleAccounts.mockReturnValue(1);
    mockUserId.getInstallationId.mockReturnValue('test-installation-id');

    logger = ClearcutLogger.getInstance(mockConfig);
    expect(logger).toBeDefined();
  });

  afterEach(() => {
    ClearcutLogger.clearInstance();
    vi.useRealTimers();
    vi.restoreAllMocks();
  });

  it('should not return an instance if usage statistics are disabled', () => {
    ClearcutLogger.clearInstance();
    vi.spyOn(mockConfig, 'getUsageStatisticsEnabled').mockReturnValue(false);
    const disabledLogger = ClearcutLogger.getInstance(mockConfig);
    expect(disabledLogger).toBeUndefined();
  });

  describe('enqueueLogEvent', () => {
    it('should add events to the queue', () => {
      logger!.enqueueLogEvent({ test: 'event1' });
      expect(getEventsSize(logger!)).toBe(1);
    });

    it('should evict the oldest event when the queue is full', () => {
      const maxEvents = getMaxEvents(logger!);

      for (let i = 0; i < maxEvents; i++) {
        logger!.enqueueLogEvent({ event_id: i });
      }

      expect(getEventsSize(logger!)).toBe(maxEvents);
      const firstEvent = JSON.parse(
        getEvents(logger!)[0][0].source_extension_json,
      );
      expect(firstEvent.event_id).toBe(0);

      // This should push out the first event
      logger!.enqueueLogEvent({ event_id: maxEvents });

      expect(getEventsSize(logger!)).toBe(maxEvents);
      const newFirstEvent = JSON.parse(
        getEvents(logger!)[0][0].source_extension_json,
      );
      expect(newFirstEvent.event_id).toBe(1);
      const lastEvent = JSON.parse(
        getEvents(logger!)[maxEvents - 1][0].source_extension_json,
      );
      expect(lastEvent.event_id).toBe(maxEvents);
    });
  });

  describe('flushToClearcut', () => {
    let mockRequest: Writable;
    let mockResponse: Readable & Partial<IncomingMessage>;

    beforeEach(() => {
      mockRequest = new Writable({
        write(chunk, encoding, callback) {
          callback();
        },
      });
      vi.spyOn(mockRequest, 'on');
      vi.spyOn(mockRequest, 'end').mockReturnThis();
      vi.spyOn(mockRequest, 'destroy').mockReturnThis();

      mockResponse = new Readable({ read() {} }) as Readable &
        Partial<IncomingMessage>;

      mockHttps.request.mockImplementation(
        (
          _options: string | https.RequestOptions | URL,
          ...args: unknown[]
        ): ClientRequest => {
          const callback = args.find((arg) => typeof arg === 'function') as
            | ((res: IncomingMessage) => void)
            | undefined;

          if (callback) {
            callback(mockResponse as IncomingMessage);
          }
          return mockRequest as ClientRequest;
        },
      );
    });

    it('should clear events on successful flush', async () => {
      mockResponse.statusCode = 200;
      const mockResponseBody = { nextRequestWaitMs: 1000 };
      // Encoded protobuf for {nextRequestWaitMs: 1000} which is `08 E8 07`
      const encodedResponse = Buffer.from([8, 232, 7]);

      logger!.enqueueLogEvent({ event_id: 1 });
      const flushPromise = logger!.flushToClearcut();

      mockResponse.push(encodedResponse);
      mockResponse.push(null); // End the stream

      const response: LogResponse = await flushPromise;

      expect(getEventsSize(logger!)).toBe(0);
      expect(response.nextRequestWaitMs).toBe(
        mockResponseBody.nextRequestWaitMs,
      );
    });

    it('should handle a network error and requeue events', async () => {
      logger!.enqueueLogEvent({ event_id: 1 });
      logger!.enqueueLogEvent({ event_id: 2 });
      expect(getEventsSize(logger!)).toBe(2);

      const flushPromise = logger!.flushToClearcut();
      mockRequest.emit('error', new Error('Network error'));
      await flushPromise;

      expect(getEventsSize(logger!)).toBe(2);
      const events = getEvents(logger!);
      expect(JSON.parse(events[0][0].source_extension_json).event_id).toBe(1);
    });

    it('should handle an HTTP error and requeue events', async () => {
      mockResponse.statusCode = 500;
      mockResponse.statusMessage = 'Internal Server Error';

      logger!.enqueueLogEvent({ event_id: 1 });
      logger!.enqueueLogEvent({ event_id: 2 });
      expect(getEventsSize(logger!)).toBe(2);

      const flushPromise = logger!.flushToClearcut();
      mockResponse.emit('end'); // End the response to trigger promise resolution
      await flushPromise;

      expect(getEventsSize(logger!)).toBe(2);
      const events = getEvents(logger!);
      expect(JSON.parse(events[0][0].source_extension_json).event_id).toBe(1);
    });
  });

  describe('requeueFailedEvents logic', () => {
    it('should limit the number of requeued events to max_retry_events', () => {
      const maxRetryEvents = getMaxRetryEvents(logger!);
      const eventsToLogCount = maxRetryEvents + 5;
      const eventsToSend: LogEventEntry[][] = [];
      for (let i = 0; i < eventsToLogCount; i++) {
        eventsToSend.push([
          {
            event_time_ms: Date.now(),
            source_extension_json: JSON.stringify({ event_id: i }),
          },
        ]);
      }

      requeueFailedEvents(logger!, eventsToSend);

      expect(getEventsSize(logger!)).toBe(maxRetryEvents);
      const firstRequeuedEvent = JSON.parse(
        getEvents(logger!)[0][0].source_extension_json,
      );
      // The last `maxRetryEvents` are kept. The oldest of those is at index `eventsToLogCount - maxRetryEvents`.
      expect(firstRequeuedEvent.event_id).toBe(
        eventsToLogCount - maxRetryEvents,
      );
    });

    it('should not requeue more events than available space in the queue', () => {
      const maxEvents = getMaxEvents(logger!);
      const spaceToLeave = 5;
      const initialEventCount = maxEvents - spaceToLeave;
      for (let i = 0; i < initialEventCount; i++) {
        logger!.enqueueLogEvent({ event_id: `initial_${i}` });
      }
      expect(getEventsSize(logger!)).toBe(initialEventCount);

      const failedEventsCount = 10; // More than spaceToLeave
      const eventsToSend: LogEventEntry[][] = [];
      for (let i = 0; i < failedEventsCount; i++) {
        eventsToSend.push([
          {
            event_time_ms: Date.now(),
            source_extension_json: JSON.stringify({ event_id: `failed_${i}` }),
          },
        ]);
      }

      requeueFailedEvents(logger!, eventsToSend);

      // availableSpace is 5. eventsToRequeue is min(10, 5) = 5.
      // Total size should be initialEventCount + 5 = maxEvents.
      expect(getEventsSize(logger!)).toBe(maxEvents);

      // The requeued events are the *last* 5 of the failed events.
      // startIndex = max(0, 10 - 5) = 5.
      // Loop unshifts events from index 9 down to 5.
      // The first element in the deque is the one with id 'failed_5'.
      const firstRequeuedEvent = JSON.parse(
        getEvents(logger!)[0][0].source_extension_json,
      );
      expect(firstRequeuedEvent.event_id).toBe('failed_5');
    });
  });
});
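One note on the success-path test above: it pushes the raw bytes `08 E8 07` into the mocked response and expects `nextRequestWaitMs` to come back as 1000. That works because the Clearcut reply is a protobuf message whose first field is a varint; a small decoder (illustrative only, not part of the patch) shows why those bytes mean 1000:

```ts
// 0x08 is the protobuf tag for field 1 with wire type 0 (varint);
// 0xE8 0x07 is the varint encoding of 1000 (104 + 7 * 128).
function decodeVarint(bytes: Uint8Array, offset: number): number {
  let result = 0;
  let shift = 0;
  for (let i = offset; i < bytes.length; i++) {
    result |= (bytes[i] & 0x7f) << shift; // lower 7 bits carry the payload
    if ((bytes[i] & 0x80) === 0) break; // a clear high bit ends the varint
    shift += 7;
  }
  return result;
}

const encoded = Uint8Array.from([0x08, 0xe8, 0x07]);
console.log(decodeVarint(encoded, 1)); // 1000, i.e. nextRequestWaitMs
```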
@@ -30,8 +30,8 @@ import {
  getCachedGoogleAccount,
  getLifetimeGoogleAccounts,
} from '../../utils/user_account.js';
import { HttpError, retryWithBackoff } from '../../utils/retry.js';
import { getInstallationId } from '../../utils/user_id.js';
import { FixedDeque } from 'mnemonist';

const start_session_event_name = 'start_session';
const new_prompt_event_name = 'new_prompt';
@@ -51,6 +51,25 @@ export interface LogResponse {
  nextRequestWaitMs?: number;
}

export interface LogEventEntry {
  event_time_ms: number;
  source_extension_json: string;
}

export type EventValue = {
  gemini_cli_key: EventMetadataKey | string;
  value: string;
};

export type LogEvent = {
  console_type: string;
  application: number;
  event_name: string;
  event_metadata: EventValue[][];
  client_email?: string;
  client_install_id?: string;
};

/**
 * Determine the surface that the user is currently using. Surface is effectively the
 * distribution channel in which the user is using Gemini CLI. Gemini CLI comes bundled
@@ -75,13 +94,17 @@ function determineSurface(): string {
export class ClearcutLogger {
  private static instance: ClearcutLogger;
  private config?: Config;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Clearcut expects this format.
  private readonly events: any = [];
  private readonly events: FixedDeque<LogEventEntry[]>;
  private last_flush_time: number = Date.now();
  private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events.
  private readonly max_events: number = 1000; // Maximum events to keep in memory
  private readonly max_retry_events: number = 100; // Maximum failed events to retry
  private flushing: boolean = false; // Prevent concurrent flush operations
  private pendingFlush: boolean = false; // Track if a flush was requested during an ongoing flush

  private constructor(config?: Config) {
    this.config = config;
    this.events = new FixedDeque<LogEventEntry[]>(Array, this.max_events);
  }

  static getInstance(config?: Config): ClearcutLogger | undefined {
@@ -93,17 +116,41 @@ export class ClearcutLogger {
    return ClearcutLogger.instance;
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Clearcut expects this format.
  enqueueLogEvent(event: any): void {
  /** For testing purposes only. */
  static clearInstance(): void {
    // @ts-expect-error - ClearcutLogger is a singleton, but we need to clear it for tests.
    ClearcutLogger.instance = undefined;
  }

  enqueueLogEvent(event: object): void {
    try {
      // Manually handle overflow for FixedDeque, which throws when full.
      const wasAtCapacity = this.events.size >= this.max_events;

      if (wasAtCapacity) {
        this.events.shift(); // Evict oldest element to make space.
      }

      this.events.push([
        {
          event_time_ms: Date.now(),
          source_extension_json: safeJsonStringify(event),
        },
      ]);

      if (wasAtCapacity && this.config?.getDebugMode()) {
        console.debug(
          `ClearcutLogger: Dropped old event to prevent memory leak (queue size: ${this.events.size})`,
        );
      }
    } catch (error) {
      if (this.config?.getDebugMode()) {
        console.error('ClearcutLogger: Failed to enqueue log event.', error);
      }
    }
  }

  createLogEvent(name: string, data: object[]): object {
  createLogEvent(name: string, data: EventValue[]): LogEvent {
    const email = getCachedGoogleAccount();
    const totalAccounts = getLifetimeGoogleAccounts();
    data.push({
@@ -111,12 +158,11 @@ export class ClearcutLogger {
      value: totalAccounts.toString(),
    });

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const logEvent: any = {
    const logEvent: LogEvent = {
      console_type: 'GEMINI_CLI',
      application: 102,
      event_name: name,
      event_metadata: [data] as object[],
      event_metadata: [data],
    };

    // Should log either email or install ID, not both. See go/cloudmill-1p-oss-instrumentation#define-sessionable-id
@@ -140,16 +186,25 @@ export class ClearcutLogger {
  }

  async flushToClearcut(): Promise<LogResponse> {
    if (this.flushing) {
      if (this.config?.getDebugMode()) {
        console.debug(
          'ClearcutLogger: Flush already in progress, marking pending flush.',
        );
      }
      this.pendingFlush = true;
      return Promise.resolve({});
    }
    this.flushing = true;

    if (this.config?.getDebugMode()) {
      console.log('Flushing log events to Clearcut.');
    }
    const eventsToSend = [...this.events];
    if (eventsToSend.length === 0) {
      return {};
    }
    const eventsToSend = this.events.toArray() as LogEventEntry[][];
    this.events.clear();

    const flushFn = () =>
      new Promise<Buffer>((resolve, reject) => {
    return new Promise<{ buffer: Buffer; statusCode?: number }>(
      (resolve, reject) => {
        const request = [
          {
            log_source_name: 'CONCORD',
@@ -163,6 +218,7 @@ export class ClearcutLogger {
          path: '/log',
          method: 'POST',
          headers: { 'Content-Length': Buffer.byteLength(body) },
          timeout: 30000, // 30-second timeout
        };
        const bufs: Buffer[] = [];
        const req = https.request(
@@ -171,49 +227,77 @@ export class ClearcutLogger {
            agent: this.getProxyAgent(),
          },
          (res) => {
            res.on('error', reject); // Handle stream errors
            res.on('data', (buf) => bufs.push(buf));
            res.on('end', () => {
              try {
                const buffer = Buffer.concat(bufs);
                // Check if we got a successful response
                if (
                  res.statusCode &&
                  (res.statusCode < 200 || res.statusCode >= 300)
                  res.statusCode >= 200 &&
                  res.statusCode < 300
                ) {
                  const err: HttpError = new Error(
                    `Request failed with status ${res.statusCode}`,
                  resolve({ buffer, statusCode: res.statusCode });
                } else {
                  // HTTP error - reject with status code for retry handling
                  reject(
                    new Error(`HTTP ${res.statusCode}: ${res.statusMessage}`),
                  );
                  err.status = res.statusCode;
                  res.resume();
                  return reject(err);
                }
                res.on('data', (buf) => bufs.push(buf));
                res.on('end', () => resolve(Buffer.concat(bufs)));
              } catch (e) {
                reject(e);
              }
            });
          },
        );
        req.on('error', reject);
        req.on('error', (e) => {
          // Network-level error
          reject(e);
        });
        req.on('timeout', () => {
          if (!req.destroyed) {
            req.destroy(new Error('Request timeout after 30 seconds'));
          }
        });
        req.end(body);
      });

    try {
      const responseBuffer = await retryWithBackoff(flushFn, {
        maxAttempts: 3,
        initialDelayMs: 200,
        shouldRetry: (err: unknown) => {
          if (!(err instanceof Error)) return false;
          const status = (err as HttpError).status as number | undefined;
          // If status is not available, it's likely a network error
          if (status === undefined) return true;

          // Retry on 429 (Too many Requests) and 5xx server errors.
          return status === 429 || (status >= 500 && status < 600);
        },
      });

      this.events.splice(0, eventsToSend.length);
    )
      .then(({ buffer }) => {
        try {
          this.last_flush_time = Date.now();
          return this.decodeLogResponse(responseBuffer) || {};
        } catch (error) {
          if (this.config?.getDebugMode()) {
            console.error('Clearcut flush failed after multiple retries.', error);
          }
          return this.decodeLogResponse(buffer) || {};
        } catch (error: unknown) {
          console.error('Error decoding log response:', error);
          return {};
        }
      })
      .catch((error: unknown) => {
        // Handle both network-level and HTTP-level errors
        if (this.config?.getDebugMode()) {
          console.error('Error flushing log events:', error);
        }

        // Re-queue failed events for retry
        this.requeueFailedEvents(eventsToSend);

        // Return empty response to maintain the Promise<LogResponse> contract
        return {};
      })
      .finally(() => {
        this.flushing = false;

        // If a flush was requested while we were flushing, flush again
        if (this.pendingFlush) {
          this.pendingFlush = false;
          // Fire and forget the pending flush
          this.flushToClearcut().catch((error) => {
            if (this.config?.getDebugMode()) {
              console.debug('Error in pending flush to Clearcut:', error);
            }
          });
        }
      });
  }

  // Visible for testing. Decodes protobuf-encoded response from Clearcut server.
@@ -258,7 +342,7 @@ export class ClearcutLogger {
  logStartSessionEvent(event: StartSessionEvent): void {
    const surface = determineSurface();

    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_START_SESSION_MODEL,
        value: event.model,
@@ -337,7 +421,7 @@ export class ClearcutLogger {
  }

  logNewPromptEvent(event: UserPromptEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_USER_PROMPT_LENGTH,
        value: JSON.stringify(event.prompt_length),
@@ -361,7 +445,7 @@ export class ClearcutLogger {
  }

  logToolCallEvent(event: ToolCallEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_TOOL_CALL_NAME,
        value: JSON.stringify(event.function_name),
@@ -398,7 +482,7 @@ export class ClearcutLogger {
  }

  logApiRequestEvent(event: ApiRequestEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_API_REQUEST_MODEL,
        value: JSON.stringify(event.model),
@@ -414,7 +498,7 @@ export class ClearcutLogger {
  }

  logApiResponseEvent(event: ApiResponseEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_API_RESPONSE_MODEL,
        value: JSON.stringify(event.model),
@@ -471,7 +555,7 @@ export class ClearcutLogger {
  }

  logApiErrorEvent(event: ApiErrorEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_API_ERROR_MODEL,
        value: JSON.stringify(event.model),
@@ -503,7 +587,7 @@ export class ClearcutLogger {
  }

  logFlashFallbackEvent(event: FlashFallbackEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_AUTH_TYPE,
        value: JSON.stringify(event.auth_type),
@@ -521,7 +605,7 @@ export class ClearcutLogger {
  }

  logLoopDetectedEvent(event: LoopDetectedEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_PROMPT_ID,
        value: JSON.stringify(event.prompt_id),
@@ -537,7 +621,7 @@ export class ClearcutLogger {
  }

  logNextSpeakerCheck(event: NextSpeakerCheckEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_PROMPT_ID,
        value: JSON.stringify(event.prompt_id),
@@ -563,7 +647,7 @@ export class ClearcutLogger {
  }

  logSlashCommandEvent(event: SlashCommandEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_SLASH_COMMAND_NAME,
        value: JSON.stringify(event.command),
@@ -582,7 +666,7 @@ export class ClearcutLogger {
  }

  logMalformedJsonResponseEvent(event: MalformedJsonResponseEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key:
          EventMetadataKey.GEMINI_CLI_MALFORMED_JSON_RESPONSE_MODEL,
@@ -597,7 +681,7 @@ export class ClearcutLogger {
  }

  logIdeConnectionEvent(event: IdeConnectionEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_IDE_CONNECTION_TYPE,
        value: JSON.stringify(event.connection_type),
@@ -609,7 +693,7 @@ export class ClearcutLogger {
  }

  logEndSessionEvent(event: EndSessionEvent): void {
    const data = [
    const data: EventValue[] = [
      {
        gemini_cli_key: EventMetadataKey.GEMINI_CLI_SESSION_ID,
        value: event?.session_id?.toString() ?? '',
@@ -639,4 +723,57 @@ export class ClearcutLogger {
    const event = new EndSessionEvent(this.config);
    this.logEndSessionEvent(event);
  }

  private requeueFailedEvents(eventsToSend: LogEventEntry[][]): void {
    // Add the events back to the front of the queue to be retried, but limit retry queue size
    const eventsToRetry = eventsToSend.slice(-this.max_retry_events); // Keep only the most recent events

    // Log a warning if we're dropping events
    if (
      eventsToSend.length > this.max_retry_events &&
      this.config?.getDebugMode()
    ) {
      console.warn(
        `ClearcutLogger: Dropping ${
          eventsToSend.length - this.max_retry_events
        } events due to retry queue limit. Total events: ${
          eventsToSend.length
        }, keeping: ${this.max_retry_events}`,
      );
    }

    // Determine how many events can be re-queued
    const availableSpace = this.max_events - this.events.size;
    const numEventsToRequeue = Math.min(eventsToRetry.length, availableSpace);

    if (numEventsToRequeue === 0) {
      if (this.config?.getDebugMode()) {
        console.debug(
          `ClearcutLogger: No events re-queued (queue size: ${this.events.size})`,
        );
      }
      return;
    }

    // Get the most recent events to re-queue
    const eventsToRequeue = eventsToRetry.slice(
      eventsToRetry.length - numEventsToRequeue,
    );

    // Prepend events to the front of the deque to be retried first.
    // We iterate backwards to maintain the original order of the failed events.
    for (let i = eventsToRequeue.length - 1; i >= 0; i--) {
      this.events.unshift(eventsToRequeue[i]);
    }
    // Clear any potential overflow
    while (this.events.size > this.max_events) {
      this.events.pop();
    }

    if (this.config?.getDebugMode()) {
      console.debug(
        `ClearcutLogger: Re-queued ${numEventsToRequeue} events for retry (queue size: ${this.events.size})`,
      );
    }
  }
}
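For reference, the bounds that `requeueFailedEvents` enforces can be summarised in a few lines: at most `max_retry_events` failed events are retried, never more than the free space left in the queue, and the most recent failures win. A standalone sketch (the `boundedRequeue` helper is hypothetical, not part of the patch):

```ts
// Returns the subset of failed events that would be put back on the queue.
function boundedRequeue<T>(
  queueSize: number, // current number of entries in the deque
  failed: T[], // events from the failed flush, oldest first
  maxEvents = 1000, // queue capacity (max_events in the patch)
  maxRetryEvents = 100, // retry cap (max_retry_events in the patch)
): T[] {
  const toRetry = failed.slice(-maxRetryEvents); // keep only the most recent failures
  const availableSpace = maxEvents - queueSize;
  const count = Math.min(toRetry.length, availableSpace); // never overflow the queue
  return toRetry.slice(toRetry.length - count); // these are unshifted onto the front
}

// Matches the second requeue test: 995 events already queued, 10 failures,
// so only the last 5 ('failed_5' … 'failed_9') are put back.
console.log(
  boundedRequeue(995, [...Array(10).keys()].map((i) => `failed_${i}`)),
);
```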