2 more tailer tests
This commit is contained in:
Vendored
+157
-5
@@ -9,6 +9,7 @@ import (
|
|||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -171,14 +172,165 @@ func TestGetStreamMultiplClient(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDownloadWrangle(t *testing.T) {
|
func TestDownloadWrangle(t *testing.T) {
|
||||||
// Test: Upload error propagates to flight.err
|
t.Run("Download error propagates to flight.err", func(t *testing.T) {
|
||||||
// Test: Network error propagates to flight.err
|
const expected = "This is fake file contents"
|
||||||
// Test: retry works across mirror
|
svr := newTestServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// Test: cleanup runs on failure
|
//nolint:errcheck //ephemeral no need to check
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}))
|
||||||
|
|
||||||
|
c := newTestCache(t, []string{svr.URL + "/"})
|
||||||
|
relPath := "fakefile"
|
||||||
|
tmpPath := "fakefile.tmp"
|
||||||
|
flight := &inFlight{
|
||||||
|
tmpPath: tmpPath,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
tmpFile, err := c.cr.Create(tmpPath)
|
||||||
|
require.NoError(t, err, "failed open test file")
|
||||||
|
|
||||||
|
c.downloadWrangle(relPath, flight, tmpFile)
|
||||||
|
select {
|
||||||
|
case <-flight.done:
|
||||||
|
//closed, pass
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("done channel never closed")
|
||||||
|
}
|
||||||
|
assert.Error(t, flight.err, "expected err got nil, err: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Network error propagates to flight.err", func(t *testing.T) {
|
||||||
|
const expected = "This is fake file contents"
|
||||||
|
c := newTestCache(t, []string{"http://127.0.0.1/"})
|
||||||
|
relPath := "fakefile"
|
||||||
|
tmpPath := "fakefile.tmp"
|
||||||
|
flight := &inFlight{
|
||||||
|
tmpPath: tmpPath,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
tmpFile, err := c.cr.Create(tmpPath)
|
||||||
|
require.NoError(t, err, "failed open test file")
|
||||||
|
|
||||||
|
c.downloadWrangle(relPath, flight, tmpFile)
|
||||||
|
select {
|
||||||
|
case <-flight.done:
|
||||||
|
//closed, pass
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("done channel never closed")
|
||||||
|
}
|
||||||
|
assert.Error(t, flight.err, "expected err got none, err: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Retry works across mirror", func(t *testing.T) {
|
||||||
|
const expected = "This is fake file contents"
|
||||||
|
svrMiss := newTestServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
//nolint:errcheck //ephemeral no need to check
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}))
|
||||||
|
|
||||||
|
svrFound := newTestServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
//nolint:errcheck //ephemeral no need to check
|
||||||
|
fmt.Fprintf(w, "%s", expected)
|
||||||
|
}))
|
||||||
|
|
||||||
|
c := newTestCache(t, []string{
|
||||||
|
svrMiss.URL + "/",
|
||||||
|
svrFound.URL + "/",
|
||||||
|
})
|
||||||
|
relPath := "fakefile"
|
||||||
|
tmpPath := "fakefile.tmp"
|
||||||
|
flight := &inFlight{
|
||||||
|
tmpPath: tmpPath,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
tmpFile, err := c.cr.Create(tmpPath)
|
||||||
|
require.NoError(t, err, "failed open test file")
|
||||||
|
|
||||||
|
c.downloadWrangle(relPath, flight, tmpFile)
|
||||||
|
select {
|
||||||
|
case <-flight.done:
|
||||||
|
//closed, pass
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("done channel never closed")
|
||||||
|
}
|
||||||
|
assert.NoError(t, flight.err, "expected no err got: %v", err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Cleanup runs on failure", func(t *testing.T) {
|
||||||
|
const expected = "This is fake file contents"
|
||||||
|
svr := newTestServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
//nolint:errcheck //ephemeral no need to check
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}))
|
||||||
|
|
||||||
|
c := newTestCache(t, []string{svr.URL + "/"})
|
||||||
|
relPath := "fakefile"
|
||||||
|
tmpPath := "fakefile.tmp"
|
||||||
|
flight := &inFlight{
|
||||||
|
tmpPath: tmpPath,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
tmpFile, err := c.cr.Create(tmpPath)
|
||||||
|
require.NoError(t, err, "failed open test file")
|
||||||
|
|
||||||
|
c.downloadWrangle(relPath, flight, tmpFile)
|
||||||
|
_, err = os.Stat(tmpPath)
|
||||||
|
assert.ErrorIs(t, err, os.ErrNotExist)
|
||||||
|
c.inFlightMu.Lock()
|
||||||
|
_, ok := c.inFlight[relPath]
|
||||||
|
c.inFlightMu.Unlock()
|
||||||
|
assert.False(t, ok, "expected inFlight entry to be removed")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTailer(t *testing.T) {
|
func TestTailer(t *testing.T) {
|
||||||
// Test: reads available bytes
|
const expected = "This is fake file contents"
|
||||||
|
const filename = "fakefile"
|
||||||
|
t.Run("Read from completed file", func(t *testing.T) {
|
||||||
|
tmpPath := filepath.Join(t.TempDir(), filename)
|
||||||
|
flight := &inFlight{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := os.WriteFile(tmpPath, []byte(expected), 0660)
|
||||||
|
require.NoError(t, err)
|
||||||
|
f, err := os.Open(tmpPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
close(flight.done)
|
||||||
|
|
||||||
|
tr := &tailer{f: f, flight: flight}
|
||||||
|
data, err := io.ReadAll(tr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte(expected), data)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Read chunks until done", func(t *testing.T) {
|
||||||
|
tmpPath := filepath.Join(t.TempDir(), filename)
|
||||||
|
flight := &inFlight{
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
wf, err := os.Create(tmpPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for _ = range 3 {
|
||||||
|
fmt.Fprintf(wf, "%s", expected)
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
wf.Sync()
|
||||||
|
wf.Close()
|
||||||
|
close(flight.done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
f, err := os.Open(tmpPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tr := &tailer{f: f, flight: flight}
|
||||||
|
data, err := io.ReadAll(tr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, []byte(strings.Repeat(expected, 3)), data)
|
||||||
|
})
|
||||||
// Test: blocks until done
|
// Test: blocks until done
|
||||||
// Test: propagates flight.err
|
// Test: propagates flight.err
|
||||||
// Test: return true EOF
|
// Test: return true EOF
|
||||||
|
|||||||
Reference in New Issue
Block a user