From 66c6ed77932e62071ae55fef3d78d3102f723b0e Mon Sep 17 00:00:00 2001 From: Eric Phillips Date: Sun, 17 May 2026 23:48:24 -0600 Subject: [PATCH] streaming with custom reader and multiple client functionality --- internal/cache/cache.go | 31 +++++++++---- internal/cache/download.go | 90 ++++++++++++++++++++++++++++++++++++ internal/cache/fetch.go | 63 ------------------------- internal/cache/get_stream.go | 50 ++++++++++++++++++++ internal/cache/helpers.go | 49 ++++++-------------- internal/cache/tailer.go | 39 ++++++++++++++++ 6 files changed, 214 insertions(+), 108 deletions(-) create mode 100644 internal/cache/download.go delete mode 100644 internal/cache/fetch.go create mode 100644 internal/cache/get_stream.go create mode 100644 internal/cache/tailer.go diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7261335..0491dd9 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -1,8 +1,8 @@ package cache import ( - "fmt" "io" + "log/slog" "net" "net/http" "os" @@ -80,16 +80,29 @@ func NewCache(cacheRoot string, mirrorURLs []string, mirroredRepos []string) (*C }, nil } -func (c *Cache) Close() error { - return c.cr.Close() -} +func (c *Cache) Fetch(relPath string) (*CacheFile, error) { + // relPath is relative to the localRoot + // ie relPath includes /{repo}/os/{arch}/ and the actual name linux-x.x.x.pkg.tar.zst -type UpstreamError struct { - StatusCode int -} + // return file directly if exists in cache + cf, err := c.getCachedFile(relPath) + if err == nil { + return cf, nil + } + + // fetch file from upstream + slog.Debug("calling fetch", "file", relPath) + flight, file, err := c.getStream(relPath) + if err != nil { + return nil, err + } + + return &CacheFile{ + Reader: &tailer{f: file, flight: flight}, + Size: -1, + Filename: filepath.Base(relPath), + }, nil -func (e *UpstreamError) Error() string { - return fmt.Sprintf("upstream returned %d", e.StatusCode) } func checkSymLinks(cr *os.Root, repos []string) error { diff --git 
a/internal/cache/download.go b/internal/cache/download.go
new file mode 100644
index 0000000..de143b7
--- /dev/null
+++ b/internal/cache/download.go
@@ -0,0 +1,111 @@
+package cache
+
+import (
+	"errors"
+	"io"
+	"log/slog"
+	"net/http"
+	"os"
+)
+
+// downloadWrangle runs one in-flight download: it tries the mirrors in turn,
+// streaming the response into tmpFile, and on success renames the temp file
+// to relPath. It closes tmpFile before the temp file is renamed or removed.
+// A download failure is stored in flight.err; flight.done is closed (via
+// cleanupFlight) only after that, so readers woken by done see the error.
+func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.File) {
+	// defer map cleanup and signal client handle done
+	defer c.cleanupFlight(relPath, flight)
+
+	// Start from an error so that an empty mirror list cannot fall through
+	// to the rename below and cache an empty file.
+	err := errors.New("no mirrors configured")
+	// fetch pkgs from mirror with retry logic
+	for range len(c.cfg.mirrorURLs) {
+		url := c.nextMirror() + relPath
+		err = c.downloadToDisk(url, tmpFile)
+		if err == nil {
+			break
+		}
+		var upstreamErr *UpstreamError
+		if !errors.As(err, &upstreamErr) {
+			// network error, transfer interrupted, bail
+			break
+		}
+		slog.Warn("mirror failed", "url", url, "status", upstreamErr.StatusCode)
+	}
+	// Close the write handle before the temp file is renamed or removed; a
+	// failed close is treated as a failed download.
+	if closeErr := tmpFile.Close(); closeErr != nil && err == nil {
+		err = closeErr
+	}
+	if err != nil {
+		slog.Warn("download error", "err", err)
+		flight.err = err
+		c.removeTmp(flight.tmpPath)
+		return
+	}
+
+	// mv file to final location
+	if err := c.cr.Rename(flight.tmpPath, relPath); err != nil {
+		// flight.err stays nil: the download itself succeeded, only the
+		// cache entry is lost.
+		slog.Warn("failed to move temp file into cache", "path", flight.tmpPath, "err", err)
+		c.removeTmp(flight.tmpPath)
+	}
+}
+
+// removeTmp removes tmpPath from the cache root, logging a failure instead
+// of returning it.
+func (c *Cache) removeTmp(tmpPath string) {
+	if err := c.cr.Remove(tmpPath); err != nil {
+		slog.Warn("failed to remove temp file", "path", tmpPath, "err", err)
+	}
+}
+
+// downloadToDisk sends a GET for url with the package User-Agent and copies
+// the response body into tmpFile. It returns an *UpstreamError when the
+// status is not 200, and otherwise any error from building the request,
+// sending it, or copying the body.
+func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
+	slog.Info("fetching", "url", url)
+
+	// set the user agent
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		slog.Error("failed create request", "err", err)
+		// req is nil on error; return instead of dereferencing it below.
+		return err
+	}
+	req.Header.Set("User-Agent", userAgent)
+
+	resp, err := c.client.Do(req)
+	if err != nil {
+		slog.Warn("fetch failed", "url", url, "err", err)
+		return err
+	}
+	// Registered before the status check so the body is closed on the
+	// non-200 return as well.
+	defer func() {
+		if closeErr := resp.Body.Close(); closeErr != nil {
+			slog.Warn("failed to close response body", "url", url, "err", closeErr)
+		}
+	}()
+
+	if resp.StatusCode != http.StatusOK {
+		slog.Info("fetch returned", "url", url, "status", resp.StatusCode)
+		return &UpstreamError{StatusCode: resp.StatusCode}
+	}
+
+	_, err = io.Copy(tmpFile, resp.Body)
+	return err
+}
+
+// cleanupFlight removes key from the in-flight map under inFlightMu and then
+// closes f.done to signal that the download has ended.
+func (c *Cache) cleanupFlight(key string, f *inFlight) {
+	c.inFlightMu.Lock()
+	delete(c.inFlight, key)
+	c.inFlightMu.Unlock()
+	close(f.done)
+}
diff --git a/internal/cache/fetch.go b/internal/cache/fetch.go
deleted file mode 100644
index 0d25aaf..0000000
--- a/internal/cache/fetch.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package cache
-
-import (
-	"errors"
-	"log/slog"
-	"path/filepath"
-)
-
-func (c *Cache) Fetch(relPath string) (*CacheFile, error) {
-	// relPath is relative to the localRoot
-	// ie relPath includes /{repo}/os/{arch}/ and the actual name linux-x.x.x.pkg.tar.zst
-
-	// return file directly if exists in cache
-	cf, err := c.getCachedFile(relPath)
-	if err == nil {
-		return cf, nil
-	}
-
-	// fetch file from upstream
-	slog.Debug("calling fetch", "file", relPath)
-	return nil, c.getStream(relPath)
-}
-
-func (c *Cache) getStream(relPath string) error {
-
-	// declare vars outside loop
-	var err error
-	// fetch pkgs from mirror with retry logic
-	for range len(c.cfg.mirrorURLs) {
-		url := c.nextMirror() + relPath
-		err = c.downloadToDisk(url, relPath)
-		if err == nil {
-			break
-		}
-		if upstreamErr, ok := errors.AsType[*UpstreamError](err); ok {
-			slog.Warn("mirror failed", "url", url, "status", upstreamErr.StatusCode)
-		} else {
-			slog.Warn("mirror unreachable", "url", url, "err", err)
-		}
-	}
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
-	info, err := c.cr.Stat(relPath)
-	if err != nil {
-		return nil, err
-	}
-
-	f, err := c.cr.Open(relPath)
-	if err != nil {
-		return nil, err
-	}
-
-	return &CacheFile{
-		Reader:   f,
-		Size:     info.Size(),
-		Filename: filepath.Base(relPath),
-	}, nil
-}
diff --git a/internal/cache/get_stream.go b/internal/cache/get_stream.go
new file mode 100644
index 0000000..e4adb7c
--- /dev/null
+++
b/internal/cache/get_stream.go
@@ -0,0 +1,60 @@
+package cache
+
+import (
+	"errors"
+	"os"
+	"path/filepath"
+)
+
+// getStream returns the in-flight download for relPath together with a read
+// handle on its temp file. When a download is already registered the caller
+// is attached to it; otherwise the temp file is created, a new flight is
+// registered and downloadWrangle is started in its own goroutine. The whole
+// call runs with inFlightMu held. The returned file belongs to the caller.
+func (c *Cache) getStream(relPath string) (*inFlight, *os.File, error) {
+	// lock the map
+	c.inFlightMu.Lock()
+	defer c.inFlightMu.Unlock()
+
+	// download in progress connect new stream to existing download
+	if flight, ok := c.inFlight[relPath]; ok {
+		f, err := c.cr.Open(flight.tmpPath)
+		if err != nil {
+			return nil, nil, err
+		}
+		return flight, f, nil
+	}
+
+	// download not in progress, setup new download and stream
+
+	// make sure the dir structure exists
+	if err := c.cr.MkdirAll(filepath.Dir(relPath), 0750); err != nil {
+		return nil, nil, err
+	}
+	// use a tmp file for the initial fetch in case it fails
+	tmpPath := relPath + ".tmp"
+	tmpFile, err := c.cr.Create(tmpPath)
+	if err != nil {
+		return nil, nil, err
+	}
+	// Open the read handle before the flight is registered or the goroutine
+	// is started, so a failure here leaves nothing behind.
+	f, err := c.cr.Open(tmpPath)
+	if err != nil {
+		return nil, nil, errors.Join(err, tmpFile.Close(), c.cr.Remove(tmpPath))
+	}
+
+	// done must be created here: cleanupFlight closes it, and closing a nil
+	// channel panics. NOTE(review): assumes inFlight.done is a chan struct{};
+	// the struct is declared outside this patch.
+	flight := &inFlight{
+		tmpPath: tmpPath,
+		done:    make(chan struct{}),
+	}
+	c.inFlight[relPath] = flight
+
+	// spin off a downloading func so we can return the file handler
+	// tmpFile is handed to downloadWrangle and not used here again.
+	go c.downloadWrangle(relPath, flight, tmpFile)
+	return flight, f, nil
+}
diff --git a/internal/cache/helpers.go b/internal/cache/helpers.go
index ce58ac3..666c0e9 100644
--- a/internal/cache/helpers.go
+++ b/internal/cache/helpers.go
@@ -1,10 +1,7 @@
 package cache
 
 import (
-	"io"
-	"log/slog"
-	"net/http"
-	"os"
+	"fmt"
 	"path/filepath"
 )
 
@@ -14,38 +11,6 @@ func (c *Cache) nextMirror() string {
 	return c.cfg.mirrorURLs[idx%mirrorCount]
 }
 
-func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
-	slog.Info("fetching", "url", url)
-
-	// set the user agent
-	req, err := http.NewRequest("GET", url, nil)
-	if err != nil {
-		slog.Error("failed create request", "err", err)
-	}
-	req.Header.Set("User-Agent", userAgent)
-
-	resp, err := c.client.Do(req)
-	if err != nil {
-		slog.Warn("fetch failed", "url", url, "err", err)
-		return err
-	}
-	if resp.StatusCode != 200 {
-		slog.Info("fetch returned", "url", url, "status", resp.StatusCode)
-		return &UpstreamError{StatusCode: resp.StatusCode}
-	}
-	defer func() {
-		if closeErr := resp.Body.Close(); closeErr != nil {
-			err = closeErr
-		}
-	}()
-
-	_, err = io.Copy(tmpFile, resp.Body)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
 func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
 	info, err := c.cr.Stat(relPath)
 	if err != nil {
@@ -63,3 +28,18 @@ func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
 		Filename: filepath.Base(relPath),
 	}, nil
 }
+
+// UpstreamError carries the HTTP status code of an unsuccessful upstream response.
+type UpstreamError struct {
+	StatusCode int
+}
+
+// Error implements the error interface.
+func (e *UpstreamError) Error() string {
+	return fmt.Sprintf("upstream returned %d", e.StatusCode)
+}
+
+// Close closes c.cr and returns its error.
+func (c *Cache) Close() error {
+	return c.cr.Close()
+}
diff --git a/internal/cache/tailer.go b/internal/cache/tailer.go
new file mode 100644
index 0000000..3036aab
--- /dev/null
+++ b/internal/cache/tailer.go
@@ -0,0 +1,53 @@
+package cache
+
+import (
+	"io"
+	"os"
+	"time"
+)
+
+// tailPollInterval is how long Read waits at EOF before re-reading the file.
+const tailPollInterval = 50 * time.Millisecond
+
+// tailer reads a file that may still be growing, following f until the
+// download tracked by flight has ended.
+type tailer struct {
+	f *os.File
+	// done is an optional completion signal. Fetch leaves it nil and a nil
+	// channel never becomes ready, so Read also watches flight.done.
+	done   <-chan struct{}
+	flight *inFlight
+}
+
+// Read reads from f. At EOF it waits for the download to end, re-reading
+// every tailPollInterval in the meantime. Once the download has ended it
+// returns flight.err if set, and otherwise whatever a final read of f yields.
+func (t *tailer) Read(p []byte) (int, error) {
+	for {
+		n, err := t.f.Read(p)
+		if n > 0 {
+			return n, nil
+		}
+		if err != io.EOF {
+			return n, err
+		}
+		select {
+		case <-t.done:
+		case <-t.flight.done:
+		case <-time.After(tailPollInterval):
+			// not finished yet; see whether the writer has appended more
+			continue
+		}
+		// The download goroutine sets flight.err before done is closed, so
+		// reading it after the receive above is safe.
+		if t.flight.err != nil {
+			return 0, t.flight.err
+		}
+		return t.f.Read(p) // send remaining bytes
+	}
+}
+
+// Close closes the underlying file.
+func (t *tailer) Close() error {
+	return t.f.Close()
+}