Calculating degrees of parallelism

When working with tens of millions, hundreds of millions, or even billions of records in your Salesforce organization, optimizing the performance of your data loads and integrations is critical to your organization’s success. To scale your enterprise, make sure that you know how to use the tool that was engineered specifically for loading data into Salesforce quickly: the Salesforce Bulk API. The Bulk API is based on REST principles, and it’s optimized for inserting, updating, and deleting large sets of data.

You can use the Bulk API to process jobs either in serial mode or in parallel mode. Processing batches serially means running them one after another, and processing batches in parallel means running multiple batches at the same time. When you run a Bulk API job, processing more of its batches in parallel means giving that job a higher degree of parallelism, which can in turn give your overall run better data throughput.

When you boost your degrees of parallelism by optimizing your Salesforce organization's architecture and loading your large volumes of data properly, you can end up with data loads and integrations that perform orders of magnitude faster than they otherwise would. In this article, you can see these performance boosts in several test runs, dive deeper into the benefits of running jobs in parallel with the Bulk API, and learn how to optimize the Bulk API to process up to 20 million records per hour.


Calculating Degrees of Parallelism

When you submit batches for processing, Salesforce uses an asynchronous message queue behind the scenes to manage the batches that enter the queuing framework. As worker threads become available to process your Bulk API jobs, Salesforce assigns them to process specific batches. Again, remember this important point: Processing more batches concurrently means having a higher degree of parallelism, and having a higher degree of parallelism means having better data throughput.

A traditional definition for a degree of parallelism might be “the number of processes or threads associated with an operation.” However, because degrees of parallelism are constantly moving targets, it’s easier to use a slightly different formula for calculating them in the Salesforce environment. Instead of calculating weighted averages of the number of parallel execution servers associated with a job, you can equate a degree of parallelism with the amount of work completed (as a duration), divided by the actual amount of time it took to complete that work.

$$\text{degree of parallelism} = \frac{\text{amount of work completed (as a duration)}}{\text{actual amount of time it took to complete that work}}$$


To get a more concrete understanding of how you can calculate degrees of parallelism, consider this scenario. You process an integration job that inserts 100,000 records in 5 minutes. Your batch size is 10,000 records, so you have 10 total batches. For simplicity’s sake, say that it takes Salesforce 1 minute to process each batch. These are the execution times for your batches.

[Figure: Calculating degrees of parallelism (batch execution times)]

Based on these numbers, you know the following.

  1. The degree of parallelism for the first minute was 2. Salesforce processed two batches concurrently in the first minute.
  2. The degree of parallelism for the next three minutes was 1. Salesforce processed exactly one batch in each of those 3 minutes.
  3. The degree of parallelism for the last minute was 5. Salesforce processed five batches concurrently in the last minute.

The overall degree of parallelism for the Bulk API job is 2. The job included 10 total batches, each of which took 1 minute to be processed, and the entire Bulk API job took 5 minutes to complete. In other words, 10 minutes of work was completed in 5 minutes. If each batch had taken 2 minutes to be processed, and the entire job had completed in 5 minutes, 20 minutes’ worth of work would have been completed in 5 minutes, which would have given your run an overall degree of parallelism of 4.
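The calculation above can be sketched in a few lines of Java. This is a minimal illustration, not part of any Salesforce API: the class and method names are hypothetical, and the batch durations would come from your own job monitoring. The method sums the time spent processing every batch (the work completed) and divides that sum by the job's wall-clock run time.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; not part of the Bulk API or its client libraries.
class ParallelismCalculator {

    // Degree of parallelism = work completed (sum of batch processing times)
    //                         / actual wall-clock time the job took
    static double degreeOfParallelism(List<Duration> batchDurations, Duration jobRunTime) {
        if (jobRunTime.isZero() || jobRunTime.isNegative()) {
            throw new IllegalArgumentException("Job run time must be positive");
        }
        long workMillis = 0;
        for (Duration batch : batchDurations) {
            workMillis += batch.toMillis();
        }
        return (double) workMillis / jobRunTime.toMillis();
    }

    public static void main(String[] args) {
        // Scenario from the text: 10 batches of 1 minute each, job finished in 5 minutes
        List<Duration> batches = Collections.nCopies(10, Duration.ofMinutes(1));
        System.out.println(degreeOfParallelism(batches, Duration.ofMinutes(5))); // prints 2.0

        // The same job if each batch had taken 2 minutes
        List<Duration> slowerBatches = Collections.nCopies(10, Duration.ofMinutes(2));
        System.out.println(degreeOfParallelism(slowerBatches, Duration.ofMinutes(5))); // prints 4.0
    }
}
```

Working in milliseconds keeps the sum exact for typical batch durations; only the final division is done in floating point.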

Note: For small jobs that complete in only a few minutes, degrees of parallelism can fluctuate widely. Calculating degrees of parallelism for large, long-running loads (or across a larger number of shorter loads) generally yields more consistent numbers.

How Parallelism Affects Overall Performance

As a developer or architect, you want your data loads or integrations to run as quickly and efficiently as possible. When you create a Bulk API job, you can choose to process your job in serial mode and get a maximum degree of parallelism of 1, or you can choose to process your job in parallel mode and get the maximum degree of parallelism that the server allows. The degree of parallelism you expect might not be the degree of parallelism that you actually get, and this discrepancy might be caused by variations in load on the server or by something that you have implemented incorrectly in your architecture.

Because Salesforce is supported by the Salesforce1 multitenant platform, customers share the asynchronous processing queue and its processing power, just as they do other Salesforce resources. Salesforce uses a complex algorithm to determine which batches it should process and in what order it should process them, so you might not see exactly the same degree of parallelism in seemingly identical runs. However, if your Salesforce architecture is implemented properly, you won't need to worry about these variations. What matters in the end is the degree of parallelism that you get and whether anything within your architecture limited it, preventing you from obtaining peak load or integration performance.

For loads and integrations that are tuned, well architected, and suited to parallel processing, salesforce.com’s enterprise-scale customers typically see degrees of parallelism between 15 and 20 when they run their jobs in parallel mode. These numbers mean that 15 to 20 hours of work are completed in 1 hour. You might think that these numbers don’t sound like much, but with these degrees of parallelism, you can insert or update as many as 15 to 20 million records per hour in very well-optimized data loads.
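To see how a degree of parallelism of 15 to 20 can translate into 15 to 20 million records per hour, here is a back-of-the-envelope estimate. It is not a Salesforce formula: it assumes that every parallel thread is always busy with a full 10,000-record batch that takes about 36 seconds (roughly 1 million records per thread per hour), and it ignores queueing overhead and lock retries. Replace the per-batch time with what you actually observe.

```java
// Back-of-the-envelope estimate (hypothetical helper). Assumes every parallel
// thread is always busy processing a full batch, with no queueing overhead or
// lock retries.
class ThroughputEstimate {

    static double recordsPerHour(double degreeOfParallelism, int recordsPerBatch, double secondsPerBatch) {
        double batchesPerThreadPerHour = 3600.0 / secondsPerBatch;
        return degreeOfParallelism * batchesPerThreadPerHour * recordsPerBatch;
    }

    public static void main(String[] args) {
        // Assumed: 10,000-record batches that each take about 36 seconds to process
        for (double dop : new double[] {15, 20}) {
            System.out.printf("Degree of parallelism %.0f -> about %,.0f records per hour%n",
                    dop, recordsPerHour(dop, 10_000, 36));
        }
    }
}
```

Because real runs spend some time queued, retried, or waiting on locks, treat this figure as an upper bound rather than a prediction.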

I have heard data architects say that they process their records in serial mode because parallel processing seems like it’s just sharing the same Salesforce resources, and because they think their net performance won’t be any faster. This opinion couldn’t be further from the truth! To see real-world examples of why this opinion isn’t true, just keep reading.

Test Run Scenario

The test runs in this article use real-world loading strategies and a single Developer Edition organization, which features increased limits to mimic the capabilities of an enterprise-scale customer organization. In the test runs that deliver suboptimal results, you can see how poorly implemented Salesforce architectures can affect degrees of parallelism and overall throughput. In the other, better test runs, you can see how increasing degrees of parallelism can deliver huge performance boosts to organizations that load and integrate very large volumes of data.

Data Model

The organization in this scenario uses a very simple data model, which consists of two objects: the account object and the order object. The order object is set up with a fairly typical lookup relationship to the account object, which is configured as follows.

[Figure: Data model for parallelism scenario]

The account object is loaded with 500 records. A 500-record total isn’t a lot for most enterprise-scale customers, but it does create a realistic distribution for this scenario’s test loads, each of which consists of 1 million records. These numbers create an appropriate ratio for the lookup relationship to highlight the importance of the many architectural decisions that this article covers.

Data Generation and Loading

Because the Data Loader isn’t recommended for larger data loads, and because the examples in this scenario are intended to be realistic, the data generation and loading tools in this scenario are written in Java. The data generation program sets up CSV files that the data loading tool loads, and the data loading tool leverages the Bulk API.

The data generation program is a critical component of this article, as it sets up realistic distributions of data, loading conditions, and returned results. It can create orders that are randomly distributed among the organization’s existing accounts, and it can produce files of orders that are ordered by AccountId.
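The article's generator isn't shown, so the following is only a hypothetical sketch of the random-distribution case: it writes a CSV of orders whose account IDs are drawn uniformly at random from a fixed list of existing accounts. The column names (Name, Account__c) and the fake 18-character IDs are placeholders; a real generator would use the account IDs that actually exist in the organization.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical generator: orders randomly distributed across existing accounts.
class OrderCsvGenerator {

    // Placeholder IDs; a real generator would use account IDs queried from the org.
    static List<String> fakeAccountIds(int count) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ids.add(String.format("001%015d", i)); // 18 characters, shaped like a Salesforce ID
        }
        return ids;
    }

    // Writes a header plus one row per order; each order picks a random account.
    static void writeOrders(PrintWriter out, List<String> accountIds, int orderCount, long seed) {
        Random random = new Random(seed); // fixed seed makes test runs repeatable
        out.println("Name,Account__c");   // hypothetical column names
        for (int i = 0; i < orderCount; i++) {
            String accountId = accountIds.get(random.nextInt(accountIds.size()));
            out.println("Order-" + i + "," + accountId);
        }
        out.flush();
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        writeOrders(new PrintWriter(buffer), fakeAccountIds(500), 5, 42L);
        System.out.print(buffer);
    }
}
```

With 500 accounts and 1 million orders, this uniform distribution gives each account about 2,000 orders scattered throughout the file, which is what makes concurrent batches likely to reference the same account.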

Serial Load

Under certain circumstances, you should load data serially instead of in parallel. Here’s one classic use case: When you insert group members or users who are assigned to roles—or perform any other data load that requires group membership operations—Salesforce uses organization-wide group membership locks to ensure data integrity and security. Your best option is to tell Salesforce to perform the operations one at a time. Set the concurrencyMode field on the JobInfo object to serial, then have Salesforce run your Bulk API job in serial mode so that it processes one batch after another.

The best degree of parallelism that a serial load can achieve is 1, and under normal circumstances, this is the baseline for what should be the slowest performance you get out of a data load. If you load or integrate data in parallel mode and get an overall degree of parallelism that’s less than 1, first try to maximize for parallelism using the techniques in this article. If these techniques don’t work because you cannot manage or avoid the locks, switch to serial mode.

Test Run Setup

For this test run, you load 1 million orders that are randomly distributed among the organization’s existing accounts. These orders appear in no particular order in the CSV file that you’re loading. Remember that the order object has a lookup relationship to the account object.

You set up this Bulk API job in Java as follows.

private JobInfo createJob(String sobjectType, BulkConnection connection) throws AsyncApiException {
	JobInfo job = new JobInfo();
	job.setObject(sobjectType);
	job.setOperation(OperationEnum.insert);
	job.setContentType(ContentType.CSV);
	job.setConcurrencyMode(ConcurrencyMode.Serial); // Salesforce processes this job's batches one at a time
	job = connection.createJob(job);
	System.out.println(job);
	return job;
}

You build the job from 100 batches, each of which contains 10,000 orders.

private List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection, JobInfo jobInfo, String csvFileName) throws IOException, AsyncApiException {
	List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
	File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
	// try-with-resources closes the source CSV file even if creating a batch fails
	try (BufferedReader rdr = new BufferedReader(
			new InputStreamReader(new FileInputStream(csvFileName), "UTF-8"))) {
		// Read the CSV header row; every batch file must start with it
		byte[] headerBytes = (rdr.readLine() + "\n").getBytes("UTF-8");
		int headerBytesLength = headerBytes.length;

		// Split the CSV file into multiple batches
		FileOutputStream tmpOut = null; // opened when each new batch starts
		int maxBytesPerBatch = 10000000; // 10 million bytes per batch
		int maxRowsPerBatch = 10000; // 10 thousand rows per batch
		int currentBytes = 0;
		int currentLines = 0; // header row plus data rows in the current batch
		String nextLine;
		while ((nextLine = rdr.readLine()) != null) {
			byte[] bytes = (nextLine + "\n").getBytes("UTF-8");
			// Submit the current batch when it is full or this row would exceed the byte limit
			if (currentBytes + bytes.length > maxBytesPerBatch
					|| currentLines > maxRowsPerBatch) {
				createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
				currentBytes = 0;
				currentLines = 0;
			}
			// Start a new temporary batch file with the header row
			if (currentBytes == 0) {
				tmpOut = new FileOutputStream(tmpFile);
				tmpOut.write(headerBytes);
				currentBytes = headerBytesLength;
				currentLines = 1;
			}
			tmpOut.write(bytes);
			currentBytes += bytes.length;
			currentLines++;
		}
		// Finished processing all rows; create a final batch for any remaining data
		if (currentLines > 1) {
			createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
		}
	} finally {
		tmpFile.delete();
	}
	return batchInfos;
}

Finally, after the loader creates all of the batches, it closes the job, waits for Salesforce to finish processing the batches on the asynchronous message queue, and checks the results.

public void runSample(String sobjectType, String userName, String password, String sampleFileName) throws AsyncApiException, ConnectionException, IOException {
	BulkConnection connection = getBulkConnection(userName, password);
	JobInfo job = createJob(sobjectType, connection);
	List<BatchInfo> batchInfoList = createBatchesFromCSVFile(connection, job,
    sampleFileName);
	closeJob(connection, job.getId());
	awaitCompletion(connection, job, batchInfoList);
	checkResults(connection, job, batchInfoList);
}

Test Run Results

Ready to look at the results of your run?

[Figure: Results for the serial load]

As you can see in this screenshot, Salesforce loaded 1 million records in 51 minutes and 30 seconds. When you drill into the batches within this job, you can see that each batch took approximately 29 seconds to load.

[Figure: Drilling down into the serial load]

Because the Bulk API job consisted of 100 batches, and each batch took about 29 seconds to load, the job completed about 2,900 seconds (48 minutes and 20 seconds) of work. And because the job actually took 3,090 seconds (51 minutes and 30 seconds) to finish loading the 1 million records, its degree of parallelism was approximately 0.94.


$$\text{degree of parallelism} = \frac{\text{work completed}}{\text{run time}} = \frac{2{,}900\ \text{seconds}}{3{,}090\ \text{seconds}} \approx 0.94$$


Serial Load
Concurrency Mode: Serial
Records Loaded: 1 million
Records Failed: 0
Run Time: 51 minutes and 30 seconds
Work Completed: 48 minutes and 20 seconds
Throughput: 19,417 records per minute
Degree of Parallelism: 0.94
Key Problem: This load’s degree of parallelism was very close to 1, which was expected because you executed the run in serial mode. You might get better throughput if you set up your run differently.
Solution: Run this job in parallel mode to try to increase throughput.


Salesforce.com Customer Support can use the internal Salesforce monitoring tools, which the rest of this article refers to, to verify loads’ exact degrees of parallelism. The tools show that this job's degree of parallelism was 0.95, so the 0.94 estimate was fairly accurate. Now that you have this benchmark, you return the organization to its baseline state by deleting the records that you just added. Next, you run the same job in parallel mode to see how much load performance improves.

Parallel Load

Enable parallel processing and execute the data load again.

Test Run Setup

You set up the Bulk API job in Java as follows.

private JobInfo createJob(String sobjectType, BulkConnection connection) throws AsyncApiException {
	JobInfo job = new JobInfo();
	job.setObject(sobjectType);
	job.setOperation(OperationEnum.insert);
	job.setContentType(ContentType.CSV);
	job.setConcurrencyMode(ConcurrencyMode.Parallel); // Salesforce can process this job's batches concurrently
	job = connection.createJob(job);
	System.out.println(job);
	return job;
}

Test Run Results

Everything else in the Java loader remains the same as it did in the serial mode example, but the results of this run are very different. The first thing to notice is that a very large number of records failed to load; of the 1 million records that you attempted to load, only 198,200 were inserted successfully.

[Figure: Results for the parallel load]


records loaded = records processed - records failed = 397,000 records - 198,800 records = 198,200 records


And even though Salesforce was able to insert only 198,200 records successfully, the internal logs show that the overall degree of parallelism for the job was a whopping 15.79. Whoa, what happened here? The server completed about 2.6 hours of work in the 9 minutes and 44 seconds that this load took to run!


work completed = run time x degree of parallelism = 584 seconds x 15.79 ≈ 9,221 seconds ≈ 2.6 hours


What is even more shocking about this parallelism scenario is how much overall work would have been wasted if you had used retry logic to successfully load all 1 million records. Retry logic might have let you load 1 million records at a slightly faster rate than the serial load did (20,363 records per minute instead of 19,417 records per minute), but it would have made the server work approximately 16 times as hard! The serial load completed 48 minutes and 20 seconds of work, and to load the same number of records in almost the same duration, this parallel load would have needed to complete almost 13 hours of work.


work completed = run time x degree of parallelism = 2,900 seconds x 15.79 ≈ 45,791 seconds ≈ 12.72 hours


This wasted capacity could instead have been used to complete other asynchronous processes, and running any additional asynchronous processes while running this parallel load would have affected actual run time for both this load and those additional processes.

Drill into this job a little deeper to understand why the server worked about three times as hard as it did in the serial load to load only about 20 percent of the records. First, look at a few of this load's jobs.

[Figure: Drilling down into the parallel load]

In this log, you can see that a huge number of failures were caused by what look like lock exceptions. Of the six pictured jobs, five jobs retried 10 times without completing, and only one job completed successfully—after nine retries. All of these failures and retries drove up the degree of parallelism, even though your run successfully inserted very few records.

Parallel Load
Concurrency Mode: Parallel
Records Loaded: 198,200
Records Failed: 801,800
Run Time: 9 minutes and 44 seconds
Work Completed: 2 hours and 33 minutes and 41 seconds
Throughput: 20,363 records per minute
Degree of Parallelism: 15.79
Key Problem: This load encountered lock exceptions, which made the server do almost three times the amount of work that it did in the serial test run to achieve almost the same throughput.
Solutions: Either run the load in serial mode to increase efficiency or manage the locks to eliminate the lock exceptions.


Next, explore what can cause the type of lock failure that made this job inefficient.

Managing Locks to Maximize the Benefits of Parallelism

As you saw in the previous parallel loading example, your degree of parallelism doesn’t necessarily map directly to your level of throughput. Anything that slows overall processing or causes failures, such as lock exceptions, doesn’t decrease the number of parallel threads Salesforce allocates to complete a job. In the previous example, you saw the expected degree of parallelism for a Bulk API job that’s run in parallel mode, but the work didn’t complete any faster than when you ran the job in serial mode. In fact, the net result of the locks was that the server worked much harder to achieve about the same throughput.

So what can cause these locks and lock exceptions, and how can you manage them and their effects?

Master-Detail Relationships
When an object is in a master-detail relationship with another object, and you insert any detail records, Salesforce locks the related master records to ensure referential integrity. If detail records that look up to the same master record are inserted simultaneously in separate batches, there’s a high risk that those inserts will cause lock exceptions. Therefore, when you’re loading these types of records, either ensure that no master records’ IDs span multiple batches or order your loads by the master records’ IDs; these strategies minimize how often the detail records being inserted in multiple, concurrent batches reference a single master record’s ID.
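One way to apply the first strategy is to group rows by their parent ID and fill each batch with whole groups. The sketch below is hypothetical: it works on already-parsed rows, takes the parent ID through a caller-supplied function, and assumes that no single parent has more rows than fit in one batch (it fails fast if one does). It enforces only the row limit, not the per-batch byte limit that the loader above also checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: pack rows into batches so that all rows sharing a parent
// (master or lookup) ID land in the same batch.
class ParentAwareBatcher {

    static <T> List<List<T>> batchByParent(List<T> rows, Function<T, String> parentId, int maxRowsPerBatch) {
        // Group rows by parent ID, keeping the order in which parents first appear
        Map<String, List<T>> groups = new LinkedHashMap<>();
        for (T row : rows) {
            groups.computeIfAbsent(parentId.apply(row), k -> new ArrayList<>()).add(row);
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (List<T> group : groups.values()) {
            if (group.size() > maxRowsPerBatch) {
                throw new IllegalArgumentException("A parent has more rows than fit in one batch");
            }
            // Start a new batch rather than split this parent's rows across two batches
            if (current.size() + group.size() > maxRowsPerBatch) {
                batches.add(current);
                current = new ArrayList<>();
            }
            current.addAll(group);
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Rows are "childName,parentId" strings for illustration
        List<String> rows = Arrays.asList("o1,A", "o2,B", "o3,A", "o4,C", "o5,B", "o6,C");
        List<List<String>> batches = batchByParent(rows, r -> r.split(",")[1], 4);
        System.out.println(batches);
    }
}
```

Packing whole groups can leave some batches slightly under the row limit; that is the trade-off for guaranteeing that two concurrent batches never lock the same parent.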

Lookup Relationships
For objects that have a certain type of lookup relationship to another object, something similar happens: When you insert or update the base object’s records, Salesforce locks the target lookup records to ensure referential integrity. If records that have this type of lookup relationship to the same lookup record are inserted simultaneously in separate batches, there’s a high risk that those inserts will cause lock exceptions.

Note the phrase “this type of lookup relationship” in the previous paragraph. For optional lookup fields, you can avoid the locks by setting the Clear the value of this field option, which does more than just tell Salesforce what to do if your lookup record is deleted. When you set this option, whenever a record that has a lookup field to this lookup record is inserted or updated, Salesforce doesn't lock the lookup records; instead, it only validates that the lookup values exist. To avoid unnecessary locks, it’s best to set this option for your lookup fields whenever possible. If it isn’t possible to configure your lookup fields with this option, use a workaround similar to the one you would use for master-detail relationships: Ensure that no lookup values span multiple batches or order your loads by their lookup values to minimize how often the records being inserted in multiple, concurrent batches reference a single lookup value.

Roll-up Summary Fields
When an object is in a master-detail relationship with another object, the master object has roll-up summary fields, and you update any detail records that affect the master records’ roll-up summary fields, Salesforce locks those master records so it can update the appropriate roll-up summary field values. If detail records that look up to the same master record are updated simultaneously in separate batches, and those updates affect roll-up summary fields on the master record, there is a high risk that these updates will cause lock exceptions. In this type of situation, your best option is to remove the roll-up summary fields if you don’t need them.

In some cases, you can also use reporting as an alternative. When the detail records alone have all of the data that you need, you can create a summary report on them. Otherwise, you can create a summary report that spans the master-detail relationship that the roll-up summary fields were created on. In any case, when you do need roll-up summary fields, order your loads by the master object’s record IDs; this strategy minimizes how often the records being updated in multiple, concurrent batches reference a single master record’s ID.

Triggers
Although the flexibility of triggers is a great asset, triggers can cause many kinds of problems in your loads and integrations. Locks are one of those problems. When you’re loading records that fire one or more triggers, and any of those triggers performs either a select for update or a DML operation on records other than the record that you’re inserting, updating, or deleting, Salesforce locks those other records. These locks can in turn cause lock exceptions. Consider disabling trigger logic for your loads and integrations, or having a special, optimized code path for such activities.

Workflow Rules
When workflow rules trigger field updates, Salesforce locks the records that they update. And when multiple threads try to update the same records simultaneously, they can cause lock exceptions. Consider defining your workflow rules so that they don’t execute during loads and integrations.

Group Membership Locks
For a few special operations, Salesforce uses organization-wide group membership locks. To avoid lock exceptions when performing the following operations, you must use serial processing for your data load.

  • Adding users who are assigned to roles
  • Changing users’ roles
  • Adding a role to the role hierarchy or a territory to the territory hierarchy
  • Changing the structure of the role hierarchy or the territory hierarchy
  • Adding or removing members from public or personal groups, roles, territories, or queues
  • Changing the owner of an account that has at least one community role or portal role associated with it to a new owner who is assigned to a different role than the original owner

When performing operations that require group membership locks as part of a request, trim your code so that Salesforce executes only what is required for group membership operations. The additional code that Salesforce executes within your request might hold locks when they aren't needed, increasing the probability that lock exceptions will occur.

Overlapping Runs
When you execute recurring jobs, and your jobs overlap, this overlap can cause unintended lock exceptions on both the base objects and any relationships, code, or workflow rules that Salesforce executes as a part of the job. Schedule plenty of time between your recurring jobs’ runs to ensure that they complete without overlapping. If there’s a risk that your jobs might overlap, have each job verify that the previous job has finished executing before it begins processing its data.
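A minimal sketch of that check, assuming a single scheduler process triggers your recurring runs: the hypothetical NonOverlappingRunner below skips a run if the previous one is still executing. If runs are launched from separate processes or machines, an in-memory flag like this can't see them, so you would need a shared signal instead, for example confirming that the previous Bulk API job is closed and has no queued or in-progress batches.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard for recurring loads triggered from one scheduler process:
// a new run starts only if the previous run has finished.
class NonOverlappingRunner {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Returns true if the load ran, false if it was skipped because a run was in progress.
    boolean runIfIdle(Runnable load) {
        if (!running.compareAndSet(false, true)) {
            return false; // previous run still executing; skip rather than overlap
        }
        try {
            load.run();
            return true;
        } finally {
            running.set(false); // mark idle even if the load threw an exception
        }
    }

    public static void main(String[] args) {
        NonOverlappingRunner runner = new NonOverlappingRunner();
        // A nested attempt simulates the scheduler firing while a run is still active
        boolean ran = runner.runIfIdle(() -> {
            boolean overlapped = runner.runIfIdle(() -> System.out.println("never printed"));
            System.out.println("Overlapping run started? " + overlapped); // prints false
        });
        System.out.println("First run completed? " + ran); // prints true
    }
}
```

Skipping is the simplest policy; a scheduler could instead wait and retry later, as long as it never starts a second load while the first is still holding locks.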

The Big Takeaway
As you can see, there are many ways in which loads and integrations might encounter locks. You might think that building retry logic into your loads and integrations is the best way to resolve these types of issues, but as you optimize your job’s degree of parallelism, you’ll see just how incorrect that way of thinking is.

In the previous test run, a lookup relationship that was defined on the order object and linked to the account object caused lock exceptions. You know that you have two options for resolving exceptions caused by lookup relationships: ordering the records in the load by AccountId or changing the lookup configuration to Clear the value of this field when the lookup record is deleted.

Try both strategies, then compare your results. Which strategy do you think will be faster?

Managing Locks Load #1 - Not Locking Lookup Value

Ready for the first “managing locks” test run?

Test Run Setup

In this run, you make one very simple change to the configuration of the Order__c object: configuring the account lookup field to Clear the value of this field when the lookup record is deleted.

[Figure: Setup for the first managing locks load]

The loading code and the data being loaded are identical to what you used in your first parallel run, which encountered many lock exceptions and failures.

Test Run Results

Ready to look at the results of your run?

[Figure: Results for the first managing locks load]

Unlike the first two test runs (the serial run took almost an hour, and the first parallel run failed to insert most of its records), this run encountered no failures, and its performance was exceptional. The job ran in 3 minutes and 4 seconds and successfully loaded all 1 million records. From the internal server logs, you can also tell that its degree of parallelism was 19. At this rate, you’re loading almost 20 million records per hour!


$$\text{throughput} = \frac{\text{records loaded}}{\text{run time}} = \frac{1{,}000{,}000\ \text{records}}{(3 \times 60 + 4)\ \text{seconds}} \approx 5{,}434.8\ \text{records per second} \approx 20\ \text{million records per hour}$$


Managing Locks Load #1 - Not Locking Lookup Value
Concurrency Mode: Parallel
Records Loaded: 1 million
Records Failed: 0
Run Time: 3 minutes and 4 seconds
Work Completed: 58 minutes and 16 seconds
Throughput: 326,088 records per minute
Degree of Parallelism: 19
Key Problems: None
Solutions: n/a


Managing Locks Load #2 - Locking Lookup Value

Next, reconfigure the account lookup field so that it does lock the account record, then order the load file by AccountId.

Test Run Setup

You start by defining how the lookup field that’s configured on the order object behaves when its lookup record is deleted. You set the Don’t allow deletion of the lookup record that’s part of a lookup relationship option, which locks account records when their associated order records are inserted; this is the locking behavior that you saw in the first two test runs that you executed.

Next, you order the data load file by AccountId to minimize how often the records being inserted in multiple, concurrent batches reference a single lookup value. Remember that in this scenario, the account object is loaded with 500 accounts, and you’re loading 1 million orders that have a random distribution of account IDs. This ratio of 500 accounts to 1 million orders gives you approximately 2,000 orders for each account. Again, the rest of this load, including its parallel loading Java program, is identical to what you used in your previous parallel loading test run.
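If your source system can't produce an ordered file, you can sort an existing file before splitting it into batches. This hypothetical sketch sorts the data rows by a named column while keeping the header first; Account__c stands in for whatever column holds the account lookup value. It assumes a simple CSV with no quoted fields, embedded commas, or embedded line breaks; use a real CSV parser for anything else.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: order CSV data rows by one column so that rows sharing a
// lookup value are adjacent and mostly land in the same batch.
// Assumes a simple CSV: no quoted fields, embedded commas, or embedded newlines.
class CsvSorter {

    static List<String> sortByColumn(List<String> lines, String columnName) {
        String header = lines.get(0);
        int column = Arrays.asList(header.split(",", -1)).indexOf(columnName);
        if (column < 0) {
            throw new IllegalArgumentException("Column not found: " + columnName);
        }
        List<String> rows = new ArrayList<>(lines.subList(1, lines.size()));
        // List.sort is stable, so rows with the same lookup value keep their relative order
        rows.sort(Comparator.comparing((String row) -> row.split(",", -1)[column]));
        List<String> sorted = new ArrayList<>();
        sorted.add(header);
        sorted.addAll(rows);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Name,Account__c",
                "Order-1,001B",
                "Order-2,001A",
                "Order-3,001B",
                "Order-4,001A");
        sortByColumn(lines, "Account__c").forEach(System.out::println);
    }
}
```

With about 2,000 orders per account and 10,000-row batches, sorting puts each account's orders into one batch or, at a boundary, two adjacent batches, so concurrent batches rarely contend for the same account.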

Do you think that this parallel run, which will encounter locks, will complete faster or slower than that previous parallel run, which encountered no locks?

Test Run Results

This job successfully inserted all 1 million records in 3 minutes and 54 seconds, making it 50 seconds slower than the previous “managing locks” job, which didn’t place locks on the orders’ parent accounts.

[Figure: Results for the second managing locks load]

This job completed much faster than the jobs in your first two test runs, but it also ran a fair amount slower than the job that didn’t lock the parent accounts. You might assume that the overhead of the locks on the account record caused this time difference, but you must dig into the logs to determine what really happened. Again, start by looking at the degree of parallelism. According to the logs, this run’s degree of parallelism was 16.5, lower than the previous run’s 19. That lower number certainly accounts for some of the time difference, but also check whether your run encountered any lock exceptions.

[Figure: Drilling down into the second managing locks load]

As you can see from the bulk data load job detail page, the job featured no failed batches, failed records, or retries, so ordering the load file by AccountId worked; you completely avoided the lock exceptions and didn’t need to remove any locks in the configuration.

Next, look at the internal logs to see the batches’ average execution times, which show you if your batches took longer to load in this run than they did in the last run. The average execution time for each batch in this job was 34,304 milliseconds or about 34.3 seconds. In the previous job, the average execution time for each batch was 31,710 milliseconds or about 31.7 seconds.

Finally, to determine approximately how much of this job’s total additional processing time came from that additional execution time, multiply the difference in the average batch execution times by the total number of batches, then divide by this job's degree of parallelism.


$$\frac{(34.3\ \text{seconds per batch} - 31.7\ \text{seconds per batch}) \times 100\ \text{batches}}{16.5} \approx 15.75\ \text{seconds}$$


All that this number means is that, for each of this job's approximately 16.5 parallel threads, the platform spent an average of 15.75 more seconds executing batches. Because this 15.75-second average is based on concurrent threads, you can subtract it from the difference in the two jobs' execution times to see how much of that processing difference came from the difference in degrees of parallelism.


3 minutes and 54 seconds - 3 minutes and 4 seconds = 50 seconds


Of that 50-second difference in execution times, 34.25 seconds came from the difference in degrees of parallelism.


50 seconds - 15.75 seconds = 34.25 seconds
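The same breakdown as a small Java sketch, using the rounded averages from the text. The method name is hypothetical, and the calculation carries over the simplifying assumption above that extra batch time is spread evenly across the parallel threads. At full precision, the batch-time share is about 15.76 seconds and the parallelism share about 34.24 seconds, consistent with the rounded figures above.

```java
import java.util.Locale;

// Splits the run-time difference between two jobs into the part explained by
// slower batches and the part explained by a lower degree of parallelism,
// assuming extra batch time is spread evenly across the parallel threads.
class RunTimeBreakdown {

    static double secondsFromBatchTime(double avgBatchSecs, double baselineAvgBatchSecs,
                                       int batches, double degreeOfParallelism) {
        return (avgBatchSecs - baselineAvgBatchSecs) * batches / degreeOfParallelism;
    }

    public static void main(String[] args) {
        double runTimeDifference = (3 * 60 + 54) - (3 * 60 + 4); // 50 seconds
        double fromBatchTime = secondsFromBatchTime(34.3, 31.7, 100, 16.5);
        double fromParallelism = runTimeDifference - fromBatchTime;
        System.out.printf(Locale.ROOT, "From slower batches: %.2f s%n", fromBatchTime);      // prints 15.76 s
        System.out.printf(Locale.ROOT, "From lower parallelism: %.2f s%n", fromParallelism); // prints 34.24 s
    }
}
```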


Managing Locks Load #2 - Locking Lookup Value
Concurrency Mode: Parallel
Records Loaded: 1 million
Records Failed: 0
Run Time: 3 minutes and 54 seconds
Work Completed: 1 hour and 4 minutes and 35 seconds
Throughput: 256,410 records per minute
Degree of Parallelism: 16.5
Key Problem: This run was well optimized, but it could have been better optimized. The previous parallel run showed that not locking the lookup value could have shaved almost 16 seconds off of this run’s execution time.
Solution: Configure the account lookup field to Clear the value of this field when the lookup record is deleted.


Parallelism and Throughput

Increasing your degree of parallelism can certainly increase your overall throughput, but an increase in a degree of parallelism doesn't necessarily cause a comparable improvement in throughput. Your serial load had a degree of parallelism of 0.94 and a throughput of 19,417 records per minute, and your first parallel run, which encountered lock exceptions, had a degree of parallelism of 15.79 and a throughput of 20,363 records per minute. In fact, in some cases, you can end up increasing your degree of parallelism and decreasing your overall throughput.
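One way to make this concrete with the four runs' reported figures is to divide throughput by degree of parallelism, which gives records loaded per minute of thread time. This is a derived metric introduced here for illustration, not one that Salesforce reports. By this measure, the lock-bound parallel run did roughly one-sixteenth of the useful work per thread that the serial run did.

```java
import java.util.Locale;

// Derived efficiency metric for illustration: records loaded per minute of
// thread time (throughput divided by degree of parallelism).
class ParallelEfficiency {

    static double recordsPerThreadMinute(double recordsPerMinute, double degreeOfParallelism) {
        return recordsPerMinute / degreeOfParallelism;
    }

    public static void main(String[] args) {
        // Throughput (records per minute) and degree of parallelism reported for each run
        String[] runs = {"Serial", "Parallel with lock exceptions",
                         "Parallel, lookup not locked", "Parallel, ordered by AccountId"};
        double[] throughput = {19_417, 20_363, 326_088, 256_410};
        double[] dop = {0.94, 15.79, 19, 16.5};
        for (int i = 0; i < runs.length; i++) {
            System.out.printf(Locale.ROOT, "%-32s %,10.0f records per thread-minute%n",
                    runs[i], recordsPerThreadMinute(throughput[i], dop[i]));
        }
    }
}
```

The two well-managed parallel runs stay in the same range as the serial run per thread while multiplying total throughput, which is exactly the combination of high parallelism and low contention described next.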

Your degree of parallelism is only half of your data loading or integration performance picture. The other half is your architecture’s ability to maximize the extra work it can complete successfully as your degrees of parallelism increase. Understanding the many circumstances in which record locks occur—and when to use or avoid record locks—is the key to maximizing that completed work. Whenever you set up a job for parallel processing, analyze how locks might affect the job to ensure maximum throughput. Of course, tuning your job also has a significant impact on overall throughput, but such tuning is well beyond the scope of this article.

When you’re trying to maximize your jobs’ degrees of parallelism, consider one other extremely important point: When you use the Bulk API to integrate or load data, an application is doing the integrating or loading for you. This application might be a middleware product, or it might be a customized program similar to the one you’re using for the test loads in this article. Many architects don’t understand how much their tool can affect their degrees of parallelism.

Next, change your loading tool so you can deliver your 1 million orders in controlled feed loads, which run jobs one after the other as those jobs are processed by the Salesforce server. Note that even though you’re running your load’s jobs one after another, you can still set a parallel or serial concurrency mode for your load’s batches.

Controlled Feed Load #1 - One Batch per Job

In your first controlled feed load, you create individual jobs for each of your 10,000-record batches.

Test Run Setup

Your loader code looks like this.

// Assumes these imports: java.io.*, java.nio.charset.StandardCharsets, java.util.*, com.sforce.async.*
// createJob, createBatch, closeJob, awaitCompletion, and checkResults are helper methods
// like the ones in the Bulk API's Java sample client.
private List<BatchInfo> createBatchesFromCSVFileOneAtATime(String sobjectType, BulkConnection connection, String csvFileName) throws IOException, AsyncApiException {
	List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();    // batches of the job in progress
	List<BatchInfo> allBatchInfos = new ArrayList<BatchInfo>(); // every batch submitted by this load
	File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
	JobInfo job;

	// Split the CSV file into 10,000-row batches, each submitted in its own job
	try (BufferedReader rdr = new BufferedReader(
			new InputStreamReader(new FileInputStream(csvFileName), StandardCharsets.UTF_8))) {
		// read the CSV header row; it is written at the top of every batch
		byte[] headerBytes = (rdr.readLine() + "\n").getBytes(StandardCharsets.UTF_8);
		int headerBytesLength = headerBytes.length;
		FileOutputStream tmpOut = null;
		int maxBytesPerBatch = 10000000; // 10 million bytes per batch
		int maxRowsPerBatch = 10000; // 10 thousand rows per batch
		int currentBytes = 0;
		int currentLines = 0;
		String nextLine;
		while ((nextLine = rdr.readLine()) != null) {
			byte[] bytes = (nextLine + "\n").getBytes(StandardCharsets.UTF_8);
			// When the batch is full, submit it in a new job and wait for that job to finish.
			// Waiting here is what keeps only one batch on the queue at a time.
			if (currentBytes + bytes.length > maxBytesPerBatch
					|| currentLines > maxRowsPerBatch) {
				job = createJob(sobjectType, connection);
				createBatch(tmpOut, tmpFile, batchInfos, connection, job);
				closeJob(connection, job.getId());
				awaitCompletion(connection, job, batchInfos);
				checkResults(connection, job, batchInfos);
				allBatchInfos.addAll(batchInfos);
				batchInfos.clear();
				currentBytes = 0;
				currentLines = 0;
			}
			// Start a new batch file, beginning with the header row
			if (currentBytes == 0) {
				tmpOut = new FileOutputStream(tmpFile);
				tmpOut.write(headerBytes);
				currentBytes = headerBytesLength;
				currentLines = 1;
			}
			tmpOut.write(bytes);
			currentBytes += bytes.length;
			currentLines++;
		}
		// Finished processing all rows
		// Create a final batch (in its own job) for any remaining data
		if (currentLines > 1) {
			job = createJob(sobjectType, connection);
			createBatch(tmpOut, tmpFile, batchInfos, connection, job);
			closeJob(connection, job.getId());
			awaitCompletion(connection, job, batchInfos);
			checkResults(connection, job, batchInfos);
			allBatchInfos.addAll(batchInfos);
			batchInfos.clear();
		}
	} finally {
		tmpFile.delete();
	}
	return allBatchInfos;
}

To try to maximize the degree of parallelism for your load, set the concurrency mode for the jobs that you’re creating to Parallel.

private JobInfo createJob(String sobjectType, BulkConnection connection) throws AsyncApiException {
	JobInfo job = new JobInfo();
	job.setObject(sobjectType);
	job.setOperation(OperationEnum.insert);
	job.setContentType(ContentType.CSV);
	job.setConcurrencyMode(ConcurrencyMode.Parallel);
	job = connection.createJob(job);
	System.out.println(job);
	return job;
}

Because you have kept the lookup relationship that locks the related account record, you’re also going to use the data load file that’s ordered by AccountId to ensure that your run doesn’t encounter any lock exceptions. The idea behind this loading approach is that it allows data extraction from an external system and Salesforce loading to occur in parallel. Because you’re loading 10,000 records at a time in parallel mode, you should get loading performance that’s similar to the performance that you saw in your previous, optimized parallel load, right? Read on to see if that’s the case!

Test Run Results

Because you ran an individual job for each of your 100 batches, you have 100 jobs within this run. Here are a few of those jobs.

Results for the first controlled feed load

You might already have noticed that these batches took about as much time to complete as the batches in your optimized parallel runs did. However, this run took 56 minutes to successfully insert all 1 million orders. In addition, the internal logs show that this run’s overall degree of parallelism was 0.83, a number that’s below both your previous degrees of parallelism and 1, the baseline degree of parallelism for data loads under normal circumstances.

So from both a performance and a parallelism perspective, this is the worst test run so far, and the poor results have nothing to do with your Salesforce architecture or the data that you’re loading! Your loading tool made this run suboptimal. It gave only a single batch of data to a single thread at any given time, preventing Salesforce from processing individual batches in parallel. Essentially, the loading tool forced the load to run serially, even though you defined a parallel concurrency mode.

Controlled Feed Load #1: One Batch per Job
Concurrency Mode: Parallel
Records Loaded: 1 million
Records Failed: 0
Run Time: 56 minutes
Work Completed: 46 minutes and 48 seconds
Throughput: 17,857 records per minute
Degree of Parallelism: 0.83
Key Problems: The loading tool placed only a single job and batch on the queue at a time. This load performed even slower than the load in which you processed batches in serial mode.
Solution: Increase the number of batches per job or the number of jobs placed on the queue.


Because customers do often use controlled feed loads when they don’t want to load too much data at once or in a single job, use a controlled feed load again in the next run.

Controlled Feed Load #2 - 10 Batches per Job

The load performance in the previous run wasn’t good—the loading tool placed only a single job and batch on the queue at a time, which left threads idle, reducing both throughput and parallelism. To improve performance in this next controlled feed load, ensure that the server has multiple batches queued up so that they can be processed in concurrent threads.

Test Run Setup

Like the jobs in your previous run, each job in this run will wait until its previous job has completed before being submitted for processing. Unlike those other jobs, each of the jobs in this run contains 100,000 records, giving you 10 batches of 10,000 orders per job.

Your loader code looks like this.

// Assumes the same imports and helper methods as the previous example.
private List<BatchInfo> createBatchesFromCSVFileOneAtATime(String sobjectType, BulkConnection connection, String csvFileName) throws IOException, AsyncApiException {
	List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();    // batches of the job in progress
	List<BatchInfo> allBatchInfos = new ArrayList<BatchInfo>(); // every batch submitted by this load
	File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");

	// Split the CSV file into 10,000-row batches, 10 batches per job
	try (BufferedReader rdr = new BufferedReader(
			new InputStreamReader(new FileInputStream(csvFileName), StandardCharsets.UTF_8))) {
		// read the CSV header row; it is written at the top of every batch
		byte[] headerBytes = (rdr.readLine() + "\n").getBytes(StandardCharsets.UTF_8);
		int headerBytesLength = headerBytes.length;
		FileOutputStream tmpOut = null;
		int maxBytesPerBatch = 10000000; // 10 million bytes per batch
		int maxRowsPerBatch = 10000; // 10 thousand rows per batch
		int maxBatchesPerJob = 10; // 10 batches of 10,000 rows = 100,000 rows per job
		int currentBytes = 0;
		int currentLines = 0;
		int currentBatch = 0;
		String nextLine;

		JobInfo job = createJob(sobjectType, connection);

		while ((nextLine = rdr.readLine()) != null) {
			byte[] bytes = (nextLine + "\n").getBytes(StandardCharsets.UTF_8);
			// Add the batch to the current job when the batch size limit is reached
			if (currentBytes + bytes.length > maxBytesPerBatch
					|| currentLines > maxRowsPerBatch) {
				createBatch(tmpOut, tmpFile, batchInfos, connection, job);
				currentBatch++;

				// When the job is full, close it and wait for it to finish before
				// creating the next job (the controlled feed)
				if (currentBatch == maxBatchesPerJob) {
					closeJob(connection, job.getId());
					awaitCompletion(connection, job, batchInfos);
					checkResults(connection, job, batchInfos);
					allBatchInfos.addAll(batchInfos);
					batchInfos.clear();
					currentBatch = 0;
					job = createJob(sobjectType, connection);
				}

				currentBytes = 0;
				currentLines = 0;
			}
			// Start a new batch file, beginning with the header row
			if (currentBytes == 0) {
				tmpOut = new FileOutputStream(tmpFile);
				tmpOut.write(headerBytes);
				currentBytes = headerBytesLength;
				currentLines = 1;
			}
			tmpOut.write(bytes);
			currentBytes += bytes.length;
			currentLines++;
		}
		// Finished processing all rows
		// Add a final batch for any remaining data, then close the last job
		if (currentLines > 1) {
			createBatch(tmpOut, tmpFile, batchInfos, connection, job);
		}
		closeJob(connection, job.getId()); // also closes the job if it never received a batch
		if (!batchInfos.isEmpty()) {
			awaitCompletion(connection, job, batchInfos);
			checkResults(connection, job, batchInfos);
			allBatchInfos.addAll(batchInfos);
		}
	} finally {
		tmpFile.delete();
	}
	return allBatchInfos;
}

Like the previous run, this run creates its jobs using the parallel concurrency mode.

private JobInfo createJob(String sobjectType, BulkConnection connection) throws AsyncApiException {
	JobInfo job = new JobInfo();
	job.setObject(sobjectType);
	job.setOperation(OperationEnum.insert);
	job.setContentType(ContentType.CSV);
	job.setConcurrencyMode(ConcurrencyMode.Parallel);
	job = connection.createJob(job);
	System.out.println(job);
	return job;
}

Because each job in this scenario queues up its batches in parallel, you should see lightning-fast loading performance, right? You perform the run to see what your results actually are.

Test Run Results

As the monitoring page shows, you did successfully load your 10 jobs of 100,000 orders.

Results for the second controlled feed load

But your 1 million orders took 10 minutes to load, and the internal logs show a degree of parallelism of only 5.9 across the entire load. The optimized run, which loaded the same number of records in 3 minutes and 54 seconds, took about 60 percent less time than this one. Saving those 6 minutes and 6 seconds might seem insignificant, but imagine how much of an impact a 60-percent reduction in load time would make if you were loading hundreds of millions or billions of records!

Analyze why this suboptimal run had roughly 60 percent lower throughput. The first thing to understand is that each of the 10 jobs contained 100,000 records split into 10,000-record batches, giving each job only 10 batches. Because only one job was placed on the queue at a time, Salesforce could process at most 10 batches in parallel. This limit alone capped the run’s best possible degree of parallelism at 10.

So what reduced the degree of parallelism from 10 to 5.9? Not all of the available threads were working for the entire duration of the run. During each job, when a thread finished processing its batch, that thread became available for processing—without having any additional work to process. So each job had a degree of parallelism of 10 when all 10 of its batches were being processed, but that number decreased as the threads finished processing their batches. At the very end of each job, when the server was finishing processing that job’s last batch, only a single thread was doing any work. Similarly, when Salesforce finished a job and placed a new one on the queue, the threads had to start picking up batches to process them.
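To see how each job’s tail drags the average down, consider a minimal Java sketch of a single 10-batch job. It assumes at least 10 free threads, that all of the job’s batches start together after an optional idle gap, and that the job ends when its slowest batch finishes; the batch durations and the gap are hypothetical, not measurements from this run.

```java
import java.util.Arrays;

final class JobTailParallelism {

    /**
     * Degree of parallelism for one job whose batches all start together on free threads:
     * total busy time divided by elapsed time (idle gap before the job + slowest batch).
     */
    static double jobDegreeOfParallelism(double[] batchSeconds, double idleSecondsBeforeJob) {
        double busy = Arrays.stream(batchSeconds).sum();
        double elapsed = idleSecondsBeforeJob + Arrays.stream(batchSeconds).max().orElse(0);
        return elapsed == 0 ? 0 : busy / elapsed;
    }

    public static void main(String[] args) {
        // Hypothetical durations, in seconds, for the 10 batches of one job
        double[] batches = {30, 31, 32, 33, 34, 36, 38, 40, 45, 51};
        System.out.printf("No gap between jobs: %.2f%n", jobDegreeOfParallelism(batches, 0));
        System.out.printf("5-second gap before each job: %.2f%n", jobDegreeOfParallelism(batches, 5));
    }
}
```

Because each job’s busy time and elapsed time scale together, a run made of identical jobs has the same degree of parallelism as one job. With these hypothetical durations, the job reaches about 7.25 without a gap and about 6.61 with a 5-second gap, well under the ceiling of 10.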

Controlled Feed Load #2: 10 Batches per Job
Concurrency Mode: Parallel
Records Loaded: 1 million
Records Failed: 0
Run Time: 10 minutes
Work Completed: 59 minutes
Throughput: 100,000 records per minute
Degree of Parallelism: 5.9
Key Problems: The loading tool placed only a single 10-batch job on the queue at a time. This controlled feed load performed significantly better than the previous one, but it still performed slower than the first two parallel loads.
Solution: Ensure that the processing queue always has at least 20 batches to process, either within a single job or across multiple jobs.


Controlled Feed Best Practices

Compared to the first two parallel test runs, the two controlled feed loads didn’t have great degrees of parallelism. These performance differences are no accident, and this is the big takeaway, which all of the following best practices support: Whenever possible, ensure that the asynchronous message queue’s threads always have work to process when they finish with—and are thus free from—their current work.

  • Avoid using controlled feed loads if possible. When some threads in controlled feed loads finish processing their batches before others do, they cannot pick up any new work until the current job finishes, and Salesforce places a new job on the queue. As both of the previous controlled feed loads show, sending jobs to the queue one after another can reduce overall throughput, even when batches are processed in parallel concurrency mode.
  • When you must use controlled feed loads, increase your number of batches and decrease your number of jobs. Following this best practice minimizes the resources that are wasted when threads have no work to complete.
  • As for all other loads, it’s best to keep at least 20 batches on the queue to optimize your overall degree of parallelism and throughput. Your best-optimized load had a degree of parallelism of 19, and because you cannot guarantee that all of your threads will be processing batches nonstop for the entire duration of a run, it’s best to include more than 19 batches in each job. This best practice gives you yet another way to maximize the amount of work completed during your loads and integrations.
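The effect of keeping the queue full can be illustrated with a small simulation. This minimal Java sketch assumes a fixed pool of worker threads (20 here, a hypothetical number rather than a documented Salesforce value), batches with known durations, and a queue that hands the next batch to whichever thread frees up first. The controlled-feed variant releases a job’s batches only after the previous job has finished; the full-queue variant queues every batch up front. All durations are hypothetical.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

final class QueueFeedSimulation {

    /** Greedy list scheduling: each queued batch goes to the thread that becomes free first. Returns the finish time. */
    static double schedule(double[] batchSeconds, int threads, double startSeconds) {
        PriorityQueue<Double> threadFreeAt = new PriorityQueue<>();
        for (int t = 0; t < threads; t++) {
            threadFreeAt.add(startSeconds);
        }
        double finish = startSeconds;
        for (double seconds : batchSeconds) {
            double end = threadFreeAt.poll() + seconds;
            threadFreeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    /** Controlled feed: a job's batches are queued only after every batch of the previous job has finished. */
    static double controlledFeedSeconds(double[] batchSeconds, int batchesPerJob, int threads) {
        double clock = 0;
        for (int i = 0; i < batchSeconds.length; i += batchesPerJob) {
            double[] job = Arrays.copyOfRange(batchSeconds, i, Math.min(i + batchesPerJob, batchSeconds.length));
            clock = schedule(job, threads, clock);
        }
        return clock;
    }

    /** Full queue: every batch is queued up front, so a freed thread always finds more work. */
    static double fullQueueSeconds(double[] batchSeconds, int threads) {
        return schedule(batchSeconds, threads, 0);
    }

    static double degreeOfParallelism(double[] batchSeconds, double elapsedSeconds) {
        return Arrays.stream(batchSeconds).sum() / elapsedSeconds;
    }

    public static void main(String[] args) {
        int threads = 20; // hypothetical worker pool size
        double[] batches = new double[100]; // 100 hypothetical batches of 30 to 48 seconds
        for (int i = 0; i < batches.length; i++) {
            batches[i] = 30 + (i % 10) * 2;
        }
        for (int perJob : new int[] {1, 10}) {
            double elapsed = controlledFeedSeconds(batches, perJob, threads);
            System.out.printf("Controlled feed, %d batch(es) per job: %.2f%n", perJob, degreeOfParallelism(batches, elapsed));
        }
        System.out.printf("Full queue: %.2f%n", degreeOfParallelism(batches, fullQueueSeconds(batches, threads)));
    }
}
```

With one batch per job, the controlled feed never exceeds a degree of parallelism of 1, and with 10 batches per job it can never exceed 10 (8.125 with these durations), no matter how many threads are idle. The full queue keeps most of the 20 threads busy for most of the run.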

Advanced Parallelism Scenario

OK! Now it’s time to treat yourself to some extra credit. See if you can apply the concepts that you just learned to a more complex scenario.

Scenario Setup

The Acme Wireless organization needs to load 100 million orders per day in its integration, but some of its major accounts have significant data skew, which can cause lock contention and reduce database concurrency. Of Acme Wireless’ 500 customer accounts, 30 are major accounts, each of which has approximately 100,000 orders that must be integrated every day. Because the order object is in a master-detail relationship with the account object, Acme Wireless must address lock contention.

To further complicate matters, the order object has two additional lookup relationships. One lookup field goes to the contact object and must be required on orders, and the other goes to the catalog object to track which catalog customers are ordering from. The Don’t allow deletion of the lookup record that’s part of a lookup relationship option is set for both of the lookup relationships.

Given what you have learned in this article, how do you think that Acme Wireless should structure its load to ensure optimal loading performance?

  • a) Huh?
  • b) Just load the records and see what happens.
  • c) Implement retry logic to manage the locks.
  • d) None of the above

Scenario Challenges and Strategies

If you picked “d,” you’re right! Try to break this run down into its individual locking challenges so you can address them one at a time.

Locking Challenge #1: The Master-Detail Relationship

The order object is in a master-detail relationship with the account object. Because the loading requirements in this scenario must factor in this master-detail relationship, you must deal with the account locks that the relationship causes. Based on what you have seen in your previous test runs, you know that you can order the records in your load file by AccountId to avoid lock exceptions.

Locking Challenge #2: The Lookup Relationship to the Contact Object

The order object is in a lookup relationship with the contact object. Because each contact in this scenario is always a contact on an associated account record, ordering the records in your load file by AccountId helps you avoid locks on the contact records as well. Whew, you lucked out there.

Locking Challenge #3: The Lookup Relationship to the Catalog Object

The order object is also in a lookup relationship with the catalog object. Because many records are probably associated with relatively few catalogs, you probably have significant data skew, which will also prompt a significant number of locks on the catalog records. When your lookup field is optional, the easiest way to avoid these locks is to set the Clear the value of this field option, which does more than just tell Salesforce what to do if your lookup record is deleted. If you set this option, whenever a record that has a lookup field to your lookup record is inserted or updated, Salesforce doesn't lock the lookup records; instead, it only validates that the lookup values exist. All you need to worry about now are the locks that might occur on the parent account. Remember the great performance results that you saw while avoiding these locks in your previous runs? Well, to make things difficult, assume that the lookup relationship to the catalog object must be configured so that it locks the catalog records.

There are two high-level approaches for resolving this locking problem. For both approaches, you must generate load files that minimize how often records being updated in multiple, concurrent batches reference the same account IDs and catalog IDs at the same time.

  • Approach #1: In the easiest approach, you split your run in two. In the first load, you order your orders by AccountId, then load your jobs in parallel without updating any CatalogId values. In the second load, you update the CatalogId values in the orders that you just loaded.
  • Approach #2: However, that first approach might not give you optimal degrees of parallelism or throughput. When you do need peak loading performance, consider the following approach.

For the second approach, you must take some additional steps to minimize how often Salesforce references the same account IDs or catalog IDs at the same time. Because you're updating CatalogId values in this run, you must define an intelligent loading tool that runs your jobs in a controlled feed load to avoid lock exceptions. To achieve maximum throughput, you must also process each job’s batches in parallel mode.

Assume that Acme Wireless has 50 catalogs, in addition to the 500 customer accounts and the 100 million orders that you must load every day for the integration. For each day’s load, you create a master load file for your 100 million orders that is sorted first by account, then by catalog. You then use this master load file to build your smaller load files, each of which gets its own job.

Step #1: Start Building Your Load Files
To build each job’s load files, you start at the top of the master load file, then group a single account’s orders from a single catalog while working your way down the list of orders. You place the first account’s orders from the first catalog in the first job’s load file, the first account’s orders from the second catalog in the second job’s load file, and so on. When you finish going through this process for the first account, you follow it again for every subsequent account. So for the second account, you work your way down the list again, placing the second account’s orders from the first catalog in the first job’s load file, the second account’s orders from the second catalog in the second job’s load file, and so on.
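Step #1 can be sketched in a few lines of Java. This minimal sketch assumes that catalogs are numbered 1 through 50 so that a catalog’s number doubles as its load file number, and it ignores the 10,000-order limit, which Step #2 handles. The Order record and method name are hypothetical, and records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CatalogLoadFiles {

    /** One row of the hypothetical master load file; accounts and catalogs are numbered for simplicity. */
    record Order(int account, int catalog, String orderNumber) {}

    /**
     * Step #1: sort the master file by account, then catalog, and put each account's orders
     * from catalog N into load file N, so every file holds one catalog's orders in account order.
     */
    static Map<Integer, List<Order>> buildLoadFiles(List<Order> orders) {
        List<Order> master = new ArrayList<>(orders);
        master.sort(Comparator.comparingInt(Order::account).thenComparingInt(Order::catalog));
        Map<Integer, List<Order>> files = new TreeMap<>();
        for (Order order : master) {
            files.computeIfAbsent(order.catalog(), n -> new ArrayList<>()).add(order);
        }
        return files;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(2, 1, "O-5"), new Order(1, 2, "O-2"), new Order(1, 1, "O-1"),
            new Order(2, 2, "O-6"), new Order(1, 1, "O-3"), new Order(2, 1, "O-4"));
        buildLoadFiles(orders).forEach((file, rows) -> System.out.println("File " + file + ": " + rows));
    }
}
```

Because the master file is sorted by account before the orders are distributed, each load file holds a single catalog’s orders in account order: account 1’s orders first, then account 2’s, and so on.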

Step #2: Shift Orders That Push Your Load Files to Your Batch Size Limit
You continue building load files like this until adding an account’s orders from a catalog to a job would cause that job to exceed 10,000 orders, the Bulk API’s batch size limit. At this point, you must shift the account’s orders to a different load file; without this shift, the orders would move to a new batch in the same job, where Salesforce might reference identical catalog IDs in the batches that it’s processing concurrently.

You can shift orders in any direction: to the next load file, to the previous load file, or to any other load file. The important thing to remember is that you don’t want any job to process more than one batch containing a single account’s orders from a single catalog.

In this scenario, assume that you’re shifting orders to the previous load file. Instead of adding account 4’s orders from catalog 1 to file 1, you add them to the last file in your load. The account’s orders from the other catalogs shift accordingly: account 4’s orders from catalog 2 shift to file 1, account 4’s orders from catalog 3 shift to file 2, and so on. This diagram shows where and how you’re completing this shift, the first shift in your run.

How to shift orders in large data loads and integrations

To prevent this advanced scenario from becoming even more complicated, also assume that no single account placed more than 10,000 orders from a single catalog. You group and shift your orders like this. (Note that this diagram addresses only the first four files and the first nine accounts, but the complete diagram would also include the run’s other accounts and catalogs.)

Grouping and shifting your orders
  1. Account 4’s orders from catalog 1 would make file 1 exceed 10,000 orders, so these orders must be moved to a different file (the last file, file 50).
  2. Account 6’s orders from catalog 5 would make file 4 exceed 10,000 orders, so these orders must be moved to a different file (file 3).
  3. Account 7’s orders from catalog 4 would make file 2 exceed 10,000 orders, so these orders must be moved to a different file (file 1).

Note: This table assumes a relatively equal distribution of each account’s orders from each catalog. In reality, each batch wouldn’t necessarily have approximately the same number of records, and some batches in the same job files might reference the same account ID or catalog ID at the same time. The key to this solution is that you are minimizing this overlap, just as you did when you ordered your load file by AccountId in the previous parallel run.

Step #3: Create One or More Additional Sets of Job Files
These shifts continue until a shift would cause an account’s orders from a catalog to enter a file for a second time, which cannot happen without significantly increasing the number of orders in a job that reference the same catalog ID across multiple batches. To avoid this problem, you must close your current set of job files, create new job files, and then modify your loading tool so that it strings your new set of files to your first set of files.
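Steps #2 and #3 can be sketched the same way. This minimal Java sketch is one interpretation of the text, not a definitive implementation: catalogs are numbered from 0, each load file tracks only its order count and the catalogs it holds, a single running offset shifts the current group and every later group to the previous file (as in the account 4 example), and a set of files is closed when a catalog would re-enter a file that has since received a different catalog. The record, class, and method names are hypothetical; records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

final class ShiftingLoadFiles {

    /** One account's orders from one catalog (catalogs numbered 0..files-1 in this sketch). */
    record Group(int account, int catalog, int orders) {}

    /** A load file reduced to what the shifting rule needs: its size and the catalogs it holds, in order. */
    static final class LoadFile {
        int orders;
        final List<Integer> catalogs = new ArrayList<>();
    }

    /**
     * Fills one set of load files starting at groups[start]; groups must be sorted by account, then catalog.
     * Returns how many groups were placed before this set had to be closed.
     */
    static int fillSet(List<Group> groups, int start, LoadFile[] files, int maxOrdersPerFile) {
        int offset = 0; // running shift applied to the current group and every later group
        for (int i = start; i < groups.size(); i++) {
            Group g = groups.get(i);
            boolean placed = false;
            for (int attempt = 0; attempt < files.length && !placed; attempt++) {
                LoadFile file = files[Math.floorMod(g.catalog() + offset, files.length)];
                List<Integer> cats = file.catalogs;
                boolean alreadyHolds = cats.contains(g.catalog());
                boolean lastIsSame = !cats.isEmpty() && cats.get(cats.size() - 1) == g.catalog();
                if (alreadyHolds && !lastIsSame) {
                    return i - start; // Step #3: the catalog would re-enter this file, so close the set
                }
                if (file.orders + g.orders() <= maxOrdersPerFile) {
                    file.orders += g.orders();
                    if (!lastIsSame) {
                        cats.add(g.catalog());
                    }
                    placed = true;
                } else {
                    offset--; // Step #2: shift this group (and later ones) to the previous file
                }
            }
            if (!placed) {
                return i - start; // no file in this set has room for the group
            }
        }
        return groups.size() - start;
    }

    /** Step #3: keeps opening new sets of load files until every group is placed. */
    static List<LoadFile[]> buildSets(List<Group> groups, int fileCount, int maxOrdersPerFile) {
        List<LoadFile[]> sets = new ArrayList<>();
        int next = 0;
        while (next < groups.size()) {
            LoadFile[] files = new LoadFile[fileCount];
            for (int f = 0; f < fileCount; f++) {
                files[f] = new LoadFile();
            }
            int placed = fillSet(groups, next, files, maxOrdersPerFile);
            if (placed == 0) {
                throw new IllegalArgumentException("Group exceeds the per-file limit: " + groups.get(next));
            }
            sets.add(files);
            next += placed;
        }
        return sets;
    }

    public static void main(String[] args) {
        // Hypothetical groups for 3 accounts and 3 catalogs, with a 10-order limit per file
        List<Group> groups = List.of(
            new Group(1, 0, 6), new Group(1, 1, 2), new Group(1, 2, 2),
            new Group(2, 0, 5), new Group(2, 1, 2), new Group(2, 2, 2),
            new Group(3, 0, 1), new Group(3, 1, 1), new Group(3, 2, 1));
        List<LoadFile[]> sets = buildSets(groups, 3, 10);
        for (int s = 0; s < sets.size(); s++) {
            for (int f = 0; f < sets.get(s).length; f++) {
                LoadFile file = sets.get(s)[f];
                System.out.println("Set " + s + ", file " + f + ": " + file.orders + " orders, catalogs " + file.catalogs);
            }
        }
    }
}
```

In this example, account 2’s catalog 0 orders would overfill file 0, so they move to the last file (file 2), and account 2’s catalog 1 and catalog 2 orders shift to files 0 and 1, mirroring the account 4 example. buildSets returns one array of load files per set; a loading tool would then chain the sets as Step #3 describes.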

Locking Challenge #4: Account Skew

You have one more challenge to deal with. Acme Wireless still has 30 accounts with approximately 100,000 orders each. Even if you order your batches by AccountId, each batch can hold at most 10,000 orders, so each major account’s orders span about 10 batches, and those batches contend for a lock on the same account record when Salesforce processes them concurrently. How do you think that you can solve this problem?

Without the catalog lookup, the skew scenario would be very easy to deal with and would have minimal impact on throughput if you followed these best practices.

  • Generate a load file for each account that has approximately 100,000 orders, as well as a single file containing the orders from the accounts that don’t have data skew. For example, without the lookup to CatalogId, you would have 31 load files: one file for each of the 30 major accounts and one file for the other accounts’ orders, which are ordered by AccountId. Make that middleware work!
  • You can then build an intelligent loading tool that places all 31 load files on the queue for concurrent processing, and runs the 30 jobs associated with the major accounts in serial mode and the single job associated with the remaining accounts in parallel mode.
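The split described in these two bullets can be sketched as follows. This minimal Java sketch covers the case without the catalog lookup; it assumes that the caller supplies the set of major account IDs, and the Order record, Mode enum, and LoadFile record are hypothetical stand-ins (a real loading tool would map SERIAL and PARALLEL to the Bulk API job’s concurrency mode). Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class SkewedAccountFiles {

    /** Hypothetical order row; only the account ID matters for this split. */
    record Order(String accountId, String orderNumber) {}

    /** Stand-ins for the concurrency mode that a loading tool would set on each job. */
    enum Mode { SERIAL, PARALLEL }

    record LoadFile(String name, Mode mode, List<Order> orders) {}

    /**
     * One file per major (skewed) account, processed serially, plus one file for all other
     * accounts, ordered by AccountId and processed in parallel.
     */
    static List<LoadFile> split(List<Order> orders, Set<String> majorAccountIds) {
        Map<String, List<Order>> perMajorAccount = new TreeMap<>();
        List<Order> remaining = new ArrayList<>();
        for (Order order : orders) {
            if (majorAccountIds.contains(order.accountId())) {
                perMajorAccount.computeIfAbsent(order.accountId(), id -> new ArrayList<>()).add(order);
            } else {
                remaining.add(order);
            }
        }
        remaining.sort(Comparator.comparing(Order::accountId)); // stable sort keeps each account's rows in input order
        List<LoadFile> files = new ArrayList<>();
        perMajorAccount.forEach((id, rows) -> files.add(new LoadFile("account-" + id, Mode.SERIAL, rows)));
        files.add(new LoadFile("remaining-accounts", Mode.PARALLEL, remaining));
        return files;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("ACC-B", "O-1"), new Order("MAJOR-1", "O-2"), new Order("ACC-A", "O-3"),
            new Order("MAJOR-2", "O-4"), new Order("MAJOR-1", "O-5"), new Order("ACC-B", "O-6"));
        for (LoadFile file : split(orders, Set.of("MAJOR-1", "MAJOR-2"))) {
            System.out.println(file.name() + " (" + file.mode() + "): " + file.orders().size() + " orders");
        }
    }
}
```

With 30 major accounts, the result is the 31 load files described above.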

Because you do have this catalog lookup, you can apply the same principles that you used to avoid locks on your catalog records. Each of the skewed accounts placed about 2,000 orders from each catalog—100,000 orders divided by 50 catalogs—so these orders fit comfortably within your 10,000-record batches.

Conclusion

When you use the Bulk API to perform a load or integration, two factors can affect your level of throughput: your load’s or integration’s degree of parallelism, and the amount of work each thread must perform to complete the run. In other words, tuning your jobs is only half the battle. You could have a well-tuned load or integration that runs at a snail’s pace if you don’t optimize your run’s overall degree of parallelism.

When you do optimize your load’s or integration’s degree of parallelism, you leverage the power of the Force.com platform, making multiple concurrent threads perform up to 15-20 times as much work as they would in a serial run. So if a load takes four days to complete in serial mode, you could reduce that time to roughly 5 to 6.5 hours, simply by optimizing your degree of parallelism!

Just remember the following best practices when you’re planning your next load or integration that uses the Bulk API.

Use the Correct Concurrency Mode
When you cannot avoid lock exceptions, as you cannot with group membership operations, always load or integrate data using the serial concurrency mode to ensure that you’re loading your records efficiently. When your data load or integration doesn’t cause lock exceptions, use the parallel concurrency mode. And when master-detail relationships or lookup relationships cause lock exceptions, manage them, then configure your job to use the parallel concurrency mode.
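These three rules can be summarized as a small decision helper. This minimal Java sketch is illustrative only: the LockProfile categories and the relationshipLocksManaged flag are hypothetical inputs that you would have to determine by analyzing your own load, and Mode stands in for the concurrency mode that you set on the job. It uses a switch expression, which requires Java 14 or later.

```java
final class ConcurrencyModeChoice {

    enum Mode { SERIAL, PARALLEL }

    /** Hypothetical classification of how a load interacts with record locks. */
    enum LockProfile {
        UNAVOIDABLE_LOCKS,   // for example, group membership operations
        NO_LOCK_EXCEPTIONS,
        RELATIONSHIP_LOCKS   // locks caused by master-detail or lookup relationships
    }

    static Mode choose(LockProfile profile, boolean relationshipLocksManaged) {
        return switch (profile) {
            case UNAVOIDABLE_LOCKS -> Mode.SERIAL;
            case NO_LOCK_EXCEPTIONS -> Mode.PARALLEL;
            case RELATIONSHIP_LOCKS -> {
                if (!relationshipLocksManaged) {
                    throw new IllegalStateException(
                        "Manage the locks first (reorder data, adjust relationships, regroup batches)");
                }
                yield Mode.PARALLEL;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(choose(LockProfile.UNAVOIDABLE_LOCKS, false));
        System.out.println(choose(LockProfile.RELATIONSHIP_LOCKS, true));
    }
}
```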

Manage Locks
Locks can wipe out a job’s best possible throughput, and a well-optimized job encounters no lock exceptions. To know how to manage locks, you must first understand the problems that they can cause. Locks that cause lock exceptions in your jobs also cause concurrent threads to perform work that results in failure. In many cases, this failure can cause your job to run slower than it would in serial mode. Adjust relationships, order data, and organize jobs and their batches to ensure that your job does not encounter any lock exceptions. Never manage locks by implementing your own retry logic in your load or integration.

Keep a Full Queue
The Force.com platform cannot process what it doesn’t have the opportunity to process, and slowly feeding batches and jobs to Salesforce causes platform threads to sit idle when they could be processing batches. Always try to keep at least 20 batches on the queue at any given time. If you aren’t doing that, then you probably could be processing your job faster than you currently are. Ensure that your middleware or loading tool orders data files correctly, and creates jobs and batches that both keep the queue full and eliminate lock exceptions within the job.

By using the concepts and best practices described in this article, you should be able to process just about any integration or load at lightning speed, and ensure that your architecture scales for enterprise growth.


About the Author

Sean Regan is an Architect Evangelist within the Technical Enablement team of the salesforce.com Customer-Centric Engineering group. The team’s mission is to help customers understand how to implement technically sound salesforce.com solutions. Check out all of the resources that this team maintains on the Architect Core Resources page of Developer Force.