tailer struct was incorrect and used incorrect channel

This commit is contained in:
2026-05-18 01:14:57 -06:00
parent 66c6ed7793
commit 9b0610b879
5 changed files with 21 additions and 8 deletions
+6 -2
View File
@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log/slog"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
@@ -22,6 +23,8 @@ func newTestServer(t *testing.T, handler http.HandlerFunc) *httptest.Server {
func newTestCache(t *testing.T, mirrorURLs []string) *Cache { func newTestCache(t *testing.T, mirrorURLs []string) *Cache {
t.Helper() t.Helper()
// set slog to debug
slog.SetLogLoggerLevel(slog.LevelDebug)
mirroredRepos := []string{"core", "extra"} mirroredRepos := []string{"core", "extra"}
c, err := NewCache(t.TempDir(), mirrorURLs, mirroredRepos) c, err := NewCache(t.TempDir(), mirrorURLs, mirroredRepos)
if err != nil { if err != nil {
@@ -33,7 +36,6 @@ func newTestCache(t *testing.T, mirrorURLs []string) *Cache {
func TestCacheHit(t *testing.T) { func TestCacheHit(t *testing.T) {
const expected = "This is fake file contents" const expected = "This is fake file contents"
c := newTestCache(t, []string{"http://example.com/"}) c := newTestCache(t, []string{"http://example.com/"})
tmpFileName := "fakeFile" tmpFileName := "fakeFile"
err := c.cr.WriteFile(tmpFileName, []byte(expected), 0644) err := c.cr.WriteFile(tmpFileName, []byte(expected), 0644)
@@ -78,10 +80,12 @@ func TestCacheMissExists(t *testing.T) {
c := newTestCache(t, []string{svr.URL + "/"}) c := newTestCache(t, []string{svr.URL + "/"})
_, err := c.Fetch("fakefile") cf, err := c.Fetch("fakefile")
if err != nil { if err != nil {
t.Fatalf("Fetch failed %v", err) t.Fatalf("Fetch failed %v", err)
} }
io.Copy(io.Discard, cf.Reader)
cf.Reader.Close()
data, err := c.cr.ReadFile("fakefile") data, err := c.cr.ReadFile("fakefile")
if err != nil { if err != nil {
+8 -1
View File
@@ -9,6 +9,8 @@ import (
) )
func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.File) { func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.File) {
slog.Debug("wrangle starting", "relPath", relPath)
// defer map cleanup and signal client handle done // defer map cleanup and signal client handle done
defer c.cleanupFlight(relPath, flight) defer c.cleanupFlight(relPath, flight)
@@ -37,6 +39,7 @@ func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.Fi
} }
return return
} }
slog.Debug("wrangle download complete", "err", err)
// mv file to final location // mv file to final location
err = c.cr.Rename(flight.tmpPath, relPath) err = c.cr.Rename(flight.tmpPath, relPath)
@@ -47,7 +50,7 @@ func (c *Cache) downloadWrangle(relPath string, flight *inFlight, tmpFile *os.Fi
} }
return return
} }
return slog.Debug("file moved to final location", "err", err)
} }
func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error { func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
@@ -79,6 +82,9 @@ func (c *Cache) downloadToDisk(url string, tmpFile *os.File) error {
if err != nil { if err != nil {
return err return err
} }
if err = tmpFile.Sync(); err != nil {
return err
}
return nil return nil
} }
@@ -86,5 +92,6 @@ func (c *Cache) cleanupFlight(key string, f *inFlight) {
c.inFlightMu.Lock() c.inFlightMu.Lock()
delete(c.inFlight, key) delete(c.inFlight, key)
c.inFlightMu.Unlock() c.inFlightMu.Unlock()
slog.Debug("closing done channel")
close(f.done) close(f.done)
} }
+1
View File
@@ -35,6 +35,7 @@ func (c *Cache) getStream(relPath string) (*inFlight, *os.File, error) {
flight := &inFlight{ flight := &inFlight{
tmpPath: tmpPath, tmpPath: tmpPath,
done: make(chan struct{}),
} }
c.inFlight[relPath] = flight c.inFlight[relPath] = flight
+1 -1
View File
@@ -19,7 +19,7 @@ func (c *Cache) Refresh() error {
func (c *Cache) refreshDB(repo string) error { func (c *Cache) refreshDB(repo string) error {
dbFile := repo + ".db.tar.gz" dbFile := repo + ".db.tar.gz"
dbPath := filepath.Join(repo, "os/x86_64", dbFile) dbPath := filepath.Join(repo, "os/x86_64", dbFile)
err := c.getStream(dbPath) _, _, err := c.getStream(dbPath)
if err != nil { if err != nil {
return err return err
} }
+5 -4
View File
@@ -2,31 +2,32 @@ package cache
import ( import (
"io" "io"
"log/slog"
"os" "os"
"time" "time"
) )
type tailer struct { type tailer struct {
f *os.File f *os.File
done <-chan struct{}
flight *inFlight flight *inFlight
} }
func (t *tailer) Read(p []byte) (int, error) { func (t *tailer) Read(p []byte) (int, error) {
for { for {
n, err := t.f.Read(p) n, err := t.f.Read(p)
slog.Debug("tailer read", "n", n, "err", err)
if n > 0 { if n > 0 {
return n, nil return n, nil
} }
if err == io.EOF { if err == io.EOF {
select { select {
case <-t.done: case <-t.flight.done:
if t.flight.err != nil { if t.flight.err != nil {
return 0, t.flight.err return 0, t.flight.err
} }
return t.f.Read(p) // send remainiing bytes return t.f.Read(p) // send remainiing bytes
default: case <-time.After(50 * time.Millisecond):
time.Sleep(50 * time.Millisecond) slog.Debug("tailer waiting for more data")
continue continue
} }
} }