core: Refactor and improve listener logic (#5089)

* core: Refactor, improve listener logic

Deprecate:
- caddy.Listen
- caddy.ListenTimeout
- caddy.ListenPacket

Prefer caddy.NetworkAddress.Listen() instead.

Change:
- caddy.ListenQUIC (hopefully to remove later)
- caddy.ListenerFunc signature (add context and ListenConfig)

- Don't emit Alt-Svc header advertising h3 over HTTP/3

- Use quic.ListenEarly instead of quic.ListenEarlyAddr; this gives us
more flexibility (e.g. possibility of HTTP/3 over UDS) but also
introduces a new issue:
https://github.com/lucas-clemente/quic-go/issues/3560#issuecomment-1258959608

- Unlink unix socket before and after use

* Appease the linter

* Keep ListenAll
This commit is contained in:
Matt Holt 2022-09-28 13:35:51 -06:00 committed by GitHub
parent d0556929a4
commit e3e8aabbcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 568 additions and 298 deletions

View file

@ -382,7 +382,7 @@ func replaceLocalAdminServer(cfg *Config) error {
handler := cfg.Admin.newAdminHandler(addr, false) handler := cfg.Admin.newAdminHandler(addr, false)
ln, err := Listen(addr.Network, addr.JoinHostPort(0)) ln, err := addr.Listen(context.TODO(), 0, net.ListenConfig{})
if err != nil { if err != nil {
return err return err
} }
@ -403,7 +403,7 @@ func replaceLocalAdminServer(cfg *Config) error {
serverMu.Lock() serverMu.Lock()
server := localAdminServer server := localAdminServer
serverMu.Unlock() serverMu.Unlock()
if err := server.Serve(ln); !errors.Is(err, http.ErrServerClosed) { if err := server.Serve(ln.(net.Listener)); !errors.Is(err, http.ErrServerClosed) {
adminLogger.Error("admin server shutdown for unknown reason", zap.Error(err)) adminLogger.Error("admin server shutdown for unknown reason", zap.Error(err))
} }
}() }()
@ -549,10 +549,11 @@ func replaceRemoteAdminServer(ctx Context, cfg *Config) error {
serverMu.Unlock() serverMu.Unlock()
// start listener // start listener
ln, err := Listen(addr.Network, addr.JoinHostPort(0)) lnAny, err := addr.Listen(ctx, 0, net.ListenConfig{})
if err != nil { if err != nil {
return err return err
} }
ln := lnAny.(net.Listener)
ln = tls.NewListener(ln, tlsConfig) ln = tls.NewListener(ln, tlsConfig)
go func() { go func() {

View file

@ -20,7 +20,7 @@
package caddy package caddy
import ( import (
"fmt" "context"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -29,21 +29,14 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Listener, error) { func reuseUnixSocket(network, addr string) (any, error) {
// check to see if plugin provides listener return nil, nil
if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil {
return ln, err
} }
lnKey := listenerKey(network, addr) func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) {
sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) { sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
ln, err := net.Listen(network, addr) ln, err := config.Listen(ctx, network, address)
if err != nil { if err != nil {
// https://github.com/caddyserver/caddy/pull/4534
if isUnixNetwork(network) && isListenBindAddressAlreadyInUseError(err) {
return nil, fmt.Errorf("%w: this can happen if Caddy was forcefully killed", err)
}
return nil, err return nil, err
} }
return &sharedListener{Listener: ln, key: lnKey}, nil return &sharedListener{Listener: ln, key: lnKey}, nil
@ -51,8 +44,7 @@ func ListenTimeout(network, addr string, keepAlivePeriod time.Duration) (net.Lis
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: config.KeepAlive}, nil
return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: keepAlivePeriod}, nil
} }
// fakeCloseListener is a private wrapper over a listener that // fakeCloseListener is a private wrapper over a listener that
@ -156,8 +148,6 @@ func (sl *sharedListener) clearDeadline() error {
switch ln := sl.Listener.(type) { switch ln := sl.Listener.(type) {
case *net.TCPListener: case *net.TCPListener:
err = ln.SetDeadline(time.Time{}) err = ln.SetDeadline(time.Time{})
case *net.UnixListener:
err = ln.SetDeadline(time.Time{})
} }
sl.deadline = false sl.deadline = false
} }
@ -173,8 +163,6 @@ func (sl *sharedListener) setDeadline() error {
switch ln := sl.Listener.(type) { switch ln := sl.Listener.(type) {
case *net.TCPListener: case *net.TCPListener:
err = ln.SetDeadline(timeInPast) err = ln.SetDeadline(timeInPast)
case *net.UnixListener:
err = ln.SetDeadline(timeInPast)
} }
sl.deadline = true sl.deadline = true
} }

View file

@ -24,78 +24,88 @@ import (
"errors" "errors"
"io/fs" "io/fs"
"net" "net"
"sync" "sync/atomic"
"syscall" "syscall"
"time"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
// ListenTimeout is the same as Listen, but with a configurable keep-alive timeout duration. // reuseUnixSocket copies and reuses the unix domain socket (UDS) if we already
func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) { // have it open; if not, unlink it so we can have it. No-op if not a unix network.
// check to see if plugin provides listener func reuseUnixSocket(network, addr string) (any, error) {
if ln, err := getListenerFromPlugin(network, addr); err != nil || ln != nil { if !isUnixNetwork(network) {
return ln, err return nil, nil
} }
socketKey := listenerKey(network, addr) socketKey := listenerKey(network, addr)
if isUnixNetwork(network) {
unixSocketsMu.Lock()
defer unixSocketsMu.Unlock()
socket, exists := unixSockets[socketKey] socket, exists := unixSockets[socketKey]
if exists { if exists {
// make copy of file descriptor // make copy of file descriptor
socketFile, err := socket.File() // dup() deep down socketFile, err := socket.File() // does dup() deep down
if err != nil { if err != nil {
return nil, err return nil, err
} }
// use copy to make new listener // use copied fd to make new Listener or PacketConn, then replace
// it in the map so that future copies always come from the most
// recent fd (as the previous ones will be closed, and we'd get
// "use of closed network connection" errors) -- note that we
// preserve the *pointer* to the counter (not just the value) so
// that all socket wrappers will refer to the same value
switch unixSocket := socket.(type) {
case *unixListener:
ln, err := net.FileListener(socketFile) ln, err := net.FileListener(socketFile)
if err != nil { if err != nil {
return nil, err return nil, err
} }
atomic.AddInt32(unixSocket.count, 1)
unixSockets[socketKey] = &unixListener{ln.(*net.UnixListener), socketKey, unixSocket.count}
// the old socket fd will likely be closed soon, so replace it in the map case *unixConn:
unixSockets[socketKey] = ln.(*net.UnixListener) pc, err := net.FilePacketConn(socketFile)
if err != nil {
return nil, err
}
atomic.AddInt32(unixSocket.count, 1)
unixSockets[socketKey] = &unixConn{pc.(*net.UnixConn), addr, socketKey, unixSocket.count}
}
return ln.(*net.UnixListener), nil return unixSockets[socketKey], nil
} }
// from what I can tell after some quick research, it's quite common for programs to // from what I can tell after some quick research, it's quite common for programs to
// leave their socket file behind after they close, so the typical pattern is to // leave their socket file behind after they close, so the typical pattern is to
// unlink it before you bind to it -- this is often crucial if the last program using // unlink it before you bind to it -- this is often crucial if the last program using
// it was killed forcefully without a chance to clean up the socket, but there is a // it was killed forcefully without a chance to clean up the socket, but there is a
// race, as the comment in net.UnixListener.close() explains... oh well? // race, as the comment in net.UnixListener.close() explains... oh well, I guess?
if err := syscall.Unlink(addr); err != nil && !errors.Is(err, fs.ErrNotExist) { if err := syscall.Unlink(addr); err != nil && !errors.Is(err, fs.ErrNotExist) {
return nil, err return nil, err
} }
return nil, nil
} }
config := &net.ListenConfig{Control: reusePort, KeepAlive: keepalivePeriod} func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) {
// wrap any Control function set by the user so we can also add our reusePort control without clobbering theirs
ln, err := config.Listen(context.Background(), network, addr) oldControl := config.Control
if err != nil { config.Control = func(network, address string, c syscall.RawConn) error {
return nil, err if oldControl != nil {
if err := oldControl(network, address, c); err != nil {
return err
} }
if uln, ok := ln.(*net.UnixListener); ok {
// TODO: ideally, we should unlink the socket once we know we're done using it
// (i.e. either on exit or a new config that doesn't use this socket; in UsagePool
// terms, when the reference count reaches 0), but given that we unlink existing
// socket before we create the new one anyway (see above), we don't necessarily
// need to clean up after ourselves; still, doing so would probably be more tidy
uln.SetUnlinkOnClose(false)
unixSockets[socketKey] = uln
} }
return reusePort(network, address, c)
return ln, nil }
return config.Listen(ctx, network, address)
} }
// reusePort sets SO_REUSEPORT. Ineffective for unix sockets. // reusePort sets SO_REUSEPORT. Ineffective for unix sockets.
func reusePort(network, address string, conn syscall.RawConn) error { func reusePort(network, address string, conn syscall.RawConn) error {
if isUnixNetwork(network) {
return nil
}
return conn.Control(func(descriptor uintptr) { return conn.Control(func(descriptor uintptr) {
if err := unix.SetsockoptInt(int(descriptor), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil { if err := unix.SetsockoptInt(int(descriptor), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
Log().Error("setting SO_REUSEPORT", Log().Error("setting SO_REUSEPORT",
@ -106,10 +116,3 @@ func reusePort(network, address string, conn syscall.RawConn) error {
} }
}) })
} }
// unixSockets keeps track of the currently-active unix sockets
// so we can transfer their FDs gracefully during reloads.
var (
unixSockets = make(map[string]*net.UnixListener)
unixSocketsMu sync.Mutex
)

View file

@ -19,60 +19,154 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"net/netip" "net/netip"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/http3" "github.com/lucas-clemente/quic-go/http3"
"go.uber.org/zap" "go.uber.org/zap"
) )
// Listen is like net.Listen, except Caddy's listeners can overlap // NetworkAddress represents one or more network addresses.
// each other: multiple listeners may be created on the same socket // It contains the individual components for a parsed network
// at the same time. This is useful because during config changes, // address of the form accepted by ParseNetworkAddress().
// the new config is started while the old config is still running. type NetworkAddress struct {
// When Caddy listeners are closed, the closing logic is virtualized // Should be a network value accepted by Go's net package or
// so the underlying socket isn't actually closed until all uses of // by a plugin providing a listener for that network type.
// the socket have been finished. Always be sure to close listeners Network string
// when you are done with them, just like normal listeners.
func Listen(network, addr string) (net.Listener, error) { // The "main" part of the network address is the host, which
// a 0 timeout means Go uses its default // often takes the form of a hostname, DNS name, IP address,
return ListenTimeout(network, addr, 0) // or socket path.
Host string
// For addresses that contain a port, ranges are given by
// [StartPort, EndPort]; i.e. for a single port, StartPort
// and EndPort are the same. For no port, they are 0.
StartPort uint
EndPort uint
} }
// getListenerFromPlugin returns a listener on the given network and address // ListenAll calls Listen() for all addresses represented by this struct, i.e. all ports in the range.
// if a plugin has registered the network name. It may return (nil, nil) if // (If the address doesn't use ports or has 1 port only, then only 1 listener will be created.)
// no plugin can provide a listener. // It returns an error if any listener failed to bind, and closes any listeners opened up to that point.
func getListenerFromPlugin(network, addr string) (net.Listener, error) { //
network = strings.TrimSpace(strings.ToLower(network)) // TODO: Experimental API: subject to change or removal.
func (na NetworkAddress) ListenAll(ctx context.Context, config net.ListenConfig) ([]any, error) {
var listeners []any
var err error
// get listener from plugin if network type is registered // if one of the addresses has a failure, we need to close
if getListener, ok := networkTypes[network]; ok { // any that did open a socket to avoid leaking resources
Log().Debug("getting listener from plugin", zap.String("network", network)) defer func() {
return getListener(network, addr) if err == nil {
return
}
for _, ln := range listeners {
if cl, ok := ln.(io.Closer); ok {
cl.Close()
}
}
}()
// an address can contain a port range, which represents multiple addresses;
// some addresses don't use ports at all and have a port range size of 1;
// whatever the case, iterate each address represented and bind a socket
for portOffset := uint(0); portOffset < na.PortRangeSize(); portOffset++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
} }
return nil, nil // create (or reuse) the listener ourselves
} var ln any
ln, err = na.Listen(ctx, portOffset, config)
// ListenPacket returns a net.PacketConn suitable for use in a Caddy module.
// It is like Listen except for PacketConns.
// Always be sure to close the PacketConn when you are done.
func ListenPacket(network, addr string) (net.PacketConn, error) {
lnKey := listenerKey(network, addr)
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := net.ListenPacket(network, addr)
if err != nil { if err != nil {
// https://github.com/caddyserver/caddy/pull/4534 return nil, err
if isUnixNetwork(network) && isListenBindAddressAlreadyInUseError(err) {
return nil, fmt.Errorf("%w: this can happen if Caddy was forcefully killed", err)
} }
listeners = append(listeners, ln)
}
return listeners, nil
}
// Listen is similar to net.Listen, with a few differences:
//
// Listen announces on the network address using the port calculated by adding
// portOffset to the start port. (For network types that do not use ports, the
// portOffset is ignored.)
//
// The provided ListenConfig is used to create the listener. Its Control function,
// if set, may be wrapped by an internally-used Control function. The provided
// context may be used to cancel long operations early. The context is not used
// to close the listener after it has been created.
//
// Caddy's listeners can overlap each other: multiple listeners may be created on
// the same socket at the same time. This is useful because during config changes,
// the new config is started while the old config is still running. How this is
// accomplished varies by platform and network type. For example, on Unix, SO_REUSEPORT
// is set except on Unix sockets, for which the file descriptor is duplicated and
// reused; on Windows, the close logic is virtualized using timeouts. Like normal
// listeners, be sure to Close() them when you are done.
//
// This method returns any type, as the implementations of listeners for various
// network types are not interchangeable. The type of listener returned is switched
// on the network type. Stream-based networks ("tcp", "unix", "unixpacket", etc.)
// return a net.Listener; datagram-based networks ("udp", "unixgram", etc.) return
// a net.PacketConn; and so forth. The actual concrete types are not guaranteed to
// be standard, exported types (wrapping is necessary to provide graceful reloads).
//
// Unix sockets will be unlinked before being created, to ensure we can bind to
// it even if the previous program using it exited uncleanly; it will also be
// unlinked upon a graceful exit (or when a new config does not use that socket).
//
// TODO: Experimental API: subject to change or removal.
func (na NetworkAddress) Listen(ctx context.Context, portOffset uint, config net.ListenConfig) (any, error) {
if na.IsUnixNetwork() {
unixSocketsMu.Lock()
defer unixSocketsMu.Unlock()
}
// check to see if plugin provides listener
if ln, err := getListenerFromPlugin(ctx, na.Network, na.JoinHostPort(portOffset), config); ln != nil || err != nil {
return ln, err
}
// create (or reuse) the listener ourselves
return na.listen(ctx, portOffset, config)
}
func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net.ListenConfig) (any, error) {
var ln any
var err error
address := na.JoinHostPort(portOffset)
// if this is a unix socket, see if we already have it open
if socket, err := reuseUnixSocket(na.Network, address); socket != nil || err != nil {
return socket, err
}
lnKey := listenerKey(na.Network, address)
switch na.Network {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
ln, err = listenTCPOrUnix(ctx, lnKey, na.Network, address, config)
case "unixgram":
ln, err = config.ListenPacket(ctx, na.Network, address)
case "udp", "udp4", "udp6":
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := config.ListenPacket(ctx, na.Network, address)
if err != nil {
return nil, err return nil, err
} }
return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil
@ -80,169 +174,32 @@ func ListenPacket(network, addr string) (net.PacketConn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ln = &fakeClosePacketConn{sharedPacketConn: sharedPc.(*sharedPacketConn)}
return &fakeClosePacketConn{sharedPacketConn: sharedPc.(*sharedPacketConn)}, nil
} }
if strings.HasPrefix(na.Network, "ip") {
// ListenQUIC returns a quic.EarlyListener suitable for use in a Caddy module. ln, err = config.ListenPacket(ctx, na.Network, address)
// Note that the context passed to Accept is currently ignored, so using
// a context other than context.Background is meaningless.
// This API is EXPERIMENTAL and may change.
func ListenQUIC(addr string, tlsConf *tls.Config, activeRequests *int64) (quic.EarlyListener, error) {
lnKey := listenerKey("udp", addr)
sharedEl, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
el, err := quic.ListenAddrEarly(addr, http3.ConfigureTLSConfig(tlsConf), &quic.Config{
RequireAddressValidation: func(clientAddr net.Addr) bool {
var highLoad bool
if activeRequests != nil {
highLoad = atomic.LoadInt64(activeRequests) > 1000 // TODO: make tunable?
} }
return highLoad
},
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &sharedQuicListener{EarlyListener: el, key: lnKey}, nil if ln == nil {
}) return nil, fmt.Errorf("unsupported network type: %s", na.Network)
if err != nil {
return nil, err
} }
ctx, cancel := context.WithCancel(context.Background()) // if new listener is a unix socket, make sure we can reuse it later
return &fakeCloseQuicListener{ // (we do our own "unlink on close" -- not required, but more tidy)
sharedQuicListener: sharedEl.(*sharedQuicListener), one := int32(1)
context: ctx, switch unix := ln.(type) {
contextCancel: cancel, case *net.UnixListener:
}, nil unix.SetUnlinkOnClose(false)
ln = &unixListener{unix, lnKey, &one}
unixSockets[lnKey] = ln.(*unixListener)
case *net.UnixConn:
ln = &unixConn{unix, address, lnKey, &one}
unixSockets[lnKey] = ln.(*unixConn)
} }
// ListenerUsage returns the current usage count of the given listener address. return ln, nil
func ListenerUsage(network, addr string) int {
count, _ := listenerPool.References(listenerKey(network, addr))
return count
}
func listenerKey(network, addr string) string {
return network + "/" + addr
}
type fakeCloseQuicListener struct {
closed int32 // accessed atomically; belongs to this struct only
*sharedQuicListener // embedded, so we also become a quic.EarlyListener
context context.Context
contextCancel context.CancelFunc
}
// Currently Accept ignores the passed context, however a situation where
// someone would need a hotswappable QUIC-only (not http3, since it uses context.Background here)
// server on which Accept would be called with non-empty contexts
// (mind that the default net listeners' Accept doesn't take a context argument)
// sounds way too rare for us to sacrifice efficiency here.
func (fcql *fakeCloseQuicListener) Accept(_ context.Context) (quic.EarlyConnection, error) {
conn, err := fcql.sharedQuicListener.Accept(fcql.context)
if err == nil {
return conn, nil
}
// if the listener is "closed", return a fake closed error instead
if atomic.LoadInt32(&fcql.closed) == 1 && errors.Is(err, context.Canceled) {
return nil, fakeClosedErr(fcql)
}
return nil, err
}
func (fcql *fakeCloseQuicListener) Close() error {
if atomic.CompareAndSwapInt32(&fcql.closed, 0, 1) {
fcql.contextCancel()
_, _ = listenerPool.Delete(fcql.sharedQuicListener.key)
}
return nil
}
// fakeClosedErr returns an error value that is not temporary
// nor a timeout, suitable for making the caller think the
// listener is actually closed
func fakeClosedErr(l interface{ Addr() net.Addr }) error {
return &net.OpError{
Op: "accept",
Net: l.Addr().Network(),
Addr: l.Addr(),
Err: errFakeClosed,
}
}
// ErrFakeClosed is the underlying error value returned by
// fakeCloseListener.Accept() after Close() has been called,
// indicating that it is pretending to be closed so that the
// server using it can terminate, while the underlying
// socket is actually left open.
var errFakeClosed = fmt.Errorf("listener 'closed' 😉")
// fakeClosePacketConn is like fakeCloseListener, but for PacketConns.
type fakeClosePacketConn struct {
closed int32 // accessed atomically; belongs to this struct only
*sharedPacketConn // embedded, so we also become a net.PacketConn
}
func (fcpc *fakeClosePacketConn) Close() error {
if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
_, _ = listenerPool.Delete(fcpc.sharedPacketConn.key)
}
return nil
}
// Supports QUIC implementation: https://github.com/caddyserver/caddy/issues/3998
func (fcpc fakeClosePacketConn) SetReadBuffer(bytes int) error {
if conn, ok := fcpc.PacketConn.(interface{ SetReadBuffer(int) error }); ok {
return conn.SetReadBuffer(bytes)
}
return fmt.Errorf("SetReadBuffer() not implemented for %T", fcpc.PacketConn)
}
// Supports QUIC implementation: https://github.com/caddyserver/caddy/issues/3998
func (fcpc fakeClosePacketConn) SyscallConn() (syscall.RawConn, error) {
if conn, ok := fcpc.PacketConn.(interface {
SyscallConn() (syscall.RawConn, error)
}); ok {
return conn.SyscallConn()
}
return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn)
}
// sharedQuicListener is like sharedListener, but for quic.EarlyListeners.
type sharedQuicListener struct {
quic.EarlyListener
key string
}
// Destruct closes the underlying QUIC listener.
func (sql *sharedQuicListener) Destruct() error {
return sql.EarlyListener.Close()
}
// sharedPacketConn is like sharedListener, but for net.PacketConns.
type sharedPacketConn struct {
net.PacketConn
key string
}
// Destruct closes the underlying socket.
func (spc *sharedPacketConn) Destruct() error {
return spc.PacketConn.Close()
}
// NetworkAddress contains the individual components
// for a parsed network address of the form accepted
// by ParseNetworkAddress(). Network should be a
// network value accepted by Go's net package. Port
// ranges are given by [StartPort, EndPort].
type NetworkAddress struct {
Network string
Host string
StartPort uint
EndPort uint
} }
// IsUnixNetwork returns true if na.Network is // IsUnixNetwork returns true if na.Network is
@ -260,17 +217,27 @@ func (na NetworkAddress) JoinHostPort(offset uint) string {
return net.JoinHostPort(na.Host, strconv.Itoa(int(na.StartPort+offset))) return net.JoinHostPort(na.Host, strconv.Itoa(int(na.StartPort+offset)))
} }
// Expand returns one NetworkAddress for each port in the port range.
//
// This is EXPERIMENTAL and subject to change or removal.
func (na NetworkAddress) Expand() []NetworkAddress { func (na NetworkAddress) Expand() []NetworkAddress {
size := na.PortRangeSize() size := na.PortRangeSize()
addrs := make([]NetworkAddress, size) addrs := make([]NetworkAddress, size)
for portOffset := uint(0); portOffset < size; portOffset++ { for portOffset := uint(0); portOffset < size; portOffset++ {
na2 := na addrs[portOffset] = na.At(portOffset)
na2.StartPort, na2.EndPort = na.StartPort+portOffset, na.StartPort+portOffset
addrs[portOffset] = na2
} }
return addrs return addrs
} }
// At returns a NetworkAddress with a port range of just 1
// at the given port offset; i.e. a NetworkAddress that
// represents precisely 1 address only.
func (na NetworkAddress) At(portOffset uint) NetworkAddress {
na2 := na
na2.StartPort, na2.EndPort = na.StartPort+portOffset, na.StartPort+portOffset
return na2
}
// PortRangeSize returns how many ports are in // PortRangeSize returns how many ports are in
// pa's port range. Port ranges are inclusive, // pa's port range. Port ranges are inclusive,
// so the size is the difference of start and // so the size is the difference of start and
@ -326,20 +293,6 @@ func isUnixNetwork(netw string) bool {
return netw == "unix" || netw == "unixgram" || netw == "unixpacket" return netw == "unix" || netw == "unixgram" || netw == "unixpacket"
} }
func isListenBindAddressAlreadyInUseError(err error) bool {
switch networkOperationError := err.(type) {
case *net.OpError:
switch syscallError := networkOperationError.Err.(type) {
case *os.SyscallError:
if syscallError.Syscall == "bind" {
return true
}
}
}
return false
}
// ParseNetworkAddress parses addr into its individual // ParseNetworkAddress parses addr into its individual
// components. The input string is expected to be of // components. The input string is expected to be of
// the form "network/host:port-range" where any part is // the form "network/host:port-range" where any part is
@ -439,6 +392,209 @@ func JoinNetworkAddress(network, host, port string) string {
return a return a
} }
// DEPRECATED: Use NetworkAddress.Listen instead. This function will likely be changed or removed in the future.
func Listen(network, addr string) (net.Listener, error) {
// a 0 timeout means Go uses its default
return ListenTimeout(network, addr, 0)
}
// DEPRECATED: Use NetworkAddress.Listen instead. This function will likely be changed or removed in the future.
func ListenTimeout(network, addr string, keepalivePeriod time.Duration) (net.Listener, error) {
netAddr, err := ParseNetworkAddress(JoinNetworkAddress(network, addr, ""))
if err != nil {
return nil, err
}
ln, err := netAddr.Listen(context.TODO(), 0, net.ListenConfig{KeepAlive: keepalivePeriod})
if err != nil {
return nil, err
}
return ln.(net.Listener), nil
}
// DEPRECATED: Use NetworkAddress.Listen instead. This function will likely be changed or removed in the future.
func ListenPacket(network, addr string) (net.PacketConn, error) {
netAddr, err := ParseNetworkAddress(JoinNetworkAddress(network, addr, ""))
if err != nil {
return nil, err
}
ln, err := netAddr.Listen(context.TODO(), 0, net.ListenConfig{})
if err != nil {
return nil, err
}
return ln.(net.PacketConn), nil
}
// ListenQUIC returns a quic.EarlyListener suitable for use in a Caddy module.
// The network will be transformed into a QUIC-compatible type (if unix, then
// unixgram will be used; otherwise, udp will be used).
//
// NOTE: This API is EXPERIMENTAL and may be changed or removed.
//
// TODO: See if we can find a more elegant solution closer to the new NetworkAddress.Listen API.
func ListenQUIC(ln net.PacketConn, tlsConf *tls.Config, activeRequests *int64) (quic.EarlyListener, error) {
lnKey := listenerKey(ln.LocalAddr().Network(), ln.LocalAddr().String())
sharedEarlyListener, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
earlyLn, err := quic.ListenEarly(ln, http3.ConfigureTLSConfig(tlsConf), &quic.Config{
RequireAddressValidation: func(clientAddr net.Addr) bool {
var highLoad bool
if activeRequests != nil {
highLoad = atomic.LoadInt64(activeRequests) > 1000 // TODO: make tunable?
}
return highLoad
},
})
if err != nil {
return nil, err
}
return &sharedQuicListener{EarlyListener: earlyLn, key: lnKey}, nil
})
if err != nil {
return nil, err
}
// TODO: to serve QUIC over a unix socket, currently we need to hold onto
// the underlying net.PacketConn (which we wrap as unixConn to keep count
// of closes) because closing the quic.EarlyListener doesn't actually close
// the underlying PacketConn, but we need to for unix sockets since we dup
// the file descriptor and thus need to close the original; track issue:
// https://github.com/lucas-clemente/quic-go/issues/3560#issuecomment-1258959608
var unix *unixConn
if uc, ok := ln.(*unixConn); ok {
unix = uc
}
ctx, cancel := context.WithCancel(context.Background())
return &fakeCloseQuicListener{
sharedQuicListener: sharedEarlyListener.(*sharedQuicListener),
uc: unix,
context: ctx,
contextCancel: cancel,
}, nil
}
// ListenerUsage returns the current usage count of the given listener address.
func ListenerUsage(network, addr string) int {
count, _ := listenerPool.References(listenerKey(network, addr))
return count
}
// sharedQuicListener is like sharedListener, but for quic.EarlyListeners.
type sharedQuicListener struct {
quic.EarlyListener
key string
}
// Destruct closes the underlying QUIC listener.
func (sql *sharedQuicListener) Destruct() error {
return sql.EarlyListener.Close()
}
// sharedPacketConn is like sharedListener, but for net.PacketConns.
type sharedPacketConn struct {
net.PacketConn
key string
}
// Destruct closes the underlying socket.
func (spc *sharedPacketConn) Destruct() error {
return spc.PacketConn.Close()
}
// fakeClosedErr returns an error value that is not temporary
// nor a timeout, suitable for making the caller think the
// listener is actually closed
func fakeClosedErr(l interface{ Addr() net.Addr }) error {
return &net.OpError{
Op: "accept",
Net: l.Addr().Network(),
Addr: l.Addr(),
Err: errFakeClosed,
}
}
// errFakeClosed is the underlying error value returned by
// fakeCloseListener.Accept() after Close() has been called,
// indicating that it is pretending to be closed so that the
// server using it can terminate, while the underlying
// socket is actually left open.
var errFakeClosed = fmt.Errorf("listener 'closed' 😉")
// fakeClosePacketConn is like fakeCloseListener, but for PacketConns.
type fakeClosePacketConn struct {
closed int32 // accessed atomically; belongs to this struct only
*sharedPacketConn // embedded, so we also become a net.PacketConn
}
func (fcpc *fakeClosePacketConn) Close() error {
if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
_, _ = listenerPool.Delete(fcpc.sharedPacketConn.key)
}
return nil
}
// Supports QUIC implementation: https://github.com/caddyserver/caddy/issues/3998
func (fcpc fakeClosePacketConn) SetReadBuffer(bytes int) error {
if conn, ok := fcpc.PacketConn.(interface{ SetReadBuffer(int) error }); ok {
return conn.SetReadBuffer(bytes)
}
return fmt.Errorf("SetReadBuffer() not implemented for %T", fcpc.PacketConn)
}
// Supports QUIC implementation: https://github.com/caddyserver/caddy/issues/3998
func (fcpc fakeClosePacketConn) SyscallConn() (syscall.RawConn, error) {
if conn, ok := fcpc.PacketConn.(interface {
SyscallConn() (syscall.RawConn, error)
}); ok {
return conn.SyscallConn()
}
return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn)
}
type fakeCloseQuicListener struct {
closed int32 // accessed atomically; belongs to this struct only
*sharedQuicListener // embedded, so we also become a quic.EarlyListener
uc *unixConn // underlying unix socket, if UDS
context context.Context
contextCancel context.CancelFunc
}
// Currently Accept ignores the passed context, however a situation where
// someone would need a hotswappable QUIC-only (not http3, since it uses context.Background here)
// server on which Accept would be called with non-empty contexts
// (mind that the default net listeners' Accept doesn't take a context argument)
// sounds way too rare for us to sacrifice efficiency here.
func (fcql *fakeCloseQuicListener) Accept(_ context.Context) (quic.EarlyConnection, error) {
conn, err := fcql.sharedQuicListener.Accept(fcql.context)
if err == nil {
return conn, nil
}
// if the listener is "closed", return a fake closed error instead
if atomic.LoadInt32(&fcql.closed) == 1 && errors.Is(err, context.Canceled) {
return nil, fakeClosedErr(fcql)
}
return nil, err
}
func (fcql *fakeCloseQuicListener) Close() error {
if atomic.CompareAndSwapInt32(&fcql.closed, 0, 1) {
fcql.contextCancel()
_, _ = listenerPool.Delete(fcql.sharedQuicListener.key)
if fcql.uc != nil {
// unix sockets need to be closed ourselves because we dup() the file
// descriptor when we reuse them, so this avoids a resource leak
fcql.uc.Close()
}
}
return nil
}
// RegisterNetwork registers a network type with Caddy so that if a listener is // RegisterNetwork registers a network type with Caddy so that if a listener is
// created for that network type, getListener will be invoked to get the listener. // created for that network type, getListener will be invoked to get the listener.
// This should be called during init() and will panic if the network type is standard // This should be called during init() and will panic if the network type is standard
@ -460,11 +616,77 @@ func RegisterNetwork(network string, getListener ListenerFunc) {
networkTypes[network] = getListener networkTypes[network] = getListener
} }
type unixListener struct {
*net.UnixListener
mapKey string
count *int32 // accessed atomically
}
func (uln *unixListener) Close() error {
newCount := atomic.AddInt32(uln.count, -1)
if newCount == 0 {
defer func() {
addr := uln.Addr().String()
unixSocketsMu.Lock()
delete(unixSockets, uln.mapKey)
unixSocketsMu.Unlock()
_ = syscall.Unlink(addr)
}()
}
return uln.UnixListener.Close()
}
type unixConn struct {
*net.UnixConn
filename string
mapKey string
count *int32 // accessed atomically
}
func (uc *unixConn) Close() error {
newCount := atomic.AddInt32(uc.count, -1)
if newCount == 0 {
defer func() {
unixSocketsMu.Lock()
delete(unixSockets, uc.mapKey)
unixSocketsMu.Unlock()
_ = syscall.Unlink(uc.filename)
}()
}
return uc.UnixConn.Close()
}
// unixSockets keeps track of the currently-active unix sockets
// so we can transfer their FDs gracefully during reloads.
var (
unixSockets = make(map[string]interface {
File() (*os.File, error)
})
unixSocketsMu sync.Mutex
)
// getListenerFromPlugin returns a listener on the given network and address
// if a plugin has registered the network name. It may return (nil, nil) if
// no plugin can provide a listener.
func getListenerFromPlugin(ctx context.Context, network, addr string, config net.ListenConfig) (any, error) {
// get listener from plugin if network type is registered
if getListener, ok := networkTypes[network]; ok {
Log().Debug("getting listener from plugin", zap.String("network", network))
return getListener(ctx, network, addr, config)
}
return nil, nil
}
func listenerKey(network, addr string) string {
return network + "/" + addr
}
// ListenerFunc is a function that can return a listener given a network and address. // ListenerFunc is a function that can return a listener given a network and address.
// The listeners must be capable of overlapping: with Caddy, new configs are loaded // The listeners must be capable of overlapping: with Caddy, new configs are loaded
// before old ones are unloaded, so listeners may overlap briefly if the configs // before old ones are unloaded, so listeners may overlap briefly if the configs
// both need the same listener. EXPERIMENTAL and subject to change. // both need the same listener. EXPERIMENTAL and subject to change.
type ListenerFunc func(network, addr string) (net.Listener, error) type ListenerFunc func(ctx context.Context, network, addr string, cfg net.ListenConfig) (any, error)
var networkTypes = map[string]ListenerFunc{} var networkTypes = map[string]ListenerFunc{}

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net"
"net/http" "net/http"
"strconv" "strconv"
"sync" "sync"
@ -387,10 +388,11 @@ func (app *App) Start() error {
for portOffset := uint(0); portOffset < listenAddr.PortRangeSize(); portOffset++ { for portOffset := uint(0); portOffset < listenAddr.PortRangeSize(); portOffset++ {
// create the listener for this socket // create the listener for this socket
hostport := listenAddr.JoinHostPort(portOffset) hostport := listenAddr.JoinHostPort(portOffset)
ln, err := caddy.ListenTimeout(listenAddr.Network, hostport, time.Duration(srv.KeepAliveInterval)) lnAny, err := listenAddr.Listen(app.ctx, portOffset, net.ListenConfig{KeepAlive: time.Duration(srv.KeepAliveInterval)})
if err != nil { if err != nil {
return fmt.Errorf("%s: listening on %s: %v", listenAddr.Network, hostport, err) return fmt.Errorf("listening on %s: %v", listenAddr.At(portOffset), err)
} }
ln := lnAny.(net.Listener)
// wrap listener before TLS (up to the TLS placeholder wrapper) // wrap listener before TLS (up to the TLS placeholder wrapper)
var lnWrapperIdx int var lnWrapperIdx int
@ -409,13 +411,30 @@ func (app *App) Start() error {
ln = tls.NewListener(ln, tlsCfg) ln = tls.NewListener(ln, tlsCfg)
// enable HTTP/3 if configured // enable HTTP/3 if configured
if srv.protocol("h3") && !listenAddr.IsUnixNetwork() { if srv.protocol("h3") {
// Can't serve HTTP/3 on the same socket as HTTP/1 and 2 because it uses
// a different transport mechanism... which is fine, but the OS doesn't
// differentiate between a SOCK_STREAM file and a SOCK_DGRAM file; they
// are still one file on the system. So even though "unixpacket" and
// "unixgram" are different network types just as "tcp" and "udp" are,
// the OS will not let us use the same file as both STREAM and DGRAM.
if len(srv.Protocols) > 1 && listenAddr.IsUnixNetwork() {
app.logger.Warn("HTTP/3 disabled because Unix can't multiplex STREAM and DGRAM on same socket",
zap.String("file", hostport))
for i := range srv.Protocols {
if srv.Protocols[i] == "h3" {
srv.Protocols = append(srv.Protocols[:i], srv.Protocols[i+1:]...)
break
}
}
} else {
app.logger.Info("enabling HTTP/3 listener", zap.String("addr", hostport)) app.logger.Info("enabling HTTP/3 listener", zap.String("addr", hostport))
if err := srv.serveHTTP3(hostport, tlsCfg); err != nil { if err := srv.serveHTTP3(listenAddr.At(portOffset), tlsCfg); err != nil {
return err return err
} }
} }
} }
}
// finish wrapping listener where we left off before TLS // finish wrapping listener where we left off before TLS
for i := lnWrapperIdx; i < len(srv.listenerWrappers); i++ { for i := lnWrapperIdx; i < len(srv.listenerWrappers); i++ {
@ -424,11 +443,10 @@ func (app *App) Start() error {
// if binding to port 0, the OS chooses a port for us; // if binding to port 0, the OS chooses a port for us;
// but the user won't know the port unless we print it // but the user won't know the port unless we print it
if listenAddr.StartPort == 0 && listenAddr.EndPort == 0 { if !listenAddr.IsUnixNetwork() && listenAddr.StartPort == 0 && listenAddr.EndPort == 0 {
app.logger.Info("port 0 listener", app.logger.Info("port 0 listener",
zap.String("input_address", lnAddr), zap.String("input_address", lnAddr),
zap.String("actual_address", ln.Addr().String()), zap.String("actual_address", ln.Addr().String()))
)
} }
app.logger.Debug("starting server loop", app.logger.Debug("starting server loop",
@ -533,6 +551,18 @@ func (app *App) Stop() error {
if server.h3server == nil { if server.h3server == nil {
return return
} }
// TODO: we have to manually close our listeners because quic-go won't
// close listeners it didn't create along with the server itself...
// see https://github.com/lucas-clemente/quic-go/issues/3560
for _, el := range server.h3listeners {
if err := el.Close(); err != nil {
app.logger.Error("HTTP/3 listener close",
zap.Error(err),
zap.String("address", el.LocalAddr().String()))
}
}
// TODO: CloseGracefully, once implemented upstream (see https://github.com/lucas-clemente/quic-go/issues/2103) // TODO: CloseGracefully, once implemented upstream (see https://github.com/lucas-clemente/quic-go/issues/2103)
if err := server.h3server.Close(); err != nil { if err := server.h3server.Close(); err != nil {
app.logger.Error("HTTP/3 server shutdown", app.logger.Error("HTTP/3 server shutdown",

View file

@ -156,7 +156,9 @@ type (
MatchHeaderRE map[string]*MatchRegexp MatchHeaderRE map[string]*MatchRegexp
// MatchProtocol matches requests by protocol. Recognized values are // MatchProtocol matches requests by protocol. Recognized values are
// "http", "https", and "grpc". // "http", "https", and "grpc" for broad protocol matches, or specific
// HTTP versions can be specified like so: "http/1", "http/1.1",
// "http/2", "http/3", or minimum versions: "http/2+", etc.
MatchProtocol string MatchProtocol string
// MatchRemoteIP matches requests by client IP (or CIDR range). // MatchRemoteIP matches requests by client IP (or CIDR range).

View file

@ -172,6 +172,7 @@ type Server struct {
server *http.Server server *http.Server
h3server *http3.Server h3server *http3.Server
h3listeners []net.PacketConn // TODO: we have to hold these because quic-go won't close listeners it didn't create
addresses []caddy.NetworkAddress addresses []caddy.NetworkAddress
shutdownAt time.Time shutdownAt time.Time
@ -193,11 +194,13 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&s.activeRequests, 1) atomic.AddInt64(&s.activeRequests, 1)
defer atomic.AddInt64(&s.activeRequests, -1) defer atomic.AddInt64(&s.activeRequests, -1)
if r.ProtoMajor < 3 {
err := s.h3server.SetQuicHeaders(w.Header()) err := s.h3server.SetQuicHeaders(w.Header())
if err != nil { if err != nil {
s.logger.Error("setting HTTP/3 Alt-Svc header", zap.Error(err)) s.logger.Error("setting HTTP/3 Alt-Svc header", zap.Error(err))
} }
} }
}
// reject very long methods; probably a mistake or an attack // reject very long methods; probably a mistake or an attack
if len(r.Method) > 32 { if len(r.Method) > 32 {
@ -493,8 +496,27 @@ func (s *Server) findLastRouteWithHostMatcher() int {
// serveHTTP3 creates a QUIC listener, configures an HTTP/3 server if // serveHTTP3 creates a QUIC listener, configures an HTTP/3 server if
// not already done, and then uses that server to serve HTTP/3 over // not already done, and then uses that server to serve HTTP/3 over
// the listener, with Server s as the handler. // the listener, with Server s as the handler.
func (s *Server) serveHTTP3(hostport string, tlsCfg *tls.Config) error { func (s *Server) serveHTTP3(addr caddy.NetworkAddress, tlsCfg *tls.Config) error {
h3ln, err := caddy.ListenQUIC(hostport, tlsCfg, &s.activeRequests) switch addr.Network {
case "unix":
addr.Network = "unixgram"
case "tcp":
addr.Network = "udp"
case "tcp4":
addr.Network = "udp4"
case "tcp6":
addr.Network = "udp6"
default:
return fmt.Errorf("unsure what network to use for HTTP/3 given network type: %s", addr.Network)
}
lnAny, err := addr.Listen(s.ctx, 0, net.ListenConfig{})
if err != nil {
return err
}
ln := lnAny.(net.PacketConn)
h3ln, err := caddy.ListenQUIC(ln, tlsCfg, &s.activeRequests)
if err != nil { if err != nil {
return fmt.Errorf("starting HTTP/3 QUIC listener: %v", err) return fmt.Errorf("starting HTTP/3 QUIC listener: %v", err)
} }
@ -512,6 +534,8 @@ func (s *Server) serveHTTP3(hostport string, tlsCfg *tls.Config) error {
} }
} }
s.h3listeners = append(s.h3listeners, lnAny.(net.PacketConn))
//nolint:errcheck //nolint:errcheck
go s.h3server.ServeListener(h3ln) go s.h3server.ServeListener(h3ln)