diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 73e564f..6469e1c 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -16,6 +16,7 @@ module Hash2Pub.DHTProtocol , sendQueryIdMessage , requestQueryID , requestJoin + , requestStabilise , queryIdLookupLoop , resolve , mkSendSocket @@ -485,14 +486,14 @@ requestStabilise :: LocalNodeState -- ^ sending node -> RemoteNodeState -- ^ neighbour node to send to -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do - responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (sendRequestTo 5000 3 (\rid -> + responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns , part = 1 , isFinalPart = False , action = Stabilise - , payload = Just StabiliseRequestPaylod + , payload = Just StabiliseRequestPayload } ) ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) @@ -510,7 +511,7 @@ requestStabilise ns neighbour = do then Left "no neighbours returned" else Right (responsePreds, responseSuccs) ) responses - + diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index f66cafc..6a66220 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} {- | Module : FediChord @@ -48,33 +49,33 @@ module Hash2Pub.FediChord ( ) where import Control.Applicative ((<|>)) -import Control.Exception -import Data.Foldable (foldr') -import qualified Data.Map.Strict as Map -import Data.Maybe (fromJust, fromMaybe, isJust, - mapMaybe) -import qualified Data.Set as Set -import Data.Time.Clock.POSIX -import Network.Socket hiding (recv, recvFrom, send, - sendTo) -import Network.Socket.ByteString - --- for hashing and ID conversion import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar +import Control.Exception import Control.Monad (forM_, forever) +import Control.Monad.Except import Crypto.Hash import qualified Data.ByteArray as BA import qualified Data.ByteString as BS import qualified Data.ByteString.UTF8 as BSU +import Data.Foldable (foldr') import Data.IP (IPv6, fromHostAddress6, toHostAddress6) +import qualified Data.Map.Strict as Map +import Data.Maybe (fromJust, fromMaybe, isJust, + mapMaybe) +import qualified Data.Set as Set +import Data.Time.Clock.POSIX import Data.Typeable (Typeable (..), typeOf) import Data.Word import qualified Network.ByteOrder as NetworkBytes +import Network.Socket hiding (recv, recvFrom, send, + sendTo) +import Network.Socket.ByteString +import Safe import Hash2Pub.DHTProtocol import Hash2Pub.FediChordTypes @@ -179,6 +180,43 @@ cacheWriter nsSTM = cacheModifier <- readTQueue $ cacheWriteQueue ns modifyTVar' (nodeCacheSTM ns) cacheModifier +stabiliseThread :: LocalNodeStateSTM -> IO () +stabiliseThread nsSTM = do + -- TODO: update successfully stabilised nodes in cache + -- placeholder + stabiliseNeighbour nsSTM successors setSuccessors 1 + pure () + where + stabiliseNeighbour :: LocalNodeStateSTM + -> (LocalNodeState -> [RemoteNodeState]) + -> ([RemoteNodeState] -> LocalNodeState -> LocalNodeState) + -> Int + -> IO (Either String ()) + stabiliseNeighbour nsSTM neighbourGetter neighbourSetter neighbourNum = do + nsSnap <- readTVarIO nsSTM + let chosenNode = maybe (Left "no such neighbour entry") Right $ atMay (neighbourGetter nsSnap) neighbourNum + -- returning @Left@ signifies the need to try again with next from list + runExceptT $ requestToNeighbour nsSnap chosenNode >>= parseNeighbourResponse + requestToNeighbour :: (MonadError String m, MonadIO m) + => LocalNodeState + -> Either String RemoteNodeState + -> m (Either String ([RemoteNodeState],[RemoteNodeState])) + requestToNeighbour _ (Left err) = throwError err + requestToNeighbour ns (Right n) = liftIO $ requestStabilise ns n + parseNeighbourResponse :: (MonadError String m, MonadIO m) + => Either String ([RemoteNodeState], [RemoteNodeState]) + -> m () + parseNeighbourResponse (Left err) = throwError err + parseNeighbourResponse (Right (succs, preds)) = liftIO $ do + atomically $ do + newerNsSnap <- readTVar nsSTM + writeTVar nsSTM $ setPredecessors (predecessors newerNsSnap <> preds) . setSuccessors (successors newerNsSnap <> succs) $ newerNsSnap + pure () + +-- periodically contact immediate successor and predecessor +-- If they respond, see whether there is a closer neighbour in between +-- If they don't respond, drop them and try the next one + -- | Receives UDP packets and passes them to other threads via the given TQueue. -- Shall be used as the single receiving thread on the server socket, as multiple -- threads blocking on the same socket degrades performance. @@ -233,7 +271,6 @@ requestMapPurge mapVar = forever $ do threadDelay $ fromEnum purgeAge * 2000 - -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue