src

Go monorepo.
git clone git://code.dwrz.net/src
Log | Files | Refs

backend_inotify.go (15066B)


      1 //go:build linux && !appengine
      2 
      3 package fsnotify
      4 
      5 import (
      6 	"errors"
      7 	"fmt"
      8 	"io"
      9 	"io/fs"
     10 	"os"
     11 	"path/filepath"
     12 	"strings"
     13 	"sync"
     14 	"time"
     15 	"unsafe"
     16 
     17 	"github.com/fsnotify/fsnotify/internal"
     18 	"golang.org/x/sys/unix"
     19 )
     20 
     21 type inotify struct {
     22 	*shared
     23 	Events chan Event
     24 	Errors chan error
     25 
     26 	// Store fd here as os.File.Read() will no longer return on close after
     27 	// calling Fd(). See: https://github.com/golang/go/issues/26439
     28 	fd          int
     29 	inotifyFile *os.File
     30 	watches     *watches
     31 	doneResp    chan struct{} // Channel to respond to Close
     32 
     33 	// Store rename cookies in an array, with the index wrapping to 0. Almost
     34 	// all of the time what we get is a MOVED_FROM to set the cookie and the
     35 	// next event inotify sends will be MOVED_TO to read it. However, this is
     36 	// not guaranteed – as described in inotify(7) – and we may get other events
     37 	// between the two MOVED_* events (including other MOVED_* ones).
     38 	//
     39 	// A second issue is that moving a file outside the watched directory will
     40 	// trigger a MOVED_FROM to set the cookie, but we never see the MOVED_TO to
     41 	// read and delete it. So just storing it in a map would slowly leak memory.
     42 	//
     43 	// Doing it like this gives us a simple fast LRU-cache that won't allocate.
     44 	// Ten items should be more than enough for our purpose, and a loop over
     45 	// such a short array is faster than a map access anyway (not that it hugely
     46 	// matters since we're talking about hundreds of ns at the most, but still).
     47 	cookies     [10]koekje
     48 	cookieIndex uint8
     49 	cookiesMu   sync.Mutex
     50 }
     51 
     52 type (
     53 	watches struct {
     54 		wd   map[uint32]*watch // wd → watch
     55 		path map[string]uint32 // pathname → wd
     56 	}
     57 	watch struct {
     58 		wd      uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
     59 		flags   uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
     60 		path    string // Watch path.
     61 		recurse bool   // Recursion with ./...?
     62 	}
     63 	koekje struct {
     64 		cookie uint32
     65 		path   string
     66 	}
     67 )
     68 
     69 func newWatches() *watches {
     70 	return &watches{
     71 		wd:   make(map[uint32]*watch),
     72 		path: make(map[string]uint32),
     73 	}
     74 }
     75 
     76 func (w *watches) byPath(path string) *watch { return w.wd[w.path[path]] }
     77 func (w *watches) byWd(wd uint32) *watch     { return w.wd[wd] }
     78 func (w *watches) len() int                  { return len(w.wd) }
     79 func (w *watches) add(ww *watch)             { w.wd[ww.wd] = ww; w.path[ww.path] = ww.wd }
     80 func (w *watches) remove(watch *watch)       { delete(w.path, watch.path); delete(w.wd, watch.wd) }
     81 
     82 func (w *watches) removePath(path string) ([]uint32, error) {
     83 	path, recurse := recursivePath(path)
     84 	wd, ok := w.path[path]
     85 	if !ok {
     86 		return nil, fmt.Errorf("%w: %s", ErrNonExistentWatch, path)
     87 	}
     88 
     89 	watch := w.wd[wd]
     90 	if recurse && !watch.recurse {
     91 		return nil, fmt.Errorf("can't use /... with non-recursive watch %q", path)
     92 	}
     93 
     94 	delete(w.path, path)
     95 	delete(w.wd, wd)
     96 	if !watch.recurse {
     97 		return []uint32{wd}, nil
     98 	}
     99 
    100 	wds := make([]uint32, 0, 8)
    101 	wds = append(wds, wd)
    102 	for p, rwd := range w.path {
    103 		if strings.HasPrefix(p, path) {
    104 			delete(w.path, p)
    105 			delete(w.wd, rwd)
    106 			wds = append(wds, rwd)
    107 		}
    108 	}
    109 	return wds, nil
    110 }
    111 
    112 func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error {
    113 	var existing *watch
    114 	wd, ok := w.path[path]
    115 	if ok {
    116 		existing = w.wd[wd]
    117 	}
    118 
    119 	upd, err := f(existing)
    120 	if err != nil {
    121 		return err
    122 	}
    123 	if upd != nil {
    124 		w.wd[upd.wd] = upd
    125 		w.path[upd.path] = upd.wd
    126 
    127 		if upd.wd != wd {
    128 			delete(w.wd, wd)
    129 		}
    130 	}
    131 
    132 	return nil
    133 }
    134 
    135 var defaultBufferSize = 0
    136 
    137 func newBackend(ev chan Event, errs chan error) (backend, error) {
    138 	// Need to set nonblocking mode for SetDeadline to work, otherwise blocking
    139 	// I/O operations won't terminate on close.
    140 	fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
    141 	if fd == -1 {
    142 		return nil, errno
    143 	}
    144 
    145 	w := &inotify{
    146 		shared:      newShared(ev, errs),
    147 		Events:      ev,
    148 		Errors:      errs,
    149 		fd:          fd,
    150 		inotifyFile: os.NewFile(uintptr(fd), ""),
    151 		watches:     newWatches(),
    152 		doneResp:    make(chan struct{}),
    153 	}
    154 
    155 	go w.readEvents()
    156 	return w, nil
    157 }
    158 
    159 func (w *inotify) Close() error {
    160 	if w.shared.close() {
    161 		return nil
    162 	}
    163 
    164 	// Causes any blocking reads to return with an error, provided the file
    165 	// still supports deadline operations.
    166 	err := w.inotifyFile.Close()
    167 	if err != nil {
    168 		return err
    169 	}
    170 
    171 	<-w.doneResp // Wait for readEvents() to finish.
    172 	return nil
    173 }
    174 
    175 func (w *inotify) Add(name string) error { return w.AddWith(name) }
    176 
    177 func (w *inotify) AddWith(path string, opts ...addOpt) error {
    178 	if w.isClosed() {
    179 		return ErrClosed
    180 	}
    181 	if debug {
    182 		fmt.Fprintf(os.Stderr, "FSNOTIFY_DEBUG: %s  AddWith(%q)\n",
    183 			time.Now().Format("15:04:05.000000000"), path)
    184 	}
    185 
    186 	with := getOptions(opts...)
    187 	if !w.xSupports(with.op) {
    188 		return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
    189 	}
    190 
    191 	add := func(path string, with withOpts, recurse bool) error {
    192 		var flags uint32
    193 		if with.noFollow {
    194 			flags |= unix.IN_DONT_FOLLOW
    195 		}
    196 		if with.op.Has(Create) {
    197 			flags |= unix.IN_CREATE
    198 		}
    199 		if with.op.Has(Write) {
    200 			flags |= unix.IN_MODIFY
    201 		}
    202 		if with.op.Has(Remove) {
    203 			flags |= unix.IN_DELETE | unix.IN_DELETE_SELF
    204 		}
    205 		if with.op.Has(Rename) {
    206 			flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF
    207 		}
    208 		if with.op.Has(Chmod) {
    209 			flags |= unix.IN_ATTRIB
    210 		}
    211 		if with.op.Has(xUnportableOpen) {
    212 			flags |= unix.IN_OPEN
    213 		}
    214 		if with.op.Has(xUnportableRead) {
    215 			flags |= unix.IN_ACCESS
    216 		}
    217 		if with.op.Has(xUnportableCloseWrite) {
    218 			flags |= unix.IN_CLOSE_WRITE
    219 		}
    220 		if with.op.Has(xUnportableCloseRead) {
    221 			flags |= unix.IN_CLOSE_NOWRITE
    222 		}
    223 		return w.register(path, flags, recurse)
    224 	}
    225 
    226 	w.mu.Lock()
    227 	defer w.mu.Unlock()
    228 	path, recurse := recursivePath(path)
    229 	if recurse {
    230 		return filepath.WalkDir(path, func(root string, d fs.DirEntry, err error) error {
    231 			if err != nil {
    232 				return err
    233 			}
    234 			if !d.IsDir() {
    235 				if root == path {
    236 					return fmt.Errorf("fsnotify: not a directory: %q", path)
    237 				}
    238 				return nil
    239 			}
    240 
    241 			// Send a Create event when adding new directory from a recursive
    242 			// watch; this is for "mkdir -p one/two/three". Usually all those
    243 			// directories will be created before we can set up watchers on the
    244 			// subdirectories, so only "one" would be sent as a Create event and
    245 			// not "one/two" and "one/two/three" (inotifywait -r has the same
    246 			// problem).
    247 			if with.sendCreate && root != path {
    248 				w.sendEvent(Event{Name: root, Op: Create})
    249 			}
    250 
    251 			return add(root, with, true)
    252 		})
    253 	}
    254 
    255 	return add(path, with, false)
    256 }
    257 
    258 func (w *inotify) register(path string, flags uint32, recurse bool) error {
    259 	return w.watches.updatePath(path, func(existing *watch) (*watch, error) {
    260 		if existing != nil {
    261 			flags |= existing.flags | unix.IN_MASK_ADD
    262 		}
    263 
    264 		wd, err := unix.InotifyAddWatch(w.fd, path, flags)
    265 		if wd == -1 {
    266 			return nil, err
    267 		}
    268 
    269 		if e, ok := w.watches.wd[uint32(wd)]; ok {
    270 			return e, nil
    271 		}
    272 
    273 		if existing == nil {
    274 			return &watch{
    275 				wd:      uint32(wd),
    276 				path:    path,
    277 				flags:   flags,
    278 				recurse: recurse,
    279 			}, nil
    280 		}
    281 
    282 		existing.wd = uint32(wd)
    283 		existing.flags = flags
    284 		return existing, nil
    285 	})
    286 }
    287 
    288 func (w *inotify) Remove(name string) error {
    289 	if w.isClosed() {
    290 		return nil
    291 	}
    292 	if debug {
    293 		fmt.Fprintf(os.Stderr, "FSNOTIFY_DEBUG: %s  Remove(%q)\n",
    294 			time.Now().Format("15:04:05.000000000"), name)
    295 	}
    296 
    297 	w.mu.Lock()
    298 	defer w.mu.Unlock()
    299 	return w.remove(filepath.Clean(name))
    300 }
    301 
    302 func (w *inotify) remove(name string) error {
    303 	wds, err := w.watches.removePath(name)
    304 	if err != nil {
    305 		return err
    306 	}
    307 
    308 	for _, wd := range wds {
    309 		_, err := unix.InotifyRmWatch(w.fd, wd)
    310 		if err != nil {
    311 			// TODO: Perhaps it's not helpful to return an error here in every
    312 			// case; the only two possible errors are:
    313 			//
    314 			// EBADF, which happens when w.fd is not a valid file descriptor of
    315 			// any kind.
    316 			//
    317 			// EINVAL, which is when fd is not an inotify descriptor or wd is
    318 			// not a valid watch descriptor. Watch descriptors are invalidated
    319 			// when they are removed explicitly or implicitly; explicitly by
    320 			// inotify_rm_watch, implicitly when the file they are watching is
    321 			// deleted.
    322 			return err
    323 		}
    324 	}
    325 	return nil
    326 }
    327 
    328 func (w *inotify) WatchList() []string {
    329 	if w.isClosed() {
    330 		return nil
    331 	}
    332 
    333 	w.mu.Lock()
    334 	defer w.mu.Unlock()
    335 	entries := make([]string, 0, w.watches.len())
    336 	for pathname := range w.watches.path {
    337 		entries = append(entries, pathname)
    338 	}
    339 	return entries
    340 }
    341 
    342 // readEvents reads from the inotify file descriptor, converts the
    343 // received events into Event objects and sends them via the Events channel
    344 func (w *inotify) readEvents() {
    345 	defer func() {
    346 		close(w.doneResp)
    347 		close(w.Errors)
    348 		close(w.Events)
    349 	}()
    350 
    351 	var buf [unix.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
    352 	for {
    353 		if w.isClosed() {
    354 			return
    355 		}
    356 
    357 		n, err := w.inotifyFile.Read(buf[:])
    358 		if err != nil {
    359 			if errors.Is(err, os.ErrClosed) {
    360 				return
    361 			}
    362 			if !w.sendError(err) {
    363 				return
    364 			}
    365 			continue
    366 		}
    367 
    368 		if n < unix.SizeofInotifyEvent {
    369 			err := errors.New("notify: short read in readEvents()") // Read was too short.
    370 			if n == 0 {
    371 				err = io.EOF // If EOF is received. This should really never happen.
    372 			}
    373 			if !w.sendError(err) {
    374 				return
    375 			}
    376 			continue
    377 		}
    378 
    379 		// We don't know how many events we just read into the buffer While the
    380 		// offset points to at least one whole event.
    381 		var offset uint32
    382 		for offset <= uint32(n-unix.SizeofInotifyEvent) {
    383 			// Point to the event in the buffer.
    384 			inEvent := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))
    385 
    386 			if inEvent.Mask&unix.IN_Q_OVERFLOW != 0 {
    387 				if !w.sendError(ErrEventOverflow) {
    388 					return
    389 				}
    390 			}
    391 
    392 			ev, ok := w.handleEvent(inEvent, &buf, offset)
    393 			if !ok {
    394 				return
    395 			}
    396 			if !w.sendEvent(ev) {
    397 				return
    398 			}
    399 
    400 			// Move to the next event in the buffer
    401 			offset += unix.SizeofInotifyEvent + inEvent.Len
    402 		}
    403 	}
    404 }
    405 
    406 func (w *inotify) handleEvent(inEvent *unix.InotifyEvent, buf *[65536]byte, offset uint32) (Event, bool) {
    407 	w.mu.Lock()
    408 	defer w.mu.Unlock()
    409 
    410 	/// If the event happened to the watched directory or the watched file, the
    411 	/// kernel doesn't append the filename to the event, but we would like to
    412 	/// always fill the the "Name" field with a valid filename. We retrieve the
    413 	/// path of the watch from the "paths" map.
    414 	///
    415 	/// Can be nil if Remove() was called in another goroutine for this path
    416 	/// inbetween reading the events from the kernel and reading the internal
    417 	/// state. Not much we can do about it, so just skip. See #616.
    418 	watch := w.watches.byWd(uint32(inEvent.Wd))
    419 	if watch == nil {
    420 		return Event{}, true
    421 	}
    422 
    423 	var (
    424 		name    = watch.path
    425 		nameLen = uint32(inEvent.Len)
    426 	)
    427 	if nameLen > 0 {
    428 		/// Point "bytes" at the first byte of the filename
    429 		bb := *buf
    430 		bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&bb[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen]
    431 		/// The filename is padded with NULL bytes. TrimRight() gets rid of those.
    432 		name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\x00")
    433 	}
    434 
    435 	if debug {
    436 		internal.Debug(name, inEvent.Mask, inEvent.Cookie)
    437 	}
    438 
    439 	if inEvent.Mask&unix.IN_IGNORED != 0 || inEvent.Mask&unix.IN_UNMOUNT != 0 {
    440 		w.watches.remove(watch)
    441 		return Event{}, true
    442 	}
    443 
    444 	// inotify will automatically remove the watch on deletes; just need
    445 	// to clean our state here.
    446 	if inEvent.Mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
    447 		w.watches.remove(watch)
    448 	}
    449 
    450 	// We can't really update the state when a watched path is moved; only
    451 	// IN_MOVE_SELF is sent and not IN_MOVED_{FROM,TO}. So remove the watch.
    452 	if inEvent.Mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF {
    453 		if watch.recurse { // Do nothing
    454 			return Event{}, true
    455 		}
    456 
    457 		err := w.remove(watch.path)
    458 		if err != nil && !errors.Is(err, ErrNonExistentWatch) {
    459 			if !w.sendError(err) {
    460 				return Event{}, false
    461 			}
    462 		}
    463 	}
    464 
    465 	/// Skip if we're watching both this path and the parent; the parent will
    466 	/// already send a delete so no need to do it twice.
    467 	if inEvent.Mask&unix.IN_DELETE_SELF != 0 {
    468 		_, ok := w.watches.path[filepath.Dir(watch.path)]
    469 		if ok {
    470 			return Event{}, true
    471 		}
    472 	}
    473 
    474 	ev := w.newEvent(name, inEvent.Mask, inEvent.Cookie)
    475 	// Need to update watch path for recurse.
    476 	if watch.recurse {
    477 		isDir := inEvent.Mask&unix.IN_ISDIR == unix.IN_ISDIR
    478 		/// New directory created: set up watch on it.
    479 		if isDir && ev.Has(Create) {
    480 			err := w.register(ev.Name, watch.flags, true)
    481 			if !w.sendError(err) {
    482 				return Event{}, false
    483 			}
    484 
    485 			// This was a directory rename, so we need to update all the
    486 			// children.
    487 			//
    488 			// TODO: this is of course pretty slow; we should use a better data
    489 			// structure for storing all of this, e.g. store children in the
    490 			// watch. I have some code for this in my kqueue refactor we can use
    491 			// in the future. For now I'm okay with this as it's not publicly
    492 			// available. Correctness first, performance second.
    493 			if ev.renamedFrom != "" {
    494 				for k, ww := range w.watches.wd {
    495 					if k == watch.wd || ww.path == ev.Name {
    496 						continue
    497 					}
    498 					if strings.HasPrefix(ww.path, ev.renamedFrom) {
    499 						ww.path = strings.Replace(ww.path, ev.renamedFrom, ev.Name, 1)
    500 						w.watches.wd[k] = ww
    501 					}
    502 				}
    503 			}
    504 		}
    505 	}
    506 
    507 	return ev, true
    508 }
    509 
    510 func (w *inotify) isRecursive(path string) bool {
    511 	ww := w.watches.byPath(path)
    512 	if ww == nil { // path could be a file, so also check the Dir.
    513 		ww = w.watches.byPath(filepath.Dir(path))
    514 	}
    515 	return ww != nil && ww.recurse
    516 }
    517 
    518 func (w *inotify) newEvent(name string, mask, cookie uint32) Event {
    519 	e := Event{Name: name}
    520 	if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
    521 		e.Op |= Create
    522 	}
    523 	if mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF || mask&unix.IN_DELETE == unix.IN_DELETE {
    524 		e.Op |= Remove
    525 	}
    526 	if mask&unix.IN_MODIFY == unix.IN_MODIFY {
    527 		e.Op |= Write
    528 	}
    529 	if mask&unix.IN_OPEN == unix.IN_OPEN {
    530 		e.Op |= xUnportableOpen
    531 	}
    532 	if mask&unix.IN_ACCESS == unix.IN_ACCESS {
    533 		e.Op |= xUnportableRead
    534 	}
    535 	if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE {
    536 		e.Op |= xUnportableCloseWrite
    537 	}
    538 	if mask&unix.IN_CLOSE_NOWRITE == unix.IN_CLOSE_NOWRITE {
    539 		e.Op |= xUnportableCloseRead
    540 	}
    541 	if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
    542 		e.Op |= Rename
    543 	}
    544 	if mask&unix.IN_ATTRIB == unix.IN_ATTRIB {
    545 		e.Op |= Chmod
    546 	}
    547 
    548 	if cookie != 0 {
    549 		if mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
    550 			w.cookiesMu.Lock()
    551 			w.cookies[w.cookieIndex] = koekje{cookie: cookie, path: e.Name}
    552 			w.cookieIndex++
    553 			if w.cookieIndex > 9 {
    554 				w.cookieIndex = 0
    555 			}
    556 			w.cookiesMu.Unlock()
    557 		} else if mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
    558 			w.cookiesMu.Lock()
    559 			var prev string
    560 			for _, c := range w.cookies {
    561 				if c.cookie == cookie {
    562 					prev = c.path
    563 					break
    564 				}
    565 			}
    566 			w.cookiesMu.Unlock()
    567 			e.renamedFrom = prev
    568 		}
    569 	}
    570 	return e
    571 }
    572 
    573 func (w *inotify) xSupports(op Op) bool {
    574 	return true // Supports everything.
    575 }
    576 
    577 func (w *inotify) state() {
    578 	w.mu.Lock()
    579 	defer w.mu.Unlock()
    580 	for wd, ww := range w.watches.wd {
    581 		fmt.Fprintf(os.Stderr, "%4d: recurse=%t %q\n", wd, ww.recurse, ww.path)
    582 	}
    583 }