Hash2Pub/src/Hash2Pub/DHTProtocol.hs

891 lines
45 KiB
Haskell

{-# LANGUAGE FlexibleContexts #-}
module Hash2Pub.DHTProtocol
( QueryResponse (..)
, queryLocalCache
, addCacheEntry
, addCacheEntryPure
, addNodeAsVerified
, addNodeAsVerifiedPure
, deleteCacheEntry
, deserialiseMessage
, RemoteCacheEntry(..)
, toRemoteCacheEntry
, remoteNode
, Action(..)
, ActionPayload(..)
, FediChordMessage(..)
, maximumParts
, sendQueryIdMessages
, requestQueryID
, requestJoin
, requestLeave
, requestPing
, requestStabilise
, lookupMessage
, sendRequestTo
, queryIdLookupLoop
, queueAddEntries
, queueDeleteEntries
, queueDeleteEntry
, resolve
, mkSendSocket
, mkServerSocket
, handleIncomingRequest
, ackRequest
, isPossibleSuccessor
, isPossiblePredecessor
, isInOwnResponsibilitySlice
, isJoined
, closestCachePredecessors
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Except (MonadError(..), runExceptT)
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr', foldrM)
import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.List (delete, nub, sortBy)
import qualified Data.Map as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe, maybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString
import Safe
import System.Random
import System.Timeout
import Hash2Pub.ASN1Coding
import Hash2Pub.FediChordTypes (CacheEntry (..),
CacheEntry (..),
FediChordConf (..),
HasKeyID (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
getKeyID, localCompare,
rMapFromList, rMapLookupPred,
rMapLookupSucc,
setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes
import Debug.Trace (trace)
-- === queries ===
-- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined.
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
where
closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
-- | look up the 3 direct predecessor cache entries of a given ID
closestCachePredecessors :: (Integral n)
=> n -- ^ number of entries to look up
-> NodeID -- ^ target ID to get the predecessors of
-> NodeCache -- ^ cache to use for lookup
-> Set.Set RemoteCacheEntry
closestCachePredecessors 0 _ _ = Set.empty
closestCachePredecessors remainingLookups lastID nCache
| remainingLookups < 0 = Set.empty
| otherwise =
let result = cacheLookupPred lastID nCache
in
case toRemoteCacheEntry <$> result of
Nothing -> Set.empty
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
-- | Determines whether a lookup key is within the responsibility slice of a node,
-- as it falls between its first predecessor and the node itself.
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the same as the node
-- itself then it falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
where
predecessorList = predecessors ownNs
-- add node itself to RingMap representation, to distinguish between
-- responsibility of own node and predecessor
predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessorList) :: RingMap NodeID RemoteNodeState
ownRemote = toRemoteNodeState ownNs
closestPredecessor = headMay predecessorList
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossiblePredecessor = isInOwnResponsibilitySlice
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
where
successorList = successors ownNs
successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successorList)
ownRemote = toRemoteNodeState ownNs
closestSuccessor = headMay successorList
-- cache operations
-- | update or insert a 'RemoteCacheEntry' into the cache,
-- converting it to a local 'CacheEntry'
addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to
-> IO NodeCache -- ^ new node cache with the element inserted
addCacheEntry entry cache = do
now <- getPOSIXTime
pure $ addCacheEntryPure now entry cache
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
addCacheEntryPure :: POSIXTime -- ^ current time
-> RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to
-> NodeCache -- ^ new node cache with the element inserted
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now
newEntry = CacheEntry False ns timestamp'
newCache = addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal)
KeyEntry (CacheEntry oldValidationState _ oldTimestamp) -> KeyEntry (CacheEntry oldValidationState newNode (max oldTimestamp newTimestamp))
in
newCache
-- | delete the node with given ID from cache
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
-> NodeCache -- ^cache to delete from
-> NodeCache -- ^cache without the specified element
deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing
-- | Add a 'RemoteNodeState' to the node cache marked as verified.
-- If an entry already exists, it is replaced by the new verified one.
addNodeAsVerified :: RemoteNodeState
-> NodeCache
-> IO NodeCache
addNodeAsVerified node cache = do
now <- getPOSIXTime
pure $ addNodeAsVerifiedPure now node cache
-- | Pure variant of 'addNodeAsVerified' with current time explicitly specified as an argument
addNodeAsVerifiedPure :: POSIXTime
-> RemoteNodeState
-> NodeCache
-> NodeCache
addNodeAsVerifiedPure now node = addRMapEntry (getKeyID node) (CacheEntry True node now)
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
-- given to the entry, or Nothing
-> NodeID -- ^ which node to mark
-> NodeCache -- ^ current node cache
-> NodeCache -- ^ new NodeCache with the updated entry
markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap
where
adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp)
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry
-- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT
isJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i
sendMessageSize = 1200
-- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
requestID = requestID req
, senderID = ownID
, part = part req
, isFinalPart = False
, action = action req
, payload = Nothing
}
ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
-> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
ns <- readTVarIO nsSTM
-- add nodestate to cache
now <- getPOSIXTime
case headMay . Set.elems $ msgSet of
Nothing -> pure ()
Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
maybe (pure ()) (
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
)
=<< (case action aPart of
Ping -> Just <$> respondPing nsSTM msgSet
Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
-- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
)
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
where
-- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: HostAddress6 -- msg source address
-> LocalNodeStateSTM s
-> Set.Set FediChordMessage -- message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests
-> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs addr nsSTM' msgSet' responder =
let
aRequestPart = Set.elemAt 0 msgSet
senderNs = sender aRequestPart
givenSenderID = getNid senderNs
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
in
if recomputedID == givenSenderID
then Just <$> responder nsSTM' msgSet'
else pure Nothing
-- ....... response sending .......
-- TODO: could all these respond* functions be in STM instead of IO?
-- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do
-- this message cannot be split reasonably, so just
-- consider the first payload
let
aRequestPart = Set.elemAt 0 msgSet
senderID = getNid . sender $ aRequestPart
senderPayload = foldr' (\msg plAcc ->
if isNothing plAcc && isJust (payload msg)
then payload msg
else plAcc
) Nothing msgSet
-- return only empty message serialisation if no payload was included in parts
maybe (pure Map.empty) (\senderPayload' -> do
responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
responsePayload = QueryIDResponsePayload {
queryResult = if isJoined nsSnap
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
else FOUND (toRemoteNodeState nsSnap)
}
queryResponseMsg = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = QueryID
, payload = Just responsePayload
}
pure queryResponseMsg
pure $ serialiseMessage sendMessageSize responseMsg
) senderPayload
-- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response.
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
-- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
(maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg)
,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg))
)
([],[]) msgSet
aRequestPart = Set.elemAt 0 msgSet
leaveSenderID = getNid . sender $ aRequestPart
responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM
-- remove leaving node from successors, predecessors and NodeCache
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaveSenderID
writeTVar nsSTM $
-- add predecessors and successors of leaving node to own lists
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
let leaveResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Leave
, payload = Just LeaveResponsePayload
}
pure leaveResponse
-- if awaiting an incoming service data migration, collect the lock without blocking this thread
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode))
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list
respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
let
aRequestPart = Set.elemAt 0 msgSet
responsePayload = StabiliseResponsePayload {
stabiliseSuccessors = successors nsSnap
, stabilisePredecessors = predecessors nsSnap
}
stabiliseResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Stabilise
, payload = Just responsePayload
}
-- TODO: return service endpoint for copying over key data
pure $ serialiseMessage sendMessageSize stabiliseResponse
-- | respond to Ping request by returning all active vserver NodeStates
respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet = do
-- TODO: respond with all active VS when implementing k-choices
nsSnap <- readTVarIO nsSTM
let
aRequestPart = Set.elemAt 0 msgSet
responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
pingResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Ping
, payload = Just responsePayload
}
pure $ serialiseMessage sendMessageSize pingResponse
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
(dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
aRequestPart = Set.elemAt 0 msgSet
senderNS = sender aRequestPart
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility
if thisNodeResponsible responsibilityLookup
then do
-- if yes, adjust own predecessors/ successors and return those in a response
let
newPreds = senderNS:predecessors nsSnap
joinedNS = setPredecessors newPreds nsSnap
responsePayload = JoinResponsePayload {
joinSuccessors = successors joinedNS
, joinPredecessors = predecessors joinedNS
, joinCache = toRemoteCache cache
}
joinResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid joinedNS
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Just responsePayload
}
writeTVar nsSTM joinedNS
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload
else pure (Nothing, Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Nothing
})
-- as DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information
responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM
let
(cacheInsertQ, predAccSet, succAccSet) = foldl'
(\(insertQ, predAccSet', succAccSet') msg ->
let
insertQ' = maybe insertQ (\msgPl ->
-- collect list of insertion statements into global cache
queueAddEntries (joinCache msgPl) : insertQ
) $ payload msg
-- collect received predecessors and successors
predAccSet'' = maybe predAccSet' (
foldr' Set.insert predAccSet' . joinPredecessors
) $ payload msg
succAccSet'' = maybe succAccSet' (
foldr' Set.insert succAccSet' . joinSuccessors
) $ payload msg
in
(insertQ', predAccSet'', succAccSet'')
)
-- reset predecessors and successors
([], Set.empty, Set.empty)
responses
-- sort, slice and set the accumulated successors and predecessors
-- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
pure $ Right ownStateSTM
)
`catch` (\e -> pure . Left $ displayException (e :: IOException))
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: (MonadIO m, MonadError String m)
=> LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up
-> m RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures
requestQueryID ns targetID = do
firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
-- TODO: make maxAttempts configurable
queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
-- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> pure thisNode
FORWARD nodeSet -> do
responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- liftIO getPOSIXTime
-- check for a FOUND and return it
case responseEntries of
FOUND foundNode -> pure foundNode
-- if not FOUND, insert entries into local cache copy and recurse
FORWARD entrySet ->
let newLCache = foldr' (
addCacheEntryPure now
) cacheSnapshot entrySet
in
queryIdLookupLoop newLCache ns (maxAttempts - 1) targetID
sendQueryIdMessages :: (Integral i)
=> NodeID -- ^ target key ID to look up
-> LocalNodeState s -- ^ node state of the node doing the query
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response
sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
-- ToDo: make attempts and timeout configurable
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (lookupMessage targetID ns Nothing)
)) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
now <- getPOSIXTime
-- collect cache entries from all responses
foldrM (\resp acc -> do
let
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
Just (FORWARD resultset) -> resultset
_ -> Set.empty
-- forward entries to global cache
queueAddEntries entrySet ns
-- return accumulated QueryResult
pure $ case acc of
-- once a FOUND as been encountered, return this as a result
FOUND{} -> acc
FORWARD accSet
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses
where
isFound FOUND{} = True
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
=> NodeID -- ^ target ID
-> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter
-> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
where
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
-- return parsed neighbour lists
requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Stabilise
, payload = Just StabiliseRequestPayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either
-- forward IO error messages
(pure . Left)
(\respSet -> do
-- fold all reply parts together
let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) ->
(maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg)
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
)
([],[]) respSet
-- update successfully responded neighbour in cache
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
) responses
-- | Send a Leave request to the specified node.
-- Service data transfer needs to be done separately, as not all neighbours
-- that need to know about the leaving handle the new service data.
requestLeave :: LocalNodeState s
-> Bool -- whether to migrate service data
-> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns
, leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
}
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Leave
, payload = Just leavePayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either
-- forward IO error messages
(pure . Left)
-- empty payload, so no processing required
(const . pure . Right $ ())
responses
requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do
resp <- sendRequestTo (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Ping
, payload = Just PingRequestPayload
}
) sock
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
pure $ Right (peerAddr, resp)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either
-- forward IO error messages
(pure . Left)
(\(peerAddr, respSet) -> do
-- fold all reply parts together
let responseVss = foldr' (\msg acc ->
maybe acc (foldr' (:) acc) (pingNodeStates <$> payload msg)
)
[] respSet
-- recompute ID for each received node and mark as verified in cache
now <- getPOSIXTime
forM_ responseVss (\vs ->
let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
in if recomputedID == getNid vs
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
else pure ()
)
pure $ if null responseVss
then Left "no active vServer IDs returned, ignoring node"
else Right responseVss
) responses
-- | 'sendRequestToWithParams' with default timeout and retries already specified.
-- Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo = sendRequestToWithParams 5000 3
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID
randomID <- randomRIO (0, 2^32-1)
let
msgComplete = msgIncomplete randomID
requests = serialiseMessage sendMessageSize msgComplete
-- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts
where
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO ()
sendAndAck responseQueue sock' remainingSends = do
sendMany sock' $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely
recvLoop sock' responseQueue remainingSends Set.empty Nothing
recvLoop :: Socket
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known
-> IO ()
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock' 65535
case response of
Right msg@Response{} -> do
atomically $ writeTBQueue responseQueue msg
let
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
then pure ()
else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
-- drop errors and invalid messages
Right Request{} -> pure () -- expecting a response, not a request
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState s
-> IO ()
queueAddEntries entries ns = do
now <- getPOSIXTime
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
-- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
-> LocalNodeState s
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry
-> IO (Maybe a) -- ^ result after at most *i* retries
attempts 0 _ = pure Nothing
attempts i action = do
actionResult <- action
case actionResult of
Nothing -> attempts (i-1) action
Just res -> pure $ Just res
-- ====== network socket operations ======
-- | resolve a specified host and return the 'AddrInfo' for it.
-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all
-- addresses;
-- if no port is specified an arbitrary free port is selected.
resolve :: Maybe String -- ^ hostname or IP address to be resolved
-> Maybe PortNumber -- ^ port number of either local bind or remote
-> IO AddrInfo
resolve host port = let
hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram
, addrFlags = [AI_PASSIVE] }
in
head <$> getAddrInfo (Just hints) host (show <$> port)
-- | create an unconnected UDP Datagram 'Socket' bound to the specified address
mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket
mkServerSocket ip port = do
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1
bind sock sockAddr
pure sock
-- | create a UDP datagram socket, connected to a destination.
-- The socket gets an arbitrary free local port assigned.
mkSendSocket :: HostAddress6 -- ^ source address
-> String -- ^ destination hostname or IP
-> PortNumber -- ^ destination port
-> IO Socket -- ^ a socket with an arbitrary source port
mkSendSocket srcIp dest destPort = do
srcAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ srcIp) Nothing
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
sendSock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sendSock IPv6Only 1
-- bind to the configured local IP to make sure that outgoing packets are sent from
-- this source address
bind sendSock srcAddr
connect sendSock destAddr
pure sendSock