-
Notifications
You must be signed in to change notification settings - Fork 2k
/
quickstart.js
127 lines (105 loc) · 3.67 KB
/
quickstart.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// sample-metadata:
// title: Quickstart
// usage: node quickstart.js <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>
// [START dataproc_quickstart]
// This quickstart sample walks a user through creating a Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.
'use strict';
/**
 * Runs the Dataproc quickstart: creates a cluster, submits a PySpark job to
 * it, prints the job's driver output read from Cloud Storage, and deletes the
 * cluster.
 *
 * @param {string} projectId - Project ID sent with every request.
 * @param {string} region - Region used for the API endpoint and the requests.
 * @param {string} clusterName - Name of the cluster to create and delete.
 * @param {string} jobFilePath - URI of the main Python file of the PySpark job.
 * @returns {Promise<void>} Settles once the workflow has finished. Failures
 *     are logged to stderr and reported through `process.exitCode` instead of
 *     being left as an unhandled rejection.
 */
function main(projectId, region, clusterName, jobFilePath) {
  const dataproc = require('@google-cloud/dataproc');
  const {Storage} = require('@google-cloud/storage');

  // Both clients talk to the same regional endpoint.
  const apiEndpoint = `${region}-dataproc.googleapis.com`;

  // Create a cluster client with the endpoint set to the desired cluster region
  const clusterClient = new dataproc.v1.ClusterControllerClient({
    apiEndpoint,
    projectId,
  });

  // Create a job client with the endpoint set to the desired cluster region
  const jobClient = new dataproc.v1.JobControllerClient({
    apiEndpoint,
    projectId,
  });

  async function quickstart() {
    // Create the cluster config
    const cluster = {
      projectId,
      region,
      cluster: {
        clusterName,
        config: {
          masterConfig: {
            numInstances: 1,
            machineTypeUri: 'n1-standard-2',
          },
          workerConfig: {
            numInstances: 2,
            machineTypeUri: 'n1-standard-2',
          },
        },
      },
    };

    // Create the cluster
    const [operation] = await clusterClient.createCluster(cluster);
    const [response] = await operation.promise();

    // Output a success message
    console.log(`Cluster created successfully: ${response.clusterName}`);

    // From here on the cluster exists, so everything that can fail runs inside
    // try/finally: the cluster is deleted even if the job or the output
    // download throws, instead of being left running.
    try {
      const job = {
        projectId,
        region,
        job: {
          placement: {
            clusterName,
          },
          pysparkJob: {
            mainPythonFileUri: jobFilePath,
          },
        },
      };

      const [jobOperation] = await jobClient.submitJobAsOperation(job);
      const [jobResponse] = await jobOperation.promise();

      // Split the driver output URI into bucket (group 1) and object prefix
      // (group 2).
      const outputUri = jobResponse.driverOutputResourceUri;
      const matches =
        typeof outputUri === 'string' ? outputUri.match('gs://(.*?)/(.*)') : null;
      if (matches === null) {
        throw new Error(
          `Unexpected driver output resource URI: ${String(outputUri)}`
        );
      }

      // The object read is the URI's object prefix plus a ".000000000" suffix.
      const storage = new Storage();
      const output = await storage
        .bucket(matches[1])
        .file(`${matches[2]}.000000000`)
        .download();

      // Output a success message.
      console.log(`Job finished successfully: ${output}`);
    } finally {
      // Delete the cluster once the job has terminated (or failed).
      const deleteClusterReq = {
        projectId,
        region,
        clusterName,
      };

      const [deleteOperation] =
        await clusterClient.deleteCluster(deleteClusterReq);
      await deleteOperation.promise();

      // Output a success message
      console.log(`Cluster ${clusterName} successfully deleted.`);
    }
  }

  // Never leave the promise floating: report the failure and signal it through
  // the exit code.
  return quickstart().catch(err => {
    console.error(err);
    process.exitCode = 1;
  });
}
// Command-line entry point. Expects exactly four positional arguments:
// PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH, in this order.
const args = process.argv.slice(2);

if (args.length !== 4) {
  // With the wrong number of arguments, report the usage error and do NOT
  // start the workflow; a non-zero exit code tells the caller it failed.
  console.error(
    'Insufficient number of parameters provided. Please make sure a ' +
      'PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.'
  );
  process.exitCode = 1;
} else {
  main(...args);
}
// [END dataproc_quickstart]