code.dwrz.net

Go monorepo.
Log | Files | Refs

singleflight.go (5420B)


      1 // Copyright 2013 The Go Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style
      3 // license that can be found in the LICENSE file.
      4 
      5 package singleflight
      6 
      7 import (
      8 	"bytes"
      9 	"errors"
     10 	"fmt"
     11 	"runtime"
     12 	"runtime/debug"
     13 	"sync"
     14 )
     15 
// errGoexit is the sentinel stored in a call's err field when the
// user given function terminated its goroutine through runtime.Goexit
// instead of returning or panicking.
var errGoexit = errors.New("runtime.Goexit was called")
     19 
     20 // A panicError is an arbitrary value recovered from a panic
     21 // with the stack trace during the execution of given function.
     22 type panicError struct {
     23 	value interface{}
     24 	stack []byte
     25 }
     26 
     27 // Error implements error interface.
     28 func (p *panicError) Error() string {
     29 	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
     30 }
     31 
     32 func newPanicError(v interface{}) error {
     33 	stack := debug.Stack()
     34 
     35 	// The first line of the stack trace is of the form "goroutine N [status]:"
     36 	// but by the time the panic reaches Do the goroutine may no longer exist
     37 	// and its status will have changed. Trim out the misleading line.
     38 	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
     39 		stack = stack[line+1:]
     40 	}
     41 	return &panicError{value: v, stack: stack}
     42 }
     43 
// call is an in-flight or completed singleflight.Do call.
type call struct {
	// wg is marked done by doCall once the call's function has finished;
	// duplicate callers in Do wait on it before reading val and err.
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key
	// while the call was still in flight. It is read and written with the
	// singleflight mutex held.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	//
	// dups counts the duplicate callers that joined this call, and chans
	// holds the result channels registered through DoChan.
	dups  int
	chans []chan<- Result
}
     63 
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
//
// The zero value is ready to use: the map of calls is allocated by the
// first call to Do or DoChan.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}
     70 
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	// Val is the value produced by the executed function.
	Val    interface{}
	// Err is the error produced by the executed function.
	Err    error
	// Shared reports whether the call had at least one duplicate caller.
	Shared bool
}
     78 
     79 // Do executes and returns the results of the given function, making
     80 // sure that only one execution is in-flight for a given key at a
     81 // time. If a duplicate comes in, the duplicate caller waits for the
     82 // original to complete and receives the same results.
     83 // The return value shared indicates whether v was given to multiple callers.
     84 func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
     85 	g.mu.Lock()
     86 	if g.m == nil {
     87 		g.m = make(map[string]*call)
     88 	}
     89 	if c, ok := g.m[key]; ok {
     90 		c.dups++
     91 		g.mu.Unlock()
     92 		c.wg.Wait()
     93 
     94 		if e, ok := c.err.(*panicError); ok {
     95 			panic(e)
     96 		} else if c.err == errGoexit {
     97 			runtime.Goexit()
     98 		}
     99 		return c.val, c.err, true
    100 	}
    101 	c := new(call)
    102 	c.wg.Add(1)
    103 	g.m[key] = c
    104 	g.mu.Unlock()
    105 
    106 	g.doCall(c, key, fn)
    107 	return c.val, c.err, c.dups > 0
    108 }
    109 
    110 // DoChan is like Do but returns a channel that will receive the
    111 // results when they are ready.
    112 //
    113 // The returned channel will not be closed.
    114 func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    115 	ch := make(chan Result, 1)
    116 	g.mu.Lock()
    117 	if g.m == nil {
    118 		g.m = make(map[string]*call)
    119 	}
    120 	if c, ok := g.m[key]; ok {
    121 		c.dups++
    122 		c.chans = append(c.chans, ch)
    123 		g.mu.Unlock()
    124 		return ch
    125 	}
    126 	c := &call{chans: []chan<- Result{ch}}
    127 	c.wg.Add(1)
    128 	g.m[key] = c
    129 	g.mu.Unlock()
    130 
    131 	go g.doCall(c, key, fn)
    132 
    133 	return ch
    134 }
    135 
    136 // doCall handles the single call for a key.
    137 func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    138 	normalReturn := false
    139 	recovered := false
    140 
    141 	// use double-defer to distinguish panic from runtime.Goexit,
    142 	// more details see https://golang.org/cl/134395
    143 	defer func() {
    144 		// the given function invoked runtime.Goexit
    145 		if !normalReturn && !recovered {
    146 			c.err = errGoexit
    147 		}
    148 
    149 		c.wg.Done()
    150 		g.mu.Lock()
    151 		defer g.mu.Unlock()
    152 		if !c.forgotten {
    153 			delete(g.m, key)
    154 		}
    155 
    156 		if e, ok := c.err.(*panicError); ok {
    157 			// In order to prevent the waiting channels from being blocked forever,
    158 			// needs to ensure that this panic cannot be recovered.
    159 			if len(c.chans) > 0 {
    160 				go panic(e)
    161 				select {} // Keep this goroutine around so that it will appear in the crash dump.
    162 			} else {
    163 				panic(e)
    164 			}
    165 		} else if c.err == errGoexit {
    166 			// Already in the process of goexit, no need to call again
    167 		} else {
    168 			// Normal return
    169 			for _, ch := range c.chans {
    170 				ch <- Result{c.val, c.err, c.dups > 0}
    171 			}
    172 		}
    173 	}()
    174 
    175 	func() {
    176 		defer func() {
    177 			if !normalReturn {
    178 				// Ideally, we would wait to take a stack trace until we've determined
    179 				// whether this is a panic or a runtime.Goexit.
    180 				//
    181 				// Unfortunately, the only way we can distinguish the two is to see
    182 				// whether the recover stopped the goroutine from terminating, and by
    183 				// the time we know that, the part of the stack trace relevant to the
    184 				// panic has been discarded.
    185 				if r := recover(); r != nil {
    186 					c.err = newPanicError(r)
    187 				}
    188 			}
    189 		}()
    190 
    191 		c.val, c.err = fn()
    192 		normalReturn = true
    193 	}()
    194 
    195 	if !normalReturn {
    196 		recovered = true
    197 	}
    198 }
    199 
    200 // Forget tells the singleflight to forget about a key.  Future calls
    201 // to Do for this key will call the function rather than waiting for
    202 // an earlier call to complete.
    203 func (g *Group) Forget(key string) {
    204 	g.mu.Lock()
    205 	if c, ok := g.m[key]; ok {
    206 		c.forgotten = true
    207 	}
    208 	delete(g.m, key)
    209 	g.mu.Unlock()
    210 }