140 lines
2.9 KiB
Go
140 lines
2.9 KiB
Go
package cache
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// userAgent is a pacman/libalpm-style client identification string.
// NOTE(review): presumably sent as the User-Agent header on upstream mirror
// requests; no use of it is visible in this part of the file — confirm.
const userAgent = "pacman/7.1.0 (Linux x86_64) libalpm/16.0.1"
|
|
|
|
type Cache struct {
|
|
cfg CacheConfig
|
|
cr *os.Root
|
|
mirrorIdx atomic.Uint64
|
|
refreshMu sync.Mutex
|
|
client http.Client
|
|
inFlight map[string]*inFlight
|
|
inFlightMu sync.Mutex
|
|
}
|
|
|
|
// CacheConfig bundles the mirror lists and network timeouts a Cache is built
// with. NewCache fills it in; the unexported fields cannot be set from outside
// the package.
type CacheConfig struct {
	// mirrorURLs is the mirror URL list handed to NewCache.
	mirrorURLs []string

	// mirroredRepos is the repository name list handed to NewCache; NewCache
	// also passes it to checkSymLinks.
	mirroredRepos []string

	// DialTimeout is used as the net.Dialer timeout of the HTTP transport.
	DialTimeout time.Duration

	// ResponseHeaderTimeout is used as the HTTP transport's
	// ResponseHeaderTimeout.
	ResponseHeaderTimeout time.Duration

	// ClientTimeout is used as http.Client.Timeout; zero means no overall
	// request deadline.
	ClientTimeout time.Duration
}
|
|
|
|
// inFlight describes one in-progress upstream transfer as seen by Fetch.
type inFlight struct {
	// tmpPath is a file path.
	// NOTE(review): presumably the temporary file the transfer is written
	// to — not used in the visible code, confirm.
	tmpPath string

	// headerReady is the channel Fetch waits on before it reads contentLength
	// and err.
	headerReady chan struct{}

	// contentLength is reported to callers as CacheFile.Size once headerReady
	// has fired.
	contentLength int64

	// done is a signalling channel.
	// NOTE(review): presumably closed when the transfer finishes — not used
	// in the visible code, confirm.
	done chan struct{}

	// err is the transfer error Fetch returns if it is non-nil after
	// headerReady has fired.
	err error
}
|
|
|
|
// CacheFile is what Fetch hands back for a requested file: a stream of its
// contents plus the metadata known about it.
type CacheFile struct {
	// Reader streams the file contents. The caller is responsible for
	// closing it.
	Reader io.ReadCloser

	// Size is the content length in bytes. For files streamed from upstream,
	// Fetch sets it from the in-flight transfer's contentLength.
	Size int64

	// Filename is the base name of the requested path, without directories.
	Filename string
}
|
|
|
|
func NewCache(cacheRoot string, mirrorURLs []string, mirroredRepos []string) (*Cache, error) {
|
|
cfg := CacheConfig{
|
|
mirrorURLs: mirrorURLs,
|
|
mirroredRepos: mirroredRepos,
|
|
DialTimeout: 5 * time.Second,
|
|
ResponseHeaderTimeout: 10 * time.Second,
|
|
ClientTimeout: 0 * time.Second,
|
|
}
|
|
|
|
transport := &http.Transport{
|
|
DialContext: (&net.Dialer{
|
|
Timeout: cfg.DialTimeout,
|
|
}).DialContext,
|
|
ResponseHeaderTimeout: cfg.ResponseHeaderTimeout,
|
|
}
|
|
|
|
cr, err := os.OpenRoot(cacheRoot)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := checkSymLinks(cr, mirroredRepos); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Cache{
|
|
cfg: cfg,
|
|
cr: cr,
|
|
client: http.Client{
|
|
Timeout: cfg.ClientTimeout,
|
|
Transport: transport,
|
|
},
|
|
inFlight: make(map[string]*inFlight),
|
|
}, nil
|
|
}
|
|
|
|
func (c *Cache) Fetch(relPath string) (*CacheFile, error) {
|
|
// relPath is relative to the localRoot
|
|
// ie relPath includes /{repo}/os/{arch}/ and the actual name linux-x.x.x.pkg.tar.zst
|
|
|
|
// return file directly if exists in cache
|
|
cf, err := c.getCachedFile(relPath)
|
|
if err == nil {
|
|
return cf, nil
|
|
}
|
|
|
|
// fetch file from upstream
|
|
slog.Debug("calling fetch", "file", relPath)
|
|
flight, file, err := c.getStream(relPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var size int64
|
|
select {
|
|
case <-flight.headerReady:
|
|
size = flight.contentLength
|
|
err = flight.err
|
|
case <-time.After(5 * time.Second):
|
|
return nil, fmt.Errorf("upstream header timeout")
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &CacheFile{
|
|
Reader: &tailer{f: file, flight: flight},
|
|
Size: size,
|
|
Filename: filepath.Base(relPath),
|
|
}, nil
|
|
|
|
}
|
|
|
|
func checkSymLinks(cr *os.Root, repos []string) error {
|
|
for _, repo := range repos {
|
|
dirPath := filepath.Join(repo, "os/x86_64")
|
|
if err := cr.MkdirAll(dirPath, 0750); err != nil {
|
|
return err
|
|
}
|
|
lnPath := filepath.Join(dirPath, repo+".db")
|
|
srcPath := repo + ".db.tar.gz"
|
|
if _, err := cr.Lstat(lnPath); err == nil {
|
|
continue // link exists
|
|
}
|
|
if err := cr.Symlink(srcPath, lnPath); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|