diff --git a/app/Main.hs b/app/Main.hs index 4810fb0..c08cd3c 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -45,27 +45,33 @@ main = do readConfig :: IO (FediChordConf, ServiceConf) readConfig = do - confDomainString : ipString : portString : servicePortString : speedup : remainingArgs <- getArgs + confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs -- allow starting the initial node without bootstrapping info to avoid -- waiting for timeout let + speedup = read speedupString confBootstrapNodes' = case remainingArgs of bootstrapHost : bootstrapPortString : _ -> [(bootstrapHost, read bootstrapPortString)] _ -> [] fConf = FediChordConf { - confDomain = confDomainString - , confIP = toHostAddress6 . read $ ipString - , confDhtPort = read portString - , confBootstrapNodes = confBootstrapNodes' - --, confStabiliseInterval = 60 - , confBootstrapSamplingInterval = 180 - , confMaxLookupCacheAge = 300 + confDomain = confDomainString + , confIP = toHostAddress6 . 
read $ ipString + , confDhtPort = read portString + , confBootstrapNodes = confBootstrapNodes' + , confStabiliseInterval = 60 * 10^6 + , confBootstrapSamplingInterval = 180 * 10^6 `div` speedup + , confMaxLookupCacheAge = 300 / fromIntegral speedup + , confJoinAttemptsInterval = 60 * 10^6 `div` speedup + , confMaxNodeCacheAge = 600 / fromIntegral speedup + , confResponsePurgeAge = 60 / fromIntegral speedup + , confRequestTimeout = 5 * 10^3 `div` speedup -- milliseconds (5 s unscaled); sendRequestTo multiplies by 1000 before calling 'timeout' + , confRequestRetries = 3 } sConf = ServiceConf { - confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer) - , confServicePort = read servicePortString - , confServiceHost = confDomainString + confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup + , confServicePort = read servicePortString + , confServiceHost = confDomainString } pure (fConf, sConf) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fa5a54a..8258ca3 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -488,10 +488,11 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu requestJoin toJoinOn ownStateSTM = do ownState <- readTVarIO ownStateSTM prn <- readTVarIO $ parentRealNode ownState - srcAddr <- confIP . 
nodeConfig <$> readTVarIO (parentRealNode ownState) + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState) + let srcAddr = confIP nodeConf bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do -- extract own state for getting request information - responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let @@ -584,10 +585,11 @@ sendQueryIdMessages targetID ns lParam targets = do -- create connected sockets to all query targets and use them for request handling - srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + let srcAddr = confIP nodeConf -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo (lookupMessage targetID ns Nothing) + sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -635,8 +637,9 @@ requestStabilise :: LocalNodeState s -- ^ sending node -> RemoteNodeState -- ^ neighbour node to send to -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do - srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) - responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . 
sendRequestTo (\rid -> + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + let srcAddr = confIP nodeConf + responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -673,13 +676,14 @@ requestLeave :: LocalNodeState s -> RemoteNodeState -- target node -> IO (Either String ()) -- error or success requestLeave ns doMigration target = do - srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) - let leavePayload = LeaveRequestPayload { + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + let srcAddr = confIP nodeConf + leavePayload = LeaveRequestPayload { leaveSuccessors = successors ns , leavePredecessors = predecessors ns , leaveDoMigration = doMigration } - responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -701,10 +705,11 @@ requestPing :: LocalNodeState s -- ^ sending node -> RemoteNodeState -- ^ node to be PINGed -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node requestPing ns target = do - srcAddr <- confIP . 
nodeConfig <$> readTVarIO (parentRealNode ns) + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + let srcAddr = confIP nodeConf responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (\sock -> do - resp <- sendRequestTo (\rid -> + resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -740,22 +745,14 @@ requestPing ns target = do ) responses --- | 'sendRequestToWithParams' with default timeout and retries already specified. --- Generic function for sending a request over a connected socket and collecting the response. --- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout. -sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo = sendRequestToWithParams 5000 3 - -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. 
-sendRequestToWithParams :: Int -- ^ timeout in milliseconds - -> Int -- ^ number of retries - -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do +sendRequestTo :: Int -- ^ timeout in milliseconds + -> Int -- ^ number of retries + -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) let @@ -764,7 +761,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout - attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests + _ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 45d0bf9..33044fa 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -199,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do -- unjoined node: try joining through all bootstrapping nodes else tryBootstrapJoining nsSTM >> pure () let delaySecs = confBootstrapSamplingInterval . 
nodeConfig $ parentNode - threadDelay $ delaySecs * 10^6 + threadDelay delaySecs -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. @@ -223,10 +223,11 @@ tryBootstrapJoining nsSTM = do bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do ns <- readTVarIO nsSTM - srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + let srcAddr = confIP nodeConf bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo (lookupMessage targetID ns Nothing) + fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) ) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) @@ -310,12 +311,13 @@ joinOnNewEntriesThread nsSTM = loop where loop = do nsSnap <- readTVarIO nsSTM - (lookupResult, cache) <- atomically $ do + (lookupResult, parentNode) <- atomically $ do cache <- readTVar $ nodeCacheSTM nsSnap + parentNode <- readTVar $ parentRealNode nsSnap case queryLocalCache nsSnap cache 1 (getNid nsSnap) of -- empty cache, block until cache changes and then retry (FORWARD s) | Set.null s -> retry - result -> pure (result, cache) + result -> pure (result, parentNode) case lookupResult of -- already joined FOUND _ -> @@ -325,8 +327,7 @@ joinOnNewEntriesThread nsSTM = loop joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry - -- TODO: make delay configurable - (const $ threadDelay (30 * 10^6) >> loop) + (const $ threadDelay (confJoinAttemptsInterval . 
nodeConfig $ parentNode) >> loop) (const $ pure ()) joinResult @@ -341,20 +342,16 @@ nodeCacheWriter nsSTM = modifyTVar' (nodeCacheSTM ns) cacheModifier --- TODO: make max entry age configurable -maxEntryAge :: POSIXTime -maxEntryAge = 600 - - -- | Periodically iterate through cache, clean up expired entries and verify unverified ones nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread nsSTM = forever $ do putStrLn "cache verify run: begin" -- get cache - (ns, cache) <- atomically $ do + (ns, cache, maxEntryAge) <- atomically $ do ns <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM ns - pure (ns, cache) + maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns) + pure (ns, cache, maxEntryAge) -- iterate entries: -- for avoiding too many time syscalls, get current time before iterating. now <- getPOSIXTime @@ -402,7 +399,7 @@ nodeCacheVerifyThread nsSTM = forever $ do ) putStrLn "cache verify run: end" - threadDelay $ 10^6 * round maxEntryAge `div` 20 + threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to microseconds -- | Checks the invariant of at least @jEntries@ per cache slice. @@ -548,8 +545,8 @@ stabiliseThread nsSTM = forever $ do newPredecessor putStrLn "stabilise run: end" - -- TODO: make delay configurable - threadDelay (60 * 10^6) + stabiliseDelay <- confStabiliseInterval . 
nodeConfig <$> readTVarIO (parentRealNode newNs) + threadDelay stabiliseDelay where -- | send a stabilise request to the n-th neighbour -- (specified by the provided getter function) and on failure retry @@ -636,19 +633,15 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) POSIXTime --- TODO: make purge age configurable --- | periodically clean up old request parts -responsePurgeAge :: POSIXTime -responsePurgeAge = 60 -- seconds -requestMapPurge :: MVar RequestMap -> IO () -requestMapPurge mapVar = forever $ do +requestMapPurge :: POSIXTime -> MVar RequestMap -> IO () +requestMapPurge purgeAge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> - now - ts < responsePurgeAge + now - ts < purgeAge ) rMapState - threadDelay $ round responsePurgeAge * 2 * 10^6 + threadDelay $ (fromEnum purgeAge * 2) `div` 10^6 -- | Wait for messages, deserialise them, manage parts and acknowledgement status, @@ -663,12 +656,13 @@ fediMessageHandler sendQ recvQ nsSTM = do -- not change. -- Other functions are passed the nsSTM reference and thus can get the latest state. nsSnap <- readTVarIO nsSTM + nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap) -- handling multipart messages: -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. 
requestMap <- newMVar (Map.empty :: RequestMap) -- run receive loop and requestMapPurge concurrently, so that an exception makes -- both of them fail - concurrently_ (requestMapPurge requestMap) $ forever $ do + concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg @@ -807,4 +801,4 @@ lookupCacheCleanup nodeSTM = do now - ts < confMaxLookupCacheAge (nodeConfig node) ) ) - threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5) + threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6 diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index cbd3a58..3b563e6 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -411,10 +411,22 @@ data FediChordConf = FediChordConf -- ^ listening port for the FediChord DHT , confBootstrapNodes :: [(String, PortNumber)] -- ^ list of potential bootstrapping nodes + , confStabiliseInterval :: Int + -- ^ pause between stabilise runs, in microseconds , confBootstrapSamplingInterval :: Int - -- ^ pause between sampling the own ID through bootstrap nodes, in seconds + -- ^ pause between sampling the own ID through bootstrap nodes, in microseconds , confMaxLookupCacheAge :: POSIXTime - -- ^ maximum age of lookup cache entries in seconds + -- ^ maximum age of key lookup cache entries in seconds + , confJoinAttemptsInterval :: Int + -- ^ interval between join attempts on newly learned nodes, in microseconds + , confMaxNodeCacheAge :: POSIXTime + -- ^ maximum age of entries in the node cache, in seconds + , confResponsePurgeAge :: POSIXTime + -- ^ maximum age of message parts in response part cache, in seconds + , confRequestTimeout :: Int + -- ^ how long to wait until response has arrived, in milliseconds + , confRequestRetries :: Int + -- ^ how often re-sending a timed-out request can be 
retried } deriving (Show, Eq)