From e32f0c91462cf891afdb288c4fc68c9aeb5097d8 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 5 Jun 2020 21:37:20 +0200 Subject: [PATCH] process and respond to join requests - serialiseMessage now starts numbering parts from the first number it gets, to be able to continue responding after having ACKed previous parts contributes to #28 --- src/Hash2Pub/ASN1Coding.hs | 2 +- src/Hash2Pub/DHTProtocol.hs | 97 ++++++++++++++++++++++++++--------- src/Hash2Pub/ProtocolTypes.hs | 7 +++ 3 files changed, 80 insertions(+), 26 deletions(-) diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 6bbb9df..25e435b 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -107,7 +107,7 @@ serialiseMessage maxBytesLength msg = }):pls -- part starts at 1 payloadParts :: Int -> Maybe [(Integer, ActionPayload)] - payloadParts i = zip [1..] . splitPayload i <$> actionPayload + payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload actionPayload = payload msg encodedMsgs i = Map.map encodeMsg $ messageParts i maxMsgLength = maximum . fmap BS.length . Map.elems diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index e30fe26..b759093 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -27,9 +27,9 @@ module Hash2Pub.DHTProtocol import Control.Concurrent.Async import Control.Concurrent.STM -import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_) import qualified Data.ByteString as BS @@ -170,6 +170,8 @@ ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { } +-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue +-- the response to be sent. handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle @@ -187,32 +189,74 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do maybe (pure ()) ( mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr)) ) - (case action aPart of - _ -> Just Map.empty) -- placeholder --- Ping -> Just respondPing nsSTM msgSet --- Join -> Just respondJoin nsSTM msgSet + =<< (case action aPart of +-- Ping -> Just <$> respondPing nsSTM msgSet + Join -> Just <$> respondJoin nsSTM msgSet -- -- ToDo: figure out what happens if not joined --- QueryID -> Just respondQueryID nsSTM msgSet +-- QueryID -> Just <$> respondQueryID nsSTM msgSet -- -- only when joined --- Leave -> if isJoined_ ns then Just respondLeave nsSTM msgSet else Nothing +-- Leave -> if isJoined_ ns then Just <$> respondLeave nsSTM msgSet else Nothing -- -- only when joined --- Stabilise -> if isJoined_ ns then Just respondStabilise nsSTM msgSet else Nothing +-- Stabilise -> if isJoined_ ns then Just <$> respondStabilise nsSTM msgSet else Nothing -- ) --- -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. --- --- -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash --- -- TODO: test case: mixed message types of parts --- ----- ....... response sending ....... --- ----- this modifies node state, so locking and IO seems to be necessary. ----- Still try to keep as much code as possible pure ---respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> Map Integer BS.ByteString ---respondJoin nsSTM msgSet = --- -- check whether the joining node falls into our responsibility --- -- if yes, adjust own predecessors/ successors and return those in a response --- -- if no: empty response or send a QueryID forwards response? --- -- TODO: notify service layer to copy over data now handled by the new joined node + -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. + + -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash + -- TODO: test case: mixed message types of parts + + ) -- placeholder + + +-- ....... response sending ....... + +-- this modifies node state, so locking and IO seems to be necessary. +-- Still try to keep as much code as possible pure +respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin nsSTM msgSet = do + -- atomically read and modify the node state according to the parsed request + responseMsg <- atomically $ do + nsSnap <- readTVar nsSTM + cache <- readTVar $ nodeCacheSTM nsSnap + let + aRequestPart = Set.elemAt 0 msgSet + senderNS = sender aRequestPart + responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) + thisNodeResponsible (FOUND _) = True + thisNodeResponsible (FORWARD _) = False + -- check whether the joining node falls into our responsibility + if thisNodeResponsible responsibilityLookup + then do + -- if yes, adjust own predecessors/ successors and return those in a response + let + newPreds = take (kNeighbours nsSnap) . sortBy (flip localCompare) $ getNid senderNS:predecessors nsSnap + joinedNS = setPredecessors newPreds nsSnap + responsePayload = JoinResponsePayload { + joinSuccessors = successors joinedNS + , joinPredecessors = predecessors joinedNS + , joinCache = toRemoteCache cache + } + joinResponse = Response { + responseTo = requestID aRequestPart + , senderID = getNid joinedNS + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Join + , payload = Just responsePayload + } + writeTVar nsSTM joinedNS + pure joinResponse + -- otherwise respond with empty payload + else pure Response { + responseTo = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = Join + , payload = Nothing + } + + pure $ serialiseMessage sendMessageSize responseMsg + -- TODO: notify service layer to copy over data now handled by the new joined node -- ....... request sending ....... @@ -249,14 +293,17 @@ requestJoin toJoinOn ownStateSTM = ([], setPredecessors [] . setSuccessors [] $ ownState) responses -- sort successors and predecessors - newState = setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted + newState = setSuccessors (take (kNeighbours joinedStateUnsorted) . sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (take (kNeighbours joinedStateUnsorted) . sortBy (flip localCompare) $ predecessors joinedStateUnsorted) $ joinedStateUnsorted writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else pure $ Right ownStateSTM + else if null (predecessors joinedState) && null (successors joinedState) + then pure $ Left "join error: no predecessors or successors" + -- successful join + else pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index 5a594ca..c6348b3 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,5 +1,7 @@ module Hash2Pub.ProtocolTypes where +import qualified Data.Map as Map +import Data.Maybe (mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX (POSIXTime) @@ -83,11 +85,16 @@ data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime instance Ord RemoteCacheEntry where (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 +-- | Extracts a 'RemoteCacheEntry' from the indirections of a 'CacheEntry', if it holds one toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry toRemoteCacheEntry _ = Nothing +-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers +toRemoteCache :: NodeCache -> [RemoteCacheEntry] +toRemoteCache cache = mapMaybe toRemoteCacheEntry $ Map.elems cache + -- | extract the 'NodeState' from a 'RemoteCacheEntry' remoteNode :: RemoteCacheEntry -> RemoteNodeState remoteNode (RemoteCacheEntry ns _) = ns