process and respond to join requests
- serialiseMessage now starts numbering parts from the first number it gets, to be able to continue responding after having ACKed previous parts contributes to #28
This commit is contained in:
parent
914e07a412
commit
e32f0c9146
|
@ -107,7 +107,7 @@ serialiseMessage maxBytesLength msg =
|
||||||
}):pls
|
}):pls
|
||||||
-- part starts at 1
|
-- part starts at 1
|
||||||
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
||||||
payloadParts i = zip [1..] . splitPayload i <$> actionPayload
|
payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload
|
||||||
actionPayload = payload msg
|
actionPayload = payload msg
|
||||||
encodedMsgs i = Map.map encodeMsg $ messageParts i
|
encodedMsgs i = Map.map encodeMsg $ messageParts i
|
||||||
maxMsgLength = maximum . fmap BS.length . Map.elems
|
maxMsgLength = maximum . fmap BS.length . Map.elems
|
||||||
|
|
|
@ -27,9 +27,9 @@ module Hash2Pub.DHTProtocol
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Concurrent.STM.TBQueue
|
import Control.Concurrent.STM.TBQueue
|
||||||
import Control.Concurrent.STM.TQueue
|
import Control.Concurrent.STM.TQueue
|
||||||
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Control.Monad (foldM, forM, forM_)
|
import Control.Monad (foldM, forM, forM_)
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
|
@ -170,6 +170,8 @@ ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||||
|
-- the response to be sent.
|
||||||
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node
|
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
||||||
|
@ -187,32 +189,74 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
maybe (pure ()) (
|
maybe (pure ()) (
|
||||||
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
||||||
)
|
)
|
||||||
(case action aPart of
|
=<< (case action aPart of
|
||||||
_ -> Just Map.empty) -- placeholder
|
-- Ping -> Just <$> respondPing nsSTM msgSet
|
||||||
-- Ping -> Just respondPing nsSTM msgSet
|
Join -> Just <$> respondJoin nsSTM msgSet
|
||||||
-- Join -> Just respondJoin nsSTM msgSet
|
|
||||||
-- -- ToDo: figure out what happens if not joined
|
-- -- ToDo: figure out what happens if not joined
|
||||||
-- QueryID -> Just respondQueryID nsSTM msgSet
|
-- QueryID -> Just <$> respondQueryID nsSTM msgSet
|
||||||
-- -- only when joined
|
-- -- only when joined
|
||||||
-- Leave -> if isJoined_ ns then Just respondLeave nsSTM msgSet else Nothing
|
-- Leave -> if isJoined_ ns then Just <$> respondLeave nsSTM msgSet else Nothing
|
||||||
-- -- only when joined
|
-- -- only when joined
|
||||||
-- Stabilise -> if isJoined_ ns then Just respondStabilise nsSTM msgSet else Nothing
|
-- Stabilise -> if isJoined_ ns then Just <$> respondStabilise nsSTM msgSet else Nothing
|
||||||
-- )
|
-- )
|
||||||
-- -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
||||||
--
|
|
||||||
-- -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
|
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
|
||||||
-- -- TODO: test case: mixed message types of parts
|
-- TODO: test case: mixed message types of parts
|
||||||
--
|
|
||||||
---- ....... response sending .......
|
) -- placeholder
|
||||||
--
|
|
||||||
---- this modifies node state, so locking and IO seems to be necessary.
|
|
||||||
---- Still try to keep as much code as possible pure
|
-- ....... response sending .......
|
||||||
--respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> Map Integer BS.ByteString
|
|
||||||
--respondJoin nsSTM msgSet =
|
-- this modifies node state, so locking and IO seems to be necessary.
|
||||||
-- -- check whether the joining node falls into our responsibility
|
-- Still try to keep as much code as possible pure
|
||||||
-- -- if yes, adjust own predecessors/ successors and return those in a response
|
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
-- -- if no: empty response or send a QueryID forwards response?
|
respondJoin nsSTM msgSet = do
|
||||||
-- -- TODO: notify service layer to copy over data now handled by the new joined node
|
-- atomically read and modify the node state according to the parsed request
|
||||||
|
responseMsg <- atomically $ do
|
||||||
|
nsSnap <- readTVar nsSTM
|
||||||
|
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||||
|
let
|
||||||
|
aRequestPart = Set.elemAt 0 msgSet
|
||||||
|
senderNS = sender aRequestPart
|
||||||
|
responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS)
|
||||||
|
thisNodeResponsible (FOUND _) = True
|
||||||
|
thisNodeResponsible (FORWARD _) = False
|
||||||
|
-- check whether the joining node falls into our responsibility
|
||||||
|
if thisNodeResponsible responsibilityLookup
|
||||||
|
then do
|
||||||
|
-- if yes, adjust own predecessors/ successors and return those in a response
|
||||||
|
let
|
||||||
|
newPreds = take (kNeighbours nsSnap) . sortBy (flip localCompare) $ getNid senderNS:predecessors nsSnap
|
||||||
|
joinedNS = setPredecessors newPreds nsSnap
|
||||||
|
responsePayload = JoinResponsePayload {
|
||||||
|
joinSuccessors = successors joinedNS
|
||||||
|
, joinPredecessors = predecessors joinedNS
|
||||||
|
, joinCache = toRemoteCache cache
|
||||||
|
}
|
||||||
|
joinResponse = Response {
|
||||||
|
responseTo = requestID aRequestPart
|
||||||
|
, senderID = getNid joinedNS
|
||||||
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
|
, isFinalPart = False
|
||||||
|
, action = Join
|
||||||
|
, payload = Just responsePayload
|
||||||
|
}
|
||||||
|
writeTVar nsSTM joinedNS
|
||||||
|
pure joinResponse
|
||||||
|
-- otherwise respond with empty payload
|
||||||
|
else pure Response {
|
||||||
|
responseTo = requestID aRequestPart
|
||||||
|
, senderID = getNid nsSnap
|
||||||
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
|
, isFinalPart = False
|
||||||
|
, action = Join
|
||||||
|
, payload = Nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
pure $ serialiseMessage sendMessageSize responseMsg
|
||||||
|
-- TODO: notify service layer to copy over data now handled by the new joined node
|
||||||
|
|
||||||
-- ....... request sending .......
|
-- ....... request sending .......
|
||||||
|
|
||||||
|
@ -249,14 +293,17 @@ requestJoin toJoinOn ownStateSTM =
|
||||||
([], setPredecessors [] . setSuccessors [] $ ownState)
|
([], setPredecessors [] . setSuccessors [] $ ownState)
|
||||||
responses
|
responses
|
||||||
-- sort successors and predecessors
|
-- sort successors and predecessors
|
||||||
newState = setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
|
newState = setSuccessors (take (kNeighbours joinedStateUnsorted) . sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (take (kNeighbours joinedStateUnsorted) . sortBy (flip localCompare) $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else pure $ Right ownStateSTM
|
else if null (predecessors joinedState) && null (successors joinedState)
|
||||||
|
then pure $ Left "join error: no predecessors or successors"
|
||||||
|
-- successful join
|
||||||
|
else pure $ Right ownStateSTM
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
module Hash2Pub.ProtocolTypes where
|
module Hash2Pub.ProtocolTypes where
|
||||||
|
|
||||||
|
import qualified Data.Map as Map
|
||||||
|
import Data.Maybe (mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX (POSIXTime)
|
import Data.Time.Clock.POSIX (POSIXTime)
|
||||||
|
|
||||||
|
@ -83,11 +85,16 @@ data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
|
||||||
instance Ord RemoteCacheEntry where
|
instance Ord RemoteCacheEntry where
|
||||||
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
||||||
|
|
||||||
|
-- | Extracts a 'RemoteCacheEntry' from the indirections of a 'CacheEntry', if it holds one
|
||||||
toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry
|
toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry
|
||||||
toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
|
toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
|
||||||
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
|
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
|
||||||
toRemoteCacheEntry _ = Nothing
|
toRemoteCacheEntry _ = Nothing
|
||||||
|
|
||||||
|
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
|
||||||
|
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
|
||||||
|
toRemoteCache cache = mapMaybe toRemoteCacheEntry $ Map.elems cache
|
||||||
|
|
||||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
||||||
remoteNode :: RemoteCacheEntry -> RemoteNodeState
|
remoteNode :: RemoteCacheEntry -> RemoteNodeState
|
||||||
remoteNode (RemoteCacheEntry ns _) = ns
|
remoteNode (RemoteCacheEntry ns _) = ns
|
||||||
|
|
Loading…
Reference in a new issue