Newer Version Available
Walk Through the Sample Code
This code sets up the packages and classes in the WSC toolkit and the code generated from the partner WSDL:
1import java.io.*;
2import java.util.*;
3
4import com.sforce.async.*;
5import com.sforce.soap.partner.PartnerConnection;
6import com.sforce.ws.ConnectionException;
7import com.sforce.ws.ConnectorConfig;Set Up the main() Method
This code sets up the main() method for the class. It calls the runSample() method, which encompasses the processing logic for the sample. We look at the methods called in runSample() in subsequent sections.
1public static void main(String[] args)
2 throws AsyncApiException, ConnectionException, IOException {
3 BulkExample example = new BulkExample();
4 // Replace arguments below with your credentials and test file name
5 // The first parameter indicates that we are loading Account records
6 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
7 }
8
9 /**
10 * Creates a Bulk API job and uploads batches for a CSV file.
11 */
12 public void runSample(String sobjectType, String userName,
13 String password, String sampleFileName)
14 throws AsyncApiException, ConnectionException, IOException {
15 BulkConnection connection = getBulkConnection(userName, password);
16 JobInfo job = createJob(sobjectType, connection);
17 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
18 sampleFileName);
19 closeJob(connection, job.getId());
20 awaitCompletion(connection, job, batchInfoList);
21 checkResults(connection, job, batchInfoList);
22 }Login and Configure BulkConnection
This code logs in using a partner connection (PartnerConnection) and then reuses the session to create a Bulk API connection (BulkConnection).
1/**
2 * Create the BulkConnection used to call Bulk API operations.
3 */
4 private BulkConnection getBulkConnection(String userName, String password)
5 throws ConnectionException, AsyncApiException {
6 ConnectorConfig partnerConfig = new ConnectorConfig();
7 partnerConfig.setUsername(userName);
8 partnerConfig.setPassword(password);
9 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/53.0");
10 // Creating the connection automatically handles login and stores
11 // the session in partnerConfig
12 new PartnerConnection(partnerConfig);
13 // When PartnerConnection is instantiated, a login is implicitly
14 // executed and, if successful,
15 // a valid session is stored in the ConnectorConfig instance.
16 // Use this key to initialize a BulkConnection:
17 ConnectorConfig config = new ConnectorConfig();
18 config.setSessionId(partnerConfig.getSessionId());
19 // The endpoint for the Bulk API service is the same as for the normal
20 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
21 String soapEndpoint = partnerConfig.getServiceEndpoint();
22 String apiVersion = "53.0";
23 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
24 + "async/" + apiVersion;
25 config.setRestEndpoint(restEndpoint);
26 // This should only be false when doing debugging.
27 config.setCompression(true);
28 // Set this to true to see HTTP requests and responses on stdout
29 config.setTraceMessage(false);
30 BulkConnection connection = new BulkConnection(config);
31 return connection;
32 }This BulkConnection instance is the base for using the Bulk API. The instance can be reused for the rest of the application lifespan.
Create a Job
After creating the connection, create a job. Data is always processed in the context of a job. The job specifies the details about the data being processed: which operation is being executed (insert, update, upsert, or delete) and the object type. This code creates a new insert job on the Account object.
1/**
2 * Create a new job using the Bulk API.
3 *
4 * @param sobjectType
5 * The object type being loaded, such as "Account"
6 * @param connection
7 * BulkConnection used to create the new job.
8 * @return The JobInfo for the new job.
9 * @throws AsyncApiException
10 */
11 private JobInfo createJob(String sobjectType, BulkConnection connection)
12 throws AsyncApiException {
13 JobInfo job = new JobInfo();
14 job.setObject(sobjectType);
15 job.setOperation(OperationEnum.insert);
16 job.setContentType(ContentType.CSV);
17 job = connection.createJob(job);
18 System.out.println(job);
19 return job;
20 }When a job is created, it’s in the Open state. In this state, new batches can be added to the job. When a job is Closed, batches can no longer be added.
Add Batches to the Job
Data is processed in a series of batch requests. Each request is an HTTP POST containing the data set in XML format in the body. Your client application determines how many batches are used to process the whole data set as long as the batch size and total number of batches per day are within the limits specified in Limits.
The processing of each batch comes with an overhead. Make batch sizes large enough to minimize the overhead processing cost, and small enough to be handled and transferred easily. Batch sizes between 1,000 and 10,000 records are considered reasonable.
This code splits a CSV file into smaller batch files and uploads them to Salesforce.
1/**
2 * Create and upload batches using a CSV file.
3 * The file into the appropriate size batch files.
4 *
5 * @param connection
6 * Connection to use for creating batches
7 * @param jobInfo
8 * Job associated with new batches
9 * @param csvFileName
10 * The source file for batch data
11 */
12 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
13 JobInfo jobInfo, String csvFileName)
14 throws IOException, AsyncApiException {
15 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
16 BufferedReader rdr = new BufferedReader(
17 new InputStreamReader(new FileInputStream(csvFileName))
18 );
19 // read the CSV header row
20 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
21 int headerBytesLength = headerBytes.length;
22 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
23
24 // Split the CSV file into multiple batches
25 try {
26 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
27 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
28 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
29 int currentBytes = 0;
30 int currentLines = 0;
31 String nextLine;
32 while ((nextLine = rdr.readLine()) != null) {
33 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
34 // Create a new batch when our batch size limit is reached
35 if (currentBytes + bytes.length > maxBytesPerBatch
36 || currentLines > maxRowsPerBatch) {
37 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
38 currentBytes = 0;
39 currentLines = 0;
40 }
41 if (currentBytes == 0) {
42 tmpOut = new FileOutputStream(tmpFile);
43 tmpOut.write(headerBytes);
44 currentBytes = headerBytesLength;
45 currentLines = 1;
46 }
47 tmpOut.write(bytes);
48 currentBytes += bytes.length;
49 currentLines++;
50 }
51 // Finished processing all rows
52 // Create a final batch for any remaining data
53 if (currentLines > 1) {
54 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
55 }
56 } finally {
57 tmpFile.delete();
58 }
59 return batchInfos;
60 }
61
62 /**
63 * Create a batch by uploading the contents of the file.
64 * This closes the output stream.
65 *
66 * @param tmpOut
67 * The output stream used to write the CSV data for a single batch.
68 * @param tmpFile
69 * The file associated with the above stream.
70 * @param batchInfos
71 * The batch info for the newly created batch is added to this list.
72 * @param connection
73 * The BulkConnection used to create the new batch.
74 * @param jobInfo
75 * The JobInfo associated with the new batch.
76 */
77 private void createBatch(FileOutputStream tmpOut, File tmpFile,
78 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
79 throws IOException, AsyncApiException {
80 tmpOut.flush();
81 tmpOut.close();
82 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
83 try {
84 BatchInfo batchInfo =
85 connection.createBatchFromStream(jobInfo, tmpInputStream);
86 System.out.println(batchInfo);
87 batchInfos.add(batchInfo);
88
89 } finally {
90 tmpInputStream.close();
91 }
92 }When the server receives a batch, it’s immediately queued for processing. Errors in formatting aren’t reported when sending the batch. These errors are reported in the result data when the batch is processed.
Close the Job
After all batches are added to a job, close the job. Closing the job ensures that processing of all batches can finish.
1private void closeJob(BulkConnection connection, String jobId)
2 throws AsyncApiException {
3 JobInfo job = new JobInfo();
4 job.setId(jobId);
5 job.setState(JobStateEnum.Closed);
6 connection.updateJob(job);
7 }Check Status on Batches
Batches are processed in the background. The size of the data set determines how long processing takes. During processing, you can retrieve and check the status of all batches, and you can see when processing is complete.
1/**
2 * Wait for a job to complete by polling the Bulk API.
3 *
4 * @param connection
5 * BulkConnection used to check results.
6 * @param job
7 * The job awaiting completion.
8 * @param batchInfoList
9 * List of batches for this job.
10 * @throws AsyncApiException
11 */
12 private void awaitCompletion(BulkConnection connection, JobInfo job,
13 List<BatchInfo> batchInfoList)
14 throws AsyncApiException {
15 long sleepTime = 0L;
16 Set<String> incomplete = new HashSet<String>();
17 for (BatchInfo bi : batchInfoList) {
18 incomplete.add(bi.getId());
19 }
20 while (!incomplete.isEmpty()) {
21 try {
22 Thread.sleep(sleepTime);
23 } catch (InterruptedException e) {}
24 System.out.println("Awaiting results..." + incomplete.size());
25 sleepTime = 10000L;
26 BatchInfo[] statusList =
27 connection.getBatchInfoList(job.getId()).getBatchInfo();
28 for (BatchInfo b : statusList) {
29 if (b.getState() == BatchStateEnum.Completed
30 || b.getState() == BatchStateEnum.Failed) {
31 if (incomplete.remove(b.getId())) {
32 System.out.println("BATCH STATUS:\n" + b);
33 }
34 }
35 }
36 }
37 }A batch is done when it's either failed or completed. This code loops infinitely until all the batches for the job have either failed or completed.
Get Results For a Job
You can retrieve the results of each batch when all batches are processed. Retrieve results whether the batch succeeded or failed, or even if the job was aborted, because only the result sets indicate the status of individual records. To properly pair a result with its corresponding record, the code must not lose track of how the batches correspond to the original data set. So keep the original list of batches from when they were created and use this list to retrieve results, as shown in this example:
1/**
2 * Gets the results of the operation and checks for errors.
3 */
4 private void checkResults(BulkConnection connection, JobInfo job,
5 List<BatchInfo> batchInfoList)
6 throws AsyncApiException, IOException {
7 // batchInfoList was populated when batches were created and submitted
8 for (BatchInfo b : batchInfoList) {
9 CSVReader rdr =
10 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
11 List<String> resultHeader = rdr.nextRecord();
12 int resultCols = resultHeader.size();
13
14 List<String> row;
15 while ((row = rdr.nextRecord()) != null) {
16 Map<String, String> resultInfo = new HashMap<String, String>();
17 for (int i = 0; i < resultCols; i++) {
18 resultInfo.put(resultHeader.get(i), row.get(i));
19 }
20 boolean success = Boolean.valueOf(resultInfo.get("Success"));
21 boolean created = Boolean.valueOf(resultInfo.get("Created"));
22 String id = resultInfo.get("Id");
23 String error = resultInfo.get("Error");
24 if (success && created) {
25 System.out.println("Created row with id " + id);
26 } else if (!success) {
27 System.out.println("Failed with error: " + error);
28 }
29 }
30 }
31 }This code retrieves the results for each record and reports whether the operation succeeded or failed. If an error occurred for a record, the code prints out the error.
Complete Quick Start Sample
Now that you're more familiar with jobs and batches, you can copy and paste the entire quick start sample and use it:
1
2import java.io.*;
3import java.util.*;
4
5import com.sforce.async.*;
6import com.sforce.soap.partner.PartnerConnection;
7import com.sforce.ws.ConnectionException;
8import com.sforce.ws.ConnectorConfig;
9
10
11public class BulkExample {
12
13
14 public static void main(String[] args)
15 throws AsyncApiException, ConnectionException, IOException {
16 BulkExample example = new BulkExample();
17 // Replace arguments below with your credentials and test file name
18 // The first parameter indicates that we are loading Account records
19 example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
20 }
21
22 /**
23 * Creates a Bulk API job and uploads batches for a CSV file.
24 */
25 public void runSample(String sobjectType, String userName,
26 String password, String sampleFileName)
27 throws AsyncApiException, ConnectionException, IOException {
28 BulkConnection connection = getBulkConnection(userName, password);
29 JobInfo job = createJob(sobjectType, connection);
30 List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
31 sampleFileName);
32 closeJob(connection, job.getId());
33 awaitCompletion(connection, job, batchInfoList);
34 checkResults(connection, job, batchInfoList);
35 }
36
37
38
39 /**
40 * Gets the results of the operation and checks for errors.
41 */
42 private void checkResults(BulkConnection connection, JobInfo job,
43 List<BatchInfo> batchInfoList)
44 throws AsyncApiException, IOException {
45 // batchInfoList was populated when batches were created and submitted
46 for (BatchInfo b : batchInfoList) {
47 CSVReader rdr =
48 new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
49 List<String> resultHeader = rdr.nextRecord();
50 int resultCols = resultHeader.size();
51
52 List<String> row;
53 while ((row = rdr.nextRecord()) != null) {
54 Map<String, String> resultInfo = new HashMap<String, String>();
55 for (int i = 0; i < resultCols; i++) {
56 resultInfo.put(resultHeader.get(i), row.get(i));
57 }
58 boolean success = Boolean.valueOf(resultInfo.get("Success"));
59 boolean created = Boolean.valueOf(resultInfo.get("Created"));
60 String id = resultInfo.get("Id");
61 String error = resultInfo.get("Error");
62 if (success && created) {
63 System.out.println("Created row with id " + id);
64 } else if (!success) {
65 System.out.println("Failed with error: " + error);
66 }
67 }
68 }
69 }
70
71
72
73 private void closeJob(BulkConnection connection, String jobId)
74 throws AsyncApiException {
75 JobInfo job = new JobInfo();
76 job.setId(jobId);
77 job.setState(JobStateEnum.Closed);
78 connection.updateJob(job);
79 }
80
81
82
83 /**
84 * Wait for a job to complete by polling the Bulk API.
85 *
86 * @param connection
87 * BulkConnection used to check results.
88 * @param job
89 * The job awaiting completion.
90 * @param batchInfoList
91 * List of batches for this job.
92 * @throws AsyncApiException
93 */
94 private void awaitCompletion(BulkConnection connection, JobInfo job,
95 List<BatchInfo> batchInfoList)
96 throws AsyncApiException {
97 long sleepTime = 0L;
98 Set<String> incomplete = new HashSet<String>();
99 for (BatchInfo bi : batchInfoList) {
100 incomplete.add(bi.getId());
101 }
102 while (!incomplete.isEmpty()) {
103 try {
104 Thread.sleep(sleepTime);
105 } catch (InterruptedException e) {}
106 System.out.println("Awaiting results..." + incomplete.size());
107 sleepTime = 10000L;
108 BatchInfo[] statusList =
109 connection.getBatchInfoList(job.getId()).getBatchInfo();
110 for (BatchInfo b : statusList) {
111 if (b.getState() == BatchStateEnum.Completed
112 || b.getState() == BatchStateEnum.Failed) {
113 if (incomplete.remove(b.getId())) {
114 System.out.println("BATCH STATUS:\n" + b);
115 }
116 }
117 }
118 }
119 }
120
121
122
123 /**
124 * Create a new job using the Bulk API.
125 *
126 * @param sobjectType
127 * The object type being loaded, such as "Account"
128 * @param connection
129 * BulkConnection used to create the new job.
130 * @return The JobInfo for the new job.
131 * @throws AsyncApiException
132 */
133 private JobInfo createJob(String sobjectType, BulkConnection connection)
134 throws AsyncApiException {
135 JobInfo job = new JobInfo();
136 job.setObject(sobjectType);
137 job.setOperation(OperationEnum.insert);
138 job.setContentType(ContentType.CSV);
139 job = connection.createJob(job);
140 System.out.println(job);
141 return job;
142 }
143
144
145
146 /**
147 * Create the BulkConnection used to call Bulk API operations.
148 */
149 private BulkConnection getBulkConnection(String userName, String password)
150 throws ConnectionException, AsyncApiException {
151 ConnectorConfig partnerConfig = new ConnectorConfig();
152 partnerConfig.setUsername(userName);
153 partnerConfig.setPassword(password);
154 partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/53.0");
155 // Creating the connection automatically handles login and stores
156 // the session in partnerConfig
157 new PartnerConnection(partnerConfig);
158 // When PartnerConnection is instantiated, a login is implicitly
159 // executed and, if successful,
160 // a valid session is stored in the ConnectorConfig instance.
161 // Use this key to initialize a BulkConnection:
162 ConnectorConfig config = new ConnectorConfig();
163 config.setSessionId(partnerConfig.getSessionId());
164 // The endpoint for the Bulk API service is the same as for the normal
165 // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
166 String soapEndpoint = partnerConfig.getServiceEndpoint();
167 String apiVersion = "53.0";
168 String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
169 + "async/" + apiVersion;
170 config.setRestEndpoint(restEndpoint);
171 // This should only be false when doing debugging.
172 config.setCompression(true);
173 // Set this to true to see HTTP requests and responses on stdout
174 config.setTraceMessage(false);
175 BulkConnection connection = new BulkConnection(config);
176 return connection;
177 }
178
179
180
181 /**
182 * Create and upload batches using a CSV file.
183 * The file into the appropriate size batch files.
184 *
185 * @param connection
186 * Connection to use for creating batches
187 * @param jobInfo
188 * Job associated with new batches
189 * @param csvFileName
190 * The source file for batch data
191 */
192 private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
193 JobInfo jobInfo, String csvFileName)
194 throws IOException, AsyncApiException {
195 List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
196 BufferedReader rdr = new BufferedReader(
197 new InputStreamReader(new FileInputStream(csvFileName))
198 );
199 // read the CSV header row
200 byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
201 int headerBytesLength = headerBytes.length;
202 File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
203
204 // Split the CSV file into multiple batches
205 try {
206 FileOutputStream tmpOut = new FileOutputStream(tmpFile);
207 int maxBytesPerBatch = 10000000; // 10 million bytes per batch
208 int maxRowsPerBatch = 10000; // 10 thousand rows per batch
209 int currentBytes = 0;
210 int currentLines = 0;
211 String nextLine;
212 while ((nextLine = rdr.readLine()) != null) {
213 byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
214 // Create a new batch when our batch size limit is reached
215 if (currentBytes + bytes.length > maxBytesPerBatch
216 || currentLines > maxRowsPerBatch) {
217 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
218 currentBytes = 0;
219 currentLines = 0;
220 }
221 if (currentBytes == 0) {
222 tmpOut = new FileOutputStream(tmpFile);
223 tmpOut.write(headerBytes);
224 currentBytes = headerBytesLength;
225 currentLines = 1;
226 }
227 tmpOut.write(bytes);
228 currentBytes += bytes.length;
229 currentLines++;
230 }
231 // Finished processing all rows
232 // Create a final batch for any remaining data
233 if (currentLines > 1) {
234 createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
235 }
236 } finally {
237 tmpFile.delete();
238 }
239 return batchInfos;
240 }
241
242 /**
243 * Create a batch by uploading the contents of the file.
244 * This closes the output stream.
245 *
246 * @param tmpOut
247 * The output stream used to write the CSV data for a single batch.
248 * @param tmpFile
249 * The file associated with the above stream.
250 * @param batchInfos
251 * The batch info for the newly created batch is added to this list.
252 * @param connection
253 * The BulkConnection used to create the new batch.
254 * @param jobInfo
255 * The JobInfo associated with the new batch.
256 */
257 private void createBatch(FileOutputStream tmpOut, File tmpFile,
258 List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
259 throws IOException, AsyncApiException {
260 tmpOut.flush();
261 tmpOut.close();
262 FileInputStream tmpInputStream = new FileInputStream(tmpFile);
263 try {
264 BatchInfo batchInfo =
265 connection.createBatchFromStream(jobInfo, tmpInputStream);
266 System.out.println(batchInfo);
267 batchInfos.add(batchInfo);
268
269 } finally {
270 tmpInputStream.close();
271 }
272 }
273
274
275}