5 Replies Latest reply: May 15, 2012 8:13 AM by BretCalvey RSS

    CQCs + ThreadGroups


      Not sure if this can be answered, but I'll try anyway...

      We have an application running in a Tomcat container that connects to Coherence ( via Extend proxies.

      This application sets up some CQCs.

      Because the criteria changes with time, there is a timer thread that will set up new CQCs and call "release" on the old ones. For example...

      Pseudo code
      Find "widgits" that have an expiry time within the next 24 hours
      Let newCQC = new CQC for those "widgits"
      Set up new MapListener on "newCQC"
      Let oldCQC = currentCQC
      Let currentCQC = newCQC
      Remove map listener from "oldCQC"
      Call "release" on "oldCQC" (if not null)
      Wait 10 seconds
      We can run this for days without any problem. Then for reasons unknown, we find that our heap fills up and we get Out Of Memory exceptions.

      When we do a heap dump, we see that hundreds of thousands of ThreadGroup objects have been created. All of these relate to our CQCs. We can tell by looking at the thread names. For example...

      ("EventQueue:ContinuousQueryCache{Cache=WIDGIT, Filter=...}")

      I have seen this happen before, but that was due to a coding error (I was calling "dispose" instead of "release" on the CQC - Oops!!) - this has now been fixed.

      Has anyone experienced anything similar?

      Thanks in advance


      I'm sure I'm not doing anything out of the ordinary - here are some code snippets...

      class IREventCache {
             public IREventCache(Set<Long> eventIds) {
                  this.eventIds = eventIds;
           private void setupContinuousQueries() {
                Filter eventIdFilter = new InFilter(new PofExtractor(Long.class, CacheableItemSerializer.ID_FIELD), eventIds);
                Filter parentEventFilter = new InFilter(new PofExtractor(Long.class, CacheableItemSerializer.PARENT_ID_FIELD), eventIds);
                gameEventCQC = createCQC(gameEventCache, eventIdFilter);
                rankEventCQC = createCQC(rankEventCache, eventIdFilter);
                clockCQC = createCQC(clockCache, eventIdFilter);
                eventTotalCQC = createCQC(eventTotalCache, parentEventFilter);
                marketCQC = createCQC(marketCache, parentEventFilter);
                outcomeSummaryCQC = createCQC(outcomeSummaryCache, new InFilter(new PofExtractor(Long.class, OutcomeSummary.EVENT_ID_FIELD), eventIds));
           private ContinuousQueryCache createCQC(NamedCache cache, Filter filter) {
                ContinuousQueryCache cqc = facade.createContinuousQuery(cache, filter);
                return cqc;
      class IREventCache {
           public void dispose() {
           private void cleanupCQC(ContinuousQueryCache cqc) {
                try {
                } catch (Exception ex) {
                     log.error("Exception cleaning up Continuous Query", ex);
      class AnotherClass {
           private IREventCache currentCache;
           public void init() {
                Set<Long> eventIds = goAndFindEventIDsBasedOnTime(); // just runs a filter + gets keys
                this.currentCache = new IREventCache(eventIds); // setupCQCs will get called here...
           // Called by timer
           public void refresh() {
                Set<Long> eventIds = goAndFindEventIDsBasedOnTime();  // just runs a filter + gets keys
                IREventCache newCache = new IREventCache(eventIds); // setupCQCs will get called here...
                IREventCache oldCache = this.currentCache;
                this.currentCache = newCache;
        • 1. Re: CQCs + ThreadGroups
          Probably should mention that all this line of code does...
          x = facade.createContinuousQuery(cache, filter);
          public ContinuousQueryCache createContinuousQuery(NamedCache cache, Filter filter) {
               return new ContinuousQueryCache(cachem, filter);
          (Basically this "facade" class helps us with unit testing...)
          • 2. Re: CQCs + ThreadGroups
            Just found out that this is to do with Logging - all these thread groups are referenced by a "Coherence$Logger"...

            Must have something misconfigured there...
            • 3. Found the problem...

              We've managed to find out why we are leaking memory and it is because CQCs and MapListeners.

              The way we found out what was going on was by debugging our process and putting a breakpoint in the constructor for ThreadGroup...
                  private ThreadGroup(Void unused, ThreadGroup parent, String name) {
                   this.name = name;                                 // *BREAKPOINT*
                   this.maxPriority = parent.maxPriority;
                   this.daemon = parent.daemon;
                   this.vmAllowSuspension = parent.vmAllowSuspension;
                   this.parent = parent;
              When you attach a map listener to a new CQC, you will hit this breakpoint and you will see a stack trace like this...
              Thread [pool-1-thread-1] (Suspended (breakpoint at line 104 in ThreadGroup))     
                   ThreadGroup.<init>(Void, ThreadGroup, String) line: 104     
                   ThreadGroup.<init>(ThreadGroup, String) line: 100     
                   ThreadGroup.<init>(String) line: 79     
                   TaskDaemon(Daemon).<init>(String, int, boolean) line: 74     
                   TaskDaemon(Daemon).<init>(String) line: 47     
                   TaskDaemon.<init>(String) line: 38     
                   ContinuousQueryCache.instantiateEventQueue() line: 1818     
                   ContinuousQueryCache.ensureEventQueue() line: 1843     
                   ContinuousQueryCache.addMapListener(MapListener, Filter, boolean) line: 715     
                   <OUR CODE>
              The problem occurs in the "parent.add(this)" call (the last line of the above constructor)...
                  private final void add(ThreadGroup g){
                   synchronized (this) {
                       if (destroyed) {
                        throw new IllegalThreadStateException();
                       if (groups == null) {
                        groups = new ThreadGroup[4];
                       } else if (ngroups == groups.length) {
                        groups = Arrays.copyOf(groups, ngroups * 2);
                       groups[ngroups] = g;
                       // This is done last so it doesn't matter in case the
                       // thread is killed
              Each time you add a listener to a new CQC, the "groups" array may grow. As we are setting up new CQCs every ten seconds in our application. this just keeps growing and growing until we (eventually) run out of memory.

              You might think "Why do you need to set up a new CQC every ten seconds?" - this is because we need to find things based on time - but I'll state another example...

              We have an application that lists all of the "widgets" we currently have. When a user clicks to view a "widget", we show them the current "price" associated with it.

              We need to keep this information up to date so we use CQCs for this.

              Each time a trader clicks to view a different widget (or set of widget) we will call "release" on any existing CQCs they have and then set up new ones (with a new Filter criteria for the widgit(s) they want to view)

              Before we call release, we remove any MapListeners and after setting up new CQCs, we add MapListeners (which causes the above to happen).

              So this applicauion is also leaking memory in the same way. We hardly notice it as they typically close this application down when they have finished for the day and they do not click between widgits at a very high rate.

              There have been times in the past when a user has left the application running for a long time and it has crashed.

              Now we know why!!

              I suspect the post above regarding logging was a red-herring (the logger is probably trying to log an out of memory message or something - have not mamanged to reproduce this yet)


              I know the following example is a bit silly, but this will (eventually) fill up your heap...
              package leak;
              import com.tangosol.net.CacheFactory;
              import com.tangosol.net.NamedCache;
              import com.tangosol.net.cache.ContinuousQueryCache;
              import com.tangosol.util.MapEvent;
              import com.tangosol.util.MapListener;
              import com.tangosol.util.filter.AlwaysFilter;
              public class CQCLeak {
                   public static void main(String[] args) throws Exception {
                        NamedCache cache = CacheFactory.getCache("WIDGET");
                        MapListener mapListener = new MapListener() {
                             public void entryUpdated(MapEvent event) {
                             public void entryInserted(MapEvent event) {
                             public void entryDeleted(MapEvent event) {
                        while(true) {
                             ContinuousQueryCache cqc = new ContinuousQueryCache(cache, AlwaysFilter.INSTANCE);
              This does not happen if you move the CQC out of the loop - which would be an obvious fix, but imagine if the filter had to change every time...

              Edited by: Bret Calvey on 12-May-2012 09:15

              Edited by: Bret Calvey on 12-May-2012 09:29
              • 4. Re: CQCs + ThreadGroups
                It looks like a bug - COH-7232. CQC creates new daemon thread on which events for registered listeners will be processed, and it's implementation creates a new ThreadGroup and sets isDaemon flag to false to allow for worker thread restarts, which causes memory leak when daemon is stopped.
                • 5. Re: CQCs + ThreadGroups