diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 5ffff0d..2d195e3 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,8 +46,8 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types - ghc-options: -Wall + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays + ghc-options: -Wall -Wpartial-fields @@ -93,6 +93,20 @@ executable Hash2Pub ghc-options: -threaded +executable Experiment + -- experiment runner + import: deps + + build-depends: Hash2Pub + + main-is: Experiment.hs + + hs-source-dirs: app + + default-language: Haskell2010 + + ghc-options: -threaded + test-suite Hash2Pub-test -- Test dependencies. diff --git a/app/Experiment.hs b/app/Experiment.hs new file mode 100644 index 0000000..deb4cae --- /dev/null +++ b/app/Experiment.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Main where + +import Control.Concurrent +import Control.Monad (forM_) +import Control.Monad.IO.Class +import Control.Monad.State.Class +import Control.Monad.State.Strict (evalStateT) +import qualified Network.HTTP.Client as HTTP +import System.Random + +import Hash2Pub.PostService (Hashtag, clientPublishPost) + +-- placeholder post data definition + +tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ] + +knownRelays :: [(String, Int)] +knownRelays = + [ ("animalliberation.social", 3342) + , ("hostux.social", 3343) + , ("social.diskseven.com", 3344) + , ("social.imirhil.fr", 3345) + ] + +main :: IO () +main = do + -- initialise HTTP manager + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + -- initialise RNG + let initRGen = mkStdGen 12 + -- cycle through tags and post to a random instance + evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen + -- wait for a specified time + +publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m () +publishPostRandom httpman tag = do + index <- state $ randomR (0, length knownRelays - 1) + let (pubHost, pubPort) = knownRelays !! index + _ <- liftIO . forkIO $ do + postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag) + either putStrLn (const $ pure ()) postResult + liftIO $ threadDelay 500 diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bd7953f..fa5a54a 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -49,9 +49,11 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) +import Control.Monad.Except (MonadError (..), runExceptT) +import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString as BS import Data.Either (rights) -import Data.Foldable (foldl', foldr') +import Data.Foldable (foldl', foldr', foldrM) import Data.Functor.Identity import Data.IP (IPv6, fromHostAddress6, toHostAddress6) @@ -106,9 +108,6 @@ queryLocalCache ownState nCache lBestNodes targetID -- the closest succeeding node (like with the p initiated parallel queries | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache where - ownID = getNid ownState - preds = predecessors ownState - closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache @@ -259,7 +258,6 @@ handleIncomingRequest :: Service s (RealNodeSTM s) -> SockAddr -- ^ source address of the request -> IO () handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do - putStrLn $ "handling incoming request: " <> show msgSet ns <- readTVarIO nsSTM -- add nodestate to cache now <- getPOSIXTime @@ -314,7 +312,6 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- | execute a key ID lookup on local cache and respond with the result respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID nsSTM msgSet = do - putStrLn "responding to a QueryID request" -- this message cannot be split reasonably, so just -- consider the first payload let @@ -435,7 +432,9 @@ respondJoin nsSTM msgSet = do let aRequestPart = Set.elemAt 0 msgSet senderNS = sender aRequestPart - responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) + -- if not joined yet, attract responsibility for + -- all keys to make bootstrapping possible + responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) thisNodeResponsible (FOUND _) = True thisNodeResponsible (FORWARD _) = False -- check whether the joining node falls into our responsibility @@ -492,7 +491,7 @@ requestJoin toJoinOn ownStateSTM = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState) bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do -- extract own state for getting request information - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let @@ -517,28 +516,28 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap + -- the contacted node itself is a successor as well and, with few + -- nodes, can be a predecessor as well + newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else if null (predecessors joinedState) && null (successors joinedState) - then pure $ Left "join error: no predecessors or successors" - -- successful join - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid ownState) - pure $ Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid toJoinOn) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node +requestQueryID :: (MonadIO m, MonadError String m) + => LocalNodeState s -- ^ NodeState of the querying node -> NodeID -- ^ target key ID to look up - -> IO RemoteNodeState -- ^ the node responsible for handling that key + -> m RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes @@ -546,23 +545,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- TODO: deal with lookup failures requestQueryID ns targetID = do - firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns + firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns -- TODO: make maxAttempts configurable queryIdLookupLoop firstCacheSnapshot ns 50 targetID -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState +queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState -- return node itself as default fallback value against infinite recursion. -- TODO: consider using an Either instead of a default value -queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns +queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID -- FOUND can only be returned if targetID is owned by local node case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do - responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) - now <- getPOSIXTime + responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) + now <- liftIO getPOSIXTime -- check for a FOUND and return it case responseEntries of FOUND foundNode -> pure foundNode @@ -588,7 +587,7 @@ sendQueryIdMessages targetID ns lParam targets = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + sendRequestTo (lookupMessage targetID ns Nothing) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -596,8 +595,10 @@ sendQueryIdMessages targetID ns lParam targets = do -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing now <- getPOSIXTime -- collect cache entries from all responses - foldM (\acc resp -> do - let entrySet = case queryResult <$> payload resp of + foldrM (\resp acc -> do + let + responseResult = queryResult <$> payload resp + entrySet = case responseResult of Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) Just (FORWARD resultset) -> resultset _ -> Set.empty @@ -607,10 +608,15 @@ sendQueryIdMessages targetID ns lParam targets = do -- return accumulated QueryResult pure $ case acc of -- once a FOUND as been encountered, return this as a result - isFound@FOUND{} -> isFound - FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet + FOUND{} -> acc + FORWARD accSet + | maybe False isFound responseResult -> fromJust responseResult + | otherwise -> FORWARD $ entrySet `Set.union` accSet ) (FORWARD Set.empty) responses + where + isFound FOUND{} = True + isFound _ = False -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i @@ -630,7 +636,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) - responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -652,8 +658,7 @@ requestStabilise ns neighbour = do ) ([],[]) respSet -- update successfully responded neighbour in cache - now <- getPOSIXTime - maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) + maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) @@ -674,7 +679,7 @@ requestLeave ns doMigration target = do , leavePredecessors = predecessors ns , leaveDoMigration = doMigration } - responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -699,7 +704,7 @@ requestPing ns target = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (\sock -> do - resp <- sendRequestTo 5000 3 (\rid -> + resp <- sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -735,25 +740,31 @@ requestPing ns target = do ) responses +-- | 'sendRequestToWithParams' with default timeout and retries already specified. +-- Generic function for sending a request over a connected socket and collecting the response. +-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout. +sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestTo = sendRequestToWithParams 5000 3 -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -sendRequestTo :: Int -- ^ timeout in seconds - -> Int -- ^ number of retries - -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do +sendRequestToWithParams :: Int -- ^ timeout in milliseconds + -> Int -- ^ number of retries + -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) let msgComplete = msgIncomplete randomID requests = serialiseMessage sendMessageSize msgComplete - putStrLn $ "sending request message " <> show msgComplete -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout - attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests + attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts @@ -762,19 +773,20 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> IO () - sendAndAck responseQueue sock remainingSends = do - sendMany sock $ Map.elems remainingSends + sendAndAck responseQueue sock' remainingSends = do + sendMany sock' $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty Nothing - recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses + recvLoop sock' responseQueue remainingSends Set.empty Nothing + recvLoop :: Socket + -> TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers - -> Maybe Integer -- ^ total number of response parts if already known + -> Maybe Integer -- ^ total number of response parts if already known -> IO () - recvLoop responseQueue remainingSends' receivedPartNums totalParts = do + recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as -- no IPv6 jumbograms are used - response <- deserialiseMessage <$> recv sock 65535 + response <- deserialiseMessage <$> recv sock' 65535 case response of Right msg@Response{} -> do atomically $ writeTBQueue responseQueue msg @@ -782,11 +794,12 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newRemaining = Map.delete (part msg) remainingSends' newReceivedParts = Set.insert (part msg) receivedPartNums - if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts + if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts then pure () - else recvLoop responseQueue newRemaining receivedPartNums newTotalParts + else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts -- drop errors and invalid messages - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts + Right Request{} -> pure () -- expecting a response, not a request + Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache @@ -812,6 +825,18 @@ queueDeleteEntry :: NodeID -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete + +-- | enqueue the timestamp update and verification marking of an entry in the +-- global 'NodeCache'. +queueUpdateVerifieds :: Foldable c + => c NodeID + -> LocalNodeState s + -> IO () +queueUpdateVerifieds nIds ns = do + now <- getPOSIXTime + forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ + markCacheEntryAsVerified (Just now) nid' + -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 15563de..45d0bf9 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do currentlyResponsible <- liftEither lookupResp liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node + liftIO $ putStrLn "send a bootstrap Join" joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult @@ -225,7 +226,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + fmap Right . sendRequestTo (lookupMessage targetID ns Nothing) ) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) @@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache resp - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure $ Right currentlyResponsible + currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + pure currentlyResponsible -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: Service s (RealNodeSTM s) +fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a + -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do - ns <- readTVarIO nsSTM + ns <- liftIO $ readTVarIO nsSTM -- 1. get routed to the currently responsible node currentlyResponsible <- requestQueryID ns $ getNid ns - putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node - joinResult <- requestJoin currentlyResponsible nsSTM - case joinResult of - Left err -> pure . Left $ "Error joining on " <> err - Right joinedNS -> pure . Right $ joinedNS + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do @@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordVserverJoin nsSTM + joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do -- try looking up additional neighbours if list too short forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [entry] latestNs + ) + nextEntry ) forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [entry] latestNs + ) + nextEntry ) newNs <- readTVarIO nsSTM @@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO () requestMapPurge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime - putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> now - ts < responsePurgeAge ) rMapState threadDelay $ round responsePurgeAge * 2 * 10^6 @@ -716,7 +723,7 @@ fediMessageHandler sendQ recvQ nsSTM = do instance DHT (RealNodeSTM s) where lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring - forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring + forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring) -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState isResponsibleFor nodeSTM key = do node <- readTVarIO nodeSTM @@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do -- new entry. -- If no vserver is active in the DHT, 'Nothing' is returned. updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) -updateLookupCache nodeSTM lookupKey = do +updateLookupCache nodeSTM keyToLookup = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM let firstVs = headMay (vservers node) @@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do pure (node, lookupSource) maybe (do -- if no local node available, delete cache entry and return Nothing - atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey + atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup pure Nothing ) (\n -> do -- start a lookup from the node, update the cache with the lookup result and return it - newResponsible <- requestQueryID n lookupKey - let newEntry = (getDomain newResponsible, getServicePort newResponsible) - now <- getPOSIXTime - -- atomic update against lost updates - atomically $ modifyTVar' (lookupCacheSTM node) $ - Map.insert lookupKey (CacheEntry False newEntry now) - pure $ Just newEntry + -- TODO: better retry management, because having no vserver joined yet should + -- be treated differently than other reasons for not getting a result. + newResponsible <- runExceptT $ requestQueryID n keyToLookup + either + (const $ pure Nothing) + (\result -> do + let newEntry = (getDomain result, getServicePort result) + now <- getPOSIXTime + -- atomic update against lost updates + atomically $ modifyTVar' (lookupCacheSTM node) $ + Map.insert keyToLookup (CacheEntry False newEntry now) + pure $ Just newEntry + ) + newResponsible ) lookupSource diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index a871343..81cf552 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,37 +10,30 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async -import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar -import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever) -import Control.Monad.IO.Class (liftIO) -import Control.Monad.STM +import Control.Exception (Exception (..), try) +import Control.Monad (foldM, forM, forM_, forever, void, + when) +import Control.Monad.IO.Class (liftIO) import Data.Bifunctor -import qualified Data.ByteString.Lazy.UTF8 as BSUL -import qualified Data.ByteString.UTF8 as BSU -import qualified Data.HashMap.Strict as HMap -import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe, isJust) -import Data.String (fromString) -import qualified Data.Text.Lazy as Txt -import Data.Text.Normalize (NormalizationMode (NFC), - normalize) +import qualified Data.ByteString.Lazy.UTF8 as BSUL +import qualified Data.ByteString.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet +import Data.Maybe (fromMaybe, isJust) +import Data.String (fromString) +import qualified Data.Text.Lazy as Txt +import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX -import Data.Typeable (Typeable) -import qualified Network.HTTP.Client as HTTP -import qualified Network.HTTP.Types as HTTPT +import Data.Typeable (Typeable) +import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Types as HTTPT import System.Random -import Text.Read (readEither) +import Text.Read (readEither) -import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client -import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -150,7 +143,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance + -- delivery endpoint at responsible relay for delivering posts of $tag for distribution :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text @@ -187,11 +180,11 @@ relayInbox serv tag posts = do -- skip checking whether the post actually contains the tag, just drop full post postIDs = head . Txt.splitOn "," <$> Txt.lines posts -- if tag is not in own responsibility, return a 410 Gone - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag) if responsible then pure () else - (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) + throwError $ err410 { errBody = "Relay is not responsible for this tag"} broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag maybe -- if noone subscribed to the tag, nothing needs to be done @@ -221,7 +214,7 @@ subscriptionDelivery serv senderID subList = do -- not-handled tag occurs, this results in a single large transaction. -- Hopefully the performance isn't too bad. res <- liftIO . atomically $ (foldM (\_ tag' -> do - responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag') + responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag') if responsible then processTag (subscribers serv) tag' else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") @@ -295,7 +288,7 @@ tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = do let postIDs = Txt.lines posts subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv - if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) + if isJust (HMap.lookup (hashtagToId hashtag) subscriptions) then -- TODO: increase a counter/ statistics for received posts of this tag liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) else -- silently drop posts from unsubscribed tags @@ -304,7 +297,7 @@ tagDelivery serv hashtag posts = do tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -323,7 +316,7 @@ tagSubscribe serv hashtag origin = do tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag intearval + -- collect tag interval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts @@ -385,7 +378,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do Right _ -> do atomically $ modifyTVar' (subscribers serv) $ \tagMap -> - foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags) pure . Right $ () where channelGetAll :: TChan a -> STM [a] @@ -396,7 +389,8 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead --- | Subscribe the client to the given hashtag. On success it returns the given lease time. +-- | Subscribe the client to the given hashtag. On success it returns the given lease time, +-- but also records the subscription in its own data structure. clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo serv tag = do lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) @@ -413,7 +407,9 @@ clientSubscribeTo serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doSubscribe newRes False Left err -> pure . Left . show $ err - Right lease -> pure . Right $ lease + Right lease -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) + pure . Right $ lease ) lookupResponse @@ -435,7 +431,9 @@ clientUnsubscribeFrom serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doUnsubscribe newRes False Left err -> pure . Left . show $ err - Right _ -> pure . Right $ () + Right _ -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag) + pure . Right $ () ) lookupResponse @@ -445,11 +443,11 @@ clientUnsubscribeFrom serv tag = do -- the post to the responsible relays. -- As the initial publishing isn't done by a specific relay (but *to* a specific relay -- instead), the function does *not* take a PostService as argument. -clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided - -> String -- hostname - -> Int -- port - -> PostContent -- post content - -> IO (Either String ()) -- error or success +clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided + -> String -- ^ hostname + -> Int -- ^ port + -> PostContent -- ^ post content + -> IO (Either String ()) -- ^ error or success clientPublishPost httpman hostname port postC = do resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port "")) pure . bimap show (const ()) $ resp @@ -492,7 +490,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do broadcastChan <- newBroadcastTChan tagOutChan <- dupTChan broadcastChan newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) - writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap + writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap pure tagOutChan Just (foundSubMapSTM, broadcastChan, _) -> do -- otherwise use the existing subscriber map @@ -520,7 +518,7 @@ deleteSubscription tagMapSTM tag subscriber = do -- if there are no subscriptions for the tag anymore, remove its -- data sttructure altogether if HMap.null newSubMap - then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap + then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap -- otherwise just remove the subscription of that node else writeTVar foundSubMapSTM newSubMap @@ -541,13 +539,18 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a -lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) +lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) -- normalise the unicode representation of a string to NFC normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict + +-- | convert a hashtag to its representation on the DHT +hashtagToId :: Hashtag -> NodeID +hashtagToId = genKeyID . Txt.unpack + -- | define how to convert all showable types to PlainText -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap @@ -566,12 +569,13 @@ processIncomingPosts serv = forever $ do -- blocks until available -- TODO: process multiple in parallel (tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv + let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) case lookupRes of -- no vserver active => wait and retry Nothing -> threadDelay $ 10 * 10^6 Just (responsibleHost, responsiblePort) -> do - resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) + resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of Left err -> do putStrLn $ "Error: " <> show err @@ -580,7 +584,14 @@ processIncomingPosts serv = forever $ do -- TODO: keep track of maximum retries _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) - Right yay -> putStrLn $ "Yay! " <> show yay + Right _ -> do + -- TODO: stats + -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags + now <- getPOSIXTime + subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) + -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag + when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ + void $ clientSubscribeTo serv tag -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID