Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,84 @@ func (bifrost *Bifrost) BatchResultsRequest(ctx context.Context, req *schemas.Bi
return response, nil
}

// BatchDeleteRequest deletes a batch job on the given provider.
//
// It validates the request (non-nil, provider and batch_id present), resolves
// the provider and its config, picks the first supported key when the provider
// requires one, and executes the delete with the standard retry policy.
// Returns a BifrostError on validation failure, unknown provider, config
// lookup failure, or provider-side failure.
func (bifrost *Bifrost) BatchDeleteRequest(ctx context.Context, req *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	if req == nil {
		return nil, &schemas.BifrostError{
			IsBifrostError: false,
			Error: &schemas.ErrorField{
				Message: "batch delete request is nil",
			},
		}
	}
	if req.Provider == "" {
		return nil, &schemas.BifrostError{
			IsBifrostError: false,
			Error: &schemas.ErrorField{
				Message: "provider is required for batch delete request",
			},
		}
	}
	if req.BatchID == "" {
		return nil, &schemas.BifrostError{
			IsBifrostError: false,
			Error: &schemas.ErrorField{
				Message: "batch_id is required for batch delete request",
			},
		}
	}
	// Fall back to the instance context so downstream calls always get a
	// usable context.
	if ctx == nil {
		ctx = bifrost.ctx
	}

	provider := bifrost.getProviderByKey(req.Provider)
	if provider == nil {
		return nil, &schemas.BifrostError{
			IsBifrostError: false,
			Error: &schemas.ErrorField{
				Message: "provider not found for batch delete request",
			},
		}
	}

	config, err := bifrost.account.GetConfigForProvider(req.Provider)
	if err != nil {
		return nil, newBifrostErrorFromMsg(fmt.Sprintf("failed to get config for provider %s: %v", req.Provider, err))
	}
	if config == nil {
		return nil, newBifrostErrorFromMsg(fmt.Sprintf("config is nil for provider %s", req.Provider))
	}

	// Custom providers proxy a base provider type; key requirements are
	// decided by the base type, not the custom alias.
	baseProvider := req.Provider
	if config.CustomProviderConfig != nil && config.CustomProviderConfig.BaseProviderType != "" {
		baseProvider = config.CustomProviderConfig.BaseProviderType
	}

	// Batch delete is not model-scoped, so any supported key works; use the
	// first one. key stays zero-valued for keyless providers.
	var key schemas.Key
	if providerRequiresKey(baseProvider, config.CustomProviderConfig) {
		keys, keyErr := bifrost.getAllSupportedKeys(&ctx, req.Provider, baseProvider)
		if keyErr != nil {
			return nil, newBifrostError(keyErr)
		}
		if len(keys) > 0 {
			key = keys[0]
		}
	}

	response, bifrostErr := executeRequestWithRetries(&ctx, config, func() (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
		return provider.BatchDelete(ctx, key, req)
	}, schemas.BatchDeleteRequest, req.Provider, "")
	if bifrostErr != nil {
		// Tag the error with request type and provider for observability.
		bifrostErr.ExtraFields = schemas.BifrostErrorExtraFields{
			RequestType: schemas.BatchDeleteRequest,
			Provider:    req.Provider,
		}
		return nil, bifrostErr
	}
	return response, nil
}

// FileUploadRequest uploads a file to the specified provider.
func (bifrost *Bifrost) FileUploadRequest(ctx context.Context, req *schemas.BifrostFileUploadRequest) (*schemas.BifrostFileUploadResponse, *schemas.BifrostError) {
if req == nil {
Expand Down
5 changes: 5 additions & 0 deletions core/providers/anthropic/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,8 @@ func formatAnthropicTimestamp(unixTime int64) string {
}
return time.Unix(unixTime, 0).UTC().Format(time.RFC3339)
}

// BatchDelete is not supported by the Anthropic provider; it always returns
// an unsupported-operation error.
func (provider *AnthropicProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}
5 changes: 5 additions & 0 deletions core/providers/azure/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,8 @@ func splitJSONL(data []byte) [][]byte {
}
return lines
}

// BatchDelete is not supported by the Azure provider; it always returns an
// unsupported-operation error.
func (provider *AzureProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}
5 changes: 5 additions & 0 deletions core/providers/bedrock/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,3 +1060,8 @@ func splitJSONL(data []byte) [][]byte {
}
return lines
}

// BatchDelete is not supported by the Bedrock provider; it always returns an
// unsupported-operation error.
func (provider *BedrockProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}
5 changes: 5 additions & 0 deletions core/providers/cerebras/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func (provider *CerebrasProvider) BatchResults(ctx context.Context, key schemas.
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey())
}

// BatchDelete is not supported by the Cerebras provider; it always returns an
// unsupported-operation error.
func (provider *CerebrasProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}

5 changes: 5 additions & 0 deletions core/providers/cohere/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func (provider *CohereProvider) BatchResults(ctx context.Context, key schemas.Ke
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey())
}

// BatchDelete is not supported by the Cohere provider; it always returns an
// unsupported-operation error.
func (provider *CohereProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}

5 changes: 5 additions & 0 deletions core/providers/elevenlabs/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ func (provider *ElevenlabsProvider) BatchResults(ctx context.Context, key schema
return nil, providerUtils.NewUnsupportedOperationError(schemas.BatchResultsRequest, provider.GetProviderKey())
}

// BatchDelete is not supported by the Elevenlabs provider; it always returns
// an unsupported-operation error.
func (provider *ElevenlabsProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	unsupported := providerUtils.NewUnsupportedOperationError(schemas.BatchDeleteRequest, provider.GetProviderKey())
	return nil, unsupported
}

202 changes: 202 additions & 0 deletions core/providers/gemini/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,3 +839,205 @@ func (provider *GeminiProvider) BatchResults(ctx context.Context, key schemas.Ke
},
}, nil
}

// ==================== SDK RESPONSE CONVERTERS ====================
// These functions convert Bifrost batch responses to Google GenAI SDK format.

// ToGeminiJobState converts a Bifrost batch status to the corresponding
// Gemini SDK job state. Unknown statuses map to the pending state.
func ToGeminiJobState(status schemas.BatchStatus) string {
	switch status {
	case schemas.BatchStatusInProgress, schemas.BatchStatusFinalizing:
		return GeminiJobStateRunning
	case schemas.BatchStatusCompleted:
		return GeminiJobStateSucceeded
	case schemas.BatchStatusFailed, schemas.BatchStatusExpired:
		// Gemini has no dedicated "expired" state; report it as failed.
		return GeminiJobStateFailed
	case schemas.BatchStatusCancelling:
		return GeminiJobStateCancelling
	case schemas.BatchStatusCancelled:
		return GeminiJobStateCancelled
	default:
		// Covers BatchStatusValidating and any unrecognized status.
		return GeminiJobStatePending
	}
}
Comment on lines +847 to +868
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix BatchStats computation to avoid negative pending counts

The SDK converters generally look good, but there’s a corner case in the stats math:

  • In BatchRetrieve, BifrostBatchRetrieveResponse.RequestCounts is populated with Completed and Failed, while Total is left at its zero value.
  • ToGeminiBatchRetrieveResponse and ToGeminiBatchListResponse currently use resp.RequestCounts.Total directly to derive RequestCount and PendingRequestCount.

When Total == 0 and Completed > 0, PendingRequestCount = Total - Completed becomes negative, which is invalid.

Consider defensively deriving totals and pending counts, for example:

-	result.Metadata = &GeminiBatchMetadata{
-		Name:       resp.ID,
-		State:      ToGeminiJobState(resp.Status),
-		CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339),
-		BatchStats: &GeminiBatchStats{
-			RequestCount:           resp.RequestCounts.Total,
-			PendingRequestCount:    resp.RequestCounts.Total - resp.RequestCounts.Completed,
-			SuccessfulRequestCount: resp.RequestCounts.Completed - resp.RequestCounts.Failed,
-		},
-	}
+	total := resp.RequestCounts.Total
+	if total == 0 {
+		total = resp.RequestCounts.Completed + resp.RequestCounts.Failed
+	}
+	success := resp.RequestCounts.Completed
+	if success < 0 {
+		success = 0
+	}
+	pending := total - (success + resp.RequestCounts.Failed)
+	if pending < 0 {
+		pending = 0
+	}
+
+	result.Metadata = &GeminiBatchMetadata{
+		Name:       resp.ID,
+		State:      ToGeminiJobState(resp.Status),
+		CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339),
+		BatchStats: &GeminiBatchStats{
+			RequestCount:           total,
+			PendingRequestCount:    pending,
+			SuccessfulRequestCount: success,
+		},
+	}

and mirror the same pattern inside ToGeminiBatchListResponse when populating BatchStats, so the invariants RequestCount ≥ 0, PendingRequestCount ≥ 0, and RequestCount ≈ success + failures + pending always hold.

Also applies to: 899-919, 941-958

🤖 Prompt for AI Agents
core/providers/gemini/batch.go lines ~847-868 (and similarly at 899-919,
941-958): the current stats math can produce negative PendingRequestCount when
resp.RequestCounts.Total is zero but Completed/Failed are non-zero; instead
derive totals defensively: compute total := resp.RequestCounts.Total; if total
== 0 { total = resp.RequestCounts.Completed + resp.RequestCounts.Failed +
resp.RequestCounts.Pending }; compute pending := total -
resp.RequestCounts.Completed - resp.RequestCounts.Failed; if pending < 0 {
pending = 0 }; then set RequestCount = total and PendingRequestCount = pending
(ensuring RequestCount ≥ 0, PendingRequestCount ≥ 0 and RequestCount ≈ success +
failures + pending). Apply the same change to ToGeminiBatchListResponse and any
other places noted.


// ToGeminiBatchJobResponse converts a BifrostBatchCreateResponse to the
// Gemini SDK batch-job shape.
//
// Returns nil when resp is nil. Batch statistics are derived defensively:
// when RequestCounts.Total is unset (zero) it is reconstructed from the
// completed and failed counts, and the pending count is clamped at zero so
// RequestCount >= 0 and PendingRequestCount >= 0 always hold.
func ToGeminiBatchJobResponse(resp *schemas.BifrostBatchCreateResponse) *GeminiBatchJobResponseSDK {
	if resp == nil {
		return nil
	}

	result := &GeminiBatchJobResponseSDK{
		Name:  resp.ID,
		State: ToGeminiJobState(resp.Status),
	}

	// Metadata is only emitted when the provider reported a creation time.
	if resp.CreatedAt > 0 {
		// Some code paths populate only Completed/Failed and leave Total at
		// its zero value; reconstruct it so pending can never go negative.
		total := resp.RequestCounts.Total
		if total == 0 {
			total = resp.RequestCounts.Completed + resp.RequestCounts.Failed
		}
		pending := total - resp.RequestCounts.Completed - resp.RequestCounts.Failed
		if pending < 0 {
			pending = 0
		}

		result.Metadata = &GeminiBatchMetadata{
			Name:       resp.ID,
			State:      ToGeminiJobState(resp.Status),
			CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339),
			BatchStats: &GeminiBatchStats{
				RequestCount:        total,
				PendingRequestCount: pending,
				// NOTE(review): assumes Completed counts only successful
				// requests, with Failed tracked separately — confirm upstream.
				SuccessfulRequestCount: resp.RequestCounts.Completed,
			},
		}
	}

	return result
}

// ToGeminiBatchRetrieveResponse converts a BifrostBatchRetrieveResponse to
// the Gemini SDK batch-job shape.
//
// Returns nil when resp is nil. Batch statistics are derived defensively:
// when RequestCounts.Total is unset (zero) it is reconstructed from the
// completed and failed counts, and the pending count is clamped at zero so
// RequestCount >= 0 and PendingRequestCount >= 0 always hold.
func ToGeminiBatchRetrieveResponse(resp *schemas.BifrostBatchRetrieveResponse) *GeminiBatchJobResponseSDK {
	if resp == nil {
		return nil
	}

	result := &GeminiBatchJobResponseSDK{
		Name:  resp.ID,
		State: ToGeminiJobState(resp.Status),
	}

	// BatchRetrieve populates Completed and Failed but may leave Total at its
	// zero value; reconstruct it so pending can never go negative.
	total := resp.RequestCounts.Total
	if total == 0 {
		total = resp.RequestCounts.Completed + resp.RequestCounts.Failed
	}
	pending := total - resp.RequestCounts.Completed - resp.RequestCounts.Failed
	if pending < 0 {
		pending = 0
	}

	result.Metadata = &GeminiBatchMetadata{
		Name:       resp.ID,
		State:      ToGeminiJobState(resp.Status),
		CreateTime: time.Unix(resp.CreatedAt, 0).Format(time.RFC3339),
		BatchStats: &GeminiBatchStats{
			RequestCount:        total,
			PendingRequestCount: pending,
			// NOTE(review): assumes Completed counts only successful
			// requests, with Failed tracked separately — confirm upstream.
			SuccessfulRequestCount: resp.RequestCounts.Completed,
		},
	}

	if resp.CompletedAt != nil {
		result.Metadata.EndTime = time.Unix(*resp.CompletedAt, 0).Format(time.RFC3339)
	}

	// Surface the output file (if any) as the job destination.
	if resp.OutputFileID != nil {
		result.Dest = &GeminiBatchDest{
			FileName: *resp.OutputFileID,
		}
	}

	return result
}

// ToGeminiBatchListResponse converts a BifrostBatchListResponse to the
// Gemini SDK batch-list shape.
//
// Returns nil when resp is nil. Per-job statistics are derived defensively:
// when RequestCounts.Total is unset (zero) it is reconstructed from the
// completed and failed counts, and the pending count is clamped at zero so
// RequestCount >= 0 and PendingRequestCount >= 0 always hold.
func ToGeminiBatchListResponse(resp *schemas.BifrostBatchListResponse) *GeminiBatchListResponseSDK {
	if resp == nil {
		return nil
	}

	jobs := make([]GeminiBatchJobResponseSDK, 0, len(resp.Data))
	for _, batch := range resp.Data {
		// Some code paths populate only Completed/Failed and leave Total at
		// its zero value; reconstruct it so pending can never go negative.
		total := batch.RequestCounts.Total
		if total == 0 {
			total = batch.RequestCounts.Completed + batch.RequestCounts.Failed
		}
		pending := total - batch.RequestCounts.Completed - batch.RequestCounts.Failed
		if pending < 0 {
			pending = 0
		}

		job := GeminiBatchJobResponseSDK{
			Name:  batch.ID,
			State: ToGeminiJobState(batch.Status),
		}
		job.Metadata = &GeminiBatchMetadata{
			Name:       batch.ID,
			State:      ToGeminiJobState(batch.Status),
			CreateTime: time.Unix(batch.CreatedAt, 0).Format(time.RFC3339),
			BatchStats: &GeminiBatchStats{
				RequestCount:        total,
				PendingRequestCount: pending,
				// NOTE(review): assumes Completed counts only successful
				// requests, with Failed tracked separately — confirm upstream.
				SuccessfulRequestCount: batch.RequestCounts.Completed,
			},
		}

		jobs = append(jobs, job)
	}

	result := &GeminiBatchListResponseSDK{
		BatchJobs: jobs,
	}

	if resp.NextCursor != nil {
		result.NextPageToken = *resp.NextCursor
	}

	return result
}

// ToGeminiBatchCancelResponse converts a BifrostBatchCancelResponse to the
// Gemini SDK batch-job shape. Returns nil when resp is nil.
func ToGeminiBatchCancelResponse(resp *schemas.BifrostBatchCancelResponse) *GeminiBatchJobResponseSDK {
	if resp == nil {
		return nil
	}

	result := &GeminiBatchJobResponseSDK{}
	result.Name = resp.ID
	result.State = ToGeminiJobState(resp.Status)
	return result
}

// BatchDelete deletes a batch job on Gemini by issuing an HTTP DELETE to the
// batches endpoint.
//
// The request's BatchID may be given either bare ("abc123") or as a full
// resource name ("batches/abc123"); both resolve to the same URL. When
// key.Value is empty no API-key header is set (e.g. keyless/proxied setups).
// Returns an error when the operation is disallowed by the custom provider
// config, the request is invalid, the HTTP call fails, or Gemini responds
// with a non-2xx status.
func (provider *GeminiProvider) BatchDelete(ctx context.Context, key schemas.Key, request *schemas.BifrostBatchDeleteRequest) (*schemas.BifrostBatchDeleteResponse, *schemas.BifrostError) {
	if err := providerUtils.CheckOperationAllowed(schemas.Gemini, provider.customProviderConfig, schemas.BatchDeleteRequest); err != nil {
		return nil, err
	}

	providerName := provider.GetProviderKey()

	// Guard against a nil request before dereferencing BatchID.
	if request == nil {
		return nil, providerUtils.NewBifrostOperationError("batch delete request is nil", nil, providerName)
	}
	if request.BatchID == "" {
		return nil, providerUtils.NewBifrostOperationError("batch_id is required", nil, providerName)
	}

	// Acquire pooled fasthttp objects and release them on return.
	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	// Normalize the batch ID so the URL contains the "batches/" resource
	// prefix exactly once, whether or not the caller supplied it.
	batchID := strings.TrimPrefix(request.BatchID, "batches/")
	url := fmt.Sprintf("%s/batches/%s", provider.networkConfig.BaseURL, batchID)

	provider.logger.Debug("gemini batch delete url: " + url)
	providerUtils.SetExtraHeaders(ctx, req, provider.networkConfig.ExtraHeaders, nil)
	req.SetRequestURI(url)
	req.Header.SetMethod(http.MethodDelete)
	if key.Value != "" {
		req.Header.Set("x-goog-api-key", key.Value)
	}
	req.Header.SetContentType("application/json")

	// Make request
	latency, bifrostErr := providerUtils.MakeRequestWithContext(ctx, provider.client, req, resp)
	if bifrostErr != nil {
		return nil, bifrostErr
	}

	// Gemini may answer a successful delete with 200 or 204.
	if resp.StatusCode() != fasthttp.StatusOK && resp.StatusCode() != fasthttp.StatusNoContent {
		return nil, parseGeminiError(providerName, resp)
	}

	return &schemas.BifrostBatchDeleteResponse{
		ID:      request.BatchID,
		Object:  "batch",
		Deleted: true,
		ExtraFields: schemas.BifrostResponseExtraFields{
			RequestType: schemas.BatchDeleteRequest,
			Provider:    providerName,
			Latency:     latency.Milliseconds(),
		},
	}, nil
}
28 changes: 17 additions & 11 deletions core/providers/gemini/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,17 +463,23 @@ func (provider *GeminiProvider) FileContent(ctx context.Context, key schemas.Key
}

// ToGeminiFileUploadResponse converts a Bifrost file upload response to Gemini format.
func ToGeminiFileUploadResponse(resp *schemas.BifrostFileUploadResponse) map[string]interface{} {
return map[string]interface{}{
"file": map[string]interface{}{
"name": resp.ID,
"displayName": resp.Filename,
"mimeType": "application/octet-stream",
"sizeBytes": fmt.Sprintf("%d", resp.Bytes),
"createTime": formatGeminiTimestamp(resp.CreatedAt),
"state": toGeminiFileState(resp.Status),
"uri": resp.StorageURI,
"expirationTime": formatGeminiTimestamp(safeDerefInt64(resp.ExpiresAt)),
// GeminiFileUploadResponseWrapper wraps the file metadata in a top-level
// "file" object, matching the shape of Google's file upload API response.
// Field names come from GeminiFileResponse's camelCase JSON tags.
type GeminiFileUploadResponseWrapper struct {
	File GeminiFileResponse `json:"file"`
}

// ToGeminiFileUploadResponse converts a Bifrost file upload response into the
// Gemini { "file": { ... } } upload-response shape.
func ToGeminiFileUploadResponse(resp *schemas.BifrostFileUploadResponse) *GeminiFileUploadResponseWrapper {
	return &GeminiFileUploadResponseWrapper{
		File: GeminiFileResponse{
			Name:        resp.ID,
			DisplayName: resp.Filename,
			// Bifrost does not track the original MIME type, so a generic
			// default is reported.
			MimeType:       "application/octet-stream",
			SizeBytes:      fmt.Sprintf("%d", resp.Bytes),
			CreateTime:     formatGeminiTimestamp(resp.CreatedAt),
			State:          toGeminiFileState(resp.Status),
			URI:            resp.StorageURI,
			ExpirationTime: formatGeminiTimestamp(safeDerefInt64(resp.ExpiresAt)),
		},
	}
}
Comment on lines +466 to 485
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Gemini file upload wrapper matches expected SDK shape (minor doc nit)

The new GeminiFileUploadResponseWrapper and ToGeminiFileUploadResponse produce the expected { "file": { ... } } shape using GeminiFileResponse, which is consistent with the rest of the Gemini/GenAI integration. The comment about “snake_case field names” is slightly misleading given the struct tags are camelCase; consider updating the comment for accuracy when you next touch this code.

🤖 Prompt for AI Agents
In core/providers/gemini/files.go around lines 466 to 485, the top comment
incorrectly states “Uses snake_case field names to match Google's API format”
even though the struct uses JSON tags with camelCase; update the comment to
accurately describe the wrapper’s purpose and JSON shape (e.g., indicate it
produces a "file" wrapper with camelCase JSON tags matching the SDK/GenAI
integration) and keep the rest of the comment concise and aligned with the
implementation.

Expand Down
Loading