Compare commits
38 commits
Author | SHA1 | Date | |
---|---|---|---|
Trolli Schmittlauch | e79ba52e00 | ||
Trolli Schmittlauch | 4aa4667a1d | ||
Trolli Schmittlauch | 6aebd982f8 | ||
Trolli Schmittlauch | 048a6ce391 | ||
Trolli Schmittlauch | 8bd4e04bcd | ||
Trolli Schmittlauch | 0cb4b6815c | ||
Trolli Schmittlauch | b111515178 | ||
Trolli Schmittlauch | ecb127e6af | ||
Trolli Schmittlauch | 5ed8a28fde | ||
Trolli Schmittlauch | bb0fb0919a | ||
Trolli Schmittlauch | b2b4fe3dd8 | ||
Trolli Schmittlauch | c208aeceaa | ||
Trolli Schmittlauch | 0ee8f0dc43 | ||
Trolli Schmittlauch | 21ecf9b041 | ||
Trolli Schmittlauch | 9a61c186e3 | ||
Trolli Schmittlauch | 578cc362b9 | ||
Trolli Schmittlauch | 1a0de55b8c | ||
Trolli Schmittlauch | 7a87d86c32 | ||
Trolli Schmittlauch | 3b6d129bfc | ||
Trolli Schmittlauch | 62da66aade | ||
Trolli Schmittlauch | 13c5b385b1 | ||
Trolli Schmittlauch | 1ed0281417 | ||
Trolli Schmittlauch | 499c90e63a | ||
Trolli Schmittlauch | 1a7afed062 | ||
Trolli Schmittlauch | 8e8ea41dc4 | ||
Trolli Schmittlauch | 33ae904d17 | ||
Trolli Schmittlauch | 68de73d919 | ||
Trolli Schmittlauch | 0ab6ee9c8f | ||
Trolli Schmittlauch | 12dfc56a73 | ||
Trolli Schmittlauch | 9bf7365a2c | ||
Trolli Schmittlauch | 5e745cd035 | ||
Trolli Schmittlauch | 30bf0529ed | ||
Trolli Schmittlauch | 576ea2c3f6 | ||
Trolli Schmittlauch | 7dd7e96cce | ||
Trolli Schmittlauch | a1cfbbac48 | ||
Trolli Schmittlauch | af27cded19 | ||
Trolli Schmittlauch | 41aaa8ff70 | ||
Trolli Schmittlauch | ddea599022 |
|
@ -6,20 +6,22 @@ Domain ::= VisibleString
|
|||
|
||||
Partnum ::= INTEGER (0..150)
|
||||
|
||||
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
|
||||
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
|
||||
|
||||
Request ::= SEQUENCE {
|
||||
action Action,
|
||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||
receiverID NodeID,
|
||||
sender NodeState,
|
||||
part Partnum, -- part number of this message, starts at 1
|
||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
|
||||
actionPayload CHOICE {
|
||||
queryIDRequestPayload QueryIDRequestPayload,
|
||||
joinRequestPayload JoinRequestPayload,
|
||||
leaveRequestPayload LeaveRequestPayload,
|
||||
stabiliseRequestPayload StabiliseRequestPayload,
|
||||
pingRequestPayload PingRequestPayload
|
||||
leaveRequestPayload LeaveRequestPayload,
|
||||
stabiliseRequestPayload StabiliseRequestPayload,
|
||||
pingRequestPayload PingRequestPayload,
|
||||
loadRequestPayload LoadRequestPayload
|
||||
} OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning
|
||||
}
|
||||
|
||||
|
@ -34,11 +36,12 @@ Response ::= SEQUENCE {
|
|||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
||||
action Action,
|
||||
actionPayload CHOICE {
|
||||
queryIDResponsePayload QueryIDResponsePayload,
|
||||
joinResponsePayload JoinResponsePayload,
|
||||
queryIDResponsePayload QueryIDResponsePayload,
|
||||
joinResponsePayload JoinResponsePayload,
|
||||
leaveResponsePayload LeaveResponsePayload,
|
||||
stabiliseResponsePayload StabiliseResponsePayload,
|
||||
pingResponsePayload PingResponsePayload
|
||||
pingResponsePayload PingResponsePayload,
|
||||
loadResponsePayload LoadResponsePayload
|
||||
} OPTIONAL -- no payload when just ACKing a previous request
|
||||
}
|
||||
|
||||
|
@ -101,5 +104,15 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
|
|||
-- learning all active vserver IDs handled by the server at once
|
||||
PingResponsePayload ::= SEQUENCE OF NodeState
|
||||
|
||||
LoadRequestPayload ::= SEQUENCE {
|
||||
upperSegmentBound NodeID
|
||||
}
|
||||
|
||||
LoadResponsePayload ::= SEQUENCE {
|
||||
loadSum REAL,
|
||||
remainingLoadTarget REAL,
|
||||
totalCapacity REAL,
|
||||
lowerBound NodeID
|
||||
}
|
||||
|
||||
END
|
||||
|
|
|
@ -46,7 +46,7 @@ category: Network
|
|||
extra-source-files: CHANGELOG.md
|
||||
|
||||
common deps
|
||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
|
||||
build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
|
||||
ghc-options: -Wall -Wpartial-fields -O2
|
||||
|
||||
|
||||
|
|
35
app/Main.hs
35
app/Main.hs
|
@ -18,38 +18,20 @@ main = do
|
|||
-- ToDo: parse and pass config
|
||||
-- probably use `tomland` for that
|
||||
(fConf, sConf) <- readConfig
|
||||
-- TODO: first initialise 'RealNode', then the vservers
|
||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
||||
-- currently no masking is necessary, as there is nothing to clean up
|
||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||
joinedState <- tryBootstrapJoining thisNode
|
||||
either (\err -> do
|
||||
-- handle unsuccessful join
|
||||
|
||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||
print =<< readTVarIO thisNode
|
||||
-- launch thread attempting to join on new cache entries
|
||||
_ <- forkIO $ joinOnNewEntriesThread thisNode
|
||||
wait =<< async (fediMainThreads serverSock thisNode)
|
||||
)
|
||||
(\joinedNS -> do
|
||||
-- launch main eventloop with successfully joined state
|
||||
putStrLn "successful join"
|
||||
wait =<< async (fediMainThreads serverSock thisNode)
|
||||
)
|
||||
joinedState
|
||||
pure ()
|
||||
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
||||
-- wait for all DHT threads to terminate, this keeps the main thread running
|
||||
wait fediThreads
|
||||
|
||||
|
||||
readConfig :: IO (FediChordConf, ServiceConf)
|
||||
readConfig = do
|
||||
confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
|
||||
confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs
|
||||
-- allow starting the initial node without bootstrapping info to avoid
|
||||
-- waiting for timeout
|
||||
let
|
||||
speedup = read speedupString
|
||||
statsEvalD = 120 * 10^6 `div` speedup
|
||||
confBootstrapNodes' = case remainingArgs of
|
||||
bootstrapHost : bootstrapPortString : _ ->
|
||||
[(bootstrapHost, read bootstrapPortString)]
|
||||
|
@ -67,6 +49,11 @@ readConfig = do
|
|||
, confResponsePurgeAge = 60 / fromIntegral speedup
|
||||
, confRequestTimeout = 5 * 10^6 `div` speedup
|
||||
, confRequestRetries = 3
|
||||
, confEnableKChoices = loadBalancingEnabled /= "off"
|
||||
, confKChoicesOverload = 0.9
|
||||
, confKChoicesUnderload = 0.1
|
||||
, confKChoicesMaxVS = 8
|
||||
, confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double)
|
||||
}
|
||||
sConf = ServiceConf
|
||||
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
|
||||
|
@ -74,7 +61,7 @@ readConfig = do
|
|||
, confServiceHost = confDomainString
|
||||
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
|
||||
, confSpeedupFactor = speedup
|
||||
, confStatsEvalDelay = 120 * 10^6 `div` speedup
|
||||
, confStatsEvalDelay = statsEvalD
|
||||
}
|
||||
pure (fConf, sConf)
|
||||
|
||||
|
|
19
default.nix
19
default.nix
|
@ -1,26 +1,18 @@
|
|||
{
|
||||
compiler ? "ghc865",
|
||||
withHIE ? false
|
||||
compiler ? "ghc884"
|
||||
}:
|
||||
|
||||
let
|
||||
# pin all-hies for getting the language server
|
||||
all-hies = fetchTarball {
|
||||
url = "https://github.com/infinisil/all-hies/tarball/b8fb659620b99b4a393922abaa03a1695e2ca64d";
|
||||
sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
|
||||
};
|
||||
pkgs = import (
|
||||
builtins.fetchGit {
|
||||
name = "nixpkgs-pinned";
|
||||
url = https://github.com/NixOS/nixpkgs/;
|
||||
ref = "refs/heads/release-20.03";
|
||||
rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa";
|
||||
ref = "refs/heads/release-20.09";
|
||||
rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
|
||||
}) {
|
||||
# Pass no config for purity
|
||||
config = {};
|
||||
overlays = [
|
||||
(import all-hies {}).overlay
|
||||
];
|
||||
overlays = [];
|
||||
};
|
||||
hp = pkgs.haskell.packages."${compiler}";
|
||||
src = pkgs.nix-gitignore.gitignoreSource [] ./.;
|
||||
|
@ -38,7 +30,6 @@ in
|
|||
hlint
|
||||
stylish-haskell
|
||||
pkgs.python3Packages.asn1ate
|
||||
]
|
||||
++ (if withHIE then [ hie ] else []);
|
||||
];
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1 +1 @@
|
|||
(import ./default.nix {withHIE = true;}).shell
|
||||
(import ./default.nix {}).shell
|
||||
|
|
|
@ -184,6 +184,19 @@ encodePayload payload'@PingResponsePayload{} =
|
|||
Start Sequence
|
||||
: concatMap encodeNodeState (pingNodeStates payload')
|
||||
<> [End Sequence]
|
||||
encodePayload payload'@LoadRequestPayload{} =
|
||||
[ Start Sequence
|
||||
, IntVal . getNodeID $ loadSegmentUpperBound payload'
|
||||
, End Sequence
|
||||
]
|
||||
encodePayload payload'@LoadResponsePayload{} =
|
||||
[ Start Sequence
|
||||
, Real $ loadSum payload'
|
||||
, Real $ loadRemainingTarget payload'
|
||||
, Real $ loadTotalCapacity payload'
|
||||
, IntVal . getNodeID $ loadSegmentLowerBound payload'
|
||||
, End Sequence
|
||||
]
|
||||
|
||||
encodeNodeState :: NodeState a => a -> [ASN1]
|
||||
encodeNodeState ns = [
|
||||
|
@ -193,7 +206,7 @@ encodeNodeState ns = [
|
|||
, OctetString (ipAddrAsBS $ getIpAddr ns)
|
||||
, IntVal (toInteger . getDhtPort $ ns)
|
||||
, IntVal (toInteger . getServicePort $ ns)
|
||||
, IntVal (getVServerID ns)
|
||||
, IntVal (toInteger $ getVServerID ns)
|
||||
, End Sequence
|
||||
]
|
||||
|
||||
|
@ -215,10 +228,11 @@ encodeQueryResult FORWARD{} = Enumerated 1
|
|||
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
||||
-> [ASN1]
|
||||
encodeMessage
|
||||
(Request requestID sender part isFinalPart action requestPayload) =
|
||||
(Request requestID receiverID sender part isFinalPart action requestPayload) =
|
||||
Start Sequence
|
||||
: (Enumerated . fromIntegral . fromEnum $ action)
|
||||
: IntVal requestID
|
||||
: (IntVal . getNodeID $ receiverID)
|
||||
: encodeNodeState sender
|
||||
<> [IntVal part
|
||||
, Boolean isFinalPart]
|
||||
|
@ -262,18 +276,20 @@ parseMessage = do
|
|||
parseRequest :: Action -> ParseASN1 FediChordMessage
|
||||
parseRequest action = do
|
||||
requestID <- parseInteger
|
||||
receiverID' <- fromInteger <$> parseInteger
|
||||
sender <- parseNodeState
|
||||
part <- parseInteger
|
||||
isFinalPart <- parseBool
|
||||
hasPayload <- hasNext
|
||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||
QueryID -> parseQueryIDRequest
|
||||
Join -> parseJoinRequest
|
||||
Leave -> parseLeaveRequest
|
||||
Stabilise -> parseStabiliseRequest
|
||||
Ping -> parsePingRequest
|
||||
QueryID -> parseQueryIDRequestPayload
|
||||
Join -> parseJoinRequestPayload
|
||||
Leave -> parseLeaveRequestPayload
|
||||
Stabilise -> parseStabiliseRequestPayload
|
||||
Ping -> parsePingRequestPayload
|
||||
QueryLoad -> parseLoadRequestPayload
|
||||
|
||||
pure $ Request requestID sender part isFinalPart action payload
|
||||
pure $ Request requestID receiverID' sender part isFinalPart action payload
|
||||
|
||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||
parseResponse requestID = do
|
||||
|
@ -283,11 +299,12 @@ parseResponse requestID = do
|
|||
action <- parseEnum :: ParseASN1 Action
|
||||
hasPayload <- hasNext
|
||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||
QueryID -> parseQueryIDResponse
|
||||
Join -> parseJoinResponse
|
||||
Leave -> parseLeaveResponse
|
||||
Stabilise -> parseStabiliseResponse
|
||||
Ping -> parsePingResponse
|
||||
QueryID -> parseQueryIDResponsePayload
|
||||
Join -> parseJoinResponsePayload
|
||||
Leave -> parseLeaveResponsePayload
|
||||
Stabilise -> parseStabiliseResponsePayload
|
||||
Ping -> parsePingResponsePayload
|
||||
QueryLoad -> parseLoadResponsePayload
|
||||
|
||||
pure $ Response requestID senderID part isFinalPart action payload
|
||||
|
||||
|
@ -305,6 +322,13 @@ parseInteger = do
|
|||
IntVal parsed -> pure parsed
|
||||
x -> throwParseError $ "Expected IntVal but got " <> show x
|
||||
|
||||
parseReal :: ParseASN1 Double
|
||||
parseReal = do
|
||||
i <- getNext
|
||||
case i of
|
||||
Real parsed -> pure parsed
|
||||
x -> throwParseError $ "Expected Real but got " <> show x
|
||||
|
||||
parseEnum :: Enum a => ParseASN1 a
|
||||
parseEnum = do
|
||||
e <- getNext
|
||||
|
@ -346,7 +370,7 @@ parseNodeState = onNextContainer Sequence $ do
|
|||
, domain = domain'
|
||||
, dhtPort = dhtPort'
|
||||
, servicePort = servicePort'
|
||||
, vServerID = vServer'
|
||||
, vServerID = fromInteger vServer'
|
||||
, ipAddr = ip'
|
||||
}
|
||||
|
||||
|
@ -360,13 +384,13 @@ parseCacheEntry = onNextContainer Sequence $ do
|
|||
parseNodeCache :: ParseASN1 [RemoteCacheEntry]
|
||||
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
|
||||
|
||||
parseJoinRequest :: ParseASN1 ActionPayload
|
||||
parseJoinRequest = do
|
||||
parseJoinRequestPayload :: ParseASN1 ActionPayload
|
||||
parseJoinRequestPayload = do
|
||||
parseNull
|
||||
pure JoinRequestPayload
|
||||
|
||||
parseJoinResponse :: ParseASN1 ActionPayload
|
||||
parseJoinResponse = onNextContainer Sequence $ do
|
||||
parseJoinResponsePayload :: ParseASN1 ActionPayload
|
||||
parseJoinResponsePayload = onNextContainer Sequence $ do
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
cache <- parseNodeCache
|
||||
|
@ -376,8 +400,8 @@ parseJoinResponse = onNextContainer Sequence $ do
|
|||
, joinCache = cache
|
||||
}
|
||||
|
||||
parseQueryIDRequest :: ParseASN1 ActionPayload
|
||||
parseQueryIDRequest = onNextContainer Sequence $ do
|
||||
parseQueryIDRequestPayload :: ParseASN1 ActionPayload
|
||||
parseQueryIDRequestPayload = onNextContainer Sequence $ do
|
||||
targetID <- fromInteger <$> parseInteger
|
||||
lBestNodes <- parseInteger
|
||||
pure $ QueryIDRequestPayload {
|
||||
|
@ -385,8 +409,8 @@ parseQueryIDRequest = onNextContainer Sequence $ do
|
|||
, queryLBestNodes = lBestNodes
|
||||
}
|
||||
|
||||
parseQueryIDResponse :: ParseASN1 ActionPayload
|
||||
parseQueryIDResponse = onNextContainer Sequence $ do
|
||||
parseQueryIDResponsePayload :: ParseASN1 ActionPayload
|
||||
parseQueryIDResponsePayload = onNextContainer Sequence $ do
|
||||
Enumerated resultType <- getNext
|
||||
result <- case resultType of
|
||||
0 -> FOUND <$> parseNodeState
|
||||
|
@ -396,13 +420,13 @@ parseQueryIDResponse = onNextContainer Sequence $ do
|
|||
queryResult = result
|
||||
}
|
||||
|
||||
parseStabiliseRequest :: ParseASN1 ActionPayload
|
||||
parseStabiliseRequest = do
|
||||
parseStabiliseRequestPayload :: ParseASN1 ActionPayload
|
||||
parseStabiliseRequestPayload = do
|
||||
parseNull
|
||||
pure StabiliseRequestPayload
|
||||
|
||||
parseStabiliseResponse :: ParseASN1 ActionPayload
|
||||
parseStabiliseResponse = onNextContainer Sequence $ do
|
||||
parseStabiliseResponsePayload :: ParseASN1 ActionPayload
|
||||
parseStabiliseResponsePayload = onNextContainer Sequence $ do
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pure $ StabiliseResponsePayload {
|
||||
|
@ -410,8 +434,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
|
|||
, stabilisePredecessors = pred'
|
||||
}
|
||||
|
||||
parseLeaveRequest :: ParseASN1 ActionPayload
|
||||
parseLeaveRequest = onNextContainer Sequence $ do
|
||||
parseLeaveRequestPayload :: ParseASN1 ActionPayload
|
||||
parseLeaveRequestPayload = onNextContainer Sequence $ do
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
doMigration <- parseBool
|
||||
|
@ -421,19 +445,40 @@ parseLeaveRequest = onNextContainer Sequence $ do
|
|||
, leaveDoMigration = doMigration
|
||||
}
|
||||
|
||||
parseLeaveResponse :: ParseASN1 ActionPayload
|
||||
parseLeaveResponse = do
|
||||
parseLeaveResponsePayload :: ParseASN1 ActionPayload
|
||||
parseLeaveResponsePayload = do
|
||||
parseNull
|
||||
pure LeaveResponsePayload
|
||||
|
||||
parsePingRequest :: ParseASN1 ActionPayload
|
||||
parsePingRequest = do
|
||||
parsePingRequestPayload :: ParseASN1 ActionPayload
|
||||
parsePingRequestPayload = do
|
||||
parseNull
|
||||
pure PingRequestPayload
|
||||
|
||||
parsePingResponse :: ParseASN1 ActionPayload
|
||||
parsePingResponse = onNextContainer Sequence $ do
|
||||
parsePingResponsePayload :: ParseASN1 ActionPayload
|
||||
parsePingResponsePayload = onNextContainer Sequence $ do
|
||||
handledNodes <- getMany parseNodeState
|
||||
pure $ PingResponsePayload {
|
||||
pingNodeStates = handledNodes
|
||||
}
|
||||
|
||||
parseLoadRequestPayload :: ParseASN1 ActionPayload
|
||||
parseLoadRequestPayload = onNextContainer Sequence $ do
|
||||
loadUpperBound' <- fromInteger <$> parseInteger
|
||||
pure LoadRequestPayload
|
||||
{ loadSegmentUpperBound = loadUpperBound'
|
||||
}
|
||||
|
||||
parseLoadResponsePayload :: ParseASN1 ActionPayload
|
||||
parseLoadResponsePayload = onNextContainer Sequence $ do
|
||||
loadSum' <- parseReal
|
||||
loadRemainingTarget' <- parseReal
|
||||
loadTotalCapacity' <- parseReal
|
||||
loadSegmentLowerBound' <- fromInteger <$> parseInteger
|
||||
pure LoadResponsePayload
|
||||
{ loadSum = loadSum'
|
||||
, loadRemainingTarget = loadRemainingTarget'
|
||||
, loadTotalCapacity = loadTotalCapacity'
|
||||
, loadSegmentLowerBound = loadSegmentLowerBound'
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol
|
|||
, Action(..)
|
||||
, ActionPayload(..)
|
||||
, FediChordMessage(..)
|
||||
, mkRequest
|
||||
, maximumParts
|
||||
, sendQueryIdMessages
|
||||
, requestQueryID
|
||||
|
@ -22,6 +23,7 @@ module Hash2Pub.DHTProtocol
|
|||
, requestLeave
|
||||
, requestPing
|
||||
, requestStabilise
|
||||
, requestQueryLoad
|
||||
, lookupMessage
|
||||
, sendRequestTo
|
||||
, queryIdLookupLoop
|
||||
|
@ -36,7 +38,7 @@ module Hash2Pub.DHTProtocol
|
|||
, isPossibleSuccessor
|
||||
, isPossiblePredecessor
|
||||
, isInOwnResponsibilitySlice
|
||||
, isJoined
|
||||
, vsIsJoined
|
||||
, closestCachePredecessors
|
||||
)
|
||||
where
|
||||
|
@ -49,7 +51,8 @@ import Control.Concurrent.STM.TQueue
|
|||
import Control.Concurrent.STM.TVar
|
||||
import Control.Exception
|
||||
import Control.Monad (foldM, forM, forM_, void, when)
|
||||
import Control.Monad.Except (MonadError (..), runExceptT)
|
||||
import Control.Monad.Except (MonadError (..), liftEither,
|
||||
runExceptT)
|
||||
import Control.Monad.IO.Class (MonadIO (..))
|
||||
import qualified Data.ByteString as BS
|
||||
import Data.Either (rights)
|
||||
|
@ -63,6 +66,7 @@ import Data.Maybe (fromJust, fromMaybe, isJust,
|
|||
isNothing, mapMaybe, maybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Word (Word8)
|
||||
import Network.Socket hiding (recv, recvFrom, send,
|
||||
sendTo)
|
||||
import Network.Socket.ByteString
|
||||
|
@ -74,23 +78,27 @@ import Hash2Pub.ASN1Coding
|
|||
import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||
CacheEntry (..),
|
||||
FediChordConf (..),
|
||||
HasKeyID (..),
|
||||
HasKeyID (..), LoadStats (..),
|
||||
LocalNodeState (..),
|
||||
LocalNodeStateSTM, NodeCache,
|
||||
NodeID, NodeState (..),
|
||||
RealNode (..), RealNodeSTM,
|
||||
RemoteNodeState (..),
|
||||
RingEntry (..), RingMap (..),
|
||||
SegmentLoadStats (..),
|
||||
Service (..), addRMapEntry,
|
||||
addRMapEntryWith,
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc, genNodeID,
|
||||
getKeyID, localCompare,
|
||||
getKeyID, hasValidNodeId,
|
||||
loadSliceSum, localCompare,
|
||||
rMapFromList, rMapLookupPred,
|
||||
rMapLookupSucc,
|
||||
remainingLoadTarget,
|
||||
setPredecessors, setSuccessors)
|
||||
import Hash2Pub.ProtocolTypes
|
||||
import Hash2Pub.RingMap
|
||||
|
||||
import Debug.Trace (trace)
|
||||
|
||||
|
@ -103,7 +111,7 @@ queryLocalCache ownState nCache lBestNodes targetID
|
|||
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
||||
-- This only makes sense if the node is part of the DHT by having joined.
|
||||
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
|
||||
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
||||
| vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||
-- the closest succeeding node (like with the p initiated parallel queries
|
||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
||||
|
@ -227,8 +235,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
|
|||
|
||||
-- | uses the successor and predecessor list of a node as an indicator for whether a
|
||||
-- node has properly joined the DHT
|
||||
isJoined :: LocalNodeState s -> Bool
|
||||
isJoined ns = not . all null $ [successors ns, predecessors ns]
|
||||
vsIsJoined :: LocalNodeState s -> Bool
|
||||
vsIsJoined ns = not . all null $ [successors ns, predecessors ns]
|
||||
|
||||
-- | the size limit to be used when serialising messages for sending
|
||||
sendMessageSize :: Num i => i
|
||||
|
@ -237,27 +245,37 @@ sendMessageSize = 1200
|
|||
-- ====== message send and receive operations ======
|
||||
|
||||
-- encode the response to a request that just signals successful receipt
|
||||
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
||||
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
|
||||
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||
requestID = requestID req
|
||||
, senderID = ownID
|
||||
, senderID = receiverID req
|
||||
, part = part req
|
||||
, isFinalPart = False
|
||||
, action = action req
|
||||
, payload = Nothing
|
||||
}
|
||||
ackRequest _ _ = Map.empty
|
||||
ackRequest _ = Map.empty
|
||||
|
||||
|
||||
-- | extract the first payload from a received message set
|
||||
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
|
||||
extractFirstPayload msgSet = foldr' (\msg plAcc ->
|
||||
if isNothing plAcc && isJust (payload msg)
|
||||
then payload msg
|
||||
else plAcc
|
||||
) Nothing msgSet
|
||||
|
||||
|
||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||
-- the response to be sent.
|
||||
handleIncomingRequest :: Service s (RealNodeSTM s)
|
||||
=> LocalNodeStateSTM s -- ^ the handling node
|
||||
=> Word8 -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@
|
||||
-> LocalNodeStateSTM s -- ^ the handling node
|
||||
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
||||
-> SockAddr -- ^ source address of the request
|
||||
-> IO ()
|
||||
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||
handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do
|
||||
ns <- readTVarIO nsSTM
|
||||
-- add nodestate to cache
|
||||
now <- getPOSIXTime
|
||||
|
@ -265,19 +283,20 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
|||
Nothing -> pure ()
|
||||
Just aPart -> do
|
||||
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
|
||||
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
|
||||
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
|
||||
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
||||
maybe (pure ()) (
|
||||
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
||||
)
|
||||
=<< (case action aPart of
|
||||
Ping -> Just <$> respondPing nsSTM msgSet
|
||||
Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
|
||||
Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin
|
||||
-- ToDo: figure out what happens if not joined
|
||||
QueryID -> Just <$> respondQueryID nsSTM msgSet
|
||||
-- only when joined
|
||||
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
|
||||
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
|
||||
Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing
|
||||
Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing
|
||||
QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
|
||||
)
|
||||
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
||||
|
||||
|
@ -287,19 +306,18 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
|||
-- | Filter out requests with spoofed node IDs by recomputing the ID using
|
||||
-- the sender IP.
|
||||
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
|
||||
dropSpoofedIDs :: HostAddress6 -- msg source address
|
||||
dropSpoofedIDs :: Word8 -- ^ maximum number of vservers per node
|
||||
-> HostAddress6 -- ^ msg source address
|
||||
-> LocalNodeStateSTM s
|
||||
-> Set.Set FediChordMessage -- message parts of the request
|
||||
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests
|
||||
-> Set.Set FediChordMessage -- ^ message parts of the request
|
||||
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- ^ reponder function to be invoked for valid requests
|
||||
-> IO (Maybe (Map.Map Integer BS.ByteString))
|
||||
dropSpoofedIDs addr nsSTM' msgSet' responder =
|
||||
dropSpoofedIDs limVs addr nsSTM' msgSet' responder =
|
||||
let
|
||||
aRequestPart = Set.elemAt 0 msgSet
|
||||
senderNs = sender aRequestPart
|
||||
givenSenderID = getNid senderNs
|
||||
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
|
||||
in
|
||||
if recomputedID == givenSenderID
|
||||
if hasValidNodeId limVs senderNs addr
|
||||
then Just <$> responder nsSTM' msgSet'
|
||||
else pure Nothing
|
||||
|
||||
|
@ -317,19 +335,15 @@ respondQueryID nsSTM msgSet = do
|
|||
let
|
||||
aRequestPart = Set.elemAt 0 msgSet
|
||||
senderID = getNid . sender $ aRequestPart
|
||||
senderPayload = foldr' (\msg plAcc ->
|
||||
if isNothing plAcc && isJust (payload msg)
|
||||
then payload msg
|
||||
else plAcc
|
||||
) Nothing msgSet
|
||||
-- return only empty message serialisation if no payload was included in parts
|
||||
senderPayload = extractFirstPayload msgSet
|
||||
-- return only empty message serialisation if no payload was included in parts
|
||||
maybe (pure Map.empty) (\senderPayload' -> do
|
||||
responseMsg <- atomically $ do
|
||||
nsSnap <- readTVar nsSTM
|
||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||
let
|
||||
responsePayload = QueryIDResponsePayload {
|
||||
queryResult = if isJoined nsSnap
|
||||
queryResult = if vsIsJoined nsSnap
|
||||
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
||||
-- if not joined yet, attract responsibility for
|
||||
-- all keys to make bootstrapping possible
|
||||
|
@ -422,6 +436,47 @@ respondPing nsSTM msgSet = do
|
|||
}
|
||||
pure $ serialiseMessage sendMessageSize pingResponse
|
||||
|
||||
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondQueryLoad nsSTM msgSet = do
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
-- this message cannot be split reasonably, so just
|
||||
-- consider the first payload
|
||||
let
|
||||
aRequestPart = Set.elemAt 0 msgSet
|
||||
senderPayload = extractFirstPayload msgSet
|
||||
responsePl <- maybe (pure Nothing) (\pl ->
|
||||
case pl of
|
||||
LoadRequestPayload{} -> do
|
||||
parentNode <- readTVarIO (parentRealNode nsSnap)
|
||||
let
|
||||
serv = nodeService parentNode
|
||||
conf = nodeConfig parentNode
|
||||
lStats <- getServiceLoadStats serv
|
||||
let
|
||||
directSucc = getNid . head . predecessors $ nsSnap
|
||||
handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
|
||||
pure $ Just LoadResponsePayload
|
||||
{ loadSum = handledTagSum
|
||||
, loadRemainingTarget = remainingLoadTarget conf lStats
|
||||
, loadTotalCapacity = totalCapacity lStats
|
||||
, loadSegmentLowerBound = directSucc
|
||||
}
|
||||
_ -> pure Nothing
|
||||
)
|
||||
senderPayload
|
||||
|
||||
pure $ maybe
|
||||
Map.empty
|
||||
(\pl -> serialiseMessage sendMessageSize $ Response
|
||||
{ requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
, action = QueryLoad
|
||||
, payload = Just pl
|
||||
}
|
||||
) responsePl
|
||||
|
||||
|
||||
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondJoin nsSTM msgSet = do
|
||||
|
@ -434,7 +489,7 @@ respondJoin nsSTM msgSet = do
|
|||
senderNS = sender aRequestPart
|
||||
-- if not joined yet, attract responsibility for
|
||||
-- all keys to make bootstrapping possible
|
||||
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
|
||||
responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
|
||||
thisNodeResponsible (FOUND _) = True
|
||||
thisNodeResponsible (FORWARD _) = False
|
||||
-- check whether the joining node falls into our responsibility
|
||||
|
@ -481,6 +536,21 @@ respondJoin nsSTM msgSet = do
|
|||
|
||||
-- ....... request sending .......
|
||||
|
||||
-- | defautl constructor for request messages, fills standard values like
|
||||
-- part number to avoid code repition
|
||||
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
|
||||
mkRequest ns targetID action pl rid = Request
|
||||
{ requestID = rid
|
||||
, receiverID = targetID
|
||||
, sender = toRemoteNodeState ns
|
||||
-- part number and final flag can be changed by ASN1 encoder to make packet
|
||||
-- fit the MTU restrictions
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = action
|
||||
, payload = pl
|
||||
}
|
||||
|
||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
|
||||
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
|
||||
-> LocalNodeStateSTM s -- ^ joining NodeState
|
||||
|
@ -492,7 +562,7 @@ requestJoin toJoinOn ownStateSTM = do
|
|||
let srcAddr = confIP nodeConf
|
||||
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||
-- extract own state for getting request information
|
||||
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
||||
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock
|
||||
(cacheInsertQ, joinedState) <- atomically $ do
|
||||
stateSnap <- readTVar ownStateSTM
|
||||
let
|
||||
|
@ -523,7 +593,7 @@ requestJoin toJoinOn ownStateSTM = do
|
|||
writeTVar ownStateSTM newState
|
||||
pure (cacheInsertQ, newState)
|
||||
-- execute the cache insertions
|
||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
|
||||
if responses == Set.empty
|
||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||
else do
|
||||
|
@ -581,14 +651,14 @@ sendQueryIdMessages :: (Integral i)
|
|||
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
||||
-> [RemoteNodeState] -- ^ nodes to query
|
||||
-> IO QueryResponse -- ^ accumulated response
|
||||
sendQueryIdMessages targetID ns lParam targets = do
|
||||
sendQueryIdMessages lookupID ns lParam targets = do
|
||||
|
||||
-- create connected sockets to all query targets and use them for request handling
|
||||
|
||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||
let srcAddr = confIP nodeConf
|
||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
||||
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode))
|
||||
)) targets
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- ToDo: exception handling, maybe log them
|
||||
|
@ -605,7 +675,7 @@ sendQueryIdMessages targetID ns lParam targets = do
|
|||
_ -> Set.empty
|
||||
|
||||
-- forward entries to global cache
|
||||
queueAddEntries entrySet ns
|
||||
queueAddEntries entrySet (cacheWriteQueue ns)
|
||||
-- return accumulated QueryResult
|
||||
pure $ case acc of
|
||||
-- once a FOUND as been encountered, return this as a result
|
||||
|
@ -621,13 +691,14 @@ sendQueryIdMessages targetID ns lParam targets = do
|
|||
|
||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
||||
lookupMessage :: Integral i
|
||||
=> NodeID -- ^ target ID
|
||||
=> NodeID -- ^ lookup ID to be looked up
|
||||
-> LocalNodeState s -- ^ sender node state
|
||||
-> Maybe i -- ^ optionally provide a different l parameter
|
||||
-> NodeID -- ^ target ID of message destination
|
||||
-> (Integer -> FediChordMessage)
|
||||
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID)
|
||||
where
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
||||
pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
|
||||
|
||||
|
||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
||||
|
@ -638,16 +709,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node
|
|||
requestStabilise ns neighbour = do
|
||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||
let srcAddr = confIP nodeConf
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Stabilise
|
||||
, payload = Just StabiliseRequestPayload
|
||||
}
|
||||
)
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
either
|
||||
-- forward IO error messages
|
||||
|
@ -660,7 +722,7 @@ requestStabilise ns neighbour = do
|
|||
)
|
||||
([],[]) respSet
|
||||
-- update successfully responded neighbour in cache
|
||||
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
|
||||
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
|
||||
pure $ if null responsePreds && null responseSuccs
|
||||
then Left "no neighbours returned"
|
||||
else Right (responsePreds, responseSuccs)
|
||||
|
@ -682,17 +744,12 @@ requestLeave ns doMigration target = do
|
|||
, leavePredecessors = predecessors ns
|
||||
, leaveDoMigration = doMigration
|
||||
}
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Leave
|
||||
, payload = Just leavePayload
|
||||
}
|
||||
)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
responses <- bracket
|
||||
(mkSendSocket srcAddr (getDomain target) (getDhtPort target))
|
||||
close
|
||||
(fmap Right
|
||||
. sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
either
|
||||
-- forward IO error messages
|
||||
(pure . Left)
|
||||
|
@ -708,16 +765,7 @@ requestPing ns target = do
|
|||
let srcAddr = confIP nodeConf
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||
(\sock -> do
|
||||
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Ping
|
||||
, payload = Just PingRequestPayload
|
||||
}
|
||||
) sock
|
||||
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock
|
||||
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
|
||||
pure $ Right (peerAddr, resp)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
|
@ -733,10 +781,9 @@ requestPing ns target = do
|
|||
-- recompute ID for each received node and mark as verified in cache
|
||||
now <- getPOSIXTime
|
||||
forM_ responseVss (\vs ->
|
||||
let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
|
||||
in if recomputedID == getNid vs
|
||||
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
|
||||
else pure ()
|
||||
if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr
|
||||
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
|
||||
else pure ()
|
||||
)
|
||||
pure $ if null responseVss
|
||||
then Left "no active vServer IDs returned, ignoring node"
|
||||
|
@ -744,6 +791,37 @@ requestPing ns target = do
|
|||
) responses
|
||||
|
||||
|
||||
-- still need a particular vserver as LocalNodeState, because requests need a sender
|
||||
requestQueryLoad :: (MonadError String m, MonadIO m)
|
||||
=> LocalNodeState s -- ^ the local source vserver for the request
|
||||
-> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
|
||||
-> RemoteNodeState -- ^ target node to query
|
||||
-> m SegmentLoadStats
|
||||
requestQueryLoad ns upperIdBound target = do
|
||||
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
||||
let
|
||||
srcAddr = confIP nodeConf
|
||||
loadReqPl = LoadRequestPayload
|
||||
{ loadSegmentUpperBound = upperIdBound
|
||||
}
|
||||
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
responseMsgSet <- liftEither responses
|
||||
-- throws an error if an exception happened
|
||||
loadResPl <- maybe (throwError "no load response payload found") pure
|
||||
(extractFirstPayload responseMsgSet)
|
||||
pure SegmentLoadStats
|
||||
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
|
||||
, segmentUpperKeyBound = upperIdBound
|
||||
, segmentLoad = loadSum loadResPl
|
||||
, segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
|
||||
, segmentOwnerCapacity = loadTotalCapacity loadResPl
|
||||
, segmentCurrentOwner = target
|
||||
}
|
||||
|
||||
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestTo :: Int -- ^ timeout in milliseconds
|
||||
|
@ -800,24 +878,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
|||
|
||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
||||
-> LocalNodeState s
|
||||
-> TQueue (NodeCache -> NodeCache)
|
||||
-> IO ()
|
||||
queueAddEntries entries ns = do
|
||||
queueAddEntries entries cacheQ = do
|
||||
now <- getPOSIXTime
|
||||
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
|
||||
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
|
||||
|
||||
|
||||
-- | enque a list of node IDs to be deleted from the global NodeCache
|
||||
queueDeleteEntries :: Foldable c
|
||||
=> c NodeID
|
||||
-> LocalNodeState s
|
||||
-> TQueue (NodeCache -> NodeCache)
|
||||
-> IO ()
|
||||
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
|
||||
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
|
||||
|
||||
|
||||
-- | enque a single node ID to be deleted from the global NodeCache
|
||||
queueDeleteEntry :: NodeID
|
||||
-> LocalNodeState s
|
||||
-> TQueue (NodeCache -> NodeCache)
|
||||
-> IO ()
|
||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||
|
||||
|
@ -826,11 +904,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
|||
-- global 'NodeCache'.
|
||||
queueUpdateVerifieds :: Foldable c
|
||||
=> c NodeID
|
||||
-> LocalNodeState s
|
||||
-> TQueue (NodeCache -> NodeCache)
|
||||
-> IO ()
|
||||
queueUpdateVerifieds nIds ns = do
|
||||
queueUpdateVerifieds nIds cacheQ = do
|
||||
now <- getPOSIXTime
|
||||
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
|
||||
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
|
||||
markCacheEntryAsVerified (Just now) nid'
|
||||
|
||||
-- | retry an IO action at most *i* times until it delivers a result
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -7,8 +7,8 @@
|
|||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
|
||||
module Hash2Pub.FediChordTypes (
|
||||
NodeID -- abstract, but newtype constructors cannot be hidden
|
||||
module Hash2Pub.FediChordTypes
|
||||
( NodeID -- abstract, but newtype constructors cannot be hidden
|
||||
, idBits
|
||||
, getNodeID
|
||||
, toNodeID
|
||||
|
@ -18,6 +18,13 @@ module Hash2Pub.FediChordTypes (
|
|||
, RemoteNodeState (..)
|
||||
, RealNode (..)
|
||||
, RealNodeSTM
|
||||
, VSMap
|
||||
, LoadStats (..)
|
||||
, emptyLoadStats
|
||||
, remainingLoadTarget
|
||||
, loadSliceSum
|
||||
, addVserver
|
||||
, SegmentLoadStats (..)
|
||||
, setSuccessors
|
||||
, setPredecessors
|
||||
, NodeCache
|
||||
|
@ -51,6 +58,7 @@ module Hash2Pub.FediChordTypes (
|
|||
, localCompare
|
||||
, genNodeID
|
||||
, genNodeIDBS
|
||||
, hasValidNodeId
|
||||
, genKeyID
|
||||
, genKeyIDBS
|
||||
, byteStringToUInteger
|
||||
|
@ -60,12 +68,14 @@ module Hash2Pub.FediChordTypes (
|
|||
, DHT(..)
|
||||
, Service(..)
|
||||
, ServiceConf(..)
|
||||
) where
|
||||
) where
|
||||
|
||||
import Control.Exception
|
||||
import Data.Foldable (foldr')
|
||||
import Data.Function (on)
|
||||
import qualified Data.Hashable as Hashable
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import qualified Data.HashMap.Strict as HMap
|
||||
import Data.List (delete, nub, sortBy)
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||
|
@ -148,17 +158,27 @@ a `localCompare` b
|
|||
-- Also contains shared data and config values.
|
||||
-- TODO: more data structures for k-choices bookkeeping
|
||||
data RealNode s = RealNode
|
||||
{ vservers :: [LocalNodeStateSTM s]
|
||||
-- ^ references to all active versers
|
||||
, nodeConfig :: FediChordConf
|
||||
{ vservers :: VSMap s
|
||||
-- ^ map of all active VServer node IDs to their node state
|
||||
, nodeConfig :: FediChordConf
|
||||
-- ^ holds the initial configuration read at program start
|
||||
, bootstrapNodes :: [(String, PortNumber)]
|
||||
, bootstrapNodes :: [(String, PortNumber)]
|
||||
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
|
||||
, lookupCacheSTM :: TVar LookupCache
|
||||
, lookupCacheSTM :: TVar LookupCache
|
||||
-- ^ a global cache of looked up keys and their associated nodes
|
||||
, nodeService :: s (RealNodeSTM s)
|
||||
, globalNodeCacheSTM :: TVar NodeCache
|
||||
-- ^ EpiChord node cache with expiry times for nodes.
|
||||
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
|
||||
, nodeService :: s (RealNodeSTM s)
|
||||
}
|
||||
|
||||
-- | insert a new vserver mapping into a node
|
||||
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
|
||||
addVserver (key, nstate) node = node
|
||||
{ vservers = addRMapEntry key nstate (vservers node) }
|
||||
|
||||
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
|
||||
type RealNodeSTM s = TVar (RealNode s)
|
||||
|
||||
-- | represents a node and all its important state
|
||||
|
@ -172,7 +192,7 @@ data RemoteNodeState = RemoteNodeState
|
|||
-- ^ port of the DHT itself
|
||||
, servicePort :: PortNumber
|
||||
-- ^ port of the service provided on top of the DHT
|
||||
, vServerID :: Integer
|
||||
, vServerID :: Word8
|
||||
-- ^ ID of this vserver
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
@ -185,9 +205,9 @@ data LocalNodeState s = LocalNodeState
|
|||
{ nodeState :: RemoteNodeState
|
||||
-- ^ represents common data present both in remote and local node representations
|
||||
, nodeCacheSTM :: TVar NodeCache
|
||||
-- ^ EpiChord node cache with expiry times for nodes
|
||||
-- ^ reference to the 'globalNodeCacheSTM'
|
||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||
-- ^ reference to the 'globalCacheWriteQueue
|
||||
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
||||
-- ^ successor nodes in ascending order by distance
|
||||
, predecessors :: [RemoteNodeState]
|
||||
|
@ -217,14 +237,14 @@ class NodeState a where
|
|||
getIpAddr :: a -> HostAddress6
|
||||
getDhtPort :: a -> PortNumber
|
||||
getServicePort :: a -> PortNumber
|
||||
getVServerID :: a -> Integer
|
||||
getVServerID :: a -> Word8
|
||||
-- setters for common properties
|
||||
setNid :: NodeID -> a -> a
|
||||
setDomain :: String -> a -> a
|
||||
setIpAddr :: HostAddress6 -> a -> a
|
||||
setDhtPort :: PortNumber -> a -> a
|
||||
setServicePort :: PortNumber -> a -> a
|
||||
setVServerID :: Integer -> a -> a
|
||||
setVServerID :: Word8 -> a -> a
|
||||
toRemoteNodeState :: a -> RemoteNodeState
|
||||
|
||||
instance NodeState RemoteNodeState where
|
||||
|
@ -373,6 +393,11 @@ genNodeID :: HostAddress6 -- ^a node's IPv6 address
|
|||
-> NodeID -- ^the generated @NodeID@
|
||||
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
|
||||
|
||||
|
||||
hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
|
||||
hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
|
||||
|
||||
|
||||
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
|
||||
genKeyIDBS :: String -- ^the key string
|
||||
-> BS.ByteString -- ^the key ID represented as a @ByteString@
|
||||
|
@ -427,9 +452,70 @@ data FediChordConf = FediChordConf
|
|||
-- ^ how long to wait until response has arrived, in milliseconds
|
||||
, confRequestRetries :: Int
|
||||
-- ^ how often re-sending a timed-out request can be retried
|
||||
, confEnableKChoices :: Bool
|
||||
-- ^ whether to enable k-choices load balancing
|
||||
, confKChoicesOverload :: Double
|
||||
-- ^ fraction of capacity above which a node considers itself overloaded
|
||||
, confKChoicesUnderload :: Double
|
||||
-- ^ fraction of capacity below which a node considers itself underloaded
|
||||
, confKChoicesMaxVS :: Word8
|
||||
-- ^ upper limit of vserver index κ
|
||||
, confKChoicesRebalanceInterval :: Int
|
||||
-- ^ interval between vserver rebalance attempts
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- ====== k-choices load balancing types ======
|
||||
|
||||
data LoadStats = LoadStats
|
||||
{ loadPerTag :: RingMap NodeID Double
|
||||
-- ^ map of loads for each handled tag
|
||||
, totalCapacity :: Double
|
||||
-- ^ total designated capacity of the service
|
||||
, compensatedLoadSum :: Double
|
||||
-- ^ effective load reevant for load balancing after compensating for
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | calculates the mismatch from the target load by taking into account the
|
||||
-- underload and overload limits
|
||||
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
|
||||
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
|
||||
where
|
||||
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
|
||||
|
||||
|
||||
-- | calculates the sum of tag load in a contiguous slice between to keys
|
||||
loadSliceSum :: LoadStats
|
||||
-> NodeID -- ^ lower segment bound
|
||||
-> NodeID -- ^ upper segment bound
|
||||
-> Double -- ^ sum of all tag loads within that segment
|
||||
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
|
||||
|
||||
|
||||
data SegmentLoadStats = SegmentLoadStats
|
||||
{ segmentLowerKeyBound :: NodeID
|
||||
-- ^ segment start key
|
||||
, segmentUpperKeyBound :: NodeID
|
||||
-- ^ segment end key
|
||||
, segmentLoad :: Double
|
||||
-- ^ sum of load of all keys in the segment
|
||||
, segmentOwnerRemainingLoadTarget :: Double
|
||||
-- ^ remaining load target of the current segment handler:
|
||||
, segmentOwnerCapacity :: Double
|
||||
-- ^ total capacity of the current segment handler node, used for normalisation
|
||||
, segmentCurrentOwner :: RemoteNodeState
|
||||
-- ^ the current owner of the segment that needs to be joined on
|
||||
}
|
||||
|
||||
-- TODO: figure out a better way of initialising
|
||||
emptyLoadStats :: LoadStats
|
||||
emptyLoadStats = LoadStats
|
||||
{ loadPerTag = emptyRMap
|
||||
, totalCapacity = 0
|
||||
, compensatedLoadSum = 0
|
||||
}
|
||||
|
||||
-- ====== Service Types ============
|
||||
|
||||
class Service s d where
|
||||
|
@ -445,6 +531,7 @@ class Service s d where
|
|||
-> IO (Either String ()) -- ^ success or failure
|
||||
-- | Wait for an incoming migration from a given node to succeed, may block forever
|
||||
waitForMigrationFrom :: s d -> NodeID -> IO ()
|
||||
getServiceLoadStats :: s d -> IO LoadStats
|
||||
|
||||
instance Hashable.Hashable NodeID where
|
||||
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
||||
|
|
|
@ -22,7 +22,7 @@ import qualified Data.DList as D
|
|||
import Data.Either (lefts, rights)
|
||||
import qualified Data.HashMap.Strict as HMap
|
||||
import qualified Data.HashSet as HSet
|
||||
import Data.Maybe (fromJust, isJust)
|
||||
import Data.Maybe (fromJust, fromMaybe, isJust)
|
||||
import Data.String (fromString)
|
||||
import Data.Text.Lazy (Text)
|
||||
import qualified Data.Text.Lazy as Txt
|
||||
|
@ -64,8 +64,10 @@ data PostService d = PostService
|
|||
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
||||
, httpMan :: HTTP.Manager
|
||||
, statsQueue :: TQueue StatsEvent
|
||||
, loadStats :: TVar RelayStats
|
||||
-- ^ current load stats, replaced periodically
|
||||
, relayStats :: TVar RelayStats
|
||||
-- ^ current relay stats, replaced periodically
|
||||
, loadStats :: TVar LoadStats
|
||||
-- ^ current load values of the relay, replaced periodically and used by
|
||||
, logFileHandle :: Handle
|
||||
}
|
||||
deriving (Typeable)
|
||||
|
@ -96,7 +98,8 @@ instance DHT d => Service PostService d where
|
|||
migrationsInProgress' <- newTVarIO HMap.empty
|
||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
statsQueue' <- newTQueueIO
|
||||
loadStats' <- newTVarIO emptyStats
|
||||
relayStats' <- newTVarIO emptyStats
|
||||
loadStats' <- newTVarIO emptyLoadStats
|
||||
loggingFile <- openFile (confLogfilePath conf) WriteMode
|
||||
hSetBuffering loggingFile LineBuffering
|
||||
let
|
||||
|
@ -112,6 +115,7 @@ instance DHT d => Service PostService d where
|
|||
, migrationsInProgress = migrationsInProgress'
|
||||
, httpMan = httpMan'
|
||||
, statsQueue = statsQueue'
|
||||
, relayStats = relayStats'
|
||||
, loadStats = loadStats'
|
||||
, logFileHandle = loggingFile
|
||||
}
|
||||
|
@ -153,6 +157,12 @@ instance DHT d => Service PostService d where
|
|||
-- block until migration finished
|
||||
takeMVar migrationSynchroniser
|
||||
|
||||
getServiceLoadStats = getLoadStats
|
||||
|
||||
|
||||
getLoadStats :: PostService d -> IO LoadStats
|
||||
getLoadStats serv = readTVarIO $ loadStats serv
|
||||
|
||||
|
||||
-- | return a WAI application
|
||||
postServiceApplication :: DHT d => PostService d -> Application
|
||||
|
@ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
|
|||
-- persistently store in a TVar so it can be retrieved later by the DHT
|
||||
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
|
||||
rateStats = evaluateStats timePassed summedStats
|
||||
atomically $ writeTVar (loadStats serv) rateStats
|
||||
currentSubscribers <- readTVarIO $ subscribers serv
|
||||
-- translate the rate statistics to load values
|
||||
loads <- evaluateLoadStats rateStats currentSubscribers
|
||||
atomically $
|
||||
writeTVar (relayStats serv) rateStats
|
||||
>> writeTVar (loadStats serv) loads
|
||||
-- and now what? write a log to file
|
||||
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
|
||||
-- later: current (reported) load, target load
|
||||
|
@ -859,6 +874,33 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
|
|||
0 tagMap
|
||||
|
||||
|
||||
-- | calculate load values from rate statistics
|
||||
evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
|
||||
evaluateLoadStats currentStats currentSubscribers = do
|
||||
-- load caused by each tag: incomingPostRate * ( 1 + subscribers)
|
||||
-- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
|
||||
let
|
||||
totalCapacity' = 2.5 * postPublishRate currentStats
|
||||
(loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
|
||||
numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
|
||||
let
|
||||
thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
|
||||
thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
|
||||
pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
|
||||
)
|
||||
(0, emptyRMap)
|
||||
$ rMapToListWithKeys currentSubscribers
|
||||
let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
|
||||
pure LoadStats
|
||||
{ loadPerTag = loadPerTag'
|
||||
, totalCapacity = totalCapacity'
|
||||
-- load caused by post fetches cannot be influenced by re-balancing nodes,
|
||||
-- but still reduces the totally available capacity
|
||||
, compensatedLoadSum = loadSum + postFetchRate currentStats
|
||||
}
|
||||
|
||||
|
||||
|
||||
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
|
||||
-- rates by dividing through the collection time frame
|
||||
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
|
||||
|
|
|
@ -16,10 +16,12 @@ data Action = QueryID
|
|||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
| QueryLoad
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
data FediChordMessage = Request
|
||||
{ requestID :: Integer
|
||||
, receiverID :: NodeID
|
||||
, sender :: RemoteNodeState
|
||||
, part :: Integer
|
||||
, isFinalPart :: Bool
|
||||
|
@ -57,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload
|
|||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| LoadRequestPayload
|
||||
{ loadSegmentUpperBound :: NodeID
|
||||
-- ^ upper bound of segment interested in,
|
||||
}
|
||||
| QueryIDResponsePayload
|
||||
{ queryResult :: QueryResponse
|
||||
}
|
||||
|
@ -73,6 +79,12 @@ data ActionPayload = QueryIDRequestPayload
|
|||
| PingResponsePayload
|
||||
{ pingNodeStates :: [RemoteNodeState]
|
||||
}
|
||||
| LoadResponsePayload
|
||||
{ loadSum :: Double
|
||||
, loadRemainingTarget :: Double
|
||||
, loadTotalCapacity :: Double
|
||||
, loadSegmentLowerBound :: NodeID
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | global limit of parts per message used when (de)serialising messages.
|
||||
|
|
|
@ -47,6 +47,13 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where
|
|||
traversingFL acc (ProxyEntry _ Nothing) = acc
|
||||
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
|
||||
|
||||
instance (Bounded k, Ord k) => Traversable (RingMap k) where
|
||||
traverse f = fmap RingMap . traverse traversingF . getRingMap
|
||||
where
|
||||
traversingF (KeyEntry entry) = KeyEntry <$> f entry
|
||||
traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing
|
||||
traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . Just <$> traversingF entry
|
||||
|
||||
|
||||
-- | entry of a 'RingMap' that holds a value and can also
|
||||
-- wrap around the lookup direction at the edges of the name space.
|
||||
|
@ -106,6 +113,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
|
|||
| isNothing (rMapLookup nid rmap') = 1
|
||||
| otherwise = 0
|
||||
|
||||
|
||||
-- | whether the RingMap is empty (except for empty proxy entries)
|
||||
nullRMap :: (Num k, Bounded k, Ord k)
|
||||
=> RingMap k a
|
||||
-> Bool
|
||||
-- basic idea: look for a predecessor starting from the upper Map bound,
|
||||
-- Nothing indicates no entry being found
|
||||
nullRMap = isNothing . rMapLookupPred maxBound
|
||||
|
||||
|
||||
-- | O(logn( Is the key a member of the RingMap?
|
||||
memberRMap :: (Bounded k, Ord k)
|
||||
=> k
|
||||
-> RingMap k a
|
||||
-> Bool
|
||||
memberRMap key = isJust . rMapLookup key
|
||||
|
||||
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
|
||||
-- to simulate a modular ring
|
||||
lookupWrapper :: (Bounded k, Ord k, Num k)
|
||||
|
@ -198,12 +222,28 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
|
|||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||
modifier KeyEntry {} = Nothing
|
||||
|
||||
-- TODO: rename this to elems
|
||||
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
|
||||
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
|
||||
|
||||
-- TODO: rename this to toList
|
||||
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
|
||||
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
|
||||
maybe acc (\val -> (k, val):acc) $ extractRingEntry v
|
||||
)
|
||||
[]
|
||||
. getRingMap
|
||||
|
||||
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
|
||||
rMapFromList = setRMapEntries
|
||||
|
||||
|
||||
-- | this just merges the underlying 'Map.Map' s and does not check whether the
|
||||
-- ProxyEntry pointers are consistent, so better only create unions of
|
||||
-- equal-pointered RingMaps
|
||||
unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
|
||||
unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
|
||||
|
||||
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
|
||||
-- *startAt* value and after that on the previously returned value.
|
||||
-- Stops once i entries have been taken or an entry has been encountered twice
|
||||
|
|
|
@ -7,6 +7,7 @@ import Control.Concurrent.STM.TVar
|
|||
import Control.Exception
|
||||
import Data.ASN1.Parse (runParseASN1)
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.HashMap.Strict as HMap
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Maybe (fromJust, isJust)
|
||||
import qualified Data.Set as Set
|
||||
|
@ -18,6 +19,7 @@ import Hash2Pub.ASN1Coding
|
|||
import Hash2Pub.DHTProtocol
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.FediChordTypes
|
||||
import Hash2Pub.RingMap
|
||||
|
||||
spec :: Spec
|
||||
spec = do
|
||||
|
@ -221,14 +223,16 @@ spec = do
|
|||
, exampleNodeState {nid = fromInteger (-5)}
|
||||
]
|
||||
}
|
||||
requestTemplate = Request {
|
||||
requestID = 2342
|
||||
, sender = exampleNodeState
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
qLoadReqPayload = LoadRequestPayload
|
||||
{ loadSegmentUpperBound = 1025
|
||||
}
|
||||
qLoadResPayload = LoadResponsePayload
|
||||
{ loadSum = 3.141
|
||||
, loadRemainingTarget = -1.337
|
||||
, loadTotalCapacity = 2.21
|
||||
, loadSegmentLowerBound = 12
|
||||
}
|
||||
|
||||
responseTemplate = Response {
|
||||
requestID = 2342
|
||||
, senderID = nid exampleNodeState
|
||||
|
@ -237,7 +241,7 @@ spec = do
|
|||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
requestWith a pa = requestTemplate {action = a, payload = Just pa}
|
||||
requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342
|
||||
responseWith a pa = responseTemplate {action = a, payload = Just pa}
|
||||
|
||||
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
|
||||
|
@ -248,17 +252,20 @@ spec = do
|
|||
}
|
||||
|
||||
it "messages are encoded and decoded correctly from and to ASN1" $ do
|
||||
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Join jReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Leave lReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Ping pingReqPayload
|
||||
localNS <- exampleLocalNode
|
||||
encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Join jReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
|
||||
encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
|
||||
encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
|
||||
encodeDecodeAndCheck $ responseWith Join jResPayload
|
||||
encodeDecodeAndCheck $ responseWith Leave lResPayload
|
||||
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
|
||||
encodeDecodeAndCheck $ responseWith Ping pingResPayload
|
||||
encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
|
||||
it "messages are encoded and decoded to ASN.1 DER properly" $
|
||||
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
|
||||
it "messages too large for a single packet can (often) be split into multiple parts" $ do
|
||||
|
@ -297,13 +304,13 @@ exampleNodeState = RemoteNodeState {
|
|||
|
||||
exampleLocalNode :: IO (LocalNodeState MockService)
|
||||
exampleLocalNode = do
|
||||
realNode <- newTVarIO $ RealNode {
|
||||
vservers = []
|
||||
realNodeSTM <- newTVarIO $ RealNode {
|
||||
vservers = emptyRMap
|
||||
, nodeConfig = exampleFediConf
|
||||
, bootstrapNodes = confBootstrapNodes exampleFediConf
|
||||
, nodeService = MockService
|
||||
}
|
||||
nodeStateInit realNode
|
||||
nodeStateInit realNodeSTM 0
|
||||
|
||||
|
||||
exampleFediConf :: FediChordConf
|
||||
|
|
Loading…
Reference in a new issue