This discussion is archived
2 Replies Latest reply: Dec 11, 2012 2:29 AM by 979206 RSS

Using a distributed System.Transactions transaction in several threads

900511 Newbie
Currently Being Moderated
I'm using Oracle DB 11.2.0.1 and ODP.NET 2.112.2.0.
I am trying to use a distributed transaction from the System.Transactions namespace in several threads asynchronously. Here is the test console application. It creates a new TransactionScope, then creates 2 connections and opens them. At this point the transaction becomes distributed, because there are 2 open connections in it. Then 2 INSERT commands are executed simultaneously in the same transaction (via Parallel.ForEach):

     class Program
     {
          private static string ConnectionString = "MyConnectionString";

          private static void Main(string[] args)
          {
               cleanup();
               for (int i = 0; i < 100; i++)
               {
                    try
                    {
                         init();
                         run();
                         check();

                         Console.WriteLine("ITERATION {0} IS OK", i);
                    }
                    catch (Exception ex)
                    {
                         Console.WriteLine("ERROR AT ITERATION {0}: {1}", i, ex);
                         //break;
                    }
                    finally
                    {
                         cleanup();
                    }
               }
          }

          // main method
          private static void run()
          {
               using (var scope = new TransactionScope())
               {
                    using (var connection1 = new OracleConnection(ConnectionString))
                    using (var connection2 = new OracleConnection(ConnectionString))
                    {
                         connection1.Open();
                         connection2.Open();

                         Transaction tx = Transaction.Current;
                         if (tx.TransactionInformation.DistributedIdentifier == Guid.Empty)
                              throw new InvalidOperationException("tx.TransactionInformation.DistributedIdentifier == Guid.Empty");

                         var queries = new ConcurrentDictionary<OracleConnection, string>();
                         queries[connection1] = "INSERT INTO T1 VALUES ('AAA')";
                         queries[connection2] = "INSERT INTO T2 VALUES ('BBB')";

                         Parallel.ForEach(
                              queries,
                              pair =>
                              {
                                   using (var innerScope = new TransactionScope(tx))
                                   {
                                        OracleCommand cmd = pair.Key.CreateCommand();
                                        cmd.CommandText = pair.Value;
                                        cmd.ExecuteNonQuery();
                                             
                                        innerScope.Complete();
                                   }
                              });

                    }

                    scope.Complete();
               }
          }

          // check results
          private static void check()
          {
               using (var connection = new OracleConnection(ConnectionString))
               {
                    connection.Open();
                    OracleCommand cmd = connection.CreateCommand();

                    cmd.CommandText = "SELECT COUNT(1) FROM T1";
                    int n1 = Convert.ToInt32(cmd.ExecuteScalar());
                    if (n1 != 1)
                         throw new Exception("COUNT(T1) != 1");

                    cmd.CommandText = "SELECT COUNT(1) FROM T2";
                    int n2 = Convert.ToInt32(cmd.ExecuteScalar());
                    if (n2 != 1)
                         throw new Exception("COUNT(T2) != 1");
               }
          }

          // initialization          
          private static void init()
          {
               executeCommands(
                    "CREATE TABLE T1 ( V1 VARCHAR2(100) )",
                    "CREATE TABLE T2 ( V2 VARCHAR2(100) )"
                    );
          }

          // cleaning up
          private static void cleanup()
          {
               try
               {
                    executeCommands(
                         "DROP TABLE T1",
                         "DROP TABLE T2"
                         );
               }
               catch
               {
               }
          }

          // helper method executing some commands
          private static void executeCommands(params string[] queries)
          {
               using (var connection = new OracleConnection(ConnectionString))
               {
                    connection.Open();

                    OracleCommand cmd = connection.CreateCommand();
                    foreach (string query in queries)
                    {
                         cmd.CommandText = query;
                         cmd.ExecuteNonQuery();
                    }
               }
          }
     }
-----
It works fine most of the time. But sometimes an exception is thrown in the check method ("COUNT(T1) != 1" or "COUNT(T2) != 1"). So it seems that one of the commands isn't enlisted in the transaction. And when I change the commands to insert values into the same table (and also change the check method accordingly), a deadlock sometimes occurs.
What am I doing wrong?

P.S. There are no errors on Oracle 9.

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points