In: Genel


Çözüm

Küme düğümü, bir anahtar ve bir geri arama işlevini eşleyen bir bekleme listesi tutar. Anahtar, geri aramayı başlatmak için belirli kriterlere bağlı olarak seçilir. Örneğin, başka bir küme düğümünden bir mesaj alındığında çağrılması gerekiyorsa,
Korelasyon Tanımlayıcı mesajın. Bu durumuda Çoğaltılan Günlük o
Yüksek Su İşareti. Geri arama, yanıtı işler ve müşteri isteğinin yerine getirilip getirilemeyeceğine karar verir.

Verilerin birden çok sunucuda çoğaltıldığı bir anahtar/değer deposu örneğini düşünün. Burada, yeter sayı istemciye bir yanıt başlatmak için bir çoğaltmanın ne zaman başarılı kabul edilebileceğine karar vermek için kullanılabilir. Küme düğümü daha sonra diğer küme düğümlerine gönderilen istekleri izler ve her istek için bir geri arama kaydedilir. Her istek bir ile işaretlenir Korelasyon Tanımlayıcı, isteğe verilen yanıtı eşlemek için kullanılır. Bekleme listesine daha sonra diğer küme düğümlerinden yanıt alındığında geri aramayı başlatması bildirilir.

Bu örnek için üç küme düğümümüze atina, bizans ve siren diyelim. İstemci, “başlığı” “Mikro hizmetler” olarak depolamak için Atina ile bağlantı kurar. Atina bunu Bizans ve siren üzerinde kopyalar; bu yüzden anahtar/değer çiftini saklamak için kendisine bir istek gönderir ve aynı anda hem bizans hem de sirene istek gönderir. Atina, yanıtları izlemek için bir WriteQuorumResponseCallback oluşturur ve bunu gönderilen her istek için bekleme listesine ekler.

Alınan her yanıt için, yanıtı işlemek üzere WriteQuorumResponseCallback çağrılır. Gerekli sayıda yanıtın alınıp alınmadığını kontrol eder. Bizans’tan yanıt alındıktan sonra yeter sayıya ulaşılır ve bekleyen müşteri talebi tamamlanır. Cyrene daha sonra yanıt verebilir, ancak yanıt beklemeden istemciye gönderilebilir.

Kod aşağıdaki örneğe benziyor: Her küme düğümünün kendi bekleme listesi örneğini koruduğunu unutmayın. Bekleme listesi, anahtarı ve ilişkili geri aramayı izler ve geri aramanın kaydedildiği zaman damgasını saklar. Zaman damgası, beklenen süre içinde yanıt alınmazsa, geri aramaların süresinin dolması gerekip gerekmediğini kontrol etmek için kullanılır.

public class RequestWaitingList<Key, Response> {
    private Map<Key, CallbackDetails> pendingRequests = new ConcurrentHashMap<>();
    public void add(Key key, RequestCallback<Response> callback) {
        pendingRequests.put(key, new CallbackDetails(callback, clock.nanoTime()));
    }
class CallbackDetails {
    RequestCallback requestCallback;
    long createTime;

    public CallbackDetails(RequestCallback requestCallback, long createTime) {
        this.requestCallback = requestCallback;
        this.createTime = createTime;
    }

    public RequestCallback getRequestCallback() {
        return requestCallback;
    }

    public long elapsedTime(long now) {
        return now - createTime;
    }
}
public interface RequestCallback<T> {
    void onResponse(T r);
    void onError(Throwable e);
}

Diğer küme düğümünden yanıt alındıktan sonra yanıtı veya hatayı işlemesi istenir.

sınıf İstekBekleme Listesi…

  public void handleResponse(Key key, Response response) {
      if (!pendingRequests.containsKey(key)) {
          return;
      }
      CallbackDetails callbackDetails = pendingRequests.remove(key);
      callbackDetails.getRequestCallback().onResponse(response);

  }

sınıf İstekBekleme Listesi…

  public void handleError(int requestId, Throwable e) {
      CallbackDetails callbackDetails = pendingRequests.remove(requestId);
      callbackDetails.getRequestCallback().onError(e);
  }

Bekleme listesi daha sonra, şuna benzer bir uygulama gibi görünen çekirdek yanıtlarını işlemek için kullanılabilir:

static class WriteQuorumCallback implements RequestCallback<RequestOrResponse> {
    private final int quorum;
    private volatile int expectedNumberOfResponses;
    private volatile int receivedResponses;
    private volatile int receivedErrors;
    private volatile boolean done;

    private final RequestOrResponse request;
    private final ClientConnection clientConnection;

    public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection clientConnection) {
        this.expectedNumberOfResponses = totalExpectedResponses;
        this.quorum = expectedNumberOfResponses / 2 + 1;
        this.request = clientRequest;
        this.clientConnection = clientConnection;
    }

    @Override
    public void onResponse(RequestOrResponse response) {
        receivedResponses++;
        if (receivedResponses == quorum && !done) {
            respondToClient("Success");
            done = true;
        }
    }

    @Override
    public void onError(Throwable t) {
        receivedErrors++;
        if (receivedErrors == quorum && !done) {
            respondToClient("Error");
            done = true;
        }
    }


    private void respondToClient(String response) {
        clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId()));
    }
}

Bir küme düğümü diğer düğümlere istek gönderdiğinde, bekleme listesi eşlemeye bir geri arama ekler. Korelasyon Tanımlayıcı
gönderilen isteğin

sınıf ClusterNode…

  private void handleSetValueClientRequestRequiringQuorum(List<InetAddressAndPort> replicas, RequestOrResponse request, ClientConnection clientConnection) {
      int totalExpectedResponses = replicas.size();
      RequestCallback requestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);
      for (InetAddressAndPort replica : replicas) {
          int correlationId = nextRequestId();
          requestWaitingList.add(correlationId, requestCallback);
          try {
              SocketClient client = new SocketClient(replica);
              client.sendOneway(new RequestOrResponse(RequestId.SetValueRequest.getId(), request.getMessageBodyJson(), correlationId, listenAddress));
          } catch (IOException e) {
              requestWaitingList.handleError(correlationId, e);
          }
      }
  }

Yanıt alındıktan sonra, bekleme listesinden bununla ilgilenmesi istenir:

sınıf ClusterNode…

  private void handleSetValueResponse(RequestOrResponse response) {
      requestWaitingList.handleResponse(response.getCorrelationId(), response);
  }

Bekleme listesi daha sonra ilişkili WriteQuorumCallback’i çağırır. WriteQuorumCallback örneği, çekirdek yanıtlarının alınıp alınmadığını doğrular ve istemciye yanıt vermek için geri aramayı başlatır.

Süresi Dolan Uzun Bekleyen İstekler

Bazen diğer küme düğümlerinden gelen yanıtlar gecikir. Bu durumlarda, bekleme listesi genellikle bir zaman aşımından sonra istekleri sona erdirmek için bir mekanizmaya sahiptir:

sınıf İstekBekleme Listesi…

  private SystemClock clock;
  private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private long expirationIntervalMillis = 2000;
  public RequestWaitingList(SystemClock clock) {
      this.clock = clock;
      executor.scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, MILLISECONDS);
  }

  private void expire() {
      long now = clock.nanoTime();
      List<Key> expiredRequestKeys = getExpiredRequestKeys(now);
      expiredRequestKeys.stream().forEach(expiredRequestKey -> {
          CallbackDetails request = pendingRequests.remove(expiredRequestKey);
          request.requestCallback.onError(new TimeoutException("Request expired"));
      });
  }

  private List<Key> getExpiredRequestKeys(long now) {
      return pendingRequests.entrySet().stream().filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalMillis).map(e -> e.getKey()).collect(Collectors.toList());
  }

Bir cevap yazın

Ready to Grow Your Business?

We Serve our Clients’ Best Interests with the Best Marketing Solutions. Find out More

How Can We Help You?

Need to bounce off ideas for an upcoming project or digital campaign? Looking to transform your business with the implementation of full potential digital marketing?

For any career inquiries, please visit our careers page here.
[contact-form-7 404 "Bulunamadı"]