In: Genel


Bölümlerin küme düğümlerine eşlenmesi gerekir. Eşlemenin de saklanması ve istemciler tarafından erişilebilir hale getirilmesi gerekir. Adanmış kullanmak yaygındır Tutarlı Çekirdek; bu ikisini de idare eder. Özel Tutarlı Çekirdek, kümedeki tüm düğümleri izleyen ve bölümleri düğümlere eşleyen bir koordinatör görevi görür. Ayrıca, eşlemeyi, bir kullanarak hataya dayanıklı bir şekilde depolar.
Çoğaltılan Günlük. içindeki ana küme YugabaytDB
veya denetleyici uygulaması Kafka’daki her ikisi de bunun iyi örnekleridir.

gibi eşler arası sistemler Akça veya fındık
ayrıca koordinatör olarak hareket etmek için belirli bir küme düğümüne ihtiyaç duyar. Onlar kullanırlar Acil Lider koordinatör olarak.

gibi sistemler [kubernetes] jenerik kullan
Tutarlı Çekirdek beğenmek [etcd]. Tartışıldığı gibi koordinatör rolünü oynamak için küme düğümlerinden birini seçmeleri gerekir. burada.

Küme Üyeliğini İzleme

Her küme düğümü, kendisini tutarlı çekirdek ile kaydeder. Ayrıca periyodik olarak bir kalp atışı Tutarlı Çekirdeğin düğüm hatalarını algılamasına izin vermek için.

sınıf KVStore…

  public void start() {
      socketListener.start();
      requestHandler.start();
      network.sendAndReceive(coordLeader, new RegisterClusterNodeRequest(generateMessageId(), listenAddress));
      scheduler.scheduleAtFixedRate(()->{
          network.send(coordLeader, new HeartbeatMessage(generateMessageId(), listenAddress));
      }, 200, 200, TimeUnit.MILLISECONDS);

  }

Koordinatör kaydı yönetir ve ardından üye bilgilerini saklar.

sınıf Küme Koordinatörü…

  ReplicatedLog replicatedLog;
  Membership membership = new Membership();
  TimeoutBasedFailureDetector failureDetector = new TimeoutBasedFailureDetector(Duration.ofMillis(TIMEOUT_MILLIS));

  private void handleRegisterClusterNodeRequest(Message message) {
      logger.info("Registering node " + message.from);
      CompletableFuture completableFuture = registerClusterNode(message.from);
      completableFuture.whenComplete((response, error) -> {
          logger.info("Sending register response to node " + message.from);
          network.send(message.from, new RegisterClusterNodeResponse(message.messageId, listenAddress));
      });
  }

  public CompletableFuture registerClusterNode(InetAddressAndPort address) {
      return replicatedLog.propose(new RegisterClusterNodeCommand(address));
  }

Kayıt işlemi yapıldığında Çoğaltılan Günlüküyelik güncellenecektir.

sınıf Küme Koordinatörü…

  private void applyRegisterClusterNodeEntry(RegisterClusterNodeCommand command) {
      updateMembership(command.memberAddress);
  }

sınıf Küme Koordinatörü…

  private void updateMembership(InetAddressAndPort address) {
      membership = membership.addNewMember(address);
      failureDetector.heartBeatReceived(address);
  }

Koordinatör, kümenin parçası olan tüm düğümlerin bir listesini tutar:

sınıf üyeliği…

  public class Membership {
      List<Member> liveMembers = new ArrayList<>();
      List<Member> failedMembers = new ArrayList<>();
  
      public boolean isFailed(InetAddressAndPort address) {
          return failedMembers.stream().anyMatch(m -> m.address.equals(address));
      }

sınıf üyesi…

  public class Member implements Comparable<Member> {
      InetAddressAndPort address;
      MemberStatus status;

Koordinatör, aşağıdakine benzer bir mekanizma kullanarak küme düğümü hatalarını tespit edecektir.
Kiralama. Bir küme düğümü sinyal göndermeyi durdurursa, düğüm başarısız olarak işaretlenir.

sınıf Küme Koordinatörü…

  @Override
  public void onBecomingLeader() {
      scheduledTask = executor.scheduleWithFixedDelay(this::checkMembership,
              1000,
              1000,
              TimeUnit.MILLISECONDS);
      failureDetector.start();
  }

  private void checkMembership() {
      List<Member> failedMembers = getFailedMembers();
      if (!failedMembers.isEmpty()) {
          replicatedLog.propose(new MemberFailedCommand(failedMembers));
      }
  }

  private List<Member> getFailedMembers() {
      List<Member> liveMembers = membership.getLiveMembers();
      return liveMembers.stream()
              .filter(m -> failureDetector.isMonitoring(m.getAddress()) && !failureDetector.isAlive(m.getAddress()))
              .collect(Collectors.toList());

  }
Örnek bir senaryo

Atina, Bizans ve Kirene olmak üzere üç veri sunucusu olduğunu düşünün. 9 bölüm olduğu düşünüldüğünde akış aşağıdaki gibi görünmektedir.

İstemci daha sonra belirli bir küme düğümüne belirli bir anahtarı eşlemek için bölüm tablosunu kullanabilir.

Şimdi kümeye yeni bir küme düğümü – ‘efes’ – eklendi. Yönetici bir yeniden atamayı tetikler ve koordinatör, bölüm tablosunu kontrol ederek hangi düğümlerin yetersiz yüklendiğini kontrol eder. Efes’in yetersiz yüklenen düğüm olduğunu anlar ve ona bölüm 7’yi ayırmaya karar verir ve onu Atina’dan hareket ettirir. Koordinatör, göçleri saklar ve ardından 7. bölümü efes’e taşıma talebini Atina’ya gönderir. Geçiş tamamlandığında, Atina koordinatöre haber verir. Koordinatör daha sonra bölüm tablosunu günceller.

Küme Düğümlerine Bölüm Atama

Koordinatör, zamanın o noktasında bilinen küme düğümlerine bölümler atar. Her yeni küme düğümü eklendiğinde tetiklenirse, küme kararlı bir duruma ulaşana kadar bölümleri çok erken eşleyebilir. Bu nedenle koordinatör, küme minimum boyuta ulaşana kadar bekleyecek şekilde yapılandırılmalıdır.

Bölme ataması ilk kez yapıldığında, yalnızca bir kez deneme şeklinde yapılabilir.

sınıf Küme Koordinatörü…

  CompletableFuture assignPartitionsToClusterNodes() {
      if (!minimumClusterSizeReached()) {
          return CompletableFuture.failedFuture(new NotEnoughClusterNodesException(MINIMUM_CLUSTER_SIZE));
      }
      return initializePartitionAssignment();
  }

  private boolean minimumClusterSizeReached() {
      return membership.getLiveMembers().size() >= MINIMUM_CLUSTER_SIZE;
  }
  private CompletableFuture initializePartitionAssignment() {
      partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
      PartitionTable partitionTable = arrangePartitions();
      return replicatedLog.propose(new PartitiontableCommand(partitionTable));
  }

  public PartitionTable arrangePartitions() {
      PartitionTable partitionTable = new PartitionTable();
      List<Member> liveMembers = membership.getLiveMembers();
      for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
          int index = partitionId % liveMembers.size();
          Member member = liveMembers.get(index);
          partitionTable.addPartition(partitionId, new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED));
      }
      return partitionTable;
  }

Çoğaltma günlüğü, bölüm tablosunu kalıcı hale getirir.

sınıf Küme Koordinatörü…

  PartitionTable partitionTable;
  PartitionAssignmentStatus partitionAssignmentStatus = PartitionAssignmentStatus.UNASSIGNED;

  private void applyPartitionTableCommand(PartitiontableCommand command) {
      this.partitionTable = command.partitionTable;
      partitionAssignmentStatus = PartitionAssignmentStatus.ASSIGNED;
      if (isLeader()) {
          sendMessagesToMembers(partitionTable);
      }
  }

Bölüm ataması kalıcı olduğunda, koordinatör her bir düğüme artık hangi bölümlere sahip olduğunu söylemek için tüm küme düğümlerine mesajlar gönderir.

sınıf Küme Koordinatörü…

  List<Integer> pendingPartitionAssignments = new ArrayList<>();

  private void sendMessagesToMembers(PartitionTable partitionTable) {
      Map<Integer, PartitionInfo> partitionsTobeHosted = partitionTable.getPartitionsTobeHosted();
      partitionsTobeHosted.forEach((partitionId, partitionInfo) -> {
          pendingPartitionAssignments.add(partitionId);
          HostPartitionMessage message = new HostPartitionMessage(requestNumber++, this.listenAddress, partitionId);
          logger.info("Sending host partition message to " + partitionInfo.hostedOn + " partitionId=" + partitionId);
          scheduler.execute(new RetryableTask(partitionInfo.hostedOn, network, this, partitionId, message));
      });
  }

Denetleyici, mesajı başarılı olana kadar sürekli olarak düğümlere ulaşmaya çalışmaya devam edecektir.

sınıf RetryableTask…

  static class RetryableTask implements Runnable {
      Logger logger = LogManager.getLogger(RetryableTask.class);
      InetAddressAndPort address;
      Network network;
      ClusterCoordinator coordinator;
      Integer partitionId;
      int attempt;
      private Message message;

      public RetryableTask(InetAddressAndPort address, Network network, ClusterCoordinator coordinator, Integer partitionId, Message message) {
          this.address = address;
          this.network = network;
          this.coordinator = coordinator;
          this.partitionId = partitionId;
          this.message = message;
      }

      @Override
      public void run() {
          attempt++;
          try {
              //stop trying if the node is failed.
              if (coordinator.isSuspected(address)) {
                  return;
              }
              logger.info("Sending " + message + " to=" + address);
              network.send(address, message);
          } catch (Exception e) {
              logger.error("Error trying to send ");
              scheduleWithBackOff();
          }
      }

      private void scheduleWithBackOff() {
          scheduler.schedule(this, getBackOffDelay(attempt), TimeUnit.MILLISECONDS);
      }


      private long getBackOffDelay(int attempt) {
          long baseDelay = (long) Math.pow(2, attempt);
          long jitter = randomJitter();
          return baseDelay + jitter;
      }

      private long randomJitter() {
          int i = new Random(1).nextInt();
          i = i < 0 ? i * -1 : i;
          long jitter = i % 50;
          return jitter;
      }

  }

Küme düğümü, bölüm oluşturma isteğini aldığında, verilen bölüm kimliğiyle bir tane oluşturur. Bunun basit bir anahtar/değer deposunda gerçekleştiğini hayal edersek, uygulaması şuna benzer:

sınıf KVStore…

  Map<Integer, Partition> allPartitions = new ConcurrentHashMap<>();
  private void handleHostPartitionMessage(Message message) {
      Integer partitionId = ((HostPartitionMessage) message).getPartitionId();
      addPartitions(partitionId);
      logger.info("Adding partition " + partitionId + " to " + listenAddress);
      network.send(message.from, new HostPartitionAcks(message.messageId, this.listenAddress, partitionId));
  }

  public void addPartitions(Integer partitionId) {
      allPartitions.put(partitionId, new Partition(partitionId));

  }

sınıf Bölme…

  SortedMap<String, String> kv = new TreeMap<>();
  private Integer partitionId;

Koordinatör, bölümün başarıyla oluşturulduğu mesajını aldığında, bunu çoğaltılan günlükte sürdürür ve bölüm durumunu çevrimiçi olacak şekilde günceller.

sınıf Küme Koordinatörü…

  private void handleHostPartitionAck(Message message) {
      int partitionId = ((HostPartitionAcks) message).getPartitionId();
      pendingPartitionAssignments.remove(Integer.valueOf(partitionId));
      logger.info("Received host partition ack from " + message.from + " partitionId=" + partitionId + " pending=" + pendingPartitionAssignments);
      CompletableFuture future = replicatedLog.propose(new UpdatePartitionStatusCommand(partitionId, PartitionStatus.ONLINE));
      future.join();
  }

Bir kere Yüksek Su İşareti ulaşılır ve kayıt uygulanır, bölümün durumu güncellenir.

sınıf Küme Koordinatörü…

  private void updateParitionStatus(UpdatePartitionStatusCommand command) {
      removePendingRequest(command.partitionId);
      logger.info("Changing status for " + command.partitionId + " to " + command.status);
      logger.info(partitionTable.toString());
      partitionTable.updateStatus(command.partitionId, command.status);
  }
İstemci Arayüzü

Basit bir anahtar ve değer deposu örneğini tekrar ele alırsak, bir müşterinin belirli bir anahtar için bir değer depolaması veya alması gerekiyorsa, bunu aşağıdaki adımları izleyerek yapabilir:

  • İstemci, hash işlevini anahtara uygular ve toplam bölüm sayısına göre ilgili bölümü bulur.
  • İstemci, bölüm tablosunu koordinatörden alır ve bölümü barındıran küme düğümünü bulur. İstemci ayrıca bölüm tablosunu periyodik olarak yeniler.

Koordinatörden bir bölüm tablosu getiren istemciler, özellikle tüm istekler tek bir koordinatör lideri tarafından karşılanıyorsa, hızlı bir şekilde darboğazlara yol açabilir. Bu nedenle meta verileri tüm küme düğümlerinde kullanılabilir durumda tutmak yaygın bir uygulamadır. Koordinatör, meta verileri küme düğümlerine iletebilir veya küme düğümleri, bunları koordinatörden alabilir. İstemciler daha sonra meta verileri yenilemek için herhangi bir küme düğümüne bağlanabilir.

Bu genellikle, anahtar değer deposu tarafından sağlanan istemci kitaplığı içinde veya istemci istek işleme (küme düğümlerinde gerçekleşir) tarafından uygulanır.

sınıf Müşteri…

  public void put(String key, String value) throws IOException {
      Integer partitionId = findPartition(key, noOfPartitions);
      InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
      sendPutMessage(partitionId, nodeAddress, key, value);
  }

  private InetAddressAndPort getNodeAddressFor(Integer partitionId) {
      PartitionInfo partitionInfo = partitionTable.getPartition(partitionId);
      InetAddressAndPort nodeAddress = partitionInfo.getAddress();
      return nodeAddress;
  }

  private void sendPutMessage(Integer partitionId, InetAddressAndPort address, String key, String value) throws IOException {
      PartitionPutMessage partitionPutMessage = new PartitionPutMessage(partitionId, key, value);
      SocketClient socketClient = new SocketClient(address);
      socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionPutKV.getId(),
                                                JsonSerDes.serialize(partitionPutMessage)));
  }
  public String get(String key) throws IOException {
      Integer partitionId = findPartition(key, noOfPartitions);
      InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId);
      return sendGetMessage(partitionId, key, nodeAddress);
  }

  private String sendGetMessage(Integer partitionId, String key, InetAddressAndPort address) throws IOException {
      PartitionGetMessage partitionGetMessage = new PartitionGetMessage(partitionId, key);
      SocketClient socketClient = new SocketClient(address);
      RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionGetKV.getId(), JsonSerDes.serialize(partitionGetMessage)));
      PartitionGetResponseMessage partitionGetResponseMessage = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionGetResponseMessage.class);
      return partitionGetResponseMessage.getValue();
  }
Bölümleri yeni eklenen üyelere taşıma

Bir kümeye yeni düğümler eklendiğinde, bazı bölümler diğer düğümlere taşınabilir. Bu, yeni bir küme düğümü eklendiğinde otomatik olarak yapılabilir. Ancak, küme düğümü boyunca çok fazla verinin taşınmasını içerebilir, bu nedenle bir yönetici genellikle yeniden bölümlemeyi tetikler. Bunu yapmanın basit bir yöntemi, her düğümün barındırması gereken ortalama bölüm sayısını hesaplamak ve ardından ek bölümleri yeni düğüme taşımaktır. Örneğin, bölüm sayısı 30 ise ve kümede mevcut üç düğüm varsa, her düğüm 10 bölüm barındırmalıdır. Yeni bir düğüm eklenirse, düğüm başına ortalama yaklaşık 7’dir. Koordinatör bu nedenle her küme düğümünden yenisine üç bölüm taşımaya çalışacaktır.

sınıf Küme Koordinatörü…

  List<Migration> pendingMigrations = new ArrayList<>();

  boolean reassignPartitions() {
      if (partitionAssignmentInProgress()) {
          logger.info("Partition assignment in progress");
          return false;
      }
      List<Migration> migrations = repartition(this.partitionTable);
      CompletableFuture proposalFuture = replicatedLog.propose(new MigratePartitionsCommand(migrations));
      proposalFuture.join();
      return true;
  }
public List<Migration> repartition(PartitionTable partitionTable) {
    int averagePartitionsPerNode = getAveragePartitionsPerNode();
    List<Member> liveMembers = membership.getLiveMembers();
    var overloadedNodes = partitionTable.getOverloadedNodes(averagePartitionsPerNode, liveMembers);
    var underloadedNodes = partitionTable.getUnderloadedNodes(averagePartitionsPerNode, liveMembers);

    var migrations = tryMovingPartitionsToUnderLoadedMembers(averagePartitionsPerNode, overloadedNodes, underloadedNodes);
    return migrations;
}

private List<Migration> tryMovingPartitionsToUnderLoadedMembers(int averagePartitionsPerNode,
                                                                Map<InetAddressAndPort, PartitionList> overloadedNodes,
                                                                Map<InetAddressAndPort, PartitionList> underloadedNodes) {
    List<Migration> migrations = new ArrayList<>();
    for (InetAddressAndPort member : overloadedNodes.keySet()) {
        var partitions = overloadedNodes.get(member);
        var toMove = partitions.subList(averagePartitionsPerNode, partitions.getSize());
        overloadedNodes.put(member, partitions.subList(0, averagePartitionsPerNode));
        ArrayDeque<Integer> moveQ = new ArrayDeque<Integer>(toMove.partitionList());
        while (!moveQ.isEmpty() && nodeWithLeastPartitions(underloadedNodes, averagePartitionsPerNode).isPresent()) {
            assignToNodesWithLeastPartitions(migrations, member, moveQ, underloadedNodes, averagePartitionsPerNode);
        }
        if (!moveQ.isEmpty()) {
            overloadedNodes.get(member).addAll(moveQ);
        }
    }
    return migrations;
}

int getAveragePartitionsPerNode() {
    return noOfPartitions / membership.getLiveMembers().size();
}

Koordinatör, çoğaltılan günlükte hesaplanan geçişleri sürdürecek ve ardından bölümleri küme düğümleri arasında taşımak için istekler gönderecektir.

private void applyMigratePartitionCommand(MigratePartitionsCommand command) {
    logger.info("Handling partition migrations " + command.migrations);
    for (Migration migration : command.migrations) {
        RequestPartitionMigrationMessage message = new RequestPartitionMigrationMessage(requestNumber++, this.listenAddress, migration);
        pendingMigrations.add(migration);
        if (isLeader()) {
            scheduler.execute(new RetryableTask(migration.fromMember, network, this, migration.getPartitionId(), message));
        }
    }
}

Bir küme düğümü, geçiş isteği aldığında, bölümü geçiş yapıyor olarak işaretler. Bu, bölümdeki diğer değişiklikleri durdurur. Daha sonra tüm bölüm verilerini hedef düğüme gönderir.

sınıf KVStore…

  private void handleRequestPartitionMigrationMessage(RequestPartitionMigrationMessage message) {
      Migration migration = message.getMigration();
      Integer partitionId = migration.getPartitionId();
      InetAddressAndPort toServer = migration.getToMember();
      if (!allPartitions.containsKey(partitionId)) {
          return;// The partition is not available with this node.
      }
      Partition partition = allPartitions.get(partitionId);
      partition.setMigrating();
      network.send(toServer, new MovePartitionMessage(requestNumber++, this.listenAddress, toServer, partition));
  }

İsteği alan küme düğümü, yeni bölümü kendisine ekleyecek ve bir onay döndürecektir.

sınıf KVStore…

  private void handleMovePartition(Message message) {
      MovePartitionMessage movePartitionMessage = (MovePartitionMessage) message;
      Partition partition = movePartitionMessage.getPartition();
      allPartitions.put(partition.getId(), partition);
      network.send(message.from, new PartitionMovementComplete(message.messageId, listenAddress,
              new Migration(movePartitionMessage.getMigrateFrom(), movePartitionMessage.getMigrateTo(),  partition.getId())));
  }

Daha önce bölüme sahip olan küme düğümü, geçiş tamamlandı iletisini küme koordinatörüne gönderir.

sınıf KVStore…

  private void handlePartitionMovementCompleteMessage(PartitionMovementComplete message) {
      allPartitions.remove(message.getMigration().getPartitionId());
      network.send(coordLeader, new MigrationCompleteMessage(requestNumber++, listenAddress,
              message.getMigration()));
  }

Küme koordinatörü daha sonra geçişi tamamlandı olarak işaretler. Değişiklik, çoğaltılan günlükte saklanır.

sınıf Küme Koordinatörü…

  private void handleMigrationCompleteMessage(MigrationCompleteMessage message) {
      MigrationCompleteMessage migrationCompleteMessage = message;
      CompletableFuture propose = replicatedLog.propose(new MigrationCompletedCommand(message.getMigration()));
      propose.join();
  }

sınıf Küme Koordinatörü…

  private void applyMigrationCompleted(MigrationCompletedCommand command) {
      pendingMigrations.remove(command.getMigration());
      logger.info("Completed migration " + command.getMigration());
      logger.info("pendingMigrations = " + pendingMigrations);
      partitionTable.migrationCompleted(command.getMigration());
  }

sınıf PartitionTable…

  public void migrationCompleted(Migration migration) {
      this.addPartition(migration.partitionId, new PartitionInfo(migration.partitionId, migration.toMember, ClusterCoordinator.PartitionStatus.ONLINE));
  }

Bir cevap yazın

Ready to Grow Your Business?

We Serve our Clients’ Best Interests with the Best Marketing Solutions. Find out More

How Can We Help You?

Need to bounce off ideas for an upcoming project or digital campaign? Looking to transform your business with the implementation of full potential digital marketing?

For any career inquiries, please visit our careers page here.
[contact-form-7 404 "Bulunamadı"]