0 Replies Latest reply on Sep 11, 2019 3:01 PM by 962259

    Streams for bulk writing and transform

    962259

      Coherence supports streams for bulk reading from caches (through the AsyncNamedCache API), but there does not seem to be a stream API for bulk writing to a cache. Did I miss it?

       

      We are looking for support in Coherence similar to what a competitor product, Apache Ignite, offers. The Ignite code here shows the pattern:

       

      o Open a stream (stmr) for writing to the cache.

      o Keep a reference to the stream and reuse it between calls (illustrated here by the loop).

      o The stream's receiver can transform the data if required (a simple increment here).

       

      // ignite, cfg, RAND and RANGE are defined elsewhere in the example (not shown).
      try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
          try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
              // Allow data updates.
              stmr.allowOverwrite(true);

              // Configure data transformation to count random numbers added to the stream.
              stmr.receiver(StreamTransformer.from((e, arg) -> {
                  // Get current count.
                  Long val = e.getValue();

                  // Increment count by 1.
                  e.setValue(val == null ? 1L : val + 1);

                  return null;
              }));

              // Stream 10 million random numbers into the streamer cache.
              for (int i = 1; i <= 10_000_000; i++) {
                  stmr.addData(RAND.nextInt(RANGE), 1L);

                  if (i % 500_000 == 0)
                      System.out.println("Number of tuples streamed into Ignite: " + i);
              }
          }

          // Query the top 10 most popular numbers.
          SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10");

          // Execute the query.
          List<List<?>> top10 = stmCache.query(top10Qry).getAll();

          System.out.println("Top 10 most popular numbers:");

          // Print the top 10 numbers.
          ExamplesUtils.printQueryResults(top10);
      }
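
      To make the semantics we are after concrete, here is a minimal sketch in plain Java that uses neither Coherence nor Ignite. A ConcurrentHashMap stands in for the cache, and the names CacheWriteStream, countOccurrences and batchSize are hypothetical, made up for illustration only. It shows the three points above: a reusable, closeable write stream, buffered bulk writes, and a transform applied to the existing value on each write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

public class BulkWriteSketch {

    /** Hypothetical stand-in for a data streamer; NOT a Coherence or Ignite type. */
    static final class CacheWriteStream<K, V> implements AutoCloseable {
        private final Map<K, V> cache;                 // plain map standing in for the cache
        private final BiFunction<V, V, V> transform;   // (existing value, incoming value) -> new value
        private final int batchSize;                   // assumed buffer size before an automatic flush
        private final List<Map.Entry<K, V>> buffer = new ArrayList<>();

        CacheWriteStream(Map<K, V> cache, BiFunction<V, V, V> transform, int batchSize) {
            this.cache = cache;
            this.transform = transform;
            this.batchSize = batchSize;
        }

        /** Buffers one entry and flushes once the buffer reaches batchSize. */
        void add(K key, V value) {
            buffer.add(Map.entry(key, value));
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Writes all buffered entries; merge() stores the value if the key is absent, else applies the transform. */
        void flush() {
            for (Map.Entry<K, V> e : buffer) {
                cache.merge(e.getKey(), e.getValue(), transform);
            }
            buffer.clear();
        }

        /** Flushes whatever is still buffered when the stream is closed. */
        @Override
        public void close() {
            flush();
        }
    }

    /** Streams each key with an initial count of 1 and sums counts for repeated keys. */
    public static Map<Integer, Long> countOccurrences(int[] keys, int batchSize) {
        Map<Integer, Long> cache = new ConcurrentHashMap<>();
        try (CacheWriteStream<Integer, Long> stream =
                 new CacheWriteStream<>(cache, Long::sum, batchSize)) {
            for (int key : keys) {
                stream.add(key, 1L);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        Map<Integer, Long> counts = countOccurrences(new int[] {3, 1, 3, 3, 2, 1}, 4);
        System.out.println("Counts per key: " + counts);
    }
}
```

      In this sketch the transform runs in the client process. In the Ignite example the receiver is set on the streamer rather than applied by the caller, and a cache-side transform of that kind is what we would like to see in Coherence.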