サンプルコードの説明
次のコードは、WSC ツールキットのパッケージとクラス、および Partner WSDL から生成されたコードをインポートします。
1import java.io.*;
2import java.util.*;
3
4import com.sforce.async.*;
5import com.sforce.soap.partner.PartnerConnection;
6import com.sforce.ws.ConnectionException;
7import com.sforce.ws.ConnectorConfig;
main() メソッドの設定
次のコードは、クラスの main() メソッドを設定します。main() メソッドは、サンプルの処理ロジックを含む runSample() メソッドを呼び出します。runSample() で呼び出されるメソッドについては、後続のセクションで取り上げます。
1public static void main(String[] args)
2 throws AsyncApiException, ConnectionException, IOException {
3 BulkExample example = new BulkExample();
4 // Replace arguments below with your credentials and test file name
5 // The first parameter indicates that we are loading Account records
6 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
7 }
8
/**
 * Runs the whole quick-start flow: logs in, opens an insert job, uploads the
 * CSV file as one or more batches, closes the job, waits for every batch to
 * finish and prints the per-record results.
 *
 * @param sobjectType    API name of the object to load, e.g. "Account"
 * @param userName       Salesforce login user name
 * @param password       password for {@code userName}
 * @param sampleFileName path of the CSV file whose rows are uploaded
 */
public void runSample(String sobjectType, String userName,
        String password, String sampleFileName)
        throws AsyncApiException, ConnectionException, IOException {
    BulkConnection bulk = getBulkConnection(userName, password);
    JobInfo insertJob = createJob(sobjectType, bulk);
    List<BatchInfo> submitted =
            createBatchesFromCSVFile(bulk, insertJob, sampleFileName);
    // No more batches will be added, so close the job before polling it.
    String jobId = insertJob.getId();
    closeJob(bulk, jobId);
    awaitCompletion(bulk, insertJob, submitted);
    checkResults(bulk, insertJob, submitted);
}
ログインと BulkConnection の設定
次のコードは、パートナー接続 (PartnerConnection) を使用してログインし、セッションを再利用して Bulk API 接続 (BulkConnection) を作成します。
1/**
2 * Create the BulkConnection used to call Bulk API operations.
3 */
4 private BulkConnection getBulkConnection(String userName, String password)
5 throws ConnectionException, AsyncApiException {
6 ConnectorConfig partnerConfig = new ConnectorConfig();
7 partnerConfig.setUsername(userName);
8 partnerConfig.setPassword(password);
9 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/57.0");
10 // Creating the connection automatically handles login and stores
11 // the session in partnerConfig
12 new PartnerConnection(partnerConfig);
13 // When PartnerConnection is instantiated, a login is implicitly
14 // executed and, if successful,
15 // a valid session is stored in the ConnectorConfig instance.
16 // Use this key to initialize a BulkConnection:
17 ConnectorConfig config = new ConnectorConfig();
18 config.setSessionId(partnerConfig.getSessionId());
19 // The endpoint for the Bulk API service is the same as for the normal
20 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
21 String soapEndpoint = partnerConfig.getServiceEndpoint();
22 String apiVersion = "57.0";
23 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
24 + "async/" + apiVersion;
25 config.setRestEndpoint(restEndpoint);
26 // This should only be false when doing debugging.
27 config.setCompression(true);
28 // Set this to true to see HTTP requests and responses on stdout
29 config.setTraceMessage(false);
30 BulkConnection connection = new BulkConnection(config);
31 return connection;
32 }この BulkConnection インスタンスは、Bulk API を使用するための基盤であり、アプリケーションのライフサイクルにわたって繰り返し再利用できます。
ジョブの作成
接続が作成できたら、ジョブを作成します。データは常にジョブ単位で処理されます。ジョブは、処理するデータの詳細、つまり実行する処理の種類 (挿入、更新、更新/挿入、削除) やオブジェクト種別を指定します。次のコードは、Account オブジェクトを対象とした挿入処理のジョブを新規作成します。
1/**
2 * Create a new job using the Bulk API.
3 *
4 * @param sobjectType
5 * The object type being loaded, such as "Account"
6 * @param connection
7 * BulkConnection used to create the new job.
8 * @return The JobInfo for the new job.
9 * @throws AsyncApiException
10 */
11 private JobInfo createJob(String sobjectType, BulkConnection connection)
12 throws AsyncApiException {
13 JobInfo job = new JobInfo();
14 job.setObject(sobjectType);
15 job.setOperation(OperationEnum.insert);
16 job.setContentType(ContentType.CSV);
17 job = connection.createJob(job);
18 System.out.println(job);
19 return job;
20 }作成したばかりのジョブの状態は、Open になります。ジョブがこの状態にある間は、新しいバッチをジョブに追加できます。ジョブの状態が Closed になると、それ以上バッチを追加することはできません。
ジョブへのバッチの追加
データは一連のバッチ要求を通じて処理されます。各要求は、リクエストボディにデータセット (XML 形式など。このサンプルでは CSV 形式) を含む HTTP POST です。「制限」で説明されているバッチサイズと、1 日あたりの処理バッチ数の上限を超過しない限り、データセット全体をどのように分割して処理するかは、クライアントアプリケーション側で決定できます。
各バッチの処理にはオーバーヘッドが伴います。バッチは、オーバーヘッドの処理コストが最小限に抑えられ、処理と転送に適したサイズになるように調整します。レコード数 1,000 ~ 10,000 件の範囲が、適切なバッチサイズとみなされます。
次のコードは、CSV ファイルを小さいバッチファイルに分割して、Salesforce にアップロードします。
1/**
2 * Create and upload batches using a CSV file.
3 * The file into the appropriate size batch files.
4 *
5 * @param connection
6 * Connection to use for creating batches
7 * @param jobInfo
8 * Job associated with new batches
9 * @param csvFileName
10 * The source file for batch data
11 */
12 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
13 JobInfo jobInfo, String csvFileName)
14 throws IOException, AsyncApiException {
15 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
16 BufferedReader rdr = new BufferedReader(
17 new InputStreamReader(new FileInputStream(csvFileName))
18 );
19 // read the CSV header row
20 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
21 int headerBytesLength = headerBytes.length;
22 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
23
24 // Split the CSV file into multiple batches
25 try {
26 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
27 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
28 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
29 int currentBytes = 0;
30 int currentLines = 0;
31 String nextLine;
32 while ((nextLine = rdr.readLine()) != null) {
33 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
34 // Create a new batch when our batch size limit is reached
35 if (currentBytes + bytes.length > maxBytesPerBatch
36 || currentLines > maxRowsPerBatch) {
37 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
38 currentBytes = 0;
39 currentLines = 0;
40 }
41 if (currentBytes == 0) {
42 tmpOut = new FileOutputStream(tmpFile);
43 tmpOut.write(headerBytes);
44 currentBytes = headerBytesLength;
45 currentLines = 1;
46 }
47 tmpOut.write(bytes);
48 currentBytes += bytes.length;
49 currentLines++;
50 }
51 // Finished processing all rows
52 // Create a final batch for any remaining data
53 if (currentLines > 1) {
54 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
55 }
56 } finally {
57 tmpFile.delete();
58 }
59 return batchInfos;
60 }
61
62 /**
63 * Create a batch by uploading the contents of the file.
64 * This closes the output stream.
65 *
66 * @param tmpOut
67 * The output stream used to write the CSV data for a single batch.
68 * @param tmpFile
69 * The file associated with the above stream.
70 * @param batchInfos
71 * The batch info for the newly created batch is added to this list.
72 * @param connection
73 * The BulkConnection used to create the new batch.
74 * @param jobInfo
75 * The JobInfo associated with the new batch.
76 */
77 private void createBatch(FileOutputStream tmpOut, File tmpFile,
78 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
79 throws IOException, AsyncApiException {
80 tmpOut.flush();
81 tmpOut.close();
82 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
83 try {
84 BatchInfo batchInfo =
85 connection.createBatchFromStream(jobInfo, tmpInputStream);
86 System.out.println(batchInfo);
87 batchInfos.add(batchInfo);
88
89 } finally {
90 tmpInputStream.close();
91 }
92 }サーバは、受け取ったバッチをただちに処理待ちのキューに格納します。バッチの送信時には、形式のエラーは報告はされません。こうしたエラーはバッチの処理が完了した後、結果データとして報告されます。
ジョブの終了
すべてのバッチをジョブに追加したら、ジョブを終了します。ジョブを終了 (状態を Closed に変更) すると、それ以上バッチを追加できなくなり、キュー内のすべてのバッチの処理を完了できるようになります。
1private void closeJob(BulkConnection connection, String jobId)
2 throws AsyncApiException {
3 JobInfo job = new JobInfo();
4 job.setId(jobId);
5 job.setState(JobStateEnum.Closed);
6 connection.updateJob(job);
7 }バッチの状況の確認
バッチはバックグラウンドで処理されます。データセットのサイズと複雑さによって、処理に要する時間が決まります。処理の実行中に、すべてのバッチの状況を取得して検査し、処理がいつ完了したかを確認することができます。
1/**
2 * Wait for a job to complete by polling the Bulk API.
3 *
4 * @param connection
5 * BulkConnection used to check results.
6 * @param job
7 * The job awaiting completion.
8 * @param batchInfoList
9 * List of batches for this job.
10 * @throws AsyncApiException
11 */
12 private void awaitCompletion(BulkConnection connection, JobInfo job,
13 List<BatchInfo> batchInfoList)
14 throws AsyncApiException {
15 long sleepTime = 0L;
16 Set<String> incomplete = new HashSet<String>();
17 for (BatchInfo bi : batchInfoList) {
18 incomplete.add(bi.getId());
19 }
20 while (!incomplete.isEmpty()) {
21 try {
22 Thread.sleep(sleepTime);
23 } catch (InterruptedException e) {}
24 System.out.println("Awaiting results..." + incomplete.size());
25 sleepTime = 10000L;
26 BatchInfo[] statusList =
27 connection.getBatchInfoList(job.getId()).getBatchInfo();
28 for (BatchInfo b : statusList) {
29 if (b.getState() == BatchStateEnum.Completed
30 || b.getState() == BatchStateEnum.Failed) {
31 if (incomplete.remove(b.getId())) {
32 System.out.println("BATCH STATUS:\n" + b);
33 }
34 }
35 }
36 }
37 }バッチは、状況が Failed か Completed のいずれかになった場合に終了となります。このコードでは、ジョブのすべてのバッチが終了するまでループ処理を実行します。
ジョブの結果の取得
すべてのバッチが処理されたら、各バッチの結果を取得できます。バッチが成功した場合も、失敗した場合も、またジョブが途中で中止された場合も、結果を取得してください。結果セットを取得しないと、個々のレコードの状況を確認できません。各レコードの結果を正しく取得するには、バッチとそれに対応する元のデータセットをコード内で正確に追跡する必要があります。そのために、バッチの作成時に BatchInfo のリストを保持しておき、結果の取得時にそのリストを使用します。次のコードはそのための処理を記述しています。
1/**
2 * Gets the results of the operation and checks for errors.
3 */
4 private void checkResults(BulkConnection connection, JobInfo job,
5 List<BatchInfo> batchInfoList)
6 throws AsyncApiException, IOException {
7 // batchInfoList was populated when batches were created and submitted
8 for (BatchInfo b : batchInfoList) {
9 CSVReader rdr =
10 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
11 List<String> resultHeader = rdr.nextRecord();
12 int resultCols = resultHeader.size();
13
14 List<String> row;
15 while ((row = rdr.nextRecord()) != null) {
16 Map<String, String> resultInfo = new HashMap<String, String>();
17 for (int i = 0; i < resultCols; i++) {
18 resultInfo.put(resultHeader.get(i), row.get(i));
19 }
20 boolean success = Boolean.valueOf(resultInfo.get("Success"));
21 boolean created = Boolean.valueOf(resultInfo.get("Created"));
22 String id = resultInfo.get("Id");
23 String error = resultInfo.get("Error");
24 if (success && created) {
25 System.out.println("Created row with id " + id);
26 } else if (!success) {
27 System.out.println("Failed with error: " + error);
28 }
29 }
30 }
31 }このコードは、各レコードの結果を取得し、処理が成功したか失敗したかを報告します。レコードでエラーが発生した場合にはエラーを出力します。
クイックスタートサンプルの完全版
ジョブやバッチについての理解が深まったでしょうか。クイックスタートサンプルのコード全体を次に示します。コピーして活用してください。
1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9
10
11public class BulkExample {
12
13
14 public static void main(String[] args)
15 throws AsyncApiException, ConnectionException, IOException {
16 BulkExample example = new BulkExample();
17 // Replace arguments below with your credentials and test file name
18 // The first parameter indicates that we are loading Account records
19 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
20 }
21
22 /**
23 * Creates a Bulk API job and uploads batches for a CSV file.
24 */
25 public void runSample(String sobjectType, String userName,
26 String password, String sampleFileName)
27 throws AsyncApiException, ConnectionException, IOException {
28 BulkConnection connection = getBulkConnection(userName, password);
29 JobInfo job = createJob(sobjectType, connection);
30 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
31 sampleFileName);
32 closeJob(connection, job.getId());
33 awaitCompletion(connection, job, batchInfoList);
34 checkResults(connection, job, batchInfoList);
35 }
36
37
38
39 /**
40 * Gets the results of the operation and checks for errors.
41 */
42 private void checkResults(BulkConnection connection, JobInfo job,
43 List<BatchInfo> batchInfoList)
44 throws AsyncApiException, IOException {
45 // batchInfoList was populated when batches were created and submitted
46 for (BatchInfo b : batchInfoList) {
47 CSVReader rdr =
48 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
49 List<String> resultHeader = rdr.nextRecord();
50 int resultCols = resultHeader.size();
51
52 List<String> row;
53 while ((row = rdr.nextRecord()) != null) {
54 Map<String, String> resultInfo = new HashMap<String, String>();
55 for (int i = 0; i < resultCols; i++) {
56 resultInfo.put(resultHeader.get(i), row.get(i));
57 }
58 boolean success = Boolean.valueOf(resultInfo.get("Success"));
59 boolean created = Boolean.valueOf(resultInfo.get("Created"));
60 String id = resultInfo.get("Id");
61 String error = resultInfo.get("Error");
62 if (success && created) {
63 System.out.println("Created row with id " + id);
64 } else if (!success) {
65 System.out.println("Failed with error: " + error);
66 }
67 }
68 }
69 }
70
71
72
73 private void closeJob(BulkConnection connection, String jobId)
74 throws AsyncApiException {
75 JobInfo job = new JobInfo();
76 job.setId(jobId);
77 job.setState(JobStateEnum.Closed);
78 connection.updateJob(job);
79 }
80
81
82
83 /**
84 * Wait for a job to complete by polling the Bulk API.
85 *
86 * @param connection
87 * BulkConnection used to check results.
88 * @param job
89 * The job awaiting completion.
90 * @param batchInfoList
91 * List of batches for this job.
92 * @throws AsyncApiException
93 */
94 private void awaitCompletion(BulkConnection connection, JobInfo job,
95 List<BatchInfo> batchInfoList)
96 throws AsyncApiException {
97 long sleepTime = 0L;
98 Set<String> incomplete = new HashSet<String>();
99 for (BatchInfo bi : batchInfoList) {
100 incomplete.add(bi.getId());
101 }
102 while (!incomplete.isEmpty()) {
103 try {
104 Thread.sleep(sleepTime);
105 } catch (InterruptedException e) {}
106 System.out.println("Awaiting results..." + incomplete.size());
107 sleepTime = 10000L;
108 BatchInfo[] statusList =
109 connection.getBatchInfoList(job.getId()).getBatchInfo();
110 for (BatchInfo b : statusList) {
111 if (b.getState() == BatchStateEnum.Completed
112 || b.getState() == BatchStateEnum.Failed) {
113 if (incomplete.remove(b.getId())) {
114 System.out.println("BATCH STATUS:\n" + b);
115 }
116 }
117 }
118 }
119 }
120
121
122
123 /**
124 * Create a new job using the Bulk API.
125 *
126 * @param sobjectType
127 * The object type being loaded, such as "Account"
128 * @param connection
129 * BulkConnection used to create the new job.
130 * @return The JobInfo for the new job.
131 * @throws AsyncApiException
132 */
133 private JobInfo createJob(String sobjectType, BulkConnection connection)
134 throws AsyncApiException {
135 JobInfo job = new JobInfo();
136 job.setObject(sobjectType);
137 job.setOperation(OperationEnum.insert);
138 job.setContentType(ContentType.CSV);
139 job = connection.createJob(job);
140 System.out.println(job);
141 return job;
142 }
143
144
145
146 /**
147 * Create the BulkConnection used to call Bulk API operations.
148 */
149 private BulkConnection getBulkConnection(String userName, String password)
150 throws ConnectionException, AsyncApiException {
151 ConnectorConfig partnerConfig = new ConnectorConfig();
152 partnerConfig.setUsername(userName);
153 partnerConfig.setPassword(password);
154 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/57.0");
155 // Creating the connection automatically handles login and stores
156 // the session in partnerConfig
157 new PartnerConnection(partnerConfig);
158 // When PartnerConnection is instantiated, a login is implicitly
159 // executed and, if successful,
160 // a valid session is stored in the ConnectorConfig instance.
161 // Use this key to initialize a BulkConnection:
162 ConnectorConfig config = new ConnectorConfig();
163 config.setSessionId(partnerConfig.getSessionId());
164 // The endpoint for the Bulk API service is the same as for the normal
165 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
166 String soapEndpoint = partnerConfig.getServiceEndpoint();
167 String apiVersion = "57.0";
168 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
169 + "async/" + apiVersion;
170 config.setRestEndpoint(restEndpoint);
171 // This should only be false when doing debugging.
172 config.setCompression(true);
173 // Set this to true to see HTTP requests and responses on stdout
174 config.setTraceMessage(false);
175 BulkConnection connection = new BulkConnection(config);
176 return connection;
177 }
178
179
180
181 /**
182 * Create and upload batches using a CSV file.
183 * The file into the appropriate size batch files.
184 *
185 * @param connection
186 * Connection to use for creating batches
187 * @param jobInfo
188 * Job associated with new batches
189 * @param csvFileName
190 * The source file for batch data
191 */
192 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
193 JobInfo jobInfo, String csvFileName)
194 throws IOException, AsyncApiException {
195 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
196 BufferedReader rdr = new BufferedReader(
197 new InputStreamReader(new FileInputStream(csvFileName))
198 );
199 // read the CSV header row
200 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
201 int headerBytesLength = headerBytes.length;
202 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
203
204 // Split the CSV file into multiple batches
205 try {
206 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
207 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
208 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
209 int currentBytes = 0;
210 int currentLines = 0;
211 String nextLine;
212 while ((nextLine = rdr.readLine()) != null) {
213 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
214 // Create a new batch when our batch size limit is reached
215 if (currentBytes + bytes.length > maxBytesPerBatch
216 || currentLines > maxRowsPerBatch) {
217 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
218 currentBytes = 0;
219 currentLines = 0;
220 }
221 if (currentBytes == 0) {
222 tmpOut = new FileOutputStream(tmpFile);
223 tmpOut.write(headerBytes);
224 currentBytes = headerBytesLength;
225 currentLines = 1;
226 }
227 tmpOut.write(bytes);
228 currentBytes += bytes.length;
229 currentLines++;
230 }
231 // Finished processing all rows
232 // Create a final batch for any remaining data
233 if (currentLines > 1) {
234 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
235 }
236 } finally {
237 tmpFile.delete();
238 }
239 return batchInfos;
240 }
241
242 /**
243 * Create a batch by uploading the contents of the file.
244 * This closes the output stream.
245 *
246 * @param tmpOut
247 * The output stream used to write the CSV data for a single batch.
248 * @param tmpFile
249 * The file associated with the above stream.
250 * @param batchInfos
251 * The batch info for the newly created batch is added to this list.
252 * @param connection
253 * The BulkConnection used to create the new batch.
254 * @param jobInfo
255 * The JobInfo associated with the new batch.
256 */
257 private void createBatch(FileOutputStream tmpOut, File tmpFile,
258 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
259 throws IOException, AsyncApiException {
260 tmpOut.flush();
261 tmpOut.close();
262 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
263 try {
264 BatchInfo batchInfo =
265 connection.createBatchFromStream(jobInfo, tmpInputStream);
266 System.out.println(batchInfo);
267 batchInfos.add(batchInfo);
268
269 } finally {
270 tmpInputStream.close();
271 }
272 }
273
274
275}