サンプルコードの説明
クライアントの設定が完了したら、Bulk API を使用するクライアントアプリケーションの構築を開始できます。次のサンプルを使用して、クライアントアプリケーションを作成します。各セクションで、コードの特定部分を順に説明していきます。詳細なサンプルは最後に記載します。
次のコードは、WSC ツールキット内のパッケージとクラスを設定し、さらに Partner WSDL から生成されたコードを設定します。
1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9main() メソッドの設定
次のコードは、クラスの main() メソッドを設定します。main() メソッドは、サンプルの処理ロジックを含む runSample() メソッドを呼び出します。runSample() で呼び出されるメソッドについては、後続のセクションで取り上げます。
1
2 public static void main(String[] args)
3 throws AsyncApiException, ConnectionException, IOException {
4 BulkExample example = new BulkExample();
5 // Replace arguments below with your credentials and test file name
6 // The first parameter indicates that we are loading Account records
7 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
8 }
9
10 /**
11 * Creates a Bulk API job and uploads batches for a CSV file.
12 */
13 public void runSample(String sobjectType, String userName,
14 String password, String sampleFileName)
15 throws AsyncApiException, ConnectionException, IOException {
16 BulkConnection connection = getBulkConnection(userName, password);
17 JobInfo job = createJob(sobjectType, connection);
18 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
19 sampleFileName);
20 closeJob(connection, job.getId());
21 awaitCompletion(connection, job, batchInfoList);
22 checkResults(connection, job, batchInfoList);
23 }
24ログインと BulkConnection の設定
次のコードは、パートナー接続 (PartnerConnection) を使用してログインし、セッションを再利用して Bulk API 接続 (BulkConnection) を作成します。
1
2 /**
3 * Create the BulkConnection used to call Bulk API operations.
4 */
5 private BulkConnection getBulkConnection(String userName, String password)
6 throws ConnectionException, AsyncApiException {
7 ConnectorConfig partnerConfig = new ConnectorConfig();
8 partnerConfig.setUsername(userName);
9 partnerConfig.setPassword(password);
10 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/35.0");
11 // Creating the connection automatically handles login and stores
12 // the session in partnerConfig
13 new PartnerConnection(partnerConfig);
14 // When PartnerConnection is instantiated, a login is implicitly
15 // executed and, if successful,
16 // a valid session is stored in the ConnectorConfig instance.
17 // Use this key to initialize a BulkConnection:
18 ConnectorConfig config = new ConnectorConfig();
19 config.setSessionId(partnerConfig.getSessionId());
20 // The endpoint for the Bulk API service is the same as for the normal
21 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
22 String soapEndpoint = partnerConfig.getServiceEndpoint();
23 String apiVersion = "35.0";
24 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
25 + "async/" + apiVersion;
26 config.setRestEndpoint(restEndpoint);
27 // This should only be false when doing debugging.
28 config.setCompression(true);
29 // Set this to true to see HTTP requests and responses on stdout
30 config.setTraceMessage(false);
31 BulkConnection connection = new BulkConnection(config);
32 return connection;
33 }
34この BulkConnection インスタンスは、Bulk API を使用するための基盤であり、アプリケーションのライフサイクルにわたって繰り返し再利用できます。
ジョブの新規作成
接続が作成できたら、新しいジョブを作成します。データは常にジョブ単位で処理されます。ジョブは、処理するデータの詳細、つまり実行する処理の種類 (挿入、更新、更新/挿入、削除) やオブジェクト種別を指定します。次のコードは、Account オブジェクトを対象とした挿入処理のジョブを新規作成します。
1
2 /**
3 * Create a new job using the Bulk API.
4 *
5 * @param sobjectType
6 * The object type being loaded, such as "Account"
7 * @param connection
8 * BulkConnection used to create the new job.
9 * @return The JobInfo for the new job.
10 * @throws AsyncApiException
11 */
12 private JobInfo createJob(String sobjectType, BulkConnection connection)
13 throws AsyncApiException {
14 JobInfo job = new JobInfo();
15 job.setObject(sobjectType);
16 job.setOperation(OperationEnum.insert);
17 job.setContentType(ContentType.CSV);
18 job = connection.createJob(job);
19 System.out.println(job);
20 return job;
21 }
22作成したばかりのジョブの状態は、Open になります。ジョブがこの状態にある間は、新しいバッチをジョブに追加できます。ジョブの状態が Closed になると、それ以上バッチを追加することはできません。
ジョブへのバッチの追加
データは一連のバッチ要求を通じて処理されます。各要求は、リクエストボディに XML 形式のデータセットを含む HTTP POST です。「Bulk API の制限」で説明されているバッチサイズと、1 日あたりの処理バッチ数の上限を超過しない限り、データセット全体をどの程度に分割して処理するかは、クライアントアプリケーション側で決定できます。
各バッチの処理にはオーバーヘッドが伴います。オーバーヘッドの処理コストを最小限に抑えられるよう、バッチを処理と転送に適したサイズに調整する必要があります。レコード数 1,000 ~ 10,000 件の範囲が、適切なバッチサイズとみなされます。
次のコードは、CSV ファイルを小さいバッチファイルに分割して、Salesforce にアップロードします。
1
2 /**
3 * Create and upload batches using a CSV file.
4 * The file into the appropriate size batch files.
5 *
6 * @param connection
7 * Connection to use for creating batches
8 * @param jobInfo
9 * Job associated with new batches
10 * @param csvFileName
11 * The source file for batch data
12 */
13 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
14 JobInfo jobInfo, String csvFileName)
15 throws IOException, AsyncApiException {
16 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
17 BufferedReader rdr = new BufferedReader(
18 new InputStreamReader(new FileInputStream(csvFileName))
19 );
20 // read the CSV header row
21 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
22 int headerBytesLength = headerBytes.length;
23 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
24
25 // Split the CSV file into multiple batches
26 try {
27 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
28 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
29 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
30 int currentBytes = 0;
31 int currentLines = 0;
32 String nextLine;
33 while ((nextLine = rdr.readLine()) != null) {
34 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
35 // Create a new batch when our batch size limit is reached
36 if (currentBytes + bytes.length > maxBytesPerBatch
37 || currentLines > maxRowsPerBatch) {
38 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
39 currentBytes = 0;
40 currentLines = 0;
41 }
42 if (currentBytes == 0) {
43 tmpOut = new FileOutputStream(tmpFile);
44 tmpOut.write(headerBytes);
45 currentBytes = headerBytesLength;
46 currentLines = 1;
47 }
48 tmpOut.write(bytes);
49 currentBytes += bytes.length;
50 currentLines++;
51 }
52 // Finished processing all rows
53 // Create a final batch for any remaining data
54 if (currentLines > 1) {
55 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
56 }
57 } finally {
58 tmpFile.delete();
59 }
60 return batchInfos;
61 }
62
63 /**
64 * Create a batch by uploading the contents of the file.
65 * This closes the output stream.
66 *
67 * @param tmpOut
68 * The output stream used to write the CSV data for a single batch.
69 * @param tmpFile
70 * The file associated with the above stream.
71 * @param batchInfos
72 * The batch info for the newly created batch is added to this list.
73 * @param connection
74 * The BulkConnection used to create the new batch.
75 * @param jobInfo
76 * The JobInfo associated with the new batch.
77 */
78 private void createBatch(FileOutputStream tmpOut, File tmpFile,
79 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
80 throws IOException, AsyncApiException {
81 tmpOut.flush();
82 tmpOut.close();
83 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
84 try {
85 BatchInfo batchInfo =
86 connection.createBatchFromStream(jobInfo, tmpInputStream);
87 System.out.println(batchInfo);
88 batchInfos.add(batchInfo);
89
90 } finally {
91 tmpInputStream.close();
92 }
93 }
94サーバは、受け取ったバッチをただちに処理待ちのキューに格納します。バッチの送信時には、形式にエラーがあっても報告はされません。こうしたエラーはバッチの処理が完了した後、結果データとして報告されます。
ジョブの終了
すべてのバッチをジョブに追加したら、ジョブを終了します。ジョブを終了すると、すべてのバッチの処理が確実に完了します。
1
2 private void closeJob(BulkConnection connection, String jobId)
3 throws AsyncApiException {
4 JobInfo job = new JobInfo();
5 job.setId(jobId);
6 job.setState(JobStateEnum.Closed);
7 connection.updateJob(job);
8 }
9バッチの状況の確認
バッチはバックグラウンドで処理されます。バッチが完了するまでの処理時間は、データセットのサイズによって異なります。処理の実行中に、すべてのバッチの状況を取得して、バッチが完了しているかどうかを確認することができます。
1
2 /**
3 * Wait for a job to complete by polling the Bulk API.
4 *
5 * @param connection
6 * BulkConnection used to check results.
7 * @param job
8 * The job awaiting completion.
9 * @param batchInfoList
10 * List of batches for this job.
11 * @throws AsyncApiException
12 */
13 private void awaitCompletion(BulkConnection connection, JobInfo job,
14 List<BatchInfo> batchInfoList)
15 throws AsyncApiException {
16 long sleepTime = 0L;
17 Set<String> incomplete = new HashSet<String>();
18 for (BatchInfo bi : batchInfoList) {
19 incomplete.add(bi.getId());
20 }
21 while (!incomplete.isEmpty()) {
22 try {
23 Thread.sleep(sleepTime);
24 } catch (InterruptedException e) {}
25 System.out.println("Awaiting results..." + incomplete.size());
26 sleepTime = 10000L;
27 BatchInfo[] statusList =
28 connection.getBatchInfoList(job.getId()).getBatchInfo();
29 for (BatchInfo b : statusList) {
30 if (b.getState() == BatchStateEnum.Completed
31 || b.getState() == BatchStateEnum.Failed) {
32 if (incomplete.remove(b.getId())) {
33 System.out.println("BATCH STATUS:\n" + b);
34 }
35 }
36 }
37 }
38 }
39バッチは、状況が Failed か Completed のいずれかになった場合に終了となります。このコードでは、ジョブのすべてのバッチが終了するまでループ処理を実行します。
ジョブの結果の取得
すべてのバッチの処理が完了したら、各バッチの結果を取得できます。バッチが成功した場合も、失敗した場合も、またジョブが途中で中止された場合も、必ず結果を取得してください。結果セットを取得しないと、個々のレコードの状況を確認できません。各レコードの結果を正しく取得するには、バッチと対応する元のデータセットをコード内で正確に追跡する必要がありまが、そのためには、バッチの作成時のリストを保持し、結果の取得に使用するようにします。次のコードはそのための処理を記述しています。
1
2 /**
3 * Gets the results of the operation and checks for errors.
4 */
5 private void checkResults(BulkConnection connection, JobInfo job,
6 List<BatchInfo> batchInfoList)
7 throws AsyncApiException, IOException {
8 // batchInfoList was populated when batches were created and submitted
9 for (BatchInfo b : batchInfoList) {
10 CSVReader rdr =
11 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
12 List<String> resultHeader = rdr.nextRecord();
13 int resultCols = resultHeader.size();
14
15 List<String> row;
16 while ((row = rdr.nextRecord()) != null) {
17 Map<String, String> resultInfo = new HashMap<String, String>();
18 for (int i = 0; i < resultCols; i++) {
19 resultInfo.put(resultHeader.get(i), row.get(i));
20 }
21 boolean success = Boolean.valueOf(resultInfo.get("Success"));
22 boolean created = Boolean.valueOf(resultInfo.get("Created"));
23 String id = resultInfo.get("Id");
24 String error = resultInfo.get("Error");
25 if (success && created) {
26 System.out.println("Created row with id " + id);
27 } else if (!success) {
28 System.out.println("Failed with error: " + error);
29 }
30 }
31 }
32 }
33このコードは、各レコードの結果を取得し、処理が成功したか失敗したかを報告します。レコードでエラーが発生した場合にはエラーを出力します。
クイックスタートサンプルの完全版
ジョブやバッチについての理解が深まったでしょうか。クイックスタートサンプルのコード全体を次に示します。コピーして活用してください。
1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9
10
11public class BulkExample {
12
13
14 public static void main(String[] args)
15 throws AsyncApiException, ConnectionException, IOException {
16 BulkExample example = new BulkExample();
17 // Replace arguments below with your credentials and test file name
18 // The first parameter indicates that we are loading Account records
19 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
20 }
21
22 /**
23 * Creates a Bulk API job and uploads batches for a CSV file.
24 */
25 public void runSample(String sobjectType, String userName,
26 String password, String sampleFileName)
27 throws AsyncApiException, ConnectionException, IOException {
28 BulkConnection connection = getBulkConnection(userName, password);
29 JobInfo job = createJob(sobjectType, connection);
30 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
31 sampleFileName);
32 closeJob(connection, job.getId());
33 awaitCompletion(connection, job, batchInfoList);
34 checkResults(connection, job, batchInfoList);
35 }
36
37
38
39 /**
40 * Gets the results of the operation and checks for errors.
41 */
42 private void checkResults(BulkConnection connection, JobInfo job,
43 List<BatchInfo> batchInfoList)
44 throws AsyncApiException, IOException {
45 // batchInfoList was populated when batches were created and submitted
46 for (BatchInfo b : batchInfoList) {
47 CSVReader rdr =
48 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
49 List<String> resultHeader = rdr.nextRecord();
50 int resultCols = resultHeader.size();
51
52 List<String> row;
53 while ((row = rdr.nextRecord()) != null) {
54 Map<String, String> resultInfo = new HashMap<String, String>();
55 for (int i = 0; i < resultCols; i++) {
56 resultInfo.put(resultHeader.get(i), row.get(i));
57 }
58 boolean success = Boolean.valueOf(resultInfo.get("Success"));
59 boolean created = Boolean.valueOf(resultInfo.get("Created"));
60 String id = resultInfo.get("Id");
61 String error = resultInfo.get("Error");
62 if (success && created) {
63 System.out.println("Created row with id " + id);
64 } else if (!success) {
65 System.out.println("Failed with error: " + error);
66 }
67 }
68 }
69 }
70
71
72
73 private void closeJob(BulkConnection connection, String jobId)
74 throws AsyncApiException {
75 JobInfo job = new JobInfo();
76 job.setId(jobId);
77 job.setState(JobStateEnum.Closed);
78 connection.updateJob(job);
79 }
80
81
82
83 /**
84 * Wait for a job to complete by polling the Bulk API.
85 *
86 * @param connection
87 * BulkConnection used to check results.
88 * @param job
89 * The job awaiting completion.
90 * @param batchInfoList
91 * List of batches for this job.
92 * @throws AsyncApiException
93 */
94 private void awaitCompletion(BulkConnection connection, JobInfo job,
95 List<BatchInfo> batchInfoList)
96 throws AsyncApiException {
97 long sleepTime = 0L;
98 Set<String> incomplete = new HashSet<String>();
99 for (BatchInfo bi : batchInfoList) {
100 incomplete.add(bi.getId());
101 }
102 while (!incomplete.isEmpty()) {
103 try {
104 Thread.sleep(sleepTime);
105 } catch (InterruptedException e) {}
106 System.out.println("Awaiting results..." + incomplete.size());
107 sleepTime = 10000L;
108 BatchInfo[] statusList =
109 connection.getBatchInfoList(job.getId()).getBatchInfo();
110 for (BatchInfo b : statusList) {
111 if (b.getState() == BatchStateEnum.Completed
112 || b.getState() == BatchStateEnum.Failed) {
113 if (incomplete.remove(b.getId())) {
114 System.out.println("BATCH STATUS:\n" + b);
115 }
116 }
117 }
118 }
119 }
120
121
122
123 /**
124 * Create a new job using the Bulk API.
125 *
126 * @param sobjectType
127 * The object type being loaded, such as "Account"
128 * @param connection
129 * BulkConnection used to create the new job.
130 * @return The JobInfo for the new job.
131 * @throws AsyncApiException
132 */
133 private JobInfo createJob(String sobjectType, BulkConnection connection)
134 throws AsyncApiException {
135 JobInfo job = new JobInfo();
136 job.setObject(sobjectType);
137 job.setOperation(OperationEnum.insert);
138 job.setContentType(ContentType.CSV);
139 job = connection.createJob(job);
140 System.out.println(job);
141 return job;
142 }
143
144
145
146 /**
147 * Create the BulkConnection used to call Bulk API operations.
148 */
149 private BulkConnection getBulkConnection(String userName, String password)
150 throws ConnectionException, AsyncApiException {
151 ConnectorConfig partnerConfig = new ConnectorConfig();
152 partnerConfig.setUsername(userName);
153 partnerConfig.setPassword(password);
154 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/35.0");
155 // Creating the connection automatically handles login and stores
156 // the session in partnerConfig
157 new PartnerConnection(partnerConfig);
158 // When PartnerConnection is instantiated, a login is implicitly
159 // executed and, if successful,
160 // a valid session is stored in the ConnectorConfig instance.
161 // Use this key to initialize a BulkConnection:
162 ConnectorConfig config = new ConnectorConfig();
163 config.setSessionId(partnerConfig.getSessionId());
164 // The endpoint for the Bulk API service is the same as for the normal
165 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
166 String soapEndpoint = partnerConfig.getServiceEndpoint();
167 String apiVersion = "35.0";
168 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
169 + "async/" + apiVersion;
170 config.setRestEndpoint(restEndpoint);
171 // This should only be false when doing debugging.
172 config.setCompression(true);
173 // Set this to true to see HTTP requests and responses on stdout
174 config.setTraceMessage(false);
175 BulkConnection connection = new BulkConnection(config);
176 return connection;
177 }
178
179
180
181 /**
182 * Create and upload batches using a CSV file.
183 * The file into the appropriate size batch files.
184 *
185 * @param connection
186 * Connection to use for creating batches
187 * @param jobInfo
188 * Job associated with new batches
189 * @param csvFileName
190 * The source file for batch data
191 */
192 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
193 JobInfo jobInfo, String csvFileName)
194 throws IOException, AsyncApiException {
195 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
196 BufferedReader rdr = new BufferedReader(
197 new InputStreamReader(new FileInputStream(csvFileName))
198 );
199 // read the CSV header row
200 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
201 int headerBytesLength = headerBytes.length;
202 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
203
204 // Split the CSV file into multiple batches
205 try {
206 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
207 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
208 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
209 int currentBytes = 0;
210 int currentLines = 0;
211 String nextLine;
212 while ((nextLine = rdr.readLine()) != null) {
213 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
214 // Create a new batch when our batch size limit is reached
215 if (currentBytes + bytes.length > maxBytesPerBatch
216 || currentLines > maxRowsPerBatch) {
217 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
218 currentBytes = 0;
219 currentLines = 0;
220 }
221 if (currentBytes == 0) {
222 tmpOut = new FileOutputStream(tmpFile);
223 tmpOut.write(headerBytes);
224 currentBytes = headerBytesLength;
225 currentLines = 1;
226 }
227 tmpOut.write(bytes);
228 currentBytes += bytes.length;
229 currentLines++;
230 }
231 // Finished processing all rows
232 // Create a final batch for any remaining data
233 if (currentLines > 1) {
234 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
235 }
236 } finally {
237 tmpFile.delete();
238 }
239 return batchInfos;
240 }
241
242 /**
243 * Create a batch by uploading the contents of the file.
244 * This closes the output stream.
245 *
246 * @param tmpOut
247 * The output stream used to write the CSV data for a single batch.
248 * @param tmpFile
249 * The file associated with the above stream.
250 * @param batchInfos
251 * The batch info for the newly created batch is added to this list.
252 * @param connection
253 * The BulkConnection used to create the new batch.
254 * @param jobInfo
255 * The JobInfo associated with the new batch.
256 */
257 private void createBatch(FileOutputStream tmpOut, File tmpFile,
258 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
259 throws IOException, AsyncApiException {
260 tmpOut.flush();
261 tmpOut.close();
262 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
263 try {
264 BatchInfo batchInfo =
265 connection.createBatchFromStream(jobInfo, tmpInputStream);
266 System.out.println(batchInfo);
267 batchInfos.add(batchInfo);
268
269 } finally {
270 tmpInputStream.close();
271 }
272 }
273
274
275}