Skip to content

Commit 4d49b78

Browse files
authored
fix: do not reset usage when no remaining streaming buffer (#1633)
**Description** Remove the usage reset when there is no remaining stream buffer, as this state is expected, and fix the unrealistic test case that masked the issue. --------- Signed-off-by: Dan Sun <[email protected]>
1 parent ce34aeb commit 4d49b78

File tree

3 files changed

+54
-35
lines changed

3 files changed

+54
-35
lines changed

internal/translator/anthropic_anthropic.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,6 @@ func (a *anthropicToAnthropicTranslator) ResponseBody(_ map[string]string, body
8585
return nil, nil, tokenUsage, a.requestModel, fmt.Errorf("failed to read body: %w", err)
8686
}
8787

88-
// If this is a fresh start (no buffered data), reset the streaming token usage
89-
if len(a.buffered) == 0 {
90-
a.streamingTokenUsage = metrics.TokenUsage{}
91-
}
92-
9388
a.buffered = append(a.buffered, buf...)
9489
a.extractUsageFromBufferEvent(span)
9590
// Use stored streaming response model, fallback to request model for non-compliant backends

internal/translator/anthropic_anthropic_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,11 @@ event: content_block_delta
116116
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hi"} }
117117
118118
event: content_block_delta
119-
data: {"type":"content_block_delta","index":0,"delta":{"typ`
120-
const responseTail = `
121-
e":"text_delta","text":"! 👋 How"} }
119+
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"! 👋 How"} }
120+
121+
`
122122

123+
const responseTail = `
123124
event: ping
124125
data: {"type": "ping"}
125126

internal/translator/anthropic_gcpanthropic_test.go

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,6 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_ZeroTokenUsage(t *testin
472472
}
473473

474474
func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingTokenUsage(t *testing.T) {
475-
translator := NewAnthropicToGCPAnthropicTranslator("2023-06-01", "")
476-
translator.(*anthropicToGCPAnthropicTranslator).stream = true
477-
478475
tests := []struct {
479476
name string
480477
chunk string
@@ -517,7 +514,8 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingTokenUsage(t *t
517514
t.Run(tt.name, func(t *testing.T) {
518515
bodyReader := bytes.NewReader([]byte(tt.chunk))
519516
respHeaders := map[string]string{"content-type": "application/json"}
520-
517+
translator := NewAnthropicToGCPAnthropicTranslator("2023-06-01", "")
518+
translator.(*anthropicToGCPAnthropicTranslator).stream = true
521519
headerMutation, bodyMutation, tokenUsage, _, err := translator.ResponseBody(respHeaders, bodyReader, tt.endOfStream, nil)
522520

523521
require.NoError(t, err)
@@ -529,9 +527,6 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingTokenUsage(t *t
529527
}
530528

531529
func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingEdgeCases(t *testing.T) {
532-
translator := NewAnthropicToGCPAnthropicTranslator("2023-06-01", "")
533-
translator.(*anthropicToGCPAnthropicTranslator).stream = true
534-
535530
tests := []struct {
536531
name string
537532
chunk string
@@ -563,7 +558,8 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingEdgeCases(t *te
563558
t.Run(tt.name, func(t *testing.T) {
564559
bodyReader := bytes.NewReader([]byte(tt.chunk))
565560
respHeaders := map[string]string{"content-type": "application/json"}
566-
561+
translator := NewAnthropicToGCPAnthropicTranslator("2023-06-01", "")
562+
translator.(*anthropicToGCPAnthropicTranslator).stream = true
567563
headerMutation, bodyMutation, tokenUsage, _, err := translator.ResponseBody(respHeaders, bodyReader, false, nil)
568564

569565
require.NoError(t, err)
@@ -611,47 +607,74 @@ func TestAnthropicToGCPAnthropicTranslator_ResponseBody_StreamingFullScenario(t
611607
// 2. content_block events provide the actual text content
612608
// 3. message_delta at the end provides output_tokens=5 but no input_tokens
613609
// 4. message_stop ends the stream
614-
sseStream := `event: message_start
615-
data: {"type": "message_start", "message": {"id": "msg_123", "type": "message", "role": "assistant", "content": [], "model": "claude-3-sonnet-20240229", "usage": {"input_tokens": 15, "output_tokens": 0}}}
616-
617-
event: content_block_start
610+
messageStartChunk := `event: message_start
611+
data: {"type": "message_start", "message": {"id": "msg_123", "type": "message", "role": "assistant", "content": [], "model": "claude-3-sonnet-20240229", "usage": {"input_tokens": 15, "cache_read_input_tokens": 5, "output_tokens": 0}}}
612+
`
613+
contentBlockStartChunk := `event: content_block_start
618614
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}
619-
620-
event: content_block_delta
615+
`
616+
contentBlockDeltaChunk := `event: content_block_delta
621617
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
622-
623-
event: content_block_stop
618+
`
619+
contentBlockStopChunk := `event: content_block_stop
624620
data: {"type": "content_block_stop", "index": 0}
625-
626-
event: message_delta
621+
`
622+
messageDeltaChunk := `event: message_delta
627623
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 5}}
628-
629-
event: message_stop
624+
`
625+
messageStopChunk := `event: message_stop
630626
data: {"type": "message_stop"}
631-
632627
`
633628

634629
// Process the streaming response
635-
reader := strings.NewReader(sseStream)
636-
_, _, tokenUsage, _, err := translator.ResponseBody(nil, reader, false, nil)
630+
_, _, tokenUsage, _, err := translator.ResponseBody(nil, strings.NewReader(messageStartChunk), false, nil)
637631
require.NoError(t, err)
638632

639-
// Verify token usage - this should preserve input_tokens from message_start
633+
// Verify token usage - this should calculate input_tokens from message_start
640634
inputTokens, inputSet := tokenUsage.InputTokens()
641635
outputTokens, outputSet := tokenUsage.OutputTokens()
642636
totalTokens, totalSet := tokenUsage.TotalTokens()
643637
cachedTokens, cachedSet := tokenUsage.CachedInputTokens()
644638

645639
// Assertions
646640
assert.True(t, inputSet, "Input tokens should be set")
647-
assert.Equal(t, uint32(15), inputTokens, "Input tokens should be preserved from message_start")
641+
assert.Equal(t, uint32(20), inputTokens, "Input tokens should be preserved from message_start")
648642

649643
assert.True(t, outputSet, "Output tokens should be set")
650-
assert.Equal(t, uint32(5), outputTokens, "Output tokens should come from message_delta")
644+
assert.Equal(t, uint32(0), outputTokens, "Output tokens should come from message_delta")
651645

652646
assert.True(t, totalSet, "Total tokens should be calculated")
653647
assert.Equal(t, uint32(20), totalTokens, "Total tokens should be input + output")
654648

655649
assert.True(t, cachedSet, "Cached tokens should be set")
656-
assert.Equal(t, uint32(0), cachedTokens, "No cached tokens in this scenario")
650+
assert.Equal(t, uint32(5), cachedTokens, "No cached tokens in this scenario")
651+
652+
_, _, tokenUsage, _, err = translator.ResponseBody(nil, strings.NewReader(contentBlockStartChunk), false, nil)
653+
require.NoError(t, err)
654+
_, _, tokenUsage, _, err = translator.ResponseBody(nil, strings.NewReader(contentBlockDeltaChunk), false, nil)
655+
require.NoError(t, err)
656+
_, _, tokenUsage, _, err = translator.ResponseBody(nil, strings.NewReader(contentBlockStopChunk), false, nil)
657+
require.NoError(t, err)
658+
_, _, tokenUsage, _, err = translator.ResponseBody(nil, strings.NewReader(messageDeltaChunk), false, nil)
659+
require.NoError(t, err)
660+
_, _, tokenUsage, _, err = translator.ResponseBody(nil, strings.NewReader(messageStopChunk), false, nil)
661+
require.NoError(t, err)
662+
663+
// Verify token usage - this should preserve input_tokens from message_start and get the output_tokens from message_delta
664+
inputTokens, inputSet = tokenUsage.InputTokens()
665+
outputTokens, outputSet = tokenUsage.OutputTokens()
666+
totalTokens, totalSet = tokenUsage.TotalTokens()
667+
cachedTokens, cachedSet = tokenUsage.CachedInputTokens()
668+
669+
assert.True(t, inputSet, "Input tokens should be set")
670+
assert.Equal(t, uint32(20), inputTokens, "Input tokens should be preserved from message_start")
671+
672+
assert.True(t, outputSet, "Output tokens should be set")
673+
assert.Equal(t, uint32(5), outputTokens, "Output tokens should come from message_delta")
674+
675+
assert.True(t, totalSet, "Total tokens should be calculated")
676+
assert.Equal(t, uint32(25), totalTokens, "Total tokens should be input + output")
677+
678+
assert.True(t, cachedSet, "Cached tokens should be set")
679+
assert.Equal(t, uint32(5), cachedTokens, "No cached tokens in this scenario")
657680
}

0 commit comments

Comments (0)