Walk Through the Sample Code

After you set up your client, you can build client applications that use the Bulk API. Use the sample to create a client application. Each section steps through part of the code. The complete sample is included at the end.

This code imports the packages and classes from the WSC toolkit and from the code generated from the partner WSDL:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

Set Up the main() Method

This code sets up the main() method for the class. It calls the runSample() method, which encompasses the processing logic for the sample. We look at the methods called in runSample() in subsequent sections.

    public static void main(String[] args)
      throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
              String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

Login and Configure BulkConnection

This code logs in using a partner connection (PartnerConnection) and then reuses the session to create a Bulk API connection (BulkConnection).

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
          throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/66.0");
        // Creating the connection automatically handles login and stores
        // the session in partnerConfig
        new PartnerConnection(partnerConfig);
        // When PartnerConnection is instantiated, a login is implicitly
        // executed and, if successful,
        // a valid session is stored in the ConnectorConfig instance.
        // Use this key to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "66.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when doing debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

This BulkConnection instance is the base for using the Bulk API. The instance can be reused for the rest of the application lifespan.
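The endpoint derivation in getBulkConnection() can be tried on its own, without logging in. The following is a minimal sketch that uses only the standard library; the class name BulkEndpointSketch, the method toBulkEndpoint(), and the sample URL are hypothetical and are not part of the WSC toolkit. Unlike the sample, it rejects an endpoint that has no "Soap/" segment instead of failing inside substring().

```java
/** Sketch: derive a Bulk API endpoint from a SOAP service endpoint. */
class BulkEndpointSketch {

    /**
     * Keeps everything before "Soap/" and appends "async/" plus the version.
     * Throws IllegalArgumentException if the "Soap/" marker is missing.
     */
    static String toBulkEndpoint(String soapEndpoint, String apiVersion) {
        int idx = soapEndpoint.indexOf("Soap/");
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Not a SOAP service endpoint: " + soapEndpoint);
        }
        return soapEndpoint.substring(0, idx) + "async/" + apiVersion;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint, for illustration only.
        String soap =
            "https://example.my.salesforce.com/services/Soap/u/66.0/00Dxx0000000001";
        // Prints https://example.my.salesforce.com/services/async/66.0
        System.out.println(toBulkEndpoint(soap, "66.0"));
    }
}
```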

Create a Job

After creating the connection, create a job. Data is always processed in the context of a job. The job specifies the details about the data being processed: which operation is being executed (insert, update, upsert, or delete) and the object type. This code creates a new insert job on the Account object.

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

When a job is created, it’s in the Open state. In this state, new batches can be added to the job. When a job is Closed, batches can no longer be added.

Add Batches to the Job

Data is processed in a series of batch requests. Each request is an HTTP POST that contains the data set in the body, in the format set by the job's content type (CSV in this sample). Your client application determines how many batches are used to process the whole data set, as long as the batch size and the total number of batches per day are within the limits specified in Limits.

The processing of each batch comes with an overhead. Make batch sizes large enough to minimize the overhead processing cost, and small enough to be handled and transferred easily. Batch sizes between 1,000 and 10,000 records are considered reasonable.
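As a rough illustration of this sizing trade-off, the sketch below splits CSV rows in memory under both a row cap and a byte cap, and repeats the header at the top of every batch. It uses only the standard library. The class name BatchSplitSketch, the method split(), and the sample data are hypothetical, and the caps passed in main() are tiny values chosen for readability, not recommended sizes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: split CSV rows into batches under a row cap and a byte cap. */
class BatchSplitSketch {

    /**
     * Returns one CSV string per batch. Every batch starts with the header.
     * maxRows counts data rows only; maxBytes counts header plus rows in UTF-8.
     * A single row that exceeds maxBytes by itself still gets its own batch.
     */
    static List<String> split(String header, List<String> rows,
            int maxRows, int maxBytes) {
        List<String> batches = new ArrayList<>();
        String headerLine = header + "\n";
        int headerBytes = headerLine.getBytes(StandardCharsets.UTF_8).length;
        StringBuilder current = new StringBuilder(headerLine);
        int bytes = headerBytes;
        int count = 0;
        for (String row : rows) {
            String line = row + "\n";
            int lineBytes = line.getBytes(StandardCharsets.UTF_8).length;
            // Start a new batch if adding this row would exceed either cap.
            if (count > 0 && (count >= maxRows || bytes + lineBytes > maxBytes)) {
                batches.add(current.toString());
                current = new StringBuilder(headerLine);
                bytes = headerBytes;
                count = 0;
            }
            current.append(line);
            bytes += lineBytes;
            count++;
        }
        // Flush the last, possibly partial, batch.
        if (count > 0) {
            batches.add(current.toString());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("A,1", "B,2", "C,3", "D,4", "E,5");
        List<String> batches = split("Name,Value", rows, 2, 1000);
        // Five rows with at most two rows per batch give three batches.
        System.out.println(batches.size() + " batches");
        System.out.print(batches.get(0));
    }
}
```

Keeping the split logic separate from the upload call makes it possible to check batch boundaries without a Salesforce connection.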

This code splits a CSV file into smaller batch files and uploads them to Salesforce.

    /**
     * Create and upload batches using a CSV file.
     * Splits the file into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
          JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        // Read the source as UTF-8 so it matches the encoding written to each batch
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName), "UTF-8")
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0; // includes the header row
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                  || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                // Start each new batch file with the header row
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
            rdr.close();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
              throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
              connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);

        } finally {
            tmpInputStream.close();
        }
    }

When the server receives a batch, it’s immediately queued for processing. Errors in formatting aren’t reported when sending the batch. These errors are reported in the result data when the batch is processed.

To import binary attachments, use the following methods. Specify the CSV, XML, or JSON content for the batch in the batchContent parameter, or include request.txt in the attached files and pass null to the batchContent parameter. These methods are contained within the com.sforce.async.BulkConnection class:

  • createBatchFromDir()
  • createBatchWithFileAttachments()
  • createBatchWithInputStreamAttachments()
  • createBatchFromZipStream()

Close the Job

After all batches are added to a job, close the job. Closing the job ensures that processing of all batches can finish.

    private void closeJob(BulkConnection connection, String jobId)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

Check Status on Batches

Batches are processed in the background. The size of the data set determines how long processing takes. During processing, you can retrieve and check the status of all batches, and you can see when processing is complete.

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
          List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
              connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                  || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

A batch is done when it has either failed or completed. This code keeps polling, with no timeout, until all the batches for the job have either failed or completed.
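Because the loop in awaitCompletion() has no upper bound on waiting time, a client might want to add a deadline. The following is a minimal sketch of that idea with only the standard library. The class name PollSketch, the method awaitAll(), and the stateOf lookup are hypothetical stand-ins for a call such as getBatchInfoList(), and the state names are plain strings rather than BatchStateEnum values. The sketch also stops waiting when the thread is interrupted, which differs from the sample.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

/** Sketch: poll until every batch is done or a deadline passes. */
class PollSketch {

    /**
     * stateOf is a hypothetical lookup from batch ID to a state name.
     * Returns the IDs that were still unfinished when polling stopped.
     */
    static Set<String> awaitAll(Set<String> batchIds,
            Function<String, String> stateOf,
            long sleepMillis, long timeoutMillis) {
        Set<String> incomplete = new HashSet<>(batchIds);
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            // Drop every batch that has completed or failed.
            incomplete.removeIf(id -> {
                String state = stateOf.apply(id);
                return "Completed".equals(state) || "Failed".equals(state);
            });
            if (incomplete.isEmpty() || System.currentTimeMillis() >= deadline) {
                return incomplete;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return incomplete;
            }
        }
    }

    public static void main(String[] args) {
        Set<String> ids = new HashSet<>();
        ids.add("batch-1");
        ids.add("batch-2");
        int[] lookups = {0};
        // Fake status source: InProgress for the first four lookups, then Completed.
        Function<String, String> fake =
            id -> ++lookups[0] > 4 ? "Completed" : "InProgress";
        Set<String> left = awaitAll(ids, fake, 10, 5000);
        System.out.println("Unfinished batches: " + left.size());
    }
}
```

Returning the unfinished IDs lets the caller decide whether to keep waiting, abort the job, or report the stalled batches.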

Get Results For a Job

You can retrieve the results of each batch when all batches are processed. Retrieve results whether the batch succeeded or failed, or even if the job was aborted, because only the result sets indicate the status of individual records. To properly pair a result with its corresponding record, the code must not lose track of how the batches correspond to the original data set. So keep the original list of batches from when they were created and use this list to retrieve results, as shown in this example:

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
              List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

This code retrieves the results for each record and reports whether the operation succeeded or failed. If an error occurred for a record, the code prints out the error.
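To show what pairing a result with its record can look like, the sketch below matches each result row to the input row at the same position within one batch. It assumes that results come back in the same order as the rows that were sent, and that result lines are simple Id,Success,Created,Error values without quoted commas; a real client would parse them with a CSV reader instead. The class name ResultPairingSketch, the method failures(), and all sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: pair each result row with the input row at the same position. */
class ResultPairingSketch {

    /**
     * inputRows and resultRows exclude their header lines.
     * Returns one message per failed record, naming the original input row.
     */
    static List<String> failures(List<String> inputRows, List<String> resultRows) {
        if (inputRows.size() != resultRows.size()) {
            throw new IllegalArgumentException("Row counts differ");
        }
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < resultRows.size(); i++) {
            // Limit -1 keeps trailing empty fields such as a blank Error column.
            String[] cols = resultRows.get(i).split(",", -1);
            boolean success = Boolean.parseBoolean(cols[1]);
            if (!success) {
                messages.add("Row " + (i + 1) + " [" + inputRows.get(i) + "]: "
                    + cols[3]);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        // Made-up input rows and result rows for a single batch.
        List<String> input = Arrays.asList("Acme,NY", ",CA");
        List<String> results = Arrays.asList(
            "id-1,true,true,",
            ",false,false,sample error text");
        // Prints: Row 2 [,CA]: sample error text
        for (String message : failures(input, results)) {
            System.out.println(message);
        }
    }
}
```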

Complete Quick Start Sample

Now that you're more familiar with jobs and batches, you can copy and paste the entire quick start sample and use it:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class BulkExample {

    public static void main(String[] args)
      throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
              String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
              List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
              new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

    private void closeJob(BulkConnection connection, String jobId)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
          List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
              connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                  || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
          throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
          throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/66.0");
        // Creating the connection automatically handles login and stores
        // the session in partnerConfig
        new PartnerConnection(partnerConfig);
        // When PartnerConnection is instantiated, a login is implicitly
        // executed and, if successful,
        // a valid session is stored in the ConnectorConfig instance.
        // Use this key to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP uri until the /Soap/ part. From here it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "66.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when doing debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

    /**
     * Create and upload batches using a CSV file.
     * Splits the file into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
          JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        // Read the source as UTF-8 so it matches the encoding written to each batch
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName), "UTF-8")
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0; // includes the header row
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                  || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                // Start each new batch file with the header row
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
            rdr.close();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
      List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
              throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
              connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);

        } finally {
            tmpInputStream.close();
        }
    }

}