streaming with custom reader and multiple client functionality

This commit is contained in:
2026-05-17 23:48:24 -06:00
parent ff596276cb
commit 66c6ed7793
6 changed files with 214 additions and 108 deletions
+22 -9
View File
@@ -1,8 +1,8 @@
package cache
import (
"fmt"
"io"
"log/slog"
"net"
"net/http"
"os"
@@ -80,16 +80,29 @@ func NewCache(cacheRoot string, mirrorURLs []string, mirroredRepos []string) (*C
}, nil
}
func (c *Cache) Close() error {
return c.cr.Close()
}
func (c *Cache) Fetch(relPath string) (*CacheFile, error) {
// relPath is relative to the localRoot
// ie relPath includes /{repo}/os/{arch}/ and the actual name linux-x.x.x.pkg.tar.zst
type UpstreamError struct {
StatusCode int
}
// return file directly if exists in cache
cf, err := c.getCachedFile(relPath)
if err == nil {
return cf, nil
}
// fetch file from upstream
slog.Debug("calling fetch", "file", relPath)
flight, file, err := c.getStream(relPath)
if err != nil {
return nil, err
}
return &CacheFile{
Reader: &tailer{f: file, flight: flight},
Size: -1,
Filename: filepath.Base(relPath),
}, nil
func (e *UpstreamError) Error() string {
return fmt.Sprintf("upstream returned %d", e.StatusCode)
}
func checkSymLinks(cr *os.Root, repos []string) error {
+90
View File
@@ -0,0 +1,90 @@
package cache
import (
"errors"
"io"
"log/slog"
"net/http"
"os"
)
func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.File) {
// defer map cleanup and signal client handle done
defer c.cleanupFlight(relPath, flight)
// declare vars outside loop
var err error
// fetch pkgs from mirror with retry logic
for range len(c.cfg.mirrorURLs) {
url := c.nextMirror() + relPath
err = c.downloadToDisk(url, tmpFile)
if err == nil {
break
}
var upstreamErr *UpstreamError
if !errors.As(err, &upstreamErr) {
// network error, transfer interupted, bail
break
}
slog.Warn("mirror failed", "url", url, "status", upstreamErr.StatusCode)
}
if err != nil {
slog.Warn("download error", "err", err)
flight.err = err
removeErr := c.cr.Remove(flight.tmpPath)
if removeErr != nil {
slog.Warn("failed to remove temp file", "path", flight.tmpPath, "err", removeErr)
}
return
}
// mv file to final location
err = c.cr.Rename(flight.tmpPath, relPath)
if err != nil {
removeErr := c.cr.Remove(flight.tmpPath)
if removeErr != nil {
slog.Warn("failed to remove temp file", "path", flight.tmpPath, "err", removeErr)
}
return
}
return
}
func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
slog.Info("fetching", "url", url)
// set the user agent
req, err := http.NewRequest("GET", url, nil)
if err != nil {
slog.Error("failed create request", "err", err)
}
req.Header.Set("User-Agent", userAgent)
resp, err := c.client.Do(req)
if err != nil {
slog.Warn("fetch failed", "url", url, "err", err)
return err
}
if resp.StatusCode != 200 {
slog.Info("fetch returned", "url", url, "status", resp.StatusCode)
return &UpstreamError{StatusCode: resp.StatusCode}
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
err = closeErr
}
}()
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
return err
}
return nil
}
func (c *Cache) cleanupFlight(key string, f *inFlight) {
c.inFlightMu.Lock()
delete(c.inFlight, key)
c.inFlightMu.Unlock()
close(f.done)
}
-63
View File
@@ -1,63 +0,0 @@
package cache
import (
"errors"
"log/slog"
"path/filepath"
)
func (c *Cache) Fetch(relPath string) (*CacheFile, error) {
// relPath is relative to the localRoot
// ie relPath includes /{repo}/os/{arch}/ and the actual name linux-x.x.x.pkg.tar.zst
// return file directly if exists in cache
cf, err := c.getCachedFile(relPath)
if err == nil {
return cf, nil
}
// fetch file from upstream
slog.Debug("calling fetch", "file", relPath)
return nil, c.getStream(relPath)
}
func (c *Cache) getStream(relPath string) error {
// declare vars outside loop
var err error
// fetch pkgs from mirror with retry logic
for range len(c.cfg.mirrorURLs) {
url := c.nextMirror() + relPath
err = c.downloadToDisk(url, relPath)
if err == nil {
break
}
if upstreamErr, ok := errors.AsType[*UpstreamError](err); ok {
slog.Warn("mirror failed", "url", url, "status", upstreamErr.StatusCode)
} else {
slog.Warn("mirror unreachable", "url", url, "err", err)
}
}
if err != nil {
return err
}
return nil
}
func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
info, err := c.cr.Stat(relPath)
if err != nil {
return nil, err
}
f, err := c.cr.Open(relPath)
if err != nil {
return nil, err
}
return &CacheFile{
Reader: f,
Size: info.Size(),
Filename: filepath.Base(relPath),
}, nil
}
+50
View File
@@ -0,0 +1,50 @@
package cache
import (
"os"
"path/filepath"
)
func (c *Cache) getStream(relPath string) (*inFlight, *os.File, error) {
// lock the map
c.inFlightMu.Lock()
defer c.inFlightMu.Unlock()
// download in progress connect new stream to existing download
if flight, ok := c.inFlight[relPath]; ok {
f, err := c.cr.Open(flight.tmpPath)
if err != nil {
return nil, nil, err
}
return flight, f, nil
}
// download not in progress, setup new download and stream
// make sure the dir structure exists
if err := c.cr.MkdirAll(filepath.Dir(relPath), 0750); err != nil {
return nil, nil, err
}
// use a tmp file for the initial fetch in case it fails
tmpPath := relPath + ".tmp"
tmpFile, err := c.cr.Create(tmpPath)
if err != nil {
return nil, nil, err
}
flight := &inFlight{
tmpPath: tmpPath,
}
c.inFlight[relPath] = flight
// spin off a downloading func so we can return the file handler
// downloadWrangle takes ownership of tmpFile and closes it.
go c.downloadWrangle(relPath, flight, tmpFile)
f, err := c.cr.Open(flight.tmpPath)
if err != nil {
return nil, nil, err
}
return flight, f, nil
}
+13 -36
View File
@@ -1,10 +1,7 @@
package cache
import (
"io"
"log/slog"
"net/http"
"os"
"fmt"
"path/filepath"
)
@@ -14,38 +11,6 @@ func (c *Cache) nextMirror() string {
return c.cfg.mirrorURLs[idx%mirrorCount]
}
func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
slog.Info("fetching", "url", url)
// set the user agent
req, err := http.NewRequest("GET", url, nil)
if err != nil {
slog.Error("failed create request", "err", err)
}
req.Header.Set("User-Agent", userAgent)
resp, err := c.client.Do(req)
if err != nil {
slog.Warn("fetch failed", "url", url, "err", err)
return err
}
if resp.StatusCode != 200 {
slog.Info("fetch returned", "url", url, "status", resp.StatusCode)
return &UpstreamError{StatusCode: resp.StatusCode}
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
err = closeErr
}
}()
_, err = io.Copy(tmpFile, resp.Body)
if err != nil {
return err
}
return nil
}
func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
info, err := c.cr.Stat(relPath)
if err != nil {
@@ -63,3 +28,15 @@ func (c *Cache) getCachedFile(relPath string) (*CacheFile, error) {
Filename: filepath.Base(relPath),
}, nil
}
type UpstreamError struct {
StatusCode int
}
func (e *UpstreamError) Error() string {
return fmt.Sprintf("upstream returned %d", e.StatusCode)
}
func (c *Cache) Close() error {
return c.cr.Close()
}
+39
View File
@@ -0,0 +1,39 @@
package cache
import (
"io"
"os"
"time"
)
type tailer struct {
f *os.File
done <-chan struct{}
flight *inFlight
}
func (t *tailer) Read(p []byte) (int, error) {
for {
n, err := t.f.Read(p)
if n > 0 {
return n, nil
}
if err == io.EOF {
select {
case <-t.done:
if t.flight.err != nil {
return 0, t.flight.err
}
return t.f.Read(p) // send remainiing bytes
default:
time.Sleep(50 * time.Millisecond)
continue
}
}
return n, err
}
}
func (t *tailer) Close() error {
return t.f.Close()
}