This discussion is archived
5 Replies. Latest reply: Aug 24, 2012 9:29 PM by EJP

Thread safety with ConcurrentHashMap

801863 Newbie
Hi all,
I have the following class, which uses ConcurrentHashMap. Many threads write to the maps, and a Timer saves the data in the maps every 5 minutes.
I achieve thread safety on writes by using putIfAbsent() when I add entries to a map. However, when I read a map's contents and then remove all entries with clear(), I don't want any other thread writing to the map while I'm reading and removing, in order to prevent data loss, so I lock the whole map with synchronized(lock){}.

I was wondering: is there any other way to achieve thread safety without enforcing synchronization via an external lock? Because the lock would have performance issues.

Any help is greatly appreciated.
public class LoggingHandler {
     
     private static LoggingHandler instance;
     private static Log log = LogFactory.getLog (LoggingHandler.class);
     private static final String MSN = "msn";
     private static final String GOOGLE = "google";     
     private static long delay = 5 * 60 * 1000;
          
     private ConcurrentMap<String, APICallEvent> msnCalls = new ConcurrentHashMap<String, APICallEvent>();
     private ConcurrentMap<String, APICallEvent> googleCalls = new ConcurrentHashMap<String, APICallEvent>();
     
     private Timer timer;
     
     private final Object lock = new Object();

     private LoggingHandler(){
          timer = new Timer();
                    
          timer.schedule(new TimerTask() {
               public void run() {
                    try {
                         saveLogEntries();
                    } catch (Throwable t) {
                         timer.cancel();
                         timer.purge();
                    }
               }
          //}, delay, delay);
          }, 0, delay);
     }
       
     public static synchronized LoggingHandler getLoggerInstance(){       
         if (instance == null){
              instance = new LoggingHandler();
         }
         return instance;
      }
     
public void logAPICall(String engine, String service, String method, int batchSize, boolean error, 
               String errorMsg, long timeSpent, long googleResponseTime){          
          ConcurrentMap<String, APICallEvent> map;
          String key = "";          
                    
          if (MSN.equalsIgnoreCase(engine)){
               map = msnCalls;
          }else if(GOOGLE.equalsIgnoreCase(engine)){  
               map = googleCalls;
          }else{
               return;
          }          
          
          
          key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
          /*APICallEvent event = map.get(key);          
          
          // Map does not contain this service.method, create a CallEvent for the first time.
          if(event == null){
               event = new APICallEvent(service, method, batchSize, error, errorMsg, timeSpent, googleResponseTime);               
               map.put(key, event);
               
               // Map already contains this key, just adjust the numbers.
          }else{
               event.setTimeSpent(event.getTimeSpent()+timeSpent);
               event.setGoogleResponseTime(event.getGoogleResponseTime()+googleResponseTime);
               if(error){
                    event.setFailedCount(event.getFailedCount()+1);
                    event.setFailedBatchSize(event.getFailedBatchSize()+batchSize);
                    event.setFailedReason(errorMsg);                    
               }else{
                    event.setSuccessCount(event.getSuccessCount()+1);
                    event.setSuccessBatchSize(event.getSuccessBatchSize()+batchSize);
               }
          }*/
                    
          // Make it thread-safe using CHM
          APICallEvent newEvent = new APICallEvent(service, method, batchSize, error, errorMsg, timeSpent, googleResponseTime);
          APICallEvent existingEvent= map.putIfAbsent(key, newEvent); 
          
          if(existingEvent!=null && existingEvent!=newEvent){
               existingEvent.setTimeSpent(existingEvent.getTimeSpent()+timeSpent);
               existingEvent.setGoogleResponseTime(existingEvent.getGoogleResponseTime()+googleResponseTime);
               if(error){
                    existingEvent.setFailedCount(existingEvent.getFailedCount()+1);
                    existingEvent.setFailedBatchSize(existingEvent.getFailedBatchSize()+batchSize);
                    existingEvent.setFailedReason(errorMsg);                    
               }else{
                    existingEvent.setSuccessCount(existingEvent.getSuccessCount()+1);
                    existingEvent.setSuccessBatchSize(existingEvent.getSuccessBatchSize()+batchSize);
               }
          }          
     }
     
/* From Java docs: For aggregate operations such as putAll and clear, concurrent reads may reflect insertion or removal of only some 
 * entries.
 * Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the 
 * creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are 
 * designed to be used by only one thread at a time.
 * */     
     
     private void saveLogEntries(){
          IDatastore ds = null;
          Map<String, List<APICallEvent>> engineApiCalls = null;
          try {
               ds = Utils.getDatastore();
               
               engineApiCalls = new HashMap<String, List<APICallEvent>>();
               List<APICallEvent> events = null;
               
// How can I get rid of this lock and still have a thread safe code?
               synchronized(lock){
                    if(!msnCalls.isEmpty()){
                         events = new ArrayList<APICallEvent>();
                         events.addAll(msnCalls.values());
                         engineApiCalls.put(MSN, events);
                         msnCalls.clear();
                    }
                    if(!googleCalls.isEmpty()){
                         events = new ArrayList<APICallEvent>();
                         events.addAll(googleCalls.values());
                         engineApiCalls.put(GOOGLE, events);
                         googleCalls.clear();
                    }
               }
                    
// logAPICalls() saves the API call events in the DB.               
               ds.logAPICalls(engineApiCalls);
          } catch (Throwable t) {
               log.error(t.getMessage(), t);
          } finally {               
               if(engineApiCalls!=null){
                    engineApiCalls.clear();
               }
               Utils.closeQuietly(ds);
               ds=null;
          }     
     }          
}
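The data-loss window the question describes (a write landing between the values() copy and clear()) can be re-enacted single-threaded. This is an illustrative sketch; the class and method names are not from the original post:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Single-threaded re-enactment of the race: a write that lands between
// copying values() and calling clear() is silently discarded.
public class LostUpdateDemo {
    public static List<Integer> snapshotThenClear() {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        map.put("first", 1);

        // Saver thread: copy the current values...
        List<Integer> saved = new ArrayList<Integer>(map.values());

        // ...meanwhile a logging thread sneaks in a new entry...
        map.put("second", 2);

        // ...which clear() then discards before it was ever saved.
        map.clear();
        return saved;   // contains only the first value; the second is lost
    }
}
```

This is exactly why the poster reaches for synchronized(lock){} around the drain: each map operation is individually thread-safe, but the copy-then-clear sequence is not atomic.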
  • 1. Re: Thread safety with ConcurrentHashMap
    801863 Newbie
I just noticed that my code is not thread-safe even with synchronized(lock){}, because the thread that owns the lock in saveLogEntries() is not necessarily the same thread that writes into my maps in logAPICall()! Unless I guard the whole body of logAPICall() with the same lock object!

    Is there any way I can achieve thread safety without the lock object?
  • 2. Re: Thread safety with ConcurrentHashMap
    jtahlborn Expert
    You could use a ReadWriteLock. The method which adds log entries would acquire a read lock, and the method that saves them all would acquire a write lock. You would still need to use a ConcurrentMap because your "readers" are still modifying the map. But the ReadWriteLock gives you the ability to let some calls proceed in parallel while others are exclusive.
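A minimal sketch of that suggestion, with illustrative names (the logging path takes the shared "read" lock since many loggers may run in parallel; the save path takes the exclusive "write" lock):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: Long counts stand in for APICallEvent.
public class RwLockSketch {
    private final ConcurrentMap<String, Long> calls = new ConcurrentHashMap<String, Long>();
    private final ReadWriteLock rw = new ReentrantReadWriteLock();

    public void logCall(String key) {
        rw.readLock().lock();          // shared: many loggers at once
        try {
            // still need a ConcurrentMap: these "readers" mutate it concurrently
            Long prev = calls.putIfAbsent(key, 1L);
            while (prev != null && !calls.replace(key, prev, prev + 1)) {
                prev = calls.get(key);
                if (prev == null) prev = calls.putIfAbsent(key, 1L);
            }
        } finally {
            rw.readLock().unlock();
        }
    }

    public Map<String, Long> drain() {
        rw.writeLock().lock();         // exclusive: no logger is mid-update
        try {
            Map<String, Long> snapshot = new HashMap<String, Long>(calls);
            calls.clear();             // safe: nobody can write while we hold this
            return snapshot;
        } finally {
            rw.writeLock().unlock();
        }
    }
}
```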

    Another alternative, which involves no extra synchronization (but is a bit more complex), would be to use the ConcurrentMap.replace() method and never modify the values currently in the map. Basically, whenever you want to update a value, you get the current value, create a new value which combines the old and new values, and use the ConcurrentMap.replace(key, old, new) method to atomically update the map (handling the failures appropriately). Then your "save" call could just move through the map and remove each value as it comes to it.
  • 3. Re: Thread safety with ConcurrentHashMap
    801863 Newbie
    Thanks for the reply. The Java docs says:

    "A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive."
    the method which adds log entries would acquire a read lock and the method that saves them all would acquire a write lock.
    Shouldn't that be the opposite? The method that adds log entries is "writing" to the map, so it should acquire the write lock; the method that saves them is "reading" from the map, so it should acquire the read lock?
    another alternative which involves no extra synchronization (but is a bit more complex) would be to use ConcurrentMap.replace() method and never modify the values currently in the map. basically, whenever you want to update a value, you get the current value, create a new value which combines the old and new values, and use the ConcurrentMap.replace(key,old,new) method to atomically update the map (handling the failures appropriately). then your "save" call could just move through the map and remove each value as you come to it.
    Obviously, I prefer not to use any lock at all, but, sorry, I'm confused by what you mentioned; would you please explain a little more, or, if it's not too much trouble, provide some pseudocode? Thanks a lot.
  • 4. Re: Thread safety with ConcurrentHashMap
    jtahlborn Expert
    ronitt wrote:
    Thanks for the reply. The Java docs says:

    "A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive."
    the method which adds log entries would acquire a read lock and the method that saves them all would acquire a write lock.
    Shouldn't that be the opposite? The method that adds log entries is "writing" to the map, so it should acquire the write lock; the method that saves them is "reading" from map, so it should acquire the read lock?
    Yes, I tried to explain (maybe a bit too briefly) that it's kind of confusing in your scenario. Instead, think of it as "actions which can occur in parallel" (readers) and "an action which must occur by itself" (a writer). In your case, the logAPICall() method can be called by multiple threads in parallel (which is why you still need a ConcurrentMap), but your saveLogEntries() method needs to be called with exclusive access to the map.
    another alternative which involves no extra synchronization (but is a bit more complex) would be to use ConcurrentMap.replace() method and never modify the values currently in the map. basically, whenever you want to update a value, you get the current value, create a new value which combines the old and new values, and use the ConcurrentMap.replace(key,old,new) method to atomically update the map (handling the failures appropriately). then your "save" call could just move through the map and remove each value as you come to it.
    Obviously, I prefer not to use any lock at all, but, sorry, I'm confused by what you mentioned; would you please explain a little more, or, if it's not too much trouble, provide some pseudocode? Thanks a lot.
    I'll try to hash it out a bit. It would look something like this (the key point is that you never modify a value which is currently in the map):
    public void logAPICall(...) {
    
      APICallEvent curEvent = map.get(key);
    
      // loop until we can atomically combine the new event data with any existing event data
      while(true) {
    
        if(curEvent == null) {
          // no current event, create new event and stick in map
          APICallEvent newEvent = ...;
          curEvent = map.putIfAbsent(key, newEvent);
          if(curEvent != null) {
            // map was modified in meantime, try again
            continue;
          }
        } else {
          // ... combine curEvent with new data into newEvent (curEvent is _not_ modified) ...
          APICallEvent newEvent = ...;
    
        if(!map.replace(key, curEvent, newEvent)) {
            // map was modified in the meantime, try again
            curEvent = map.get(key);
            continue;
          }
        }
    
        // success!
        break;
      }
    }
    
    public void saveLogEntries() {
      List<APICallEvent> events = new ArrayList<APICallEvent>();
      for(String key : map.keySet()) {
        APICallEvent value = map.remove(key);
        if(value != null) {
          events.add(value);
        }
      }
      // ... hand the drained events off to the datastore ...
    }
    Edited by: jtahlborn on Aug 24, 2012 9:53 PM
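A runnable toy version of that pattern, with immutable Integer totals standing in for APICallEvent (all names here are illustrative, not from the thread):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Lock-free accumulate-and-drain. The invariant is the one jtahlborn states:
// values already in the map are never mutated; an update builds a NEW value
// and swaps it in atomically with replace(), retrying on contention.
public class ReplaceRetryDemo {
    private final ConcurrentMap<String, Integer> totals =
            new ConcurrentHashMap<String, Integer>();

    public void add(String key, int amount) {
        while (true) {
            Integer cur = totals.get(key);
            if (cur == null) {
                if (totals.putIfAbsent(key, amount) == null) {
                    return;                    // we installed the first value
                }
            } else {
                Integer next = cur + amount;   // combine into a new value
                if (totals.replace(key, cur, next)) {
                    return;                    // atomic swap succeeded
                }
            }
            // someone else won the race; re-read and retry
        }
    }

    public List<Integer> drain() {
        List<Integer> out = new ArrayList<Integer>();
        for (String key : totals.keySet()) {
            Integer v = totals.remove(key);    // remove-and-collect, one key at a time
            if (v != null) {
                out.add(v);
            }
        }
        return out;
    }
}
```

Note that an update racing with drain() can still land just after its key is removed, in which case the putIfAbsent branch re-inserts it and it is picked up on the next drain, so nothing is lost.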
  • 5. Re: Thread safety with ConcurrentHashMap
    EJP Guru
    Apparent crosspost.
