fix(nixery): Avoid race when the same image is fetched in parallel

Remove a race condition which appears when uploadHashLayer is called
with the same key from multiple threads simultaneously. This can
easily happen when the same image path is requested by multiple
clients at the same time. When it does, a 500 status is returned and
the following error message is logged:

{
  "context": {
    "filePath": "github.com/google/nixery/builder/builder.go",
    "lineNumber": 440,
    "functionName": "github.com/google/nixery/builder.uploadHashLayer"
  },
  "error": "rename /var/lib/nixery/staging/<hash> /var/lib/nixery/layers/<hash>: no such file or directory",
  "eventTime": "...",
  "layer": "<hash>",
  "message": "failed to move layer from staging",
  ...
}
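
The failure mode is a classic check-then-act race: both builders write to the
same staging path (keyed on the layer hash) and both try to rename it into
place, so whichever rename runs second finds the staging file already gone.
A minimal, hypothetical reproduction of just the rename collision (paths and
hash are made up, not Nixery's actual layout):

    package main

    import (
    	"fmt"
    	"os"
    	"path/filepath"
    	"sync"
    )

    func main() {
    	dir, _ := os.MkdirTemp("", "nixery-race")
    	defer os.RemoveAll(dir)

    	// Both "builders" share one staging path derived from the layer hash.
    	staging := filepath.Join(dir, "staging", "deadbeef")
    	final := filepath.Join(dir, "layers", "deadbeef")
    	os.MkdirAll(filepath.Dir(staging), 0o755)
    	os.MkdirAll(filepath.Dir(final), 0o755)
    	os.WriteFile(staging, []byte("layer data"), 0o644)

    	var wg sync.WaitGroup
    	for i := 0; i < 2; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			// Exactly one rename succeeds; the loser gets ENOENT,
    			// the "no such file or directory" seen in the log above.
    			if err := os.Rename(staging, final); err != nil {
    				fmt.Printf("builder %d: %v\n", i, err)
    			}
    		}(i)
    	}
    	wg.Wait()
    }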

To solve this issue, introduce a mutex keyed on the uploaded hash and
move all layer caching into uploadHashLayer. This could additionally
provide a small performance benefit when an already built image is
requested and NIXERY_PKGS_PATH is set, since symlink layers and config
layers are now also cached.
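
The keyed mutex comes from github.com/im7mortal/kmutex, whose Lock and Unlock
take the key to serialize on (as the diff below uses it). A small sketch of
the pattern; the key string here is illustrative:

    package main

    import (
    	"fmt"
    	"sync"

    	"github.com/im7mortal/kmutex"
    )

    func main() {
    	km := kmutex.New()

    	var wg sync.WaitGroup
    	for i := 0; i < 3; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			// All three goroutines contend on the same layer hash, so
    			// their critical sections run strictly one at a time, while
    			// uploads for different hashes would not block each other.
    			km.Lock("sha256:deadbeef")
    			defer km.Unlock("sha256:deadbeef")
    			fmt.Printf("goroutine %d holds the layer key\n", i)
    		}(i)
    	}
    	wg.Wait()
    }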

Change-Id: I50788a7ec7940cb5e5760f244692e361019a9bb7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/6695
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
talyz committed on 2022-09-19 09:30:28 +02:00
commit 2fe7455f48 (parent 42efe2cdae)
5 changed files with 59 additions and 46 deletions

builder/builder.go

@@ -26,6 +26,7 @@ import (
 	"github.com/google/nixery/layers"
 	"github.com/google/nixery/manifest"
 	"github.com/google/nixery/storage"
+	"github.com/im7mortal/kmutex"
 	log "github.com/sirupsen/logrus"
 )
@@ -37,10 +38,11 @@ const LayerBudget int = 94
 // State holds the runtime state that is carried around in Nixery and
 // passed to builder functions.
 type State struct {
 	Storage storage.Backend
 	Cache   *LocalCache
 	Cfg     config.Config
 	Pop     layers.Popularity
+	UploadMutex *kmutex.Kmutex
 }

 // Architecture represents the possible CPU architectures for which
@@ -292,30 +294,19 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 	// Missing layers are built and uploaded to the storage
 	// bucket.
 	for _, l := range grouped {
-		if entry, cached := layerFromCache(ctx, s, l.Hash()); cached {
-			entries = append(entries, *entry)
-		} else {
-			lh := l.Hash()
+		lh := l.Hash()

 		// While packing store paths, the SHA sum of
 		// the uncompressed layer is computed and
 		// written to `tarhash`.
 		//
 		// TODO(tazjin): Refactor this to make the
 		// flow of data cleaner.
-		var tarhash string
-		lw := func(w io.Writer) error {
-			var err error
-			tarhash, err = packStorePaths(&l, w)
-			return err
-		}
-
-		entry, err := uploadHashLayer(ctx, s, lh, lw)
+		lw := func(w io.Writer) (string, error) {
+			tarhash, err := packStorePaths(&l, w)
 			if err != nil {
-				return nil, err
+				return "", err
 			}
-			entry.MergeRating = l.MergeRating
-			entry.TarHash = tarhash

 			var pkgs []string
 			for _, p := range l.Contents {
@@ -328,15 +319,21 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"tarhash": tarhash,
 			}).Info("created image layer")

-			go cacheLayer(ctx, s, l.Hash(), *entry)
-			entries = append(entries, *entry)
+			return tarhash, err
 		}
+
+		entry, err := uploadHashLayer(ctx, s, lh, l.MergeRating, lw)
+		if err != nil {
+			return nil, err
+		}
+		entries = append(entries, *entry)
 	}

 	// Symlink layer (built in the first Nix build) needs to be
 	// included here manually:
 	slkey := result.SymlinkLayer.TarHash
-	entry, err := uploadHashLayer(ctx, s, slkey, func(w io.Writer) error {
+	entry, err := uploadHashLayer(ctx, s, slkey, 0, func(w io.Writer) (string, error) {
 		f, err := os.Open(result.SymlinkLayer.Path)
 		if err != nil {
 			log.WithError(err).WithFields(log.Fields{
@@ -345,7 +342,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"layer": slkey,
 			}).Error("failed to open symlink layer")
-			return err
+			return "", err
 		}
 		defer f.Close()
@@ -358,18 +355,16 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 				"layer": slkey,
 			}).Error("failed to upload symlink layer")
-			return err
+			return "", err
 		}

-		return gz.Close()
+		return "sha256:" + slkey, gz.Close()
 	})
 	if err != nil {
 		return nil, err
 	}

-	entry.TarHash = "sha256:" + result.SymlinkLayer.TarHash
-	go cacheLayer(ctx, s, slkey, *entry)
 	entries = append(entries, *entry)

 	return entries, nil
@@ -380,7 +375,7 @@ func prepareLayers(ctx context.Context, s *State, image *Image, result *ImageRes
 //
 // This type exists to avoid duplication between the handling of
 // symlink layers and store path layers.
-type layerWriter func(w io.Writer) error
+type layerWriter func(w io.Writer) (string, error)

 // byteCounter is a special io.Writer that counts all bytes written to
 // it and does nothing else.
@@ -408,8 +403,16 @@ func (b *byteCounter) Write(p []byte) (n int, err error) {
 //
 // The return value is the layer's SHA256 hash, which is used in the
 // image manifest.
-func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter) (*manifest.Entry, error) {
+func uploadHashLayer(ctx context.Context, s *State, key string, mrating uint64, lw layerWriter) (*manifest.Entry, error) {
+	s.UploadMutex.Lock(key)
+	defer s.UploadMutex.Unlock(key)
+
+	if entry, cached := layerFromCache(ctx, s, key); cached {
+		return entry, nil
+	}
+
 	path := "staging/" + key
+	var tarhash string
 	sha256sum, size, err := s.Storage.Persist(ctx, path, manifest.LayerType, func(sw io.Writer) (string, int64, error) {
 		// Sets up a "multiwriter" that simultaneously runs both hash
 		// algorithms and uploads to the storage backend.
@@ -417,7 +420,8 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
 		counter := &byteCounter{}
 		multi := io.MultiWriter(sw, shasum, counter)

-		err := lw(multi)
+		var err error
+		tarhash, err = lw(multi)
 		sha256sum := fmt.Sprintf("%x", shasum.Sum([]byte{}))
 		return sha256sum, counter.count, err
@@ -449,10 +453,14 @@ func uploadHashLayer(ctx context.Context, s *State, key string, lw layerWriter)
 	}).Info("created and persisted layer")

 	entry := manifest.Entry{
 		Digest: "sha256:" + sha256sum,
 		Size:   size,
+		TarHash:     tarhash,
+		MergeRating: mrating,
 	}

+	cacheLayer(ctx, s, key, entry)
+
 	return &entry, nil
 }
@@ -493,13 +501,13 @@ func BuildImage(ctx context.Context, s *State, image *Image) (*BuildResult, erro
 	}

 	m, c := manifest.Manifest(image.Arch.imageArch, layers, cmd)

-	lw := func(w io.Writer) error {
+	lw := func(w io.Writer) (string, error) {
 		r := bytes.NewReader(c.Config)
 		_, err := io.Copy(w, r)
-		return err
+		return "", err
 	}

-	if _, err = uploadHashLayer(ctx, s, c.SHA256, lw); err != nil {
+	if _, err = uploadHashLayer(ctx, s, c.SHA256, 0, lw); err != nil {
 		log.WithError(err).WithFields(log.Fields{
 			"image": image.Name,
 			"tag":   image.Tag,

main.go

@@ -30,6 +30,7 @@ import (
 	"github.com/google/nixery/logs"
 	mf "github.com/google/nixery/manifest"
 	"github.com/google/nixery/storage"
+	"github.com/im7mortal/kmutex"
 	log "github.com/sirupsen/logrus"
 )
@@ -257,10 +258,11 @@ func main() {
 	}

 	state := builder.State{
 		Cache:   &cache,
 		Cfg:     cfg,
 		Pop:     pop,
 		Storage: s,
+		UploadMutex: kmutex.New(),
 	}

 	log.WithFields(log.Fields{

default.nix

@@ -69,7 +69,7 @@ depot.nix.readTree.drvTargets rec {
 	doCheck = true;

 	# Needs to be updated after every modification of go.mod/go.sum
-	vendorSha256 = "115dfdhpklgmp6dsy59bp0i9inqim208lf1sqbnl9jy91bnnbl32";
+	vendorSha256 = "sha256-io9NCeZmjCZPLmII3ajXIsBWbT40XiW8ncXOuUDabbo=";

 	buildFlagsArray = [
 		"-ldflags=-s -w -X main.version=${nixery-commit-hash}"

go.mod

@@ -6,6 +6,7 @@ require (
 	cloud.google.com/go/storage v1.22.1
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/google/go-cmp v0.5.8
+	github.com/im7mortal/kmutex v1.0.1 // indirect
 	github.com/pkg/xattr v0.4.7
 	github.com/sirupsen/logrus v1.8.1
 	golang.org/x/oauth2 v0.0.0-20220524215830-622c5d57e401

go.sum

@@ -201,6 +201,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
+github.com/im7mortal/kmutex v1.0.1 h1:zAACzjwD+OEknDqnLdvRa/BhzFM872EBwKijviGLc9Q=
+github.com/im7mortal/kmutex v1.0.1/go.mod h1:f71c/Ugk/+58OHRAgvgzPP3QEiWGUjK13fd8ozfKWdo=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=