2023-11-06 14:44:53 +00:00
package pgxpool
import (
"context"
"fmt"
"math/rand"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/puddle/v2"
)
var defaultMaxConns = int32 ( 4 )
var defaultMinConns = int32 ( 0 )
var defaultMaxConnLifetime = time . Hour
var defaultMaxConnIdleTime = time . Minute * 30
var defaultHealthCheckPeriod = time . Minute
type connResource struct {
conn * pgx . Conn
conns [ ] Conn
poolRows [ ] poolRow
poolRowss [ ] poolRows
maxAgeTime time . Time
}
func ( cr * connResource ) getConn ( p * Pool , res * puddle . Resource [ * connResource ] ) * Conn {
if len ( cr . conns ) == 0 {
cr . conns = make ( [ ] Conn , 128 )
}
c := & cr . conns [ len ( cr . conns ) - 1 ]
cr . conns = cr . conns [ 0 : len ( cr . conns ) - 1 ]
c . res = res
c . p = p
return c
}
func ( cr * connResource ) getPoolRow ( c * Conn , r pgx . Row ) * poolRow {
if len ( cr . poolRows ) == 0 {
cr . poolRows = make ( [ ] poolRow , 128 )
}
pr := & cr . poolRows [ len ( cr . poolRows ) - 1 ]
cr . poolRows = cr . poolRows [ 0 : len ( cr . poolRows ) - 1 ]
pr . c = c
pr . r = r
return pr
}
func ( cr * connResource ) getPoolRows ( c * Conn , r pgx . Rows ) * poolRows {
if len ( cr . poolRowss ) == 0 {
cr . poolRowss = make ( [ ] poolRows , 128 )
}
pr := & cr . poolRowss [ len ( cr . poolRowss ) - 1 ]
cr . poolRowss = cr . poolRowss [ 0 : len ( cr . poolRowss ) - 1 ]
pr . c = c
pr . r = r
return pr
}
// Pool allows for connection reuse.
type Pool struct {
// 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
// architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288.
newConnsCount int64
lifetimeDestroyCount int64
idleDestroyCount int64
p * puddle . Pool [ * connResource ]
config * Config
beforeConnect func ( context . Context , * pgx . ConnConfig ) error
afterConnect func ( context . Context , * pgx . Conn ) error
beforeAcquire func ( context . Context , * pgx . Conn ) bool
afterRelease func ( * pgx . Conn ) bool
beforeClose func ( * pgx . Conn )
minConns int32
maxConns int32
maxConnLifetime time . Duration
maxConnLifetimeJitter time . Duration
maxConnIdleTime time . Duration
healthCheckPeriod time . Duration
healthCheckChan chan struct { }
2024-05-27 10:35:41 +01:00
acquireTracer AcquireTracer
releaseTracer ReleaseTracer
2023-11-06 14:44:53 +00:00
closeOnce sync . Once
closeChan chan struct { }
}
// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
// modified.
type Config struct {
ConnConfig * pgx . ConnConfig
// BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and
// will not impact any existing open connections.
BeforeConnect func ( context . Context , * pgx . ConnConfig ) error
// AfterConnect is called after a connection is established, but before it is added to the pool.
AfterConnect func ( context . Context , * pgx . Conn ) error
// BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
// acquisition or false to indicate that the connection should be destroyed and a different connection should be
// acquired.
BeforeAcquire func ( context . Context , * pgx . Conn ) bool
// AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
// return the connection to the pool or false to destroy the connection.
AfterRelease func ( * pgx . Conn ) bool
// BeforeClose is called right before a connection is closed and removed from the pool.
BeforeClose func ( * pgx . Conn )
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
MaxConnLifetime time . Duration
// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
// This helps prevent all connections from being closed at the exact same time, starving the pool.
MaxConnLifetimeJitter time . Duration
// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
MaxConnIdleTime time . Duration
// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
MaxConns int32
// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
// to create new connections.
MinConns int32
// HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time . Duration
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
}
// Copy returns a deep copy of the config that is safe to use and modify.
// The only exception is the tls.Config:
// according to the tls.Config docs it must not be modified after creation.
func ( c * Config ) Copy ( ) * Config {
newConfig := new ( Config )
* newConfig = * c
newConfig . ConnConfig = c . ConnConfig . Copy ( )
return newConfig
}
// ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
func ( c * Config ) ConnString ( ) string { return c . ConnConfig . ConnString ( ) }
// New creates a new Pool. See [ParseConfig] for information on connString format.
func New ( ctx context . Context , connString string ) ( * Pool , error ) {
config , err := ParseConfig ( connString )
if err != nil {
return nil , err
}
return NewWithConfig ( ctx , config )
}
// NewWithConfig creates a new Pool. config must have been created by [ParseConfig].
func NewWithConfig ( ctx context . Context , config * Config ) ( * Pool , error ) {
// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
// zero values.
if ! config . createdByParseConfig {
panic ( "config must be created by ParseConfig" )
}
p := & Pool {
config : config ,
beforeConnect : config . BeforeConnect ,
afterConnect : config . AfterConnect ,
beforeAcquire : config . BeforeAcquire ,
afterRelease : config . AfterRelease ,
beforeClose : config . BeforeClose ,
minConns : config . MinConns ,
maxConns : config . MaxConns ,
maxConnLifetime : config . MaxConnLifetime ,
maxConnLifetimeJitter : config . MaxConnLifetimeJitter ,
maxConnIdleTime : config . MaxConnIdleTime ,
healthCheckPeriod : config . HealthCheckPeriod ,
healthCheckChan : make ( chan struct { } , 1 ) ,
closeChan : make ( chan struct { } ) ,
}
2024-05-27 10:35:41 +01:00
if t , ok := config . ConnConfig . Tracer . ( AcquireTracer ) ; ok {
p . acquireTracer = t
}
if t , ok := config . ConnConfig . Tracer . ( ReleaseTracer ) ; ok {
p . releaseTracer = t
}
2023-11-06 14:44:53 +00:00
var err error
p . p , err = puddle . NewPool (
& puddle . Config [ * connResource ] {
Constructor : func ( ctx context . Context ) ( * connResource , error ) {
atomic . AddInt64 ( & p . newConnsCount , 1 )
connConfig := p . config . ConnConfig . Copy ( )
// Connection will continue in background even if Acquire is canceled. Ensure that a connect won't hang forever.
if connConfig . ConnectTimeout <= 0 {
connConfig . ConnectTimeout = 2 * time . Minute
}
if p . beforeConnect != nil {
if err := p . beforeConnect ( ctx , connConfig ) ; err != nil {
return nil , err
}
}
conn , err := pgx . ConnectConfig ( ctx , connConfig )
if err != nil {
return nil , err
}
if p . afterConnect != nil {
err = p . afterConnect ( ctx , conn )
if err != nil {
conn . Close ( ctx )
return nil , err
}
}
jitterSecs := rand . Float64 ( ) * config . MaxConnLifetimeJitter . Seconds ( )
maxAgeTime := time . Now ( ) . Add ( config . MaxConnLifetime ) . Add ( time . Duration ( jitterSecs ) * time . Second )
cr := & connResource {
conn : conn ,
conns : make ( [ ] Conn , 64 ) ,
poolRows : make ( [ ] poolRow , 64 ) ,
poolRowss : make ( [ ] poolRows , 64 ) ,
maxAgeTime : maxAgeTime ,
}
return cr , nil
} ,
Destructor : func ( value * connResource ) {
ctx , cancel := context . WithTimeout ( context . Background ( ) , 15 * time . Second )
conn := value . conn
if p . beforeClose != nil {
p . beforeClose ( conn )
}
conn . Close ( ctx )
select {
case <- conn . PgConn ( ) . CleanupDone ( ) :
case <- ctx . Done ( ) :
}
cancel ( )
} ,
MaxSize : config . MaxConns ,
} ,
)
if err != nil {
return nil , err
}
go func ( ) {
p . createIdleResources ( ctx , int ( p . minConns ) )
p . backgroundHealthCheck ( )
} ( )
return p , nil
}
// ParseConfig builds a Config from connString. It parses connString with the same behavior as [pgx.ParseConfig] with the
// addition of the following variables:
//
// - pool_max_conns: integer greater than 0
// - pool_min_conns: integer 0 or greater
// - pool_max_conn_lifetime: duration string
// - pool_max_conn_idle_time: duration string
// - pool_health_check_period: duration string
// - pool_max_conn_lifetime_jitter: duration string
//
// See Config for definitions of these arguments.
//
2024-05-27 10:35:41 +01:00
// # Example Keyword/Value
2023-11-06 14:44:53 +00:00
// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
//
// # Example URL
// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
func ParseConfig ( connString string ) ( * Config , error ) {
connConfig , err := pgx . ParseConfig ( connString )
if err != nil {
return nil , err
}
config := & Config {
ConnConfig : connConfig ,
createdByParseConfig : true ,
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_max_conns" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_max_conns" )
n , err := strconv . ParseInt ( s , 10 , 32 )
if err != nil {
return nil , fmt . Errorf ( "cannot parse pool_max_conns: %w" , err )
}
if n < 1 {
return nil , fmt . Errorf ( "pool_max_conns too small: %d" , n )
}
config . MaxConns = int32 ( n )
} else {
config . MaxConns = defaultMaxConns
if numCPU := int32 ( runtime . NumCPU ( ) ) ; numCPU > config . MaxConns {
config . MaxConns = numCPU
}
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_min_conns" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_min_conns" )
n , err := strconv . ParseInt ( s , 10 , 32 )
if err != nil {
return nil , fmt . Errorf ( "cannot parse pool_min_conns: %w" , err )
}
config . MinConns = int32 ( n )
} else {
config . MinConns = defaultMinConns
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_max_conn_lifetime" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_max_conn_lifetime" )
d , err := time . ParseDuration ( s )
if err != nil {
return nil , fmt . Errorf ( "invalid pool_max_conn_lifetime: %w" , err )
}
config . MaxConnLifetime = d
} else {
config . MaxConnLifetime = defaultMaxConnLifetime
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_max_conn_idle_time" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_max_conn_idle_time" )
d , err := time . ParseDuration ( s )
if err != nil {
return nil , fmt . Errorf ( "invalid pool_max_conn_idle_time: %w" , err )
}
config . MaxConnIdleTime = d
} else {
config . MaxConnIdleTime = defaultMaxConnIdleTime
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_health_check_period" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_health_check_period" )
d , err := time . ParseDuration ( s )
if err != nil {
return nil , fmt . Errorf ( "invalid pool_health_check_period: %w" , err )
}
config . HealthCheckPeriod = d
} else {
config . HealthCheckPeriod = defaultHealthCheckPeriod
}
if s , ok := config . ConnConfig . Config . RuntimeParams [ "pool_max_conn_lifetime_jitter" ] ; ok {
delete ( connConfig . Config . RuntimeParams , "pool_max_conn_lifetime_jitter" )
d , err := time . ParseDuration ( s )
if err != nil {
return nil , fmt . Errorf ( "invalid pool_max_conn_lifetime_jitter: %w" , err )
}
config . MaxConnLifetimeJitter = d
}
return config , nil
}
// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
// to pool and closed.
func ( p * Pool ) Close ( ) {
p . closeOnce . Do ( func ( ) {
close ( p . closeChan )
p . p . Close ( )
} )
}
func ( p * Pool ) isExpired ( res * puddle . Resource [ * connResource ] ) bool {
return time . Now ( ) . After ( res . Value ( ) . maxAgeTime )
}
func ( p * Pool ) triggerHealthCheck ( ) {
go func ( ) {
// Destroy is asynchronous so we give it time to actually remove itself from
// the pool otherwise we might try to check the pool size too soon
time . Sleep ( 500 * time . Millisecond )
select {
case p . healthCheckChan <- struct { } { } :
default :
}
} ( )
}
func ( p * Pool ) backgroundHealthCheck ( ) {
ticker := time . NewTicker ( p . healthCheckPeriod )
defer ticker . Stop ( )
for {
select {
case <- p . closeChan :
return
case <- p . healthCheckChan :
p . checkHealth ( )
case <- ticker . C :
p . checkHealth ( )
}
}
}
func ( p * Pool ) checkHealth ( ) {
for {
// If checkMinConns failed we don't destroy any connections since we couldn't
// even get to minConns
if err := p . checkMinConns ( ) ; err != nil {
// Should we log this error somewhere?
break
}
if ! p . checkConnsHealth ( ) {
// Since we didn't destroy any connections we can stop looping
break
}
// Technically Destroy is asynchronous but 500ms should be enough for it to
// remove it from the underlying pool
select {
case <- p . closeChan :
return
case <- time . After ( 500 * time . Millisecond ) :
}
}
}
// checkConnsHealth will check all idle connections, destroy a connection if
// it's idle or too old, and returns true if any were destroyed
func ( p * Pool ) checkConnsHealth ( ) bool {
var destroyed bool
totalConns := p . Stat ( ) . TotalConns ( )
resources := p . p . AcquireAllIdle ( )
for _ , res := range resources {
// We're okay going under minConns if the lifetime is up
if p . isExpired ( res ) && totalConns >= p . minConns {
atomic . AddInt64 ( & p . lifetimeDestroyCount , 1 )
res . Destroy ( )
destroyed = true
// Since Destroy is async we manually decrement totalConns.
totalConns --
} else if res . IdleDuration ( ) > p . maxConnIdleTime && totalConns > p . minConns {
atomic . AddInt64 ( & p . idleDestroyCount , 1 )
res . Destroy ( )
destroyed = true
// Since Destroy is async we manually decrement totalConns.
totalConns --
} else {
res . ReleaseUnused ( )
}
}
return destroyed
}
func ( p * Pool ) checkMinConns ( ) error {
// TotalConns can include ones that are being destroyed but we should have
// sleep(500ms) around all of the destroys to help prevent that from throwing
// off this check
toCreate := p . minConns - p . Stat ( ) . TotalConns ( )
if toCreate > 0 {
return p . createIdleResources ( context . Background ( ) , int ( toCreate ) )
}
return nil
}
func ( p * Pool ) createIdleResources ( parentCtx context . Context , targetResources int ) error {
ctx , cancel := context . WithCancel ( parentCtx )
defer cancel ( )
errs := make ( chan error , targetResources )
for i := 0 ; i < targetResources ; i ++ {
go func ( ) {
err := p . p . CreateResource ( ctx )
// Ignore ErrNotAvailable since it means that the pool has become full since we started creating resource.
if err == puddle . ErrNotAvailable {
err = nil
}
errs <- err
} ( )
}
var firstError error
for i := 0 ; i < targetResources ; i ++ {
err := <- errs
if err != nil && firstError == nil {
cancel ( )
firstError = err
}
}
return firstError
}
// Acquire returns a connection (*Conn) from the Pool
2024-05-27 10:35:41 +01:00
func ( p * Pool ) Acquire ( ctx context . Context ) ( c * Conn , err error ) {
if p . acquireTracer != nil {
ctx = p . acquireTracer . TraceAcquireStart ( ctx , p , TraceAcquireStartData { } )
defer func ( ) {
var conn * pgx . Conn
if c != nil {
conn = c . Conn ( )
}
p . acquireTracer . TraceAcquireEnd ( ctx , p , TraceAcquireEndData { Conn : conn , Err : err } )
} ( )
}
2023-11-06 14:44:53 +00:00
for {
res , err := p . p . Acquire ( ctx )
if err != nil {
return nil , err
}
cr := res . Value ( )
if res . IdleDuration ( ) > time . Second {
err := cr . conn . Ping ( ctx )
if err != nil {
res . Destroy ( )
continue
}
}
if p . beforeAcquire == nil || p . beforeAcquire ( ctx , cr . conn ) {
return cr . getConn ( p , res ) , nil
}
res . Destroy ( )
}
}
// AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the
// call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is
// automatically released after the call of f.
func ( p * Pool ) AcquireFunc ( ctx context . Context , f func ( * Conn ) error ) error {
conn , err := p . Acquire ( ctx )
if err != nil {
return err
}
defer conn . Release ( )
return f ( conn )
}
// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
// keep-alive functionality. It does not update pool statistics.
func ( p * Pool ) AcquireAllIdle ( ctx context . Context ) [ ] * Conn {
resources := p . p . AcquireAllIdle ( )
conns := make ( [ ] * Conn , 0 , len ( resources ) )
for _ , res := range resources {
cr := res . Value ( )
if p . beforeAcquire == nil || p . beforeAcquire ( ctx , cr . conn ) {
conns = append ( conns , cr . getConn ( p , res ) )
} else {
res . Destroy ( )
}
}
return conns
}
// Reset closes all connections, but leaves the pool open. It is intended for use when an error is detected that would
// disrupt all connections (such as a network interruption or a server state change).
//
// It is safe to reset a pool while connections are checked out. Those connections will be closed when they are returned
// to the pool.
func ( p * Pool ) Reset ( ) {
p . p . Reset ( )
}
// Config returns a copy of config that was used to initialize this pool.
func ( p * Pool ) Config ( ) * Config { return p . config . Copy ( ) }
// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
func ( p * Pool ) Stat ( ) * Stat {
return & Stat {
s : p . p . Stat ( ) ,
newConnsCount : atomic . LoadInt64 ( & p . newConnsCount ) ,
lifetimeDestroyCount : atomic . LoadInt64 ( & p . lifetimeDestroyCount ) ,
idleDestroyCount : atomic . LoadInt64 ( & p . idleDestroyCount ) ,
}
}
// Exec acquires a connection from the Pool and executes the given SQL.
// SQL can be either a prepared statement name or an SQL string.
// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
// The acquired connection is returned to the pool when the Exec function returns.
func ( p * Pool ) Exec ( ctx context . Context , sql string , arguments ... any ) ( pgconn . CommandTag , error ) {
c , err := p . Acquire ( ctx )
if err != nil {
return pgconn . CommandTag { } , err
}
defer c . Release ( )
return c . Exec ( ctx , sql , arguments ... )
}
// Query acquires a connection and executes a query that returns pgx.Rows.
// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
// See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool.
//
// If there is an error, the returned pgx.Rows will be returned in an error state.
// If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows.
//
// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
// needed. See the documentation for those types for details.
func ( p * Pool ) Query ( ctx context . Context , sql string , args ... any ) ( pgx . Rows , error ) {
c , err := p . Acquire ( ctx )
if err != nil {
return errRows { err : err } , err
}
rows , err := c . Query ( ctx , sql , args ... )
if err != nil {
c . Release ( )
return errRows { err : err } , err
}
return c . getPoolRows ( rows ) , nil
}
// QueryRow acquires a connection and executes a query that is expected
// to return at most one row (pgx.Row). Errors are deferred until pgx.Row's
// Scan method is called. If the query selects no rows, pgx.Row's Scan will
// return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row
// and discards the rest. The acquired connection is returned to the Pool when
// pgx.Row's Scan method is called.
//
// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
//
// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
// needed. See the documentation for those types for details.
func ( p * Pool ) QueryRow ( ctx context . Context , sql string , args ... any ) pgx . Row {
c , err := p . Acquire ( ctx )
if err != nil {
return errRow { err : err }
}
row := c . QueryRow ( ctx , sql , args ... )
return c . getPoolRow ( row )
}
func ( p * Pool ) SendBatch ( ctx context . Context , b * pgx . Batch ) pgx . BatchResults {
c , err := p . Acquire ( ctx )
if err != nil {
return errBatchResults { err : err }
}
br := c . SendBatch ( ctx , b )
return & poolBatchResults { br : br , c : c }
}
// Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
// auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required).
// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
func ( p * Pool ) Begin ( ctx context . Context ) ( pgx . Tx , error ) {
return p . BeginTx ( ctx , pgx . TxOptions { } )
}
// BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode.
// Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation.
// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
func ( p * Pool ) BeginTx ( ctx context . Context , txOptions pgx . TxOptions ) ( pgx . Tx , error ) {
c , err := p . Acquire ( ctx )
if err != nil {
return nil , err
}
t , err := c . BeginTx ( ctx , txOptions )
if err != nil {
c . Release ( )
return nil , err
}
return & Tx { t : t , c : c } , nil
}
func ( p * Pool ) CopyFrom ( ctx context . Context , tableName pgx . Identifier , columnNames [ ] string , rowSrc pgx . CopyFromSource ) ( int64 , error ) {
c , err := p . Acquire ( ctx )
if err != nil {
return 0 , err
}
defer c . Release ( )
return c . Conn ( ) . CopyFrom ( ctx , tableName , columnNames , rowSrc )
}
// Ping acquires a connection from the Pool and executes an empty sql statement against it.
// If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned.
func ( p * Pool ) Ping ( ctx context . Context ) error {
c , err := p . Acquire ( ctx )
if err != nil {
return err
}
defer c . Release ( )
return c . Ping ( ctx )
}