Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions plugins/outputs/influxdb_v2/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"slices"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand Down Expand Up @@ -102,11 +103,23 @@ func (b *batch) split() (first, second *batch) {
}

func (b *batch) serialize(serializer ratelimiter.Serializer, limit int64, encoder internal.ContentEncoder) (int64, error) {
// Serialize the metrics with the remaining limit, exit early if nothing was serialized
// Serialize the metrics with the remaining limit,
body, serr := serializer.SerializeBatch(b.metrics, limit)
if serr != nil && !errors.Is(serr, internal.ErrSizeLimitReached) {
return int64(len(body)), serr
// When only part of the metrics failed to be serialized we should remove
// them from the normal handling and mark them as rejected for upstream
// to pass on this information
var werr *internal.PartialWriteError
if errors.As(serr, &werr) {
for i, idx := range slices.Backward(werr.MetricsReject) {
werr.MetricsReject[i] = b.indices[idx]
b.indices = slices.Delete(b.indices, idx, idx+1)
}
serr = werr
}
}

// Exit early if nothing was serialized
if len(body) == 0 {
return 0, serr
}
Expand All @@ -115,7 +128,7 @@ func (b *batch) serialize(serializer ratelimiter.Serializer, limit int64, encode
if encoder != nil {
enc, err := encoder.Encode(body)
if err != nil {
return int64(len(body)), fmt.Errorf("encoding failed: %w", err)
return 0, fmt.Errorf("encoding failed: %w", err)
}
b.payload = bytes.Clone(enc)
} else {
Expand Down
27 changes: 18 additions & 9 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,20 +169,30 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
// Serialize the metrics with the remaining limit, exit early if nothing was serialized
used, err := batch.serialize(c.serializer, limit, c.encoder)
if err != nil {
writeErr.Err = err
batch.err = err
}
if used == 0 {
limitReached = i
break
var werr *internal.PartialWriteError
if errors.As(err, &werr) {
writeErr.MetricsReject = append(writeErr.MetricsReject, werr.MetricsReject...)
writeErr.MetricsRejectErrors = append(writeErr.MetricsRejectErrors, werr.MetricsRejectErrors...)
writeErr.Err = werr.Err
} else {
writeErr.Err = err
batch.err = err
}
}
c.rateLimiter.Reserve(used)

if errors.Is(batch.err, internal.ErrSizeLimitReached) {
limitReached = i + 1
limitReached = i
// If we serialized at least one metric in this batch the limit
// should include the current batch, otherwise we stop before this
// batch.
if used > 0 {
limitReached++
}
break
}
}

// Skip all non-serialized batches
if limitReached > 0 && limitReached < len(batches) {
batches = batches[:limitReached]
Expand Down Expand Up @@ -374,9 +384,8 @@ func (c *httpClient) writeBatch(ctx context.Context, b *batch) error {
http.StatusGatewayTimeout:
// ^ these handle the cases where the server is likely overloaded, and may not be able to say so.
retryDuration := getRetryDuration(resp.Header, c.retryCount.Add(1))
c.log.Warnf("Failed to write to %s; will retry in %s. (%s)\n", b.bucket, retryDuration, resp.Status)
return &ThrottleError{
Err: fmt.Errorf("waiting %s for server before sending metrics again", retryDuration),
Err: fmt.Errorf("failed to write to %s; will retry in %s. (%s)", b.bucket, retryDuration, resp.Status),
StatusCode: resp.StatusCode,
RetryAfter: retryDuration,
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/outputs/influxdb_v2/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestRetryLaterEarlyExit(t *testing.T) {

// Write the metrics the first time and check for the expected errors
err = c.Write(t.Context(), metrics)
require.ErrorContains(t, err, "waiting 2m0s for server before sending metrics again")
require.ErrorContains(t, err, "will retry in 2m0s")

var writeErr *internal.PartialWriteError
require.ErrorAs(t, err, &writeErr)
Expand Down
185 changes: 184 additions & 1 deletion plugins/outputs/influxdb_v2/influxdb_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package influxdb_v2_test
import (
"fmt"
"io"
"math"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -213,6 +214,188 @@ func TestWrite(t *testing.T) {
require.NoError(t, plugin.Write(metrics))
}

// TestWriteWithPartialSerializationError verifies that when some metrics in a
// batch cannot be serialized, only the valid metrics are sent to the server
// and the invalid ones are reported as rejected via a PartialWriteError.
func TestWriteWithPartialSerializationError(t *testing.T) {
	// Only the single valid metric should make it into the request body.
	expectedBody := "cpu,type=valid value=42.123 0\n"

	// Setup a test server that accepts the write only if the body matches
	// exactly the serialization of the valid metric.
	ts := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			switch r.URL.Path {
			case "/api/v2/write":
				body, err := io.ReadAll(r.Body)
				if err != nil {
					w.WriteHeader(http.StatusInternalServerError)
					t.Error(err)
					return
				}

				if string(body) != expectedBody {
					w.WriteHeader(http.StatusInternalServerError)
					t.Errorf("'body' should contain %q", expectedBody)
					return
				}

				w.WriteHeader(http.StatusNoContent)
			default:
				w.WriteHeader(http.StatusNotFound)
			}
		}),
	)
	defer ts.Close()

	// Setup plugin and connect
	plugin := &influxdb.InfluxDB{
		URLs:             []string{"http://" + ts.Listener.Addr().String()},
		Bucket:           "telegraf",
		ExcludeBucketTag: true,
		ContentEncoding:  "identity",
		PingTimeout:      config.Duration(15 * time.Second),
		ReadIdleTimeout:  config.Duration(30 * time.Second),
		Log:              &testutil.Logger{},
	}
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	defer plugin.Close()

	metrics := []telegraf.Metric{
		// Metric which cannot be serialized as NaN is not representable
		// in line protocol. Note: math.NaN() must be called; the bare
		// function value math.NaN would test the wrong rejection path.
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "invalid",
			},
			map[string]interface{}{
				"value": math.NaN(),
			},
			time.Unix(0, 0),
		),
		// Valid metric
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "valid",
			},
			map[string]interface{}{
				"value": 42.123,
			},
			time.Unix(0, 0),
		),
		// Metric which cannot be serialized as infinity is not
		// representable in line protocol
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "invalid",
			},
			map[string]interface{}{
				"value": math.Inf(1),
			},
			time.Unix(0, 0),
		),
	}

	// Writing should cause a partial write error where only the valid metric is
	// accepted, all invalid metrics should be rejected.
	err := plugin.Write(metrics)
	require.ErrorIs(t, err, internal.ErrSerialization)
	var werr *internal.PartialWriteError
	require.ErrorAs(t, err, &werr)
	require.ElementsMatch(t, werr.MetricsAccept, []int{1})
	require.ElementsMatch(t, werr.MetricsReject, []int{0, 2})
}

// TestWriteWithPartialSerializationAndSendError verifies the combination of a
// partial serialization failure with a retryable server-side error: the
// non-serializable metrics must be rejected, while the valid metric must stay
// unaccounted (neither accepted nor rejected) so it is retried later.
func TestWriteWithPartialSerializationAndSendError(t *testing.T) {
	// Only the single valid metric should make it into the request body.
	expectedBody := "cpu,type=valid value=42.123 0\n"

	// Setup a test server that checks the body but always answers with a
	// retryable "too many requests" status.
	ts := httptest.NewServer(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			switch r.URL.Path {
			case "/api/v2/write":
				body, err := io.ReadAll(r.Body)
				if err != nil {
					w.WriteHeader(http.StatusInternalServerError)
					t.Error(err)
					return
				}

				if string(body) != expectedBody {
					w.WriteHeader(http.StatusInternalServerError)
					t.Errorf("'body' should contain %q", expectedBody)
					return
				}

				w.WriteHeader(http.StatusTooManyRequests)
			default:
				w.WriteHeader(http.StatusNotFound)
			}
		}),
	)
	defer ts.Close()

	// Setup plugin and connect
	plugin := &influxdb.InfluxDB{
		URLs:             []string{"http://" + ts.Listener.Addr().String()},
		Bucket:           "telegraf",
		ExcludeBucketTag: true,
		ContentEncoding:  "identity",
		PingTimeout:      config.Duration(15 * time.Second),
		ReadIdleTimeout:  config.Duration(30 * time.Second),
		Log:              &testutil.Logger{},
	}
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Connect())
	defer plugin.Close()

	metrics := []telegraf.Metric{
		// Metric which cannot be serialized as NaN is not representable
		// in line protocol. Note: math.NaN() must be called; the bare
		// function value math.NaN would test the wrong rejection path.
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "invalid",
			},
			map[string]interface{}{
				"value": math.NaN(),
			},
			time.Unix(0, 0),
		),
		// Valid metric
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "valid",
			},
			map[string]interface{}{
				"value": 42.123,
			},
			time.Unix(0, 0),
		),
		// Metric which cannot be serialized as infinity is not
		// representable in line protocol
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"type": "invalid",
			},
			map[string]interface{}{
				"value": math.Inf(1),
			},
			time.Unix(0, 0),
		),
	}

	// Writing should cause a partial write error where no metric is accepted
	// due to the sending error, all invalid metrics should be rejected.
	// As the sending error is retryable the valid metric should neither be
	// rejected nor accepted in order to re-queue it in the next write.
	err := plugin.Write(metrics)
	require.ErrorContains(t, err, http.StatusText(http.StatusTooManyRequests))
	var werr *internal.PartialWriteError
	require.ErrorAs(t, err, &werr)
	require.Empty(t, werr.MetricsAccept)
	require.ElementsMatch(t, werr.MetricsReject, []int{0, 2})
}

func TestWriteBucketTagWorksOnRetry(t *testing.T) {
// Setup a test server
ts := httptest.NewServer(
Expand Down Expand Up @@ -764,7 +947,7 @@ func TestStatusCodeServiceUnavailable(t *testing.T) {

// Write the metrics the first time and check for the expected errors
err := plugin.Write(metrics)
require.ErrorContains(t, err, "waiting 25ms for server before sending metrics again")
require.ErrorContains(t, err, http.StatusText(code))

var writeErr *internal.PartialWriteError
require.ErrorAs(t, err, &writeErr)
Expand Down
Loading