Walk Through the Sample Code

Once you have set up your client, you can begin building client applications that use the Bulk API. Use the following sample to create a client application. Each section steps through part of the code. The complete sample is included at the end.

The following code imports the packages and classes from the WSC toolkit and the code generated from the partner WSDL:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

Set Up the main() Method

This code sets up the main() method for the class. It calls the runSample() method, which encompasses the processing logic for the sample. We'll look at the methods called in runSample() in subsequent sections.

    public static void main(String[] args)
      throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
              String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

Login and Configure BulkConnection

The following code logs in using a partner connection (PartnerConnection) and then reuses the session to create a Bulk API connection (BulkConnection).

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
          throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/30.0");
        // Creating the connection automatically handles login and stores
        // the session in partnerConfig
        new PartnerConnection(partnerConfig);
        // When PartnerConnection is instantiated, a login is implicitly
        // executed and, if successful,
        // a valid session is stored in the ConnectorConfig instance.
        // Use this key to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "30.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when doing debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

This BulkConnection instance is the base for using the Bulk API. The instance can be reused for the rest of the application life span.
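The endpoint rewrite inside getBulkConnection() is plain string manipulation, so it can be tried without logging in. The following is a minimal sketch, not part of the WSC toolkit: the class name BulkEndpointSketch, the helper toBulkEndpoint(), and the example URL are all hypothetical and used only for illustration.

```java
class BulkEndpointSketch {

    /**
     * Derives a Bulk API endpoint from a SOAP service endpoint by keeping
     * everything before "Soap/" and appending "async/" plus the API version.
     * This mirrors the substring logic in the sample above.
     */
    static String toBulkEndpoint(String soapEndpoint, String apiVersion) {
        int soapIndex = soapEndpoint.indexOf("Soap/");
        if (soapIndex < 0) {
            // The sample assumes the marker is always present; fail loudly if not
            throw new IllegalArgumentException(
                "Not a SOAP service endpoint: " + soapEndpoint);
        }
        return soapEndpoint.substring(0, soapIndex) + "async/" + apiVersion;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint, for illustration only
        String soap =
            "https://example.my.salesforce.com/services/Soap/u/30.0/00Dxx0000000001";
        // Prints https://example.my.salesforce.com/services/async/30.0
        System.out.println(toBulkEndpoint(soap, "30.0"));
    }
}
```

Checking for a missing "Soap/" marker is an addition in this sketch; the sample itself would throw a StringIndexOutOfBoundsException from substring() in that case.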

Create a New Job

After creating the connection, create a new job. Data is always processed in the context of a job. The job specifies the details about the data being processed: what operation is being executed (insert, update, upsert, or delete) and the object type. The following code creates a new insert job on the Account object.

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

When a job is created, it's in the Open state. In this state new batches can be added to the job. Once a job is Closed, batches can no longer be added.
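The Open and Closed rule can be pictured as a small state machine. The sketch below is a local model only and makes no calls to Salesforce: the class JobLifecycleSketch and its methods are hypothetical names, and the real service enforces this rule on the server side.

```java
import java.util.ArrayList;
import java.util.List;

class JobLifecycleSketch {

    enum State { OPEN, CLOSED }

    private State state = State.OPEN;          // a new job starts out open
    private final List<String> batches = new ArrayList<>();

    /** Accepts a batch only while the job is open. */
    void addBatch(String batchName) {
        if (state != State.OPEN) {
            throw new IllegalStateException(
                "Job is " + state + "; cannot add " + batchName);
        }
        batches.add(batchName);
    }

    /** Closing is one-way in this model: no further batches are accepted. */
    void close() {
        state = State.CLOSED;
    }

    State getState() {
        return state;
    }

    int batchCount() {
        return batches.size();
    }

    public static void main(String[] args) {
        JobLifecycleSketch job = new JobLifecycleSketch();
        job.addBatch("batch-1");
        job.close();
        try {
            job.addBatch("batch-2");
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```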

Add Batches to the Job

Data is processed in a series of batch requests. Each request is an HTTP POST containing the data set in the body, in the content type declared for the job (CSV in this sample; XML is also supported). Your client application determines how many batches are used to process the whole data set, as long as the batch size and total number of batches per day are within the limits specified in Bulk API Limits.

The processing of each batch comes with an overhead. Batch sizes should be large enough to minimize the overhead processing cost and small enough to be easily handled and transferred. Batch sizes between 1,000 and 10,000 records are considered reasonable.

The following code splits a CSV file into smaller batch files and uploads them to Salesforce.

    /**
     * Create and upload batches using a CSV file.
     * Splits the file into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
          JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName))
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0;
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                  || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
              throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
              connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);

        } finally {
            tmpInputStream.close();
        }
    }

Once the server receives a batch, it's immediately queued for processing. Formatting errors aren't reported when the batch is sent; they appear in the result data once the batch is processed.
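The splitting rule used when creating batches (repeat the header in every batch, and start a new batch when either the row limit or the byte limit would be exceeded) can be exercised in memory with small limits. This is a simplified sketch under stated assumptions, not the sample's implementation: BatchSplitSketch and split() are hypothetical names, rows are held in lists instead of temporary files, and nothing is uploaded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchSplitSketch {

    /**
     * Splits data rows into batches. Every batch starts with the header row
     * and holds at most maxRows data rows and at most maxBytes UTF-8 bytes,
     * counting the header and one newline per line. A single row that is
     * larger than maxBytes still gets a batch of its own.
     */
    static List<List<String>> split(String header, List<String> rows,
            int maxRows, int maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        int headerBytes = byteLength(header);
        List<String> current = null;
        int currentBytes = 0;
        for (String row : rows) {
            int rowBytes = byteLength(row);
            // current.size() - 1 is the number of data rows (header excluded)
            boolean full = current != null
                && (current.size() - 1 >= maxRows
                    || currentBytes + rowBytes > maxBytes);
            if (current == null || full) {
                current = new ArrayList<>();
                current.add(header);
                currentBytes = headerBytes;
                batches.add(current);
            }
            current.add(row);
            currentBytes += rowBytes;
        }
        return batches;
    }

    /** Size of a line in UTF-8 bytes, including its trailing newline. */
    private static int byteLength(String line) {
        return (line + "\n").getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Hypothetical data: five rows, at most two rows per batch
        List<String> rows = Arrays.asList("A1", "A2", "A3", "A4", "A5");
        for (List<String> batch : split("Name", rows, 2, 1000)) {
            System.out.println(batch);
        }
    }
}
```

With the sample's limits, the same call would be split(header, rows, 10000, 10000000).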

To import binary attachments, use the following methods. Specify the CSV or XML content for the batch in the batchContent parameter, or include request.txt in the attached files and pass null to the batchContent parameter. These methods are contained within the com.sforce.async.BulkConnection class:

  • createBatchFromDir()
  • createBatchWithFileAttachments()
  • createBatchWithInputStreamAttachments()
  • createBatchFromZipStream()

Close the Job

After all batches have been added to a job, close the job. Closing the job ensures that processing of all batches can finish.

    private void closeJob(BulkConnection connection, String jobId)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

Check Status on Batches

Batches are processed in the background. A batch may take some time to complete depending on the size of the data set. During processing, the status of all batches can be retrieved and checked to see when they have completed.

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
          List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
              connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                  || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

A batch is done when it has either failed or completed. This code polls every 10 seconds, with no timeout, until every batch in the job has reached one of those two states.
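Because the loop has no upper bound, a client that must not wait forever may want to cap the number of polls. The following sketch shows one way to do that with the status lookup abstracted behind a Supplier so it runs without a connection. BatchPollingSketch, its State enum, and awaitAll() are hypothetical names, not toolkit API, and the cap and interrupt handling are additions that the sample does not have.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

class BatchPollingSketch {

    /** Local stand-in for batch states; not the toolkit's enum. */
    enum State { QUEUED, IN_PROGRESS, COMPLETED, FAILED }

    static boolean isDone(State state) {
        return state == State.COMPLETED || state == State.FAILED;
    }

    /**
     * Polls statusSource until every batch ID is done, maxPolls is reached,
     * or the thread is interrupted. Returns the IDs that are still
     * incomplete (empty when all batches finished).
     */
    static Set<String> awaitAll(Set<String> batchIds,
            Supplier<Map<String, State>> statusSource,
            long sleepMillis, int maxPolls) {
        Set<String> incomplete = new HashSet<>(batchIds);
        for (int poll = 0; poll < maxPolls && !incomplete.isEmpty(); poll++) {
            if (poll > 0) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop waiting
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            Map<String, State> statuses = statusSource.get();
            // IDs missing from the status map stay in the incomplete set
            incomplete.removeIf(id -> isDone(statuses.get(id)));
        }
        return incomplete;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake status source: b2 finishes (as FAILED) on the third poll
        Supplier<Map<String, State>> source = () -> {
            calls[0]++;
            Map<String, State> statuses = new HashMap<>();
            statuses.put("b1", State.COMPLETED);
            statuses.put("b2", calls[0] < 3 ? State.IN_PROGRESS : State.FAILED);
            return statuses;
        };
        Set<String> ids = new HashSet<>(Arrays.asList("b1", "b2"));
        Set<String> left = awaitAll(ids, source, 10L, 20);
        System.out.println("Polls: " + calls[0] + ", still incomplete: " + left);
    }
}
```

In a real client the Supplier would wrap the getBatchInfoList() call from the sample, and a non-empty return value would signal that the wait was abandoned.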

Get Results For a Job

After all batches have completed, the results of each batch can be retrieved. Results should be retrieved whether the batch succeeded or failed, or even when the job was aborted, because only the result sets indicate the status of individual records. To properly pair a result with its corresponding record, the code must not lose track of how the batches correspond to the original data set. This can be achieved by keeping the original list of batches from when they were created and using this list to retrieve results, as shown in the following example:

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
              List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

This code retrieves the results for each record and reports whether the operation succeeded or failed. If an error occurred for a record, the code prints out the error.
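The header-driven lookup used when reading results can be tried on in-memory rows. The sketch below assumes the result columns Id, Success, Created, and Error that the sample reads, and it assumes that result rows come back in the same order as the submitted records, so a row's position is enough to pair a failure with its source record. ResultSummarySketch and describeFailures() are hypothetical names, and the data is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ResultSummarySketch {

    /**
     * Returns one message per failed row, prefixed with the 1-based row
     * position so it can be matched against the original batch data.
     */
    static List<String> describeFailures(List<String> header,
            List<List<String>> rows) {
        int successCol = header.indexOf("Success");
        int errorCol = header.indexOf("Error");
        if (successCol < 0 || errorCol < 0) {
            throw new IllegalArgumentException(
                "Result header lacks Success or Error column: " + header);
        }
        List<String> failures = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            List<String> row = rows.get(i);
            if (!Boolean.parseBoolean(row.get(successCol))) {
                failures.add("row " + (i + 1) + ": " + row.get(errorCol));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Hypothetical result data, for illustration only
        List<String> header = Arrays.asList("Id", "Success", "Created", "Error");
        List<List<String>> rows = Arrays.asList(
            Arrays.asList("001xx0000000001", "true", "true", ""),
            Arrays.asList("", "false", "false", "REQUIRED_FIELD_MISSING:Name"));
        // Prints [row 2: REQUIRED_FIELD_MISSING:Name]
        System.out.println(describeFailures(header, rows));
    }
}
```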

Complete Quick Start Sample

Now that you're more familiar with jobs and batches, you can copy and paste the entire quick start sample and use it:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class BulkExample {

    public static void main(String[] args)
      throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
              String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
              List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

    private void closeJob(BulkConnection connection, String jobId)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
          List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
              connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                  || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
          throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/30.0");
        // Creating the connection automatically handles login and stores
        // the session in partnerConfig
        new PartnerConnection(partnerConfig);
        // When PartnerConnection is instantiated, a login is implicitly
        // executed and, if successful,
        // a valid session is stored in the ConnectorConfig instance.
        // Use this key to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "30.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when doing debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

    /**
     * Create and upload batches using a CSV file.
     * Splits the file into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
          JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName))
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0;
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                  || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
              throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
              connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);

        } finally {
            tmpInputStream.close();
        }
    }

}