I am performing MANY concurrent SQL INSERT statements which are colliding on a UNIQUE KEY constraint, even though I am also checking for existing records for the given key inside of a single transaction. I am looking for a way to eliminate, or minimize, the number of collisions I am getting without hurting the performance (too much).
Background:
I am working on an ASP.NET MVC4 WebApi project which receives A LOT of HTTP POST requests to INSERT records. It gets about 5K - 10K requests a second. The project's sole responsibility is de-duplicating and aggregating records. It is very write heavy; it has a relatively small amount of read requests; all of which use a Transaction with IsolationLevel.ReadUncommitted.
Database schema
Here is the DB table:
CREATE TABLE [MySchema].[Records] ( 
    Id BIGINT IDENTITY NOT NULL, 
    RecordType TINYINT NOT NULL, 
    UserID BIGINT NOT NULL, 
    OtherID SMALLINT NULL, 
    TimestampUtc DATETIMEOFFSET NOT NULL, 
    CONSTRAINT [UQ_MySchemaRecords_UserIdRecordTypeOtherId] UNIQUE CLUSTERED ( 
        [UserID], [RecordType], [OtherID] 
    ), 
    CONSTRAINT [PK_MySchemaRecords_Id] PRIMARY KEY NONCLUSTERED ( 
        [Id] ASC 
    ) 
) 
Repository Code
Here is the code for the Upsert method which is causing the Exception:
using System;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using Dapper;
namespace MyProject.DataAccess
{
    public class MyRepo
    {
        public void Upsert(MyRecord record)
        {
            var dbConnectionString = "MyDbConnectionString";
            using (var connection = new SqlConnection(dbConnectionString))
            {
                connection.Open();
                using (var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted))
                {
                    try
                    {
                        var existingRecord = FindByByUniqueKey(transaction, record.RecordType, record.UserID, record.OtherID);
                        if (existingRecord == null)
                        {
                            const string sql = @"INSERT INTO [MySchema].[Records] 
                                                 ([UserID], [RecordType], [OtherID], [TimestampUtc]) 
                                                 VALUES (@UserID, @RecordType, @OtherID, @TimestampUtc) 
                                                 SELECT CAST(SCOPE_IDENTITY() AS BIGINT);";
                            var results = transaction.Connection.Query<long>(sql, record, transaction);
                            record.ID = results.Single();
                        }
                        else if (existingRecord.TimestampUtc <= record.TimestampUtc)
                        {
                            // UPDATE
                        }
                        transaction.Commit();
                    }
                    catch (Exception e)
                    {
                        transaction.Rollback();
                        throw; // rethrow without resetting the stack trace
                    }
                }
            }
        }
        // all read-only methods use explicit transactions with IsolationLevel.ReadUncommitted
        private static MyRecord FindByByUniqueKey(SqlTransaction transaction, RecordType recordType, long userID, short? otherID)
        {
            const string sql = @"SELECT * from [MySchema].[Records] 
                                 WHERE [UserID] = @UserID
                                 AND [RecordType] = @RecordType
                                 AND [OtherID] = @OtherID";
            var paramz = new {
                UserID = userID,
                RecordType = recordType,
                OtherID = otherID
            };
            var results = transaction.Connection.Query<MyRecord>(sql, paramz, transaction);
            return results.SingleOrDefault();
        }
    }
    public class MyRecord
    {
        public long ID { get; set; }
        public RecordType RecordType { get; set; }
        public long UserID { get; set; }
        public short? OtherID { get; set; }
        public DateTimeOffset TimestampUtc { get; set; }
    }
    public enum RecordType : byte
    {
        TypeOne = 1,
        TypeTwo = 2,
        TypeThree = 3
    }
}
The Problem
When the server is under heavy enough load, I am seeing many of these Exceptions occurring:
Violation of UNIQUE KEY constraint 'UQ_MySchemaRecords_UserIdRecordTypeOtherId'. Cannot insert duplicate key in object 'MySchema.Records'. The duplicate key value is (1234567890, 1, 123). The statement has been terminated.
This Exception occurs often, as many as 10 times in a minute.
What I have tried
Changing the IsolationLevel to Serializable. The Exception occurred much less often but still occurred. Also, the performance of the code suffered greatly; the system could only handle 2K requests a second. I suspect that this decrease in throughput was actually the cause of the reduced Exceptions, so I concluded that this didn't solve my problem.
Reading about the UPDLOCK Table Hint, but I don't fully understand how it cooperates with isolation levels or how to apply it to my code. It does seem like it might be the best solution though, from my current understanding.
Changing the SELECT statement (for existing records) to be part of the INSERT statement, but this attempt still had the same problem.
Implementing my Upsert method by using the SQL MERGE statement, but this also suffered from the same problem.
My Question(s)
How can I eliminate, or at least minimize, the UNIQUE key constraint collisions?
If I should be using the UPDLOCK table hint (or any other table hint for that matter), how would I add that to my code? Would I add it to the INSERT? The SELECT? Both?
If you use an ORM, such as NHibernate or Entity Framework, then it's going to be ISession.Dispose() (or ISession.Flush()) and DbContext.SaveChanges() respectively. Create your own wrapper on top of these methods and convert exceptions about uniqueness constraint violations into Result instances.
Unique constraints are a type of application invariant (a condition that must hold true at all times). There are two types of unique constraints: aggregate-wide and application-wide.
On the other hand, a requirement for each customer to have a unique email address would be an example of an application-wide constraint.
Of course, if you use a relational database, then you can protect your data integrity by introducing a unique index for the Customer.Email column that would reject one of the updates.
Make the validating read take a lock:
FROM SomeTable WITH (UPDLOCK, ROWLOCK, HOLDLOCK)
This serializes accesses on a single key, allowing for concurrency on all others.
HOLDLOCK (equivalent to SERIALIZABLE) protects a range of values. This ensures that a row that doesn't exist continues to not exist, so the INSERT succeeds.
UPDLOCK ensures any existing row is not changed or deleted by another concurrent transaction so the UPDATE succeeds.
ROWLOCK encourages the engine to take a row-level lock.
These changes may increase the chances of a deadlock.
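Applied to the repository code in the question, the hint would go on the SELECT that checks for an existing record; the INSERT itself stays as it is. The following is only a sketch under assumptions: it reuses the MyRecord and RecordType types and the Dapper calls from the question, the class and method names (LockingFinder, FindForUpsert) are hypothetical, and it has not been run against the asker's schema.

```csharp
using System.Data.SqlClient;
using System.Linq;
using Dapper;

namespace MyProject.DataAccess
{
    // Hypothetical helper; MyRecord and RecordType are the types from the question.
    public static class LockingFinder
    {
        // Same lookup as the question's FindByByUniqueKey, except that the read
        // takes an update lock on the key range and holds it until the
        // surrounding transaction commits or rolls back.
        public static MyRecord FindForUpsert(SqlTransaction transaction, RecordType recordType, long userID, short? otherID)
        {
            const string sql = @"SELECT * FROM [MySchema].[Records] WITH (UPDLOCK, ROWLOCK, HOLDLOCK)
                                 WHERE [UserID] = @UserID
                                 AND [RecordType] = @RecordType
                                 AND [OtherID] = @OtherID";
            var paramz = new
            {
                UserID = userID,
                RecordType = recordType,
                OtherID = otherID
            };
            // The query must run on the same connection and transaction as the
            // later INSERT or UPDATE, otherwise the lock protects nothing.
            var results = transaction.Connection.Query<MyRecord>(sql, paramz, transaction);
            return results.SingleOrDefault();
        }
    }
}
```

In Upsert, the call to FindByByUniqueKey would then be replaced by FindForUpsert. A second request for the same key blocks on the SELECT until the first transaction finishes, after which it sees the committed row and takes the update branch instead of inserting.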
It may be faster to permit and suppress the errors in your scenario than to attempt to eliminate them. If you're consolidating multiple sources synchronously with overlapping data you will need to create a bottleneck somewhere to manage the race condition.
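One way to "permit and suppress" the error is to let the INSERT fail and simply run the upsert again, since on the second attempt the existence check finds the row that the competing request committed. A minimal sketch, assuming a hypothetical helper named DuplicateTolerantWriter that is not part of the question's code; the caller supplies the predicate that decides which exceptions count as a duplicate-key collision:

```csharp
using System;

// Hypothetical helper: retries a write when it fails with a duplicate-key error.
public static class DuplicateTolerantWriter
{
    // Runs `write`. If it throws an exception that `isDuplicateKey` accepts,
    // runs it again, up to `maxAttempts` attempts in total. Any other
    // exception, or a duplicate on the last attempt, propagates unchanged.
    // Returns the number of the attempt that succeeded.
    public static int Run(Action write, Func<Exception, bool> isDuplicateKey, int maxAttempts = 2)
    {
        if (write == null) throw new ArgumentNullException(nameof(write));
        if (isDuplicateKey == null) throw new ArgumentNullException(nameof(isDuplicateKey));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                write();
                return attempt;
            }
            catch (Exception e)
            {
                // Give up on unrelated errors and once the attempts are used up.
                if (attempt >= maxAttempts || !isDuplicateKey(e)) throw;
            }
        }
    }
}
```

With the question's repository this might be called as DuplicateTolerantWriter.Run(() => repo.Upsert(record), e => e is SqlException && (((SqlException)e).Number == 2627 || ((SqlException)e).Number == 2601)). Error 2627 is SQL Server's unique constraint violation (the one quoted in the question) and 2601 is the equivalent error for a unique index.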
You could create a singleton manager class that held the unique constraints of the records in a hashset so you would automatically drop duplicates when they're added to the set. Records get added prior to submitting to the DB and removed upon statement completion. That way either the hashset eats the duplicate or the existing record check you do at the top of your try detects the committed duplicate record.
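The in-memory set described above could look roughly like this. It is a sketch, not the answerer's implementation: the class name InFlightKeys and its members are hypothetical, the key is the (UserID, RecordType, OtherID) triple from the question's unique constraint, and a ConcurrentDictionary stands in for the hash set so that add and remove are atomic under concurrent requests.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical singleton that tracks unique keys currently being written.
public sealed class InFlightKeys
{
    public static readonly InFlightKeys Instance = new InFlightKeys();

    // The value is unused; the dictionary serves as a thread-safe set.
    private readonly ConcurrentDictionary<Tuple<long, byte, short?>, bool> _keys =
        new ConcurrentDictionary<Tuple<long, byte, short?>, bool>();

    // Returns true if the caller is the only one working on this key and may
    // go to the database; false if another request already holds the key.
    public bool TryEnter(long userId, byte recordType, short? otherId)
    {
        return _keys.TryAdd(Tuple.Create(userId, recordType, otherId), true);
    }

    // Call once the database statement has completed (for example in a
    // finally block) so that later requests for the same key can proceed.
    public void Exit(long userId, byte recordType, short? otherId)
    {
        bool ignored;
        _keys.TryRemove(Tuple.Create(userId, recordType, otherId), out ignored);
    }
}
```

A request would call TryEnter before Upsert and Exit afterwards; when TryEnter returns false, the request is a duplicate of one already in flight and can be dropped or retried later. Note that such a set only sees requests handled by one process, so it would not remove collisions between several web servers writing to the same database.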