Walk Through the Sample Code

After you set up your client, you can build client applications that use the Bulk API. Use the sample to create a client application. Each section steps through part of the code. The complete sample is included at the end.

This code imports the packages and classes from the WSC toolkit and the code generated from the partner WSDL:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

Set Up the main() Method

This code sets up the main() method for the class. It calls the runSample() method, which encompasses the processing logic for the sample. We look at the methods called in runSample() in subsequent sections.

    public static void main(String[] args)
            throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
            String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

Login and Configure BulkConnection

This code logs in using a partner connection (PartnerConnection) and then reuses the session to create a Bulk API connection (BulkConnection).

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
            throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/54.0");
        // Instantiating PartnerConnection implicitly executes a login and,
        // if successful, stores a valid session in partnerConfig.
        new PartnerConnection(partnerConfig);
        // Use the session ID from the partner login to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP URI until the /Soap/ part. From there it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "54.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

This BulkConnection instance is the base for using the Bulk API. The instance can be reused for the rest of the application lifespan.
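The endpoint derivation in getBulkConnection() can be tried in isolation, without logging in. The following is a minimal sketch of that string manipulation only; the class name, the helper name toAsyncEndpoint, and the sample host and organization ID are placeholders invented for illustration, not values from the WSC toolkit.

```java
// Sketch only: the host name and organization ID below are made-up placeholders.
final class AsyncEndpointSketch {

    /** Derives a Bulk API endpoint from a SOAP service endpoint, as the sample does. */
    static String toAsyncEndpoint(String soapEndpoint, String apiVersion) {
        int soapIndex = soapEndpoint.indexOf("Soap/");
        if (soapIndex < 0) {
            // The sample code assumes "Soap/" is always present; fail loudly if not.
            throw new IllegalArgumentException("Not a SOAP endpoint: " + soapEndpoint);
        }
        // Keep everything up to ".../services/" and append "async/<version>".
        return soapEndpoint.substring(0, soapIndex) + "async/" + apiVersion;
    }

    public static void main(String[] args) {
        String soap =
            "https://example.my.salesforce.com/services/Soap/u/54.0/00Dxx0000000001";
        System.out.println(toAsyncEndpoint(soap, "54.0"));
        // prints https://example.my.salesforce.com/services/async/54.0
    }
}
```

Checking the result of indexOf() before calling substring() avoids a StringIndexOutOfBoundsException if the service endpoint ever has an unexpected shape.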

Create a Job

After creating the connection, create a job. Data is always processed in the context of a job. The job specifies the details about the data being processed: which operation is being executed (insert, update, upsert, or delete) and the object type. This code creates a new insert job on the Account object.

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
            throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

When a job is created, it’s in the Open state. In this state, new batches can be added to the job. When a job is Closed, batches can no longer be added.

Add Batches to the Job

Data is processed in a series of batch requests. Each request is an HTTP POST whose body contains the data set in the job's content type, which is CSV in this sample. Your client application decides how many batches to use for the whole data set, as long as the batch size and the total number of batches per day stay within the limits specified in Limits.

The processing of each batch comes with an overhead. Make batch sizes large enough to minimize the overhead processing cost, and small enough to be handled and transferred easily. Batch sizes between 1,000 and 10,000 records are considered reasonable.
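As a rough planning aid, the number of batches for a given data set follows from a ceiling division of the record count by the chosen batch size. The sketch below is illustrative arithmetic only; the class and method names are hypothetical, and it ignores the per-batch byte limit that also applies.

```java
// Sketch only: estimates batch counts by row count, ignoring byte-size limits.
final class BatchCountSketch {

    /** Returns how many batches are needed if each holds at most rowsPerBatch records. */
    static long batchesNeeded(long totalRecords, int rowsPerBatch) {
        if (totalRecords < 0 || rowsPerBatch <= 0) {
            throw new IllegalArgumentException("Counts must be non-negative and batch size positive");
        }
        // Ceiling division: a partial final batch still counts as one batch.
        return (totalRecords + rowsPerBatch - 1) / rowsPerBatch;
    }

    public static void main(String[] args) {
        System.out.println(batchesNeeded(250_000, 10_000)); // prints 25
        System.out.println(batchesNeeded(250_001, 10_000)); // prints 26
        System.out.println(batchesNeeded(250_000, 1_000));  // prints 250
    }
}
```

With the same 250,000 records, moving from 1,000 to 10,000 records per batch cuts the number of batches, and so the per-batch overhead, by a factor of ten.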

The following code splits a CSV file into smaller batch files and uploads them to Salesforce.

    /**
     * Create and upload batches using a CSV file.
     * The file is split into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
            JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName))
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0;
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                        || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                // Start a new batch file with the header row
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
            List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
            throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
                connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);
        } finally {
            tmpInputStream.close();
        }
    }

When the server receives a batch, it’s immediately queued for processing. Errors in formatting aren’t reported when sending the batch. These errors are reported in the result data when the batch is processed.
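Because formatting errors only surface after processing, a client-side check before upload can catch the simplest mistakes early. The sketch below flags rows whose field count differs from the header row. It is an illustrative pre-flight check and not part of the Bulk API or the WSC toolkit; the class and method names are hypothetical, and it assumes each record fits on one line (no line breaks inside quoted fields).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: a naive pre-upload check, assuming one CSV record per line.
final class CsvPreflightSketch {

    /** Counts comma-separated fields, ignoring commas inside double quotes. */
    static int countFields(String line) {
        int fields = 1;
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes; // an escaped quote ("") toggles twice, a no-op
            } else if (c == ',' && !inQuotes) {
                fields++;
            }
        }
        return fields;
    }

    /** Returns the 1-based line numbers whose field count differs from the header. */
    static List<Integer> mismatchedRows(List<String> lines) {
        List<Integer> bad = new ArrayList<>();
        if (lines.isEmpty()) {
            return bad;
        }
        int expected = countFields(lines.get(0));
        for (int i = 1; i < lines.size(); i++) {
            if (countFields(lines.get(i)) != expected) {
                bad.add(i + 1);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "Name,Description",
            "Acme,\"Widgets, Inc.\"", // quoted comma: still two fields
            "Globex",                 // one field: mismatch
            "A,B,C");                 // three fields: mismatch
        System.out.println(mismatchedRows(lines)); // prints [3, 4]
    }
}
```

A check like this does not replace the result data described above; it only avoids spending a batch on a file that is visibly malformed.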

To import binary attachments, use the following methods. Specify the CSV, XML, or JSON content for the batch in the batchContent parameter, or include request.txt in the attached files and pass null to the batchContent parameter. These methods are contained within the com.sforce.async.BulkConnection class:

  • createBatchFromDir()
  • createBatchWithFileAttachments()
  • createBatchWithInputStreamAttachments()
  • createBatchFromZipStream()

Close the Job

After all batches are added to a job, close the job. Closing the job ensures that processing of all batches can finish.

    private void closeJob(BulkConnection connection, String jobId)
            throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

Check Status on Batches

Batches are processed in the background. The size of the data set determines how long processing takes. During processing, you can retrieve and check the status of all batches, and you can see when processing is complete.

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
            List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
                connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                        || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

A batch is done when it has either failed or completed. This code checks the batch states immediately and then every 10 seconds, with no timeout, until every batch in the job has reached one of those two states.
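For long-running clients, an upper bound on the wait is often useful. The following is a minimal sketch of the same polling idea with a timeout, written against a plain lookup function so that it runs without a Salesforce connection. The class name, method names, and the stateOf parameter are hypothetical; the state names are plain strings standing in for BatchStateEnum values, and treating NotProcessed as terminal is an assumption you should verify for your API version.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Sketch only: stateOf stands in for a call such as getBatchInfoList().
final class PollingSketch {

    /** States after which a batch no longer changes (assumed names, see above). */
    static boolean isTerminal(String state) {
        return "Completed".equals(state) || "Failed".equals(state)
                || "NotProcessed".equals(state);
    }

    /**
     * Polls until every batch is terminal or the timeout elapses.
     * Returns the IDs that were still unfinished (empty on success).
     */
    static Set<String> awaitAll(Set<String> batchIds, Function<String, String> stateOf,
            long pollMillis, long timeoutMillis) {
        Set<String> incomplete = new HashSet<>(batchIds);
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            incomplete.removeIf(id -> isTerminal(stateOf.apply(id)));
            if (incomplete.isEmpty() || System.nanoTime() - deadline >= 0) {
                return incomplete;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return incomplete;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> states = new HashMap<>();
        states.put("batch-1", "Completed");
        states.put("batch-2", "InProgress"); // never finishes in this stub
        Set<String> unfinished = awaitAll(states.keySet(), states::get, 10, 100);
        System.out.println("Unfinished: " + unfinished); // prints Unfinished: [batch-2]
    }
}
```

Returning the unfinished IDs lets the caller decide whether to keep waiting, abort the job, or report the stragglers.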

Get Results For a Job

You can retrieve the results of each batch when all batches are processed. Retrieve results whether the batch succeeded or failed, or even if the job was aborted, because only the result sets indicate the status of individual records. To properly pair a result with its corresponding record, the code must not lose track of how the batches correspond to the original data set. So keep the original list of batches from when they were created and use this list to retrieve results, as shown in this example:

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
            List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
                new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

This code retrieves the results for each record and reports whether the operation succeeded or failed. If an error occurred for a record, the code prints out the error.
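To make the pairing of results with source records concrete, the sketch below matches each result row to the input row at the same position within a batch. It relies on result rows arriving in the same order as the submitted rows, and it uses a naive comma split rather than a full CSV parser, so it assumes no commas inside values. The class name, method names, and sample data are invented for illustration; only the column names Id, Success, and Error come from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: pairs result rows with input rows by position within one batch.
final class ResultPairingSketch {

    /** Naive split: strips double quotes and assumes no commas inside values. */
    static List<String> split(String line) {
        List<String> cells = new ArrayList<>();
        for (String cell : line.split(",", -1)) {
            cells.add(cell.replace("\"", "").trim());
        }
        return cells;
    }

    /**
     * inputRows: the data rows sent in the batch, without the header.
     * resultLines: the result CSV lines, with the header as the first element.
     */
    static List<String> pair(List<String> inputRows, List<String> resultLines) {
        List<String> header = split(resultLines.get(0));
        int idCol = header.indexOf("Id");
        int successCol = header.indexOf("Success");
        int errorCol = header.indexOf("Error");
        List<String> report = new ArrayList<>();
        for (int i = 1; i < resultLines.size() && i - 1 < inputRows.size(); i++) {
            List<String> cols = split(resultLines.get(i));
            String input = inputRows.get(i - 1); // same position as the result row
            if (Boolean.parseBoolean(cols.get(successCol))) {
                report.add(input + " -> " + cols.get(idCol));
            } else {
                report.add(input + " -> ERROR: " + cols.get(errorCol));
            }
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("Acme", "Globex");
        List<String> results = Arrays.asList(
            "\"Id\",\"Success\",\"Created\",\"Error\"",
            "\"001xx\",\"true\",\"true\",\"\"",
            "\"\",\"false\",\"false\",\"REQUIRED_FIELD_MISSING\"");
        for (String line : pair(input, results)) {
            System.out.println(line);
        }
        // prints:
        // Acme -> 001xx
        // Globex -> ERROR: REQUIRED_FIELD_MISSING
    }
}
```

Keeping the input rows of each batch, or at least their offsets in the source file, is what makes this positional lookup possible after the fact.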

Complete Quick Start Sample

Now that you're more familiar with jobs and batches, you can copy and paste the entire quick start sample and use it:

import java.io.*;
import java.util.*;

import com.sforce.async.*;
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;

public class BulkExample {

    public static void main(String[] args)
            throws AsyncApiException, ConnectionException, IOException {
        BulkExample example = new BulkExample();
        // Replace arguments below with your credentials and test file name
        // The first parameter indicates that we are loading Account records
        example.runSample("Account", "myUser@myOrg.com", "myPassword", "mySampleData.csv");
    }

    /**
     * Creates a Bulk API job and uploads batches for a CSV file.
     */
    public void runSample(String sobjectType, String userName,
            String password, String sampleFileName)
            throws AsyncApiException, ConnectionException, IOException {
        BulkConnection connection = getBulkConnection(userName, password);
        JobInfo job = createJob(sobjectType, connection);
        List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
            sampleFileName);
        closeJob(connection, job.getId());
        awaitCompletion(connection, job, batchInfoList);
        checkResults(connection, job, batchInfoList);
    }

    /**
     * Gets the results of the operation and checks for errors.
     */
    private void checkResults(BulkConnection connection, JobInfo job,
            List<BatchInfo> batchInfoList)
            throws AsyncApiException, IOException {
        // batchInfoList was populated when batches were created and submitted
        for (BatchInfo b : batchInfoList) {
            CSVReader rdr =
                new CSVReader(connection.getBatchResultStream(job.getId(), b.getId()));
            List<String> resultHeader = rdr.nextRecord();
            int resultCols = resultHeader.size();

            List<String> row;
            while ((row = rdr.nextRecord()) != null) {
                Map<String, String> resultInfo = new HashMap<String, String>();
                for (int i = 0; i < resultCols; i++) {
                    resultInfo.put(resultHeader.get(i), row.get(i));
                }
                boolean success = Boolean.valueOf(resultInfo.get("Success"));
                boolean created = Boolean.valueOf(resultInfo.get("Created"));
                String id = resultInfo.get("Id");
                String error = resultInfo.get("Error");
                if (success && created) {
                    System.out.println("Created row with id " + id);
                } else if (!success) {
                    System.out.println("Failed with error: " + error);
                }
            }
        }
    }

    private void closeJob(BulkConnection connection, String jobId)
            throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setId(jobId);
        job.setState(JobStateEnum.Closed);
        connection.updateJob(job);
    }

    /**
     * Wait for a job to complete by polling the Bulk API.
     *
     * @param connection
     *            BulkConnection used to check results.
     * @param job
     *            The job awaiting completion.
     * @param batchInfoList
     *            List of batches for this job.
     * @throws AsyncApiException
     */
    private void awaitCompletion(BulkConnection connection, JobInfo job,
            List<BatchInfo> batchInfoList)
            throws AsyncApiException {
        long sleepTime = 0L;
        Set<String> incomplete = new HashSet<String>();
        for (BatchInfo bi : batchInfoList) {
            incomplete.add(bi.getId());
        }
        while (!incomplete.isEmpty()) {
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {}
            System.out.println("Awaiting results..." + incomplete.size());
            sleepTime = 10000L;
            BatchInfo[] statusList =
                connection.getBatchInfoList(job.getId()).getBatchInfo();
            for (BatchInfo b : statusList) {
                if (b.getState() == BatchStateEnum.Completed
                        || b.getState() == BatchStateEnum.Failed) {
                    if (incomplete.remove(b.getId())) {
                        System.out.println("BATCH STATUS:\n" + b);
                    }
                }
            }
        }
    }

    /**
     * Create a new job using the Bulk API.
     *
     * @param sobjectType
     *            The object type being loaded, such as "Account"
     * @param connection
     *            BulkConnection used to create the new job.
     * @return The JobInfo for the new job.
     * @throws AsyncApiException
     */
    private JobInfo createJob(String sobjectType, BulkConnection connection)
            throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject(sobjectType);
        job.setOperation(OperationEnum.insert);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        System.out.println(job);
        return job;
    }

    /**
     * Create the BulkConnection used to call Bulk API operations.
     */
    private BulkConnection getBulkConnection(String userName, String password)
            throws ConnectionException, AsyncApiException {
        ConnectorConfig partnerConfig = new ConnectorConfig();
        partnerConfig.setUsername(userName);
        partnerConfig.setPassword(password);
        partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/54.0");
        // Instantiating PartnerConnection implicitly executes a login and,
        // if successful, stores a valid session in partnerConfig.
        new PartnerConnection(partnerConfig);
        // Use the session ID from the partner login to initialize a BulkConnection:
        ConnectorConfig config = new ConnectorConfig();
        config.setSessionId(partnerConfig.getSessionId());
        // The endpoint for the Bulk API service is the same as for the normal
        // SOAP URI until the /Soap/ part. From there it's '/async/versionNumber'
        String soapEndpoint = partnerConfig.getServiceEndpoint();
        String apiVersion = "54.0";
        String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
        config.setRestEndpoint(restEndpoint);
        // This should only be false when debugging.
        config.setCompression(true);
        // Set this to true to see HTTP requests and responses on stdout
        config.setTraceMessage(false);
        BulkConnection connection = new BulkConnection(config);
        return connection;
    }

    /**
     * Create and upload batches using a CSV file.
     * The file is split into appropriately sized batch files.
     *
     * @param connection
     *            Connection to use for creating batches
     * @param jobInfo
     *            Job associated with new batches
     * @param csvFileName
     *            The source file for batch data
     */
    private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
            JobInfo jobInfo, String csvFileName)
            throws IOException, AsyncApiException {
        List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
        BufferedReader rdr = new BufferedReader(
            new InputStreamReader(new FileInputStream(csvFileName))
        );
        // read the CSV header row
        byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
        int headerBytesLength = headerBytes.length;
        File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

        // Split the CSV file into multiple batches
        try {
            FileOutputStream tmpOut = new FileOutputStream(tmpFile);
            int maxBytesPerBatch = 10000000; // 10 million bytes per batch
            int maxRowsPerBatch = 10000; // 10 thousand rows per batch
            int currentBytes = 0;
            int currentLines = 0;
            String nextLine;
            while ((nextLine = rdr.readLine()) != null) {
                byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
                // Create a new batch when our batch size limit is reached
                if (currentBytes + bytes.length > maxBytesPerBatch
                        || currentLines > maxRowsPerBatch) {
                    createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                    currentBytes = 0;
                    currentLines = 0;
                }
                // Start a new batch file with the header row
                if (currentBytes == 0) {
                    tmpOut = new FileOutputStream(tmpFile);
                    tmpOut.write(headerBytes);
                    currentBytes = headerBytesLength;
                    currentLines = 1;
                }
                tmpOut.write(bytes);
                currentBytes += bytes.length;
                currentLines++;
            }
            // Finished processing all rows
            // Create a final batch for any remaining data
            if (currentLines > 1) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
            }
        } finally {
            tmpFile.delete();
        }
        return batchInfos;
    }

    /**
     * Create a batch by uploading the contents of the file.
     * This closes the output stream.
     *
     * @param tmpOut
     *            The output stream used to write the CSV data for a single batch.
     * @param tmpFile
     *            The file associated with the above stream.
     * @param batchInfos
     *            The batch info for the newly created batch is added to this list.
     * @param connection
     *            The BulkConnection used to create the new batch.
     * @param jobInfo
     *            The JobInfo associated with the new batch.
     */
    private void createBatch(FileOutputStream tmpOut, File tmpFile,
            List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
            throws IOException, AsyncApiException {
        tmpOut.flush();
        tmpOut.close();
        FileInputStream tmpInputStream = new FileInputStream(tmpFile);
        try {
            BatchInfo batchInfo =
                connection.createBatchFromStream(jobInfo, tmpInputStream);
            System.out.println(batchInfo);
            batchInfos.add(batchInfo);
        } finally {
            tmpInputStream.close();
        }
    }

}