In: Genel


Çözüm

Birden çok isteği tek bir istek toplu işinde birleştirin. İsteğin toplu işlemi, işlenmek üzere küme düğümüne gönderilir. her bir istek, bireysel bir istekle tam olarak aynı şekilde işlenir. Daha sonra yanıtların toplu olarak yanıt verecektir.

Örnek olarak, istemcinin sunucuda birden çok anahtar/değer çifti depolamak için istek gönderdiği dağıtılmış bir anahtar/değer deposu düşünün. İstemci, isteği göndermek için bir çağrı aldığında, bunu hemen ağ üzerinden göndermez; bunun yerine, gönderilecek isteklerin bir sırasını tutar.

sınıf Müşteri…

  LinkedBlockingQueue<RequestEntry> requests = new LinkedBlockingQueue<>();

  public CompletableFuture send(SetValueRequest setValueRequest) {
      int requestId = enqueueRequest(setValueRequest);
      CompletableFuture responseFuture = trackPendingRequest(requestId);
      return responseFuture;
  }

  private int enqueueRequest(SetValueRequest setValueRequest) {
      int requestId = nextRequestId();
      byte[] requestBytes = serialize(setValueRequest, requestId);
      requests.add(new RequestEntry(requestBytes, clock.nanoTime()));
      return requestId;
  }
  private int nextRequestId() {
      return requestNumber++;
  }

İsteğin sorgulandığı zaman izlenir; bu daha sonra isteğin toplu işin bir parçası olarak gönderilip gönderilmeyeceğine karar vermek için kullanılır.

sınıf RequestEntry…

  class RequestEntry {
      byte[] serializedRequest;
      long createdTime;
  
      public RequestEntry(byte[] serializedRequest, long createdTime) {
          this.serializedRequest = serializedRequest;
          this.createdTime = createdTime;
      }

Ardından, bir yanıt alındığında tamamlanmak üzere bekleyen istekleri izler. Her isteğe, yanıtı eşleştirmek ve istekleri tamamlamak için kullanılabilecek benzersiz bir istek numarası atanacaktır.

sınıf Müşteri…

  Map<Integer, CompletableFuture> pendingRequests = new ConcurrentHashMap<>();

  private CompletableFuture trackPendingRequest(Integer correlationId) {
      CompletableFuture responseFuture = new CompletableFuture();
      pendingRequests.put(correlationId, responseFuture);
      return responseFuture;
  }

İstemci, sıraya alınan istekleri sürekli olarak izleyen ayrı bir görev başlatır.

sınıf Müşteri…

  public Client(Config config, InetAddressAndPort serverAddress, SystemClock clock) {
      this.clock = clock;
      this.sender = new Sender(config, serverAddress, clock);
      this.sender.start();
  }

sınıf Gönderen…

  @Override
  public void run() {
      while (isRunning) {
          boolean maxWaitTimeElapsed = requestsWaitedFor(config.getMaxBatchWaitTime());
          boolean maxBatchSizeReached = maxBatchSizeReached(requests);
          if (maxWaitTimeElapsed || maxBatchSizeReached) {
              RequestBatch batch = createBatch(requests);
              try {
                  BatchResponse batchResponse = sendBatchRequest(batch, address);
                  handleResponse(batchResponse);

              } catch (IOException e) {
                  batch.getPackedRequests().stream().forEach(r -> {
                      pendingRequests.get(r.getCorrelationId()).completeExceptionally(e);
                  });
              }
          }
      }
  }

  private RequestBatch createBatch(LinkedBlockingQueue<RequestEntry> requests) {
      RequestBatch batch = new RequestBatch(MAX_BATCH_SIZE_BYTES);
      RequestEntry entry = requests.peek();
      while (entry != null && batch.hasSpaceFor(entry.getRequest())) {
          batch.add(entry.getRequest());
          requests.remove(entry);
          entry = requests.peek();
      }
      return batch;
  }

sınıf RequestBatch…

  public boolean hasSpaceFor(byte[] requestBytes) {
      return batchSize() + requestBytes.length <= maxSize;
  }
  private int batchSize() {
      return requests.stream().map(r->r.length).reduce(0, Integer::sum);
  }

Genelde yapılan iki kontrol vardır.

  • Yığını yapılandırılan maksimum boyuta doldurmak için yeterli istek birikmişse.
  • sınıf Gönderen…

  private boolean maxBatchSizeReached(Queue<RequestEntry> requests) {
      return accumulatedRequestSize(requests) > MAX_BATCH_SIZE_BYTES;
  }

  private int accumulatedRequestSize(Queue<RequestEntry> requests) {
      return requests.stream().map(re -> re.size()).reduce((r1, r2) -> r1 + r2).orElse(0);
  }
  • Partinin doldurulması için sonsuza kadar bekleyemediğimiz için, küçük bir bekleme süresi yapılandırabiliriz. Gönderen görevi bekler ve ardından isteğin maksimum bekleme süresinden önce eklenip eklenmediğini kontrol eder.

  • sınıf Gönderen…

      private boolean requestsWaitedFor(long batchingWindowInMs) {
          RequestEntry oldestPendingRequest = requests.peek();
          if (oldestPendingRequest == null) {
              return false;
          }
          long oldestEntryWaitTime = clock.nanoTime() - oldestPendingRequest.createdTime;
          return oldestEntryWaitTime > batchingWindowInMs;
      }

    Bu koşullardan herhangi biri yerine getirildiğinde toplu istek sunucuya gönderilebilir. Sunucu toplu isteği paketinden çıkarır ve bireysel isteklerin her birini işler.

    sınıf Sunucu…

      private void handleBatchRequest(RequestOrResponse batchRequest, ClientConnection clientConnection) {
          RequestBatch batch = JsonSerDes.deserialize(batchRequest.getMessageBodyJson(), RequestBatch.class);
          List<RequestOrResponse> requests = batch.getPackedRequests();
          List<RequestOrResponse> responses = new ArrayList<>();
          for (RequestOrResponse request : requests) {
              RequestOrResponse response = handleSetValueRequest(request);
              responses.add(response);
          }
          sendResponse(batchRequest, clientConnection, new BatchResponse(responses));
      }
    
      private RequestOrResponse handleSetValueRequest(RequestOrResponse request) {
          SetValueRequest setValueRequest = JsonSerDes.deserialize(request.getMessageBodyJson(), SetValueRequest.class);
          kv.put(setValueRequest.getKey(), setValueRequest.getValue());
          RequestOrResponse response = new RequestOrResponse(RequestId.SetValueResponse.getId(), "Success".getBytes(), request.getCorrelationId());
          return response;
      }

    İstemci toplu yanıtı alır ve bekleyen tüm istekleri tamamlar.

    sınıf Gönderen…

      private void handleResponse(BatchResponse batchResponse) {
          List<RequestOrResponse> responseList = batchResponse.getResponseList();
          logger.debug("Completing requests from " + responseList.get(0).getCorrelationId() + " to " + responseList.get(responseList.size() - 1).getCorrelationId());
          responseList.stream().forEach(r -> {
              CompletableFuture completableFuture = pendingRequests.remove(r.getCorrelationId());
              if (completableFuture != null) {
                  completableFuture.complete(r);
              } else {
                  logger.error("no pending request for " + r.getCorrelationId());
              }
          });
      }

    Teknik Hususlar

    Toplu iş boyutu, bireysel mesajların boyutuna ve mevcut ağ bant genişliğine ve ayrıca gerçek yaşam yüküne dayalı olarak gözlemlenen gecikme ve aktarım hızı iyileştirmelerine göre seçilmelidir. Bunlar, daha küçük mesaj boyutları ve sunucu tarafı işleme için en uygun toplu iş boyutu varsayılarak bazı makul varsayılanlara yapılandırılır. Örneğin, Kafkas varsayılan toplu iş boyutu 16Kb’dir. Ayrıca varsayılan değeri 0 olan “linger.ms” adlı bir yapılandırma parametresine sahiptir. Ancak, iletilerin boyutu daha büyükse, daha yüksek bir toplu iş boyutu daha iyi sonuç verebilir.

    Çok büyük bir parti büyüklüğüne sahip olmak, muhtemelen yalnızca azalan getiriler sunacaktır. Örneğin, MB cinsinden bir parti boyutuna sahip olmak, işleme açısından daha fazla ek yük getirebilir. Parti boyutu parametresinin tipik olarak performans testi yoluyla yapılan gözlemlere göre ayarlanmasının nedeni budur.

    Bir istek toplu işi genellikle aşağıdakilerle birlikte kullanılır: Talep Hattı
    genel verimi ve gecikmeyi iyileştirmek için.

    Küme düğümlerine istek göndermek için yeniden deneme-geri çekme ilkesi kullanıldığında, toplu isteğin tamamı yeniden denenir. Küme düğümü, toplu işin bir kısmını zaten işlemiş olabilir; bu nedenle yeniden denemenin sorunsuz çalışmasını sağlamak için uygulamalısınız Idempotent Alıcı.

    Bir cevap yazın

    Ready to Grow Your Business?

    We Serve our Clients’ Best Interests with the Best Marketing Solutions. Find out More

    How Can We Help You?

    Need to bounce off ideas for an upcoming project or digital campaign? Looking to transform your business with the implementation of full potential digital marketing?

    For any career inquiries, please visit our careers page here.
    [contact-form-7 404 "Bulunamadı"]