2021-08-25 14:34:33 +01:00
|
|
|
package pgx
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
"github.com/jackc/pgx/v5/internal/pgio"
|
|
|
|
"github.com/jackc/pgx/v5/pgconn"
|
2021-08-25 14:34:33 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// CopyFromRows returns a CopyFromSource interface over the provided rows slice
|
|
|
|
// making it usable by *Conn.CopyFrom.
|
2023-05-12 13:33:40 +01:00
|
|
|
func CopyFromRows(rows [][]any) CopyFromSource {
|
2021-08-25 14:34:33 +01:00
|
|
|
return ©FromRows{rows: rows, idx: -1}
|
|
|
|
}
|
|
|
|
|
|
|
|
type copyFromRows struct {
|
2023-05-12 13:33:40 +01:00
|
|
|
rows [][]any
|
2021-08-25 14:34:33 +01:00
|
|
|
idx int
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ctr *copyFromRows) Next() bool {
|
|
|
|
ctr.idx++
|
|
|
|
return ctr.idx < len(ctr.rows)
|
|
|
|
}
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
func (ctr *copyFromRows) Values() ([]any, error) {
|
2021-08-25 14:34:33 +01:00
|
|
|
return ctr.rows[ctr.idx], nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ctr *copyFromRows) Err() error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// CopyFromSlice returns a CopyFromSource interface over a dynamic func
|
|
|
|
// making it usable by *Conn.CopyFrom.
|
2023-05-12 13:33:40 +01:00
|
|
|
func CopyFromSlice(length int, next func(int) ([]any, error)) CopyFromSource {
|
2021-08-25 14:34:33 +01:00
|
|
|
return ©FromSlice{next: next, idx: -1, len: length}
|
|
|
|
}
|
|
|
|
|
|
|
|
type copyFromSlice struct {
|
2023-05-12 13:33:40 +01:00
|
|
|
next func(int) ([]any, error)
|
2021-08-25 14:34:33 +01:00
|
|
|
idx int
|
|
|
|
len int
|
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cts *copyFromSlice) Next() bool {
|
|
|
|
cts.idx++
|
|
|
|
return cts.idx < cts.len
|
|
|
|
}
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
func (cts *copyFromSlice) Values() ([]any, error) {
|
2021-08-25 14:34:33 +01:00
|
|
|
values, err := cts.next(cts.idx)
|
|
|
|
if err != nil {
|
|
|
|
cts.err = err
|
|
|
|
}
|
|
|
|
return values, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (cts *copyFromSlice) Err() error {
|
|
|
|
return cts.err
|
|
|
|
}
|
|
|
|
|
|
|
|
// CopyFromSource is the interface used by *Conn.CopyFrom as the source for copy data.
|
|
|
|
type CopyFromSource interface {
|
|
|
|
// Next returns true if there is another row and makes the next row data
|
|
|
|
// available to Values(). When there are no more rows available or an error
|
|
|
|
// has occurred it returns false.
|
|
|
|
Next() bool
|
|
|
|
|
|
|
|
// Values returns the values for the current row.
|
2023-05-12 13:33:40 +01:00
|
|
|
Values() ([]any, error)
|
2021-08-25 14:34:33 +01:00
|
|
|
|
|
|
|
// Err returns any error that has been encountered by the CopyFromSource. If
|
|
|
|
// this is not nil *Conn.CopyFrom will abort the copy.
|
|
|
|
Err() error
|
|
|
|
}
|
|
|
|
|
|
|
|
type copyFrom struct {
|
|
|
|
conn *Conn
|
|
|
|
tableName Identifier
|
|
|
|
columnNames []string
|
|
|
|
rowSrc CopyFromSource
|
|
|
|
readerErrChan chan error
|
2023-05-12 13:33:40 +01:00
|
|
|
mode QueryExecMode
|
2021-08-25 14:34:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ct *copyFrom) run(ctx context.Context) (int64, error) {
|
2023-05-12 13:33:40 +01:00
|
|
|
if ct.conn.copyFromTracer != nil {
|
|
|
|
ctx = ct.conn.copyFromTracer.TraceCopyFromStart(ctx, ct.conn, TraceCopyFromStartData{
|
|
|
|
TableName: ct.tableName,
|
|
|
|
ColumnNames: ct.columnNames,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-08-25 14:34:33 +01:00
|
|
|
quotedTableName := ct.tableName.Sanitize()
|
|
|
|
cbuf := &bytes.Buffer{}
|
|
|
|
for i, cn := range ct.columnNames {
|
|
|
|
if i != 0 {
|
|
|
|
cbuf.WriteString(", ")
|
|
|
|
}
|
|
|
|
cbuf.WriteString(quoteIdentifier(cn))
|
|
|
|
}
|
|
|
|
quotedColumnNames := cbuf.String()
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
var sd *pgconn.StatementDescription
|
|
|
|
switch ct.mode {
|
|
|
|
case QueryExecModeExec, QueryExecModeSimpleProtocol:
|
|
|
|
// These modes don't support the binary format. Before the inclusion of the
|
|
|
|
// QueryExecModes, Conn.Prepare was called on every COPY operation to get
|
|
|
|
// the OIDs. These prepared statements were not cached.
|
|
|
|
//
|
|
|
|
// Since that's the same behavior provided by QueryExecModeDescribeExec,
|
|
|
|
// we'll default to that mode.
|
|
|
|
ct.mode = QueryExecModeDescribeExec
|
|
|
|
fallthrough
|
|
|
|
case QueryExecModeCacheStatement, QueryExecModeCacheDescribe, QueryExecModeDescribeExec:
|
|
|
|
var err error
|
|
|
|
sd, err = ct.conn.getStatementDescription(
|
|
|
|
ctx,
|
|
|
|
ct.mode,
|
|
|
|
fmt.Sprintf("select %s from %s", quotedColumnNames, quotedTableName),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf("statement description failed: %w", err)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
return 0, fmt.Errorf("unknown QueryExecMode: %v", ct.mode)
|
2021-08-25 14:34:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
r, w := io.Pipe()
|
|
|
|
doneChan := make(chan struct{})
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer close(doneChan)
|
|
|
|
|
|
|
|
// Purposely NOT using defer w.Close(). See https://github.com/golang/go/issues/24283.
|
|
|
|
buf := ct.conn.wbuf
|
|
|
|
|
|
|
|
buf = append(buf, "PGCOPY\n\377\r\n\000"...)
|
|
|
|
buf = pgio.AppendInt32(buf, 0)
|
|
|
|
buf = pgio.AppendInt32(buf, 0)
|
|
|
|
|
|
|
|
moreRows := true
|
|
|
|
for moreRows {
|
|
|
|
var err error
|
|
|
|
moreRows, buf, err = ct.buildCopyBuf(buf, sd)
|
|
|
|
if err != nil {
|
|
|
|
w.CloseWithError(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if ct.rowSrc.Err() != nil {
|
|
|
|
w.CloseWithError(ct.rowSrc.Err())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(buf) > 0 {
|
|
|
|
_, err = w.Write(buf)
|
|
|
|
if err != nil {
|
|
|
|
w.Close()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
buf = buf[:0]
|
|
|
|
}
|
|
|
|
|
|
|
|
w.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
commandTag, err := ct.conn.pgConn.CopyFrom(ctx, r, fmt.Sprintf("copy %s ( %s ) from stdin binary;", quotedTableName, quotedColumnNames))
|
|
|
|
|
|
|
|
r.Close()
|
|
|
|
<-doneChan
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
if ct.conn.copyFromTracer != nil {
|
|
|
|
ct.conn.copyFromTracer.TraceCopyFromEnd(ctx, ct.conn, TraceCopyFromEndData{
|
|
|
|
CommandTag: commandTag,
|
|
|
|
Err: err,
|
|
|
|
})
|
2021-08-25 14:34:33 +01:00
|
|
|
}
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
return commandTag.RowsAffected(), err
|
2021-08-25 14:34:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ct *copyFrom) buildCopyBuf(buf []byte, sd *pgconn.StatementDescription) (bool, []byte, error) {
|
2023-05-12 13:33:40 +01:00
|
|
|
const sendBufSize = 65536 - 5 // The packet has a 5-byte header
|
|
|
|
lastBufLen := 0
|
|
|
|
largestRowLen := 0
|
2021-08-25 14:34:33 +01:00
|
|
|
|
|
|
|
for ct.rowSrc.Next() {
|
2023-05-12 13:33:40 +01:00
|
|
|
lastBufLen = len(buf)
|
|
|
|
|
2021-08-25 14:34:33 +01:00
|
|
|
values, err := ct.rowSrc.Values()
|
|
|
|
if err != nil {
|
|
|
|
return false, nil, err
|
|
|
|
}
|
|
|
|
if len(values) != len(ct.columnNames) {
|
|
|
|
return false, nil, fmt.Errorf("expected %d values, got %d values", len(ct.columnNames), len(values))
|
|
|
|
}
|
|
|
|
|
|
|
|
buf = pgio.AppendInt16(buf, int16(len(ct.columnNames)))
|
|
|
|
for i, val := range values {
|
2023-05-12 13:33:40 +01:00
|
|
|
buf, err = encodeCopyValue(ct.conn.typeMap, buf, sd.Fields[i].DataTypeOID, val)
|
2021-08-25 14:34:33 +01:00
|
|
|
if err != nil {
|
|
|
|
return false, nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
rowLen := len(buf) - lastBufLen
|
|
|
|
if rowLen > largestRowLen {
|
|
|
|
largestRowLen = rowLen
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try not to overflow size of the buffer PgConn.CopyFrom will be reading into. If that happens then the nature of
|
|
|
|
// io.Pipe means that the next Read will be short. This can lead to pathological send sizes such as 65531, 13, 65531
|
|
|
|
// 13, 65531, 13, 65531, 13.
|
|
|
|
if len(buf) > sendBufSize-largestRowLen {
|
2021-08-25 14:34:33 +01:00
|
|
|
return true, buf, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false, buf, nil
|
|
|
|
}
|
|
|
|
|
2023-05-12 13:33:40 +01:00
|
|
|
// CopyFrom uses the PostgreSQL copy protocol to perform bulk data insertion. It returns the number of rows copied and
|
|
|
|
// an error.
|
|
|
|
//
|
|
|
|
// CopyFrom requires all values use the binary format. A pgtype.Type that supports the binary format must be registered
|
|
|
|
// for the type of each column. Almost all types implemented by pgx support the binary format.
|
2021-08-25 14:34:33 +01:00
|
|
|
//
|
2023-05-12 13:33:40 +01:00
|
|
|
// Even though enum types appear to be strings they still must be registered to use with CopyFrom. This can be done with
|
|
|
|
// Conn.LoadType and pgtype.Map.RegisterType.
|
2021-08-25 14:34:33 +01:00
|
|
|
func (c *Conn) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
|
|
|
|
ct := ©From{
|
|
|
|
conn: c,
|
|
|
|
tableName: tableName,
|
|
|
|
columnNames: columnNames,
|
|
|
|
rowSrc: rowSrc,
|
|
|
|
readerErrChan: make(chan error),
|
2023-05-12 13:33:40 +01:00
|
|
|
mode: c.config.DefaultQueryExecMode,
|
2021-08-25 14:34:33 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return ct.run(ctx)
|
|
|
|
}
|