feat(storage/transfermanager): prototype #10045
sketch of an interface for downloads
A few initial comments, overall looks like a good start
type Downloader struct {
	client *storage.Client
	config *transferManagerConfig
	work   chan *DownloadObjectInput // Piece of work to be executed.
Presumably this should be send-only and output should be receive?
We are sending and receiving from both channels in different places in the downloader. Unidirectional channels could be used in subcomponents or if we were providing the channel to the user, but I don't see how we could implement this with unidirectional channels - if we only received from output, who would send us the output (and vice-versa for work)?
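To make the trade-off above concrete, here is a minimal sketch, not the PR's code: the owning struct keeps bidirectional channels because it both sends and receives, while a subcomponent takes direction-restricted views of the same channels. The names `pool`, `worker`, and `run` are hypothetical stand-ins.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for the Downloader. It owns both channels
// and uses each in both directions, so its fields stay bidirectional.
type pool struct {
	work   chan int
	output chan string
}

// worker only receives work and only sends output, so its parameters can be
// restricted. Go converts a bidirectional channel to a directional one
// implicitly at the call site.
func worker(work <-chan int, output chan<- string) {
	for n := range work {
		output <- fmt.Sprintf("done %d", n)
	}
}

// run enqueues n pieces of work and collects exactly n results.
func run(n int) []string {
	p := &pool{work: make(chan int, n), output: make(chan string, n)}
	go worker(p.work, p.output)
	for i := 0; i < n; i++ {
		p.work <- i
	}
	close(p.work)
	results := make([]string, 0, n)
	for i := 0; i < n; i++ {
		results = append(results, <-p.output)
	}
	return results
}

func main() {
	fmt.Println(run(3))
}
```

The compiler then rejects a send on `work` or a receive from `output` inside `worker`, which is the safety the directional types would buy without changing the struct fields.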
	return crc32cHash.Sum32(), w.Close()
}

type testWriter struct {
Doesn't have to be in this PR, but we should add a version of this (well, some kind of DownloaderBuffer that implements WriterAt) to the library.
Yeah, I'd have to look more into that. This is a very barebones implementation that is likely not at all efficient (and doesn't really work as a WriterAt yet).
Yeah I think that's fine for now.
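For reference, a buffer of the kind suggested above could look roughly like the following. This is a sketch under assumptions, not the library's implementation: `downloadBuffer` is a hypothetical name, it keeps everything in memory, and it favours simplicity over efficiency. It satisfies `io.WriterAt` and tolerates shards arriving out of order.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// downloadBuffer is a hypothetical in-memory io.WriterAt. Concurrent writes
// are serialized by a mutex.
type downloadBuffer struct {
	mu  sync.Mutex
	buf []byte
}

// Compile-time check that the type satisfies io.WriterAt.
var _ io.WriterAt = (*downloadBuffer)(nil)

// WriteAt copies p into the buffer at offset off, growing the buffer if needed.
func (b *downloadBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("downloadBuffer: negative offset %d", off)
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	end := int(off) + len(p)
	if end > len(b.buf) {
		// Grow the buffer; any gap stays zero-filled until a later write covers it.
		grown := make([]byte, end)
		copy(grown, b.buf)
		b.buf = grown
	}
	copy(b.buf[off:], p)
	return len(p), nil
}

// Bytes returns a copy of the data written so far.
func (b *downloadBuffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]byte(nil), b.buf...)
}

func main() {
	var b downloadBuffer
	// Shards may complete out of order; the offsets put them in place.
	b.WriteAt([]byte("world"), 6)
	b.WriteAt([]byte("hello "), 0)
	fmt.Printf("%q\n", b.Bytes())
}
```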
}

// Start workers in background.
for i := 0; i < d.config.numWorkers; i++ {
Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR though.
Sure, though I'm not sure how much that would optimize this by... I guess it depends on the number of workers.
Yeah something we can test out later.
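One possible shape for the on-demand idea, sketched with hypothetical names (`lazyPool`, `add`, `waitAndClose`) and a trivial summing task in place of a download: each enqueue starts one more worker until the cap is reached, so a Downloader that only ever sees two objects never starts more than two goroutines. Whether this saves anything measurable is exactly the open question in the thread.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyPool starts workers on demand, up to max, instead of all up front.
type lazyPool struct {
	mu      sync.Mutex
	wg      sync.WaitGroup
	work    chan int
	max     int
	started int
	sum     int
}

func newLazyPool(max int) *lazyPool {
	return &lazyPool{work: make(chan int, 64), max: max}
}

// add enqueues one item, starting a new worker first if the cap allows it.
func (p *lazyPool) add(n int) {
	p.mu.Lock()
	if p.started < p.max {
		p.started++
		p.wg.Add(1)
		go p.worker()
	}
	p.mu.Unlock()
	p.work <- n
}

// worker drains the queue until it is closed; summing stands in for a download.
func (p *lazyPool) worker() {
	defer p.wg.Done()
	for n := range p.work {
		p.mu.Lock()
		p.sum += n
		p.mu.Unlock()
	}
}

// waitAndClose stops accepting work, waits for the workers, and reports how
// many workers were started and the total they computed.
func (p *lazyPool) waitAndClose() (started, sum int) {
	close(p.work)
	p.wg.Wait()
	return p.started, p.sum
}

func main() {
	p := newLazyPool(4)
	p.add(1)
	p.add(2)
	fmt.Println(p.waitAndClose())
}
```

Note that this simple version starts a worker even when an existing one is idle; a smarter policy would need to track idle workers.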
Looks really good overall. We can chat tomorrow about a few of the remaining sync/async API changes perhaps?
storage/transfermanager/doc.go

# Example usage

// Pass in any client opts or set retry policy here.
Let's add something like https://github.com/googleapis/google-cloud-go/blob/main/storage/example_test.go instead perhaps so it can be in compiled code?
Done
// result.
// The results of downloads initiated with this method will not be provided in
// the results given in Downloader.Results.
func (d *Downloader) DownloadObjectWithCallback(ctx context.Context, input *DownloadObjectInput, callback func(*DownloadOutput)) {
What is the expected behavior here if someone calls this without using the Callback option when creating a downloader? Or vice versa with DownloadObject? There are some corner cases to think through here.
I'm leaning towards just having one entry point function for DownloadObject and moving the callback to a field on the DownloadObjectInput -- what do you think?
I forgot to remove the callback Option - right now it's completely unused. We could enforce its usage (erroring in the output or here if the option does not match the call); as of now callbacks and no callbacks could be used in the same downloader (and it should work - though it's untested).
I lean towards the two separate entry points. I think the distinction in behaviour is big enough that we should make a similar distinction in our surface - I feel like having it just as a field isn't as nice an experience for people using it and encourages more mixing of using both callback and no callbacks. If we do add it as a field, I'd suggest then also adding the output to the results returned by Results(), in addition to calling the callback if available. That way the callback is more of an optional field that would get called and less of something that will cause different behaviour.
As discussed offline, we'll enforce usage of the option and set callback as a field.
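The agreed direction (a single entry point, the callback as a field on the input, and the option enforced) might look something like this sketch. All types here are simplified, hypothetical stand-ins rather than the package's real `DownloadObjectInput` or `DownloadOutput`, and where the mismatch error is actually surfaced is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// output and input are simplified stand-ins for the PR's types.
type output struct {
	Object string
	Err    error
}

type input struct {
	Object   string
	Callback func(*output) // optional per-object callback
}

// downloader records whether the (hypothetical) callback option was set at
// construction time.
type downloader struct {
	callbacksEnabled bool
	results          []output
}

var errCallbackMismatch = errors.New("callback on input does not match downloader option")

// download is the single entry point. It rejects inputs whose callback field
// disagrees with the option, so the two modes cannot be mixed.
func (d *downloader) download(in *input) error {
	if d.callbacksEnabled != (in.Callback != nil) {
		return fmt.Errorf("%s: %w", in.Object, errCallbackMismatch)
	}
	out := output{Object: in.Object} // a real implementation would download here
	if in.Callback != nil {
		in.Callback(&out)
		return nil
	}
	d.results = append(d.results, out)
	return nil
}

func main() {
	d := &downloader{callbacksEnabled: false}
	fmt.Println(d.download(&input{Object: "a"}))
	fmt.Println(d.download(&input{Object: "b", Callback: func(*output) {}}))
}
```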
// Waits for all outstanding downloads to complete. The Downloader must not be
// used to download more objects or directories after this has been called.
// WaitAndClose waits for all outstanding downloads to complete. The Downloader
// must not be used for any more downloads after this has been called.
Clearer to say that no new downloads can be added, perhaps? Also are we enforcing this somehow?
I think we should add unit tests for this behavior (and for stuff like the callback thing I mentioned).
We aren't enforcing it, but it should panic when trying to send on the closed channels. I can change the wording (and maybe also mention that it will panic) and add a test for it.
I changed this to error instead of panicking.
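As an illustration of erroring rather than panicking, one common pattern is a `closed` flag guarded by the same mutex that protects the send, so a send can never race with `close`. This is a sketch with hypothetical names (`queue`, `add`, `close`, `errClosed`); the PR may implement the check differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("downloader: WaitAndClose already called")

// queue is a hypothetical stand-in for the Downloader's work channel.
type queue struct {
	mu     sync.Mutex
	closed bool
	work   chan string
}

func newQueue(size int) *queue { return &queue{work: make(chan string, size)} }

// add returns errClosed instead of panicking on a closed channel. The lock is
// held across the check and the send; in this sketch that assumes the buffer
// has room or that workers are draining it.
func (q *queue) add(object string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.work <- object
	return nil
}

// close is safe to call more than once.
func (q *queue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closed {
		q.closed = true
		close(q.work)
	}
}

func main() {
	q := newQueue(4)
	fmt.Println(q.add("a"))
	q.close()
	q.close() // second call is a no-op
	fmt.Println(q.add("b"))
}
```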
func (d *Downloader) Results() *DownloadOutputIterator {
	return &DownloadOutputIterator{
		output: d.output,
// Results returns all the results of the downloads completed since the last
I think we could probably enforce that this can only be called after the full job is complete (given that we are keeping WaitAndClose as a single func). Or we could even just return the slice from WaitAndClose. What do you think?
I was thinking of just returning it in WaitAndClose, but hesitated given the async case - seems cleaner in that case to not return the results in WaitAndClose (since presumably it'd be empty).
We could enforce that it only be called once the full job is complete by returning an error and/or an empty array if called before WaitAndClose... but then it may just be cleaner to return it in WaitAndClose. The way it is now should not cause any error if users do call it several times or before WaitAndClose, but could be confusing for some users. We could also always return the whole slice, but that has the same problems.
I think that, if we weigh both async and sync equally (and we don't want users grabbing results part way through), returning results in WaitAndClose is better, especially if we can assume people using async would be more aware of what they are doing and reading the docs, which would mention that results are empty for their case.
As discussed, I'll change this to return results directly in WaitAndClose
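A rough sketch of the resulting surface, using hypothetical simplified types (`manager`, `result`) and no real downloads: results accumulate internally and only become visible once `waitAndClose` has waited for every outstanding download. In the callback case discussed above, the returned slice would simply be empty.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a simplified stand-in for DownloadOutput.
type result struct {
	Object string
	Err    error
}

// manager is a hypothetical stand-in for the Downloader.
type manager struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	results []result
}

// download is non-blocking: it records the result in the background.
func (m *manager) download(object string) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		r := result{Object: object} // placeholder for the real download
		m.mu.Lock()
		m.results = append(m.results, r)
		m.mu.Unlock()
	}()
}

// waitAndClose blocks until all downloads finish, then returns every result.
// Calling it again returns the same slice.
func (m *manager) waitAndClose() []result {
	m.wg.Wait()
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.results
}

func main() {
	m := &manager{}
	m.download("a")
	m.download("b")
	fmt.Println(len(m.waitAndClose()))
}
```

Callers can no longer observe a partial result set, which was the main concern with a separate `Results` method.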

// Keep track of any error that occurred.
if out.Err != nil {
	d.error(out.Err)
Can we wrap this error to include the bucket/object name? I don't see that added in downloadShard either.
Sure! Technically they would have the information since it's in the output, but I see no harm including it here as well.
Sounds good; yeah if it's in the top-level output then you can see which individual result to inspect as well.
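The wrapping requested here can be done with `fmt.Errorf` and the `%w` verb, which keeps the original error reachable through `errors.Is` and `errors.As`. The helper name, message format, and sentinel error below are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a stand-in for an error returned by the storage client.
var errNotFound = errors.New("object not found")

// wrapDownloadErr (hypothetical helper) adds the bucket and object name to an
// error while preserving the original error in the chain.
func wrapDownloadErr(bucket, object string, err error) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("transfermanager: failed to download gs://%s/%s: %w", bucket, object, err)
}

func main() {
	err := wrapDownloadErr("my-bucket", "dir/file.txt", errNotFound)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errNotFound))
}
```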
// errorIs is equivalent to errors.Is, except that it additionally will return
// true if err and targetErr are googleapi.Errors with identical error codes,
// or if both errors have the same gRPC status code.
func errorIs(err error, targetErr error) bool {
Clever idea, we should probably do something like this for other tests which force errors.
You mean in the storage package? Yeah, I think we mostly do this type of check in the test functions themselves or don't check the exact returned err.
If we make a shared testutil we can add this to it!
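To show the shape of such a helper without pulling in dependencies, here is a standard-library-only sketch. The `codedError` type is a hypothetical stand-in for `googleapi.Error` (and for gRPC statuses); the PR's helper compares the real types, which this sketch does not attempt to reproduce.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical stand-in for an API error that carries a code.
type codedError struct {
	Code int
	Msg  string
}

func (e *codedError) Error() string { return fmt.Sprintf("code %d: %s", e.Code, e.Msg) }

// errorIs behaves like errors.Is, but also reports true when both errors
// contain a codedError with the same code, even if the messages differ.
func errorIs(err, target error) bool {
	if errors.Is(err, target) {
		return true
	}
	var a, b *codedError
	if errors.As(err, &a) && errors.As(target, &b) {
		return a.Code == b.Code
	}
	return false
}

func main() {
	got := fmt.Errorf("download failed: %w", &codedError{Code: 404, Msg: "no such object"})
	want := &codedError{Code: 404, Msg: "not found"}
	fmt.Println(errorIs(got, want))
}
```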
Few docs notes, otherwise I think we should be able to submit this soon
	d.addInput(input)
}
// download but is non-blocking; call Downloader.Results or use the callback to
// process the result.
We should explicitly note that this is intended to be thread-safe (can share the Downloader across goroutines). And add the note about timeouts/timing that you have in the example.
Done. Note I also made WaitAndClose callable more than once (just returns the same instead of panicking).
Is this ready to merge @BrennaEpp @tritone?
}
ctx := context.Background()

tb.bucket = prefix + uidSpace.New()
I think we should talk about adding some more objects (at least more than the thread count) and of a wider variety of sizes. Can be in a follow up PR though.
🤖 I have created a release *beep* *boop*
---
## [1.42.0](https://togithub.com/googleapis/google-cloud-go/compare/storage/v1.41.0...storage/v1.42.0) (2024-06-10)

### Features

* **storage:** Add new package transfermanager. This package is intended for parallel uploads and downloads, and is in preview. It is not stable, and is likely to change. ([#10045](https://togithub.com/googleapis/google-cloud-go/issues/10045)) ([cde5cbb](https://togithub.com/googleapis/google-cloud-go/commit/cde5cbba3145d5a702683656a42158621234fe71))
* **storage:** Add bucket HierarchicalNamespace ([#10315](https://togithub.com/googleapis/google-cloud-go/issues/10315)) ([b92406c](https://togithub.com/googleapis/google-cloud-go/commit/b92406ccfadfdcee379e86d6f78c901d772401a9)), refs [#10146](https://togithub.com/googleapis/google-cloud-go/issues/10146)
* **storage:** Add BucketName to BucketHandle ([#10127](https://togithub.com/googleapis/google-cloud-go/issues/10127)) ([203cc59](https://togithub.com/googleapis/google-cloud-go/commit/203cc599e5e2f2f821dc75b47c5a4c9073333f05))

### Bug Fixes

* **storage:** Set invocation headers on xml reads ([#10250](https://togithub.com/googleapis/google-cloud-go/issues/10250)) ([c87e1ab](https://togithub.com/googleapis/google-cloud-go/commit/c87e1ab6f9618b8b3f4d0005ac159abd87b0daaf))

### Documentation

* **storage:** Update autoclass doc ([#10135](https://togithub.com/googleapis/google-cloud-go/issues/10135)) ([e4b2737](https://togithub.com/googleapis/google-cloud-go/commit/e4b2737ddc16d3bf8139a6def7326ac905f62acd))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).