bson/internal/test/goleak/goleak_test.go
2025-03-17 20:58:26 +01:00

123 lines
3.3 KiB
Go

// Copyright (C) MongoDB, Inc. 2024-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package main
import (
"context"
"fmt"
"os"
"testing"
"time"
"gitea.psichedelico.com/go/bson"
"gitea.psichedelico.com/go/bson/mongo"
"gitea.psichedelico.com/go/bson/mongo/options"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)
var dbName = fmt.Sprintf("goleak-%d", time.Now().Unix())
// TestGoroutineLeak creates clients with various client configurations, runs
// some operations with each one, then disconnects the client. It asserts that
// no goroutines were leaked after the client is disconnected.
func TestGoroutineLeak(t *testing.T) {
testCases := []struct {
desc string
opts *options.ClientOptions
}{
{
desc: "base",
opts: options.Client(),
},
{
desc: "compressors=snappy",
opts: options.Client().SetCompressors([]string{"snappy"}),
},
{
desc: "compressors=zlib",
opts: options.Client().SetCompressors([]string{"zlib"}),
},
{
desc: "compressors=zstd",
opts: options.Client().SetCompressors([]string{"zstd"}),
},
{
desc: "minPoolSize=10",
opts: options.Client().SetMinPoolSize(10),
},
{
desc: "serverMonitoringMode=poll",
opts: options.Client().SetServerMonitoringMode(options.ServerMonitoringModePoll),
},
}
for _, tc := range testCases {
// These can't be run in parallel because goleak currently can't filter
// out goroutines from other parallel subtests.
t.Run(tc.desc, func(t *testing.T) {
defer goleak.VerifyNone(t)
base := options.Client()
if u := os.Getenv("MONGODB_URI"); u != "" {
base.ApplyURI(u)
}
client, err := mongo.Connect(base, tc.opts)
require.NoError(t, err)
defer func() {
err = client.Disconnect(context.Background())
require.NoError(t, err)
}()
db := client.Database(dbName)
defer func() {
err := db.Drop(context.Background())
require.NoError(t, err)
}()
coll := db.Collection(collectionName(t))
// Start a change stream to simulate a change listener workload.
cs, err := coll.Watch(context.Background(), mongo.Pipeline{})
require.NoError(t, err)
defer cs.Close(context.Background())
// Run some Insert and FindOne operations to simulate a writing and
// reading workload. Run 50 iterations to increase the probability
// that a goroutine leak will happen if a problem exists.
for i := 0; i < 50; i++ {
_, err = coll.InsertOne(context.Background(), bson.M{"x": 123})
require.NoError(t, err)
var res bson.D
err = coll.FindOne(context.Background(), bson.D{}).Decode(&res)
require.NoError(t, err)
}
// Intentionally cause some timeouts. Ignore any errors.
for i := 0; i < 50; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Microsecond)
coll.FindOne(ctx, bson.D{}).Err()
cancel()
}
// Finish simulating the change listener workload. Use "Next" to
// fetch at least one change stream document batch and decode the
// first document.
cs.Next(context.Background())
var res bson.D
err = cs.Decode(&res)
require.NoError(t, err)
})
}
}
func collectionName(t *testing.T) string {
return fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix())
}