Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion adapters/folder/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ func (ifc *ImportFolderCmd) run(cmd *cobra.Command, args []string, app *app.Appl
const icloudMetadataExt = ".csv"

func (ifc *ImportFolderCmd) Browse(ctx context.Context) chan *assets.Group {
gOut := make(chan *assets.Group)
// ARM/low-memory fix: use a buffered channel so that file discovery
// goroutines are not blocked waiting for uploadLoop() to start.
// uploadLoop() only starts after getImmichAlbums() finishes (which
// makes one HTTP request per album and can take minutes on large
// libraries). Without a buffer, the 'Assets found: N' counter freezes
// and the process appears hung. A buffer of 512 allows discovery to
// run to completion independently of the initialization phase.
gOut := make(chan *assets.Group, 512)
go func() {
defer func() {
close(gOut)
Expand Down
16 changes: 15 additions & 1 deletion app/upload/noui.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (uc *UpCmd) runNoUI(ctx context.Context, app *app.Application) error {
spinner := []rune{' ', ' ', '.', ' ', ' '}
spinIdx := 0

// ARM/low-memory fix: track album-fetch phase separately so the user
// sees "Fetching albums..." instead of a frozen progress counter while
// getImmichAlbums() is making its (potentially many) HTTP requests.
var albumsFetching atomic.Bool

immichUpdate := func(value, total int) {
lock.Lock()
currImmich, maxImmich = value, total
Expand All @@ -50,7 +55,11 @@ func (uc *UpCmd) runNoUI(ctx context.Context, app *app.Application) error {
}
lock.Unlock()

return fmt.Sprintf("\rImmich read %d%%, Assets found: %d, Upload errors: %d, Uploaded %d %s", immichPct, app.FileProcessor().Logger().TotalAssets(), counts[fileevent.ErrorServerError], counts[fileevent.ProcessedUploadSuccess], string(spinner[spinIdx]))
phase := ""
if immichPct == 100 && albumsFetching.Load() {
phase = " [Fetching album details...]"
}
return fmt.Sprintf("\rImmich read %d%%%s, Assets found: %d, Upload errors: %d, Uploaded %d %s", immichPct, phase, app.FileProcessor().Logger().TotalAssets(), counts[fileevent.ErrorServerError], counts[fileevent.ProcessedUploadSuccess], string(spinner[spinIdx]))
}
uiGrp := errgroup.Group{}

Expand Down Expand Up @@ -88,6 +97,11 @@ func (uc *UpCmd) runNoUI(ctx context.Context, app *app.Application) error {
return err
})
processGrp.Go(func() error {
// Signal to the progress reporter that we are in the album-fetch
// phase so the user sees a meaningful status instead of a frozen
// counter (ARM/low-memory pipeline-stall fix).
albumsFetching.Store(true)
defer albumsFetching.Store(false)
return uc.getImmichAlbums(ctx)
})
processGrp.Go(func() error {
Expand Down
41 changes: 30 additions & 11 deletions app/upload/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,28 +175,44 @@ func (uc *UpCmd) getImmichAlbums(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-uc.immichAssetsReady:
// Wait for the server's assets to be ready.
// ARM/low-memory fix: fetch album details in parallel using the
// existing worker pool. The original sequential loop made one HTTP
// request per album and blocked the entire pipeline during that
// time. With N albums at ~100ms each, 100 albums = 10s, 1000 albums
// = 100s of apparent hang. Parallel fetching reduces this to
// roughly (N / concurrency) * latency.
var albumMu sync.Mutex
pool := worker.NewPool(uc.app.ConcurrentTask)
var wg sync.WaitGroup
for _, a := range serverAlbums {
select {
case <-ctx.Done():
pool.Stop()
wg.Wait()
return ctx.Err()
default:
}
alb := a // capture loop variable
wg.Add(1)
pool.Submit(func() {
defer wg.Done()
// Get the album info from the server, with assets.
r, err := uc.client.Immich.GetAlbumInfo(ctx, a.ID, false)
r, err := uc.client.Immich.GetAlbumInfo(ctx, alb.ID, false)
if err != nil {
uc.app.Log().Error("can't get the album info from the server", "album", a.AlbumName, "err", err)
continue
uc.app.Log().Error("can't get the album info from the server", "album", alb.AlbumName, "err", err)
return
}
ids := make([]string, 0, len(r.Assets))
for _, aa := range r.Assets {
ids = append(ids, aa.ID)
}

album := assets.NewAlbum(a.ID, a.AlbumName, a.Description)
uc.albumsCache.NewCollection(a.AlbumName, album, ids)
uc.app.Log().Info("got album from the server", "album", a.AlbumName, "assets", len(r.Assets))
uc.app.Log().Debug("got album from the server", "album", a.AlbumName, "assets", ids)
// assign the album to the assets
album := assets.NewAlbum(alb.ID, alb.AlbumName, alb.Description)
uc.app.Log().Info("got album from the server", "album", alb.AlbumName, "assets", len(r.Assets))
uc.app.Log().Debug("got album from the server", "album", alb.AlbumName, "assets", ids)
// Assign album to assets and populate cache under a lock
// because multiple goroutines write to the shared index.
albumMu.Lock()
uc.albumsCache.NewCollection(alb.AlbumName, album, ids)
for _, id := range ids {
a := uc.assetIndex.getByID(id)
if a == nil {
Expand All @@ -205,8 +221,11 @@ func (uc *UpCmd) getImmichAlbums(ctx context.Context) error {
}
a.Albums = append(a.Albums, album)
}
}
albumMu.Unlock()
})
}
wg.Wait()
pool.Stop()
}
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions immich/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,16 @@ func NewImmichClient(endPoint string, key string, options ...clientOption) (*Imm

ic := ImmichClient{
endPoint: endPoint + "/api",
// ARM fix: reduce connection pool from 100 to 16. The default of 100
// allocates goroutine stacks and kernel socket buffers for connections
// that will never be used (ConcurrentTask defaults to 4 on H616).
// 16 gives headroom for retries and album/asset API calls without waste.
transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConns: 16,
IdleConnTimeout: 90 * time.Second,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 16,
MaxConnsPerHost: 16,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
Expand Down
21 changes: 19 additions & 2 deletions internal/fshelper/hash/sha1.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
package hash

import (
"bufio"
"crypto/sha1"
"fmt"
"io"
"io/fs"
"sync"
)

// sha1BufPool pools 4MB read buffers to reduce allocations and GC pressure
// during concurrent SHA1 hashing. 4MB matches typical SD card page sizes
// and reduces syscall count from ~640 to ~5 per 20MB RAW file on ARM devices.
var sha1BufPool = sync.Pool{
New: func() any {
return bufio.NewReaderSize(nil, 4*1024*1024)
},
}

func GetSHA1Hash(r io.Reader) ([]byte, error) {
h := sha1.New()
if _, err := io.Copy(h, r); err != nil {
// ARM/SD-card fix: use a pooled 4MB buffered reader to batch small reads
// into large sequential I/O operations. Go's default io.Copy buffer is
// 32KB, causing ~640 read syscalls per 20MB file. With 4MB buffers this
// drops to ~5 syscalls, cutting hashing time significantly on SD card storage.
br := sha1BufPool.Get().(*bufio.Reader)
br.Reset(r)
defer sha1BufPool.Put(br)
if _, err := io.Copy(h, br); err != nil {
return nil, err
}

return h.Sum(nil), nil
}

Expand Down
25 changes: 24 additions & 1 deletion internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,18 @@ type Pool struct {

// NewPool creates a new Pool with a specified number of workers.
func NewPool(numWorkers int) *Pool {
// ARM fix: buffer the tasks channel so that Submit() never blocks the
// caller waiting for a free worker. An unbuffered channel causes the
// main goroutine (reading from groupChan) to stall between each task
// submission, serialising what should be pipelined work. A buffer of
// numWorkers*2 allows the caller to stay ahead of the workers without
// holding more than a small number of pending tasks in memory.
bufSize := numWorkers * 2
if bufSize < 4 {
bufSize = 4
}
pool := &Pool{
tasks: make(chan Task),
tasks: make(chan Task, bufSize),
quit: make(chan struct{}),
}

Expand Down Expand Up @@ -49,8 +59,21 @@ func (p *Pool) Submit(task Task) {
}

// Stop stops all the workers and waits for them to finish.
// It drains any buffered tasks that have not yet been picked up by a worker
// before signalling shutdown, ensuring no submitted work is silently dropped.
func (p *Pool) Stop() {
if !p.closed {
// Drain remaining buffered tasks before stopping workers.
// With a buffered channel, tasks submitted just before Stop() may
// sit in the buffer without a worker having picked them up yet.
// We must let the workers consume them before sending quit.
for len(p.tasks) > 0 {
select {
case task := <-p.tasks:
task()
default:
}
}
close(p.quit)
p.wg.Wait()
close(p.tasks)
Expand Down
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@ import (
"fmt"
"os"
"os/signal"
"runtime/debug"

"github.com/simulot/immich-go/app/root"
)

// immich-go entry point
func main() {
// ARM/1GB fix: set a soft memory limit so the Go GC does not trigger
// excessively on constrained devices. Without this, GOGC=100 (default)
// causes the GC to run every time the heap doubles, which on a 1GB
// device with 512MB available means frequent 5-20ms stop-the-world
// pauses during large uploads. 800MB leaves headroom for the OS and
// kernel while allowing the GC to batch work more efficiently.
// This can be overridden at runtime with GOMEMLIMIT env var.
if os.Getenv("GOMEMLIMIT") == "" {
debug.SetMemoryLimit(800 * 1024 * 1024) // 800 MiB
}

ctx := context.Background()
err := immichGoMain(ctx)
if err != nil {
Expand Down