send stabilise to certain successor or predecessor

This commit is contained in:
Trolli Schmittlauch 2020-06-12 15:48:56 +02:00
parent 1204457a2a
commit 2739b47162
2 changed files with 54 additions and 16 deletions

View file

@ -16,6 +16,7 @@ module Hash2Pub.DHTProtocol
, sendQueryIdMessage
, requestQueryID
, requestJoin
, requestStabilise
, queryIdLookupLoop
, resolve
, mkSendSocket
@ -485,14 +486,14 @@ requestStabilise :: LocalNodeState -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (sendRequestTo 5000 3 (\rid ->
responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Stabilise
, payload = Just StabiliseRequestPaylod
, payload = Just StabiliseRequestPayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -510,7 +511,7 @@ requestStabilise ns neighbour = do
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
) responses

View file

@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{- |
Module : FediChord
@ -48,33 +49,33 @@ module Hash2Pub.FediChord (
) where
import Control.Applicative ((<|>))
import Control.Exception
import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString
-- for hashing and ID conversion
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (forM_, forever)
import Control.Monad.Except
import Crypto.Hash
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.Foldable (foldr')
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString
import Safe
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChordTypes
@ -179,6 +180,43 @@ cacheWriter nsSTM =
cacheModifier <- readTQueue $ cacheWriteQueue ns
modifyTVar' (nodeCacheSTM ns) cacheModifier
stabiliseThread :: LocalNodeStateSTM -> IO ()
stabiliseThread nsSTM = do
-- TODO: update successfully stabilised nodes in cache
-- placeholder
stabiliseNeighbour nsSTM successors setSuccessors 1
pure ()
where
stabiliseNeighbour :: LocalNodeStateSTM
-> (LocalNodeState -> [RemoteNodeState])
-> ([RemoteNodeState] -> LocalNodeState -> LocalNodeState)
-> Int
-> IO (Either String ())
stabiliseNeighbour nsSTM neighbourGetter neighbourSetter neighbourNum = do
nsSnap <- readTVarIO nsSTM
let chosenNode = maybe (Left "no such neighbour entry") Right $ atMay (neighbourGetter nsSnap) neighbourNum
-- returning @Left@ signifies the need to try again with next from list
runExceptT $ requestToNeighbour nsSnap chosenNode >>= parseNeighbourResponse
requestToNeighbour :: (MonadError String m, MonadIO m)
=> LocalNodeState
-> Either String RemoteNodeState
-> m (Either String ([RemoteNodeState],[RemoteNodeState]))
requestToNeighbour _ (Left err) = throwError err
requestToNeighbour ns (Right n) = liftIO $ requestStabilise ns n
parseNeighbourResponse :: (MonadError String m, MonadIO m)
=> Either String ([RemoteNodeState], [RemoteNodeState])
-> m ()
parseNeighbourResponse (Left err) = throwError err
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
atomically $ do
newerNsSnap <- readTVar nsSTM
writeTVar nsSTM $ setPredecessors (predecessors newerNsSnap <> preds) . setSuccessors (successors newerNsSnap <> succs) $ newerNsSnap
pure ()
-- periodically contact immediate successor and predecessor
-- If they respond, see whether there is a closer neighbour in between
-- If they don't respond, drop them and try the next one
-- | Receives UDP packets and passes them to other threads via the given TQueue.
-- Shall be used as the single receiving thread on the server socket, as multiple
-- threads blocking on the same socket degrades performance.
@ -233,7 +271,6 @@ requestMapPurge mapVar = forever $ do
threadDelay $ fromEnum purgeAge * 2000
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue