870 lines
44 KiB
Haskell
870 lines
44 KiB
Haskell
{-# LANGUAGE FlexibleContexts #-}
|
|
|
|
module Hash2Pub.DHTProtocol
|
|
( QueryResponse (..)
|
|
, queryLocalCache
|
|
, addCacheEntry
|
|
, addCacheEntryPure
|
|
, addNodeAsVerified
|
|
, addNodeAsVerifiedPure
|
|
, deleteCacheEntry
|
|
, deserialiseMessage
|
|
, RemoteCacheEntry(..)
|
|
, toRemoteCacheEntry
|
|
, remoteNode
|
|
, Action(..)
|
|
, ActionPayload(..)
|
|
, FediChordMessage(..)
|
|
, maximumParts
|
|
, sendQueryIdMessages
|
|
, requestQueryID
|
|
, requestJoin
|
|
, requestLeave
|
|
, requestPing
|
|
, requestStabilise
|
|
, lookupMessage
|
|
, sendRequestTo
|
|
, queryIdLookupLoop
|
|
, queueAddEntries
|
|
, queueDeleteEntries
|
|
, queueDeleteEntry
|
|
, resolve
|
|
, mkSendSocket
|
|
, mkServerSocket
|
|
, handleIncomingRequest
|
|
, ackRequest
|
|
, isPossibleSuccessor
|
|
, isPossiblePredecessor
|
|
, isInOwnResponsibilitySlice
|
|
, isJoined
|
|
, closestCachePredecessors
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.STM
|
|
import Control.Concurrent.STM.TBQueue
|
|
import Control.Concurrent.STM.TQueue
|
|
import Control.Concurrent.STM.TVar
|
|
import Control.Exception
|
|
import Control.Monad (foldM, forM, forM_, void, when)
|
|
import qualified Data.ByteString as BS
|
|
import Data.Either (rights)
|
|
import Data.Foldable (foldl', foldr')
|
|
import Data.Functor.Identity
|
|
import Data.IP (IPv6, fromHostAddress6,
|
|
toHostAddress6)
|
|
import Data.List (delete, nub, sortBy)
|
|
import qualified Data.Map as Map
|
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
|
isNothing, mapMaybe, maybe)
|
|
import qualified Data.Set as Set
|
|
import Data.Time.Clock.POSIX
|
|
import Network.Socket hiding (recv, recvFrom, send,
|
|
sendTo)
|
|
import Network.Socket.ByteString
|
|
import Safe
|
|
import System.Random
|
|
import System.Timeout
|
|
|
|
import Hash2Pub.ASN1Coding
|
|
import Hash2Pub.FediChordTypes (CacheEntry (..),
|
|
CacheEntry (..),
|
|
FediChordConf (..),
|
|
HasKeyID (..),
|
|
LocalNodeState (..),
|
|
LocalNodeStateSTM, NodeCache,
|
|
NodeID, NodeState (..),
|
|
RealNode (..), RealNodeSTM,
|
|
RemoteNodeState (..),
|
|
RingEntry (..), RingMap (..),
|
|
Service (..), addRMapEntry,
|
|
addRMapEntryWith,
|
|
cacheGetNodeStateUnvalidated,
|
|
cacheLookup, cacheLookupPred,
|
|
cacheLookupSucc, genNodeID,
|
|
getKeyID, localCompare,
|
|
rMapFromList, rMapLookupPred,
|
|
rMapLookupSucc,
|
|
setPredecessors, setSuccessors)
|
|
import Hash2Pub.ProtocolTypes
|
|
|
|
import Debug.Trace (trace)
|
|
|
|
-- === queries ===
|
|
|
|
-- TODO: evaluate more fine-grained argument passing to allow granular locking
|
|
-- | Look up an ID to either claim responsibility for it or return the closest
-- @lBestNodes@ nodes from the local cache.
--
-- If this node has joined the DHT and the target ID lies in its own responsibility
-- slice, the answer is 'FOUND' with this node's own state. Otherwise the answer is
-- 'FORWARD' with the closest cached successor of the target plus up to
-- @lBestNodes - 1@ closest cached predecessors of the target.
queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
    -- as target ID falls between own ID and first predecessor, it is handled by this node
    -- This only makes sense if the node is part of the DHT by having joined.
    -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
    | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
    -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
    -- the closest succeeding node (like with the p initiated parallel queries)
    | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes - 1) targetID nCache
  where
    -- the closest cached successor of the target, if the cache has one
    closestSuccessor :: Set.Set RemoteCacheEntry
    closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
|
|
|
|
|
|
-- | Look up the @n@ direct predecessor cache entries of a given ID.
--
-- Starting at the target ID, the cache predecessor of the most recently found entry
-- is looked up repeatedly, at most @n@ times. The lookup stops early once the cache
-- yields no predecessor. A non-positive @n@ yields an empty set. As the result is a
-- 'Set', an entry found more than once is only contained once.
closestCachePredecessors :: (Integral n)
                         => n               -- ^ number of entries to look up
                         -> NodeID          -- ^ target ID to get the predecessors of
                         -> NodeCache       -- ^ cache to use for lookup
                         -> Set.Set RemoteCacheEntry
closestCachePredecessors remainingLookups lastID nCache
    | remainingLookups <= 0 = Set.empty
    | otherwise =
        case toRemoteCacheEntry <$> cacheLookupPred lastID nCache of
            Nothing -> Set.empty
            -- continue the lookup from the predecessor just found
            Just nPred@(RemoteCacheEntry ns _) ->
                Set.insert nPred $ closestCachePredecessors (remainingLookups - 1) (nid ns) nCache
|
|
|
|
-- | Determines whether a lookup key is within the responsibility slice of a node,
-- i.e. whether it falls between the node's first predecessor and the node itself.
--
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the node itself,
-- then the key falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs =
    (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
  where
    -- add node itself to RingMap representation, to distinguish between
    -- responsibility of own node and predecessor
    predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessors ownNs) :: RingMap NodeID RemoteNodeState
    ownRemote = toRemoteNodeState ownNs
|
|
|
|
-- | Whether a key could be a predecessor of this node. This holds exactly when the key
-- lies in this node's own responsibility slice, see 'isInOwnResponsibilitySlice'.
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossiblePredecessor lookupTarget ownNs = lookupTarget `isInOwnResponsibilitySlice` ownNs
|
|
|
|
-- | Whether a key could be a successor of this node.
--
-- Looks up the predecessor of the key on a 'RingMap' built from the successor list
-- with the node itself added. If the result is the node itself, the key lies
-- between this node and its first successor.
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossibleSuccessor lookupTarget ownNs =
    (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
  where
    -- add node itself to RingMap representation, to distinguish between
    -- the own node and its successors
    successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successors ownNs) :: RingMap NodeID RemoteNodeState
    ownRemote = toRemoteNodeState ownNs
|
|
|
|
-- cache operations
|
|
|
|
-- | Update or insert a 'RemoteCacheEntry' into the cache, converting it to a local
-- 'CacheEntry'. The current POSIX time is used as insertion time, see 'addCacheEntryPure'.
addCacheEntry :: RemoteCacheEntry  -- ^ a remote cache entry received from network
              -> NodeCache         -- ^ node cache to insert to
              -> IO NodeCache      -- ^ new node cache with the element inserted
addCacheEntry entry cache =
    (\currentTime -> addCacheEntryPure currentTime entry cache) <$> getPOSIXTime
|
|
|
|
-- | Pure version of 'addCacheEntry' with the current time explicitly specified as argument.
--
-- The entry is inserted as unverified, with its timestamp capped at @now@. If an entry
-- for the same key already exists:
--
-- * its validation state is kept;
-- * its node state is replaced by the new one;
-- * the newer of both timestamps is used.
--
-- An entry stored inside a 'ProxyEntry' is combined in the same way instead of being
-- overwritten; an empty proxy slot simply receives the new entry.
addCacheEntryPure :: POSIXTime         -- ^ current time
                  -> RemoteCacheEntry  -- ^ a remote cache entry received from network
                  -> NodeCache         -- ^ node cache to insert to
                  -> NodeCache         -- ^ new node cache with the element inserted
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
    addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache
  where
    -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
    -- timestamps lying in the future are capped at the current time
    timestamp' = min ts now
    newEntry = CacheEntry False ns timestamp'
    -- combine the newly inserted value (first argument) with an already present one
    insertCombineFunction newVal oldVal =
        case oldVal of
            ProxyEntry n Nothing -> ProxyEntry n (Just newVal)
            -- do not discard validation state/timestamp of an entry held by a proxy
            ProxyEntry n (Just oldInner) -> ProxyEntry n (Just $ insertCombineFunction newVal oldInner)
            KeyEntry (CacheEntry oldValidationState _ oldTimestamp) ->
                KeyEntry (CacheEntry oldValidationState ns (max oldTimestamp timestamp'))
|
|
|
|
-- | Delete the node with the given ID from the cache.
--
-- A plain entry is removed from the map. A 'ProxyEntry' at that key is kept, with only
-- the entry it holds cleared.
deleteCacheEntry :: NodeID     -- ^ ID of the node to be deleted
                 -> NodeCache  -- ^ cache to delete from
                 -> NodeCache  -- ^ cache without the specified element
deleteCacheEntry nid = RingMap . Map.update dropEntry nid . getRingMap
  where
    dropEntry entry = case entry of
        KeyEntry {}            -> Nothing
        ProxyEntry idPointer _ -> Just (ProxyEntry idPointer Nothing)
|
|
|
|
|
|
-- | Add a 'RemoteNodeState' to the node cache, marked as verified and timestamped with
-- the current POSIX time. See 'addNodeAsVerifiedPure'.
addNodeAsVerified :: RemoteNodeState
                  -> NodeCache
                  -> IO NodeCache
addNodeAsVerified node cache =
    (\currentTime -> addNodeAsVerifiedPure currentTime node cache) <$> getPOSIXTime
|
|
|
|
|
|
-- | Pure variant of 'addNodeAsVerified' with the current time explicitly given.
-- The node is inserted via 'addRMapEntry' as a verified 'CacheEntry' under its key ID.
addNodeAsVerifiedPure :: POSIXTime
                      -> RemoteNodeState
                      -> NodeCache
                      -> NodeCache
addNodeAsVerifiedPure now node cache = addRMapEntry nodeKey verifiedEntry cache
  where
    nodeKey :: NodeID
    nodeKey = getKeyID node
    verifiedEntry = CacheEntry True node now
|
|
|
|
|
|
|
|
-- | Mark a cache entry as verified after pinging it, optionally replacing its timestamp.
--
-- If a timestamp is given, it becomes the entry's new timestamp; otherwise the old one
-- is kept. An entry held inside a 'ProxyEntry' is updated in place, so the proxy itself
-- stays in the map. Missing keys and empty proxies are left unchanged.
markCacheEntryAsVerified :: Maybe POSIXTime  -- ^ the (current) timestamp to be
                                             -- given to the entry, or Nothing
                         -> NodeID           -- ^ which node to mark
                         -> NodeCache        -- ^ current node cache
                         -> NodeCache        -- ^ new NodeCache with the updated entry
markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap
  where
    adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp)
    -- keep the proxy wrapper and only update the entry stored inside it
    adjustFunc (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ adjustFunc entry)
    adjustFunc entry = entry
|
|
|
|
|
|
-- | Uses the successor and predecessor lists of a node as an indicator for whether
-- it has properly joined the DHT: a node counts as joined as soon as at least one of
-- both lists is non-empty.
isJoined :: LocalNodeState s -> Bool
isJoined ns = not (null (successors ns) && null (predecessors ns))
|
|
|
|
-- | The size limit passed to 'serialiseMessage' when serialising messages for sending.
sendMessageSize :: Num i => i
sendMessageSize = fromIntegral (1200 :: Int)
|
|
|
|
-- ====== message send and receive operations ======
|
|
|
|
-- | Encode the response to a request that just signals successful receipt.
--
-- The response echoes the request's ID, part number and action, and carries no payload.
-- Any message that is not a 'Request' yields an empty serialisation.
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID msg = case msg of
    Request{} -> serialiseMessage sendMessageSize Response
        { requestID = requestID msg
        , senderID = ownID
        , part = part msg
        , isFinalPart = False
        , action = action msg
        , payload = Nothing
        }
    _ -> Map.empty
|
|
|
|
|
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
--
-- The sender of the request is queued for insertion into the node cache first.
--
-- * 'Ping' and 'QueryID' requests are always answered.
-- * 'Leave' and 'Stabilise' are only answered while this node is joined.
-- * 'Join', 'Leave' and 'Stabilise' are only answered if the claimed sender ID equals
--   the ID recomputed from the IPv6 source address. Requests for these actions from a
--   non-IPv6 source address are dropped.
handleIncomingRequest :: Service s (RealNodeSTM s)
                      => LocalNodeStateSTM s                 -- ^ the handling node
                      -> TQueue (BS.ByteString, SockAddr)    -- ^ send queue
                      -> Set.Set FediChordMessage            -- ^ all parts of the request to handle
                      -> SockAddr                            -- ^ source address of the request
                      -> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
    ns <- readTVarIO nsSTM
    -- add nodestate to cache
    now <- getPOSIXTime
    case headMay . Set.elems $ msgSet of
      Nothing -> pure ()
      Just aPart -> do
        queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
        -- distinguish on whether and how to respond. If responding, pass message to response generating function
        responses <- case action aPart of
            Ping -> Just <$> respondPing nsSTM msgSet
            Join -> dropSpoofedIDs nsSTM msgSet respondJoin
            -- ToDo: figure out what happens if not joined
            QueryID -> Just <$> respondQueryID nsSTM msgSet
            -- only when joined
            Leave -> if isJoined ns then dropSpoofedIDs nsSTM msgSet respondLeave else pure Nothing
            Stabilise -> if isJoined ns then dropSpoofedIDs nsSTM msgSet respondStabilise else pure Nothing
        -- write responses to send queue
        maybe (pure ()) (mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))) responses
    -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.

    -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
    -- TODO: test case: mixed message types of parts
  where
    -- the IPv6 source address of the request; other address families yield Nothing
    -- instead of failing a pattern match later on
    sourceIP :: Maybe HostAddress6
    sourceIP = case sourceAddr of
        SockAddrInet6 _ _ ip _ -> Just ip
        _                      -> Nothing

    -- | Filter out requests with spoofed node IDs by recomputing the ID using
    -- the sender IP.
    -- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
    dropSpoofedIDs :: LocalNodeStateSTM s
                   -> Set.Set FediChordMessage -- message parts of the request
                   -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests
                   -> IO (Maybe (Map.Map Integer BS.ByteString))
    dropSpoofedIDs nsSTM' msgSet' responder =
        case (sourceIP, headMay . Set.elems $ msgSet') of
            (Just addr, Just aRequestPart)
                | isGenuineSender addr (sender aRequestPart) -> Just <$> responder nsSTM' msgSet'
            _ -> pure Nothing

    -- whether the sender's ID matches the one recomputed from source IP, domain and vServer ID
    isGenuineSender addr senderNs =
        genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs) == getNid senderNs
|
|
|
|
|
|
-- ....... response sending .......
|
|
|
|
-- TODO: could all these respond* functions be in STM instead of IO?
|
|
|
|
|
|
-- | Execute a key ID lookup on the local cache and respond with the result.
--
-- Only the first payload found among the request parts (in part order) is used. Without
-- any payload, an empty serialisation is returned.
--
-- A joined node answers with the result of 'queryLocalCache'. A node that has not
-- joined yet answers 'FOUND' with its own state.
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet =
    -- return only empty message serialisation if no payload was included in parts
    maybe (pure Map.empty) respondTo senderPayload
  where
    aRequestPart = Set.elemAt 0 msgSet
    -- this message cannot be split reasonably, so just
    -- consider the first payload (in part order)
    senderPayload = headMay . mapMaybe payload . Set.elems $ msgSet
    respondTo senderPayload' = do
        responseMsg <- atomically $ do
            nsSnap <- readTVar nsSTM
            cache <- readTVar $ nodeCacheSTM nsSnap
            let
                responsePayload = QueryIDResponsePayload
                    { queryResult = if isJoined nsSnap
                        then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
                        -- if not joined yet, attract responsibility for
                        -- all keys to make bootstrapping possible
                        else FOUND (toRemoteNodeState nsSnap)
                    }
                queryResponseMsg = Response
                    { requestID = requestID aRequestPart
                    , senderID = getNid nsSnap
                    , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
                    , isFinalPart = False
                    , action = QueryID
                    , payload = Just responsePayload
                    }
            pure queryResponseMsg
        pure $ serialiseMessage sendMessageSize responseMsg
|
|
|
|
-- | Respond to a 'Leave' request by removing the leaving node from local data
-- structures and confirming with a response.
--
-- The predecessors and successors sent by the leaving node are merged into this node's
-- own lists, with the leaving node itself filtered out. A cache deletion for it is also
-- queued. If the request asks for a data migration, waiting for it is started in a
-- separate thread.
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
    responseMsg <- atomically $ do
        nsSnap <- readTVar nsSTM
        -- remove leaving node from successors, predecessors and NodeCache
        writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaveSenderID
        -- add predecessors and successors of leaving node to own lists
        writeTVar nsSTM
            . setPredecessors (withoutLeaver $ requestPreds <> predecessors nsSnap)
            . setSuccessors (withoutLeaver $ requestSuccs <> successors nsSnap)
            $ nsSnap
        pure Response
            { requestID = requestID aRequestPart
            , senderID = getNid nsSnap
            , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
            , isFinalPart = False
            , action = Leave
            , payload = Just LeaveResponsePayload
            }
    -- if awaiting an incoming service data migration, collect the lock without blocking this thread
    when (maybe False leaveDoMigration (payload aRequestPart)) $ do
        ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode))
        void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
    pure $ serialiseMessage sendMessageSize responseMsg
  where
    aRequestPart = Set.elemAt 0 msgSet
    leaveSenderID = getNid . sender $ aRequestPart
    -- combine payload of all parts, keeping part order
    requestPreds = concatMap (maybe [] leavePredecessors . payload) (Set.elems msgSet)
    requestSuccs = concatMap (maybe [] leaveSuccessors . payload) (Set.elems msgSet)
    withoutLeaver nodes = filter ((/=) leaveSenderID . getNid) nodes
|
|
|
|
-- | Respond to a 'Stabilise' request with the handling node's current successor and
-- predecessor lists.
respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet =
    serialiseMessage sendMessageSize . stabiliseResponseFor <$> readTVarIO nsSTM
  where
    aRequestPart = Set.elemAt 0 msgSet
    stabiliseResponseFor nsSnap = Response
        { requestID = requestID aRequestPart
        , senderID = getNid nsSnap
        , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
        , isFinalPart = False
        , action = Stabilise
        , payload = Just StabiliseResponsePayload
            { stabiliseSuccessors = successors nsSnap
            , stabilisePredecessors = predecessors nsSnap
            }
        }
    -- TODO: return service endpoint for copying over key data
|
|
|
|
|
|
-- | Respond to a 'Ping' request with the node states of all active vServers.
-- Currently only the handling node's own state is returned.
respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet =
    -- TODO: respond with all active VS when implementing k-choices
    serialiseMessage sendMessageSize . pingResponseFor <$> readTVarIO nsSTM
  where
    aRequestPart = Set.elemAt 0 msgSet
    pingResponseFor nsSnap = Response
        { requestID = requestID aRequestPart
        , senderID = getNid nsSnap
        , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
        , isFinalPart = False
        , action = Ping
        , payload = Just PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
        }
|
|
|
|
|
|
-- | Respond to a 'Join' request.
--
-- A local 'queryLocalCache' lookup for the joining node's ID decides whether this node
-- is responsible for it.
--
-- * If it is, the joining node is prepended to the predecessor list. The response then
--   carries the resulting successors and predecessors plus the local node cache.
--   'migrateData' is started in a separate thread with this node's ID, the previous
--   first predecessor's ID (or this node's own ID if there was none), the joining
--   node's ID and the joining node's service address.
-- * Otherwise the response carries no payload.
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
    -- atomically read and modify the node state according to the parsed request
    (dataMigration, responseMsg) <- atomically $ do
        nsSnap <- readTVar nsSTM
        cache <- readTVar $ nodeCacheSTM nsSnap
        let
            aRequestPart = Set.elemAt 0 msgSet
            senderNS = sender aRequestPart
            responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS)
            thisNodeResponsible (FOUND _) = True
            thisNodeResponsible (FORWARD _) = False
        -- check whether the joining node falls into our responsibility
        if thisNodeResponsible responsibilityLookup
            then do
                -- if yes, adjust own predecessors/ successors and return those in a response
                let
                    newPreds = senderNS:predecessors nsSnap
                    joinedNS = setPredecessors newPreds nsSnap
                    responsePayload = JoinResponsePayload
                        { joinSuccessors = successors joinedNS
                        , joinPredecessors = predecessors joinedNS
                        , joinCache = toRemoteCache cache
                        }
                    joinResponse = Response
                        { requestID = requestID aRequestPart
                        , senderID = getNid joinedNS
                        , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
                        , isFinalPart = False
                        , action = Join
                        , payload = Just responsePayload
                        }
                writeTVar nsSTM joinedNS
                ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
                let
                    serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
                    -- first predecessor before the joining node was added
                    lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
                pure (Just serviceDataMigrator, joinResponse)
            -- otherwise respond with empty payload
            else pure (Nothing, Response
                { requestID = requestID aRequestPart
                , senderID = getNid nsSnap
                , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
                , isFinalPart = False
                , action = Join
                , payload = Nothing
                })
    -- as DHT response is required immediately, fork the service data migration push
    -- into a new thread. That's kind of ugly but the best I can think of so far
    forM_ dataMigration (forkIO . void)
    pure $ serialiseMessage sendMessageSize responseMsg
    -- TODO: notify service layer to copy over data now handled by the new joined node
|
|
|
|
-- ....... request sending .......
|
|
|
|
-- | Send a join request and return the joined 'LocalNodeState' including neighbours.
--
-- Successors and predecessors from all response parts are collected and set on the
-- joining node, and received cache entries are queued for insertion into the node cache.
-- If the contacted node accepted the join (at least one response part carries a payload),
-- the contacted node itself is also offered as successor and predecessor, because the
-- neighbour lists it sends do not contain itself.
--
-- Returns an error if there was no response, if no neighbours were obtained, or if an
-- 'IOException' occurred. On success this blocks in 'waitForMigrationFrom'.
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
            -> LocalNodeStateSTM s -- ^ joining NodeState
            -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
    ownState <- readTVarIO ownStateSTM
    prn <- readTVarIO $ parentRealNode ownState
    -- reuse the already read parent node instead of reading its TVar a second time
    let srcAddr = confIP . nodeConfig $ prn
    bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
        -- extract own state for getting request information
        responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
        (cacheInsertQ, joinedState) <- atomically $ do
            stateSnap <- readTVar ownStateSTM
            let
                -- reset predecessors and successors, then collect them from all parts
                (cacheInsertQ, predAccSet, succAccSet) =
                    foldl' collectPart ([], Set.empty, Set.empty) responses
                collectPart (insertQ, predAccSet', succAccSet') msg = case payload msg of
                    Nothing -> (insertQ, predAccSet', succAccSet')
                    Just msgPl ->
                        -- collect list of insertion statements into global cache
                        -- and the received predecessors and successors
                        ( queueAddEntries (joinCache msgPl) : insertQ
                        , foldr' Set.insert predAccSet' (joinPredecessors msgPl)
                        , foldr' Set.insert succAccSet' (joinSuccessors msgPl)
                        )
                -- the contacted node only sends a payload when accepting the join; it is a
                -- direct neighbour then (with few nodes possibly on both sides)
                contactedNode = [ toRemoteNodeState toJoinOn | any (isJust . payload) responses ]
                -- sort, slice and set the accumulated successors and predecessors
                newState = setSuccessors (contactedNode <> Set.elems succAccSet)
                    . setPredecessors (contactedNode <> Set.elems predAccSet)
                    $ stateSnap
            writeTVar ownStateSTM newState
            pure (cacheInsertQ, newState)
        -- execute the cache insertions
        mapM_ (\f -> f joinedState) cacheInsertQ
        if Set.null responses
            then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
            else if null (predecessors joinedState) && null (successors joinedState)
                then pure $ Left "join error: no predecessors or successors"
                -- successful join
                else do
                    -- wait for migration data to be completely received
                    -- NOTE(review): this waits on the joining node's own ID, while
                    -- 'respondJoin' passes the responder's ID as first ID to 'migrateData';
                    -- confirm which ID 'waitForMigrationFrom' expects.
                    waitForMigrationFrom (nodeService prn) (getNid ownState)
                    pure $ Right ownStateSTM
        )
        `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
|
|
|
|
|
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
--
-- The lookup starts from a snapshot of the node cache and is delegated to
-- 'queryIdLookupLoop' with at most 50 attempts.
--
-- Planned procedure:
--
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
--
-- TODO: deal with lookup failures
requestQueryID :: LocalNodeState s    -- ^ NodeState of the querying node
               -> NodeID              -- ^ target key ID to look up
               -> IO RemoteNodeState  -- ^ the node responsible for handling that key
requestQueryID ns targetID =
    readTVarIO (nodeCacheSTM ns) >>= \firstCacheSnapshot ->
        queryIdLookupLoop firstCacheSnapshot ns maxAttempts targetID
  where
    -- TODO: make maxAttempts configurable
    maxAttempts = 50
|
|
|
|
-- | Like 'requestQueryID', but allows passing a custom cache, e.g. for joining.
--
-- Each round, it first does a local cache lookup. A local 'FOUND' is returned directly.
-- Otherwise the forwarded nodes are queried, and a remote 'FOUND' is returned. If the
-- remote answers only forward, their entries are added to the local cache copy and the
-- next round is started. When the attempts reach 0, the node itself is returned.
queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID
    -- return node itself as default fallback value against infinite recursion.
    -- TODO: consider using an Either instead of a default value
    | maxAttempts == 0 = pure $ toRemoteNodeState ns
    | otherwise = case queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID of
        -- FOUND can only be returned if targetID is owned by local node
        FOUND thisNode -> pure thisNode
        FORWARD nodeSet -> do
            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
            now <- getPOSIXTime
            case responseEntries of
                -- check for a FOUND and return it
                FOUND foundNode -> pure foundNode
                -- if not FOUND, insert entries into local cache copy and recurse
                FORWARD entrySet ->
                    let extendedCache = foldr' (addCacheEntryPure now) cacheSnapshot entrySet
                    in queryIdLookupLoop extendedCache ns (maxAttempts - 1) targetID
|
|
|
|
|
|
-- | Send 'QueryID' requests for a target key concurrently to all given nodes and
-- accumulate their answers.
--
-- All returned entries are queued for insertion into the global node cache. If any
-- queried node answers 'FOUND', that answer is returned. Otherwise the union of all
-- forwarded entries is returned as 'FORWARD'. Failed queries are ignored.
sendQueryIdMessages :: (Integral i)
                    => NodeID            -- ^ target key ID to look up
                    -> LocalNodeState s  -- ^ node state of the node doing the query
                    -> Maybe i           -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
                    -> [RemoteNodeState] -- ^ nodes to query
                    -> IO QueryResponse  -- ^ accumulated response
sendQueryIdMessages targetID ns lParam targets = do
    -- create connected sockets to all query targets and use them for request handling
    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
    -- ToDo: make attempts and timeout configurable
    let queryNode resultNode = async $
            bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close
                -- pass the optional l parameter on to the request
                (sendRequestTo (lookupMessage targetID ns lParam))
    queryThreads <- mapM queryNode targets
    -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
    -- ToDo: exception handling, maybe log them
    responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
    -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
    now <- getPOSIXTime
    let
        -- fold a single response into the accumulated result
        accumulate acc resp = do
            let responseResult = queryResult <$> payload resp
                entrySet = case responseResult of
                    Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
                    Just (FORWARD resultset) -> resultset
                    _ -> Set.empty
            -- forward entries to global cache
            queueAddEntries entrySet ns
            -- return accumulated QueryResult
            pure $ case (acc, responseResult) of
                -- once a FOUND has been encountered, return this as a result
                (FOUND{}, _) -> acc
                -- a FOUND answer of the current response takes precedence over forwards
                (FORWARD _, Just found@FOUND{}) -> found
                (FORWARD accSet, _) -> FORWARD $ entrySet `Set.union` accSet
    -- collect cache entries from all responses
    foldM accumulate (FORWARD Set.empty) responses
|
|
|
|
-- | Create a 'QueryID' request to be supplied to 'sendRequestTo', which fills in the
-- request ID.
--
-- The requested number of best nodes is taken from @lParam@ if given, otherwise from
-- the sender's 'lNumBestNodes'.
lookupMessage :: Integral i
              => NodeID            -- ^ target ID
              -> LocalNodeState s  -- ^ sender node state
              -> Maybe i           -- ^ optionally provide a different l parameter
              -> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just requestPayload)
  where
    requestPayload = QueryIDRequestPayload
        { queryTargetID = targetID
        , queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam
        }
|
|
|
|
|
|
-- | Send a stabilise request to the provided 'RemoteNodeState' and, if successful,
-- return the parsed neighbour lists.
--
-- Neighbour lists from all response parts are concatenated. If at least one response
-- part arrived, the contacted neighbour is queued for (re)insertion into the node cache.
-- An 'IOException' or a response without any neighbours yields an error.
requestStabilise :: LocalNodeState s  -- ^ sending node
                 -> RemoteNodeState   -- ^ neighbour node to send to
                 -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
    let stabiliseRequest rid = Request
            { requestID = rid
            , sender = toRemoteNodeState ns
            , part = 1
            , isFinalPart = False
            , action = Stabilise
            , payload = Just StabiliseRequestPayload
            }
    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo stabiliseRequest)
        `catch` (\e -> pure . Left $ displayException (e :: IOException))
    either
        -- forward IO error messages
        (pure . Left)
        (\respSet -> do
            -- fold all reply parts together
            let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) ->
                    (maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg)
                    ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
                    )
                    ([],[]) respSet
            -- update successfully responded neighbour in cache.
            -- Responses built by this module only set 'senderID' and carry no 'sender'
            -- node state, so the contacted neighbour itself is cached instead.
            now <- getPOSIXTime
            when (not $ Set.null respSet) $
                queueAddEntries (Identity $ RemoteCacheEntry neighbour now) ns
            pure $ if null responsePreds && null responseSuccs
                then Left "no neighbours returned"
                else Right (responsePreds, responseSuccs)
        ) responses
|
|
|
|
|
|
-- | Send a 'Leave' request to the specified node. The request carries this node's
-- successors and predecessors and whether service data is going to be migrated.
--
-- Service data transfer needs to be done separately, as not all neighbours
-- that need to know about the leaving handle the new service data.
-- The response payload is not inspected: only an 'IOException' while sending yields
-- an error, anything else counts as success.
requestLeave :: LocalNodeState s
             -> Bool                   -- whether to migrate service data
             -> RemoteNodeState        -- target node
             -> IO (Either String ())  -- error or success
requestLeave ns doMigration target = do
    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
    let leavePayload = LeaveRequestPayload
            { leaveSuccessors = successors ns
            , leavePredecessors = predecessors ns
            , leaveDoMigration = doMigration
            }
        leaveRequest rid = Request
            { requestID = rid
            , sender = toRemoteNodeState ns
            , part = 1
            , isFinalPart = False
            , action = Leave
            , payload = Just leavePayload
            }
    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo leaveRequest)
        `catch` (\e -> pure . Left $ displayException (e :: IOException))
    -- forward IO error messages; the empty response payload needs no processing
    pure (() <$ responses)
|
|
|
|
-- | Send a 'Ping' request to a node and return the node states of all its active vServers.
--
-- A returned vServer is queued for insertion into the node cache as verified when its
-- ID matches the one recomputed from the peer's IPv6 address, its domain and its vServer
-- ID. An 'IOException' or an empty vServer list yields an error.
requestPing :: LocalNodeState s  -- ^ sending node
            -> RemoteNodeState   -- ^ node to be PINGed
            -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
    let pingRequest rid = Request
            { requestID = rid
            , sender = toRemoteNodeState ns
            , part = 1
            , isFinalPart = False
            , action = Ping
            , payload = Just PingRequestPayload
            }
        exchange sock = do
            respSet <- sendRequestTo pingRequest sock
            (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
            pure $ Right (peerAddr, respSet)
    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close exchange
        `catch` (\e -> pure . Left $ displayException (e :: IOException))
    case responses of
        -- forward IO error messages
        Left err -> pure $ Left err
        Right (peerAddr, respSet) -> do
            -- fold all reply parts together
            let responseVss = concatMap (maybe [] pingNodeStates . payload) (Set.elems respSet)
            -- recompute ID for each received node and mark as verified in cache
            now <- getPOSIXTime
            forM_ responseVss $ \vs ->
                when (genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) == getNid vs) $
                    atomically . writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
            pure $ if null responseVss
                then Left "no active vServer IDs returned, ignoring node"
                else Right responseVss
|
|
|
|
|
|
-- | Send a request over a connected socket and collect the responses, using
-- default parameters: this is 'sendRequestToWithParams' with a timeout
-- argument of 5000 and 3 attempts.
-- The message is serialised and its parts are delivered for a number of
-- attempts, each bounded by the timeout.
sendRequestTo :: (Integer -> FediChordMessage)      -- ^ the message to be sent, still needing a requestID
              -> Socket                             -- ^ connected socket to use for sending
              -> IO (Set.Set FediChordMessage)      -- ^ responses
sendRequestTo msgIncomplete sock =
    sendRequestToWithParams defaultTimeout defaultAttempts msgIncomplete sock
  where
    defaultTimeout :: Int
    defaultTimeout = 5000
    defaultAttempts :: Int
    defaultAttempts = 3
|
|
|
|
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts,
-- each bounded by the specified timeout.
--
-- Every attempt resends all request parts. All response parts received during
-- any attempt are returned, even if the last attempt timed out, so the result
-- may be incomplete or empty.
sendRequestToWithParams :: Int                              -- ^ timeout per attempt in milliseconds
                        -> Int                              -- ^ number of attempts
                        -> (Integer -> FediChordMessage)    -- ^ the message to be sent, still needing a requestID
                        -> Socket                           -- ^ connected socket to use for sending
                        -> IO (Set.Set FediChordMessage)    -- ^ responses
sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
    -- give the message a random request ID
    randomID <- randomRIO (0, 2^32-1)
    let
        msgComplete = msgIncomplete randomID
        requests = serialiseMessage sendMessageSize msgComplete
    -- create a queue for passing received response messages back, even after a timeout
    responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
    -- start sendAndAck with timeout; 'timeout' expects microseconds
    _ <- attempts numAttempts . timeout (timeoutMillis * 1000) $ sendAndAck responseQ sock requests
    -- TODO: after a timeout, check received responses, delete them from the unacked
    -- message map and rerun sendAndAck only with the remaining parts if necessary.
    recvdParts <- atomically $ flushTBQueue responseQ
    pure $ Set.fromList recvdParts
  where
    sendAndAck :: TBQueue FediChordMessage          -- ^ the queue for putting in the received responses
               -> Socket                            -- ^ the socket used for sending and receiving for this particular remote node
               -> Map.Map Integer BS.ByteString     -- ^ the remaining unacked request parts
               -> IO ()
    sendAndAck responseQueue sendSock remainingSends = do
        sendMany sendSock $ Map.elems remainingSends
        -- wait until all requests have been acked/ responded to
        recvLoop sendSock responseQueue remainingSends Set.empty Nothing

    recvLoop :: Socket                              -- ^ the socket to receive responses on
             -> TBQueue FediChordMessage            -- ^ the queue for putting in the received responses
             -> Map.Map Integer BS.ByteString       -- ^ the remaining unacked request parts
             -> Set.Set Integer                     -- ^ already received response part numbers
             -> Maybe Integer                       -- ^ total number of response parts if already known
             -> IO ()
    recvLoop recvSock responseQueue remainingSends' receivedPartNums totalParts = do
        -- 65535 is maximum length of UDP packets, as long as
        -- no IPv6 jumbograms are used
        response <- deserialiseMessage <$> recv recvSock 65535
        case response of
            Right msg@Response{} -> do
                atomically $ writeTBQueue responseQueue msg
                let
                    newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
                    newRemaining = Map.delete (part msg) remainingSends'
                    newReceivedParts = Set.insert (part msg) receivedPartNums
                -- finished once every request part is acked and all response parts arrived
                if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
                   then pure ()
                   else recvLoop recvSock responseQueue newRemaining newReceivedParts newTotalParts
            -- drop errors, invalid messages and anything that is not a response
            _ -> recvLoop recvSock responseQueue remainingSends' receivedPartNums totalParts
|
|
|
|
|
-- | Enqueue every 'RemoteCacheEntry' of a container to be added to the global NodeCache.
-- All entries share one timestamp taken before enqueueing; each entry is
-- written to the node's cache write queue in its own STM transaction.
queueAddEntries :: Foldable c => c RemoteCacheEntry
                -> LocalNodeState s
                -> IO ()
queueAddEntries entries ns = do
    now <- getPOSIXTime
    let enqueue = atomically . writeTQueue (cacheWriteQueue ns) . addCacheEntryPure now
    mapM_ enqueue entries
|
|
|
|
|
|
-- | Enqueue the deletion of every given node ID from the global NodeCache,
-- using one STM transaction per ID.
queueDeleteEntries :: Foldable c
                   => c NodeID
                   -> LocalNodeState s
                   -> IO ()
queueDeleteEntries ids ns = mapM_ enqueueDeletion ids
  where
    enqueueDeletion nid = atomically $ writeTQueue (cacheWriteQueue ns) (deleteCacheEntry nid)
|
|
|
|
|
|
-- | Enqueue the deletion of a single node ID from the global NodeCache.
queueDeleteEntry :: NodeID
                 -> LocalNodeState s
                 -> IO ()
queueDeleteEntry toDelete ns = queueDeleteEntries (Identity toDelete) ns
|
|
|
|
-- | Run an IO action up to *i* times, stopping at the first attempt that
-- delivers a result ('Just').
-- A non-positive number of attempts does not run the action at all and
-- yields 'Nothing'; previously, negative counts looped forever while the
-- action kept failing.
attempts :: Int             -- ^ maximum number of attempts *i*
         -> IO (Maybe a)    -- ^ action to retry
         -> IO (Maybe a)    -- ^ first result, or 'Nothing' if all *i* attempts failed
attempts i action
    | i <= 0    = pure Nothing
    | otherwise = action >>= maybe (attempts (i - 1) action) (pure . Just)
|
|
|
|
-- ====== network socket operations ======
|
|
|
|
-- | resolve a specified host and return the 'AddrInfo' for it.
-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all
-- addresses;
-- if no port is specified an arbitrary free port is selected.
-- Only IPv6 datagram addresses are looked up, and the first result is used.
-- An empty lookup result is reported as an 'IOException' instead of a crash in
-- 'head', so callers catching 'IOException' handle it like other lookup errors.
resolve :: Maybe String         -- ^ hostname or IP address to be resolved
        -> Maybe PortNumber     -- ^ port number of either local bind or remote
        -> IO AddrInfo
resolve host port = do
    infos <- getAddrInfo (Just hints) host (show <$> port)
    case infos of
        (info:_) -> pure info
        []       -> ioError . userError $ "resolve: no address found for " ++ show host
  where
    hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram
                         , addrFlags = [AI_PASSIVE] }
|
|
|
|
-- | create an unconnected UDP Datagram 'Socket' bound to the specified address.
-- The socket is restricted to IPv6. If setting the socket option or binding
-- fails, the socket is closed before the exception propagates, so no file
-- descriptor leaks.
mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket
mkServerSocket ip port = do
    sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
    bracketOnError (socket AF_INET6 Datagram defaultProtocol) close $ \sock -> do
        setSocketOption sock IPv6Only 1
        bind sock sockAddr
        pure sock
|
|
|
|
-- | create a UDP datagram socket, connected to a destination.
-- The socket gets an arbitrary free local port assigned.
-- If binding or connecting fails, the socket is closed before the exception
-- propagates. This matters because callers wrap this function as the
-- acquisition step of 'bracket', which cannot release a half-created socket.
mkSendSocket :: HostAddress6    -- ^ source address
             -> String          -- ^ destination hostname or IP
             -> PortNumber      -- ^ destination port
             -> IO Socket       -- ^ a socket with an arbitrary source port
mkSendSocket srcIp dest destPort = do
    srcAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ srcIp) Nothing
    destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
    bracketOnError (socket AF_INET6 Datagram defaultProtocol) close $ \sendSock -> do
        setSocketOption sendSock IPv6Only 1
        -- bind to the configured local IP to make sure that outgoing packets are sent from
        -- this source address
        bind sendSock srcAddr
        connect sendSock destAddr
        pure sendSock
|