Newer Version Available
Walk Through the Sample Code
This code sets up the packages and classes in the WSC toolkit and the code generated from the partner WSDL:
1import java.io.*;
2import java.util.*;
3
4import com.sforce.async.*;
5import com.sforce.soap.partner.PartnerConnection;
6import com.sforce.ws.ConnectionException;
7import com.sforce.ws.ConnectorConfig;Set Up the main() Method
This code sets up the main() method for the class. It calls the runSample() method, which encompasses the processing logic for the sample. We look at the methods called in runSample() in subsequent sections.
1public static void main(String[] args)
2 throws AsyncApiException, ConnectionException, IOException {
3 BulkExample example = new BulkExample();
4 // Replace arguments below with your credentials and test file name
5 // The first parameter indicates that we are loading Account records
6 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
7 }
8
9 /**
10 * Creates a Bulk API job and uploads batches for a CSV file.
11 */
12 public void runSample(String sobjectType, String userName,
13 String password, String sampleFileName)
14 throws AsyncApiException, ConnectionException, IOException {
15 BulkConnection connection = getBulkConnection(userName, password);
16 JobInfo job = createJob(sobjectType, connection);
17 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
18 sampleFileName);
19 closeJob(connection, job.getId());
20 awaitCompletion(connection, job, batchInfoList);
21 checkResults(connection, job, batchInfoList);
22 }Login and Configure BulkConnection
This code logs in using a partner connection (PartnerConnection) and then reuses the session to create a Bulk API connection (BulkConnection).
1/**
2 * Create the BulkConnection used to call Bulk API operations.
3 */
4 private BulkConnection getBulkConnection(String userName, String password)
5 throws ConnectionException, AsyncApiException {
6 ConnectorConfig partnerConfig = new ConnectorConfig();
7 partnerConfig.setUsername(userName);
8 partnerConfig.setPassword(password);
9 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/66.0");
10 // Creating the connection automatically handles login and stores
11 // the session in partnerConfig
12 new PartnerConnection(partnerConfig);
13 // When PartnerConnection is instantiated, a login is implicitly
14 // executed and, if successful,
15 // a valid session is stored in the ConnectorConfig instance.
16 // Use this key to initialize a BulkConnection:
17 ConnectorConfig config = new ConnectorConfig();
18 config.setSessionId(partnerConfig.getSessionId());
19 // The endpoint for the Bulk API service is the same as for the normal
20 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
21 String soapEndpoint = partnerConfig.getServiceEndpoint();
22 String apiVersion = "66.0";
23 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
24 + "async/" + apiVersion;
25 config.setRestEndpoint(restEndpoint);
26 // This should only be false when doing debugging.
27 config.setCompression(true);
28 // Set this to true to see HTTP requests and responses on stdout
29 config.setTraceMessage(false);
30 BulkConnection connection = new BulkConnection(config);
31 return connection;
32 }This BulkConnection instance is the base for using the Bulk API. The instance can be reused for the rest of the application lifespan.
Create a Job
After creating the connection, create a job. Data is always processed in the context of a job. The job specifies the details about the data being processed: which operation is being executed (insert, update, upsert, or delete) and the object type. This code creates a new insert job on the Account object.
1/**
2 * Create a new job using the Bulk API.
3 *
4 * @param sobjectType
5 * The object type being loaded, such as "Account"
6 * @param connection
7 * BulkConnection used to create the new job.
8 * @return The JobInfo for the new job.
9 * @throws AsyncApiException
10 */
11 private JobInfo createJob(String sobjectType, BulkConnection connection)
12 throws AsyncApiException {
13 JobInfo job = new JobInfo();
14 job.setObject(sobjectType);
15 job.setOperation(OperationEnum.insert);
16 job.setContentType(ContentType.CSV);
17 job = connection.createJob(job);
18 System.out.println(job);
19 return job;
20 }When a job is created, it’s in the Open state. In this state, new batches can be added to the job. When a job is Closed, batches can no longer be added.
Add Batches to the Job
Data is processed in a series of batch requests. Each request is an HTTP POST containing the data set in XML format in the body. Your client application determines how many batches are used to process the whole data set as long as the batch size and total number of batches per day are within the limits specified in Limits.
The processing of each batch comes with an overhead. Make batch sizes large enough to minimize the overhead processing cost, and small enough to be handled and transferred easily. Batch sizes between 1,000 and 10,000 records are considered reasonable.
This code splits a CSV file into smaller batch files and uploads them to Salesforce.
1/**
2 * Create and upload batches using a CSV file.
3 * The file into the appropriate size batch files.
4 *
5 * @param connection
6 * Connection to use for creating batches
7 * @param jobInfo
8 * Job associated with new batches
9 * @param csvFileName
10 * The source file for batch data
11 */
12 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
13 JobInfo jobInfo, String csvFileName)
14 throws IOException, AsyncApiException {
15 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
16 BufferedReader rdr = new BufferedReader(
17 new InputStreamReader(new FileInputStream(csvFileName))
18 );
19 // read the CSV header row
20 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
21 int headerBytesLength = headerBytes.length;
22 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
23
24 // Split the CSV file into multiple batches
25 try {
26 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
27 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
28 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
29 int currentBytes = 0;
30 int currentLines = 0;
31 String nextLine;
32 while ((nextLine = rdr.readLine()) != null) {
33 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
34 // Create a new batch when our batch size limit is reached
35 if (currentBytes + bytes.length > maxBytesPerBatch
36 || currentLines > maxRowsPerBatch) {
37 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
38 currentBytes = 0;
39 currentLines = 0;
40 }
41 if (currentBytes == 0) {
42 tmpOut = new FileOutputStream(tmpFile);
43 tmpOut.write(headerBytes);
44 currentBytes = headerBytesLength;
45 currentLines = 1;
46 }
47 tmpOut.write(bytes);
48 currentBytes += bytes.length;
49 currentLines++;
50 }
51 // Finished processing all rows
52 // Create a final batch for any remaining data
53 if (currentLines > 1) {
54 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
55 }
56 } finally {
57 tmpFile.delete();
58 }
59 return batchInfos;
60 }
61
62 /**
63 * Create a batch by uploading the contents of the file.
64 * This closes the output stream.
65 *
66 * @param tmpOut
67 * The output stream used to write the CSV data for a single batch.
68 * @param tmpFile
69 * The file associated with the above stream.
70 * @param batchInfos
71 * The batch info for the newly created batch is added to this list.
72 * @param connection
73 * The BulkConnection used to create the new batch.
74 * @param jobInfo
75 * The JobInfo associated with the new batch.
76 */
77 private void createBatch(FileOutputStream tmpOut, File tmpFile,
78 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
79 throws IOException, AsyncApiException {
80 tmpOut.flush();
81 tmpOut.close();
82 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
83 try {
84 BatchInfo batchInfo =
85 connection.createBatchFromStream(jobInfo, tmpInputStream);
86 System.out.println(batchInfo);
87 batchInfos.add(batchInfo);
88
89 } finally {
90 tmpInputStream.close();
91 }
92 }When the server receives a batch, it’s immediately queued for processing. Errors in formatting aren’t reported when sending the batch. These errors are reported in the result data when the batch is processed.
Close the Job
After all batches are added to a job, close the job. Closing the job ensures that processing of all batches can finish.
1private void closeJob(BulkConnection connection, String jobId)
2 throws AsyncApiException {
3 JobInfo job = new JobInfo();
4 job.setId(jobId);
5 job.setState(JobStateEnum.Closed);
6 connection.updateJob(job);
7 }Check Status on Batches
Batches are processed in the background. The size of the data set determines how long processing takes. During processing, you can retrieve and check the status of all batches, and you can see when processing is complete.
1/**
2 * Wait for a job to complete by polling the Bulk API.
3 *
4 * @param connection
5 * BulkConnection used to check results.
6 * @param job
7 * The job awaiting completion.
8 * @param batchInfoList
9 * List of batches for this job.
10 * @throws AsyncApiException
11 */
12 private void awaitCompletion(BulkConnection connection, JobInfo job,
13 List<BatchInfo> batchInfoList)
14 throws AsyncApiException {
15 long sleepTime = 0L;
16 Set<String> incomplete = new HashSet<String>();
17 for (BatchInfo bi : batchInfoList) {
18 incomplete.add(bi.getId());
19 }
20 while (!incomplete.isEmpty()) {
21 try {
22 Thread.sleep(sleepTime);
23 } catch (InterruptedException e) {}
24 System.out.println("Awaiting results..." + incomplete.size());
25 sleepTime = 10000L;
26 BatchInfo[] statusList =
27 connection.getBatchInfoList(job.getId()).getBatchInfo();
28 for (BatchInfo b : statusList) {
29 if (b.getState() == BatchStateEnum.Completed
30 || b.getState() == BatchStateEnum.Failed) {
31 if (incomplete.remove(b.getId())) {
32 System.out.println("BATCH STATUS:\n" + b);
33 }
34 }
35 }
36 }
37 }A batch is done when it's either failed or completed. This code loops infinitely until all the batches for the job have either failed or completed.
Get Results For a Job
You can retrieve the results of each batch when all batches are processed. Retrieve results whether the batch succeeded or failed, or even if the job was aborted, because only the result sets indicate the status of individual records. To properly pair a result with its corresponding record, the code must not lose track of how the batches correspond to the original data set. So keep the original list of batches from when they were created and use this list to retrieve results, as shown in this example:
1/**
2 * Gets the results of the operation and checks for errors.
3 */
4 private void checkResults(BulkConnection connection, JobInfo job,
5 List<BatchInfo> batchInfoList)
6 throws AsyncApiException, IOException {
7 // batchInfoList was populated when batches were created and submitted
8 for (BatchInfo b : batchInfoList) {
9 CSVReader rdr =
10 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
11 List<String> resultHeader = rdr.nextRecord();
12 int resultCols = resultHeader.size();
13
14 List<String> row;
15 while ((row = rdr.nextRecord()) != null) {
16 Map<String, String> resultInfo = new HashMap<String, String>();
17 for (int i = 0; i < resultCols; i++) {
18 resultInfo.put(resultHeader.get(i), row.get(i));
19 }
20 boolean success = Boolean.valueOf(resultInfo.get("Success"));
21 boolean created = Boolean.valueOf(resultInfo.get("Created"));
22 String id = resultInfo.get("Id");
23 String error = resultInfo.get("Error");
24 if (success && created) {
25 System.out.println("Created row with id " + id);
26 } else if (!success) {
27 System.out.println("Failed with error: " + error);
28 }
29 }
30 }
31 }This code retrieves the results for each record and reports whether the operation succeeded or failed. If an error occurred for a record, the code prints out the error.
Complete Quick Start Sample
Now that you're more familiar with jobs and batches, you can copy and paste the entire quick start sample and use it:
1import java.io.*;
2import java.util.*;
3
4import com.sforce.async.*;
5import com.sforce.soap.partner.PartnerConnection;
6import com.sforce.ws.ConnectionException;
7import com.sforce.ws.ConnectorConfig;
8
9
10public class BulkExample {
11
12
13 public static void main(String[] args)
14 throws AsyncApiException, ConnectionException, IOException {
15 BulkExample example = new BulkExample();
16 // Replace arguments below with your credentials and test file name
17 // The first parameter indicates that we are loading Account records
18 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
19 }
20
21 /**
22 * Creates a Bulk API job and uploads batches for a CSV file.
23 */
24 public void runSample(String sobjectType, String userName,
25 String password, String sampleFileName)
26 throws AsyncApiException, ConnectionException, IOException {
27 BulkConnection connection = getBulkConnection(userName, password);
28 JobInfo job = createJob(sobjectType, connection);
29 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
30 sampleFileName);
31 closeJob(connection, job.getId());
32 awaitCompletion(connection, job, batchInfoList);
33 checkResults(connection, job, batchInfoList);
34 }
35
36
37
38 /**
39 * Gets the results of the operation and checks for errors.
40 */
41 private void checkResults(BulkConnection connection, JobInfo job,
42 List<BatchInfo> batchInfoList)
43 throws AsyncApiException, IOException {
44 // batchInfoList was populated when batches were created and submitted
45 for (BatchInfo b : batchInfoList) {
46 CSVReader rdr =
47 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
48 List<String> resultHeader = rdr.nextRecord();
49 int resultCols = resultHeader.size();
50
51 List<String> row;
52 while ((row = rdr.nextRecord()) != null) {
53 Map<String, String> resultInfo = new HashMap<String, String>();
54 for (int i = 0; i < resultCols; i++) {
55 resultInfo.put(resultHeader.get(i), row.get(i));
56 }
57 boolean success = Boolean.valueOf(resultInfo.get("Success"));
58 boolean created = Boolean.valueOf(resultInfo.get("Created"));
59 String id = resultInfo.get("Id");
60 String error = resultInfo.get("Error");
61 if (success && created) {
62 System.out.println("Created row with id " + id);
63 } else if (!success) {
64 System.out.println("Failed with error: " + error);
65 }
66 }
67 }
68 }
69
70
71
72 private void closeJob(BulkConnection connection, String jobId)
73 throws AsyncApiException {
74 JobInfo job = new JobInfo();
75 job.setId(jobId);
76 job.setState(JobStateEnum.Closed);
77 connection.updateJob(job);
78 }
79
80
81
82 /**
83 * Wait for a job to complete by polling the Bulk API.
84 *
85 * @param connection
86 * BulkConnection used to check results.
87 * @param job
88 * The job awaiting completion.
89 * @param batchInfoList
90 * List of batches for this job.
91 * @throws AsyncApiException
92 */
93 private void awaitCompletion(BulkConnection connection, JobInfo job,
94 List<BatchInfo> batchInfoList)
95 throws AsyncApiException {
96 long sleepTime = 0L;
97 Set<String> incomplete = new HashSet<String>();
98 for (BatchInfo bi : batchInfoList) {
99 incomplete.add(bi.getId());
100 }
101 while (!incomplete.isEmpty()) {
102 try {
103 Thread.sleep(sleepTime);
104 } catch (InterruptedException e) {}
105 System.out.println("Awaiting results..." + incomplete.size());
106 sleepTime = 10000L;
107 BatchInfo[] statusList =
108 connection.getBatchInfoList(job.getId()).getBatchInfo();
109 for (BatchInfo b : statusList) {
110 if (b.getState() == BatchStateEnum.Completed
111 || b.getState() == BatchStateEnum.Failed) {
112 if (incomplete.remove(b.getId())) {
113 System.out.println("BATCH STATUS:\n" + b);
114 }
115 }
116 }
117 }
118 }
119
120
121
122 /**
123 * Create a new job using the Bulk API.
124 *
125 * @param sobjectType
126 * The object type being loaded, such as "Account"
127 * @param connection
128 * BulkConnection used to create the new job.
129 * @return The JobInfo for the new job.
130 * @throws AsyncApiException
131 */
132 private JobInfo createJob(String sobjectType, BulkConnection connection)
133 throws AsyncApiException {
134 JobInfo job = new JobInfo();
135 job.setObject(sobjectType);
136 job.setOperation(OperationEnum.insert);
137 job.setContentType(ContentType.CSV);
138 job = connection.createJob(job);
139 System.out.println(job);
140 return job;
141 }
142
143
144
145 /**
146 * Create the BulkConnection used to call Bulk API operations.
147 */
148 private BulkConnection getBulkConnection(String userName, String password)
149 throws ConnectionException, AsyncApiException {
150 ConnectorConfig partnerConfig = new ConnectorConfig();
151 partnerConfig.setUsername(userName);
152 partnerConfig.setPassword(password);
153 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/66.0");
154 // Creating the connection automatically handles login and stores
155 // the session in partnerConfig
156 new PartnerConnection(partnerConfig);
157 // When PartnerConnection is instantiated, a login is implicitly
158 // executed and, if successful,
159 // a valid session is stored in the ConnectorConfig instance.
160 // Use this key to initialize a BulkConnection:
161 ConnectorConfig config = new ConnectorConfig();
162 config.setSessionId(partnerConfig.getSessionId());
163 // The endpoint for the Bulk API service is the same as for the normal
164 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
165 String soapEndpoint = partnerConfig.getServiceEndpoint();
166 String apiVersion = "66.0";
167 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
168 + "async/" + apiVersion;
169 config.setRestEndpoint(restEndpoint);
170 // This should only be false when doing debugging.
171 config.setCompression(true);
172 // Set this to true to see HTTP requests and responses on stdout
173 config.setTraceMessage(false);
174 BulkConnection connection = new BulkConnection(config);
175 return connection;
176 }
177
178
179
180 /**
181 * Create and upload batches using a CSV file.
182 * The file into the appropriate size batch files.
183 *
184 * @param connection
185 * Connection to use for creating batches
186 * @param jobInfo
187 * Job associated with new batches
188 * @param csvFileName
189 * The source file for batch data
190 */
191 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
192 JobInfo jobInfo, String csvFileName)
193 throws IOException, AsyncApiException {
194 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
195 BufferedReader rdr = new BufferedReader(
196 new InputStreamReader(new FileInputStream(csvFileName))
197 );
198 // read the CSV header row
199 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
200 int headerBytesLength = headerBytes.length;
201 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
202
203 // Split the CSV file into multiple batches
204 try {
205 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
206 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
207 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
208 int currentBytes = 0;
209 int currentLines = 0;
210 String nextLine;
211 while ((nextLine = rdr.readLine()) != null) {
212 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
213 // Create a new batch when our batch size limit is reached
214 if (currentBytes + bytes.length > maxBytesPerBatch
215 || currentLines > maxRowsPerBatch) {
216 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
217 currentBytes = 0;
218 currentLines = 0;
219 }
220 if (currentBytes == 0) {
221 tmpOut = new FileOutputStream(tmpFile);
222 tmpOut.write(headerBytes);
223 currentBytes = headerBytesLength;
224 currentLines = 1;
225 }
226 tmpOut.write(bytes);
227 currentBytes += bytes.length;
228 currentLines++;
229 }
230 // Finished processing all rows
231 // Create a final batch for any remaining data
232 if (currentLines > 1) {
233 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
234 }
235 } finally {
236 tmpFile.delete();
237 }
238 return batchInfos;
239 }
240
241 /**
242 * Create a batch by uploading the contents of the file.
243 * This closes the output stream.
244 *
245 * @param tmpOut
246 * The output stream used to write the CSV data for a single batch.
247 * @param tmpFile
248 * The file associated with the above stream.
249 * @param batchInfos
250 * The batch info for the newly created batch is added to this list.
251 * @param connection
252 * The BulkConnection used to create the new batch.
253 * @param jobInfo
254 * The JobInfo associated with the new batch.
255 */
256 private void createBatch(FileOutputStream tmpOut, File tmpFile,
257 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
258 throws IOException, AsyncApiException {
259 tmpOut.flush();
260 tmpOut.close();
261 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
262 try {
263 BatchInfo batchInfo =
264 connection.createBatchFromStream(jobInfo, tmpInputStream);
265 System.out.println(batchInfo);
266 batchInfos.add(batchInfo);
267
268 } finally {
269 tmpInputStream.close();
270 }
271 }
272
273
274}