Skip to content

Commit

Permalink
*Pipeline.getResults should close pipeline on error
Browse files Browse the repository at this point in the history
Otherwise, it might be possible to panic when closing the pipeline if it
tries to read a connection that should be closed but still has a fatal
error on the wire.

#1920
  • Loading branch information
jackc committed Mar 1, 2024
1 parent d149d3f commit 2e84dcc
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pgconn/pgconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,8 @@ func (p *Pipeline) getResults() (results any, err error) {
for {
msg, err := p.conn.receiveMessage()
if err != nil {
p.closed = true
p.err = err
p.conn.asyncClose()
return nil, normalizeTimeoutError(p.ctx, err)
}
Expand Down
83 changes: 83 additions & 0 deletions pgconn/pgconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3389,3 +3389,86 @@ func TestSNISupport(t *testing.T) {
})
}
}

// TestFatalErrorReceivedInPipelineMode runs a pipeline against a scripted mock server that answers with a
// RowDescription followed by two FATAL ErrorResponse messages. It checks that the second GetResults call and the
// final Close both report an error rather than panicking.
//
// https://github.com/jackc/pgx/issues/1920
func TestFatalErrorReceivedInPipelineMode(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	// Server script: accept the connection, consume the three Parse/Describe pairs produced by the SendPrepare calls
	// below, then reply with a row description and two FATAL errors.
	steps := pgmock.AcceptUnauthenticatedConnRequestSteps()
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
	steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
	steps = append(steps, pgmock.SendMessage(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
		{Name: []byte("mock")},
	}}))
	steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))
	// We shouldn't get anything after the first fatal error. But the reported issue was with PgBouncer so maybe that
	// causes the issue. Anyway, a FATAL error after the connection had already been killed could cause a panic.
	steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))

	script := &pgmock.Script{Steps: steps}

	ln, err := net.Listen("tcp", "127.0.0.1:")
	require.NoError(t, err)
	defer ln.Close()

	// Closed when the test returns; until then the server goroutine keeps its side of the connection open after the
	// script has finished.
	serverKeepAlive := make(chan struct{})
	defer close(serverKeepAlive)

	// Buffered so the server goroutine can report its single error without blocking. Nothing in this test receives
	// from the channel.
	serverErrChan := make(chan error, 1)
	go func() {
		defer close(serverErrChan)

		conn, err := ln.Accept()
		if err != nil {
			serverErrChan <- err
			return
		}
		defer conn.Close()

		// Bound all server-side I/O so a stalled script cannot block this goroutine indefinitely.
		err = conn.SetDeadline(time.Now().Add(59 * time.Second))
		if err != nil {
			serverErrChan <- err
			return
		}

		err = script.Run(pgproto3.NewBackend(conn, conn))
		if err != nil {
			serverErrChan <- err
			return
		}

		<-serverKeepAlive
	}()

	// net.SplitHostPort parses the listener address correctly (including bracketed IPv6 literals) and reports
	// malformed input instead of relying on blind indexing into a ":"-split slice.
	host, port, err := net.SplitHostPort(ln.Addr().String())
	require.NoError(t, err)
	connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)

	ctx, cancel = context.WithTimeout(ctx, 59*time.Second)
	defer cancel()
	conn, err := pgconn.Connect(ctx, connStr)
	require.NoError(t, err)

	pipeline := conn.StartPipeline(ctx)
	pipeline.SendPrepare("s1", "select 1", nil)
	pipeline.SendPrepare("s2", "select 2", nil)
	pipeline.SendPrepare("s3", "select 3", nil)
	err = pipeline.Sync()
	require.NoError(t, err)

	// The first call is expected to succeed; the second is expected to fail.
	_, err = pipeline.GetResults()
	require.NoError(t, err)
	_, err = pipeline.GetResults()
	require.Error(t, err)

	// Close must return an error rather than panic.
	err = pipeline.Close()
	require.Error(t, err)
}

0 comments on commit 2e84dcc

Please sign in to comment.