[Coherence] Testing the Coherence Simple Partition Assignment Strategy

原文はこちら。
https://blogs.oracle.com/felcey/entry/testing_the_coherence_simple_assignment

Coherence 3.7.1で導入された単純なパーティション割当て方針(Simple Partition Assignment Strategy)を使うと、Coherenceが集中管理型のパーティショニング方針を使って、プライマリおよびバックアップデータのキャッシュ·データのパーティションを再バランスすることができます。以前は(現在のデフォルト設定でもありますが)、各クラスタ·メンバーは、キャッシュデータのパーティションを(単独で)自律的に分散する方法を決定していました。例えば、パーティションの総個数をクラスタメンバの個数で割って決めていました。
Oracle® Coherence Developer's Guide Release 3.7.1
Specifying the Simple Partition Assignment Strategy
http://docs.oracle.com/cd/E24290_01/coh.371/e22837/api_dataaffinity.htm#sthref131Oracle® Coherence開発者ガイド リリース3.7.1
単純なパーティション割当て方針の指定
http://docs.oracle.com/cd/E26853_01/coh.371/b65026/api_dataaffinity.htm#sthref131
集中管理型の方針を使うと 、 完全なクラスタトポロジを考慮に入れることができます。既定の自律型の方針のように、Simple Partition Assignment Strategyはキャッシュデータのパーティションをかなり分散しようとしますが、可能な限り、データのプライマリとバックアップコピーを別のサイト、ラック、マシンに配置することによって、データの安全性を確保しようともします。それゆえ、複数のサイトがある場合、プライマリとバックアップデータが異なるサイトに置かれます。クラスタが複数のラックをまたがるような場合、プライマリパーティションとバックアップパーティションが別のラックに置かれるので、完全なラックの故障の結果、データが損失することはありません。最後に、すべてのマシンが同じラックにあれば、プライマリパーティションとバックアップパーティションは、異なるマシン上に配置されます。
Simple Partition Assignment Strategyが、キャッシュデータパーティションをどこに配置するべきかを決定するため、クラスタ内の各メンバが完全なIDを指定する必要があります。つまり、キャッシュが載るマシンやラック、サイトの名前を、member-identity要素を使って指定する必要があります。
member-identity element
http://docs.oracle.com/cd/E24290_01/coh.371/e22837/appendix_operational.htm#BABCHJJCmember-identity要素
http://docs.oracle.com/cd/E26853_01/coh.371/b65026/appendix_operational.htm#BABCHJJC
これらは通常システムプロパティとして設定されています(以下のような感じ)が、クラスタオーバーライドファイルに配置することもできます。
-Dtangosol.coherence.cluster=ProdAppCluster
-Dtangosol.coherence.site=MainDataCentre
-Dtangosol.coherence.rack=Rack07
-Dtangosol.coherence.machine=Blade11
通常、 Simple Partition Assignment Strategyが機能していることをテストするには、相当のセットアップが必要ですが、 Jon Hallが作成した"littlegrid"と呼ばれる小規模なテストフレームワークを使うと、テスト全体を一つのJVMインスタンス上で実行でき、簡単なJUnitテストとして記述することができます。以下は新たに(分散サービスCustomDistributedCacheServiceへの)Simple Partition Assignment Strategyのクラスを導入するキャッシュの構成ファイルです、<partition-assignment-strategy>で括られた部分で割当て方針を指定しています。

  
    pof
  
  
    
      *
      CustomDistributedCacheScheme
    
  
  
    
      CustomDistributedCacheScheme
      CustomDistributedCacheService
      5
      
        
          com.tangosol.net.partition.SimpleAssignmentStrategy
          
        
      
      
        com.oracle.coherence.test.PartitionLostListener
      
      
         ">
        
      
      true
    
  

では、ここではJUnitテストのセットアップをします。"littlegrid"テストフレームワークを使ってラックの故障をシミュレーションしてみましょう。3個のラックは各々に2個のマシンがあり、各マシンに2個のノードを作成します。同時に、管理ノードとストレージを持たないテストクライアントを用意します。最後の2ノードは別のラック(デフォルトラック)に存在します。テストは以下の手順を実行します。
  1. あるデータをテストキャッシュに追加し、データを各パーティションに持つ
  2. (作成された3個から) ランダムにラックを選択し、当該ラックの全てのノードが同時にシャットダウンする
  3. 短い休止の後、 JMX MBean "Partition List"(下記参照)を全ての残りのストレージメンバについてチェックし、パーティションが失われていないことを確認する。
この最後のステップが可能なのは、Partition Event ListenerをCache構成ファイルにPartitioned Cache Service(CustomDistributedCacheService)として登録していためです。このPartition Event Listenerは自身をJMX MBeanとして公開するので、この類のイベントをチェックすることが可能です。

テストコードは以下のようなものです。

package com.oracle.coherence.test;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.littlegrid.ClusterMemberGroup;
import org.littlegrid.ClusterMemberGroupUtils;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;

/**
 * Coherence simple assignment strategy tests.
 * 
 * @author Dave Felcey
 */
public class TestCase {
  private ClusterMemberGroup memberGroup;
  private NamedCache cache = null;
  private int[][] racks = null;

  @Before
  public void setUp() {
    // Create member group now, so later code simply merges into this group
    memberGroup = ClusterMemberGroupUtils.newBuilder()
        .buildAndConfigureForNoClient();

    final int numberOfRacks = 3;
    final int numberOfMachines = 2;
    final int numberOfStorageEnabledMembersPerMachine = 2;
    final int expectedClusterSize = (numberOfRacks
        * numberOfMachines * numberOfStorageEnabledMembersPerMachine);

    racks = new int[numberOfRacks][numberOfMachines
        * numberOfStorageEnabledMembersPerMachine];

    // Start up the storage enabled members on different racks and machines
    for (int rack = 0; rack < numberOfRacks; rack++) {
      for (int machine = 0; machine < numberOfMachines; machine++) {
        // Build using the identity parameters
        memberGroup.merge(ClusterMemberGroupUtils
            .newBuilder()
            .setFastStartJoinTimeoutMilliseconds(100)
            .setSiteName("site1")
            .setMachineName("r-" + rack + "-m-" + machine)
            .setRackName("r-" + rack)
            .setStorageEnabledCount(
                numberOfStorageEnabledMembersPerMachine)
            .buildAndConfigureForNoClient());
      }

      // Save member id's for rack
      System.arraycopy(memberGroup.getStartedMemberIds(),
          rack * numberOfMachines
              * numberOfStorageEnabledMembersPerMachine,
          racks[rack], 0, numberOfMachines
              * numberOfStorageEnabledMembersPerMachine);
    }

    // Create Management and client members with default rack and machine
    // identities
    memberGroup.merge(ClusterMemberGroupUtils.newBuilder()
        .setJmxMonitorCount(1).setLogLevel(9)
        .buildAndConfigureForStorageDisabledClient());

    assertThat(
        "Cluster size check - includes storage disabled client and JMX monitor",
        CacheFactory.ensureCluster().getMemberSet().size(),
        is(expectedClusterSize + 2));

    assertThat(
        "Member group check size is as expected - includes JMX monitor, but not storage disabled client",
        memberGroup.getStartedMemberIds().length,
        is(expectedClusterSize + 1));

    cache = CacheFactory.getCache("test");
  }

  /**
   * Demonstrate SimpleAssignementStrategy.
   */
  @Test
  public void testSimpleAssignmentStrategy()
      throws Exception {

    final Map entries = new HashMap();
    final int totalEntries = 1000;

    // Load test data
    for (int i = 0; i < totalEntries; i++) {
      entries.put(i, "entry " + i);
    }

    cache.putAll(entries);
    assertThat(cache.size(), is(totalEntries));

    // Kill rack - if partition lost then will exit
    Random random = new Random();
    int rack = Math.abs(random.nextInt() % racks.length);
    System.out.println("Stopping rack: " + rack);

    memberGroup.stopMember(racks[rack]);

    System.out
        .println("Pausing to allow data to be recovered");
    TimeUnit.SECONDS.sleep(memberGroup
        .getSuggestedSleepAfterStopDuration()
        * racks[rack].length * 10);

    assertThat("Partition lost",
        getPartitionLostEventCount(), is(0));
    assertThat("Cache size", cache.size(), is(totalEntries));
  }

  @After
  public void tearDown() {
    // Quick stop all - members *don't* leave the cluster politely - done for
    // this test so it shuts down quicker
    memberGroup.stopAll();

    ClusterMemberGroupUtils
        .shutdownCacheFactoryThenClusterMemberGroups(memberGroup);
  }

  /**
   * Get the number of partitions lost.
   * 
   * @return partitions lost
   */
  private int getPartitionLostEventCount() throws Exception {

    // Create an MBeanServerConnection
    final MBeanServerConnection connection = ManagementFactory
        .getPlatformMBeanServer();

    @SuppressWarnings("unchecked")
    final Set members = ((PartitionedService) cache
        .getCacheService()).getOwnershipEnabledMembers();
    final String serviceName = cache.getCacheService()
        .getInfo().getServiceName();

    int lostPartitions = 0;

    // Get any partition lost event information from cluster members
    for (Member member : members) {
      String path = "Coherence:type=PartitionListener,name=PartitionLostCount,service="
          + serviceName
          + ",id=PartitionLost,nodeId="
          + member.getId();

      lostPartitions += (Integer) connection.getAttribute(
          new ObjectName(path), "PartitionLostCount");
    }

    return lostPartitions;
  }
}
ラックが停止してからの停止時間を延ばすと、PartitionList MBean情報をJConsoleで見ることができます(下図)。

Andrew Wilsonのサンプルに基づくPartitionLostListernerは以下のようになります。
package com.oracle.coherence.test;

import java.util.concurrent.atomic.AtomicInteger;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.management.Registry;
import com.tangosol.net.partition.PartitionEvent;
import com.tangosol.net.partition.PartitionListener;

/**
 * Partition lost listener
 */
public class PartitionLostListener implements
    PartitionListener, PartitionLostListenerMBean {
  private final AtomicInteger lostCount = new AtomicInteger();
  private String serviceName;

  public PartitionLostListener() {
  }

  @Override
  public synchronized void onPartitionEvent(
      PartitionEvent partitionEvent) {
    // Initialize if necessary
    if (serviceName == null) {
      serviceName = partitionEvent.getService().getInfo()
          .getServiceName();
      System.out.println("Registering JMX");
      Registry reg = CacheFactory.getCluster()
          .getManagement();
      reg.register(
          reg.ensureGlobalName("Coherence:type=PartitionListener,name=PartitionLostCount,service="
              + serviceName + ",id=PartitionLost"), this);
      System.out.println("Registered JMX");
    }

    // Handle the event
    if (partitionEvent.getId() == PartitionEvent.PARTITION_LOST) {
      System.out.println("Partition lost: "
          + partitionEvent);
      lostCount.addAndGet(partitionEvent.getPartitionSet()
          .cardinality());
    }
  }

  // Returns any partitions lost and resets
  public synchronized Integer getPartitionLostCount() {
    int temp = lostCount.get();
    lostCount.set(0);
    return temp;
  }
}
ご自身で試して見たいということでしたら、こちらから完全なサンプルフォームをダウンロードして、"littlegrid"テストツールをこちらからダウンロードして下さい。ご注意頂きたいのですが、適切にSimple Partition Strategyをテストするためには、最新のCoherenceを利用する必要があります(執筆時点で3.7.1.4がOracle Supportからダウンロードできます)。ただ、OTNからはこのパッチリリースをダウンロードできません。大規模クラスタでSimple Partition Assignment Strategyを利用する上でのもう一つのコツは、クラスタ立ち上げ時に分散クォーラム機能を使うことです。不要なネットワークトラフィックおよび処理を除去しながら、クラスタメンバシップの閾値が到達するまで、パーティションのバランスを取り戻しを抑止します。
nラックのクラスタの場合の閾値は、まず(n -1) * (ラックあたりのマシン個数 - 1)に設定しましょう。リストアクォーラムおよび読み取りクォーラムの機能は、"スプリットブレイン"シナリオの影響を緩和するために使用されることがあります。これが発生する場合、データを読み取り専用にして、クラスタが再構成できるようにすることで影響を緩和しようというものです。
Oracle® Coherence Developer's Guide Release 3.7.1
Using the Partitioned Cache Quorums
http://docs.oracle.com/cd/E24290_01/coh.371/e22837/cache_quorum.htm#BABEEBGG
Oracle® Coherence開発者ガイド リリース3.7.1
パーティション・キャッシュ・クォーラムの使用方法
http://docs.oracle.com/cd/E26853_01/coh.371/b65026/cache_quorum.htm#BABEEBGG

0 件のコメント:

コメントを投稿