Skip to content

Commit

Permalink
feat: enable bytes read tracking (#1074)
Browse files Browse the repository at this point in the history
* feat: enable bytesrRead tracking on uploads

* refactor: only attach listener if flagged by user

* lint: lint

* refactor: forward progress event from resumable or simple upload to user

* lint: lint

* fix: add listener on a condition

* refactor: only register progress listener if onUploadProgress param is set

* feat: add onUploadProgress to file.save()

* lint: lint

* refactor: readability

* chore: move event handler assignment on it own line

* fix(deps): update dependency @google-cloud/common to v3
enables bytes read tracking from simple upload

* test: typo

Co-authored-by: Jonathan Lui <[email protected]>
Co-authored-by: Stephen <[email protected]>
  • Loading branch information
3 people authored May 13, 2020
1 parent 0bb1909 commit 0776a04
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 7 deletions.
8 changes: 7 additions & 1 deletion src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ export interface UploadOptions
encryptionKey?: string | Buffer;
kmsKeyName?: string;
resumable?: boolean;
// tslint:disable-next-line:no-any
onUploadProgress?: (progressEvent: any) => void;
}

export interface MakeAllFilesPublicPrivateOptions {
Expand Down Expand Up @@ -3464,9 +3466,13 @@ class Bucket extends ServiceObject {
}

function upload() {
const writable = newFile.createWriteStream(options);
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
fs.createReadStream(pathString)
.on('error', callback!)
.pipe(newFile.createWriteStream(options))
.pipe(writable)
.on('error', callback!)
.on('finish', () => {
callback!(null, newFile, newFile.metadata);
Expand Down
24 changes: 18 additions & 6 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import * as os from 'os';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const pumpify = require('pumpify');
import * as resumableUpload from 'gcs-resumable-upload';
import {Duplex, Writable, Readable} from 'stream';
import {Duplex, Writable, Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import * as through from 'through2';
import * as xdgBasedir from 'xdg-basedir';
Expand Down Expand Up @@ -349,7 +349,10 @@ export interface CreateReadStreamOptions {
decompress?: boolean;
}

export type SaveOptions = CreateWriteStreamOptions;
export interface SaveOptions extends CreateWriteStreamOptions {
// tslint:disable-next-line:no-any
onUploadProgress?: (progressEvent: any) => void;
}

export interface SaveCallback {
(err?: Error | null): void;
Expand Down Expand Up @@ -1736,6 +1739,10 @@ class File extends ServiceObject<File> {

const fileWriteStream = duplexify();

fileWriteStream.on('progress', evt => {
stream.emit('progress', evt);
});

const stream = streamEvents(
pumpify([
gzip ? zlib.createGzip() : through(),
Expand Down Expand Up @@ -3383,10 +3390,14 @@ class File extends ServiceObject<File> {
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};

this.createWriteStream(options)
const writable = this.createWriteStream(options)
.on('error', callback!)
.on('finish', callback!)
.end(data);
.on('finish', callback!);
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}

writable.end(data);
}
setStorageClass(
storageClass: string,
Expand Down Expand Up @@ -3545,7 +3556,8 @@ class File extends ServiceObject<File> {
})
.on('finish', () => {
dup.emit('complete');
});
})
.on('progress', evt => dup.emit('progress', evt));

dup.setWritable(uploadStream);
}
Expand Down
36 changes: 36 additions & 0 deletions system-test/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import {
DeleteNotificationCallback,
} from '../src';
import * as nock from 'nock';
import * as readline from 'readline';

interface ErrorCallbackFunction {
(err: Error | null): void;
Expand Down Expand Up @@ -2811,6 +2812,41 @@ describe('storage', () => {
});
});

describe('bucket upload with progress', () => {
it('show bytes sent with resumable upload', async () => {
const fileSize = fs.statSync(FILES.big.path).size;
let called = false;
function onUploadProgress(evt: {bytesWritten: number}) {
called = true;
assert.strictEqual(typeof evt.bytesWritten, 'number');
assert.ok(evt.bytesWritten >= 0 && evt.bytesWritten <= fileSize);
}

await bucket.upload(FILES.big.path, {
resumable: true,
onUploadProgress,
});

assert.strictEqual(called, true);
});

it('show bytes sent with simple upload', async () => {
const fileSize = fs.statSync(FILES.big.path).size;
let called = false;
function onUploadProgress(evt: {bytesWritten: number}) {
called = true;
assert.strictEqual(typeof evt.bytesWritten, 'number');
assert.ok(evt.bytesWritten >= 0 && evt.bytesWritten <= fileSize);
}
await bucket.upload(FILES.big.path, {
resumable: false,
onUploadProgress,
});

assert.strictEqual(called, true);
});
});

describe('channels', () => {
it('should create a channel', done => {
const config = {
Expand Down
84 changes: 84 additions & 0 deletions test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,53 @@ describe('File', () => {
writable.write('data');
});

it('should emit progress via resumable upload', done => {
const progress = {};

resumableUploadOverride = {
upload() {
const uploadStream = new stream.PassThrough();
setImmediate(() => {
uploadStream.emit('progress', progress);
});

return uploadStream;
},
};

const writable = file.createWriteStream();

writable.on('progress', (evt: {}) => {
assert.strictEqual(evt, progress);
done();
});

writable.write('data');
});

it('should emit progress via simple upload', done => {
const progress = {};

makeWritableStreamOverride = (dup: duplexify.Duplexify) => {
const uploadStream = new stream.PassThrough();
uploadStream.on('progress', evt => dup.emit('progress', evt));

dup.setWritable(uploadStream);
setImmediate(() => {
uploadStream.emit('progress', progress);
});
};

const writable = file.createWriteStream({resumable: false});

writable.on('progress', (evt: {}) => {
assert.strictEqual(evt, progress);
done();
});

writable.write('data');
});

it('should start a simple upload if specified', done => {
const options = {
metadata: METADATA,
Expand Down Expand Up @@ -3782,6 +3829,21 @@ describe('File', () => {
file.save(DATA, assert.ifError);
});

it('should register the progress listener if onUploadProgress is passed', done => {
const onUploadProgress = util.noop;
file.createWriteStream = () => {
const writeStream = new stream.PassThrough();
setImmediate(() => {
const [listener] = writeStream.listeners('progress');
assert.strictEqual(listener, onUploadProgress);
done();
});
return writeStream;
};

file.save(DATA, {onUploadProgress}, assert.ifError);
});

it('should write the data', done => {
file.createWriteStream = () => {
const writeStream = new stream.PassThrough();
Expand Down Expand Up @@ -4080,6 +4142,28 @@ describe('File', () => {

file.startResumableUpload_(dup);
});

it('should emit progress event', done => {
const progress = {};
const dup = duplexify();
dup.on('progress', evt => {
assert.strictEqual(evt, progress);
done();
});

resumableUploadOverride = {
upload() {
const uploadStream = new stream.Transform();
setImmediate(() => {
uploadStream.emit('progress', progress);
});

return uploadStream;
},
};

file.startResumableUpload_(dup);
});
});
});

Expand Down

0 comments on commit 0776a04

Please sign in to comment.