backend_inotify.go (15066B)
1 //go:build linux && !appengine 2 3 package fsnotify 4 5 import ( 6 "errors" 7 "fmt" 8 "io" 9 "io/fs" 10 "os" 11 "path/filepath" 12 "strings" 13 "sync" 14 "time" 15 "unsafe" 16 17 "github.com/fsnotify/fsnotify/internal" 18 "golang.org/x/sys/unix" 19 ) 20 21 type inotify struct { 22 *shared 23 Events chan Event 24 Errors chan error 25 26 // Store fd here as os.File.Read() will no longer return on close after 27 // calling Fd(). See: https://github.com/golang/go/issues/26439 28 fd int 29 inotifyFile *os.File 30 watches *watches 31 doneResp chan struct{} // Channel to respond to Close 32 33 // Store rename cookies in an array, with the index wrapping to 0. Almost 34 // all of the time what we get is a MOVED_FROM to set the cookie and the 35 // next event inotify sends will be MOVED_TO to read it. However, this is 36 // not guaranteed – as described in inotify(7) – and we may get other events 37 // between the two MOVED_* events (including other MOVED_* ones). 38 // 39 // A second issue is that moving a file outside the watched directory will 40 // trigger a MOVED_FROM to set the cookie, but we never see the MOVED_TO to 41 // read and delete it. So just storing it in a map would slowly leak memory. 42 // 43 // Doing it like this gives us a simple fast LRU-cache that won't allocate. 44 // Ten items should be more than enough for our purpose, and a loop over 45 // such a short array is faster than a map access anyway (not that it hugely 46 // matters since we're talking about hundreds of ns at the most, but still). 47 cookies [10]koekje 48 cookieIndex uint8 49 cookiesMu sync.Mutex 50 } 51 52 type ( 53 watches struct { 54 wd map[uint32]*watch // wd → watch 55 path map[string]uint32 // pathname → wd 56 } 57 watch struct { 58 wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) 59 flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) 60 path string // Watch path. 61 recurse bool // Recursion with ./...? 62 } 63 koekje struct { 64 cookie uint32 65 path string 66 } 67 ) 68 69 func newWatches() *watches { 70 return &watches{ 71 wd: make(map[uint32]*watch), 72 path: make(map[string]uint32), 73 } 74 } 75 76 func (w *watches) byPath(path string) *watch { return w.wd[w.path[path]] } 77 func (w *watches) byWd(wd uint32) *watch { return w.wd[wd] } 78 func (w *watches) len() int { return len(w.wd) } 79 func (w *watches) add(ww *watch) { w.wd[ww.wd] = ww; w.path[ww.path] = ww.wd } 80 func (w *watches) remove(watch *watch) { delete(w.path, watch.path); delete(w.wd, watch.wd) } 81 82 func (w *watches) removePath(path string) ([]uint32, error) { 83 path, recurse := recursivePath(path) 84 wd, ok := w.path[path] 85 if !ok { 86 return nil, fmt.Errorf("%w: %s", ErrNonExistentWatch, path) 87 } 88 89 watch := w.wd[wd] 90 if recurse && !watch.recurse { 91 return nil, fmt.Errorf("can't use /... with non-recursive watch %q", path) 92 } 93 94 delete(w.path, path) 95 delete(w.wd, wd) 96 if !watch.recurse { 97 return []uint32{wd}, nil 98 } 99 100 wds := make([]uint32, 0, 8) 101 wds = append(wds, wd) 102 for p, rwd := range w.path { 103 if strings.HasPrefix(p, path) { 104 delete(w.path, p) 105 delete(w.wd, rwd) 106 wds = append(wds, rwd) 107 } 108 } 109 return wds, nil 110 } 111 112 func (w *watches) updatePath(path string, f func(*watch) (*watch, error)) error { 113 var existing *watch 114 wd, ok := w.path[path] 115 if ok { 116 existing = w.wd[wd] 117 } 118 119 upd, err := f(existing) 120 if err != nil { 121 return err 122 } 123 if upd != nil { 124 w.wd[upd.wd] = upd 125 w.path[upd.path] = upd.wd 126 127 if upd.wd != wd { 128 delete(w.wd, wd) 129 } 130 } 131 132 return nil 133 } 134 135 var defaultBufferSize = 0 136 137 func newBackend(ev chan Event, errs chan error) (backend, error) { 138 // Need to set nonblocking mode for SetDeadline to work, otherwise blocking 139 // I/O operations won't terminate on close. 140 fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) 141 if fd == -1 { 142 return nil, errno 143 } 144 145 w := &inotify{ 146 shared: newShared(ev, errs), 147 Events: ev, 148 Errors: errs, 149 fd: fd, 150 inotifyFile: os.NewFile(uintptr(fd), ""), 151 watches: newWatches(), 152 doneResp: make(chan struct{}), 153 } 154 155 go w.readEvents() 156 return w, nil 157 } 158 159 func (w *inotify) Close() error { 160 if w.shared.close() { 161 return nil 162 } 163 164 // Causes any blocking reads to return with an error, provided the file 165 // still supports deadline operations. 166 err := w.inotifyFile.Close() 167 if err != nil { 168 return err 169 } 170 171 <-w.doneResp // Wait for readEvents() to finish. 172 return nil 173 } 174 175 func (w *inotify) Add(name string) error { return w.AddWith(name) } 176 177 func (w *inotify) AddWith(path string, opts ...addOpt) error { 178 if w.isClosed() { 179 return ErrClosed 180 } 181 if debug { 182 fmt.Fprintf(os.Stderr, "FSNOTIFY_DEBUG: %s AddWith(%q)\n", 183 time.Now().Format("15:04:05.000000000"), path) 184 } 185 186 with := getOptions(opts...) 187 if !w.xSupports(with.op) { 188 return fmt.Errorf("%w: %s", xErrUnsupported, with.op) 189 } 190 191 add := func(path string, with withOpts, recurse bool) error { 192 var flags uint32 193 if with.noFollow { 194 flags |= unix.IN_DONT_FOLLOW 195 } 196 if with.op.Has(Create) { 197 flags |= unix.IN_CREATE 198 } 199 if with.op.Has(Write) { 200 flags |= unix.IN_MODIFY 201 } 202 if with.op.Has(Remove) { 203 flags |= unix.IN_DELETE | unix.IN_DELETE_SELF 204 } 205 if with.op.Has(Rename) { 206 flags |= unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_MOVE_SELF 207 } 208 if with.op.Has(Chmod) { 209 flags |= unix.IN_ATTRIB 210 } 211 if with.op.Has(xUnportableOpen) { 212 flags |= unix.IN_OPEN 213 } 214 if with.op.Has(xUnportableRead) { 215 flags |= unix.IN_ACCESS 216 } 217 if with.op.Has(xUnportableCloseWrite) { 218 flags |= unix.IN_CLOSE_WRITE 219 } 220 if with.op.Has(xUnportableCloseRead) { 221 flags |= unix.IN_CLOSE_NOWRITE 222 } 223 return w.register(path, flags, recurse) 224 } 225 226 w.mu.Lock() 227 defer w.mu.Unlock() 228 path, recurse := recursivePath(path) 229 if recurse { 230 return filepath.WalkDir(path, func(root string, d fs.DirEntry, err error) error { 231 if err != nil { 232 return err 233 } 234 if !d.IsDir() { 235 if root == path { 236 return fmt.Errorf("fsnotify: not a directory: %q", path) 237 } 238 return nil 239 } 240 241 // Send a Create event when adding new directory from a recursive 242 // watch; this is for "mkdir -p one/two/three". Usually all those 243 // directories will be created before we can set up watchers on the 244 // subdirectories, so only "one" would be sent as a Create event and 245 // not "one/two" and "one/two/three" (inotifywait -r has the same 246 // problem). 247 if with.sendCreate && root != path { 248 w.sendEvent(Event{Name: root, Op: Create}) 249 } 250 251 return add(root, with, true) 252 }) 253 } 254 255 return add(path, with, false) 256 } 257 258 func (w *inotify) register(path string, flags uint32, recurse bool) error { 259 return w.watches.updatePath(path, func(existing *watch) (*watch, error) { 260 if existing != nil { 261 flags |= existing.flags | unix.IN_MASK_ADD 262 } 263 264 wd, err := unix.InotifyAddWatch(w.fd, path, flags) 265 if wd == -1 { 266 return nil, err 267 } 268 269 if e, ok := w.watches.wd[uint32(wd)]; ok { 270 return e, nil 271 } 272 273 if existing == nil { 274 return &watch{ 275 wd: uint32(wd), 276 path: path, 277 flags: flags, 278 recurse: recurse, 279 }, nil 280 } 281 282 existing.wd = uint32(wd) 283 existing.flags = flags 284 return existing, nil 285 }) 286 } 287 288 func (w *inotify) Remove(name string) error { 289 if w.isClosed() { 290 return nil 291 } 292 if debug { 293 fmt.Fprintf(os.Stderr, "FSNOTIFY_DEBUG: %s Remove(%q)\n", 294 time.Now().Format("15:04:05.000000000"), name) 295 } 296 297 w.mu.Lock() 298 defer w.mu.Unlock() 299 return w.remove(filepath.Clean(name)) 300 } 301 302 func (w *inotify) remove(name string) error { 303 wds, err := w.watches.removePath(name) 304 if err != nil { 305 return err 306 } 307 308 for _, wd := range wds { 309 _, err := unix.InotifyRmWatch(w.fd, wd) 310 if err != nil { 311 // TODO: Perhaps it's not helpful to return an error here in every 312 // case; the only two possible errors are: 313 // 314 // EBADF, which happens when w.fd is not a valid file descriptor of 315 // any kind. 316 // 317 // EINVAL, which is when fd is not an inotify descriptor or wd is 318 // not a valid watch descriptor. Watch descriptors are invalidated 319 // when they are removed explicitly or implicitly; explicitly by 320 // inotify_rm_watch, implicitly when the file they are watching is 321 // deleted. 322 return err 323 } 324 } 325 return nil 326 } 327 328 func (w *inotify) WatchList() []string { 329 if w.isClosed() { 330 return nil 331 } 332 333 w.mu.Lock() 334 defer w.mu.Unlock() 335 entries := make([]string, 0, w.watches.len()) 336 for pathname := range w.watches.path { 337 entries = append(entries, pathname) 338 } 339 return entries 340 } 341 342 // readEvents reads from the inotify file descriptor, converts the 343 // received events into Event objects and sends them via the Events channel 344 func (w *inotify) readEvents() { 345 defer func() { 346 close(w.doneResp) 347 close(w.Errors) 348 close(w.Events) 349 }() 350 351 var buf [unix.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events 352 for { 353 if w.isClosed() { 354 return 355 } 356 357 n, err := w.inotifyFile.Read(buf[:]) 358 if err != nil { 359 if errors.Is(err, os.ErrClosed) { 360 return 361 } 362 if !w.sendError(err) { 363 return 364 } 365 continue 366 } 367 368 if n < unix.SizeofInotifyEvent { 369 err := errors.New("notify: short read in readEvents()") // Read was too short. 370 if n == 0 { 371 err = io.EOF // If EOF is received. This should really never happen. 372 } 373 if !w.sendError(err) { 374 return 375 } 376 continue 377 } 378 379 // We don't know how many events we just read into the buffer While the 380 // offset points to at least one whole event. 381 var offset uint32 382 for offset <= uint32(n-unix.SizeofInotifyEvent) { 383 // Point to the event in the buffer. 384 inEvent := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset])) 385 386 if inEvent.Mask&unix.IN_Q_OVERFLOW != 0 { 387 if !w.sendError(ErrEventOverflow) { 388 return 389 } 390 } 391 392 ev, ok := w.handleEvent(inEvent, &buf, offset) 393 if !ok { 394 return 395 } 396 if !w.sendEvent(ev) { 397 return 398 } 399 400 // Move to the next event in the buffer 401 offset += unix.SizeofInotifyEvent + inEvent.Len 402 } 403 } 404 } 405 406 func (w *inotify) handleEvent(inEvent *unix.InotifyEvent, buf *[65536]byte, offset uint32) (Event, bool) { 407 w.mu.Lock() 408 defer w.mu.Unlock() 409 410 /// If the event happened to the watched directory or the watched file, the 411 /// kernel doesn't append the filename to the event, but we would like to 412 /// always fill the the "Name" field with a valid filename. We retrieve the 413 /// path of the watch from the "paths" map. 414 /// 415 /// Can be nil if Remove() was called in another goroutine for this path 416 /// inbetween reading the events from the kernel and reading the internal 417 /// state. Not much we can do about it, so just skip. See #616. 418 watch := w.watches.byWd(uint32(inEvent.Wd)) 419 if watch == nil { 420 return Event{}, true 421 } 422 423 var ( 424 name = watch.path 425 nameLen = uint32(inEvent.Len) 426 ) 427 if nameLen > 0 { 428 /// Point "bytes" at the first byte of the filename 429 bb := *buf 430 bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&bb[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen] 431 /// The filename is padded with NULL bytes. TrimRight() gets rid of those. 432 name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\x00") 433 } 434 435 if debug { 436 internal.Debug(name, inEvent.Mask, inEvent.Cookie) 437 } 438 439 if inEvent.Mask&unix.IN_IGNORED != 0 || inEvent.Mask&unix.IN_UNMOUNT != 0 { 440 w.watches.remove(watch) 441 return Event{}, true 442 } 443 444 // inotify will automatically remove the watch on deletes; just need 445 // to clean our state here. 446 if inEvent.Mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF { 447 w.watches.remove(watch) 448 } 449 450 // We can't really update the state when a watched path is moved; only 451 // IN_MOVE_SELF is sent and not IN_MOVED_{FROM,TO}. So remove the watch. 452 if inEvent.Mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF { 453 if watch.recurse { // Do nothing 454 return Event{}, true 455 } 456 457 err := w.remove(watch.path) 458 if err != nil && !errors.Is(err, ErrNonExistentWatch) { 459 if !w.sendError(err) { 460 return Event{}, false 461 } 462 } 463 } 464 465 /// Skip if we're watching both this path and the parent; the parent will 466 /// already send a delete so no need to do it twice. 467 if inEvent.Mask&unix.IN_DELETE_SELF != 0 { 468 _, ok := w.watches.path[filepath.Dir(watch.path)] 469 if ok { 470 return Event{}, true 471 } 472 } 473 474 ev := w.newEvent(name, inEvent.Mask, inEvent.Cookie) 475 // Need to update watch path for recurse. 476 if watch.recurse { 477 isDir := inEvent.Mask&unix.IN_ISDIR == unix.IN_ISDIR 478 /// New directory created: set up watch on it. 479 if isDir && ev.Has(Create) { 480 err := w.register(ev.Name, watch.flags, true) 481 if !w.sendError(err) { 482 return Event{}, false 483 } 484 485 // This was a directory rename, so we need to update all the 486 // children. 487 // 488 // TODO: this is of course pretty slow; we should use a better data 489 // structure for storing all of this, e.g. store children in the 490 // watch. I have some code for this in my kqueue refactor we can use 491 // in the future. For now I'm okay with this as it's not publicly 492 // available. Correctness first, performance second. 493 if ev.renamedFrom != "" { 494 for k, ww := range w.watches.wd { 495 if k == watch.wd || ww.path == ev.Name { 496 continue 497 } 498 if strings.HasPrefix(ww.path, ev.renamedFrom) { 499 ww.path = strings.Replace(ww.path, ev.renamedFrom, ev.Name, 1) 500 w.watches.wd[k] = ww 501 } 502 } 503 } 504 } 505 } 506 507 return ev, true 508 } 509 510 func (w *inotify) isRecursive(path string) bool { 511 ww := w.watches.byPath(path) 512 if ww == nil { // path could be a file, so also check the Dir. 513 ww = w.watches.byPath(filepath.Dir(path)) 514 } 515 return ww != nil && ww.recurse 516 } 517 518 func (w *inotify) newEvent(name string, mask, cookie uint32) Event { 519 e := Event{Name: name} 520 if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO { 521 e.Op |= Create 522 } 523 if mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF || mask&unix.IN_DELETE == unix.IN_DELETE { 524 e.Op |= Remove 525 } 526 if mask&unix.IN_MODIFY == unix.IN_MODIFY { 527 e.Op |= Write 528 } 529 if mask&unix.IN_OPEN == unix.IN_OPEN { 530 e.Op |= xUnportableOpen 531 } 532 if mask&unix.IN_ACCESS == unix.IN_ACCESS { 533 e.Op |= xUnportableRead 534 } 535 if mask&unix.IN_CLOSE_WRITE == unix.IN_CLOSE_WRITE { 536 e.Op |= xUnportableCloseWrite 537 } 538 if mask&unix.IN_CLOSE_NOWRITE == unix.IN_CLOSE_NOWRITE { 539 e.Op |= xUnportableCloseRead 540 } 541 if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM { 542 e.Op |= Rename 543 } 544 if mask&unix.IN_ATTRIB == unix.IN_ATTRIB { 545 e.Op |= Chmod 546 } 547 548 if cookie != 0 { 549 if mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM { 550 w.cookiesMu.Lock() 551 w.cookies[w.cookieIndex] = koekje{cookie: cookie, path: e.Name} 552 w.cookieIndex++ 553 if w.cookieIndex > 9 { 554 w.cookieIndex = 0 555 } 556 w.cookiesMu.Unlock() 557 } else if mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO { 558 w.cookiesMu.Lock() 559 var prev string 560 for _, c := range w.cookies { 561 if c.cookie == cookie { 562 prev = c.path 563 break 564 } 565 } 566 w.cookiesMu.Unlock() 567 e.renamedFrom = prev 568 } 569 } 570 return e 571 } 572 573 func (w *inotify) xSupports(op Op) bool { 574 return true // Supports everything. 575 } 576 577 func (w *inotify) state() { 578 w.mu.Lock() 579 defer w.mu.Unlock() 580 for wd, ww := range w.watches.wd { 581 fmt.Fprintf(os.Stderr, "%4d: recurse=%t %q\n", wd, ww.recurse, ww.path) 582 } 583 }