DLedger log replication: source code analysis
from:cnblogs.com/shanml/p/17153989.htm
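When the Broker receives a message, it calls the CommitLog's asyncPutMessage method to write it. In DLedger mode the implementation is DLedgerCommitLog. Its asyncPutMessages method, the batch variant shown here, does the following: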
1. Call serialize to serialize the message data.
2. Build a batch append request, BatchAppendEntryRequest, and attach the data serialized in the previous step.
3. Call handleAppend to submit the append request and write the messages.

public class DLedgerCommitLog extends CommitLog {
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
        // ...
        AppendMessageResult appendResult;
        BatchAppendFuture<AppendEntryResponse> dledgerFuture;
        EncodeResult encodeResult;
        // Serialize the message data
        encodeResult = this.messageSerializer.serialize(messageExtBatch);
        if (encodeResult.status != AppendMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
                new AppendMessageResult(encodeResult.status)));
        }
        putMessageLock.lock();
        msgIdBuilder.setLength(0);
        long elapsedTimeInLock;
        long queueOffset;
        int msgNum = 0;
        try {
            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
            encodeResult.setQueueOffsetKey(queueOffset, true);
            // Build the batch append request
            BatchAppendEntryRequest request = new BatchAppendEntryRequest();
            request.setGroup(dLedgerConfig.getGroup()); // set the group
            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
            // Take the serialized message data from the EncodeResult
            request.setBatchMsgs(encodeResult.batchData);
            // Call handleAppend to write the data
            AppendFuture<AppendEntryResponse> appendFuture =
                (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
            if (appendFuture.getPos() == -1) {
                log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,
                    new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
            }
            // ...
        } catch (Exception e) {
            log.error("Put message error", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR,
                new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
        } finally {
            beginTimeInDledgerLock = 0;
            putMessageLock.unlock();
        }
        // ...
    }
}

Serialization
The serialize method writes the message data into in-memory buffers. A batch may hold several messages, so it loops over the batch and serializes one message per iteration:

1. Read the total size, the magic code and the CRC checksum. These three reads only serve to advance the buffer's read position.
2. Read FLAG into flag.
3. Read the body length into bodyLen.
4. The body starts at the current position; record it in bodyPos.
5. Compute the CRC checksum over the body, starting at bodyPos.
6. Move the read position to bodyPos + bodyLen, skipping the body so the next field can be read.
7. Read the properties length and record where the properties start.
8. Get the topic bytes and their length.
9. Compute the message length and allocate a buffer of that size.
10. Check that the message length does not exceed the limit.
11. Initialize the buffer and write the message fields into it in order.

After the loop, return the serialization result, EncodeResult.

class MessageSerializer {
    public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
        // Key: topic + queueId
        String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
        int totalMsgLen = 0;
        // Get the message data
        ByteBuffer messagesByteBuff = messageExtBatch.wrap();
        List<byte[]> batchBody = new LinkedList<>();
        // Get the system flag
        int sysFlag = messageExtBatch.getSysFlag();
        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // Allocate the host address holders
        ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
        ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
        // Loop while unread data remains
        while (messagesByteBuff.hasRemaining()) {
            // Read the total size
            messagesByteBuff.getInt();
            // Read the magic code
            messagesByteBuff.getInt();
            // Read the CRC checksum
            messagesByteBuff.getInt();
            // Read FLAG
            int flag = messagesByteBuff.getInt();
            // Read the body length
            int bodyLen = messagesByteBuff.getInt();
            // Record where the body starts
            int bodyPos = messagesByteBuff.position();
            // Compute the CRC checksum over the body, starting at bodyPos
            int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
            // Move the position bodyLen bytes past bodyPos, skipping the body
            messagesByteBuff.position(bodyPos + bodyLen);
            // Read the properties length
            short propertiesLen = messagesByteBuff.getShort();
            // Record where the properties start
            int propertiesPos = messagesByteBuff.position();
            // Skip the properties
            messagesByteBuff.position(propertiesPos + propertiesLen);
            // Get the topic bytes
            final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            // Length of the topic byte array
            final int topicLength = topicData.length;
            // Compute the message length
            final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
            // Allocate a buffer of that length
            ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
            // Reject a single message that exceeds the maximum message size
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: "
                    + bodyLen + ", maxMessageSize: " + this.maxMessageSize);
                throw new RuntimeException("message size exceeded");
            }
            // Update the running total
            totalMsgLen += msgLen;
            // Reject the batch if the running total exceeds the maximum message size
            if (totalMsgLen > maxMessageSize) {
                throw new RuntimeException("message size exceeded");
            }
            // Initialize the buffer
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 Total length
            msgStoreItemMemory.putInt(msgLen);
            // 2 Magic code
            msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
            // 3 CRC checksum
            msgStoreItemMemory.putInt(bodyCrc);
            // 4 QUEUEID
            msgStoreItemMemory.putInt(messageExtBatch.getQueueId());
            // 5 FLAG
            msgStoreItemMemory.putInt(flag);
            // 6 QUEUEOFFSET (queue offset)
            msgStoreItemMemory.putLong(0L);
            // 7 Physical offset
            msgStoreItemMemory.putLong(0);
            // 8 SYSFLAG
            msgStoreItemMemory.putInt(messageExtBatch.getSysFlag());
            // 9 Born timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getBornTimestamp());
            // 10 BORNHOST
            resetByteBuffer(bornHostHolder, bornHostLength);
            msgStoreItemMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder));
            // 11 Store timestamp
            msgStoreItemMemory.putLong(messageExtBatch.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            resetByteBuffer(storeHostHolder, storeHostLength);
            msgStoreItemMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            msgStoreItemMemory.putInt(messageExtBatch.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            msgStoreItemMemory.putLong(0);
            // 15 Body length, then the body itself
            msgStoreItemMemory.putInt(bodyLen);
            if (bodyLen > 0) {
                msgStoreItemMemory.put(messagesByteBuff.array(), bodyPos, bodyLen);
            }
            // 16 Topic length and topic
            msgStoreItemMemory.put((byte) topicLength);
            msgStoreItemMemory.put(topicData);
            // 17 Properties length, then the properties
            msgStoreItemMemory.putShort(propertiesLen);
            if (propertiesLen > 0) {
                msgStoreItemMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
            }
            // Copy the buffer into a byte array
            byte[] data = new byte[msgLen];
            msgStoreItemMemory.clear();
            msgStoreItemMemory.get(data);
            // Add it to the batch
            batchBody.add(data);
        }
        // Return the result
        return new EncodeResult(AppendMessageStatus.PUT_OK, key, batchBody, totalMsgLen);
    }
}

Writing the message
Once the message data is serialized and wrapped in an append request, handleAppend writes it. The logic is:

1. Get the current term and check whether the number of pending append requests for that term exceeds the maximum. If it does, set the response code to LEADER_PENDING_FULL, meaning too many append requests are already in flight, and reject the request. Otherwise continue.
2. Check whether the request is a batch request:
   - If it is, iterate over the messages, create a DLedgerEntry for each one, and call appendAsLeader to write it to the Leader. Then call waitAck to create the asynchronous response object for the last message.
   - If it is not, create a DLedgerEntry for the message, call appendAsLeader to write it to the Leader, and call waitAck to create the asynchronous response object.

public class DLedgerServer implements DLedgerProtocolHander {
    @Override
    public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
        try {
            PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()),
                DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
            PreConditions.check(memberState.getGroup().equals(request.getGroup()),
                DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
            // This node must be the Leader; otherwise a NOT_LEADER exception is thrown
            PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
            PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
            // Get the current term
            long currTerm = memberState.currTerm();
            // Check the number of pending requests
            if (dLedgerEntryPusher.isPendingFull(currTerm)) {
                AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
                appendEntryResponse.setGroup(memberState.getGroup());
                // Set the response code to LEADER_PENDING_FULL
                appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
                // Set the term
                appendEntryResponse.setTerm(currTerm);
                // Set the leader ID
                appendEntryResponse.setLeaderId(memberState.getSelfId());
                return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
            } else {
                if (request instanceof BatchAppendEntryRequest) { // batch
                    BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
                    if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
                        long[] positions = new long[batchRequest.getBatchMsgs().size()];
                        DLedgerEntry resEntry = null;
                        int index = 0;
                        // Iterate over the messages
                        Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
                        while (iterator.hasNext()) {
                            // Create a DLedgerEntry
                            DLedgerEntry dLedgerEntry = new DLedgerEntry();
                            // Set the message body
                            dLedgerEntry.setBody(iterator.next());
                            // Write the entry
                            resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                            positions[index++] = resEntry.getPos();
                        }
                        // Create the asynchronous response object for the last entry
                        BatchAppendFuture<AppendEntryResponse> batchAppendFuture =
                            (BatchAppendFuture<AppendEntryResponse>) dLedgerEntryPusher.waitAck(resEntry, true);
                        batchAppendFuture.setPositions(positions);
                        return batchAppendFuture;
                    }
                    throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS,
                        "BatchAppendEntryRequest" + " with empty bodys");
                } else { // single message
                    DLedgerEntry dLedgerEntry = new DLedgerEntry();
                    // Set the message body
                    dLedgerEntry.setBody(request.getBody());
                    // Write the entry
                    DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
                    // Create the asynchronous response object to wait on
                    return dLedgerEntryPusher.waitAck(resEntry, false);
                }
            }
        } catch (DLedgerException e) {
            // ...
        }
    }
}
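The batch branch can be tried in isolation. The sketch below is a minimal stand-in, not DLedger code: ToyStore and appendBatch are hypothetical names, and positions are simulated as running byte offsets rather than real file offsets. It only shows the shape of the loop: every body is appended in order, every position is recorded, and an empty batch is rejected.

```java
import java.util.Arrays;
import java.util.List;

final class BatchAppendSketch {
    // Appends every body in order and returns the position of each one.
    static long[] appendBatch(ToyStore store, List<byte[]> bodies) {
        if (bodies == null || bodies.isEmpty()) {
            // Plays the role of the REQUEST_WITH_EMPTY_BODYS rejection in handleAppend.
            throw new IllegalArgumentException("batch request with empty bodies");
        }
        long[] positions = new long[bodies.size()];
        int i = 0;
        for (byte[] body : bodies) {
            positions[i++] = store.append(body);
        }
        return positions;
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        long[] positions = appendBatch(store, Arrays.asList(new byte[3], new byte[5], new byte[2]));
        // prints "[0, 3, 8] endIndex=2"
        System.out.println(Arrays.toString(positions) + " endIndex=" + store.endIndex());
    }
}

// Hypothetical stand-in for DLedgerStore: gives each entry an index and a byte position.
final class ToyStore {
    private long endIndex = -1; // starts at -1, like ledgerEndIndex in the article
    private long nextPos = 0;   // simulated offset where the next entry will land

    // Appends one body and returns the position it was written at.
    long append(byte[] body) {
        long pos = nextPos;
        nextPos += body.length;
        endIndex++;
        return pos;
    }

    long endIndex() {
        return endIndex;
    }
}
```

In the real method only the last entry is passed to waitAck, so a single future stands for the whole batch while the positions array lets the caller locate each message.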
pendingAppendResponsesByTerm

DLedgerEntryPusher has a member variable named pendingAppendResponsesByTerm. Its outer key is the term. Its value is a ConcurrentMap whose key is the message index (each message is numbered, starting from 0, as covered below) and whose value is the asynchronous response object for that message's append request, a TimeoutFuture<AppendEntryResponse>.

isPendingFull first checks whether pendingAppendResponsesByTerm has an entry for the current term and, if not, initializes one with a new ConcurrentHashMap. It then compares the number of entries in that term's map with MaxPendingRequestsNum to see whether the limit has been exceeded:

public class DLedgerEntryPusher {
    // Outer key: the term. Value: a ConcurrentMap.
    // Inner key: the message index. Inner value: the asynchronous response object for that append request.
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm =
        new ConcurrentHashMap<>();

    public boolean isPendingFull(long currTerm) {
        // Make sure currTerm has an entry in pendingAppendResponsesByTerm
        checkTermForPendingMap(currTerm, "isPendingFull");
        // Check whether the number of append requests for this term exceeds the maximum
        return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
    }

    private void checkTermForPendingMap(long term, String env) {
        // If pendingAppendResponsesByTerm has no entry for this term
        if (!pendingAppendResponsesByTerm.containsKey(term)) {
            logger.info("Initialize the pending append map in {} for term={}", env, term);
            // Create a ConcurrentHashMap and add it to pendingAppendResponsesByTerm
            pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap<>());
        }
    }
}
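To see the threshold behave, here is a small self-contained sketch of the same term-to-index-to-future bookkeeping. PendingSketch and register are hypothetical names, CompletableFuture<String> stands in for TimeoutFuture<AppendEntryResponse>, and computeIfAbsent replaces the containsKey/putIfAbsent pair. The comparison is kept strictly greater-than, as in the code above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical miniature of the term -> (index -> future) bookkeeping.
final class PendingSketch {
    private final Map<Long, ConcurrentMap<Long, CompletableFuture<String>>> pendingByTerm =
        new ConcurrentHashMap<>();
    private final int maxPending;

    PendingSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // Registers a waiting future for (term, index) and returns it.
    CompletableFuture<String> register(long term, long index) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingByTerm.computeIfAbsent(term, t -> new ConcurrentHashMap<>()).put(index, future);
        return future;
    }

    // Full only when the count is strictly greater than the limit.
    boolean isPendingFull(long term) {
        return pendingByTerm.computeIfAbsent(term, t -> new ConcurrentHashMap<>()).size() > maxPending;
    }

    public static void main(String[] args) {
        PendingSketch pending = new PendingSketch(2);
        pending.register(1L, 0L);
        pending.register(1L, 1L);
        System.out.println(pending.isPendingFull(1L)); // false: 2 is not greater than 2
        pending.register(1L, 2L);
        System.out.println(pending.isPendingFull(1L)); // true: 3 is greater than 2
        System.out.println(pending.isPendingFull(2L)); // false: a new term starts empty
    }
}
```

Because the check is strict, a limit of N lets N + 1 requests be registered before further appends are rejected.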
When are entries added to pendingAppendResponsesByTerm? After the write to the Leader, DLedgerEntryPusher's waitAck method (covered below) runs. If the cluster has more than one node, it creates an AppendFuture<AppendEntryResponse> for the request and puts it into pendingAppendResponsesByTerm. The number of futures stored for a term is therefore the number of append requests still waiting in that term:

// Response object
AppendFuture<AppendEntryResponse> future;
// Create the AppendFuture
if (isBatchWait) { // batch
    future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
} else {
    future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
}
future.setPos(entry.getPos());
// Add the AppendFuture to pendingAppendResponsesByTerm
CompletableFuture<AppendEntryResponse> old =
    pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);

Writing to the Leader
DLedgerStore has two implementations: DLedgerMemoryStore (in-memory storage) and DLedgerMmapFileStore (memory-mapped files).

createDLedgerStore picks one according to the configured store type:

public class DLedgerServer implements DLedgerProtocolHander {
    public DLedgerServer(DLedgerConfig dLedgerConfig) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = new MemberState(dLedgerConfig);
        // Create the DLedgerStore according to the configured StoreType
        this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
        // ...
    }

    // Create the DLedgerStore
    private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {
        if (storeType.equals(DLedgerConfig.MEMORY)) {
            return new DLedgerMemoryStore(config, memberState);
        } else {
            return new DLedgerMmapFileStore(config, memberState);
        }
    }
}

appendAsLeader
Taking DLedgerMmapFileStore as the example, appendAsLeader works as follows:

1. Check that this node is the Leader and that the disk is not full.
2. Get the log data buffer (dataBuffer) and the index data buffer (indexBuffer). Content is written into the buffers first and then from the buffers into files.
3. Encode the entry into dataBuffer.
4. Set the entry's index to ledgerEndIndex + 1. ledgerEndIndex starts at -1 and records the index of the last successfully written entry, so it grows by one with each successful write.
5. Call dataFileList's append method to write dataBuffer to the log file. It returns the offset of the data in the file.
6. Write the index information into indexBuffer.
7. Call indexFileList's append method to write indexBuffer to the index file.
8. Increment ledgerEndIndex.
9. Set ledgerEndTerm to the current term.
10. Call updateLedgerEndIndexAndTerm to update the LedgerEndIndex and LedgerEndTerm recorded in MemberState. LedgerEndIndex is persisted to a file on flush.

public class DLedgerMmapFileStore extends DLedgerStore {
    // Log data buffer
    private ThreadLocal<ByteBuffer> localEntryBuffer;
    // Index data buffer
    private ThreadLocal<ByteBuffer> localIndexBuffer;

    @Override
    public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
        // Check that this node is the Leader
        PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
        // Check that the disk is not full
        PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
        // Get the log data buffer
        ByteBuffer dataBuffer = localEntryBuffer.get();
        // Get the index data buffer
        ByteBuffer indexBuffer = localIndexBuffer.get();
        // Encode the entry into dataBuffer
        DLedgerEntryCoder.encode(entry, dataBuffer);
        int entrySize = dataBuffer.remaining();
        synchronized (memberState) {
            PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
            PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
            // The entry's index is ledgerEndIndex + 1
            long nextIndex = ledgerEndIndex + 1;
            // Set the index
            entry.setIndex(nextIndex);
            // Set the term
            entry.setTerm(memberState.currTerm());
            // Set the magic code
            entry.setMagic(CURRENT_MAGIC);
            // Write the index, term and magic code into the buffer
            DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
            long prePos = dataFileList.preAppend(dataBuffer.remaining());
            entry.setPos(prePos);
            PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
            DLedgerEntryCoder.setPos(dataBuffer, prePos);
            for (AppendHook writeHook : appendHooks) {
                writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
            }
            // Write dataBuffer to the log file; returns the position of the data
            long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
            PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
            PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
            // Write the index information into indexBuffer
            DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
            // Write indexBuffer to the index file
            long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
            PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
            if (logger.isDebugEnabled()) {
                logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
            }
            // Increment ledgerEndIndex
            ledgerEndIndex++;
            // Set ledgerEndTerm to the current term
            ledgerEndTerm = memberState.currTerm();
            if (ledgerBeginIndex == -1) {
                // Update ledgerBeginIndex
                ledgerBeginIndex = ledgerEndIndex;
            }
            // Update LedgerEndIndex and LedgerEndTerm
            updateLedgerEndIndexAndTerm();
            return entry;
        }
    }
}

Updating LedgerEndIndex and LedgerEndTerm
After the message is written to the Leader, getLedgerEndIndex and getLedgerEndTerm are called to read the LedgerEndIndex and LedgerEndTerm recorded in DLedgerMmapFileStore, and the values are copied into MemberState:

public abstract class DLedgerStore {
    protected void updateLedgerEndIndexAndTerm() {
        if (getMemberState() != null) {
            // Delegate to MemberState.updateLedgerIndexAndTerm
            getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
        }
    }
}

public class MemberState {
    private volatile long ledgerEndIndex = -1;
    private volatile long ledgerEndTerm = -1;

    // Update ledgerEndIndex and ledgerEndTerm
    public void updateLedgerIndexAndTerm(long index, long term) {
        this.ledgerEndIndex = index;
        this.ledgerEndTerm = term;
    }
}

waitAck
After the message is written to the Leader, the Leader still has to forward the log to the Followers, and that happens asynchronously. waitAck therefore creates an asynchronous response object for the write. Its logic is:

1. Call updatePeerWaterMark to update the watermark. Because the Leader forwards the log to every Follower, the watermark records each node's replication progress, that is, the index of the last entry replicated to it. Here it records the index of the entry the Leader itself has just written; the Follower updates appear later.
2. If the cluster has only one node, create an AppendEntryResponse and return it directly.
3. If the cluster has several nodes, forwarding is asynchronous, so create an AppendFuture<AppendEntryResponse> and add it to pendingAppendResponsesByTerm. This is where the entries of pendingAppendResponsesByTerm come from.

To distinguish pendingAppendResponsesByTerm from peerWaterMarksByTerm: pendingAppendResponsesByTerm holds the asynchronous response object for each append request, because the Leader has to wait for a majority of nodes to respond and collects the result later. peerWaterMarksByTerm holds each node's replication progress, namely the index of the last entry that node has written successfully.

public class DLedgerEntryPusher {
    public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
        // Record the index of the entry this node has just written
        updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());
        // If the cluster has only one node
        if (memberState.getPeerMap().size() == 1) {
            // Create the response
            AppendEntryResponse response = new AppendEntryResponse();
            response.setGroup(memberState.getGroup());
            response.setLeaderId(memberState.getSelfId());
            response.setIndex(entry.getIndex());
            response.setTerm(entry.getTerm());
            response.setPos(entry.getPos());
            if (isBatchWait) {
                return BatchAppendFuture.newCompletedFuture(entry.getPos(), response);
            }
            return AppendFuture.newCompletedFuture(entry.getPos(), response);
        } else {
            // Make sure pendingAppendResponsesByTerm has an entry for this term
            checkTermForPendingMap(entry.getTerm(), "waitAck");
            // Response object
            AppendFuture<AppendEntryResponse> future;
            // Create the AppendFuture
            if (isBatchWait) { // batch
                future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            } else {
                future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
            }
            future.setPos(entry.getPos());
            // Add the AppendFuture to pendingAppendResponsesByTerm
            CompletableFuture<AppendEntryResponse> old =
                pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
            if (old != null) {
                logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
            }
            return future;
        }
    }
}

Log replication
After a message is written to the Leader, the Leader forwards it to the Followers asynchronously. The rest of this article follows that replication process.

DLedgerEntryPusher's startup method starts the following threads:

- EntryDispatcher: used by the Leader to forward log entries to a Follower.
- EntryHandler: used by a Follower to process the entries sent by the Leader.
- QuorumAckChecker: used by the Leader to wait for the Followers to catch up.

Note that the Leader creates one EntryDispatcher per Follower. Each EntryDispatcher forwards the log to a single node, and the dispatchers run in parallel.

public class DLedgerEntryPusher {
    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
        DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        for (String peer : memberState.getPeerMap().keySet()) {
            if (!peer.equals(memberState.getSelfId())) {
                // Create an EntryDispatcher for every node in the cluster except this one
                dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
            }
        }
        // Create the EntryHandler
        this.entryHandler = new EntryHandler(logger);
        // Create the QuorumAckChecker
        this.quorumAckChecker = new QuorumAckChecker(logger);
    }

    public void startup() {
        // Start the EntryHandler
        entryHandler.start();
        // Start the QuorumAckChecker
        quorumAckChecker.start();
        // Start the EntryDispatchers
        for (EntryDispatcher dispatcher : dispatcherMap.values()) {
            dispatcher.start();
        }
    }
}

EntryDispatcher (log forwarding)
EntryDispatcher is what the Leader uses to forward log entries to a Follower. It extends ShutdownAbleThread, so forwarding runs on its own thread, with doWork as the entry point.

doWork first calls checkAndFreshState to validate the node's state. This step checks whether the current node is the Leader and adjusts the push type. If the node is not the Leader, processing stops. If it is, the push type decides what happens next:

- APPEND: append entries, that is, forward messages to the Follower. doBatchAppend is called when batch push is enabled, otherwise doAppend.
- COMPARE: compare entries. This usually happens when the data is inconsistent, and doCompare is called to compare entries.

public class DLedgerEntryPusher {
    // Log forwarding thread
    private class EntryDispatcher extends ShutdownAbleThread {
        @Override
        public void doWork() {
            try {
                // Check the state
                if (!checkAndFreshState()) {
                    waitForRunning(1);
                    return;
                }
                // APPEND type
                if (type.get() == PushEntryRequest.Type.APPEND) {
                    // If batch push is enabled
                    if (dLedgerConfig.isEnableBatchPush()) {
                        doBatchAppend();
                    } else {
                        doAppend();
                    }
                } else {
                    // Compare
                    doCompare();
                }
                Thread.yield();
            } catch (Throwable t) {
                DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}",
                    peerId, getName(), writeIndex, compareIndex, t);
                // Switch to COMPARE on any exception
                changeState(-1, PushEntryRequest.Type.COMPARE);
                DLedgerUtils.sleep(500);
            }
        }
    }
}

State check (checkAndFreshState)
If the dispatcher's term differs from the one in memberState, or its LeaderId is null, or its LeaderId differs from the one in memberState, checkAndFreshState calls changeState to switch the push type to COMPARE and reset compareIndex to -1:

public class DLedgerEntryPusher {
    private class EntryDispatcher extends ShutdownAbleThread {
        private long term = -1;
        private String leaderId = null;

        private boolean checkAndFreshState() {
            // Not the Leader
            if (!memberState.isLeader()) {
                return false;
            }
            // The term differs from memberState, or leaderId is null, or leaderId differs from memberState
            if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {
                synchronized (memberState) { // lock
                    if (!memberState.isLeader()) {
                        return false;
                    }
                    PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
                    term = memberState.currTerm();
                    leaderId = memberState.getSelfId();
                    // Switch to COMPARE
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                }
            }
            return true;
        }

        private synchronized void changeState(long index, PushEntryRequest.Type target) {
            logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
            switch (target) {
                case APPEND:
                    compareIndex = -1;
                    updatePeerWaterMark(term, peerId, index);
                    quorumAckChecker.wakeup();
                    writeIndex = index + 1;
                    if (dLedgerConfig.isEnableBatchPush()) {
                        resetBatchAppendEntryRequest();
                    }
                    break;
                case COMPARE:
                    // If the switch from APPEND to COMPARE succeeds
                    if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
                        compareIndex = -1; // reset compareIndex to -1
                        if (dLedgerConfig.isEnableBatchPush()) {
                            batchPendingMap.clear();
                        } else {
                            pendingMap.clear();
                        }
                    }
                    break;
                case TRUNCATE:
                    compareIndex = -1;
                    break;
                default:
                    break;
            }
            type.set(target);
        }
    }
}

Leader message forwarding
In the APPEND state the Leader sends Append requests to the Follower to forward messages. doAppend works as follows:

1. Call checkAndFreshState to check the state.
2. If the push type is not APPEND, stop.
3. writeIndex is the index of the next entry to forward; its default value is -1. If it is greater than LedgerEndIndex, call doCommit to send a COMMIT request to the Follower and update its committedIndex (covered later). Forwarding thus uses writeIndex as a cursor: each round reads the entry at writeIndex from the log, forwards it, and then increments writeIndex to point at the next entry.
4. If the size of pendingMap has reached maxPendingSize, or more than 1000 ms have passed since the last check, clean up expired entries:
   - pendingMap is a ConcurrentMap whose key is the entry index and whose value is the time the entry was forwarded to the Follower. doAppendInner adds the entries.
   - The peer watermarks record each node's replication progress. Look up the progress for this term and node ID, that is, the index of the latest entry replicated successfully, and store it in peerWaterMark.
   - Iterate over pendingMap and compare each index with peerWaterMark. Everything before peerWaterMark has been written successfully, so entries with a smaller index are expired and are removed from pendingMap to free space.
   - Set lastCheckLeakTimeMs to the current time.
5. Call doAppendInner to forward the entry.
6. Increment writeIndex so it points at the next entry to forward.

public class DLedgerEntryPusher {
    private class EntryDispatcher extends ShutdownAbleThread {
        // Index of the next entry to forward, -1 by default
        private long writeIndex = -1;
        // Key: entry index. Value: the time the entry was forwarded to the Follower
        private ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>();

        private void doAppend() throws Exception {
            while (true) {
                // Check the state
                if (!checkAndFreshState()) {
                    break;
                }
                // Stop if the type is not APPEND
                if (type.get() != PushEntryRequest.Type.APPEND) {
                    break;
                }
                // Is the index to forward greater than LedgerEndIndex?
                if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
                    doCommit(); // send a COMMIT request to the Follower
                    doCheckAppendResponse();
                    break;
                }
                // pendingMap has reached maxPendingSize, or the last check was more than 1000 ms ago
                if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {
                    // Get the replication progress of peerId
                    long peerWaterMark = getPeerWaterMark(term, peerId);
                    // Iterate over pendingMap
                    for (Long index : pendingMap.keySet()) {
                        // Remove entries whose index is below peerWaterMark
                        if (index < peerWaterMark) {
                            pendingMap.remove(index);
                        }
                    }
                    // Update the check time
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (pendingMap.size() >= maxPendingSize) {
                    doCheckAppendResponse();
                    break;
                }
                // Forward the entry
                doAppendInner(writeIndex);
                // Advance writeIndex
                writeIndex++;
            }
        }
    }
}

getPeerWaterMark
peerWaterMarksByTerm
peerWaterMarksByTerm records the forwarding progress. Its key is the term and its value is a ConcurrentMap whose key is the node ID (peerId) and whose value is the index of the latest entry that node has finished replicating.

getPeerWaterMark first calls checkTermForWaterMark to check whether peerWaterMarksByTerm has data for the term. If not, it creates a ConcurrentMap and adds every node in the cluster to it, with the node ID as key and -1 as the default value. Once an entry is written to a Follower successfully, updatePeerWaterMark is called to update the progress:

public class DLedgerEntryPusher {
    // Replication progress per node. Key: term. Value: a ConcurrentMap.
    // Inner key: node ID (peerId). Inner value: index of the latest entry that node has replicated.
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();

    // Get a node's replication progress
    public long getPeerWaterMark(long term, String peerId) {
        synchronized (peerWaterMarksByTerm) {
            checkTermForWaterMark(term, "getPeerWaterMark");
            return peerWaterMarksByTerm.get(term).get(peerId);
        }
    }

    private void checkTermForWaterMark(long term, String env) {
        // If peerWaterMarksByTerm has no entry for this term
        if (!peerWaterMarksByTerm.containsKey(term)) {
            logger.info("Initialize the watermark in {} for term={}", env, term);
            // Create the ConcurrentMap
            ConcurrentMap<String, Long> waterMarks = new ConcurrentHashMap<>();
            // Iterate over the nodes in the cluster
            for (String peer : memberState.getPeerMap().keySet()) {
                // Initialize: key is the peer ID, value is -1
                waterMarks.put(peer, -1L);
            }
            // Add it to peerWaterMarksByTerm
            peerWaterMarksByTerm.putIfAbsent(term, waterMarks);
        }
    }

    // Update the watermark
    private void updatePeerWaterMark(long term, String peerId, long index) {
        synchronized (peerWaterMarksByTerm) {
            // Make sure the term is initialized
            checkTermForWaterMark(term, "updatePeerWaterMark");
            // Update only if the previous watermark is below the new index
            if (peerWaterMarksByTerm.get(term).get(peerId) < index) {
                peerWaterMarksByTerm.get(term).put(peerId, index);
            }
        }
    }
}

Forwarding the message
doAppendInner works as follows:

1. Fetch the entry from the log by its index.
2. Call buildPushRequest to build the forwarding request, a PushEntryRequest. The request carries the entry, the current term, the Leader's commitIndex (the index of the last entry acknowledged by a majority of the cluster) and related fields.
3. Call dLedgerRpcService's push method to send the request to the Follower.
4. Add the entry's index to pendingMap to record the send time (key: entry index, value: current time).
5. Wait for the Follower's response:
   (1) SUCCESS means the Follower wrote the entry. Remove the index from pendingMap, update that Follower's replication progress through updatePeerWaterMark, and call quorumAckChecker's wakeup to wake the QuorumAckChecker thread.
   (2) INCONSISTENT_STATE means the Follower's data is inconsistent. Call changeState to switch to COMPARE.

private class EntryDispatcher extends ShutdownAbleThread {
    private void doAppendInner(long index) throws Exception {
        // Fetch the entry from the log by index
        DLedgerEntry entry = getDLedgerEntryForAppend(index);
        if (null == entry) {
            return;
        }
        checkQuotaAndWait(entry);
        // Build the forwarding request
        PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);
        // Send the request to the Follower
        CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
        // Add to pendingMap: key is the entry index, value is the current time
        pendingMap.put(index, System.currentTimeMillis());
        responseFuture.whenComplete((x, ex) -> {
            try {
                // Handle the response
                PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
                DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
                switch (responseCode) {
                    case SUCCESS:
                        // Remove from pendingMap
                        pendingMap.remove(x.getIndex());
                        // Update the peer watermark
                        updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                        // Wake the QuorumAckChecker
                        quorumAckChecker.wakeup();
                        break;
                    case INCONSISTENT_STATE:
                        logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                        // Switch to COMPARE
                        changeState(-1, PushEntryRequest.Type.COMPARE);
                        break;
                    default:
                        logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                        break;
                }
            } catch (Throwable t) {
                logger.error("", t);
            }
        });
        lastPushCommitTimeMs = System.currentTimeMillis();
    }

    private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
        // Create the PushEntryRequest
        PushEntryRequest request = new PushEntryRequest();
        request.setGroup(memberState.getGroup());
        request.setRemoteId(peerId);
        request.setLeaderId(leaderId);
        // Set the term
        request.setTerm(term);
        // Set the entry
        request.setEntry(entry);
        request.setType(target);
        // Set commitIndex, the index of the last entry acknowledged by a majority of the cluster
        request.setCommitIndex(dLedgerStore.getCommittedIndex());
        return request;
    }
}
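The interplay between pendingMap and the watermark can be exercised without any networking. PushTracker below is a hypothetical, single-peer simplification (the real dispatcher keeps watermarks per term and per peer): onPush records the send time, onSuccess does what the SUCCESS branch does, and dropAcked mirrors the cleanup loop in doAppend.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical single-peer tracker, for illustration only.
final class PushTracker {
    private final ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>(); // index -> send time
    private long waterMark = -1; // index of the newest entry the peer has acknowledged

    void onPush(long index, long nowMs) {
        pendingMap.put(index, nowMs);
    }

    // SUCCESS response: forget the pending entry and move the watermark forward only.
    synchronized void onSuccess(long index) {
        pendingMap.remove(index);
        if (waterMark < index) {
            waterMark = index;
        }
    }

    // Cleanup as in doAppend: drop every pending index below the watermark.
    synchronized int dropAcked() {
        int removed = 0;
        for (Long index : pendingMap.keySet()) {
            if (index < waterMark && pendingMap.remove(index) != null) {
                removed++;
            }
        }
        return removed;
    }

    synchronized long waterMark() {
        return waterMark;
    }

    int pendingSize() {
        return pendingMap.size();
    }

    public static void main(String[] args) {
        PushTracker tracker = new PushTracker();
        for (long i = 0; i < 4; i++) {
            tracker.onPush(i, System.currentTimeMillis());
        }
        tracker.onSuccess(2); // responses may arrive out of order
        tracker.onSuccess(1); // an older ack does not move the watermark back
        int removed = tracker.dropAcked(); // index 0 is below the watermark
        // prints "waterMark=2 removed=1 pending=1"
        System.out.println("waterMark=" + tracker.waterMark() + " removed=" + removed
            + " pending=" + tracker.pendingSize());
    }
}
```

Index 3 stays pending because it is above the watermark, which is why the periodic cleanup alone never discards an entry that still awaits its response.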
To connect the Leader's forwarding with the Follower's side of the exchange, see the section on how the Follower handles APPEND requests.
Leader message comparison

The data is considered inconsistent, and the state is switched to COMPARE, in either of two cases:
(1) While running checkAndFreshState, the Leader finds that its term differs from the one in memberState, or that its LeaderId is null, or that its LeaderId differs from the one in memberState.
(2) While validating an APPEND request (see the section on Follower request validation), the Follower finds that the data is inconsistent. It sets the INCONSISTENT_STATE code in the response to notify the Leader.
In the COMPARE state, doCompare sends a compare request to the Follower. The logic is:

1. Call checkAndFreshState to check the state.
2. If the type is neither COMPARE nor TRUNCATE, stop.
3. If compareIndex is -1 (changeState resets it to -1 when switching to COMPARE), use LedgerEndIndex as the new compareIndex.
4. If compareIndex is greater than LedgerEndIndex or less than LedgerBeginIndex, again use LedgerEndIndex as compareIndex. The separate branch appears to exist only so that a different log line is printed than in step 3.
5. Fetch the entry at compareIndex and call buildPushRequest to build a COMPARE request.
6. Push the COMPARE request to the Follower (see the section on how the Follower handles COMPARE requests).

In short, after the switch to COMPARE, compareIndex starts at -1, and doCompare sets it to the index of the last entry the Leader wrote, LedgerEndIndex, and sends that entry to the Follower for comparison.
向Follower節(jié)點發(fā)起請求后,等待COMPARE請求返回響應(yīng),請求中會將Follower節(jié)點最后成功寫入的消息的index設(shè)置在響應(yīng)對象的EndIndex變量中,第一條寫入的消息記錄在BeginIndex變量中:
1. If the response code is SUCCESS:
   - if compareIndex equals the EndIndex returned by the Follower, there is no inconsistency and the state changes to APPEND;
   - otherwise truncateIndex is set to compareIndex.
2. If the response code is not SUCCESS:
   - if the returned EndIndex is smaller than the Leader's LedgerBeginIndex, or the returned BeginIndex is larger than the Leader's LedgerEndIndex, the Follower's and the Leader's indexes do not intersect, and truncateIndex is set to the Leader's BeginIndex. According to the comment in the code, this usually happens when the Follower has been down for a long time and the Leader has deleted expired entries in the meantime;
   - if compareIndex is smaller than the Follower's BeginIndex, truncateIndex is set to the Leader's BeginIndex. According to the comment in the code, this is rare and usually means disk damage;
   - if compareIndex is larger than the Follower's EndIndex, compareIndex is set to the Follower's EndIndex and the comparison is repeated from there;
   - otherwise compareIndex is decremented by one and the comparison continues from the previous entry.
3. If compareIndex has dropped below the Leader's LedgerBeginIndex, truncateIndex is set to the Leader's LedgerBeginIndex.
4. If truncateIndex is not -1, the state changes to TRUNCATE and doTruncate is called.

    public class DLedgerEntryPusher {
        private class EntryDispatcher extends ShutdownAbleThread {
            private void doCompare() throws Exception {
                while (true) {
                    // Validate the dispatcher state
                    if (!checkAndFreshState()) {
                        break;
                    }
                    // Stop if the current type is neither COMPARE nor TRUNCATE
                    if (type.get() != PushEntryRequest.Type.COMPARE
                        && type.get() != PushEntryRequest.Type.TRUNCATE) {
                        break;
                    }
                    // Nothing to compare: compareIndex is -1 and LedgerEndIndex is -1
                    if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
                        break;
                    }
                    if (compareIndex == -1) {
                        // Start comparing from LedgerEndIndex
                        compareIndex = dLedgerStore.getLedgerEndIndex();
                        logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
                    } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
                        logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
                        // Also restart from LedgerEndIndex; the separate branch is presumably there only for the log line
                        compareIndex = dLedgerStore.getLedgerEndIndex();
                    }
                    // Load the entry at compareIndex
                    DLedgerEntry entry = dLedgerStore.get(compareIndex);
                    PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
                    // Build the COMPARE request
                    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
                    // Send the COMPARE request
                    CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
                    // Wait for the response
                    PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
                    PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
                    PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", compareIndex);
                    long truncateIndex = -1;
                    if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
                        // The compared entry matches
                        if (compareIndex == response.getEndIndex()) {
                            // compareIndex equals the Follower's EndIndex: switch to APPEND
                            changeState(compareIndex, PushEntryRequest.Type.APPEND);
                            break;
                        } else {
                            // Otherwise truncate at compareIndex
                            truncateIndex = compareIndex;
                        }
                    } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
                        || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {
                        /*
                         The follower's entries does not intersect with the leader.
                         This usually happened when the follower has crashed for a long time while the leader has deleted the expired entries.
                         Just truncate the follower.
                         */
                        // Truncate at the Leader's BeginIndex
                        truncateIndex = dLedgerStore.getLedgerBeginIndex();
                    } else if (compareIndex < response.getBeginIndex()) {
                        /*
                         The compared index is smaller than the follower's begin index.
                         This happened rarely, usually means some disk damage.
                         Just truncate the follower.
                         */
                        // Truncate at the Leader's BeginIndex
                        truncateIndex = dLedgerStore.getLedgerBeginIndex();
                    } else if (compareIndex > response.getEndIndex()) {
                        /*
                         The compared index is bigger than the follower's end index.
                         This happened frequently. For the compared index is usually starting from the end index of the leader.
                         */
                        // Continue comparing from the Follower's EndIndex
                        compareIndex = response.getEndIndex();
                    } else {
                        /*
                         Compare failed and the compared index is in the range of follower's entries.
                         */
                        // Comparison failed: step back one entry
                        compareIndex--;
                    }
                    // compareIndex has dropped below the Leader's LedgerBeginIndex
                    if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {
                        truncateIndex = dLedgerStore.getLedgerBeginIndex();
                    }
                    // If truncateIndex is set, switch to TRUNCATE and start truncating
                    if (truncateIndex != -1) {
                        changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
                        doTruncate(truncateIndex);
                        break;
                    }
                }
            }
        }
    }
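To make the branch order of doCompare easier to follow, one round of the loop can be written as a pure function. This is only a sketch under stated assumptions: the class `CompareStepSketch`, its `Action` enum and the `decide` method are hypothetical names for illustration and are not part of DLedger; the indexes that the real code reads from `dLedgerStore` and from the response are passed in as plain parameters.

```java
// Hypothetical sketch of one doCompare round; not DLedger code.
final class CompareStepSketch {
    enum Action { APPEND, TRUNCATE, RETRY }

    static final class Result {
        final Action action;
        // APPEND: matched index; TRUNCATE: truncateIndex; RETRY: next compareIndex
        final long index;

        Result(Action action, long index) {
            this.action = action;
            this.index = index;
        }
    }

    static Result decide(boolean success, long compareIndex,
                         long leaderBegin, long leaderEnd,
                         long followerBegin, long followerEnd) {
        long truncateIndex = -1;
        if (success) {
            if (compareIndex == followerEnd) {
                // Logs agree up to the follower's end: start appending
                return new Result(Action.APPEND, compareIndex);
            }
            truncateIndex = compareIndex;
        } else if (followerEnd < leaderBegin || followerBegin > leaderEnd) {
            // Index ranges do not intersect
            truncateIndex = leaderBegin;
        } else if (compareIndex < followerBegin) {
            truncateIndex = leaderBegin;
        } else if (compareIndex > followerEnd) {
            // Jump back to the follower's end and compare again
            compareIndex = followerEnd;
        } else {
            // Mismatch inside the follower's range: step back one entry
            compareIndex--;
        }
        if (compareIndex < leaderBegin) {
            truncateIndex = leaderBegin;
        }
        if (truncateIndex != -1) {
            return new Result(Action.TRUNCATE, truncateIndex);
        }
        return new Result(Action.RETRY, compareIndex);
    }
}
```

For example, a Leader whose log covers indexes 0 to 10 and a Follower that reports EndIndex 5 with a failed comparison at index 10 yields RETRY with compareIndex 5, which matches the "happened frequently" branch in the original comments.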
doTruncate builds a TRUNCATE request that carries truncateIndex (the index of the entry to be removed) and sends it to the Follower, telling it to delete the inconsistent entry. If the response is successful, changeState is called to switch back to APPEND. Inside changeState, updatePeerWaterMark sets this peer's replication progress to the index at which the inconsistency was found, and writeIndex is set to index + 1, so the next APPEND requests to the Follower resume from writeIndex:
    private class EntryDispatcher extends ShutdownAbleThread {
        private void doTruncate(long truncateIndex) throws Exception {
            PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
            DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
            PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
            logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
            // Build the TRUNCATE request
            PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
            // Send the TRUNCATE request to the Follower
            PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
            PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
            PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
            lastPushCommitTimeMs = System.currentTimeMillis();
            // Switch back to APPEND
            changeState(truncateIndex, PushEntryRequest.Type.APPEND);
        }

        private synchronized void changeState(long index, PushEntryRequest.Type target) {
            logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
            switch (target) {
                case APPEND:
                    compareIndex = -1;
                    // Set the peer's replication progress to the index where the inconsistency was found
                    updatePeerWaterMark(term, peerId, index);
                    // Wake up quorumAckChecker
                    quorumAckChecker.wakeup();
                    // Resume writing from the next index
                    writeIndex = index + 1;
                    if (dLedgerConfig.isEnableBatchPush()) {
                        resetBatchAppendEntryRequest();
                    }
                    break;
                // ...
            }
            type.set(target);
        }
    }

EntryHandler
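EntryHandler is used by a Follower to process the push requests sent by the Leader. Requests arrive in handlePush, which treats them by type:
1. APPEND: the request is put into writeRequestMap, keyed by the index of its first entry;
2. COMMIT: the request is added to compareOrTruncateRequests;
3. COMPARE or TRUNCATE: writeRequestMap is cleared and the request is added to compareOrTruncateRequests.

handlePush does not process the requests itself. It only files each type into the matching collection; the actual processing happens on another thread, in doWork.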
    public class DLedgerEntryPusher {
        private class EntryHandler extends ShutdownAbleThread {

            ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap = new ConcurrentHashMap<>();
            BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>(100);

            public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
                CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
                switch (request.getType()) {
                    case APPEND: // APPEND request
                        if (request.isBatch()) {
                            PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                        } else {
                            PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                        }
                        long index = request.getFirstEntryIndex();
                        // Put the request into writeRequestMap
                        Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
                        if (old != null) {
                            logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
                            future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                        }
                        break;
                    case COMMIT: // COMMIT request
                        // Add to compareOrTruncateRequests
                        compareOrTruncateRequests.put(new Pair<>(request, future));
                        break;
                    case COMPARE:
                    case TRUNCATE:
                        PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                        writeRequestMap.clear();
                        // Add to compareOrTruncateRequests
                        compareOrTruncateRequests.put(new Pair<>(request, future));
                        break;
                    default:
                        logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
                        future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                        break;
                }
                wakeup();
                return future;
            }
        }
    }
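The routing done by handlePush can be reduced to a few lines once futures and validation are left out. The following is a simplified sketch, not the DLedger implementation: `PushRoutingSketch`, its `Request` class and the boolean return value are assumptions made for illustration, while the two collections mirror `writeRequestMap` and `compareOrTruncateRequests` from the code above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the handlePush routing; not DLedger code.
final class PushRoutingSketch {
    enum Type { APPEND, COMMIT, COMPARE, TRUNCATE }

    static final class Request {
        final Type type;
        final long index; // index of the first entry in the request

        Request(Type type, long index) {
            this.type = type;
            this.index = index;
        }
    }

    final ConcurrentMap<Long, Request> writeRequestMap = new ConcurrentHashMap<>();
    final BlockingQueue<Request> compareOrTruncateRequests = new ArrayBlockingQueue<>(100);

    /** Returns false only for a repeated APPEND at an index that is already pending. */
    boolean handlePush(Request request) {
        switch (request.type) {
            case APPEND:
                // First request for an index wins; duplicates are rejected
                return writeRequestMap.putIfAbsent(request.index, request) == null;
            case COMMIT:
                enqueue(request);
                return true;
            case COMPARE:
            case TRUNCATE:
                // Pending appends are dropped before a compare or truncate
                writeRequestMap.clear();
                enqueue(request);
                return true;
            default:
                return false;
        }
    }

    private void enqueue(Request request) {
        try {
            compareOrTruncateRequests.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Keying the map by the first entry index is what lets the worker thread later fetch exactly the request for LedgerEndIndex + 1, regardless of the order in which the pushes arrived.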
EntryHandler also extends ShutdownAbleThread, so it runs its own thread that executes doWork, and the requests are processed there:
1. If compareOrTruncateRequests is not empty, the request type decides the handler:
   - TRUNCATE: handleDoTruncate;
   - COMPARE: handleDoCompare;
   - COMMIT: handleDoCommit.
2. Otherwise the work is treated as an APPEND:
   (1) LedgerEndIndex is the index of the last entry written successfully, so LedgerEndIndex + 1 is the index of the next entry to write;
   (2) the request for that index is taken from writeRequestMap; if there is none, checkAbnormalFuture is called;
   (3) if there is one, handleDoAppend (or handleDoBatchAppend for a batch request) writes it.

This shows that the Follower always advances from its own LedgerEndIndex, adding 1 to find the next entry it should write.

    public class DLedgerEntryPusher {
        private class EntryHandler extends ShutdownAbleThread {
            @Override
            public void doWork() {
                try {
                    // Only a Follower handles pushed entries
                    if (!memberState.isFollower()) {
                        waitForRunning(1);
                        return;
                    }
                    // compareOrTruncateRequests is not empty
                    if (compareOrTruncateRequests.peek() != null) {
                        Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
                        PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
                        switch (pair.getKey().getType()) {
                            case TRUNCATE:
                                handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                                break;
                            case COMPARE:
                                handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
                                break;
                            case COMMIT:
                                handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
                                break;
                            default:
                                break;
                        }
                    } else {
                        // Next index to write: last successfully written index + 1
                        long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
                        // Take the request for that index out of writeRequestMap
                        Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
                        // No request for that index: run the abnormal-future check
                        if (pair == null) {
                            checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
                            waitForRunning(1);
                            return;
                        }
                        PushEntryRequest request = pair.getKey();
                        if (request.isBatch()) {
                            handleDoBatchAppend(nextIndex, request, pair.getValue());
                        } else {
                            // Write the entry
                            handleDoAppend(nextIndex, request, pair.getValue());
                        }
                    }
                } catch (Throwable t) {
                    DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
                    DLedgerUtils.sleep(100);
                }
            }
        }
    }

Follower data inconsistency check

The checkAbnormalFuture method checks data consistency. Its logic is:
1. If less than 1000 ms have passed since the last check, return immediately;
2. Update the check time lastCheckFastForwardTimeMs;
3. If writeRequestMap is empty there are no pending write requests, so nothing needs to be done;
4. Call `checkAppendFuture` to perform the check;
    public class DLedgerEntryPusher {
        private class EntryHandler extends ShutdownAbleThread {
            /**
             * The leader does push entries to follower, and record the pushed index. But in the following conditions, the push may get stopped.
             *   * If the follower is abnormally shutdown, its ledger end index may be smaller than before. At this time, the leader may push fast-forward entries, and retry all the time.
             *   * If the last ack is missed, and no new message is coming in.The leader may retry push the last message, but the follower will ignore it.
             * @param endIndex
             */
            private void checkAbnormalFuture(long endIndex) {
                // Less than 1000 ms since the last check
                if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {
                    return;
                }
                // Update the check time
                lastCheckFastForwardTimeMs = System.currentTimeMillis();
                // An empty writeRequestMap means there are no pending writes, nothing to do
                if (writeRequestMap.isEmpty()) {
                    return;
                }
                // Run the check
                checkAppendFuture(endIndex);
            }
        }
    }
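The two early returns in checkAbnormalFuture interact in a way that is easy to miss: the timestamp is refreshed before the emptiness test, so a check that finds nothing pending still uses up the 1000 ms interval. A minimal sketch of that gate, assuming a hypothetical class `AbnormalCheckGateSketch` with the clock passed in as a parameter so the behaviour can be exercised deterministically:

```java
// Hypothetical sketch of the checkAbnormalFuture gate; not DLedger code.
final class AbnormalCheckGateSketch {
    // Stand-in for lastCheckFastForwardTimeMs
    private long lastCheckMs;

    AbnormalCheckGateSketch(long initialMs) {
        this.lastCheckMs = initialMs;
    }

    /** Returns true when the full check (checkAppendFuture in the article) would run. */
    boolean shouldCheck(long nowMs, int pendingWrites) {
        if (nowMs - lastCheckMs < 1000) {
            return false; // checked too recently
        }
        lastCheckMs = nowMs; // refreshed even if nothing is pending
        if (pendingWrites == 0) {
            return false; // no pending write requests
        }
        return true;
    }
}
```

With a gate created at time 0, a call at 1000 ms with no pending writes returns false and resets the interval, so a call at 1500 ms with pending writes is still skipped and only a call at 2000 ms or later runs the check.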
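The endIndex parameter of checkAppendFuture is the Follower's LedgerEndIndex, the index of the last entry written successfully (doWork passes dLedgerStore.getLedgerEndIndex() to checkAbnormalFuture, which forwards it), so endIndex + 1 is the index of the next entry to be written. The method works as follows: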
1. Initialize minFastForwardIndex to Long.MAX_VALUE; it is used to find the smallest entry index at which an inconsistency shows up.
2. Iterate over writeRequestMap and process every pending write request:
   (1) a request may be a batch, so take the index of its first entry as firstEntryIndex;
   (2) take the index of its last entry as lastEntryIndex;
   (3) if lastEntryIndex is less than or equal to endIndex, compare the entries in the request with the entries stored locally (for a batch, every entry is looked up by index in the local log and compared). Entries up to endIndex have already been written, so a request for them that is still in writeRequestMap was left behind for some reason. If the data matches, the request's future is completed with SUCCESS; if it does not, the exception path builds a response with INCONSISTENT_STATE to tell the Leader about the inconsistency. In both cases the request is removed from writeRequestMap;
   (4) if firstEntryIndex equals endIndex + 1, the request is exactly the next entry to be written after endIndex and will be picked up by the normal append path, so the check ends;
   (5) if the request has not timed out yet, continue with the next request; if it has, go on to the next step;
   (6) at this point the request is ahead of the next expected index and has timed out; if firstEntryIndex is smaller than minFastForwardIndex, update minFastForwardIndex so that it records the smallest such index.
3. If minFastForwardIndex is still MAX_VALUE, no inconsistent request was found and the method returns.
4. Look up the request at minFastForwardIndex in writeRequestMap; if there is none, return, otherwise call buildBatchAppendResponse to build a response with INCONSISTENT_STATE that tells the Leader about the inconsistency.

    private class EntryHandler extends ShutdownAbleThread {
        private void checkAppendFuture(long endIndex) {
            // Start from the maximum value
            long minFastForwardIndex = Long.MAX_VALUE;
            // Iterate over the values of writeRequestMap
            for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {
                // Index of the first entry in the request
                long firstEntryIndex = pair.getKey().getFirstEntryIndex();
                // Index of the last entry in the request
                long lastEntryIndex = pair.getKey().getLastEntryIndex();
                // The request only covers entries that are already written
                if (lastEntryIndex <= endIndex) {
                    try {
                        if (pair.getKey().isBatch()) {
                            // Batch request: check every entry against the local log
                            for (DLedgerEntry dLedgerEntry : pair.getKey().getBatchEntry()) {
                                PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
                            }
                        } else {
                            DLedgerEntry dLedgerEntry = pair.getKey().getEntry();
                            // Check the entry in the request against the local log
                            PreConditions.check(dLedgerEntry.equals(dLedgerStore.get(dLedgerEntry.getIndex())), DLedgerResponseCode.INCONSISTENT_STATE);
                        }
                        // Complete the future with SUCCESS
                        pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));
                        logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex);
                    } catch (Throwable t) {
                        logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", lastEntryIndex, endIndex, t);
                        // On failure, answer the Leader with INCONSISTENT_STATE
                        pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                    }
                    // Remove the handled request from writeRequestMap
                    writeRequestMap.remove(pair.getKey().getFirstEntryIndex());
                    continue;
                }
                // The request is the next entry after endIndex: stop this check
                if (firstEntryIndex == endIndex + 1) {
                    return;
                }
                // Skip requests that have not timed out yet
                TimeoutFuture<PushEntryResponse> future = (TimeoutFuture<PushEntryResponse>) pair.getValue();
                if (!future.isTimeOut()) {
                    continue;
                }
                // Track the smallest timed-out index
                if (firstEntryIndex < minFastForwardIndex) {
                    minFastForwardIndex = firstEntryIndex;
                }
            }
            // Still MAX_VALUE: nothing inconsistent was found
            if (minFastForwardIndex == Long.MAX_VALUE) {
                return;
            }
            // Look up the request at minFastForwardIndex
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);
            if (pair == null) {
                // Not found: return
                return;
            }
            logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);
            // Answer the Leader with INCONSISTENT_STATE
            pair.getValue().complete(buildBatchAppendResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }

        private PushEntryResponse buildBatchAppendResponse(PushEntryRequest request, int code) {
            PushEntryResponse response = new PushEntryResponse();
            response.setGroup(request.getGroup());
            response.setCode(code);
            response.setTerm(request.getTerm());
            response.setIndex(request.getLastEntryIndex());
            // This node's LedgerBeginIndex
            response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
            // This node's LedgerEndIndex
            response.setEndIndex(dLedgerStore.getLedgerEndIndex());
            return response;
        }
    }

Follower node entry write

handleDoAppend
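The handleDoAppend method handles APPEND requests and writes the entries forwarded by the Leader to the log file: 1. It takes the Entry from the request and **calls appendAsFollower to write the entry to file**; 2. It **calls updateCommittedIndex to apply the commitIndex carried in the Leader's request to the Follower's local state**, which comes up again in the discussion of `QuorumAckChecker`;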
    public class DLedgerEntryPusher {
        private class EntryHandler extends ShutdownAbleThread {
            private void handleDoAppend(long writeIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
                try {
                    PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
                    // Write the entry to the log
                    DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
                    PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
                    future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                    // Update the committed index
                    dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
                } catch (Throwable t) {
                    logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
                    future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                }
            }
        }
    }

Writing to file
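Taking DLedgerMmapFileStore as the example again, here is appendAsFollower. The logic of appendAsLeader was covered earlier and the two are similar: the entry is encoded into a buffer, and the buffer is then written to the data file and the index file, so the details are not repeated here: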
    public class DLedgerMmapFileStore extends DLedgerStore {
        @Override
        public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) {
            PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());
            PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
            // Data buffer
            ByteBuffer dataBuffer = localEntryBuffer.get();
            // Index buffer
            ByteBuffer indexBuffer = localIndexBuffer.get();
            // Encode the entry
            DLedgerEntryCoder.encode(entry, dataBuffer);
            int entrySize = dataBuffer.remaining();
            synchronized (memberState) {
                PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole());
                long nextIndex = ledgerEndIndex + 1;
                PreConditions.check(nextIndex == entry.getIndex(), DLedgerResponseCode.INCONSISTENT_INDEX, null);
                PreConditions.check(leaderTerm == memberState.currTerm(), DLedgerResponseCode.INCONSISTENT_TERM, null);
                PreConditions.check(leaderId.equals(memberState.getLeaderId()), DLedgerResponseCode.INCONSISTENT_LEADER, null);
                // Write to the data file
                long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
                PreConditions.check(dataPos == entry.getPos(), DLedgerResponseCode.DISK_ERROR, "%d != %d", dataPos, entry.getPos());
                DLedgerEntryCoder.encodeIndex(dataPos, entrySize, entry.getMagic(), entry.getIndex(), entry.getTerm(), indexBuffer);
                // Write to the index file
                long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
                PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
                ledgerEndTerm = entry.getTerm();
                ledgerEndIndex = entry.getIndex();
                if (ledgerBeginIndex == -1) {
                    ledgerBeginIndex = ledgerEndIndex;
                }
                updateLedgerEndIndexAndTerm();
                return entry;
            }
        }
    }

Compare

handleDoCompare handles COMPARE requests; compareIndex is the index to be compared. Its logic is:
1. Validate the request: compareIndex must equal the index of the entry in the request, and the request type must be COMPARE;
2. Load the local Entry at compareIndex and check that it equals the entry in the request; a mismatch is answered with INCONSISTENT_STATE;
3. Build the response, which carries this node's BeginIndex and EndIndex.

    public class DLedgerEntryPusher {
        private class EntryHandler extends ShutdownAbleThread {
            private CompletableFuture<PushEntryResponse> handleDoCompare(long compareIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
                try {
                    // compareIndex must match the index in the request
                    PreConditions.check(compareIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
                    // The request type must be COMPARE
                    PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
                    // Load the local entry
                    DLedgerEntry local = dLedgerStore.get(compareIndex);
                    // The entry in the request must equal the local one
                    PreConditions.check(request.getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
                    // Respond with SUCCESS: no inconsistency at this index
                    future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                } catch (Throwable t) {
                    logger.error("[HandleDoCompare] compareIndex={}", compareIndex, t);
                    future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                }
                return future;
            }

            private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
                // Build the response
                PushEntryResponse response = new PushEntryResponse();
                response.setGroup(request.getGroup());
                // Response code
                response.setCode(code);
                // Term
                response.setTerm(request.getTerm());
                // Every type except COMMIT carries an entry index
                if (request.getType() != PushEntryRequest.Type.COMMIT) {
                    response.setIndex(request.getEntry().getIndex());
                }
                // BeginIndex
                response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
                // EndIndex
                response.setEndIndex(dLedgerStore.getLedgerEndIndex());
                return response;
            }
        }
    }

Truncate
The Follower handles TRUNCATE requests in handleDoTruncate. It deletes data according to the truncateIndex sent by the Leader, removing the entries after truncateIndex from the log:
    private class EntryHandler extends ShutdownAbleThread {
        // truncateIndex is the index of the entry to truncate at
        private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                logger.info("[HandleDoTruncate] truncateIndex={} pos={}", truncateIndex, request.getEntry().getPos());
                PreConditions.check(truncateIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(request.getType() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
                // Truncate the log
                long index = dLedgerStore.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());
                PreConditions.check(index == truncateIndex, DLedgerResponseCode.INCONSISTENT_STATE);
                future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                // Update committedIndex
                dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());
            } catch (Throwable t) {
                logger.error("[HandleDoTruncate] truncateIndex={}", truncateIndex, t);
                future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            return future;
        }
    }

Commit
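As mentioned earlier, the Leader sends COMMIT requests to the Follower. A COMMIT request mainly updates the Follower's local committedIndex, which records the index of the latest entry acknowledged by a majority of the cluster. It appears again in QuorumAckChecker below: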
    private class EntryHandler extends ShutdownAbleThread {
        private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex, PushEntryRequest request, CompletableFuture<PushEntryResponse> future) {
            try {
                PreConditions.check(committedIndex == request.getCommitIndex(), DLedgerResponseCode.UNKNOWN);
                PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);
                // Update committedIndex
                dLedgerStore.updateCommittedIndex(request.getTerm(), committedIndex);
                future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
            } catch (Throwable t) {
                logger.error("[HandleDoCommit] committedIndex={}", request.getCommitIndex(), t);
                future.complete(buildResponse(request, DLedgerResponseCode.UNKNOWN.getCode()));
            }
            return future;
        }
    }

QuorumAckChecker
QuorumAckChecker is used by the Leader to wait until the Followers have finished replicating. Its logic is:
1. If pendingAppendResponsesByTerm holds more than one entry, iterate over it; a key that differs from the current term means the data is stale, so the stale pending responses are completed and removed from pendingAppendResponsesByTerm.
2. If peerWaterMarksByTerm holds more than one entry, iterate over it in the same way and clear the data whose term differs from the current term.
3. Get the peerWaterMarks of the current term. peerWaterMarks records the log replication progress of each node (the Leader records its own progress under its selfId, as the updatePeerWaterMark call in step 7 shows). Sort all progress values and take the one in the middle, which is an entry index. This is not easy to see at first, so here is an example with five nodes and a current term of 1:

    {
      "1" : {              // the term, the key into peerWaterMarksByTerm
        "node1" : "1",     // node 1 has replicated up to entry 1
        "node2" : "1",     // node 2 has replicated up to entry 1
        "node3" : "2",     // node 3 has replicated up to entry 2
        "node4" : "3",     // node 4 has replicated up to entry 3
        "node5" : "3"      // node 5 has replicated up to entry 3
      }
    }

   Sorting the progress of all nodes in descending order gives the list [3, 3, 2, 1, 1]. The integer part of 5 / 2 is 2, so the value at position 2 is taken, which is the progress of node 3 (entry index 2), and it is stored in quorumIndex. Nodes 4 and 5 are already past entry 2, so three nodes in the cluster have replicated entry 2, which satisfies the majority condition. A conventional way to decide whether an entry has been written by a majority is to check every node's progress and count the nodes that have it, which means traversing all nodes for every entry; sorting once and taking the middle value answers the question for all entries up to quorumIndex at the same time.
4. All entries up to quorumIndex have been replicated successfully, so the commit point can be advanced: updateCommittedIndex is called to update committedIndex.
5. Process the entries between lastQuorumIndex (the previous value of quorumIndex) and quorumIndex. For example, if lastQuorumIndex was 1 and quorumIndex is now 2, the entries up to quorumIndex have been acknowledged by a majority, so the pending responses in that range are completed and removed from pendingAppendResponsesByTerm, and their number is recorded in ackNum.
6. If ackNum is 0, quorumIndex equals lastQuorumIndex. Starting from quorumIndex + 1, check whether the write requests have timed out; a timed-out request is completed with WAIT_QUORUM_ACK_TIMEOUT. This step exists mainly to handle timed-out requests.
7. If more than 1000 ms have passed since the last leak check, or needCheck is true, update the Leader's own replication progress, then iterate over all pending responses of the current term and complete and remove those whose index is smaller than quorumIndex. This step cleans up AppendEntryResponse futures of entries that were written successfully but were not removed for some reason.
8. Update lastQuorumIndex.

    private class QuorumAckChecker extends ShutdownAbleThread {
        @Override
        public void doWork() {
            try {
                if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
                    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
                    lastPrintWatermarkTimeMs = System.currentTimeMillis();
                }
                // Only the Leader checks quorum acks
                if (!memberState.isLeader()) {
                    waitForRunning(1);
                    return;
                }
                // Current term
                long currTerm = memberState.currTerm();
                checkTermForPendingMap(currTerm, "QuorumAckChecker");
                checkTermForWaterMark(currTerm, "QuorumAckChecker");
                // More than one term in pendingAppendResponsesByTerm
                if (pendingAppendResponsesByTerm.size() > 1) {
                    // Clear the data of every term other than the current one
                    for (Long term : pendingAppendResponsesByTerm.keySet()) {
                        // Skip the current term
                        if (term == currTerm) {
                            continue;
                        }
                        // Iterate over the pending responses of the stale term
                        for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                            // Build an AppendEntryResponse with TERM_CHANGED
                            AppendEntryResponse response = new AppendEntryResponse();
                            response.setGroup(memberState.getGroup());
                            response.setIndex(futureEntry.getKey());
                            response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                            response.setLeaderId(memberState.getLeaderId());
                            logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                            // Complete the future
                            futureEntry.getValue().complete(response);
                        }
                        // Remove the stale term
                        pendingAppendResponsesByTerm.remove(term);
                    }
                }
                // Clear the watermarks of every term other than the current one
                if (peerWaterMarksByTerm.size() > 1) {
                    for (Long term : peerWaterMarksByTerm.keySet()) {
                        if (term == currTerm) {
                            continue;
                        }
                        logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
                        peerWaterMarksByTerm.remove(term);
                    }
                }
                // peerWaterMarks of the current term: the replication progress of each node
                Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
                // Sort the values in descending order
                List<Long> sortedWaterMarks = peerWaterMarks.values()
                    .stream()
                    .sorted(Comparator.reverseOrder())
                    .collect(Collectors.toList());
                // Take the middle value
                long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
                // Entries up to the middle value are replicated by a majority: update committedIndex
                dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
                // Pending append responses of the current term
                ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
                boolean needCheck = false;
                int ackNum = 0;
                // Walk backwards from quorumIndex to lastQuorumIndex (the previous quorumIndex)
                for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
                    try {
                        // Remove from responses
                        CompletableFuture<AppendEntryResponse> future = responses.remove(i);
                        if (future == null) {
                            // No pending response at this index: request a leak check
                            needCheck = true;
                            break;
                        } else if (!future.isDone()) {
                            // Not completed yet: complete it now
                            AppendEntryResponse response = new AppendEntryResponse();
                            response.setGroup(memberState.getGroup());
                            response.setTerm(currTerm);
                            response.setIndex(i);
                            response.setLeaderId(memberState.getSelfId());
                            response.setPos(((AppendFuture) future).getPos());
                            future.complete(response);
                        }
                        // Count the acknowledged entries
                        ackNum++;
                    } catch (Throwable t) {
                        logger.error("Error in ack to index={} term={}", i, currTerm, t);
                    }
                }
                // ackNum == 0 means quorumIndex equals lastQuorumIndex
                // This step mainly handles timed-out requests
                if (ackNum == 0) {
                    // Start from quorumIndex + 1
                    for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
                        TimeoutFuture<AppendEntryResponse> future = responses.get(i);
                        if (future == null) {
                            // No entry i yet: stop
                            break;
                        } else if (future.isTimeOut()) {
                            // The request for entry i has timed out
                            AppendEntryResponse response = new AppendEntryResponse();
                            response.setGroup(memberState.getGroup());
                            // Timeout code WAIT_QUORUM_ACK_TIMEOUT
                            response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                            response.setTerm(currTerm);
                            response.setIndex(i);
                            response.setLeaderId(memberState.getSelfId());
                            // Complete the future
                            future.complete(response);
                        } else {
                            break;
                        }
                    }
                    waitForRunning(1);
                }
                // More than 1000 ms since the last leak check, or needCheck is true
                // This step cleans up AppendEntryResponse futures of entries that were written successfully but not removed
                if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
                    // Update this node's replication progress
                    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                    // Iterate over all pending responses of the current term
                    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
                        // Index smaller than quorumIndex
                        if (futureEntry.getKey() < quorumIndex) {
                            AppendEntryResponse response = new AppendEntryResponse();
                            response.setGroup(memberState.getGroup());
                            response.setTerm(currTerm);
                            response.setIndex(futureEntry.getKey());
                            response.setLeaderId(memberState.getSelfId());
                            response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
                            futureEntry.getValue().complete(response);
                            // Remove
                            responses.remove(futureEntry.getKey());
                        }
                    }
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                // Update lastQuorumIndex
                lastQuorumIndex = quorumIndex;
            } catch (Throwable t) {
                DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
                DLedgerUtils.sleep(100);
            }
        }
    }

Persistence
Once the write of an entry has been acknowledged by a majority of the cluster, the Leader calls updateCommittedIndex to record the entry's index in committedIndex. As mentioned above, a Follower that receives an APPEND request from the Leader also applies the Leader's committedIndex carried in the request to its local state.
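The majority condition can be reproduced in isolation with the same sort-and-take-the-middle step that QuorumAckChecker uses. This is a minimal sketch: the class name `QuorumIndexSketch` and the node names in the usage are made up for illustration, and only the stream expression is taken from the article's code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the quorumIndex computation; not DLedger code.
final class QuorumIndexSketch {
    /**
     * Returns the largest index that a majority of nodes has replicated:
     * sort the watermarks in descending order and take position size / 2.
     */
    static long quorumIndex(Map<String, Long> peerWaterMarks) {
        List<Long> sorted = peerWaterMarks.values()
            .stream()
            .sorted(Comparator.reverseOrder())
            .collect(Collectors.toList());
        return sorted.get(sorted.size() / 2);
    }
}
```

With the watermarks 1, 1, 2, 3, 3 the sorted list is [3, 3, 2, 1, 1] and position 2 holds 2, so entry 2 is the newest entry held by three of five nodes. With four nodes and watermarks 9, 8, 5, 2, position 2 holds 5, which three of four nodes have reached.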
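The persistCheckPoint method, which persists the checkpoint, writes LedgerEndIndex and committedIndex to the CheckPoint file (when the Broker stops or on FLUSH):
ledgerEndIndex: the index of the last entry written successfully on the Leader or the Follower;
committedIndex: once an entry forwarded to the Followers has been acknowledged by a majority of the cluster, its index is recorded in committedIndex, meaning that every entry up to that index is committed. Committed entries can be consumed by consumers. The Leader passes the value to the Followers either inside APPEND requests or through COMMIT requests;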
    public class DLedgerMmapFileStore extends DLedgerStore {
        public void updateCommittedIndex(long term, long newCommittedIndex) {
            if (newCommittedIndex == -1
                || ledgerEndIndex == -1
                || term < memberState.currTerm()
                || newCommittedIndex == this.committedIndex) {
                return;
            }
            if (newCommittedIndex < this.committedIndex
                || newCommittedIndex < this.ledgerBeginIndex) {
                logger.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex);
                return;
            }
            // Current ledgerEndIndex
            long endIndex = ledgerEndIndex;
            // The new committed index must not exceed the index of the last entry
            if (newCommittedIndex > endIndex) {
                newCommittedIndex = endIndex;
            }
            Pair<Long, Integer> posAndSize = getEntryPosAndSize(newCommittedIndex);
            PreConditions.check(posAndSize != null, DLedgerResponseCode.DISK_ERROR);
            this.committedIndex = newCommittedIndex;
            this.committedPos = posAndSize.getKey() + posAndSize.getValue();
        }

        // Persist the checkpoint
        void persistCheckPoint() {
            try {
                Properties properties = new Properties();
                // LedgerEndIndex
                properties.put(END_INDEX_KEY, getLedgerEndIndex());
                // committedIndex
                properties.put(COMMITTED_INDEX_KEY, getCommittedIndex());
                String data = IOUtils.properties2String(properties);
                // Write the data to the checkpoint file
                IOUtils.string2File(data, dLedgerConfig.getDefaultPath() + File.separator + CHECK_POINT_FILE);
            } catch (Throwable t) {
                logger.error("Persist checkpoint failed", t);
            }
        }
    }
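The checkpoint is a pair of numbers stored as properties text. The round trip can be sketched with the JDK's `java.util.Properties` alone. Everything below is an assumption for illustration: the class `CheckpointSketch`, the key strings and the in-memory string encoding stand in for DLedger's `IOUtils` helpers and its actual key constants, which are not shown in the article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch of a checkpoint round trip; not DLedger code.
final class CheckpointSketch {
    static final String END_INDEX_KEY = "endIndex";             // assumed key name
    static final String COMMITTED_INDEX_KEY = "committedIndex"; // assumed key name

    /** Serializes the two indexes into properties-format text. */
    static String encode(long ledgerEndIndex, long committedIndex) {
        Properties properties = new Properties();
        properties.setProperty(END_INDEX_KEY, Long.toString(ledgerEndIndex));
        properties.setProperty(COMMITTED_INDEX_KEY, Long.toString(committedIndex));
        StringWriter out = new StringWriter();
        try {
            properties.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    /** Parses the text back into {ledgerEndIndex, committedIndex}. */
    static long[] decode(String data) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new long[] {
            Long.parseLong(properties.getProperty(END_INDEX_KEY)),
            Long.parseLong(properties.getProperty(COMMITTED_INDEX_KEY))
        };
    }
}
```

A real store would write the text to a file and read it back on startup; keeping the sketch in memory only shows which two values survive a restart.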
