Skip to content

Commit ae108e2

Browse files
ciriartepivotal
authored andcommitted
Allow for setting concurrency to a non-default value
[#163240667] `kiln fetch` can download releases in parallel Signed-off-by: Slawek Ligus <sligus@pivotal.io>
1 parent e45fba8 commit ae108e2

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

acceptance/help_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ Usage: kiln [options] fetch [<args>]
6161
6262
Command Arguments:
6363
--assets-file, -a string (required) path to assets file
64+
--download-threads, -dt int number of parallel threads to download parts from S3
6465
--releases-directory, -rd string (required) path to a directory to download releases into
6566
`
6667

commands/fetch.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ type Fetch struct {
4040
S3Provider func(*session.Session, ...*aws.Config) s3iface.S3API
4141

4242
Options struct {
43-
AssetsFile string `short:"a" long:"assets-file" required:"true" description:"path to assets file"`
44-
VariablesFiles []string `short:"vf" long:"variables-file" description:"path to variables file"`
45-
Variables []string `short:"vr" long:"variable" description:"variable in key=value format"`
46-
ReleasesDir string `short:"rd" long:"releases-directory" required:"true" description:"path to a directory to download releases into"`
43+
AssetsFile string `short:"a" long:"assets-file" required:"true" description:"path to assets file"`
44+
VariablesFiles []string `short:"vf" long:"variables-file" description:"path to variables file"`
45+
Variables []string `short:"vr" long:"variable" description:"variable in key=value format"`
46+
ReleasesDir string `short:"rd" long:"releases-directory" required:"true" description:"path to a directory to download releases into"`
47+
DownloadThreads int `short:"dt" long:"download-threads" description:"number of parallel threads to download parts from S3"`
4748
}
4849
}
4950

@@ -135,10 +136,16 @@ type Downloader interface {
135136
Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (n int64, err error)
136137
}
137138

138-
func DownloadReleases(logger *log.Logger, assetsLock cargo.AssetsLock, bucket string, matchedS3Objects map[cargo.CompiledRelease]string, fileCreator func(string) (io.WriterAt, error), downloader Downloader) error {
139+
func DownloadReleases(logger *log.Logger, assetsLock cargo.AssetsLock, bucket string, matchedS3Objects map[cargo.CompiledRelease]string, fileCreator func(string) (io.WriterAt, error), downloader Downloader, downloadThreads int) error {
139140
releases := assetsLock.Releases
140141
stemcell := assetsLock.Stemcell
141142

143+
setConcurrency := func(dl *s3manager.Downloader) {
144+
if downloadThreads > 0 {
145+
dl.Concurrency = downloadThreads
146+
}
147+
}
148+
142149
for _, release := range releases {
143150
s3Key, ok := matchedS3Objects[cargo.CompiledRelease{
144151
Name: release.Name,
@@ -161,7 +168,7 @@ func DownloadReleases(logger *log.Logger, assetsLock cargo.AssetsLock, bucket st
161168
_, err = downloader.Download(file, &s3.GetObjectInput{
162169
Bucket: aws.String(bucket),
163170
Key: aws.String(s3Key),
164-
})
171+
}, setConcurrency)
165172

166173
if err != nil {
167174
return fmt.Errorf("failed to download file, %v\n", err)
@@ -241,7 +248,7 @@ func (f Fetch) Execute(args []string) error {
241248
}
242249

243250
downloader := s3manager.NewDownloaderWithClient(s3Client)
244-
return DownloadReleases(f.logger, assetsLock, assets.CompiledReleases.Bucket, MatchedS3Objects, fileCreator, downloader)
251+
return DownloadReleases(f.logger, assetsLock, assets.CompiledReleases.Bucket, MatchedS3Objects, fileCreator, downloader, f.Options.DownloadThreads)
245252
}
246253

247254
func (f Fetch) Usage() jhanda.Usage {

commands/fetch_test.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/aws/aws-sdk-go/aws/session"
1515
"github.com/aws/aws-sdk-go/service/s3"
1616
"github.com/aws/aws-sdk-go/service/s3/s3iface"
17+
"github.com/aws/aws-sdk-go/service/s3/s3manager"
1718
. "github.com/onsi/ginkgo"
1819
. "github.com/onsi/gomega"
1920
"github.com/pivotal-cf/jhanda"
@@ -34,10 +35,7 @@ compiled_releases:
3435
regex: ^2.5/.+/(?P<release_name>[a-z-_]+)-(?P<release_version>[0-9\.]+)-(?P<stemcell_os>[a-z-_]+)-(?P<stemcell_version>[\d\.]+)\.tgz$
3536
`
3637

37-
const MinimalAssetsLockContents = `
38-
---
39-
releases: []
40-
`
38+
const MinimalAssetsLockContents = `---`
4139

4240
var _ = Describe("Fetch", func() {
4341
var (
@@ -163,6 +161,7 @@ compiled_releases:
163161
Expect(sessionArg.Config.Region).To(Equal(aws.String("north-east-1")))
164162
})
165163
})
164+
166165
Context("failure cases", func() {
167166
Context("the assets-file flag is missing", func() {
168167
It("returns a flag error", func() {
@@ -320,25 +319,56 @@ compiled_releases:
320319
})
321320

322321
It("downloads the appropriate versions of releases listed in the assets.lock", func() {
323-
err = commands.DownloadReleases(logger, assetsLock, bucket, matchedS3Objects, fileCreator, fakeDownloader)
322+
err = commands.DownloadReleases(logger, assetsLock, bucket, matchedS3Objects, fileCreator, fakeDownloader, 7)
324323
Expect(err).NotTo(HaveOccurred())
325324
Expect(fakeDownloader.DownloadCallCount()).To(Equal(2))
326325

327-
w1, input1, _ := fakeDownloader.DownloadArgsForCall(0)
326+
w1, input1, opts := fakeDownloader.DownloadArgsForCall(0)
328327
Expect(w1).To(Equal(fakeUAAFile))
329328
Expect(input1).To(Equal(uaaInput))
329+
Expect(opts).To(HaveLen(1))
330+
331+
downloader := &s3manager.Downloader{
332+
Concurrency: s3manager.DefaultDownloadConcurrency,
333+
}
334+
335+
opts[0](downloader)
330336

331-
w2, input2, _ := fakeDownloader.DownloadArgsForCall(1)
337+
Expect(downloader.Concurrency).To(Equal(7))
338+
339+
w2, input2, opts := fakeDownloader.DownloadArgsForCall(1)
332340
Expect(w2).To(Equal(fakeBPMFile))
333341
Expect(input2).To(Equal(bpmInput))
342+
Expect(opts).To(HaveLen(1))
343+
})
344+
345+
Context("when number of threads is not specified", func() {
346+
It("uses the s3manager package's default download concurrency", func() {
347+
err = commands.DownloadReleases(logger, assetsLock, bucket, matchedS3Objects, fileCreator, fakeDownloader, 0)
348+
Expect(err).NotTo(HaveOccurred())
349+
Expect(fakeDownloader.DownloadCallCount()).To(Equal(2))
350+
351+
w1, input1, opts := fakeDownloader.DownloadArgsForCall(0)
352+
Expect(w1).To(Equal(fakeUAAFile))
353+
Expect(input1).To(Equal(uaaInput))
354+
Expect(opts).To(HaveLen(1))
355+
356+
downloader := &s3manager.Downloader{
357+
Concurrency: s3manager.DefaultDownloadConcurrency,
358+
}
359+
360+
opts[0](downloader)
361+
362+
Expect(downloader.Concurrency).To(Equal(s3manager.DefaultDownloadConcurrency))
363+
})
334364
})
335365

336366
It("returns an error if the release does not exist", func() {
337367
assetsLock.Releases = []cargo.Release{
338368
{Name: "not-real", Version: "1.2.3"},
339369
}
340370

341-
err = commands.DownloadReleases(logger, assetsLock, bucket, matchedS3Objects, fileCreator, fakeDownloader)
371+
err = commands.DownloadReleases(logger, assetsLock, bucket, matchedS3Objects, fileCreator, fakeDownloader, 0)
342372
Expect(err).To(HaveOccurred())
343373
Expect(err).To(MatchError("Compiled release: not-real, version: 1.2.3, stemcell OS: ubuntu-trusty, stemcell version: 1234, not found"))
344374
})

0 commit comments

Comments
 (0)