diff --git a/FediChord.asn1 b/FediChord.asn1 index eafd303..79b894a 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -6,22 +6,20 @@ Domain ::= VisibleString Partnum ::= INTEGER (0..150) -Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad} +Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} Request ::= SEQUENCE { action Action, requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32-bit integer - receiverID NodeID, sender NodeState, part Partnum, -- part number of this message, starts at 1 finalPart BOOLEAN, -- flag indicating this `part` to be the last of this request actionPayload CHOICE { queryIDRequestPayload QueryIDRequestPayload, joinRequestPayload JoinRequestPayload, - leaveRequestPayload LeaveRequestPayload, - stabiliseRequestPayload StabiliseRequestPayload, - pingRequestPayload PingRequestPayload, - loadRequestPayload LoadRequestPayload + leaveRequestPayload LeaveRequestPayload, + stabiliseRequestPayload StabiliseRequestPayload, + pingRequestPayload PingRequestPayload } OPTIONAL -- just for symmetry reasons with response, requests without a payload have no meaning } @@ -36,12 +34,11 @@ Response ::= SEQUENCE { finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response action Action, actionPayload CHOICE { - queryIDResponsePayload QueryIDResponsePayload, - joinResponsePayload JoinResponsePayload, + queryIDResponsePayload QueryIDResponsePayload, + joinResponsePayload JoinResponsePayload, leaveResponsePayload LeaveResponsePayload, stabiliseResponsePayload StabiliseResponsePayload, - pingResponsePayload PingResponsePayload, - loadResponsePayload LoadResponsePayload + pingResponsePayload PingResponsePayload } OPTIONAL -- no payload when just ACKing a previous request } @@ -104,15 +101,5 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that -- all active vserver IDs handled by the server can be learned at once PingResponsePayload ::= SEQUENCE OF NodeState -LoadRequestPayload ::= SEQUENCE { - upperSegmentBound NodeID - } - -LoadResponsePayload ::= SEQUENCE { - loadSum REAL, - remainingLoadTarget REAL, - totalCapacity REAL, - lowerBound NodeID - } END diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 94fd6a5..376d675 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -91,7 +91,7 @@ executable Hash2Pub -- Base language which the package is written in. default-language: Haskell2010 - ghc-options: -threaded + ghc-options: -threaded -rtsopts -with-rtsopts=-N executable Experiment -- experiment runner diff --git a/Readme.md b/Readme.md index daf9e38..e3cff3d 100644 --- a/Readme.md +++ b/Readme.md @@ -1,7 +1,7 @@ # Hash2Pub ***This is heavily WIP and does not provide any useful functionality yet***. -I aim for always having the master branch at a state where it builds and tests pass. +I aim for always having the `mainline` branch in a state where it builds and tests pass. A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. @@ -10,6 +10,8 @@ This is the practical implementation of the concept presented in the paper [Dece The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. +For further questions and discussions, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**. 
+ ## Building The project and its development environment are built with [Nix](https://nixos.org/nix/). diff --git a/app/Experiment.hs b/app/Experiment.hs index a999dea..f2fa586 100644 --- a/app/Experiment.hs +++ b/app/Experiment.hs @@ -40,7 +40,7 @@ executeSchedule :: Int -- ^ speedup factor -> IO () executeSchedule speedup events = do -- initialise HTTP manager - httpMan <- HTTP.newManager HTTP.defaultManagerSettings + httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 } forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do _ <- forkIO $ clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag) diff --git a/app/Main.hs b/app/Main.hs index 043d123..eac223d 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -18,20 +18,38 @@ main = do -- ToDo: parse and pass config -- probably use `tomland` for that (fConf, sConf) <- readConfig + -- TODO: first initialise 'RealNode', then the vservers -- ToDo: load persisted caches, bootstrapping nodes … - (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) - -- wait for all DHT threads to terminate, this keeps the main thread running - wait fediThreads + (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) + -- currently no masking is necessary, as there is nothing to clean up + nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode + -- try joining the DHT using one of the provided bootstrapping nodes + joinedState <- tryBootstrapJoining thisNode + either (\err -> do + -- handle unsuccessful join + + putStrLn $ err <> " Error joining, start listening for incoming requests anyway" + print =<< readTVarIO thisNode + -- launch thread attempting to join on new cache entries + _ <- forkIO $ joinOnNewEntriesThread thisNode + wait =<< async (fediMainThreads serverSock thisNode) + ) + (\joinedNS -> do + -- launch main event loop with successfully joined state + putStrLn "successful join" + wait =<< async (fediMainThreads serverSock thisNode) + ) + joinedState + pure () readConfig :: IO (FediChordConf, ServiceConf) readConfig = do - confDomainString : ipString : portString : servicePortString : speedupString : loadBalancingEnabled : remainingArgs <- getArgs + confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs -- allow starting the initial node without bootstrapping info to avoid -- waiting for timeout let speedup = read speedupString - statsEvalD = 120 * 10^6 `div` speedup confBootstrapNodes' = case remainingArgs of bootstrapHost : bootstrapPortString : _ -> [(bootstrapHost, read bootstrapPortString)] @@ -49,11 +67,6 @@ readConfig = do , confResponsePurgeAge = 60 / fromIntegral speedup , confRequestTimeout = 5 * 10^6 `div` speedup , confRequestRetries = 3 - , confEnableKChoices = loadBalancingEnabled /= "off" - , confKChoicesOverload = 0.9 - , confKChoicesUnderload = 0.1 - , confKChoicesMaxVS = 8 - , confKChoicesRebalanceInterval = round (realToFrac statsEvalD * 1.1 :: Double) } sConf = ServiceConf { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup , confServiceHost = confDomainString , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" , confSpeedupFactor = speedup - , confStatsEvalDelay = statsEvalD + , confStatsEvalDelay = 120 * 10^6 `div` speedup } pure (fConf, sConf) diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 65f5e21..10177ab 100644 ---
a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -184,19 +184,6 @@ encodePayload payload'@PingResponsePayload{} = Start Sequence : concatMap encodeNodeState (pingNodeStates payload') <> [End Sequence] -encodePayload payload'@LoadRequestPayload{} = - [ Start Sequence - , IntVal . getNodeID $ loadSegmentUpperBound payload' - , End Sequence - ] -encodePayload payload'@LoadResponsePayload{} = - [ Start Sequence - , Real $ loadSum payload' - , Real $ loadRemainingTarget payload' - , Real $ loadTotalCapacity payload' - , IntVal . getNodeID $ loadSegmentLowerBound payload' - , End Sequence - ] encodeNodeState :: NodeState a => a -> [ASN1] encodeNodeState ns = [ @@ -206,7 +193,7 @@ encodeNodeState ns = [ , OctetString (ipAddrAsBS $ getIpAddr ns) , IntVal (toInteger . getDhtPort $ ns) , IntVal (toInteger . getServicePort $ ns) - , IntVal (toInteger $ getVServerID ns) + , IntVal (getVServerID ns) , End Sequence ] @@ -228,11 +215,10 @@ encodeQueryResult FORWARD{} = Enumerated 1 encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage' to be encoded -> [ASN1] encodeMessage - (Request requestID receiverID sender part isFinalPart action requestPayload) = + (Request requestID sender part isFinalPart action requestPayload) = Start Sequence : (Enumerated . fromIntegral . fromEnum $ action) : IntVal requestID - : (IntVal . getNodeID $ receiverID) : encodeNodeState sender <> [IntVal part , Boolean isFinalPart] @@ -276,20 +262,18 @@ parseMessage = do parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest action = do requestID <- parseInteger - receiverID' <- fromInteger <$> parseInteger sender <- parseNodeState part <- parseInteger isFinalPart <- parseBool hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of - QueryID -> parseQueryIDRequestPayload - Join -> parseJoinRequestPayload - Leave -> parseLeaveRequestPayload - Stabilise -> parseStabiliseRequestPayload - Ping -> parsePingRequestPayload - QueryLoad -> parseLoadRequestPayload + QueryID -> parseQueryIDRequest + Join -> parseJoinRequest + Leave -> parseLeaveRequest + Stabilise -> parseStabiliseRequest + Ping -> parsePingRequest - pure $ Request requestID receiverID' sender part isFinalPart action payload + pure $ Request requestID sender part isFinalPart action payload parseResponse :: Integer -> ParseASN1 FediChordMessage parseResponse requestID = do @@ -299,12 +283,11 @@ parseResponse requestID = do action <- parseEnum :: ParseASN1 Action hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of - QueryID -> parseQueryIDResponsePayload - Join -> parseJoinResponsePayload - Leave -> parseLeaveResponsePayload - Stabilise -> parseStabiliseResponsePayload - Ping -> parsePingResponsePayload - QueryLoad -> parseLoadResponsePayload + QueryID -> parseQueryIDResponse + Join -> parseJoinResponse + Leave -> parseLeaveResponse + Stabilise -> parseStabiliseResponse + Ping -> parsePingResponse pure $ Response requestID senderID part isFinalPart action payload @@ -322,13 +305,6 @@ parseInteger = do IntVal parsed -> pure parsed x -> throwParseError $ "Expected IntVal but got " <> show x -parseReal :: ParseASN1 Double -parseReal = do - i <- getNext - case i of - Real parsed -> pure parsed - x -> throwParseError $ "Expected Real but got " <> show x - parseEnum :: Enum a => ParseASN1 a parseEnum = do e <- getNext @@ -370,7 +346,7 @@ parseNodeState = onNextContainer Sequence $ do , domain = domain' , dhtPort = dhtPort' , servicePort = servicePort' - , vServerID
= fromInteger vServer' + , vServerID = vServer' , ipAddr = ip' } @@ -384,13 +360,13 @@ parseCacheEntry = onNextContainer Sequence $ do parseNodeCache :: ParseASN1 [RemoteCacheEntry] parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry -parseJoinRequestPayload :: ParseASN1 ActionPayload -parseJoinRequestPayload = do +parseJoinRequest :: ParseASN1 ActionPayload +parseJoinRequest = do parseNull pure JoinRequestPayload -parseJoinResponsePayload :: ParseASN1 ActionPayload -parseJoinResponsePayload = onNextContainer Sequence $ do +parseJoinResponse :: ParseASN1 ActionPayload +parseJoinResponse = onNextContainer Sequence $ do succ' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState) cache <- parseNodeCache @@ -400,8 +376,8 @@ parseJoinResponsePayload = onNextContainer Sequence $ do , joinCache = cache } -parseQueryIDRequestPayload :: ParseASN1 ActionPayload -parseQueryIDRequestPayload = onNextContainer Sequence $ do +parseQueryIDRequest :: ParseASN1 ActionPayload +parseQueryIDRequest = onNextContainer Sequence $ do targetID <- fromInteger <$> parseInteger lBestNodes <- parseInteger pure $ QueryIDRequestPayload { @@ -409,8 +385,8 @@ parseQueryIDRequestPayload = onNextContainer Sequence $ do , queryLBestNodes = lBestNodes } -parseQueryIDResponsePayload :: ParseASN1 ActionPayload -parseQueryIDResponsePayload = onNextContainer Sequence $ do +parseQueryIDResponse :: ParseASN1 ActionPayload +parseQueryIDResponse = onNextContainer Sequence $ do Enumerated resultType <- getNext result <- case resultType of 0 -> FOUND <$> parseNodeState @@ -420,13 +396,13 @@ parseQueryIDResponsePayload = onNextContainer Sequence $ do queryResult = result } -parseStabiliseRequestPayload :: ParseASN1 ActionPayload -parseStabiliseRequestPayload = do +parseStabiliseRequest :: ParseASN1 ActionPayload +parseStabiliseRequest = do parseNull pure StabiliseRequestPayload -parseStabiliseResponsePayload :: ParseASN1 ActionPayload -parseStabiliseResponsePayload = onNextContainer Sequence $ do +parseStabiliseResponse :: ParseASN1 ActionPayload +parseStabiliseResponse = onNextContainer Sequence $ do succ' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState) pure $ StabiliseResponsePayload { @@ -434,8 +410,8 @@ parseStabiliseResponsePayload = onNextContainer Sequence $ do , stabilisePredecessors = pred' } -parseLeaveRequestPayload :: ParseASN1 ActionPayload -parseLeaveRequestPayload = onNextContainer Sequence $ do +parseLeaveRequest :: ParseASN1 ActionPayload +parseLeaveRequest = onNextContainer Sequence $ do succ' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState) doMigration <- parseBool @@ -445,40 +421,19 @@ parseLeaveRequestPayload = onNextContainer Sequence $ do , leaveDoMigration = doMigration } -parseLeaveResponsePayload :: ParseASN1 ActionPayload -parseLeaveResponsePayload = do +parseLeaveResponse :: ParseASN1 ActionPayload +parseLeaveResponse = do parseNull pure LeaveResponsePayload -parsePingRequestPayload :: ParseASN1 ActionPayload -parsePingRequestPayload = do +parsePingRequest :: ParseASN1 ActionPayload +parsePingRequest = do parseNull pure PingRequestPayload -parsePingResponsePayload :: ParseASN1 ActionPayload -parsePingResponsePayload = onNextContainer Sequence $ do +parsePingResponse :: ParseASN1 ActionPayload +parsePingResponse = onNextContainer Sequence $ do handledNodes <- getMany parseNodeState pure $ PingResponsePayload { 
pingNodeStates = handledNodes } - -parseLoadRequestPayload :: ParseASN1 ActionPayload -parseLoadRequestPayload = onNextContainer Sequence $ do - loadUpperBound' <- fromInteger <$> parseInteger - pure LoadRequestPayload - { loadSegmentUpperBound = loadUpperBound' - } - -parseLoadResponsePayload :: ParseASN1 ActionPayload -parseLoadResponsePayload = onNextContainer Sequence $ do - loadSum' <- parseReal - loadRemainingTarget' <- parseReal - loadTotalCapacity' <- parseReal - loadSegmentLowerBound' <- fromInteger <$> parseInteger - pure LoadResponsePayload - { loadSum = loadSum' - , loadRemainingTarget = loadRemainingTarget' - , loadTotalCapacity = loadTotalCapacity' - , loadSegmentLowerBound = loadSegmentLowerBound' - } - diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 39eaad2..c86c0f1 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -15,7 +15,6 @@ module Hash2Pub.DHTProtocol , Action(..) , ActionPayload(..) , FediChordMessage(..) - , mkRequest , maximumParts , sendQueryIdMessages , requestQueryID @@ -23,7 +22,6 @@ module Hash2Pub.DHTProtocol , requestLeave , requestPing , requestStabilise - , requestQueryLoad , lookupMessage , sendRequestTo , queryIdLookupLoop @@ -38,7 +36,7 @@ module Hash2Pub.DHTProtocol , isPossibleSuccessor , isPossiblePredecessor , isInOwnResponsibilitySlice - , vsIsJoined + , isJoined , closestCachePredecessors ) where @@ -51,8 +49,7 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) -import Control.Monad.Except (MonadError (..), liftEither, - runExceptT) +import Control.Monad.Except (MonadError (..), runExceptT) import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString as BS import Data.Either (rights) @@ -66,7 +63,6 @@ import Data.Maybe (fromJust, fromMaybe, isJust, isNothing, mapMaybe, maybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Data.Word (Word8) import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket.ByteString @@ -78,27 +74,23 @@ import Hash2Pub.ASN1Coding import Hash2Pub.FediChordTypes (CacheEntry (..), CacheEntry (..), FediChordConf (..), - HasKeyID (..), LoadStats (..), + HasKeyID (..), LocalNodeState (..), LocalNodeStateSTM, NodeCache, NodeID, NodeState (..), RealNode (..), RealNodeSTM, RemoteNodeState (..), RingEntry (..), RingMap (..), - SegmentLoadStats (..), Service (..), addRMapEntry, addRMapEntryWith, cacheGetNodeStateUnvalidated, cacheLookup, cacheLookupPred, cacheLookupSucc, genNodeID, - getKeyID, hasValidNodeId, - loadSliceSum, localCompare, + getKeyID, localCompare, rMapFromList, rMapLookupPred, rMapLookupSucc, - remainingLoadTarget, setPredecessors, setSuccessors) import Hash2Pub.ProtocolTypes -import Hash2Pub.RingMap import Debug.Trace (trace) @@ -111,7 +103,7 @@ queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node -- This only makes sense if the node is part of the DHT by having joined. -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. - | vsIsJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState + | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . 
toRemoteNodeState $ ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries) | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache @@ -235,8 +227,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g -- | uses the successor and predecessor list of a node as an indicator for whether a -- node has properly joined the DHT -vsIsJoined :: LocalNodeState s -> Bool -vsIsJoined ns = not . all null $ [successors ns, predecessors ns] +isJoined :: LocalNodeState s -> Bool +isJoined ns = not . all null $ [successors ns, predecessors ns] -- | the size limit to be used when serialising messages for sending sendMessageSize :: Num i => i @@ -245,37 +237,27 @@ sendMessageSize = 1200 -- ====== message send and receive operations ====== -- encode the response to a request that just signals successful receipt -ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString -ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { +ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString +ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { requestID = requestID req - , senderID = receiverID req + , senderID = ownID , part = part req , isFinalPart = False , action = action req , payload = Nothing } -ackRequest _ = Map.empty - - --- | extract the first payload from a received message set -extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload -extractFirstPayload msgSet = foldr' (\msg plAcc - if isNothing plAcc && isJust (payload msg) - then payload msg - else plAcc - ) Nothing msgSet +ackRequest _ _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. handleIncomingRequest :: Service s (RealNodeSTM s) - => Word8 -- ^ maximum number of vservers, because of decision to @dropSpoofedIDs@ in here and not already in @fediMessageHandler@ - -> LocalNodeStateSTM s -- ^ the handling node + => LocalNodeStateSTM s -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request -> IO () -handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do +handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do ns <- readTVarIO nsSTM -- add nodestate to cache now <- getPOSIXTime @@ -283,20 +265,19 @@ Nothing -> pure () Just aPart -> do let (SockAddrInet6 _ _ sourceIP _) = sourceAddr - queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns) + queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns -- distinguish on whether and how to respond.
If responding, pass message to response generating function and write responses to send queue maybe (pure ()) ( mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) ) =<< (case action aPart of Ping -> Just <$> respondPing nsSTM msgSet - Join -> dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondJoin + Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin -- ToDo: figure out what happens if not joined QueryID -> Just <$> respondQueryID nsSTM msgSet -- only when joined - Leave -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondLeave else pure Nothing - Stabilise -> if vsIsJoined ns then dropSpoofedIDs vsLimit sourceIP nsSTM msgSet respondStabilise else pure Nothing - QueryLoad -> if vsIsJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing + Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing + Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing ) -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. @@ -306,18 +287,19 @@ handleIncomingRequest vsLimit nsSTM sendQ msgSet sourceAddr = do -- | Filter out requests with spoofed node IDs by recomputing the ID using -- the sender IP. -- For valid (non-spoofed) sender IDs, the passed responder function is invoked. - dropSpoofedIDs :: Word8 -- ^ maximum number of vservers per node - -> HostAddress6 -- ^ msg source address + dropSpoofedIDs :: HostAddress6 -- msg source address -> LocalNodeStateSTM s - -> Set.Set FediChordMessage -- ^ message parts of the request - -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- ^ responder function to be invoked for valid requests + -> Set.Set FediChordMessage -- message parts of the request + -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- responder function to be invoked for valid requests -> IO (Maybe (Map.Map Integer BS.ByteString)) - dropSpoofedIDs limVs addr nsSTM' msgSet' responder = + dropSpoofedIDs addr nsSTM' msgSet' responder = let aRequestPart = Set.elemAt 0 msgSet senderNs = sender aRequestPart + givenSenderID = getNid senderNs + recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs) in - if hasValidNodeId limVs senderNs addr + if recomputedID == givenSenderID then Just <$> responder nsSTM' msgSet' else pure Nothing @@ -335,15 +317,19 @@ respondQueryID nsSTM msgSet = do let aRequestPart = Set.elemAt 0 msgSet senderID = getNid .
sender $ aRequestPart - senderPayload = extractFirstPayload msgSet - -- return only empty message serialisation if no payload was included in parts + senderPayload = foldr' (\msg plAcc -> + if isNothing plAcc && isJust (payload msg) + then payload msg + else plAcc + ) Nothing msgSet + -- return only empty message serialisation if no payload was included in parts maybe (pure Map.empty) (\senderPayload' -> do responseMsg <- atomically $ do nsSnap <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM nsSnap let responsePayload = QueryIDResponsePayload { - queryResult = if vsIsJoined nsSnap + queryResult = if isJoined nsSnap then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') -- if not joined yet, attract responsibility for -- all keys to make bootstrapping possible @@ -436,47 +422,6 @@ respondPing nsSTM msgSet = do } pure $ serialiseMessage sendMessageSize pingResponse -respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondQueryLoad nsSTM msgSet = do - nsSnap <- readTVarIO nsSTM - -- this message cannot be split reasonably, so just - -- consider the first payload - let - aRequestPart = Set.elemAt 0 msgSet - senderPayload = extractFirstPayload msgSet - responsePl <- maybe (pure Nothing) (\pl -> - case pl of - LoadRequestPayload{} -> do - parentNode <- readTVarIO (parentRealNode nsSnap) - let - serv = nodeService parentNode - conf = nodeConfig parentNode - lStats <- getServiceLoadStats serv - let - directSucc = getNid . head . predecessors $ nsSnap - handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl) - pure $ Just LoadResponsePayload - { loadSum = handledTagSum - , loadRemainingTarget = remainingLoadTarget conf lStats - , loadTotalCapacity = totalCapacity lStats - , loadSegmentLowerBound = directSucc - } - _ -> pure Nothing - ) - senderPayload - - pure $ maybe - Map.empty - (\pl -> serialiseMessage sendMessageSize $ Response - { requestID = requestID aRequestPart - , senderID = getNid nsSnap - , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 - , isFinalPart = False - , action = QueryLoad - , payload = Just pl - } - ) responsePl - respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do @@ -489,7 +434,7 @@ respondJoin nsSTM msgSet = do senderNS = sender aRequestPart -- if not joined yet, attract responsibility for -- all keys to make bootstrapping possible - responsibilityLookup = if vsIsJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) + responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) thisNodeResponsible (FOUND _) = True thisNodeResponsible (FORWARD _) = False -- check whether the joining node falls into our responsibility @@ -536,21 +481,6 @@ respondJoin nsSTM msgSet = do -- ....... request sending ....... 
--- | default constructor for request messages, fills standard values like --- part number to avoid code repetition -mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage) -mkRequest ns targetID action pl rid = Request - { requestID = rid - , receiverID = targetID - , sender = toRemoteNodeState ns - -- part number and final flag can be changed by ASN1 encoder to make packet - -- fit the MTU restrictions - , part = 1 - , isFinalPart = True - , action = action - , payload = pl - } - -- | send a join request and return the joined 'LocalNodeState' including neighbours requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted -> LocalNodeStateSTM s -- ^ joining NodeState @@ -562,7 +492,7 @@ requestJoin toJoinOn ownStateSTM = do let srcAddr = confIP nodeConf bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do -- extract own state for getting request information - responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let @@ -593,7 +523,7 @@ writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions - mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ + mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) else do @@ -651,14 +581,14 @@ sendQueryIdMessages :: (Integral i) -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned -> [RemoteNodeState] -- ^ nodes to query -> IO QueryResponse -- ^ accumulated response -sendQueryIdMessages lookupID ns lParam targets = do +sendQueryIdMessages targetID ns lParam targets = do -- create connected sockets to all query targets and use them for request handling nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode)) + sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -675,7 +605,7 @@ _ -> Set.empty -- forward entries to global cache - queueAddEntries entrySet (cacheWriteQueue ns) + queueAddEntries entrySet ns -- return accumulated QueryResult pure $ case acc of -- once a FOUND has been encountered, return this as a result @@ -691,14 +621,13 @@ -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i - => NodeID -- ^ lookup ID to be looked up + => NodeID -- ^ target ID -> LocalNodeState s -- ^ sender node state -> Maybe i -- ^ optionally provide a different l parameter - -> NodeID
-- ^ target ID of message destination -> (Integer -> FediChordMessage) -lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID) +lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) where - pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam } + pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } -- | Send a stabilise request to the provided 'RemoteNode' and, if successful, @@ -709,7 +638,16 @@ requestStabilise :: LocalNodeState s -- ^ sending node requestStabilise ns neighbour = do nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf - responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload)) + responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Stabilise + , payload = Just StabiliseRequestPayload + } + ) ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) either -- forward IO error messages @@ -722,7 +660,7 @@ ) ([],[]) respSet -- update successfully responded neighbour in cache - maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet) + maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) @@ -744,12 +682,17 @@ requestLeave ns doMigration target = do , leavePredecessors = predecessors ns , leaveDoMigration = doMigration } - responses <- bracket - (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) - close - (fmap Right - . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload)) - ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Leave + , payload = Just leavePayload + } + ) + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) either -- forward IO error messages (pure .
Left) @@ -765,7 +708,16 @@ requestPing ns target = do let srcAddr = confIP nodeConf responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (\sock -> do - resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock + resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Ping + , payload = Just PingRequestPayload + } + ) sock (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock pure $ Right (peerAddr, resp) ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) @@ -781,9 +733,10 @@ requestPing ns target = do -- recompute ID for each received node and mark as verified in cache now <- getPOSIXTime forM_ responseVss (\vs -> - if hasValidNodeId (confKChoicesMaxVS nodeConf) vs peerAddr - then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs - else pure () + let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) + in if recomputedID == getNid vs + then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs + else pure () ) pure $ if null responseVss then Left "no active vServer IDs returned, ignoring node" @@ -791,37 +744,6 @@ requestPing ns target = do ) responses --- still need a particular vserver as LocalNodeState, because requests need a sender -requestQueryLoad :: (MonadError String m, MonadIO m) - => LocalNodeState s -- ^ the local source vserver for the request - -> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node - -> RemoteNodeState -- ^ target node to query - -> m SegmentLoadStats -requestQueryLoad ns upperIdBound target = do - nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) - let - srcAddr = confIP nodeConf - loadReqPl = LoadRequestPayload - { loadSegmentUpperBound = upperIdBound - } - responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close - (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl)) - ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) - responseMsgSet <- liftEither responses - -- throws an error if an exception happened - loadResPl <- maybe (throwError "no load response payload found") pure - (extractFirstPayload responseMsgSet) - pure SegmentLoadStats - { segmentLowerKeyBound = loadSegmentLowerBound loadResPl - , segmentUpperKeyBound = upperIdBound - , segmentLoad = loadSum loadResPl - , segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl - , segmentOwnerCapacity = loadTotalCapacity loadResPl - , segmentCurrentOwner = target - } - - - -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. 
sendRequestTo :: Int -- ^ timeout in milliseconds @@ -878,24 +800,24 @@ -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache queueAddEntries :: Foldable c => c RemoteCacheEntry - -> TQueue (NodeCache -> NodeCache) + -> LocalNodeState s -> IO () -queueAddEntries entries cacheQ = do +queueAddEntries entries ns = do now <- getPOSIXTime - forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry + forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry -- | enqueue a list of node IDs to be deleted from the global NodeCache queueDeleteEntries :: Foldable c => c NodeID - -> TQueue (NodeCache -> NodeCache) + -> LocalNodeState s -> IO () -queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry +queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry -- | enqueue a single node ID to be deleted from the global NodeCache queueDeleteEntry :: NodeID - -> TQueue (NodeCache -> NodeCache) + -> LocalNodeState s -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete @@ -904,11 +826,11 @@ -- global 'NodeCache'. queueUpdateVerifieds :: Foldable c => c NodeID - -> TQueue (NodeCache -> NodeCache) + -> LocalNodeState s -> IO () -queueUpdateVerifieds nIds cacheQ = do +queueUpdateVerifieds nIds ns = do now <- getPOSIXTime - forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $ + forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ markCacheEntryAsVerified (Just now) nid' -- | retry an IO action at most *i* times until it delivers a result diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index ab413cf..9f14a1e 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -63,20 +63,15 @@ import Control.Exception import Control.Monad (forM_, forever) import Control.Monad.Except import Crypto.Hash -import Data.Bifunctor (first) import qualified Data.ByteArray as BA import qualified Data.ByteString as BS import qualified Data.ByteString.UTF8 as BSU import Data.Either (rights) import Data.Foldable (foldr') import Data.Functor.Identity -import Data.HashMap.Strict (HashMap) -import qualified Data.HashMap.Strict as HMap -import Data.HashSet (HashSet) -import qualified Data.HashSet as HSet import Data.IP (IPv6, fromHostAddress6, toHostAddress6) -import Data.List (sortBy, sortOn, (\\)) +import Data.List ((\\)) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust, isNothing, mapMaybe) @@ -92,7 +87,6 @@ import System.Random (randomRIO) import Hash2Pub.DHTProtocol import Hash2Pub.FediChordTypes -import Hash2Pub.RingMap import Hash2Pub.Utils import Debug.Trace (trace) @@ -102,87 +96,50 @@ fediChordInit :: (Service s (RealNodeSTM s)) => FediChordConf -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service - -> IO (Async (), RealNodeSTM s) + -> IO (Socket, LocalNodeStateSTM s) fediChordInit initConf serviceRunner = do emptyLookupCache <- newTVarIO Map.empty - cacheSTM <- newTVarIO initCache - cacheQ <- atomically newTQueue let realNode = RealNode - { vservers = emptyRMap + { vservers = [] , nodeConfig = initConf , bootstrapNodes = confBootstrapNodes initConf , lookupCacheSTM = emptyLookupCache , nodeService = undefined - ,
globalNodeCacheSTM = cacheSTM - , globalCacheWriteQueue = cacheQ - } + } realNodeSTM <- newTVarIO realNode - serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf) -- launch service and set the reference in the RealNode serv <- serviceRunner realNodeSTM atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv } - -- prepare for joining: start node cache writer thread - -- currently no masking is necessary, as there is nothing to clean up - nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM - fediThreadsAsync <- - either (\err -> do - -- handle unsuccessful join - putStrLn $ err <> " Error joining, start listening for incoming requests anyways" - -- add an unjoined placeholder vserver to be able to listen for - -- incoming request - placeholderVS <- nodeStateInit realNodeSTM 0 - placeholderVSSTM <- newTVarIO placeholderVS - atomically . modifyTVar' realNodeSTM $ - addVserver (getNid placeholderVS, placeholderVSSTM) - -- launch thread attempting to join on new cache entries - _ <- forkIO $ joinOnNewEntriesThread realNodeSTM - async (fediMainThreads serverSock realNodeSTM) - ) - (\_ -> do - -- launch main eventloop with successfully joined state - putStrLn "successful join" - async (fediMainThreads serverSock realNodeSTM) - ) - =<< tryBootstrapJoining realNodeSTM - pure (fediThreadsAsync, realNodeSTM) - - --- | Create a new vserver and join it through a provided remote node. --- TODO: Many fediChord* functions already cover parts of this, refactor these to use --- this function. -fediChordJoinNewVs :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => RealNodeSTM s -- ^ parent real node - -> Word8 -- ^ vserver ID - -> RemoteNodeState -- ^ target node to join on - -> m (NodeID, LocalNodeStateSTM s) -- ^ on success: (vserver ID, TVar of vserver) -fediChordJoinNewVs nodeSTM vsId target = do - newVs <- liftIO $ nodeStateInit nodeSTM vsId - newVsSTM <- liftIO $ newTVarIO newVs - liftIO . putStrLn $ "Trying to join on " <> show (getNid target) - liftIO $ putStrLn "send a Join" - _ <- liftIO . requestJoin target $ newVsSTM - pure (getNid newVs, newVsSTM) + -- initialise a single vserver + initialState <- nodeStateInit realNodeSTM + initialStateSTM <- newTVarIO initialState + -- add vserver to list at RealNode + atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = initialStateSTM:vservers rn } + serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) + pure (serverSock, initialStateSTM) -- | initialises the 'NodeState' for this local node. -- Separated from 'fediChordInit' to be usable in tests. 
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Word8 -> IO (LocalNodeState s) -nodeStateInit realNodeSTM vsID' = do +nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s) +nodeStateInit realNodeSTM = do realNode <- readTVarIO realNodeSTM + cacheSTM <- newTVarIO initCache + q <- atomically newTQueue let conf = nodeConfig realNode - vsID = vsID' + vsID = 0 containedState = RemoteNodeState { domain = confDomain conf , ipAddr = confIP conf - , nid = genNodeID (confIP conf) (confDomain conf) vsID + , nid = genNodeID (confIP conf) (confDomain conf) $ fromInteger vsID , dhtPort = toEnum $ confDhtPort conf , servicePort = getListeningPortFromService $ nodeService realNode , vServerID = vsID } initialState = LocalNodeState { nodeState = containedState - , nodeCacheSTM = globalNodeCacheSTM realNode - , cacheWriteQueue = globalCacheWriteQueue realNode + , nodeCacheSTM = cacheSTM + , cacheWriteQueue = q , successors = [] , predecessors = [] , kNeighbours = 3 @@ -193,377 +150,105 @@ nodeStateInit realNodeSTM vsID' = do } pure initialState - --- | Joins a 'RealNode' to the DHT by joining several vservers, trying to match --- the own load target best. --- Triggers 'kChoicesVsJoin' -kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => RealNodeSTM s - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrap joining - -> m () -kChoicesNodeJoin nodeSTM bootstrapNode = do - node <- liftIO $ readTVarIO nodeSTM - -- use vserver 0 as origin of bootstrapping messages - vs0 <- liftIO $ nodeStateInit nodeSTM 0 - vs0STM <- liftIO $ newTVarIO vs0 - - ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node - let - conf = nodeConfig node - -- T_a of k-choices - -- compute load target - joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 - initialJoins = confKChoicesMaxVS conf `div` 2 - -- edge case: however small the target is, at least join 1 vs - -- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load - alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node - unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do - joinedVss <- vsJoins vs0 (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM - if nullRMap joinedVss - then throwError "k-choices join unsuccessful, no vserver joined" - else liftIO . atomically . 
modifyTVar' nodeSTM $ \node' -> node' - { vservers = unionRMap joinedVss (vservers node') } - - where - vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) - vsJoins _ _ vsmap _ 0 _ = pure vsmap - vsJoins queryVs capacity vsmap remainingTargetLoad remainingJoins nodeSTM' - | remainingTargetLoad <= 0 = pure vsmap - | otherwise = do - - (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVs bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad - -- on successful vserver join add the new VS to node and recurse - vsJoins queryVs capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' - -- on error, just reduce the amount of tries and retry - `catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVs capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM') - - -- error cause 1: not a single queried node has responded -> indicates permanent failure - -- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen - -- target even at the next attempt again - -- `catchError` (const . - -kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s -- ^ vserver to be used for querying - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping - -> Double -- ^ own capacity - -> VSMap s -- ^ currently active VServers - -> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver - -> Double -- ^ remaining load target - -> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver -kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget = do - conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM) - -- generate all possible vs IDs - segmentLoads <- kChoicesSegmentLoads conf queryVs bootstrapNode activeVss - -- cost calculation and sort by cost - -- edge case: no possible ID has responded - (mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure - . headMay - . sortOn (\(cost, _, _) -> cost) - . fmap (\(segment, vsId, toJoinOn) -> (kChoicesJoinCost remainingTarget capacity segment, vsId, toJoinOn)) - $ segmentLoads - -- join at min cost - joinedNode <- fediChordJoinNewVs nodeSTM vsId toJoinOn - -- idea: a single join failure shall not make the whole process fail - --`catchError` - pure (mincost, joinedNode) - - -- Possible optimisation: - -- Instead of sampling all join candidates again at each invocation, querying - -- all segment loads before the first join and trying to re-use these - -- load information can save round trips. - -- possible edge case: detect when joining a subsegment of one already owned - -- joining into own segments => When first joining into segment S1 and then - -- later joining into the subsegment S2, the - -- resulting load l(S1+S2) = l(S1) != l(S1) + l(S2) - -- => need to re-query the load of both S1 and S2 - -- possible edge case 2: taking multiple segments from the same owner - -- changes the remainingLoadTarget at each vsJoin. This target change - -- needs to be accounted for starting from the 2nd vsJoin. 
- - --- | query the load of all still unjoined VS positions -kChoicesSegmentLoads :: (Service s (RealNodeSTM s), MonadError String m, MonadIO m) - => FediChordConf -- ^ config params needed for generating all possible VSs - -> LocalNodeState s -- ^ vserver to be used for querying - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping - -> VSMap s -- ^ currently active VServers - -> m [(SegmentLoadStats, Word8, RemoteNodeState)] -kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do - let - -- tuples of node IDs and vserver IDs, because vserver IDs are needed for - -- LocalNodeState creation - nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]] - - -- query load of all possible segments - -- simplification: treat each load lookup failure as a general unavailability of that segment - -- TODO: retries for transient failures - -- TODO: parallel queries - fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do - -- if bootstrap node is provided, do initial lookup via that - currentlyResponsible <- maybe - (requestQueryID queryVs vsNid) - (\bs -> bootstrapQueryId queryVs bs vsNid) - bootstrapNode - segment <- requestQueryLoad queryVs vsNid currentlyResponsible - pure $ Just (segment, vsId, currentlyResponsible) - -- store segment stats and vserver ID together, so it's clear - -- which vs needs to be joined to acquire this segment - ) `catchError` const (pure Nothing) - ) - - - - -kChoicesJoinCost :: Double -- ^ own remaining load target - -> Double -- ^ own capacity - -> SegmentLoadStats -- ^ load stats of neighbour vs - -> Double -kChoicesJoinCost remainingOwnLoad ownCap segment = - abs (segmentOwnerRemainingLoadTarget segment + segmentLoad segment) / segmentOwnerCapacity segment - + abs (remainingOwnLoad - segmentLoad segment) / ownCap - - abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment - -kChoicesDepartureCost :: Double -- ^ own remaining load target - -> Double -- ^ own capacity - -> Double -- ^ load of own segment to hand over - -> SegmentLoadStats -- ^ load stats of neighbour VS - -> Double -kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment = - abs (segmentOwnerRemainingLoadTarget segment - thisSegmentLoad) / segmentOwnerCapacity segment - + abs (remainingOwnLoad + thisSegmentLoad) / ownCap - - abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment - - -kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO () -kChoicesRebalanceThread nodeSTM = do - interval <- confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM - runExceptT $ loop interval - pure () - where - loop interval = rebalanceVS interval `catchError` \_ -> loop interval - rebalanceVS :: (MonadError String m, MonadIO m) => Int -> m () - rebalanceVS interval = do - liftIO $ threadDelay interval - node <- liftIO $ readTVarIO nodeSTM - let - activeVssSTM = vservers node - conf = nodeConfig node - -- use an active vserver for load queries - queryVsSTM <- maybe (throwError "no active vserver") pure - $ headMay (rMapToList activeVssSTM) - queryVs <- liftIO . readTVarIO $ queryVsSTM - -- TODO: segment load and neighbour load queries can be done in parallel - -- query load of all existing VSes neighbours - -- TODO: what happens if neighbour is one of our own vservers? - neighbourLoadFetches <- liftIO . forM activeVssSTM $ async . (\vsSTM -> runExceptT $ do - vs <- liftIO . 
readTVarIO $ vsSTM - succNode <- maybe - (throwError "vs has no successor") - pure - . headMay . successors $ vs - neighbourStats <- requestQueryLoad queryVs (getNid succNode) succNode - pure (getNid succNode, neighbourStats) - ) - -- TODO: deal with exceptions - -- TODO: better handling of nested Eithers - -- so far this is a RingMap NodeID (Either SomeException (Either String (NodeID, SegmentLoadStats))) - neighbourLoads <- liftIO . mapM waitCatch $ neighbourLoadFetches - -- get local load stats - ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node - -- calculate all departure costs - let - departureCosts = - sortOn (\(cost, _, _) -> cost) - . foldl (\acc (ownVsId, neighbourLoad) -> case neighbourLoad of - Right (Right (neighbourId, neighbourStats)) -> - let - ownRemainingTarget = remainingLoadTarget conf ownLoadStats - thisSegmentLoad = loadSliceSum ownLoadStats ownVsId neighbourId - in - ( kChoicesDepartureCost ownRemainingTarget (totalCapacity ownLoadStats) thisSegmentLoad neighbourStats - , thisSegmentLoad - , ownVsId) - :acc - _ -> acc - ) - [] - $ rMapToListWithKeys neighbourLoads - -- select VS with lowest departure cost - (lowestDepartionCost, departingSegmentLoad, lowestCostDeparter) <- maybe - (throwError "not enough data for calculating departure costs") - pure - $ headMay departureCosts - -- query load of all possible available VS IDs - segmentLoads <- kChoicesSegmentLoads conf queryVs Nothing activeVssSTM - -- calculate all relocation costs of that VS - (joinCost, toJoinOn) <- - maybe (throwError "got no segment loads") pure - . headMay - . sortOn fst - . fmap (\(segment, vsId, toJoinOn) -> - let joinCosts = kChoicesJoinCost - -- when relocating a node, the load of the departing node is freed - (remainingLoadTarget conf ownLoadStats + departingSegmentLoad) - (totalCapacity ownLoadStats) - segment - in - (joinCosts, segmentCurrentOwner segment) - ) - $ segmentLoads - - -- if deciding to re-balance, first leave and then join - -- combined costs need to be a gain (negative) and that gain needs - -- to be larger than Epsilon - when (lowestDepartionCost + joinCost <= negate kChoicesEpsilon) $ do - liftIO . putStrLn $ "here will be a relocation!" - -- loop - rebalanceVS interval - - - --- TODO: make parameterisable --- | dampening factor constant for deciding whether the match gain is worth relocating -kChoicesEpsilon :: Double -kChoicesEpsilon = 0.05 - -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. fediChordBootstrapJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -- ^ the local 'NodeState' -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String ()) -- ^ the joined 'NodeState' after a + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined ns <- readTVarIO nsSTM runExceptT $ do -- 1. get routed to the currently responsible node - currentlyResponsible <- bootstrapQueryId ns bootstrapNode $ getNid ns + lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns + currentlyResponsible <- liftEither lookupResp liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. 
then send a join to the currently responsible node liftIO $ putStrLn "send a bootstrap Join" - _ <- liftEither =<< liftIO (requestJoin currentlyResponsible nsSTM) - pure () + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult --- Periodically lookup own IDs through a random bootstrapping node to discover and merge separated DHT clusters. +-- Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined nodes try joining instead. -convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO () -convergenceSampleThread nodeSTM = forever $ do - node <- readTVarIO nodeSTM - forM_ (vservers node) $ \nsSTM -> do - nsSnap <- readTVarIO nsSTM - parentNode <- readTVarIO $ parentRealNode nsSnap - if vsIsJoined nsSnap - then - runExceptT (do - -- joined node: choose random node, do queryIDLoop, compare result with own responsibility - let bss = bootstrapNodes parentNode - randIndex <- liftIO $ randomRIO (0, length bss - 1) - chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - currentlyResponsible <- bootstrapQueryId nsSnap chosenNode (getNid nsSnap) - if getNid currentlyResponsible /= getNid nsSnap - -- if mismatch, stabilise on the result, else do nothing - then do - stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible - (preds, succs) <- liftEither stabResult - -- TODO: verify neighbours before adding, see #55 - liftIO . atomically $ do - ns <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors preds ns - else pure () - ) >> pure () - -- unjoined node: try joining through all bootstrapping nodes - else tryBootstrapJoining nodeSTM >> pure () - - let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node +convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () +convergenceSampleThread nsSTM = forever $ do + nsSnap <- readTVarIO nsSTM + parentNode <- readTVarIO $ parentRealNode nsSnap + if isJoined nsSnap + then + runExceptT (do + -- joined node: choose random node, do queryIDLoop, compare result with own responsibility + let bss = bootstrapNodes parentNode + randIndex <- liftIO $ randomRIO (0, length bss - 1) + chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex + lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) + currentlyResponsible <- liftEither lookupResult + if getNid currentlyResponsible /= getNid nsSnap + -- if mismatch, stabilise on the result, else do nothing + then do + stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible + (preds, succs) <- liftEither stabResult + -- TODO: verify neighbours before adding, see #55 + liftIO . atomically $ do + ns <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors preds ns + else pure () + ) >> pure () + -- unjoined node: try joining through all bootstrapping nodes + else tryBootstrapJoining nsSTM >> pure () + let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode threadDelay delaySecs -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (Either String ()) -tryBootstrapJoining nodeSTM = do - node <- readTVarIO nodeSTM - let - bss = bootstrapNodes node - conf = nodeConfig node - if confEnableKChoices conf - then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM .
Just - else do - firstVS <- nodeStateInit nodeSTM 0 - firstVSSTM <- newTVarIO firstVS - joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM) - either - (pure . Left) - (\_ -> fmap Right . atomically . modifyTVar' nodeSTM $ - addVserver (getNid firstVS, firstVSSTM) - ) - (joinResult :: Either String ()) - +tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) +tryBootstrapJoining nsSTM = do + bss <- atomically $ do + nsSnap <- readTVar nsSTM + realNodeSnap <- readTVar $ parentRealNode nsSnap + pure $ bootstrapNodes realNodeSnap + tryJoining bss where - tryJoining :: [(String, PortNumber)] -> ((String, PortNumber) -> IO (Either String ())) -> IO (Either String ()) - tryJoining (bn:bns) joinFunc = do - j <- joinFunc bn + tryJoining (bn:bns) = do + j <- fediChordBootstrapJoin nsSTM bn case j of - Left err -> putStrLn ("join error: " <> err) >> tryJoining bns joinFunc - Right joined -> pure $ Right () - tryJoining [] _ = pure $ Left "Exhausted all bootstrap points for joining." + Left err -> putStrLn ("join error: " <> err) >> tryJoining bns + Right joined -> pure $ Right joined + tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." -- | Look up a key just based on the responses of a single bootstrapping node. -bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s - -> (String, PortNumber) - -> NodeID - -> m RemoteNodeState -bootstrapQueryId ns (bootstrapHost, bootstrapPort) targetID = do - nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) +bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) +bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do + ns <- readTVarIO nsSTM + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf - -- IP address needed for ID generation, so look it up - bootstrapAddr <- addrAddress <$> liftIO (resolve (Just bootstrapHost) (Just bootstrapPort)) - bootstrapIP <- case bootstrapAddr of - SockAddrInet6 _ _ bootstrapIP _ -> pure bootstrapIP - _ -> throwError $ "Expected an IPv6 address, but got " <> show bootstrapAddr - let possibleJoinIDs = - [ genNodeID bootstrapIP bootstrapHost v | v <- [0..pred ( - if confEnableKChoices nodeConf then confKChoicesMaxVS nodeConf else 1)]] - tryQuery ns srcAddr nodeConf possibleJoinIDs - where - -- | try bootstrapping a query through any possible ID of the - -- given bootstrap node - tryQuery :: (MonadError String m, MonadIO m) - => LocalNodeState s - -> HostAddress6 - -> FediChordConf - -> [NodeID] - -> m RemoteNodeState - tryQuery _ _ _ [] = throwError $ "No ID of " <> show bootstrapHost <> " has responded." - tryQuery ns srcAddr nodeConf (bnid:bnids) = (do - bootstrapResponse <- liftIO $ bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( - -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing bnid) - ) - `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) + bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( + -- Initialise an empty cache only with the responses from a bootstrapping node + fmap Right . 
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) + ) + `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) + + case bootstrapResponse of + Left err -> pure $ Left err + Right resp + | Set.null resp -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." + | otherwise -> do + now <- getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\resp' cacheAcc -> case queryResult <$> payload resp' of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache resp + currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + pure currentlyResponsible - case bootstrapResponse of - Left err -> throwError err - Right resp - | resp == Set.empty -> throwError $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." - | otherwise -> do - now <- liftIO getPOSIXTime - -- create new cache with all returned node responses - let bootstrapCache = - -- traverse response parts - foldr' (\resp' cacheAcc -> case queryResult <$> payload resp' of - Nothing -> cacheAcc - Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc - Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset - ) - initCache resp - queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - ) `catchError` (\_ -> - -- only throw an error if all IDs have been tried - tryQuery ns srcAddr nodeConf bnids) -- | join a node to the DHT using the global node cache -- node's position. @@ -580,7 +265,6 @@ fediChordVserverJoin nsSTM = do joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult - fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do -- TODO: deal with failure of all successors, e.g. by invoking a stabilise @@ -622,133 +306,96 @@ fediChordVserverLeave ns = do -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining.
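-- An aside on the blocking used by 'joinOnNewEntriesThread' below: inside
-- 'atomically', calling 'retry' suspends the transaction until one of the
-- TVars it has read is written to, after which it is re-run. A minimal,
-- self-contained sketch of this wake-on-change idiom (illustrative names,
-- not part of the codebase; assumes only the stm package):

import Control.Concurrent.STM

-- | block until the TVar's contents satisfy the predicate
waitUntil :: (a -> Bool) -> TVar a -> IO a
waitUntil ready var = atomically $ do
    x <- readTVar var
    if ready x
        then pure x -- condition met, commit and return
        else retry  -- suspend until 'var' changes, then re-run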
-joinOnNewEntriesThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO () -joinOnNewEntriesThread nodeSTM = loop +joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () +joinOnNewEntriesThread nsSTM = loop where - -- situation 1: not joined yet -> vservers == empty - -- problem: empty vservers means not responding to incoming requests, so - -- another node cannot join on us an we not on them (as they're still - -- unjoined as well) - -- solution: on failure still join a dummy node, also add it as vserver - -- problem: once another node has joined on the dummy vserver, we shouldn't - -- just delete it again as it now relies on it as a neighbour - -- => either trigger a kChoicesNodeJoin with the flag that **not** at least one - -- single node needs to be joined (read vservers initially), or do an accelerated - -- periodic rebalance - -- TODO: document this approach in the docs loop = do - (lookupResult, conf, firstVSSTM) <- atomically $ do - nodeSnap <- readTVar nodeSTM - let conf = nodeConfig nodeSnap - case headMay (rMapToList $ vservers nodeSnap) of - Nothing -> retry - Just vsSTM -> do - -- take any active vserver as heuristic for whether this node has - -- successfully joined: - -- If the node hasn't joined yet, only a single placeholder node - -- is active… - firstVS <- readTVar vsSTM - cache <- readTVar $ globalNodeCacheSTM nodeSnap - case queryLocalCache firstVS cache 1 (getNid firstVS) of - -- …which, having no neighbours, returns an empty forward list - -- -> block until cache changes and then retry - (FORWARD s) | Set.null s -> retry - result -> pure (result, conf, vsSTM) + nsSnap <- readTVarIO nsSTM + (lookupResult, parentNode) <- atomically $ do + cache <- readTVar $ nodeCacheSTM nsSnap + parentNode <- readTVar $ parentRealNode nsSnap + case queryLocalCache nsSnap cache 1 (getNid nsSnap) of + -- empty cache, block until cache changes and then retry + (FORWARD s) | Set.null s -> retry + result -> pure (result, parentNode) case lookupResult of -- already joined FOUND _ -> pure () -- otherwise try joining FORWARD _ -> do - -- do normal join, but without bootstrap nodes - joinResult <- if confEnableKChoices conf - then runExceptT $ kChoicesNodeJoin nodeSTM Nothing - else runExceptT $ fediChordVserverJoin firstVSSTM - >> pure () + joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry - (const $ threadDelay (confJoinAttemptsInterval conf) >> loop) + (const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop) (const $ pure ()) joinResult -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. 
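-- The cache updater below follows a single-writer discipline: all mutations
-- are queued as pure @NodeCache -> NodeCache@ functions and applied by one
-- dedicated thread, so concurrent writers never race on the TVar itself.
-- A minimal sketch of the pattern in isolation (illustrative names, not
-- part of the codebase; assumes only stm and base):

import Control.Concurrent.STM
import Control.Monad (forever)

-- | enqueue a modification; safe to call from any thread
queueUpdate :: TQueue (a -> a) -> (a -> a) -> IO ()
queueUpdate q f = atomically $ writeTQueue q f

-- | the sole writer: block for the next modifier and apply it strictly
runSingleWriter :: TQueue (a -> a) -> TVar a -> IO ()
runSingleWriter q var = forever . atomically $ do
    f <- readTQueue q -- blocks until an update has been queued
    modifyTVar' var f -- strict application avoids thunk build-up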
-nodeCacheWriter :: RealNodeSTM s -> IO () -nodeCacheWriter nodeSTM = do - node <- readTVarIO nodeSTM +nodeCacheWriter :: LocalNodeStateSTM s -> IO () +nodeCacheWriter nsSTM = forever $ atomically $ do - cacheModifier <- readTQueue $ globalCacheWriteQueue node - modifyTVar' (globalNodeCacheSTM node) cacheModifier + ns <- readTVar nsSTM + cacheModifier <- readTQueue $ cacheWriteQueue ns + modifyTVar' (nodeCacheSTM ns) cacheModifier -- | Periodically iterate through cache, clean up expired entries and verify unverified ones -nodeCacheVerifyThread :: RealNodeSTM s -> IO () -nodeCacheVerifyThread nodeSTM = forever $ do - (node, firstVSSTM) <- atomically $ do - node <- readTVar nodeSTM - case headMay (rMapToList $ vservers node) of - -- wait until first VS is joined - Nothing -> retry - Just vs' -> pure (node, vs') - let - maxEntryAge = confMaxNodeCacheAge $ nodeConfig node - cacheQ = globalCacheWriteQueue node - cache <- readTVarIO $ globalNodeCacheSTM node - -- always use the first active VS as a sender for operations like Ping - firstVS <- readTVarIO firstVSSTM +nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () +nodeCacheVerifyThread nsSTM = forever $ do + -- get cache + (ns, cache, maxEntryAge) <- atomically $ do + ns <- readTVar nsSTM + cache <- readTVar $ nodeCacheSTM ns + maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns) + pure (ns, cache, maxEntryAge) -- iterate entries: -- for avoiding too many time syscalls, get current time before iterating. now <- getPOSIXTime - forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) -> + forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) -> -- case too old: delete (future work: decide whether pinging and resetting timestamp is better) if (now - ts) > maxEntryAge then - queueDeleteEntry (getNid cacheNode) cacheQ - -- case unverified: try verifying, otherwise delete + queueDeleteEntry (getNid node) ns + -- case unverified: try verifying, otherwise delete else if not validated then do -- marking as verified is done by 'requestPing' as well - pong <- requestPing firstVS cacheNode + pong <- requestPing ns node either (\_-> - queueDeleteEntry (getNid cacheNode) cacheQ + queueDeleteEntry (getNid node) ns ) (\vss -> - if cacheNode `notElem` vss - then queueDeleteEntry (getNid cacheNode) cacheQ + if node `notElem` vss + then queueDeleteEntry (getNid node) ns -- after verifying a node, check whether it can be a closer neighbour - -- do this for each node - -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor - else forM_ (vservers node) (\nsSTM -> do - ns <- readTVarIO nsSTM - if cacheNode `isPossiblePredecessor` ns + else do + if node `isPossiblePredecessor` ns then atomically $ do ns' <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [cacheNode] ns' + writeTVar nsSTM $ addPredecessors [node] ns' else pure () - if cacheNode `isPossibleSuccessor` ns + if node `isPossibleSuccessor` ns then atomically $ do ns' <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [cacheNode] ns' + writeTVar nsSTM $ addSuccessors [node] ns' else pure () - ) ) pong else pure () ) -- check the cache invariant per slice and, if necessary, do a single lookup to the -- middle of each slice not verifying the invariant - latestNode <- readTVarIO nodeSTM - forM_ (vservers latestNode) (\nsSTM -> do - latestNs <- readTVarIO nsSTM - latestCache <- readTVarIO $ nodeCacheSTM latestNs - let nodesToQuery targetID = case queryLocalCache latestNs latestCache 
(lNumBestNodes latestNs) targetID of - FOUND node -> [node] - FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet - forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> - forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle - ) - ) + latestNs <- readTVarIO nsSTM + latestCache <- readTVarIO $ nodeCacheSTM latestNs + let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of + FOUND node -> [node] + FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet + forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> + forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle + ) threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds @@ -761,7 +408,7 @@ checkCacheSliceInvariants :: LocalNodeState s -> [NodeID] -- ^ list of middle IDs of slices not -- ^ fulfilling the invariant checkCacheSliceInvariants ns - | vsIsJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc + | isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc | otherwise = const [] where jEntries = jEntriesPerSlice ns @@ -812,93 +459,90 @@ checkCacheSliceInvariants ns -- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until -- one responds, and get their neighbours for maintaining the node's own neighbour lists. -- If necessary, request new neighbours. -stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO () -stabiliseThread nodeSTM = forever $ do - node <- readTVarIO nodeSTM - forM_ (vservers node) (\nsSTM -> do - oldNs <- readTVarIO nsSTM +stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () +stabiliseThread nsSTM = forever $ do + oldNs <- readTVarIO nsSTM - -- iterate through the same snapshot, collect potential new neighbours - -- and nodes to be deleted, and modify these changes only at the end of - -- each stabilise run. - -- This decision makes iterating through a potentially changing list easier. + -- iterate through the same snapshot, collect potential new neighbours + -- and nodes to be deleted, and apply these changes only at the end of + -- each stabilise run. + -- This decision makes iterating through a potentially changing list easier.
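-- The comment above describes a snapshot-then-commit pattern: stabilise
-- iterates over an immutable snapshot of the neighbour lists, accumulates
-- nodes to delete and to add, and only then merges both into the *latest*
-- state in a single STM transaction. Reduced to its core (illustrative
-- names, not part of the codebase; assumes only stm and base):

import Control.Concurrent.STM
import Data.List ((\\))

-- | merge accumulated deletions and additions into the current list state
commitChanges :: Eq a => TVar [a] -> [a] -> [a] -> IO [a]
commitChanges var deletes adds = atomically $ do
    latest <- readTVar var -- re-read: state may have changed since the snapshot
    let merged = (latest \\ deletes) <> adds
    writeTVar var merged
    pure merged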
- -- don't contact all neighbours unless the previous one failed/ Left ed + -- don't contact all neighbours unless the previous one failed (returned a Left) - predStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] - succStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] + predStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] + succStabilise <- stabiliseClosestResponder oldNs successors 1 [] + let + (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise + (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise + allDeletes = predDeletes <> succDeletes + allNeighbours = predNeighbours <> succNeighbours + + -- now actually modify the node state's neighbours + updatedNs <- atomically $ do + newerNsSnap <- readTVar nsSTM let - (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise - (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise - allDeletes = predDeletes <> succDeletes - allNeighbours = predNeighbours <> succNeighbours + -- sorting and taking only k neighbours is taken care of by the + -- setSuccessors/ setPredecessors functions + newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours + newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours + newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap + writeTVar nsSTM newNs + pure newNs + -- delete unresponding nodes from cache as well + mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes - -- now actually modify the node state's neighbours - updatedNs <- atomically $ do - newerNsSnap <- readTVar nsSTM - let - -- sorting and taking only k neighbours is taken care of by the - -- setSuccessors/ setPredecessors functions - newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours - newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours - newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap - writeTVar nsSTM newNs - pure newNs - -- delete unresponding nodes from cache as well - mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes + -- try looking up additional neighbours if list too short + forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do + ns' <- readTVarIO nsSTM + nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [entry] latestNs + ) + nextEntry + ) - -- try looking up additional neighbours if list too short - forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do - ns' <- readTVarIO nsSTM - nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - either - (const $ pure ()) - (\entry -> atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [entry] latestNs - ) - nextEntry - ) + forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do + ns' <- readTVarIO nsSTM + nextEntry <- runExceptT . requestQueryID ns' $ succ .
getNid $ lastDef (toRemoteNodeState ns') (successors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [entry] latestNs + ) + nextEntry + ) - forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do - ns' <- readTVarIO nsSTM - nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - either - (const $ pure ()) - (\entry -> atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [entry] latestNs - ) - nextEntry - ) + newNs <- readTVarIO nsSTM - newNs <- readTVarIO nsSTM + let + oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs + newPredecessor = headMay $ predecessors newNs + -- manage need for service data migration: + maybe (pure ()) (\newPredecessor' -> + when ( + isJust newPredecessor + && oldPredecessor /= newPredecessor' + -- case: predecessor has changed in some way => own responsibility has changed in some way + -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data + -- If this is due to a node leaving without transferring its data, try getting it from a redundant copy + -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it + && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do + ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs) + migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor') + -- TODO: deal with migration failure, e.g. retry + pure () + ) + newPredecessor - let - oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs - newPredecessor = headMay $ predecessors newNs - -- manage need for service data migration: - maybe (pure ()) (\newPredecessor' -> - when ( - isJust newPredecessor - && oldPredecessor /= newPredecessor' - -- case: predecessor has changed in some way => own responsibility has changed in some way - -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data - -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy - -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it - && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do - ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs) - migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor') - -- TODO: deal with migration failure, e.g retry - pure () - ) - newPredecessor - - ) - - threadDelay . confStabiliseInterval . nodeConfig $ node + stabiliseDelay <- confStabiliseInterval .
nodeConfig <$> readTVarIO (parentRealNode newNs) + threadDelay stabiliseDelay where -- | send a stabilise request to the n-th neighbour -- (specified by the provided getter function) and on failure retry @@ -959,23 +603,20 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO () -fediMainThreads sock nodeSTM = do - node <- readTVarIO nodeSTM +fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO () +fediMainThreads sock nsSTM = do + ns <- readTVarIO nsSTM putStrLn "launching threads" sendQ <- newTQueueIO recvQ <- newTQueueIO -- concurrently launch all handler threads, if one of them throws an exception -- all get cancelled concurrently_ - (fediMessageHandler sendQ recvQ nodeSTM) $ - -- decision whether to [1] launch 1 thread per VS or [2] let a single - -- thread process all VSes sequentially: - -- choose option 2 for the sake of limiting concurrency in simulation scenario - concurrently_ (stabiliseThread nodeSTM) $ - concurrently_ (nodeCacheVerifyThread nodeSTM) $ - concurrently_ (convergenceSampleThread nodeSTM) $ - concurrently_ (lookupCacheCleanup nodeSTM) $ + (fediMessageHandler sendQ recvQ nsSTM) $ + concurrently_ (stabiliseThread nsSTM) $ + concurrently_ (nodeCacheVerifyThread nsSTM) $ + concurrently_ (convergenceSampleThread nsSTM) $ + concurrently_ (lookupCacheCleanup $ parentRealNode ns) $ concurrently_ (sendThread sock sendQ) (recvThread sock recvQ) @@ -1004,23 +645,20 @@ requestMapPurge purgeAge mapVar = forever $ do fediMessageHandler :: Service s (RealNodeSTM s) => TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue - -> RealNodeSTM s -- ^ node + -> LocalNodeStateSTM s -- ^ acting NodeState -> IO () -fediMessageHandler sendQ recvQ nodeSTM = do - nodeConf <- nodeConfig <$> readTVarIO nodeSTM +fediMessageHandler sendQ recvQ nsSTM = do + -- Read node state just once, assuming that all relevant data for this function does + -- not change. + -- Other functions are passed the nsSTM reference and thus can get the latest state. + nsSnap <- readTVarIO nsSTM + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap) -- handling multipart messages: -- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts) and handled once the number of received parts equals the announced total. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. requestMap <- newMVar (Map.empty :: RequestMap) -- run receive loop and requestMapPurge concurrently, so that an exception makes -- both of them fail concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do - node <- readTVarIO nodeSTM - -- Messages from invalid (spoofed) sender IDs could already be dropped here - -- or in @dispatchVS@. But as the checking on each possible ID causes an - -- overhead, it is only done for critical operations and the case - -- differentiation is done in @handleIncomingRequest@. Thus the vserver - -- number limit, required for this check, needs to be passed to that function.
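-- The requestMap handling above in a nutshell: the map is guarded by an
-- MVar that is taken for the whole read-modify-write, and threads blocked
-- on 'takeMVar' are woken in FIFO order, which provides the fairness
-- mentioned in the comment. An illustrative sketch (hypothetical names,
-- not part of the codebase; assumes base and containers):

import Control.Concurrent.MVar
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

-- | insert one message part under its request key inside a critical section
insertPart :: (Ord k, Ord p) => MVar (Map.Map k (Set.Set p)) -> k -> p -> IO ()
insertPart mapVar key part = do
    m <- takeMVar mapVar -- begin critical section, other threads block here
    let m' = Map.insertWith Set.union key (Set.singleton part) m
    putMVar mapVar m'    -- end critical section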
- let handlerFunc = handleIncomingRequest $ confKChoicesMaxVS nodeConf -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg @@ -1030,14 +668,12 @@ fediMessageHandler sendQ recvQ nodeSTM = do ) (\validMsg -> case validMsg of - aRequest@Request{} -> case dispatchVS node aRequest of - -- if no match to an active vserver ID, just ignore - Nothing -> pure () + aRequest@Request{} -- if not a multipart message, handle immediately. Response is at the same time an ACK - Just nsSTM | part aRequest == 1 && isFinalPart aRequest -> - forkIO (handlerFunc nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure () + | part aRequest == 1 && isFinalPart aRequest -> + forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure () -- otherwise collect all message parts first before handling the whole request - Just nsSTM | otherwise -> do + | otherwise -> do now <- getPOSIXTime -- critical locking section of requestMap rMapState <- takeMVar requestMap @@ -1055,14 +691,14 @@ fediMessageHandler sendQ recvQ nodeSTM = do -- put map back into MVar, end of critical section putMVar requestMap newMapState -- ACK the received part - forM_ (ackRequest aRequest) $ + forM_ (ackRequest (getNid nsSnap) aRequest) $ \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) -- if all parts received, then handle request. let (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState numParts = Set.size theseParts if maybe False (numParts ==) (fromIntegral <$> mayMaxParts) - then forkIO (handlerFunc nsSTM sendQ theseParts sourceAddr) >> pure() + then forkIO (handleIncomingRequest nsSTM sendQ theseParts sourceAddr) >> pure() else pure() -- Responses should never arrive on the main server port, as they are always -- responses to requests sent from dedicated sockets on another port @@ -1071,8 +707,6 @@ fediMessageHandler sendQ recvQ nodeSTM = do aMsg pure () - where - dispatchVS node req = rMapLookup (receiverID req) (vservers node) -- ==== interface to service layer ==== @@ -1123,7 +757,7 @@ updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) updateLookupCache nodeSTM keyToLookup = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM - let firstVs = headMay (rMapToList $ vservers node) + let firstVs = headMay (vservers node) lookupSource <- case firstVs of Nothing -> pure Nothing Just vs -> Just <$> readTVar vs diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 347c90c..4ce20a7 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -7,8 +7,8 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} -module Hash2Pub.FediChordTypes - ( NodeID -- abstract, but newtype constructors cannot be hidden +module Hash2Pub.FediChordTypes ( + NodeID -- abstract, but newtype constructors cannot be hidden , idBits , getNodeID , toNodeID @@ -18,13 +18,6 @@ module Hash2Pub.FediChordTypes , RemoteNodeState (..) , RealNode (..) , RealNodeSTM - , VSMap - , LoadStats (..) - , emptyLoadStats - , remainingLoadTarget - , loadSliceSum - , addVserver - , SegmentLoadStats (..) , setSuccessors , setPredecessors , NodeCache @@ -58,7 +51,6 @@ module Hash2Pub.FediChordTypes , localCompare , genNodeID , genNodeIDBS - , hasValidNodeId , genKeyID , genKeyIDBS , byteStringToUInteger @@ -68,14 +60,12 @@ module Hash2Pub.FediChordTypes , DHT(..) , Service(..) , ServiceConf(..) 
- ) where + ) where import Control.Exception import Data.Foldable (foldr') import Data.Function (on) import qualified Data.Hashable as Hashable -import Data.HashMap.Strict (HashMap) -import qualified Data.HashMap.Strict as HMap import Data.List (delete, nub, sortBy) import qualified Data.Map.Strict as Map import Data.Maybe (fromJust, fromMaybe, isJust, @@ -158,27 +148,17 @@ a `localCompare` b -- Also contains shared data and config values. -- TODO: more data structures for k-choices bookkeeping data RealNode s = RealNode - { vservers :: VSMap s - -- ^ map of all active VServer node IDs to their node state - , nodeConfig :: FediChordConf + { vservers :: [LocalNodeStateSTM s] + -- ^ references to all active vservers + , nodeConfig :: FediChordConf -- ^ holds the initial configuration read at program start - , bootstrapNodes :: [(String, PortNumber)] + , bootstrapNodes :: [(String, PortNumber)] -- ^ nodes to be used as bootstrapping points, new ones learned during operation - , lookupCacheSTM :: TVar LookupCache + , lookupCacheSTM :: TVar LookupCache -- ^ a global cache of looked up keys and their associated nodes - , globalNodeCacheSTM :: TVar NodeCache - -- ^ EpiChord node cache with expiry times for nodes. - , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ cache updates are not written directly to the 'globalNodeCacheSTM' - , nodeService :: s (RealNodeSTM s) + , nodeService :: s (RealNodeSTM s) } --- | insert a new vserver mapping into a node -addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s -addVserver (key, nstate) node = node - { vservers = addRMapEntry key nstate (vservers node) } - -type VSMap s = RingMap NodeID (LocalNodeStateSTM s) type RealNodeSTM s = TVar (RealNode s) -- | represents a node and all its important state @@ -192,7 +172,7 @@ data RemoteNodeState = RemoteNodeState -- ^ port of the DHT itself , servicePort :: PortNumber -- ^ port of the service provided on top of the DHT - , vServerID :: Word8 + , vServerID :: Integer -- ^ ID of this vserver } deriving (Show, Eq) @@ -205,9 +185,9 @@ data LocalNodeState s = LocalNodeState { nodeState :: RemoteNodeState -- ^ represents common data present both in remote and local node representations , nodeCacheSTM :: TVar NodeCache - -- ^ reference to the 'globalNodeCacheSTM' + -- ^ EpiChord node cache with expiry times for nodes , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ reference to the 'globalCacheWriteQueue + -- ^ cache updates are not written directly to the 'nodeCache' but queued and applied by a single writer thread , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well -- ^ successor nodes in ascending order by distance , predecessors :: [RemoteNodeState] @@ -237,14 +217,14 @@ class NodeState a where getIpAddr :: a -> HostAddress6 getDhtPort :: a -> PortNumber getServicePort :: a -> PortNumber - getVServerID :: a -> Word8 + getVServerID :: a -> Integer -- setters for common properties setNid :: NodeID -> a -> a setDomain :: String -> a -> a setIpAddr :: HostAddress6 -> a -> a setDhtPort :: PortNumber -> a -> a setServicePort :: PortNumber -> a -> a - setVServerID :: Word8 -> a -> a + setVServerID :: Integer -> a -> a toRemoteNodeState :: a -> RemoteNodeState instance NodeState RemoteNodeState where @@ -393,11 +373,6 @@ genNodeID :: HostAddress6 -- ^a node's IPv6 address -> NodeID -- ^the generated @NodeID@ genNodeID ip nodeDomain vs = NodeID .
byteStringToUInteger $ genNodeIDBS ip nodeDomain vs - -hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool -hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns) - - -- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT genKeyIDBS :: String -- ^the key string -> BS.ByteString -- ^the key ID represented as a @ByteString@ @@ -452,70 +427,9 @@ data FediChordConf = FediChordConf -- ^ how long to wait until response has arrived, in milliseconds , confRequestRetries :: Int -- ^ how often re-sending a timed-out request can be retried - , confEnableKChoices :: Bool - -- ^ whether to enable k-choices load balancing - , confKChoicesOverload :: Double - -- ^ fraction of capacity above which a node considers itself overloaded - , confKChoicesUnderload :: Double - -- ^ fraction of capacity below which a node considers itself underloaded - , confKChoicesMaxVS :: Word8 - -- ^ upper limit of vserver index κ - , confKChoicesRebalanceInterval :: Int - -- ^ interval between vserver rebalance attempts } deriving (Show, Eq) --- ====== k-choices load balancing types ====== - -data LoadStats = LoadStats - { loadPerTag :: RingMap NodeID Double - -- ^ map of loads for each handled tag - , totalCapacity :: Double - -- ^ total designated capacity of the service - , compensatedLoadSum :: Double - -- ^ effective load reevant for load balancing after compensating for - } - deriving (Show, Eq) - --- | calculates the mismatch from the target load by taking into account the --- underload and overload limits -remainingLoadTarget :: FediChordConf -> LoadStats -> Double -remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats - where - targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 - - --- | calculates the sum of tag load in a contiguous slice between to keys -loadSliceSum :: LoadStats - -> NodeID -- ^ lower segment bound - -> NodeID -- ^ upper segment bound - -> Double -- ^ sum of all tag loads within that segment -loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats - - -data SegmentLoadStats = SegmentLoadStats - { segmentLowerKeyBound :: NodeID - -- ^ segment start key - , segmentUpperKeyBound :: NodeID - -- ^ segment end key - , segmentLoad :: Double - -- ^ sum of load of all keys in the segment - , segmentOwnerRemainingLoadTarget :: Double - -- ^ remaining load target of the current segment handler: - , segmentOwnerCapacity :: Double - -- ^ total capacity of the current segment handler node, used for normalisation - , segmentCurrentOwner :: RemoteNodeState - -- ^ the current owner of the segment that needs to be joined on - } - --- TODO: figure out a better way of initialising -emptyLoadStats :: LoadStats -emptyLoadStats = LoadStats - { loadPerTag = emptyRMap - , totalCapacity = 0 - , compensatedLoadSum = 0 - } - -- ====== Service Types ============ class Service s d where @@ -531,7 +445,6 @@ class Service s d where -> IO (Either String ()) -- ^ success or failure -- | Wait for an incoming migration from a given node to succeed, may block forever waitForMigrationFrom :: s d -> NodeID -> IO () - getServiceLoadStats :: s d -> IO LoadStats instance Hashable.Hashable NodeID where hashWithSalt salt = Hashable.hashWithSalt salt . 
getNodeID diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index f1376e4..ffeef17 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -22,7 +22,7 @@ import qualified Data.DList as D import Data.Either (lefts, rights) import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (fromJust, fromMaybe, isJust) +import Data.Maybe (fromJust, isJust) import Data.String (fromString) import Data.Text.Lazy (Text) import qualified Data.Text.Lazy as Txt @@ -64,10 +64,8 @@ data PostService d = PostService , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , httpMan :: HTTP.Manager , statsQueue :: TQueue StatsEvent - , relayStats :: TVar RelayStats - -- ^ current relay stats, replaced periodically - , loadStats :: TVar LoadStats - -- ^ current load values of the relay, replaced periodically and used by + , loadStats :: TVar RelayStats + -- ^ current load stats, replaced periodically , logFileHandle :: Handle } deriving (Typeable) @@ -98,8 +96,7 @@ instance DHT d => Service PostService d where migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings statsQueue' <- newTQueueIO - relayStats' <- newTVarIO emptyStats - loadStats' <- newTVarIO emptyLoadStats + loadStats' <- newTVarIO emptyStats loggingFile <- openFile (confLogfilePath conf) WriteMode hSetBuffering loggingFile LineBuffering let @@ -115,7 +112,6 @@ instance DHT d => Service PostService d where , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' , statsQueue = statsQueue' - , relayStats = relayStats' , loadStats = loadStats' , logFileHandle = loggingFile } @@ -157,12 +153,6 @@ instance DHT d => Service PostService d where -- block until migration finished takeMVar migrationSynchroniser - getServiceLoadStats = getLoadStats - - -getLoadStats :: PostService d -> IO LoadStats -getLoadStats serv = readTVarIO $ loadStats serv - -- | return a WAI application postServiceApplication :: DHT d => PostService d -> Application @@ -845,12 +835,7 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop -- persistently store in a TVar so it can be retrieved later by the DHT let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) rateStats = evaluateStats timePassed summedStats - currentSubscribers <- readTVarIO $ subscribers serv - -- translate the rate statistics to load values - loads <- evaluateLoadStats rateStats currentSubscribers - atomically $ - writeTVar (relayStats serv) rateStats - >> writeTVar (loadStats serv) loads + atomically $ writeTVar (loadStats serv) rateStats -- and now what? 
write a log to file -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum -- later: current (reported) load, target load @@ -874,33 +859,6 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop 0 tagMap --- | calculate load values from rate statistics -evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats -evaluateLoadStats currentStats currentSubscribers = do - -- load caused by each tag: incomingPostRate * ( 1 + subscribers) - -- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate - let - totalCapacity' = 2.5 * postPublishRate currentStats - (loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do - numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM - let - thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats) - thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers) - pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag') - ) - (0, emptyRMap) - $ rMapToListWithKeys currentSubscribers - let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats - pure LoadStats - { loadPerTag = loadPerTag' - , totalCapacity = totalCapacity' - -- load caused by post fetches cannot be influenced by re-balancing nodes, - -- but still reduces the totally available capacity - , compensatedLoadSum = loadSum + postFetchRate currentStats - } - - - -- | Evaluate the accumulated statistic events: Currently mostly calculates the event -- rates by dividing through the collection time frame evaluateStats :: POSIXTime -> RelayStats -> RelayStats diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index b5ce0a9..a5af10c 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -16,12 +16,10 @@ data Action = QueryID | Leave | Stabilise | Ping - | QueryLoad deriving (Show, Eq, Enum) data FediChordMessage = Request { requestID :: Integer - , receiverID :: NodeID , sender :: RemoteNodeState , part :: Integer , isFinalPart :: Bool @@ -59,10 +57,6 @@ data ActionPayload = QueryIDRequestPayload } | StabiliseRequestPayload | PingRequestPayload - | LoadRequestPayload - { loadSegmentUpperBound :: NodeID - -- ^ upper bound of segment interested in, - } | QueryIDResponsePayload { queryResult :: QueryResponse } @@ -79,12 +73,6 @@ data ActionPayload = QueryIDRequestPayload | PingResponsePayload { pingNodeStates :: [RemoteNodeState] } - | LoadResponsePayload - { loadSum :: Double - , loadRemainingTarget :: Double - , loadTotalCapacity :: Double - , loadSegmentLowerBound :: NodeID - } deriving (Show, Eq) -- | global limit of parts per message used when (de)serialising messages. diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index d26835c..a2fe3ae 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -47,13 +47,6 @@ instance (Bounded k, Ord k) => Foldable (RingMap k) where traversingFL acc (ProxyEntry _ Nothing) = acc traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry -instance (Bounded k, Ord k) => Traversable (RingMap k) where - traverse f = fmap RingMap . traverse traversingF . getRingMap - where - traversingF (KeyEntry entry) = KeyEntry <$> f entry - traversingF (ProxyEntry to Nothing) = pure $ ProxyEntry to Nothing - traversingF (ProxyEntry to (Just entry)) = ProxyEntry to . 
Just <$> traversingF entry - -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. @@ -113,23 +106,6 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on | isNothing (rMapLookup nid rmap') = 1 | otherwise = 0 - --- | whether the RingMap is empty (except for empty proxy entries) -nullRMap :: (Num k, Bounded k, Ord k) - => RingMap k a - -> Bool --- basic idea: look for a predecessor starting from the upper Map bound, --- Nothing indicates no entry being found -nullRMap = isNothing . rMapLookupPred maxBound - - --- | O(logn( Is the key a member of the RingMap? -memberRMap :: (Bounded k, Ord k) - => k - -> RingMap k a - -> Bool -memberRMap key = isJust . rMapLookup key - -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ -- to simulate a modular ring lookupWrapper :: (Bounded k, Ord k, Num k) @@ -222,28 +198,12 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) modifier KeyEntry {} = Nothing --- TODO: rename this to elems rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap --- TODO: rename this to toList -rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)] -rMapToListWithKeys = Map.foldrWithKey (\k v acc -> - maybe acc (\val -> (k, val):acc) $ extractRingEntry v - ) - [] - . getRingMap - rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a rMapFromList = setRMapEntries - --- | this just merges the underlying 'Map.Map' s and does not check whether the --- ProxyEntry pointers are consistent, so better only create unions of --- equal-pointered RingMaps -unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a -unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b) - -- | takes up to i entries from a 'RingMap' by calling a getter function on a -- *startAt* value and after that on the previously returned value. 
-- Stops once i entries have been taken or an entry has been encountered twice diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 9a0ea9f..6a3ca5d 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -7,7 +7,6 @@ import Control.Concurrent.STM.TVar import Control.Exception import Data.ASN1.Parse (runParseASN1) import qualified Data.ByteString as BS -import qualified Data.HashMap.Strict as HMap import qualified Data.Map.Strict as Map import Data.Maybe (fromJust, isJust) import qualified Data.Set as Set @@ -19,7 +18,6 @@ import Hash2Pub.ASN1Coding import Hash2Pub.DHTProtocol import Hash2Pub.FediChord import Hash2Pub.FediChordTypes -import Hash2Pub.RingMap spec :: Spec spec = do @@ -223,16 +221,14 @@ spec = do , exampleNodeState {nid = fromInteger (-5)} ] } - qLoadReqPayload = LoadRequestPayload - { loadSegmentUpperBound = 1025 - } - qLoadResPayload = LoadResponsePayload - { loadSum = 3.141 - , loadRemainingTarget = -1.337 - , loadTotalCapacity = 2.21 - , loadSegmentLowerBound = 12 - } - + requestTemplate = Request { + requestID = 2342 + , sender = exampleNodeState + , part = 1 + , isFinalPart = True + , action = undefined + , payload = undefined + } responseTemplate = Response { requestID = 2342 , senderID = nid exampleNodeState @@ -241,7 +237,7 @@ spec = do , action = undefined , payload = undefined } - requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) 2342 + requestWith a pa = requestTemplate {action = a, payload = Just pa} responseWith a pa = responseTemplate {action = a, payload = Just pa} encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg @@ -252,20 +248,17 @@ spec = do } it "messages are encoded and decoded correctly from and to ASN1" $ do - localNS <- exampleLocalNode - encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload - encodeDecodeAndCheck $ requestWith localNS Join jReqPayload - encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload - encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload - encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload - encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload + encodeDecodeAndCheck $ requestWith QueryID qidReqPayload + encodeDecodeAndCheck $ requestWith Join jReqPayload + encodeDecodeAndCheck $ requestWith Leave lReqPayload + encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload + encodeDecodeAndCheck $ requestWith Ping pingReqPayload encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 encodeDecodeAndCheck $ responseWith Join jResPayload encodeDecodeAndCheck $ responseWith Leave lResPayload encodeDecodeAndCheck $ responseWith Stabilise stabResPayload encodeDecodeAndCheck $ responseWith Ping pingResPayload - encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload it "messages are encoded and decoded to ASN.1 DER properly" $ deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) it "messages too large for a single packet can (often) be split into multiple parts" $ do @@ -304,13 +297,13 @@ exampleNodeState = RemoteNodeState { exampleLocalNode :: IO (LocalNodeState MockService) exampleLocalNode = do - realNodeSTM <- newTVarIO $ RealNode { - vservers = emptyRMap + realNode <- newTVarIO $ RealNode { + vservers = [] , nodeConfig = exampleFediConf , bootstrapNodes = confBootstrapNodes exampleFediConf , nodeService = MockService } - 
nodeStateInit realNodeSTM 0 + nodeStateInit realNode exampleFediConf :: FediChordConf