diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index 2d195e3..2cc2d84 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
 
 common deps
   build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
-  ghc-options: -Wall -Wpartial-fields
+  ghc-options: -Wall
 
diff --git a/app/Experiment.hs b/app/Experiment.hs
index deb4cae..51b8e88 100644
--- a/app/Experiment.hs
+++ b/app/Experiment.hs
@@ -1,16 +1,16 @@
-{-# LANGUAGE OverloadedStrings   #-}
+{-# LANGUAGE OverloadedStrings #-}
 
 module Main where
 
-import           Control.Concurrent
-import           Control.Monad              (forM_)
-import           Control.Monad.IO.Class
-import           Control.Monad.State.Class
-import           Control.Monad.State.Strict (evalStateT)
-import qualified Network.HTTP.Client        as HTTP
-import           System.Random
+import System.Random
+import Control.Concurrent
+import Control.Monad (forM_)
+import Control.Monad.State.Class
+import Control.Monad.State.Strict (evalStateT)
+import Control.Monad.IO.Class
+import qualified Network.HTTP.Client as HTTP
 
-import           Hash2Pub.PostService       (Hashtag, clientPublishPost)
+import Hash2Pub.PostService (clientPublishPost, Hashtag)
 
 -- placeholder post data definition
diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index fa5a54a..e22834a 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -49,11 +49,9 @@ import Control.Concurrent.STM.TQueue
 import           Control.Concurrent.STM.TVar
 import           Control.Exception
 import           Control.Monad                 (foldM, forM, forM_, void, when)
-import           Control.Monad.Except          (MonadError (..), runExceptT)
-import           Control.Monad.IO.Class        (MonadIO (..))
 import qualified Data.ByteString               as BS
 import           Data.Either                   (rights)
-import           Data.Foldable                 (foldl', foldr', foldrM)
+import           Data.Foldable                 (foldl', foldr')
 import           Data.Functor.Identity
 import           Data.IP                       (IPv6, fromHostAddress6,
                                                 toHostAddress6)
@@ -516,28 +514,28 @@ requestJoin toJoinOn ownStateSTM = do
                 ([], Set.empty, Set.empty)
                 responses
             -- sort, slice and set the accumulated successors and predecessors
-            -- the contacted node itself is a successor as well and, with few
-            -- nodes, can be a predecessor as well
-            newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
+            newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
         writeTVar ownStateSTM newState
         pure (cacheInsertQ, newState)
     -- execute the cache insertions
     mapM_ (\f -> f joinedState) cacheInsertQ
     if responses == Set.empty
         then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
-        else do
-            -- wait for migration data to be completely received
-            waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
-            pure $ Right ownStateSTM
+        else if null (predecessors joinedState) && null (successors joinedState)
+            then pure $ Left "join error: no predecessors or successors"
+            -- successful join
+            else do
+                -- wait for migration data to be completely received
+                waitForMigrationFrom (nodeService prn) (getNid ownState)
+                pure $ Right ownStateSTM
     )
     `catch` (\e -> pure . Left $ displayException (e :: IOException))
 
 -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
-requestQueryID :: (MonadIO m, MonadError String m)
-               => LocalNodeState s   -- ^ NodeState of the querying node
+requestQueryID :: LocalNodeState s   -- ^ NodeState of the querying node
               -> NodeID              -- ^ target key ID to look up
-               -> m RemoteNodeState  -- ^ the node responsible for handling that key
+               -> IO RemoteNodeState -- ^ the node responsible for handling that key
 -- 1. do a local lookup for the l closest nodes
 -- 2. create l sockets
 -- 3. send a message async concurrently to all l nodes
@@ -545,23 +543,23 @@ requestQueryID :: (MonadIO m, MonadError String m)
 -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 -- TODO: deal with lookup failures
 requestQueryID ns targetID = do
-    firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
+    firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
     -- TODO: make maxAttempts configurable
     queryIdLookupLoop firstCacheSnapshot ns 50 targetID
 
 -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
-queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
+queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
 -- return node itself as default fallback value against infinite recursion.
 -- TODO: consider using an Either instead of a default value
-queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
+queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
 queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
     let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
     -- FOUND can only be returned if targetID is owned by local node
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
-            responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
-            now <- liftIO getPOSIXTime
+            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+            now <- getPOSIXTime
            -- check for a FOUND and return it
            case responseEntries of
                FOUND foundNode -> pure foundNode
@@ -595,10 +593,8 @@ sendQueryIdMessages targetID ns lParam targets = do
     -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
     now <- getPOSIXTime
     -- collect cache entries from all responses
-    foldrM (\resp acc -> do
-        let
-            responseResult = queryResult <$> payload resp
-            entrySet = case responseResult of
+    foldM (\acc resp -> do
+        let entrySet = case queryResult <$> payload resp of
                 Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
                 Just (FORWARD resultset) -> resultset
                 _ -> Set.empty
@@ -608,15 +604,10 @@ sendQueryIdMessages targetID ns lParam targets = do
         -- return accumulated QueryResult
         pure $ case acc of
             -- once a FOUND as been encountered, return this as a result
-            FOUND{} -> acc
-            FORWARD accSet
-                | maybe False isFound responseResult -> fromJust responseResult
-                | otherwise -> FORWARD $ entrySet `Set.union` accSet
+            isFound@FOUND{} -> isFound
+            FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
         ) (FORWARD Set.empty) responses
-    where
-        isFound FOUND{} = True
-        isFound _ = False
 
 
 -- | Create a QueryID message to be supplied to 'sendRequestTo'
 lookupMessage :: Integral i
@@ -658,7 +649,8 @@ requestStabilise ns neighbour = do
                     )
                     ([],[]) respSet
             -- update successfully responded neighbour in cache
-            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
+            now <- getPOSIXTime
+            maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
            pure $ if null responsePreds && null responseSuccs
                then Left "no neighbours returned"
                else Right (responsePreds, responseSuccs)
@@ -774,14 +766,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
                    -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
                    -> IO ()
        sendAndAck responseQueue sock' remainingSends = do
-            sendMany sock' $ Map.elems remainingSends
+            sendMany sock $ Map.elems remainingSends
            -- if all requests have been acked/ responded to, return prematurely
            recvLoop sock' responseQueue remainingSends Set.empty Nothing
        recvLoop :: Socket
                 -> TBQueue FediChordMessage      -- ^ the queue for putting in the received responses
                 -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
                 -> Set.Set Integer               -- ^ already received response part numbers
-                -> Maybe Integer                 -- ^ total number of response parts if already known
+                -> Maybe Integer -- ^ total number of response parts if already known
                 -> IO ()
        recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
            -- 65535 is maximum length of UDP packets, as long as
@@ -794,11 +786,10 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
                            newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
                            newRemaining = Map.delete (part msg) remainingSends'
                            newReceivedParts = Set.insert (part msg) receivedPartNums
-                        if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
+                        if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
                            then pure ()
-                            else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
+                            else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
                    -- drop errors and invalid messages
-                    Right Request{} -> pure () -- expecting a response, not a request
                    Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
@@ -825,18 +816,6 @@ queueDeleteEntry :: NodeID
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 
-
--- | enqueue the timestamp update and verification marking of an entry in the
--- global 'NodeCache'.
-queueUpdateVerifieds :: Foldable c
-                     => c NodeID
-                     -> LocalNodeState s
-                     -> IO ()
-queueUpdateVerifieds nIds ns = do
-    now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
-        markCacheEntryAsVerified (Just now) nid'
-
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int            -- ^ number of retries *i*
          -> IO (Maybe a)   -- ^ action to retry
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 45d0bf9..54c5e9a 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -166,7 +166,6 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
             currentlyResponsible <- liftEither lookupResp
             liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
             -- 2. then send a join to the currently responsible node
-            liftIO $ putStrLn "send a bootstrap Join"
             joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
             liftEither joinResult
 
@@ -245,24 +244,26 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
                             Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
                     ) initCache resp
-            currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
-            pure currentlyResponsible
+            currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
+            pure $ Right currentlyResponsible
 
 
 -- | join a node to the DHT using the global node cache
 --   node's position.
-fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
+fediChordVserverJoin :: Service s (RealNodeSTM s)
                      => LocalNodeStateSTM s -- ^ the local 'NodeState'
-                     -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
+                     -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
                                                 -- successful join, otherwise an error message
 fediChordVserverJoin nsSTM = do
-    ns <- liftIO $ readTVarIO nsSTM
+    ns <- readTVarIO nsSTM
     -- 1. get routed to the currently responsible node
     currentlyResponsible <- requestQueryID ns $ getNid ns
-    liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
+    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
     -- 2. then send a join to the currently responsible node
-    joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
-    liftEither joinResult
+    joinResult <- requestJoin currentlyResponsible nsSTM
+    case joinResult of
+        Left err -> pure . Left $ "Error joining on " <> err
+        Right joinedNS -> pure . Right $ joinedNS
 
 fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
 fediChordVserverLeave ns = do
@@ -322,7 +323,7 @@ joinOnNewEntriesThread nsSTM = loop
                 pure ()
             -- otherwise try joining
             FORWARD _ -> do
-                joinResult <- runExceptT $ fediChordVserverJoin nsSTM
+                joinResult <- fediChordVserverJoin nsSTM
                 either
                     -- on join failure, sleep and retry
                     -- TODO: make delay configurable
@@ -503,26 +504,18 @@ stabiliseThread nsSTM = forever $ do
     -- try looking up additional neighbours if list too short
     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addPredecessors [entry] latestNs
-            )
-            nextEntry
+        nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+        atomically $ do
+            latestNs <- readTVar nsSTM
+            writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
         )
 
     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addSuccessors [entry] latestNs
-            )
-            nextEntry
+        nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+        atomically $ do
+            latestNs <- readTVar nsSTM
+            writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
        )
 
    newNs <- readTVarIO nsSTM
@@ -645,7 +638,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
-    putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
+    putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
            now - ts < responsePurgeAge
        ) rMapState
    threadDelay $ round responsePurgeAge * 2 * 10^6
@@ -764,7 +757,7 @@ getKeyResponsibility nodeSTM lookupKey = do
 --   new entry.
 --   If no vserver is active in the DHT, 'Nothing' is returned.
 updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
-updateLookupCache nodeSTM keyToLookup = do
+updateLookupCache nodeSTM lookupKey = do
     (node, lookupSource) <- atomically $ do
         node <- readTVar nodeSTM
         let firstVs = headMay (vservers node)
@@ -774,25 +767,18 @@ updateLookupCache nodeSTM keyToLookup = do
         pure (node, lookupSource)
     maybe (do
             -- if no local node available, delete cache entry and return Nothing
-            atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
+            atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
            pure Nothing
        )
        (\n -> do
            -- start a lookup from the node, update the cache with the lookup result and return it
-            -- TODO: better retry management, because having no vserver joined yet should
-            -- be treated differently than other reasons for not getting a result.
-            newResponsible <- runExceptT $ requestQueryID n keyToLookup
-            either
-                (const $ pure Nothing)
-                (\result -> do
-                    let newEntry = (getDomain result, getServicePort result)
-                    now <- getPOSIXTime
-                    -- atomic update against lost updates
-                    atomically $ modifyTVar' (lookupCacheSTM node) $
-                        Map.insert keyToLookup (CacheEntry False newEntry now)
-                    pure $ Just newEntry
-                )
-                newResponsible
+            newResponsible <- requestQueryID n lookupKey
+            let newEntry = (getDomain newResponsible, getServicePort newResponsible)
+            now <- getPOSIXTime
+            -- atomic update against lost updates
+            atomically $ modifyTVar' (lookupCacheSTM node) $
+                Map.insert lookupKey (CacheEntry False newEntry now)
+            pure $ Just newEntry
        )
        lookupSource
diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs
index 81cf552..fe013a0 100644
--- a/src/Hash2Pub/PostService.hs
+++ b/src/Hash2Pub/PostService.hs
@@ -10,30 +10,37 @@ module Hash2Pub.PostService where
 
 import           Control.Concurrent
 import           Control.Concurrent.Async
+import           Control.Concurrent.MVar
 import           Control.Concurrent.STM
-import           Control.Exception         (Exception (..), try)
-import           Control.Monad             (foldM, forM, forM_, forever, void,
-                                            when)
-import           Control.Monad.IO.Class    (liftIO)
+import           Control.Concurrent.STM.TChan
+import           Control.Concurrent.STM.TChan
+import           Control.Concurrent.STM.TQueue
+import           Control.Concurrent.STM.TVar
+import           Control.Exception            (Exception (..), try)
+import           Control.Monad                (foldM, forM, forM_, forever, when, void)
+import           Control.Monad.IO.Class       (liftIO)
+import           Control.Monad.STM
 import           Data.Bifunctor
-import qualified Data.ByteString.Lazy.UTF8 as BSUL
-import qualified Data.ByteString.UTF8      as BSU
-import qualified Data.HashMap.Strict       as HMap
-import qualified Data.HashSet              as HSet
-import           Data.Maybe                (fromMaybe, isJust)
-import           Data.String               (fromString)
-import qualified Data.Text.Lazy            as Txt
-import           Data.Text.Normalize       (NormalizationMode (NFC), normalize)
+import qualified Data.ByteString.Lazy.UTF8    as BSUL
+import qualified Data.ByteString.UTF8         as BSU
+import qualified Data.HashMap.Strict          as HMap
+import qualified Data.HashSet                 as HSet
+import           Data.Maybe                   (fromMaybe, isJust)
+import           Data.String                  (fromString)
+import qualified Data.Text.Lazy               as Txt
+import           Data.Text.Normalize          (NormalizationMode (NFC),
+                                               normalize)
 import           Data.Time.Clock.POSIX
-import           Data.Typeable             (Typeable)
-import qualified Network.HTTP.Client       as HTTP
-import qualified Network.HTTP.Types        as HTTPT
+import           Data.Typeable                (Typeable)
+import qualified Network.HTTP.Client          as HTTP
+import qualified Network.HTTP.Types           as HTTPT
 import           System.Random
-import           Text.Read                 (readEither)
+import           Text.Read                    (readEither)
 
-import qualified Network.Wai.Handler.Warp  as Warp
+import qualified Network.Wai.Handler.Warp     as Warp
 import           Servant
 import           Servant.Client
+import           Servant.Server
 
 import           Hash2Pub.FediChordTypes
 import           Hash2Pub.RingMap
@@ -348,7 +355,7 @@ clientDeliverSubscriptions :: PostService d
                            -> (String, Int) -- ^ hostname and port of instance to deliver to
                            -> IO (Either String ()) -- Either signals success or failure
 clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-    -- collect tag interval
+    -- collect tag intearval
     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
     -- extract subscribers and posts