diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 2d195e3..5ffff0d 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,8 +46,8 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays - ghc-options: -Wall -Wpartial-fields + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types + ghc-options: -Wall @@ -93,20 +93,6 @@ executable Hash2Pub ghc-options: -threaded -executable Experiment - -- experiment runner - import: deps - - build-depends: Hash2Pub - - main-is: Experiment.hs - - hs-source-dirs: app - - default-language: Haskell2010 - - ghc-options: -threaded - test-suite Hash2Pub-test -- Test dependencies. diff --git a/app/Experiment.hs b/app/Experiment.hs deleted file mode 100644 index deb4cae..0000000 --- a/app/Experiment.hs +++ /dev/null @@ -1,44 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} - -module Main where - -import Control.Concurrent -import Control.Monad (forM_) -import Control.Monad.IO.Class -import Control.Monad.State.Class -import Control.Monad.State.Strict (evalStateT) -import qualified Network.HTTP.Client as HTTP -import System.Random - -import Hash2Pub.PostService (Hashtag, clientPublishPost) - --- placeholder post data definition - -tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ] - -knownRelays :: [(String, Int)] -knownRelays = - [ ("animalliberation.social", 3342) - , ("hostux.social", 3343) - , ("social.diskseven.com", 3344) - , ("social.imirhil.fr", 3345) - ] - -main :: IO () -main = do - -- initialise HTTP manager - httpMan <- HTTP.newManager HTTP.defaultManagerSettings - -- initialise RNG - let initRGen = mkStdGen 12 - -- cycle through tags and post to a random instance - evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen - -- wait for a specified time - -publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m () -publishPostRandom httpman tag = do - index <- state $ randomR (0, length knownRelays - 1) - let (pubHost, pubPort) = knownRelays !! index - _ <- liftIO . forkIO $ do - postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag) - either putStrLn (const $ pure ()) postResult - liftIO $ threadDelay 500 diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fa5a54a..bd7953f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -49,11 +49,9 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) -import Control.Monad.Except (MonadError (..), runExceptT) -import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString as BS import Data.Either (rights) -import Data.Foldable (foldl', foldr', foldrM) +import Data.Foldable (foldl', foldr') import Data.Functor.Identity import Data.IP (IPv6, fromHostAddress6, toHostAddress6) @@ -108,6 +106,9 @@ queryLocalCache ownState nCache lBestNodes targetID -- the closest succeeding node (like with the p initiated parallel queries | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache where + ownID = getNid ownState + preds = predecessors ownState + closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache @@ -258,6 +259,7 @@ handleIncomingRequest :: Service s (RealNodeSTM s) -> SockAddr -- ^ source address of the request -> IO () handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do + putStrLn $ "handling incoming request: " <> show msgSet ns <- readTVarIO nsSTM -- add nodestate to cache now <- getPOSIXTime @@ -312,6 +314,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- | execute a key ID lookup on local cache and respond with the result respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID nsSTM msgSet = do + putStrLn "responding to a QueryID request" -- this message cannot be split reasonably, so just -- consider the first payload let @@ -432,9 +435,7 @@ respondJoin nsSTM msgSet = do let aRequestPart = Set.elemAt 0 msgSet senderNS = sender aRequestPart - -- if not joined yet, attract responsibility for - -- all keys to make bootstrapping possible - responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) + responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) thisNodeResponsible (FOUND _) = True thisNodeResponsible (FORWARD _) = False -- check whether the joining node falls into our responsibility @@ -491,7 +492,7 @@ requestJoin toJoinOn ownStateSTM = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState) bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do -- extract own state for getting request information - responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let @@ -516,28 +517,28 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - -- the contacted node itself is a successor as well and, with few - -- nodes, can be a predecessor as well - newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap + newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid toJoinOn) - pure $ Right ownStateSTM + else if null (predecessors joinedState) && null (successors joinedState) + then pure $ Left "join error: no predecessors or successors" + -- successful join + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid ownState) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: (MonadIO m, MonadError String m) - => LocalNodeState s -- ^ NodeState of the querying node +requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node -> NodeID -- ^ target key ID to look up - -> m RemoteNodeState -- ^ the node responsible for handling that key + -> IO RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes @@ -545,23 +546,23 @@ requestQueryID :: (MonadIO m, MonadError String m) -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- TODO: deal with lookup failures requestQueryID ns targetID = do - firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns + firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns -- TODO: make maxAttempts configurable queryIdLookupLoop firstCacheSnapshot ns 50 targetID -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState +queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState -- return node itself as default fallback value against infinite recursion. -- TODO: consider using an Either instead of a default value -queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" +queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID -- FOUND can only be returned if targetID is owned by local node case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do - responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) - now <- liftIO getPOSIXTime + responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) + now <- getPOSIXTime -- check for a FOUND and return it case responseEntries of FOUND foundNode -> pure foundNode @@ -587,7 +588,7 @@ sendQueryIdMessages targetID ns lParam targets = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo (lookupMessage targetID ns Nothing) + sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -595,10 +596,8 @@ sendQueryIdMessages targetID ns lParam targets = do -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing now <- getPOSIXTime -- collect cache entries from all responses - foldrM (\resp acc -> do - let - responseResult = queryResult <$> payload resp - entrySet = case responseResult of + foldM (\acc resp -> do + let entrySet = case queryResult <$> payload resp of Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) Just (FORWARD resultset) -> resultset _ -> Set.empty @@ -608,15 +607,10 @@ sendQueryIdMessages targetID ns lParam targets = do -- return accumulated QueryResult pure $ case acc of -- once a FOUND as been encountered, return this as a result - FOUND{} -> acc - FORWARD accSet - | maybe False isFound responseResult -> fromJust responseResult - | otherwise -> FORWARD $ entrySet `Set.union` accSet + isFound@FOUND{} -> isFound + FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet ) (FORWARD Set.empty) responses - where - isFound FOUND{} = True - isFound _ = False -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i @@ -636,7 +630,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) - responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -658,7 +652,8 @@ requestStabilise ns neighbour = do ) ([],[]) respSet -- update successfully responded neighbour in cache - maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) + now <- getPOSIXTime + maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) @@ -679,7 +674,7 @@ requestLeave ns doMigration target = do , leavePredecessors = predecessors ns , leaveDoMigration = doMigration } - responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -704,7 +699,7 @@ requestPing ns target = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (\sock -> do - resp <- sendRequestTo (\rid -> + resp <- sendRequestTo 5000 3 (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -740,31 +735,25 @@ requestPing ns target = do ) responses --- | 'sendRequestToWithParams' with default timeout and retries already specified. --- Generic function for sending a request over a connected socket and collecting the response. --- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout. -sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo = sendRequestToWithParams 5000 3 -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -sendRequestToWithParams :: Int -- ^ timeout in milliseconds - -> Int -- ^ number of retries - -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do +sendRequestTo :: Int -- ^ timeout in seconds + -> Int -- ^ number of retries + -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) let msgComplete = msgIncomplete randomID requests = serialiseMessage sendMessageSize msgComplete + putStrLn $ "sending request message " <> show msgComplete -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout - attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests + attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts @@ -773,20 +762,19 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> IO () - sendAndAck responseQueue sock' remainingSends = do - sendMany sock' $ Map.elems remainingSends + sendAndAck responseQueue sock remainingSends = do + sendMany sock $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely - recvLoop sock' responseQueue remainingSends Set.empty Nothing - recvLoop :: Socket - -> TBQueue FediChordMessage -- ^ the queue for putting in the received responses + recvLoop responseQueue remainingSends Set.empty Nothing + recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers - -> Maybe Integer -- ^ total number of response parts if already known + -> Maybe Integer -- ^ total number of response parts if already known -> IO () - recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do + recvLoop responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as -- no IPv6 jumbograms are used - response <- deserialiseMessage <$> recv sock' 65535 + response <- deserialiseMessage <$> recv sock 65535 case response of Right msg@Response{} -> do atomically $ writeTBQueue responseQueue msg @@ -794,12 +782,11 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newRemaining = Map.delete (part msg) remainingSends' newReceivedParts = Set.insert (part msg) receivedPartNums - if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts + if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts then pure () - else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts + else recvLoop responseQueue newRemaining receivedPartNums newTotalParts -- drop errors and invalid messages - Right Request{} -> pure () -- expecting a response, not a request - Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts + Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache @@ -825,18 +812,6 @@ queueDeleteEntry :: NodeID -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete - --- | enqueue the timestamp update and verification marking of an entry in the --- global 'NodeCache'. -queueUpdateVerifieds :: Foldable c - => c NodeID - -> LocalNodeState s - -> IO () -queueUpdateVerifieds nIds ns = do - now <- getPOSIXTime - forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ - markCacheEntryAsVerified (Just now) nid' - -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 45d0bf9..15563de 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -166,7 +166,6 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do currentlyResponsible <- liftEither lookupResp liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node - liftIO $ putStrLn "send a bootstrap Join" joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult @@ -226,7 +225,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo (lookupMessage targetID ns Nothing) + fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) ) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) @@ -245,24 +244,26 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache resp - currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure currentlyResponsible + currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + pure $ Right currentlyResponsible -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) +fediChordVserverJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -- ^ the local 'NodeState' - -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do - ns <- liftIO $ readTVarIO nsSTM + ns <- readTVarIO nsSTM -- 1. get routed to the currently responsible node currentlyResponsible <- requestQueryID ns $ getNid ns - liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node - joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM - liftEither joinResult + joinResult <- requestJoin currentlyResponsible nsSTM + case joinResult of + Left err -> pure . Left $ "Error joining on " <> err + Right joinedNS -> pure . Right $ joinedNS fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do @@ -322,7 +323,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- runExceptT $ fediChordVserverJoin nsSTM + joinResult <- fediChordVserverJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -503,26 +504,18 @@ stabiliseThread nsSTM = forever $ do -- try looking up additional neighbours if list too short forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - either - (const $ pure ()) - (\entry -> atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [entry] latestNs - ) - nextEntry + nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [nextEntry] latestNs ) forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - either - (const $ pure ()) - (\entry -> atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [entry] latestNs - ) - nextEntry + nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') + atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [nextEntry] latestNs ) newNs <- readTVarIO nsSTM @@ -645,7 +638,7 @@ requestMapPurge :: MVar RequestMap -> IO () requestMapPurge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime - putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> + putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> now - ts < responsePurgeAge ) rMapState threadDelay $ round responsePurgeAge * 2 * 10^6 @@ -723,7 +716,7 @@ fediMessageHandler sendQ recvQ nsSTM = do instance DHT (RealNodeSTM s) where lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring - forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring) + forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState isResponsibleFor nodeSTM key = do node <- readTVarIO nodeSTM @@ -764,7 +757,7 @@ getKeyResponsibility nodeSTM lookupKey = do -- new entry. -- If no vserver is active in the DHT, 'Nothing' is returned. updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) -updateLookupCache nodeSTM keyToLookup = do +updateLookupCache nodeSTM lookupKey = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM let firstVs = headMay (vservers node) @@ -774,25 +767,18 @@ updateLookupCache nodeSTM keyToLookup = do pure (node, lookupSource) maybe (do -- if no local node available, delete cache entry and return Nothing - atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup + atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey pure Nothing ) (\n -> do -- start a lookup from the node, update the cache with the lookup result and return it - -- TODO: better retry management, because having no vserver joined yet should - -- be treated differently than other reasons for not getting a result. - newResponsible <- runExceptT $ requestQueryID n keyToLookup - either - (const $ pure Nothing) - (\result -> do - let newEntry = (getDomain result, getServicePort result) - now <- getPOSIXTime - -- atomic update against lost updates - atomically $ modifyTVar' (lookupCacheSTM node) $ - Map.insert keyToLookup (CacheEntry False newEntry now) - pure $ Just newEntry - ) - newResponsible + newResponsible <- requestQueryID n lookupKey + let newEntry = (getDomain newResponsible, getServicePort newResponsible) + now <- getPOSIXTime + -- atomic update against lost updates + atomically $ modifyTVar' (lookupCacheSTM node) $ + Map.insert lookupKey (CacheEntry False newEntry now) + pure $ Just newEntry ) lookupSource diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 81cf552..a871343 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,30 +10,37 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async +import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever, void, - when) -import Control.Monad.IO.Class (liftIO) +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TVar +import Control.Exception (Exception (..), try) +import Control.Monad (foldM, forM, forM_, forever) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.STM import Data.Bifunctor -import qualified Data.ByteString.Lazy.UTF8 as BSUL -import qualified Data.ByteString.UTF8 as BSU -import qualified Data.HashMap.Strict as HMap -import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe, isJust) -import Data.String (fromString) -import qualified Data.Text.Lazy as Txt -import Data.Text.Normalize (NormalizationMode (NFC), normalize) +import qualified Data.ByteString.Lazy.UTF8 as BSUL +import qualified Data.ByteString.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet +import Data.Maybe (fromMaybe, isJust) +import Data.String (fromString) +import qualified Data.Text.Lazy as Txt +import Data.Text.Normalize (NormalizationMode (NFC), + normalize) import Data.Time.Clock.POSIX -import Data.Typeable (Typeable) -import qualified Network.HTTP.Client as HTTP -import qualified Network.HTTP.Types as HTTPT +import Data.Typeable (Typeable) +import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Types as HTTPT import System.Random -import Text.Read (readEither) +import Text.Read (readEither) -import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client +import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -143,7 +150,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint at responsible relay for delivering posts of $tag for distribution + -- delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text @@ -180,11 +187,11 @@ relayInbox serv tag posts = do -- skip checking whether the post actually contains the tag, just drop full post postIDs = head . Txt.splitOn "," <$> Txt.lines posts -- if tag is not in own responsibility, return a 410 Gone - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag) if responsible then pure () else - throwError $ err410 { errBody = "Relay is not responsible for this tag"} + (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag maybe -- if noone subscribed to the tag, nothing needs to be done @@ -214,7 +221,7 @@ subscriptionDelivery serv senderID subList = do -- not-handled tag occurs, this results in a single large transaction. -- Hopefully the performance isn't too bad. res <- liftIO . atomically $ (foldM (\_ tag' -> do - responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag') + responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag') if responsible then processTag (subscribers serv) tag' else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") @@ -288,7 +295,7 @@ tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = do let postIDs = Txt.lines posts subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv - if isJust (HMap.lookup (hashtagToId hashtag) subscriptions) + if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) then -- TODO: increase a counter/ statistics for received posts of this tag liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) else -- silently drop posts from unsubscribed tags @@ -297,7 +304,7 @@ tagDelivery serv hashtag posts = do tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -316,7 +323,7 @@ tagSubscribe serv hashtag origin = do tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -348,7 +355,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag interval + -- collect tag intearval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts @@ -378,7 +385,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do Right _ -> do atomically $ modifyTVar' (subscribers serv) $ \tagMap -> - foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags) + foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) pure . Right $ () where channelGetAll :: TChan a -> STM [a] @@ -389,8 +396,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead --- | Subscribe the client to the given hashtag. On success it returns the given lease time, --- but also records the subscription in its own data structure. +-- | Subscribe the client to the given hashtag. On success it returns the given lease time. clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo serv tag = do lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) @@ -407,9 +413,7 @@ clientSubscribeTo serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doSubscribe newRes False Left err -> pure . Left . show $ err - Right lease -> do - atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) - pure . Right $ lease + Right lease -> pure . Right $ lease ) lookupResponse @@ -431,9 +435,7 @@ clientUnsubscribeFrom serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doUnsubscribe newRes False Left err -> pure . Left . show $ err - Right _ -> do - atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag) - pure . Right $ () + Right _ -> pure . Right $ () ) lookupResponse @@ -443,11 +445,11 @@ clientUnsubscribeFrom serv tag = do -- the post to the responsible relays. -- As the initial publishing isn't done by a specific relay (but *to* a specific relay -- instead), the function does *not* take a PostService as argument. -clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided - -> String -- ^ hostname - -> Int -- ^ port - -> PostContent -- ^ post content - -> IO (Either String ()) -- ^ error or success +clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided + -> String -- hostname + -> Int -- port + -> PostContent -- post content + -> IO (Either String ()) -- error or success clientPublishPost httpman hostname port postC = do resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port "")) pure . bimap show (const ()) $ resp @@ -490,7 +492,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do broadcastChan <- newBroadcastTChan tagOutChan <- dupTChan broadcastChan newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) - writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap + writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap pure tagOutChan Just (foundSubMapSTM, broadcastChan, _) -> do -- otherwise use the existing subscriber map @@ -518,7 +520,7 @@ deleteSubscription tagMapSTM tag subscriber = do -- if there are no subscriptions for the tag anymore, remove its -- data sttructure altogether if HMap.null newSubMap - then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap + then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap -- otherwise just remove the subscription of that node else writeTVar foundSubMapSTM newSubMap @@ -539,18 +541,13 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a -lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) +lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) -- normalise the unicode representation of a string to NFC normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict - --- | convert a hashtag to its representation on the DHT -hashtagToId :: Hashtag -> NodeID -hashtagToId = genKeyID . Txt.unpack - -- | define how to convert all showable types to PlainText -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap @@ -569,13 +566,12 @@ processIncomingPosts serv = forever $ do -- blocks until available -- TODO: process multiple in parallel (tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv - let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) case lookupRes of -- no vserver active => wait and retry Nothing -> threadDelay $ 10 * 10^6 Just (responsibleHost, responsiblePort) -> do - resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) + resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of Left err -> do putStrLn $ "Error: " <> show err @@ -584,14 +580,7 @@ processIncomingPosts serv = forever $ do -- TODO: keep track of maximum retries _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) - Right _ -> do - -- TODO: stats - -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags - now <- getPOSIXTime - subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) - -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag - when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ - void $ clientSubscribeTo serv tag + Right yay -> putStrLn $ "Yay! " <> show yay -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID