この文章は Salesforce 機械翻訳システムを使用して翻訳されました。詳細はこちらをご参照ください。
英語に切り替える

サンプルコードの説明

クライアントの設定が完了したら、Bulk API を使用するクライアントアプリケーションを構築できます。次のサンプルを使用して、クライアントアプリケーションを作成します。各セクションで、コードの特定部分を順に説明していきます。詳細なサンプルは最後に記載します。

次のコードは、WSC ツールキット内のパッケージとクラスを設定し、さらに Partner WSDL から生成されたコードを設定します。

1import java.io.*;
2import java.util.*;
3
4import com.sforce.async.*;
5import com.sforce.soap.partner.PartnerConnection;
6import com.sforce.ws.ConnectionException;
7import com.sforce.ws.ConnectorConfig;

main() メソッドの設定

次のコードは、クラスの main() メソッドを設定します。main() メソッドは、サンプルの処理ロジックを含む runSample() メソッドを呼び出します。runSample() で呼び出されるメソッドについては、後続のセクションで取り上げます。

1public static void main(String[] args)
2      throws AsyncApiException, ConnectionException, IOException {
3        BulkExample example = new BulkExample();
4        // Replace arguments below with your credentials and test file name
5        // The first parameter indicates that we are loading Account records
6        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
7    }
8
9    /**
10     * Creates a Bulk API job and uploads batches for a CSV file.
11     */
12    public void runSample(String sobjectType, String userName,
13              String password, String sampleFileName)
14            throws AsyncApiException, ConnectionException, IOException {
15        BulkConnection connection = getBulkConnection(userName, password);
16        JobInfo job = createJob(sobjectType, connection);
17        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
18            sampleFileName);
19        closeJob(connection, job.getId());
20        awaitCompletion(connection, job, batchInfoList);
21        checkResults(connection, job, batchInfoList);
22    }

ログインと BulkConnection の設定

次のコードは、パートナー接続 (PartnerConnection) を使用してログインし、セッションを再利用して Bulk API 接続 (BulkConnection) を作成します。

1/**
2     * Create the BulkConnection used to call Bulk API operations.
3     */
4    private BulkConnection getBulkConnection(String userName, String password)
5          throws ConnectionException, AsyncApiException {
6        ConnectorConfig partnerConfig = new ConnectorConfig();
7        partnerConfig.setUsername(userName);
8        partnerConfig.setPassword(password);
9        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/39.0");
10        // Creating the connection automatically handles login and stores
11        // the session in partnerConfig
12        new PartnerConnection(partnerConfig);
13        // When PartnerConnection is instantiated, a login is implicitly
14        // executed and, if successful,
15        // a valid session is stored in the ConnectorConfig instance.
16        // Use this key to initialize a BulkConnection:
17        ConnectorConfig config = new ConnectorConfig();
18        config.setSessionId(partnerConfig.getSessionId());
19        // The endpoint for the Bulk API service is the same as for the normal
20        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
21        String soapEndpoint = partnerConfig.getServiceEndpoint();
22        String apiVersion = "39.0";
23        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
24            + "async/" + apiVersion;
25        config.setRestEndpoint(restEndpoint);
26        // This should only be false when doing debugging.
27        config.setCompression(true);
28        // Set this to true to see HTTP requests and responses on stdout
29        config.setTraceMessage(false);
30        BulkConnection connection = new BulkConnection(config);
31        return connection;
32    }

この BulkConnection インスタンスは、Bulk API を使用するための基盤であり、アプリケーションのライフサイクルにわたって繰り返し再利用できます。

ジョブの作成

接続が作成できたら、ジョブを作成します。データは常にジョブ単位で処理されます。ジョブは、処理するデータの詳細、つまり実行する処理の種類 (挿入、更新、更新/挿入、削除) やオブジェクト種別を指定します。次のコードは、Account オブジェクトを対象とした挿入処理のジョブを新規作成します。

1/**
2     * Create a new job using the Bulk API.
3     * 
4     * @param sobjectType
5     *            The object type being loaded, such as "Account"
6     * @param connection
7     *            BulkConnection used to create the new job.
8     * @return The JobInfo for the new job.
9     * @throws AsyncApiException
10     */
11    private JobInfo createJob(String sobjectType, BulkConnection connection)
12          throws AsyncApiException {
13        JobInfo job = new JobInfo();
14        job.setObject(sobjectType);
15        job.setOperation(OperationEnum.insert);
16        job.setContentType(ContentType.CSV);
17        job = connection.createJob(job);
18        System.out.println(job);
19        return job;
20    }

作成したばかりのジョブの状態は、Open になります。ジョブがこの状態にある間は、新しいバッチをジョブに追加できます。ジョブの状態が Closed になると、それ以上バッチを追加することはできません。

ジョブへのバッチの追加

データは一連のバッチ要求を通じて処理されます。各要求は、リクエストボディに XML 形式のデータセットを含む HTTP POST です。「Bulk API の制限」で説明されているバッチサイズと、1 日あたりの処理バッチ数の上限を超過しない限り、データセット全体をどの程度に分割して処理するかは、クライアントアプリケーション側で決定できます。

各バッチの処理にはオーバーヘッドが伴います。オーバーヘッドの処理コストを最小限に抑えられるよう、バッチを処理と転送に適したサイズに調整する必要があります。レコード数 1,000 ~ 10,000 件の範囲が、適切なバッチサイズとみなされます。

次のコードは、CSV ファイルを小さいバッチファイルに分割して、Salesforce にアップロードします。

1/**
2     * Create and upload batches using a CSV file.
3     * The file into the appropriate size batch files.
4     * 
5     * @param connection
6     *            Connection to use for creating batches
7     * @param jobInfo
8     *            Job associated with new batches
9     * @param csvFileName
10     *            The source file for batch data
11     */
12    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
13          JobInfo jobInfo, String csvFileName)
14            throws IOException, AsyncApiException {
15        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
16        BufferedReader rdr = new BufferedReader(
17            new InputStreamReader(new FileInputStream(csvFileName))
18        );
19        // read the CSV header row
20        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
21        int headerBytesLength = headerBytes.length;
22        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
23
24        // Split the CSV file into multiple batches
25        try {
26            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
27            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
28            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
29            int currentBytes = 0;
30            int currentLines = 0;
31            String nextLine;
32            while ((nextLine = rdr.readLine()) != null) {
33                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
34                // Create a new batch when our batch size limit is reached
35                if (currentBytes + bytes.length > maxBytesPerBatch
36                  || currentLines > maxRowsPerBatch) {
37                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
38                    currentBytes = 0;
39                    currentLines = 0;
40                }
41                if (currentBytes == 0) {
42                    tmpOut = new FileOutputStream(tmpFile);
43                    tmpOut.write(headerBytes);
44                    currentBytes = headerBytesLength;
45                    currentLines = 1;
46                }
47                tmpOut.write(bytes);
48                currentBytes += bytes.length;
49                currentLines++;
50            }
51            // Finished processing all rows
52            // Create a final batch for any remaining data
53            if (currentLines > 1) {
54                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
55            }
56        } finally {
57            tmpFile.delete();
58        }
59        return batchInfos;
60    }
61
62    /**
63     * Create a batch by uploading the contents of the file.
64     * This closes the output stream.
65     * 
66     * @param tmpOut
67     *            The output stream used to write the CSV data for a single batch.
68     * @param tmpFile
69     *            The file associated with the above stream.
70     * @param batchInfos
71     *            The batch info for the newly created batch is added to this list.
72     * @param connection
73     *            The BulkConnection used to create the new batch.
74     * @param jobInfo
75     *            The JobInfo associated with the new batch.
76     */
77    private void createBatch(FileOutputStream tmpOut, File tmpFile,
78      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
79              throws IOException, AsyncApiException {
80        tmpOut.flush();
81        tmpOut.close();
82        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
83        try {
84            BatchInfo batchInfo =
85              connection.createBatchFromStream(jobInfo, tmpInputStream);
86            System.out.println(batchInfo);
87            batchInfos.add(batchInfo);
88
89        } finally {
90            tmpInputStream.close();
91        }
92    }

サーバは、受け取ったバッチをただちに処理待ちのキューに格納します。バッチの送信時には、形式のエラーは報告はされません。こうしたエラーはバッチの処理が完了した後、結果データとして報告されます。

バイナリ添付ファイルをインポートするには、以下のメソッドを使用して、batchContent パラメータにバッチのコンテンツ (CSV、XML、または JSON のいずれか) を指定するか、添付ファイルに request.txt を追加して、batchContent パラメータには null を渡します。これらのメソッドは、com.async.BulkConnection クラス内に含まれます。

  • createBatchFromDir()
  • createBatchWithFileAttachments()
  • createBatchWithInputStreamAttachments()
  • createBatchFromZipStream()

ヒント

ジョブの終了

すべてのバッチをジョブに追加したら、ジョブを終了します。ジョブを終了すると、すべてのバッチの処理が確実に完了します。

1private void closeJob(BulkConnection connection, String jobId)
2          throws AsyncApiException {
3        JobInfo job = new JobInfo();
4        job.setId(jobId);
5        job.setState(JobStateEnum.Closed);
6        connection.updateJob(job);
7    }

バッチの状況の確認

バッチはバックグラウンドで処理されます。バッチが完了するまでの処理時間は、データセットのサイズによって異なります。処理の実行中に、すべてのバッチの状況を取得して、バッチが完了しているかどうかを確認することができます。

1/**
2     * Wait for a job to complete by polling the Bulk API.
3     * 
4     * @param connection
5     *            BulkConnection used to check results.
6     * @param job
7     *            The job awaiting completion.
8     * @param batchInfoList
9     *            List of batches for this job.
10     * @throws AsyncApiException
11     */
12    private void awaitCompletion(BulkConnection connection, JobInfo job,
13          List<BatchInfo> batchInfoList)
14            throws AsyncApiException {
15        long sleepTime = 0L;
16        Set<String> incomplete = new HashSet<String>();
17        for (BatchInfo bi : batchInfoList) {
18            incomplete.add(bi.getId());
19        }
20        while (!incomplete.isEmpty()) {
21            try {
22                Thread.sleep(sleepTime);
23            } catch (InterruptedException e) {}
24            System.out.println("Awaiting results..." + incomplete.size());
25            sleepTime = 10000L;
26            BatchInfo[] statusList =
27              connection.getBatchInfoList(job.getId()).getBatchInfo();
28            for (BatchInfo b : statusList) {
29                if (b.getState() == BatchStateEnum.Completed
30                  || b.getState() == BatchStateEnum.Failed) {
31                    if (incomplete.remove(b.getId())) {
32                        System.out.println("BATCH STATUS:\n" + b);
33                    }
34                }
35            }
36        }
37    }

バッチは、状況が Failed か Completed のいずれかになった場合に終了となります。このコードでは、ジョブのすべてのバッチが終了するまでループ処理を実行します。

ジョブの結果の取得

すべてのバッチの処理が完了したら、各バッチの結果を取得できます。バッチが成功した場合も、失敗した場合も、またジョブが途中で中止された場合も、必ず結果を取得してください。結果セットを取得しないと、個々のレコードの状況を確認できません。各レコードの結果を正しく取得するには、バッチと対応する元のデータセットをコード内で正確に追跡する必要がありまが、そのためには、バッチの作成時のリストを保持し、結果の取得に使用するようにします。次のコードはそのための処理を記述しています。

1/**
2     * Gets the results of the operation and checks for errors.
3     */
4    private void checkResults(BulkConnection connection, JobInfo job,
5              List<BatchInfo> batchInfoList)
6            throws AsyncApiException, IOException {
7        // batchInfoList was populated when batches were created and submitted
8        for (BatchInfo b : batchInfoList) {
9            CSVReader rdr =
10              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
11            List<String> resultHeader = rdr.nextRecord();
12            int resultCols = resultHeader.size();
13
14            List<String> row;
15            while ((row = rdr.nextRecord()) != null) {
16                Map<String, String> resultInfo = new HashMap<String, String>();
17                for (int i = 0; i < resultCols; i++) {
18                    resultInfo.put(resultHeader.get(i), row.get(i));
19                }
20                boolean success = Boolean.valueOf(resultInfo.get("Success"));
21                boolean created = Boolean.valueOf(resultInfo.get("Created"));
22                String id = resultInfo.get("Id");
23                String error = resultInfo.get("Error");
24                if (success && created) {
25                    System.out.println("Created row with id " + id);
26                } else if (!success) {
27                    System.out.println("Failed with error: " + error);
28                }
29            }
30        }
31    }

このコードは、各レコードの結果を取得し、処理が成功したか失敗したかを報告します。レコードでエラーが発生した場合にはエラーを出力します。

クイックスタートサンプルの完全版

ジョブやバッチについての理解が深まったでしょうか。クイックスタートサンプルのコード全体を次に示します。コピーして活用してください。

1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9
10    
11public class BulkExample {
12
13
14    public static void main(String[] args)
15      throws AsyncApiException, ConnectionException, IOException {
16        BulkExample example = new BulkExample();
17        // Replace arguments below with your credentials and test file name
18        // The first parameter indicates that we are loading Account records
19        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
20    }
21
22    /**
23     * Creates a Bulk API job and uploads batches for a CSV file.
24     */
25    public void runSample(String sobjectType, String userName,
26              String password, String sampleFileName)
27            throws AsyncApiException, ConnectionException, IOException {
28        BulkConnection connection = getBulkConnection(userName, password);
29        JobInfo job = createJob(sobjectType, connection);
30        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
31            sampleFileName);
32        closeJob(connection, job.getId());
33        awaitCompletion(connection, job, batchInfoList);
34        checkResults(connection, job, batchInfoList);
35    }
36
37
38
39    /**
40     * Gets the results of the operation and checks for errors.
41     */
42    private void checkResults(BulkConnection connection, JobInfo job,
43              List<BatchInfo> batchInfoList)
44            throws AsyncApiException, IOException {
45        // batchInfoList was populated when batches were created and submitted
46        for (BatchInfo b : batchInfoList) {
47            CSVReader rdr =
48              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
49            List<String> resultHeader = rdr.nextRecord();
50            int resultCols = resultHeader.size();
51
52            List<String> row;
53            while ((row = rdr.nextRecord()) != null) {
54                Map<String, String> resultInfo = new HashMap<String, String>();
55                for (int i = 0; i < resultCols; i++) {
56                    resultInfo.put(resultHeader.get(i), row.get(i));
57                }
58                boolean success = Boolean.valueOf(resultInfo.get("Success"));
59                boolean created = Boolean.valueOf(resultInfo.get("Created"));
60                String id = resultInfo.get("Id");
61                String error = resultInfo.get("Error");
62                if (success && created) {
63                    System.out.println("Created row with id " + id);
64                } else if (!success) {
65                    System.out.println("Failed with error: " + error);
66                }
67            }
68        }
69    }
70
71
72
73    private void closeJob(BulkConnection connection, String jobId)
74          throws AsyncApiException {
75        JobInfo job = new JobInfo();
76        job.setId(jobId);
77        job.setState(JobStateEnum.Closed);
78        connection.updateJob(job);
79    }
80
81
82
83    /**
84     * Wait for a job to complete by polling the Bulk API.
85     * 
86     * @param connection
87     *            BulkConnection used to check results.
88     * @param job
89     *            The job awaiting completion.
90     * @param batchInfoList
91     *            List of batches for this job.
92     * @throws AsyncApiException
93     */
94    private void awaitCompletion(BulkConnection connection, JobInfo job,
95          List<BatchInfo> batchInfoList)
96            throws AsyncApiException {
97        long sleepTime = 0L;
98        Set<String> incomplete = new HashSet<String>();
99        for (BatchInfo bi : batchInfoList) {
100            incomplete.add(bi.getId());
101        }
102        while (!incomplete.isEmpty()) {
103            try {
104                Thread.sleep(sleepTime);
105            } catch (InterruptedException e) {}
106            System.out.println("Awaiting results..." + incomplete.size());
107            sleepTime = 10000L;
108            BatchInfo[] statusList =
109              connection.getBatchInfoList(job.getId()).getBatchInfo();
110            for (BatchInfo b : statusList) {
111                if (b.getState() == BatchStateEnum.Completed
112                  || b.getState() == BatchStateEnum.Failed) {
113                    if (incomplete.remove(b.getId())) {
114                        System.out.println("BATCH STATUS:\n" + b);
115                    }
116                }
117            }
118        }
119    }
120
121
122
123    /**
124     * Create a new job using the Bulk API.
125     * 
126     * @param sobjectType
127     *            The object type being loaded, such as "Account"
128     * @param connection
129     *            BulkConnection used to create the new job.
130     * @return The JobInfo for the new job.
131     * @throws AsyncApiException
132     */
133    private JobInfo createJob(String sobjectType, BulkConnection connection)
134          throws AsyncApiException {
135        JobInfo job = new JobInfo();
136        job.setObject(sobjectType);
137        job.setOperation(OperationEnum.insert);
138        job.setContentType(ContentType.CSV);
139        job = connection.createJob(job);
140        System.out.println(job);
141        return job;
142    }
143
144    
145
146    /**
147     * Create the BulkConnection used to call Bulk API operations.
148     */
149    private BulkConnection getBulkConnection(String userName, String password)
150          throws ConnectionException, AsyncApiException {
151        ConnectorConfig partnerConfig = new ConnectorConfig();
152        partnerConfig.setUsername(userName);
153        partnerConfig.setPassword(password);
154        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/39.0");
155        // Creating the connection automatically handles login and stores
156        // the session in partnerConfig
157        new PartnerConnection(partnerConfig);
158        // When PartnerConnection is instantiated, a login is implicitly
159        // executed and, if successful,
160        // a valid session is stored in the ConnectorConfig instance.
161        // Use this key to initialize a BulkConnection:
162        ConnectorConfig config = new ConnectorConfig();
163        config.setSessionId(partnerConfig.getSessionId());
164        // The endpoint for the Bulk API service is the same as for the normal
165        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
166        String soapEndpoint = partnerConfig.getServiceEndpoint();
167        String apiVersion = "39.0";
168        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
169            + "async/" + apiVersion;
170        config.setRestEndpoint(restEndpoint);
171        // This should only be false when doing debugging.
172        config.setCompression(true);
173        // Set this to true to see HTTP requests and responses on stdout
174        config.setTraceMessage(false);
175        BulkConnection connection = new BulkConnection(config);
176        return connection;
177    }
178
179
180
181    /**
182     * Create and upload batches using a CSV file.
183     * The file into the appropriate size batch files.
184     * 
185     * @param connection
186     *            Connection to use for creating batches
187     * @param jobInfo
188     *            Job associated with new batches
189     * @param csvFileName
190     *            The source file for batch data
191     */
192    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
193          JobInfo jobInfo, String csvFileName)
194            throws IOException, AsyncApiException {
195        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
196        BufferedReader rdr = new BufferedReader(
197            new InputStreamReader(new FileInputStream(csvFileName))
198        );
199        // read the CSV header row
200        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
201        int headerBytesLength = headerBytes.length;
202        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
203
204        // Split the CSV file into multiple batches
205        try {
206            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
207            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
208            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
209            int currentBytes = 0;
210            int currentLines = 0;
211            String nextLine;
212            while ((nextLine = rdr.readLine()) != null) {
213                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
214                // Create a new batch when our batch size limit is reached
215                if (currentBytes + bytes.length > maxBytesPerBatch
216                  || currentLines > maxRowsPerBatch) {
217                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
218                    currentBytes = 0;
219                    currentLines = 0;
220                }
221                if (currentBytes == 0) {
222                    tmpOut = new FileOutputStream(tmpFile);
223                    tmpOut.write(headerBytes);
224                    currentBytes = headerBytesLength;
225                    currentLines = 1;
226                }
227                tmpOut.write(bytes);
228                currentBytes += bytes.length;
229                currentLines++;
230            }
231            // Finished processing all rows
232            // Create a final batch for any remaining data
233            if (currentLines > 1) {
234                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
235            }
236        } finally {
237            tmpFile.delete();
238        }
239        return batchInfos;
240    }
241
242    /**
243     * Create a batch by uploading the contents of the file.
244     * This closes the output stream.
245     * 
246     * @param tmpOut
247     *            The output stream used to write the CSV data for a single batch.
248     * @param tmpFile
249     *            The file associated with the above stream.
250     * @param batchInfos
251     *            The batch info for the newly created batch is added to this list.
252     * @param connection
253     *            The BulkConnection used to create the new batch.
254     * @param jobInfo
255     *            The JobInfo associated with the new batch.
256     */
257    private void createBatch(FileOutputStream tmpOut, File tmpFile,
258      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
259              throws IOException, AsyncApiException {
260        tmpOut.flush();
261        tmpOut.close();
262        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
263        try {
264            BatchInfo batchInfo =
265              connection.createBatchFromStream(jobInfo, tmpInputStream);
266            System.out.println(batchInfo);
267            batchInfos.add(batchInfo);
268
269        } finally {
270            tmpInputStream.close();
271        }
272    }
273
274
275}