この文章は Salesforce 機械翻訳システムを使用して翻訳されました。詳細はこちらをご参照ください。
英語に切り替える

サンプルコードの説明

クライアントの設定が完了したら、Bulk API を使用するクライアントアプリケーションの構築を開始できます。次のサンプルを使用して、クライアントアプリケーションを作成します。各セクションで、コードの特定部分を順に説明していきます。詳細なサンプルは最後に記載します。

次のコードは、WSC ツールキット内のパッケージとクラスを設定し、さらに Partner WSDL から生成されたコードを設定します。

1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9

main() メソッドの設定

次のコードは、クラスの main() メソッドを設定します。main() メソッドは、サンプルの処理ロジックを含む runSample() メソッドを呼び出します。runSample() で呼び出されるメソッドについては、後続のセクションで取り上げます。

1
2    public static void main(String[] args)
3      throws AsyncApiException, ConnectionException, IOException {
4        BulkExample example = new BulkExample();
5        // Replace arguments below with your credentials and test file name
6        // The first parameter indicates that we are loading Account records
7        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
8    }
9
10    /**
11     * Creates a Bulk API job and uploads batches for a CSV file.
12     */
13    public void runSample(String sobjectType, String userName,
14              String password, String sampleFileName)
15            throws AsyncApiException, ConnectionException, IOException {
16        BulkConnection connection = getBulkConnection(userName, password);
17        JobInfo job = createJob(sobjectType, connection);
18        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
19            sampleFileName);
20        closeJob(connection, job.getId());
21        awaitCompletion(connection, job, batchInfoList);
22        checkResults(connection, job, batchInfoList);
23    }
24

ログインと BulkConnection の設定

次のコードは、パートナー接続 (PartnerConnection) を使用してログインし、セッションを再利用して Bulk API 接続 (BulkConnection) を作成します。

1
2    /**
3     * Create the BulkConnection used to call Bulk API operations.
4     */
5    private BulkConnection getBulkConnection(String userName, String password)
6          throws ConnectionException, AsyncApiException {
7        ConnectorConfig partnerConfig = new ConnectorConfig();
8        partnerConfig.setUsername(userName);
9        partnerConfig.setPassword(password);
10        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/35.0");
11        // Creating the connection automatically handles login and stores
12        // the session in partnerConfig
13        new PartnerConnection(partnerConfig);
14        // When PartnerConnection is instantiated, a login is implicitly
15        // executed and, if successful,
16        // a valid session is stored in the ConnectorConfig instance.
17        // Use this key to initialize a BulkConnection:
18        ConnectorConfig config = new ConnectorConfig();
19        config.setSessionId(partnerConfig.getSessionId());
20        // The endpoint for the Bulk API service is the same as for the normal
21        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
22        String soapEndpoint = partnerConfig.getServiceEndpoint();
23        String apiVersion = "35.0";
24        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
25            + "async/" + apiVersion;
26        config.setRestEndpoint(restEndpoint);
27        // This should only be false when doing debugging.
28        config.setCompression(true);
29        // Set this to true to see HTTP requests and responses on stdout
30        config.setTraceMessage(false);
31        BulkConnection connection = new BulkConnection(config);
32        return connection;
33    }
34

この BulkConnection インスタンスは、Bulk API を使用するための基盤であり、アプリケーションのライフサイクルにわたって繰り返し再利用できます。

ジョブの新規作成

接続が作成できたら、新しいジョブを作成します。データは常にジョブ単位で処理されます。ジョブは、処理するデータの詳細、つまり実行する処理の種類 (挿入、更新、更新/挿入、削除) やオブジェクト種別を指定します。次のコードは、Account オブジェクトを対象とした挿入処理のジョブを新規作成します。

1
2    /**
3     * Create a new job using the Bulk API.
4     * 
5     * @param sobjectType
6     *            The object type being loaded, such as "Account"
7     * @param connection
8     *            BulkConnection used to create the new job.
9     * @return The JobInfo for the new job.
10     * @throws AsyncApiException
11     */
12    private JobInfo createJob(String sobjectType, BulkConnection connection)
13          throws AsyncApiException {
14        JobInfo job = new JobInfo();
15        job.setObject(sobjectType);
16        job.setOperation(OperationEnum.insert);
17        job.setContentType(ContentType.CSV);
18        job = connection.createJob(job);
19        System.out.println(job);
20        return job;
21    }
22

作成したばかりのジョブの状態は、Open になります。ジョブがこの状態にある間は、新しいバッチをジョブに追加できます。ジョブの状態が Closed になると、それ以上バッチを追加することはできません。

ジョブへのバッチの追加

データは一連のバッチ要求を通じて処理されます。各要求は、リクエストボディに XML 形式のデータセットを含む HTTP POST です。Bulk API の制限で説明されているバッチサイズと、1 日あたりの処理バッチ数の上限を超過しない限り、データセット全体をどの程度に分割して処理するかは、クライアントアプリケーション側で決定できます。

各バッチの処理にはオーバーヘッドが伴います。オーバーヘッドの処理コストを最小限に抑えられるよう、バッチを処理と転送に適したサイズに調整する必要があります。レコード数 1,000 ~ 10,000 件の範囲が、適切なバッチサイズとみなされます。

次のコードは、CSV ファイルを小さいバッチファイルに分割して、Salesforce にアップロードします。

1
2    /**
3     * Create and upload batches using a CSV file.
4     * The file into the appropriate size batch files.
5     * 
6     * @param connection
7     *            Connection to use for creating batches
8     * @param jobInfo
9     *            Job associated with new batches
10     * @param csvFileName
11     *            The source file for batch data
12     */
13    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
14          JobInfo jobInfo, String csvFileName)
15            throws IOException, AsyncApiException {
16        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
17        BufferedReader rdr = new BufferedReader(
18            new InputStreamReader(new FileInputStream(csvFileName))
19        );
20        // read the CSV header row
21        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
22        int headerBytesLength = headerBytes.length;
23        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
24
25        // Split the CSV file into multiple batches
26        try {
27            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
28            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
29            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
30            int currentBytes = 0;
31            int currentLines = 0;
32            String nextLine;
33            while ((nextLine = rdr.readLine()) != null) {
34                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
35                // Create a new batch when our batch size limit is reached
36                if (currentBytes + bytes.length > maxBytesPerBatch
37                  || currentLines > maxRowsPerBatch) {
38                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
39                    currentBytes = 0;
40                    currentLines = 0;
41                }
42                if (currentBytes == 0) {
43                    tmpOut = new FileOutputStream(tmpFile);
44                    tmpOut.write(headerBytes);
45                    currentBytes = headerBytesLength;
46                    currentLines = 1;
47                }
48                tmpOut.write(bytes);
49                currentBytes += bytes.length;
50                currentLines++;
51            }
52            // Finished processing all rows
53            // Create a final batch for any remaining data
54            if (currentLines > 1) {
55                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
56            }
57        } finally {
58            tmpFile.delete();
59        }
60        return batchInfos;
61    }
62
63    /**
64     * Create a batch by uploading the contents of the file.
65     * This closes the output stream.
66     * 
67     * @param tmpOut
68     *            The output stream used to write the CSV data for a single batch.
69     * @param tmpFile
70     *            The file associated with the above stream.
71     * @param batchInfos
72     *            The batch info for the newly created batch is added to this list.
73     * @param connection
74     *            The BulkConnection used to create the new batch.
75     * @param jobInfo
76     *            The JobInfo associated with the new batch.
77     */
78    private void createBatch(FileOutputStream tmpOut, File tmpFile,
79      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
80              throws IOException, AsyncApiException {
81        tmpOut.flush();
82        tmpOut.close();
83        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
84        try {
85            BatchInfo batchInfo =
86              connection.createBatchFromStream(jobInfo, tmpInputStream);
87            System.out.println(batchInfo);
88            batchInfos.add(batchInfo);
89
90        } finally {
91            tmpInputStream.close();
92        }
93    }
94

サーバは、受け取ったバッチをただちに処理待ちのキューに格納します。バッチの送信時には、形式にエラーがあっても報告はされません。こうしたエラーはバッチの処理が完了した後、結果データとして報告されます。

バイナリ型の添付ファイルをインポートするには、以下のメソッドを使用して、batchContent パラメータにバッチのコンテンツ (CSV、XML のいずれか) を指定するか、添付ファイルに request.txt を追加して、batchContent パラメータには null を渡します。これらのメソッドは、com.async.BulkConnection クラス内に含まれます。

  • createBatchFromDir()
  • createBatchWithFileAttachments()
  • createBatchWithInputStreamAttachments()
  • createBatchFromZipStream()

ヒント

ジョブの終了

すべてのバッチをジョブに追加したら、ジョブを終了します。ジョブを終了すると、すべてのバッチの処理が確実に完了します。

1
2    private void closeJob(BulkConnection connection, String jobId)
3          throws AsyncApiException {
4        JobInfo job = new JobInfo();
5        job.setId(jobId);
6        job.setState(JobStateEnum.Closed);
7        connection.updateJob(job);
8    }
9

バッチの状況の確認

バッチはバックグラウンドで処理されます。バッチが完了するまでの処理時間は、データセットのサイズによって異なります。処理の実行中に、すべてのバッチの状況を取得して、バッチが完了しているかどうかを確認することができます。

1
2    /**
3     * Wait for a job to complete by polling the Bulk API.
4     * 
5     * @param connection
6     *            BulkConnection used to check results.
7     * @param job
8     *            The job awaiting completion.
9     * @param batchInfoList
10     *            List of batches for this job.
11     * @throws AsyncApiException
12     */
13    private void awaitCompletion(BulkConnection connection, JobInfo job,
14          List<BatchInfo> batchInfoList)
15            throws AsyncApiException {
16        long sleepTime = 0L;
17        Set<String> incomplete = new HashSet<String>();
18        for (BatchInfo bi : batchInfoList) {
19            incomplete.add(bi.getId());
20        }
21        while (!incomplete.isEmpty()) {
22            try {
23                Thread.sleep(sleepTime);
24            } catch (InterruptedException e) {}
25            System.out.println("Awaiting results..." + incomplete.size());
26            sleepTime = 10000L;
27            BatchInfo[] statusList =
28              connection.getBatchInfoList(job.getId()).getBatchInfo();
29            for (BatchInfo b : statusList) {
30                if (b.getState() == BatchStateEnum.Completed
31                  || b.getState() == BatchStateEnum.Failed) {
32                    if (incomplete.remove(b.getId())) {
33                        System.out.println("BATCH STATUS:\n" + b);
34                    }
35                }
36            }
37        }
38    }
39

バッチは、状況が Failed か Completed のいずれかになった場合に終了となります。このコードでは、ジョブのすべてのバッチが終了するまでループ処理を実行します。

ジョブの結果の取得

すべてのバッチの処理が完了したら、各バッチの結果を取得できます。バッチが成功した場合も、失敗した場合も、またジョブが途中で中止された場合も、必ず結果を取得してください。結果セットを取得しないと、個々のレコードの状況を確認できません。各レコードの結果を正しく取得するには、バッチと対応する元のデータセットをコード内で正確に追跡する必要がありまが、そのためには、バッチの作成時のリストを保持し、結果の取得に使用するようにします。次のコードはそのための処理を記述しています。

1
2    /**
3     * Gets the results of the operation and checks for errors.
4     */
5    private void checkResults(BulkConnection connection, JobInfo job,
6              List<BatchInfo> batchInfoList)
7            throws AsyncApiException, IOException {
8        // batchInfoList was populated when batches were created and submitted
9        for (BatchInfo b : batchInfoList) {
10            CSVReader rdr =
11              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
12            List<String> resultHeader = rdr.nextRecord();
13            int resultCols = resultHeader.size();
14
15            List<String> row;
16            while ((row = rdr.nextRecord()) != null) {
17                Map<String, String> resultInfo = new HashMap<String, String>();
18                for (int i = 0; i < resultCols; i++) {
19                    resultInfo.put(resultHeader.get(i), row.get(i));
20                }
21                boolean success = Boolean.valueOf(resultInfo.get("Success"));
22                boolean created = Boolean.valueOf(resultInfo.get("Created"));
23                String id = resultInfo.get("Id");
24                String error = resultInfo.get("Error");
25                if (success && created) {
26                    System.out.println("Created row with id " + id);
27                } else if (!success) {
28                    System.out.println("Failed with error: " + error);
29                }
30            }
31        }
32    }
33

このコードは、各レコードの結果を取得し、処理が成功したか失敗したかを報告します。レコードでエラーが発生した場合にはエラーを出力します。

クイックスタートサンプルの完全版

ジョブやバッチについての理解が深まったでしょうか。クイックスタートサンプルのコード全体を次に示します。コピーして活用してください。

1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9
10    
11public class BulkExample {
12
13
14    public static void main(String[] args)
15      throws AsyncApiException, ConnectionException, IOException {
16        BulkExample example = new BulkExample();
17        // Replace arguments below with your credentials and test file name
18        // The first parameter indicates that we are loading Account records
19        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
20    }
21
22    /**
23     * Creates a Bulk API job and uploads batches for a CSV file.
24     */
25    public void runSample(String sobjectType, String userName,
26              String password, String sampleFileName)
27            throws AsyncApiException, ConnectionException, IOException {
28        BulkConnection connection = getBulkConnection(userName, password);
29        JobInfo job = createJob(sobjectType, connection);
30        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
31            sampleFileName);
32        closeJob(connection, job.getId());
33        awaitCompletion(connection, job, batchInfoList);
34        checkResults(connection, job, batchInfoList);
35    }
36
37
38
39    /**
40     * Gets the results of the operation and checks for errors.
41     */
42    private void checkResults(BulkConnection connection, JobInfo job,
43              List<BatchInfo> batchInfoList)
44            throws AsyncApiException, IOException {
45        // batchInfoList was populated when batches were created and submitted
46        for (BatchInfo b : batchInfoList) {
47            CSVReader rdr =
48              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
49            List<String> resultHeader = rdr.nextRecord();
50            int resultCols = resultHeader.size();
51
52            List<String> row;
53            while ((row = rdr.nextRecord()) != null) {
54                Map<String, String> resultInfo = new HashMap<String, String>();
55                for (int i = 0; i < resultCols; i++) {
56                    resultInfo.put(resultHeader.get(i), row.get(i));
57                }
58                boolean success = Boolean.valueOf(resultInfo.get("Success"));
59                boolean created = Boolean.valueOf(resultInfo.get("Created"));
60                String id = resultInfo.get("Id");
61                String error = resultInfo.get("Error");
62                if (success && created) {
63                    System.out.println("Created row with id " + id);
64                } else if (!success) {
65                    System.out.println("Failed with error: " + error);
66                }
67            }
68        }
69    }
70
71
72
73    private void closeJob(BulkConnection connection, String jobId)
74          throws AsyncApiException {
75        JobInfo job = new JobInfo();
76        job.setId(jobId);
77        job.setState(JobStateEnum.Closed);
78        connection.updateJob(job);
79    }
80
81
82
83    /**
84     * Wait for a job to complete by polling the Bulk API.
85     * 
86     * @param connection
87     *            BulkConnection used to check results.
88     * @param job
89     *            The job awaiting completion.
90     * @param batchInfoList
91     *            List of batches for this job.
92     * @throws AsyncApiException
93     */
94    private void awaitCompletion(BulkConnection connection, JobInfo job,
95          List<BatchInfo> batchInfoList)
96            throws AsyncApiException {
97        long sleepTime = 0L;
98        Set<String> incomplete = new HashSet<String>();
99        for (BatchInfo bi : batchInfoList) {
100            incomplete.add(bi.getId());
101        }
102        while (!incomplete.isEmpty()) {
103            try {
104                Thread.sleep(sleepTime);
105            } catch (InterruptedException e) {}
106            System.out.println("Awaiting results..." + incomplete.size());
107            sleepTime = 10000L;
108            BatchInfo[] statusList =
109              connection.getBatchInfoList(job.getId()).getBatchInfo();
110            for (BatchInfo b : statusList) {
111                if (b.getState() == BatchStateEnum.Completed
112                  || b.getState() == BatchStateEnum.Failed) {
113                    if (incomplete.remove(b.getId())) {
114                        System.out.println("BATCH STATUS:\n" + b);
115                    }
116                }
117            }
118        }
119    }
120
121
122
123    /**
124     * Create a new job using the Bulk API.
125     * 
126     * @param sobjectType
127     *            The object type being loaded, such as "Account"
128     * @param connection
129     *            BulkConnection used to create the new job.
130     * @return The JobInfo for the new job.
131     * @throws AsyncApiException
132     */
133    private JobInfo createJob(String sobjectType, BulkConnection connection)
134          throws AsyncApiException {
135        JobInfo job = new JobInfo();
136        job.setObject(sobjectType);
137        job.setOperation(OperationEnum.insert);
138        job.setContentType(ContentType.CSV);
139        job = connection.createJob(job);
140        System.out.println(job);
141        return job;
142    }
143
144    
145
146    /**
147     * Create the BulkConnection used to call Bulk API operations.
148     */
149    private BulkConnection getBulkConnection(String userName, String password)
150          throws ConnectionException, AsyncApiException {
151        ConnectorConfig partnerConfig = new ConnectorConfig();
152        partnerConfig.setUsername(userName);
153        partnerConfig.setPassword(password);
154        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/35.0");
155        // Creating the connection automatically handles login and stores
156        // the session in partnerConfig
157        new PartnerConnection(partnerConfig);
158        // When PartnerConnection is instantiated, a login is implicitly
159        // executed and, if successful,
160        // a valid session is stored in the ConnectorConfig instance.
161        // Use this key to initialize a BulkConnection:
162        ConnectorConfig config = new ConnectorConfig();
163        config.setSessionId(partnerConfig.getSessionId());
164        // The endpoint for the Bulk API service is the same as for the normal
165        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
166        String soapEndpoint = partnerConfig.getServiceEndpoint();
167        String apiVersion = "35.0";
168        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
169            + "async/" + apiVersion;
170        config.setRestEndpoint(restEndpoint);
171        // This should only be false when doing debugging.
172        config.setCompression(true);
173        // Set this to true to see HTTP requests and responses on stdout
174        config.setTraceMessage(false);
175        BulkConnection connection = new BulkConnection(config);
176        return connection;
177    }
178
179
180
181    /**
182     * Create and upload batches using a CSV file.
183     * The file into the appropriate size batch files.
184     * 
185     * @param connection
186     *            Connection to use for creating batches
187     * @param jobInfo
188     *            Job associated with new batches
189     * @param csvFileName
190     *            The source file for batch data
191     */
192    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
193          JobInfo jobInfo, String csvFileName)
194            throws IOException, AsyncApiException {
195        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
196        BufferedReader rdr = new BufferedReader(
197            new InputStreamReader(new FileInputStream(csvFileName))
198        );
199        // read the CSV header row
200        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
201        int headerBytesLength = headerBytes.length;
202        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
203
204        // Split the CSV file into multiple batches
205        try {
206            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
207            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
208            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
209            int currentBytes = 0;
210            int currentLines = 0;
211            String nextLine;
212            while ((nextLine = rdr.readLine()) != null) {
213                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
214                // Create a new batch when our batch size limit is reached
215                if (currentBytes + bytes.length > maxBytesPerBatch
216                  || currentLines > maxRowsPerBatch) {
217                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
218                    currentBytes = 0;
219                    currentLines = 0;
220                }
221                if (currentBytes == 0) {
222                    tmpOut = new FileOutputStream(tmpFile);
223                    tmpOut.write(headerBytes);
224                    currentBytes = headerBytesLength;
225                    currentLines = 1;
226                }
227                tmpOut.write(bytes);
228                currentBytes += bytes.length;
229                currentLines++;
230            }
231            // Finished processing all rows
232            // Create a final batch for any remaining data
233            if (currentLines > 1) {
234                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
235            }
236        } finally {
237            tmpFile.delete();
238        }
239        return batchInfos;
240    }
241
242    /**
243     * Create a batch by uploading the contents of the file.
244     * This closes the output stream.
245     * 
246     * @param tmpOut
247     *            The output stream used to write the CSV data for a single batch.
248     * @param tmpFile
249     *            The file associated with the above stream.
250     * @param batchInfos
251     *            The batch info for the newly created batch is added to this list.
252     * @param connection
253     *            The BulkConnection used to create the new batch.
254     * @param jobInfo
255     *            The JobInfo associated with the new batch.
256     */
257    private void createBatch(FileOutputStream tmpOut, File tmpFile,
258      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
259              throws IOException, AsyncApiException {
260        tmpOut.flush();
261        tmpOut.close();
262        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
263        try {
264            BatchInfo batchInfo =
265              connection.createBatchFromStream(jobInfo, tmpInputStream);
266            System.out.println(batchInfo);
267            batchInfos.add(batchInfo);
268
269        } finally {
270            tmpInputStream.close();
271        }
272    }
273
274
275}