diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index 376d675..2d195e3 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -46,8 +46,8 @@ category: Network
 extra-source-files: CHANGELOG.md

 common deps
-    build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
-    ghc-options: -Wall -Wpartial-fields -O2
+    build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
+    ghc-options: -Wall -Wpartial-fields
@@ -58,7 +58,7 @@ library
     exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap

     -- Modules included in this library but not exported.
-    other-modules: Hash2Pub.Utils, Hash2Pub.PostService.API
+    other-modules: Hash2Pub.Utils

     -- LANGUAGE extensions used by modules in this package.
     other-extensions: GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings
@@ -91,7 +91,7 @@ executable Hash2Pub
     -- Base language which the package is written in.
     default-language: Haskell2010

-    ghc-options: -threaded -rtsopts -with-rtsopts=-N
+    ghc-options: -threaded

 executable Experiment
     -- experiment runner
diff --git a/Readme.md b/Readme.md
index e3cff3d..3c7dbe5 100644
--- a/Readme.md
+++ b/Readme.md
@@ -1,7 +1,7 @@
 # Hash2Pub

 ***This is heavily WIP and does not provide any useful functionality yet***.
-I aim for always having the `mainline` branch in a state where it builds and tests pass.
+I aim for always having the master branch in a state where it builds and tests pass.

 A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
 It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
@@ -10,13 +10,8 @@ This is the practical implementation of the concept presented in the paper [Dece

 The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.

-For further questions and discussions, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
-
 ## Building

 The project and its development environment are built with [Nix](https://nixos.org/nix/).
-The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once.
-
-While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development.
-Be aware that these need to be built from source and can take a very long time to build.
+The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
diff --git a/app/Experiment.hs b/app/Experiment.hs
index f2fa586..deb4cae 100644
--- a/app/Experiment.hs
+++ b/app/Experiment.hs
@@ -3,49 +3,42 @@ module Main where

 import Control.Concurrent
-import Control.Monad (forM_)
-import qualified Data.Text.Lazy as Txt
-import qualified Data.Text.Lazy.IO as TxtI
-import qualified Network.HTTP.Client as HTTP
-import System.Environment (getArgs)
+import Control.Monad (forM_)
+import Control.Monad.IO.Class
+import Control.Monad.State.Class
+import Control.Monad.State.Strict (evalStateT)
+import qualified Network.HTTP.Client as HTTP
+import System.Random

-import Hash2Pub.PostService (Hashtag, clientPublishPost)
+import Hash2Pub.PostService (Hashtag, clientPublishPost)

--- configuration constants
-timelineFile = "../simulationData/inputs/generated/timeline_sample.csv"
+-- placeholder post data definition
+
+tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ]
+
+knownRelays :: [(String, Int)]
+knownRelays =
+    [ ("animalliberation.social", 3342)
+    , ("hostux.social", 3343)
+    , ("social.diskseven.com", 3344)
+    , ("social.imirhil.fr", 3345)
+    ]

 main :: IO ()
 main = do
-    -- read CLI parameters
-    speedupStr : _ <- getArgs
-    -- read and parse timeline schedule
-    -- relying on laziness of Haskell IO, hoping it does not introduce too strong delays
-    postEvents <- parseSchedule <$> TxtI.readFile timelineFile
-    -- actually schedule and send the post events
-    executeSchedule (read speedupStr) postEvents
-    pure ()
-
-
-
-parseSchedule :: Txt.Text
-    -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
-parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
-    where
-        parseEntry [delayT, contactT, tag] =
-            (read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
-        parseEntry entry = error $ "invalid schedule input format: " <> show entry
-
-executeSchedule :: Int -- ^ speedup factor
-    -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
-    -> IO ()
-executeSchedule speedup events = do
-    -- initialise HTTP manager
-    httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
-    forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
-        _ <- forkIO $
-            clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
-                >>= either putStrLn (const $ pure ())
-        -- while threadDelay gives only minimum delay guarantees, let's hope the
-        -- additional delays are negligible
-        -- otherwise: evaluate usage of https://hackage.haskell.org/package/schedule-0.3.0.0/docs/Data-Schedule.html
-        threadDelay $ delay `div` speedup
+    httpMan <- HTTP.newManager HTTP.defaultManagerSettings
+    -- initialise RNG
+    let initRGen = mkStdGen 12
+    -- cycle through tags and post to a random instance
+    evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen
+    -- wait for a specified time
+
+publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m ()
+publishPostRandom httpman tag = do
+    index <- state $ randomR (0, length knownRelays - 1)
+    let (pubHost, pubPort) = knownRelays !! index
+    _ <- liftIO . forkIO $ do
+        postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag)
+        either putStrLn (const $ pure ()) postResult
+    liftIO $ threadDelay 500
diff --git a/app/Main.hs b/app/Main.hs
index eac223d..3bdb4d4 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -45,36 +45,21 @@ main = do

 readConfig :: IO (FediChordConf, ServiceConf)
 readConfig = do
-    confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
-    -- allow starting the initial node without bootstrapping info to avoid
-    -- waiting for timeout
+    confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
     let
-        speedup = read speedupString
-        confBootstrapNodes' = case remainingArgs of
-            bootstrapHost : bootstrapPortString : _ ->
-                [(bootstrapHost, read bootstrapPortString)]
-            _ -> []
-        fConf = FediChordConf
-            { confDomain = confDomainString
-            , confIP = toHostAddress6 . read $ ipString
-            , confDhtPort = read portString
-            , confBootstrapNodes = confBootstrapNodes'
-            , confStabiliseInterval = 80 * 10^6
-            , confBootstrapSamplingInterval = 180 * 10^6 `div` speedup
-            , confMaxLookupCacheAge = 300 / fromIntegral speedup
-            , confJoinAttemptsInterval = 60 * 10^6 `div` speedup
-            , confMaxNodeCacheAge = 600 / fromIntegral speedup
-            , confResponsePurgeAge = 60 / fromIntegral speedup
-            , confRequestTimeout = 5 * 10^6 `div` speedup
-            , confRequestRetries = 3
-            }
-        sConf = ServiceConf
-            { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
-            , confServicePort = read servicePortString
-            , confServiceHost = confDomainString
-            , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
-            , confSpeedupFactor = speedup
-            , confStatsEvalDelay = 120 * 10^6 `div` speedup
-            }
+        fConf = FediChordConf {
+            confDomain = confDomainString
+          , confIP = toHostAddress6 . read $ ipString
+          , confDhtPort = read portString
+          , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
+          --, confStabiliseInterval = 60
+          , confBootstrapSamplingInterval = 180
+          , confMaxLookupCacheAge = 300
+          }
+        sConf = ServiceConf {
+            confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
+          , confServicePort = read servicePortString
+          , confServiceHost = confDomainString
+          }
     pure (fConf, sConf)
diff --git a/default.nix b/default.nix
index a3f7640..cea4aa3 100644
--- a/default.nix
+++ b/default.nix
@@ -1,18 +1,26 @@
 {
-  compiler ? "ghc884"
+  compiler ? "ghc865",
+  withHIE ? false
 }:

 let
+  # pin all-hies for getting the language server
+  all-hies = fetchTarball {
+    url = "https://github.com/infinisil/all-hies/tarball/b8fb659620b99b4a393922abaa03a1695e2ca64d";
+    sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
+  };
   pkgs = import (
     builtins.fetchGit {
       name = "nixpkgs-pinned";
      url = https://github.com/NixOS/nixpkgs/;
-      ref = "refs/heads/release-20.09";
-      rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
+      ref = "refs/heads/release-20.03";
+      rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa";
    }) {
      # Pass no config for purity
      config = {};
-      overlays = [];
+      overlays = [
+        (import all-hies {}).overlay
+      ];
    };
   hp = pkgs.haskell.packages."${compiler}";
   src = pkgs.nix-gitignore.gitignoreSource [] ./.;
@@ -30,6 +38,7 @@ in
       hlint
       stylish-haskell
       pkgs.python3Packages.asn1ate
-    ];
+    ]
+    ++ (if withHIE then [ hie ] else []);
   };
 }
diff --git a/shell.nix b/shell.nix
index 82fb296..dafd212 100644
--- a/shell.nix
+++ b/shell.nix
@@ -1 +1 @@
-(import ./default.nix {}).shell
+(import ./default.nix {withHIE = true;}).shell
diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index c86c0f1..fa5a54a 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -488,11 +488,10 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu
 requestJoin toJoinOn ownStateSTM = do
     ownState <- readTVarIO ownStateSTM
     prn <- readTVarIO $ parentRealNode ownState
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState)
-    let srcAddr = confIP nodeConf
+    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
     bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
         -- extract own state for getting request information
-        responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
+        responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
         (cacheInsertQ, joinedState) <- atomically $ do
             stateSnap <- readTVar ownStateSTM
             let
@@ -585,10 +584,10 @@ sendQueryIdMessages targetID ns lParam targets = do

     -- create connected sockets to all query targets and use them for request handling
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
-    let srcAddr = confIP nodeConf
+    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
+    -- ToDo: make attempts and timeout configurable
     queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
-        sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
+        sendRequestTo (lookupMessage targetID ns Nothing)
        )) targets
     -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
     -- ToDo: exception handling, maybe log them
@@ -636,9 +635,8 @@ requestStabilise :: LocalNodeState s -- ^ sending node
     -> RemoteNodeState -- ^ neighbour node to send to
     -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
 requestStabilise ns neighbour = do
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
-    let srcAddr = confIP nodeConf
-    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
+    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
+    responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid ->
         Request {
               requestID = rid
             , sender = toRemoteNodeState ns
@@ -675,14 +673,13 @@ requestLeave :: LocalNodeState s
     -> RemoteNodeState -- target node
     -> IO (Either String ()) -- error or success
 requestLeave ns doMigration target = do
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
-    let srcAddr = confIP nodeConf
-        leavePayload = LeaveRequestPayload {
+    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
+    let leavePayload = LeaveRequestPayload {
               leaveSuccessors = successors ns
             , leavePredecessors = predecessors ns
             , leaveDoMigration = doMigration
            }
-    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
+    responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid ->
         Request {
               requestID = rid
             , sender = toRemoteNodeState ns
@@ -704,11 +701,10 @@ requestPing :: LocalNodeState s -- ^ sending node
     -> RemoteNodeState -- ^ node to be PINGed
     -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
 requestPing ns target = do
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
-    let srcAddr = confIP nodeConf
+    srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
     responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
         (\sock -> do
-            resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
+            resp <- sendRequestTo (\rid ->
                 Request {
                       requestID = rid
                     , sender = toRemoteNodeState ns
@@ -744,14 +740,22 @@ requestPing ns target = do
        ) responses

--- | Generic function for sending a request over a connected socket and collecting the response.
--- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
-sendRequestTo :: Int -- ^ timeout in milliseconds
-    -> Int -- ^ number of retries
-    -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
+-- | 'sendRequestToWithParams' with default timeout and retries already specified.
+-- Generic function for sending a request over a connected socket and collecting the response.
+-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
+sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
     -> Socket -- ^ connected socket to use for sending
     -> IO (Set.Set FediChordMessage) -- ^ responses
-sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
+sendRequestTo = sendRequestToWithParams 5000 3
+
+-- | Generic function for sending a request over a connected socket and collecting the response.
+-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
+sendRequestToWithParams :: Int -- ^ timeout in milliseconds
+    -> Int -- ^ number of retries
+    -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
+    -> Socket -- ^ connected socket to use for sending
+    -> IO (Set.Set FediChordMessage) -- ^ responses
+sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
     -- give the message a random request ID
     randomID <- randomRIO (0, 2^32-1)
     let
@@ -760,7 +764,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
     -- create a queue for passing received response messages back, even after a timeout
     responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
     -- start sendAndAck with timeout
-    _ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
+    attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
     -- after timeout, check received responses, delete them from unacked message set/map and rerun sendAndAck with that if necessary.
     recvdParts <- atomically $ flushTBQueue responseQ
     pure $ Set.fromList recvdParts
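The point of this hunk is the wrapper pattern: old call sites keep the short `sendRequestTo` signature while the timeout (5000 ms) and retry count (3) are fixed by partial application of the parametrised variant. A hypothetical call site for illustration (not part of the patch; `sock`, `targetID` and `ns` stand in for the values used elsewhere in this module):

    -- sketch: same request once with the defaults, once with custom parameters
    responsesDefault <- sendRequestTo (lookupMessage targetID ns Nothing) sock
    responsesPatient <- sendRequestToWithParams 10000 5 (lookupMessage targetID ns Nothing) sock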
Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) @@ -311,13 +310,12 @@ joinOnNewEntriesThread nsSTM = loop where loop = do nsSnap <- readTVarIO nsSTM - (lookupResult, parentNode) <- atomically $ do + (lookupResult, cache) <- atomically $ do cache <- readTVar $ nodeCacheSTM nsSnap - parentNode <- readTVar $ parentRealNode nsSnap case queryLocalCache nsSnap cache 1 (getNid nsSnap) of -- empty cache, block until cache changes and then retry (FORWARD s) | Set.null s -> retry - result -> pure (result, parentNode) + result -> pure (result, cache) case lookupResult of -- already joined FOUND _ -> @@ -327,7 +325,8 @@ joinOnNewEntriesThread nsSTM = loop joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry - (const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop) + -- TODO: make delay configurable + (const $ threadDelay (30 * 10^6) >> loop) (const $ pure ()) joinResult @@ -342,15 +341,20 @@ nodeCacheWriter nsSTM = modifyTVar' (nodeCacheSTM ns) cacheModifier +-- TODO: make max entry age configurable +maxEntryAge :: POSIXTime +maxEntryAge = 600 + + -- | Periodically iterate through cache, clean up expired entries and verify unverified ones nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread nsSTM = forever $ do + putStrLn "cache verify run: begin" -- get cache - (ns, cache, maxEntryAge) <- atomically $ do + (ns, cache) <- atomically $ do ns <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM ns - maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns) - pure (ns, cache, maxEntryAge) + pure (ns, cache) -- iterate entries: -- for avoiding too many time syscalls, get current time before iterating. now <- getPOSIXTime @@ -397,7 +401,8 @@ nodeCacheVerifyThread nsSTM = forever $ do forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle ) - threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds + putStrLn "cache verify run: end" + threadDelay $ 10^6 * round maxEntryAge `div` 20 -- | Checks the invariant of at least @jEntries@ per cache slice. @@ -463,6 +468,7 @@ stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () stabiliseThread nsSTM = forever $ do oldNs <- readTVarIO nsSTM + putStrLn "stabilise run: begin" -- iterate through the same snapshot, collect potential new neighbours -- and nodes to be deleted, and modify these changes only at the end of @@ -541,8 +547,9 @@ stabiliseThread nsSTM = forever $ do ) newPredecessor - stabiliseDelay <- confStabiliseInterval . 
nodeConfig <$> readTVarIO (parentRealNode newNs) - threadDelay stabiliseDelay + putStrLn "stabilise run: end" + -- TODO: make delay configurable + threadDelay (60 * 10^6) where -- | send a stabilise request to the n-th neighbour -- (specified by the provided getter function) and on failure retry @@ -629,15 +636,19 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) POSIXTime +-- TODO: make purge age configurable +-- | periodically clean up old request parts +responsePurgeAge :: POSIXTime +responsePurgeAge = 60 -- seconds -requestMapPurge :: POSIXTime -> MVar RequestMap -> IO () -requestMapPurge purgeAge mapVar = forever $ do +requestMapPurge :: MVar RequestMap -> IO () +requestMapPurge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> - now - ts < purgeAge + now - ts < responsePurgeAge ) rMapState - threadDelay $ (fromEnum purgeAge * 2) `div` 10^6 + threadDelay $ round responsePurgeAge * 2 * 10^6 -- | Wait for messages, deserialise them, manage parts and acknowledgement status, @@ -652,13 +663,12 @@ fediMessageHandler sendQ recvQ nsSTM = do -- not change. -- Other functions are passed the nsSTM reference and thus can get the latest state. nsSnap <- readTVarIO nsSTM - nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap) -- handling multipart messages: -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. requestMap <- newMVar (Map.empty :: RequestMap) -- run receive loop and requestMapPurge concurrently, so that an exception makes -- both of them fail - concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do + concurrently_ (requestMapPurge requestMap) $ forever $ do -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg @@ -797,4 +807,4 @@ lookupCacheCleanup nodeSTM = do now - ts < confMaxLookupCacheAge (nodeConfig node) ) ) - threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6 + threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5) diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 4ce20a7..cbd3a58 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -411,22 +411,10 @@ data FediChordConf = FediChordConf -- ^ listening port for the FediChord DHT , confBootstrapNodes :: [(String, PortNumber)] -- ^ list of potential bootstrapping nodes - , confStabiliseInterval :: Int - -- ^ pause between stabilise runs, in milliseconds , confBootstrapSamplingInterval :: Int - -- ^ pause between sampling the own ID through bootstrap nodes, in milliseconds + -- ^ pause between sampling the own ID through bootstrap nodes, in seconds , confMaxLookupCacheAge :: POSIXTime - -- ^ maximum age of key lookup cache entries in seconds - , confJoinAttemptsInterval :: Int - -- ^ interval between join attempts on newly learned nodes, in milliseconds - , confMaxNodeCacheAge :: POSIXTime - -- ^ maximum age of entries in the node cache, in milliseconds - , confResponsePurgeAge :: POSIXTime - -- ^ maximum age of message parts in response part cache, in seconds - , confRequestTimeout :: Int - -- 
-    -- ^ how long to wait until response has arrived, in milliseconds
-  , confRequestRetries :: Int
-    -- ^ how often re-sending a timed-out request can be retried
+    -- ^ maximum age of lookup cache entries in seconds
   }
   deriving (Show, Eq)

@@ -457,12 +445,6 @@ data ServiceConf = ServiceConf
     -- ^ listening port for service
   , confServiceHost :: String
     -- ^ hostname of service
-  , confLogfilePath :: String
-    -- ^ where to store the (measurement) log file
-  , confStatsEvalDelay :: Int
-    -- ^ delay between statistic rate measurement samplings, in microseconds
-  , confSpeedupFactor :: Int
-    -- While the speedup factor needs to be already included in all
   }

 class DHT d where
diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs
index ffeef17..81cf552 100644
--- a/src/Hash2Pub/PostService.hs
+++ b/src/Hash2Pub/PostService.hs
@@ -12,41 +12,32 @@ import Control.Concurrent
 import Control.Concurrent.Async
 import Control.Concurrent.STM
 import Control.Exception (Exception (..), try)
-import Control.Monad (foldM, forM, forM_, forever, unless,
-    void, when)
+import Control.Monad (foldM, forM, forM_, forever, void,
+    when)
 import Control.Monad.IO.Class (liftIO)
 import Data.Bifunctor
 import qualified Data.ByteString.Lazy.UTF8 as BSUL
 import qualified Data.ByteString.UTF8 as BSU
-import qualified Data.DList as D
-import Data.Either (lefts, rights)
 import qualified Data.HashMap.Strict as HMap
 import qualified Data.HashSet as HSet
-import Data.Maybe (fromJust, isJust)
+import Data.Maybe (fromMaybe, isJust)
 import Data.String (fromString)
-import Data.Text.Lazy (Text)
 import qualified Data.Text.Lazy as Txt
-import qualified Data.Text.Lazy.IO as TxtI
 import Data.Text.Normalize (NormalizationMode (NFC), normalize)
 import Data.Time.Clock.POSIX
 import Data.Typeable (Typeable)
 import qualified Network.HTTP.Client as HTTP
 import qualified Network.HTTP.Types as HTTPT
-import System.IO
 import System.Random
 import Text.Read (readEither)

-import Formatting (fixed, format, int, (%))
 import qualified Network.Wai.Handler.Warp as Warp
 import Servant
 import Servant.Client

 import Hash2Pub.FediChordTypes
-import Hash2Pub.PostService.API
 import Hash2Pub.RingMap
-import Hash2Pub.Utils

-import Debug.Trace

 data PostService d = PostService
     { serviceConf :: ServiceConf
@@ -57,22 +48,19 @@ data PostService d = PostService
     -- ^ for each tag store the subscribers + their queue
     , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
     -- ^ tags subscribed by the own node have an assigned lease time
+    , ownPosts :: TVar (HSet.HashSet Txt.Text)
+    -- ^ just store the existence of posts for saving memory,
     , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
     -- ^ Queue for processing incoming posts of own instance asynchronously
     , postFetchQueue :: TQueue PostID
-    -- ^ queue of posts to be fetched
     , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
     , httpMan :: HTTP.Manager
-    , statsQueue :: TQueue StatsEvent
-    , loadStats :: TVar RelayStats
-    -- ^ current load stats, replaced periodically
-    , logFileHandle :: Handle
     }
     deriving (Typeable)

-type Hashtag = Text
-type PostID = Text
-type PostContent = Text
+type Hashtag = Txt.Text
+type PostID = Txt.Text
+type PostContent = Txt.Text

 -- | For each handled tag, store its subscribers and provide a
 -- broadcast 'TChan' for enqueuing posts
 type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
@@ -90,38 +78,26 @@ instance DHT d => Service PostService d where
         threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
         subscriberVar <- newTVarIO emptyRMap
         ownSubsVar <- newTVarIO HMap.empty
-        --ownPostVar <- newTVarIO HSet.empty
+        ownPostVar <- newTVarIO HSet.empty
         relayInQueue' <- newTQueueIO
         postFetchQueue' <- newTQueueIO
         migrationsInProgress' <- newTVarIO HMap.empty
         httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
-        statsQueue' <- newTQueueIO
-        loadStats' <- newTVarIO emptyStats
-        loggingFile <- openFile (confLogfilePath conf) WriteMode
-        hSetBuffering loggingFile LineBuffering
         let
-            thisService = PostService
-                { serviceConf = conf
+            thisService = PostService {
+                serviceConf = conf
                 , baseDHT = dht
                 , serviceThread = threadVar
                 , subscribers = subscriberVar
                 , ownSubscriptions = ownSubsVar
-                --, ownPosts = ownPostVar
+                , ownPosts = ownPostVar
                 , relayInQueue = relayInQueue'
                 , postFetchQueue = postFetchQueue'
                 , migrationsInProgress = migrationsInProgress'
                 , httpMan = httpMan'
-                , statsQueue = statsQueue'
-                , loadStats = loadStats'
-                , logFileHandle = loggingFile
-                }
+                }
             port' = fromIntegral (confServicePort conf)
             warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-        -- log a start message, this also truncates existing files
-        TxtI.hPutStrLn loggingFile $ Txt.unlines
-            [ "# Starting mock relay implementation"
-            , "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
-            ]
         -- Run 'concurrently_' from another thread to be able to return the
         -- 'PostService'.
         -- Terminating that parent thread will make all child threads terminate as well.
@@ -129,11 +105,7 @@
             concurrently_
                 -- web server
                 (Warp.runSettings warpSettings $ postServiceApplication thisService)
-                $ concurrently
-                    -- background processing workers
-                    (launchWorkerThreads thisService)
-                    -- statistics/ measurements
-                    (launchStatsThreads thisService)
+                (processIncomingPosts thisService)
         -- update thread ID after fork
         atomically $ writeTVar threadVar servThreadID
         pure thisService
@@ -159,13 +131,38 @@ postServiceApplication :: DHT d => PostService d -> Application
 postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv

+-- | needed for guiding type inference
+exposedPostServiceAPI :: Proxy PostServiceAPI
+exposedPostServiceAPI = Proxy
+

 -- ========= constants ===========

-placeholderPost :: Text
+placeholderPost :: Txt.Text
 placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB

 -- ========= HTTP API and handlers =============

+type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
+    -- delivery endpoint at responsible relay for delivering posts of $tag for distribution
+    :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
+    -- endpoint for delivering the subscriptions and outstanding queue
+    :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
+    -- fetch endpoint for posts, full post ID is http://$domain/post/$postid
+    :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
+    -- endpoint for fetching multiple posts at once
+    :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
+    -- delivery endpoint of newly published posts of the relay's instance
+    :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
+    -- delivery endpoint for posts of $tag at subscribing instance
+    :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
+    -- endpoint for subscribing the instance specified in
+    -- the Origin header to $hashtag.
+    -- Returns subscription lease time in seconds.
+    :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
+    -- endpoint for unsubscribing the instance specified in
+    -- the Origin header to $hashtag
+
+
 postServer :: DHT d => PostService d -> Server PostServiceAPI
 postServer service = relayInbox service
     :<|> subscriptionDelivery service
@@ -177,8 +174,7 @@ postServer service = relayInbox service
     :<|> tagUnsubscribe service

--- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying
-relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
+relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
 relayInbox serv tag posts = do
     let
         -- skip checking whether the post actually contains the tag, just drop full post
@@ -194,10 +190,8 @@ relayInbox serv tag posts = do
         -- if no one subscribed to the tag, nothing needs to be done
         (pure ())
         -- otherwise enqueue posts into broadcast queue of the tag
-        (\queue -> do
+        (\queue ->
             liftIO $ forM_ postIDs (atomically . writeTChan queue)
-            -- report the received post for statistic purposes
-            liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
        )
        broadcastChan
     pure NoContent
@@ -208,8 +202,7 @@ newtype UnhandledTagException = UnhandledTagException String

 instance Exception UnhandledTagException

--- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration
-subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
+subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
 subscriptionDelivery serv senderID subList = do
     let
         tagSubs = Txt.lines subList
@@ -243,7 +236,7 @@
         Right _ -> pure ""
    -- TODO: check and only accept tags in own (future?) responsibility
  where
-    processTag :: TVar RelayTags -> Text -> STM ()
+    processTag :: TVar RelayTags -> Txt.Text -> STM ()
     processTag subscriberSTM tagData = do
         let
             tag:subText:lease:posts:_ = Txt.splitOn "," tagData
@@ -254,47 +247,44 @@
             enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime

--- | endpoint for fetching a post by its ID
-postFetch :: PostService d -> Text -> Handler Text
-postFetch serv _ = do
-    -- decision: for saving memory do not store published posts, just
-    -- pretend there is a post for each requested ID
-    liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
-    pure placeholderPost
+postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
+postFetch serv postID = do
+    postSet <- liftIO . readTVarIO . ownPosts $ serv
+    if HSet.member postID postSet
+        -- decision: always return the same placeholder post
+        then pure placeholderPost
+        else throwError $ err404 { errBody = "No post found with this ID" }

--- | endpoint for fetching multiple posts of this instance by their IDs
-postMultiFetch :: PostService d -> Text -> Handler Text
+postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
 postMultiFetch serv postIDs = do
-    let
-        idList = Txt.lines postIDs
-        -- decision: for saving memory do not store published posts, just
-        -- pretend there is a post for each requested ID
-        response = foldl (\response' _ ->
-            placeholderPost <> "\n" <> response'
+    let idList = Txt.lines postIDs
+    postSet <- liftIO . readTVarIO . ownPosts $ serv
+    -- look up existence of all given post IDs, fail if even one is missing
+    foldM (\response postID ->
+        if HSet.member postID postSet
+            then pure $ placeholderPost <> "\n" <> response
+            else throwError $ err404 { errBody = "No post found with this ID" }
        ) "" idList
-    liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
-    pure response

--- | delivery endpoint: inbox for initially publishing a post at an instance
-postInbox :: PostService d -> Text -> Handler NoContent
+postInbox :: PostService d -> Txt.Text -> Handler NoContent
 postInbox serv post = do
     -- extract contained hashtags
     let
         containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
     -- generate post ID
     postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-    -- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID
+    -- add ID to own posts
+    liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
     -- enqueue a relay job for each tag
-    liftIO $ forM_ (containedTags :: [Text]) (\tag ->
+    liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
         atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
        )
     pure NoContent

--- | delivery endpoint: receive postIDs of a certain subscribed hashtag
-tagDelivery :: PostService d -> Text -> Text -> Handler Text
+tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
 tagDelivery serv hashtag posts = do
     let postIDs = Txt.lines posts
     subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
@@ -305,9 +295,7 @@
             pure ()
     pure $ "Received a postID for tag " <> hashtag

-
--- | receive subscription requests to a handled hashtag
-tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
+tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
 tagSubscribe serv hashtag origin = do
     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
     if not responsible
@@ -323,12 +311,10 @@
         let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
         -- setup subscription entry
         _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
-        --liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
         pure $ round leaseTime

-
--- | receive and handle unsubscription requests regarding a handled tag
-tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
+tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
 tagUnsubscribe serv hashtag origin = do
     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
     if not responsible
@@ -348,15 +334,8 @@
 clientAPI :: Proxy PostServiceAPI
 clientAPI = Proxy

-relayInboxClient
-    :<|> subscriptionDeliveryClient
-    :<|> postFetchClient
-    :<|> postMultiFetchClient
-    :<|> postInboxClient
-    :<|> tagDeliveryClient
-    :<|> tagSubscribeClient
-    :<|> tagUnsubscribeClient
-    = client clientAPI
+
+relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI


 -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
@@ -426,12 +405,10 @@ clientSubscribeTo serv tag = do
                 Left (FailureResponse _ fresp)
                     |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do
                         -- responsibility gone, force new lookup
                         newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
-                        --putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
                         doSubscribe newRes False
                 Left err -> pure . Left . show $ err
                 Right lease -> do
                     atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
-                    --putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
                     pure . Right $ lease
            ) lookupResponse

@@ -565,37 +542,15 @@ lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
 lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)

--- normalise the unicode representation of a string to NFC and convert to lower case
-normaliseTag :: Text -> Text
-normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict
+-- normalise the unicode representation of a string to NFC
+normaliseTag :: Txt.Text -> Txt.Text
+normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
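A quick illustration of the NFC step that stays (my example, not from the patch): NFC composes a decomposed base letter plus combining accent into the single precomposed code point, so byte-wise different but visually identical tag spellings map to the same DHT key — only the lower-casing is dropped by this change:

    -- sketch (GHCi): decomposed "e" + U+0301 equals precomposed "é" after normalisation
    >>> normaliseTag (Txt.pack "cafe\x0301") == normaliseTag (Txt.pack "caf\x00e9")
    True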

 -- | convert a hashtag to its representation on the DHT
 hashtagToId :: Hashtag -> NodeID
 hashtagToId = genKeyID . Txt.unpack

-
-readUpToTChan :: Int -> TChan a -> STM [a]
-readUpToTChan 0 _ = pure []
-readUpToTChan n chan = do
-    readFromChan <- tryReadTChan chan
-    case readFromChan of
-        Nothing -> pure []
-        Just val -> do
-            moreReads <- readUpToTChan (pred n) chan
-            pure (val:moreReads)
-
-
-readUpToTQueue :: Int -> TQueue a -> STM [a]
-readUpToTQueue 0 _ = pure []
-readUpToTQueue n q = do
-    readFromQueue <- tryReadTQueue q
-    case readFromQueue of
-        Nothing -> pure []
-        Just val -> do
-            moreReads <- readUpToTQueue (pred n) q
-            pure (val:moreReads)
-
 -- | define how to convert all showable types to PlainText
 -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
 -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
@@ -607,78 +562,36 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where

 -- ====== worker threads ======

--- TODO: make configurable
-numParallelDeliveries = 10
-
-launchWorkerThreads :: DHT d => PostService d -> IO ()
-launchWorkerThreads serv = concurrently_
-    (processIncomingPosts serv)
-    $ concurrently_
-        (purgeSubscriptionsThread serv)
-        $ concurrently_
-            (fetchTagPosts serv)
-            (relayWorker serv)
-
-
--- | periodically remove expired subscription entries from relay subscribers
-purgeSubscriptionsThread :: PostService d -> IO ()
-purgeSubscriptionsThread serv = forever $ do
-    -- read config
-    now <- getPOSIXTime
-    let
-        purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-    -- no need to atomically lock this, as newly incoming subscriptions do not
-    -- need to be purged
-    tagMap <- readTVarIO $ subscribers serv
-    forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-        -- but each subscriberMap needs to be modified atomically
-        atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
-    threadDelay $ fromEnum purgeInterval `div` 10^6
-

 -- | process the pending relay inbox of incoming posts from the internal queue:
 -- Look up responsible relay node for given hashtag and forward post to it
 processIncomingPosts :: DHT d => PostService d -> IO ()
 processIncomingPosts serv = forever $ do
     -- blocks until available
-    deliveriesToProcess <- atomically $ do
-        readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
-        if null readResult
-            then retry
-            else pure readResult
-    runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
-        let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
-        lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
-        case lookupRes of
-            -- no vserver active => wait and retry
-            Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
-            Just (responsibleHost, responsiblePort) -> do
-                resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
-                case resp of
-                    Left err -> do
-                        -- 410 error indicates outdated responsibility mapping
-                        -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-                        -- TODO: keep track of maximum retries
-                        _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
-                        atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
-                        pure . Left $ "Error: " <> show err
-                    Right _ -> do
-                        -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
-                        now <- getPOSIXTime
-                        subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-                        -- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
-                        when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
-                            void $ clientSubscribeTo serv tag
-
-                        -- for evaluation, return the tag of the successfully forwarded post
-                        pure $ Right tag
-
-    -- collect async results
-    results <- mapM waitCatch runningJobs
-    -- report the count of published posts for statistics
-    atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
-    pure ()
-
+    -- TODO: process multiple in parallel
+    (tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
+    let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
+    lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
+    case lookupRes of
+        -- no vserver active => wait and retry
+        Nothing -> threadDelay $ 10 * 10^6
+        Just (responsibleHost, responsiblePort) -> do
+            resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
+            case resp of
+                Left err -> do
+                    putStrLn $ "Error: " <> show err
+                    -- 410 error indicates outdated responsibility mapping
+                    -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
+                    -- TODO: keep track of maximum retries
+                    _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
+                    atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
+                Right _ -> do
+                    -- TODO: stats
+                    -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
+                    now <- getPOSIXTime
+                    subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
+                    -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
+                    when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
+                        void $ clientSubscribeTo serv tag


 -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
@@ -688,197 +601,18 @@
 fetchTagPosts serv = forever $ do
     -- TODO: batching, retry
     -- TODO: process multiple in parallel
     pIdUri <- atomically . readTQueue $ postFetchQueue serv
-    fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri
+    fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
     resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
     case resp of
         Right response ->
-            -- TODO error handling, retry
-            --if HTTPT.statusCode (HTTP.responseStatus response) == 200
-            --    then
-            --        -- success, TODO: statistics
-            --    else
-            pure ()
+            if HTTPT.statusCode (HTTP.responseStatus response) == 200
+                then
+                    -- success, TODO: statistics
+                    putStrLn "post fetch success"
+                else
+                    -- TODO error handling, retry
+                    pure ()
         Left _ ->
             -- TODO error handling, retry
             pure ()
-
-relayWorker :: PostService d -> IO ()
-relayWorker serv = forever $ do
-    -- atomically (to be able to retry) fold a list of due delivery actions
-    jobsToProcess <- atomically $ do
-        subscriptionMap <- readTVar $ subscribers serv
-        jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
-            subscriberMap <- readTVar subscriberMapSTM
-            foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
-                postsToDeliver <- readUpToTChan 500 postChan
-                let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-                -- append relay push job to job list
-                pure $ if not (null postsToDeliver)
-                    then jobAcc' `D.snoc` (do
-                        deliveryResult <- postDeliveryAction
-                        either
-                            (const $ pure ())
-                            -- on successful push, record that event for statistics
-                            (const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
-                            deliveryResult
-                        pure deliveryResult
-                        )
-                    else jobAcc'
-                ) jobAcc $ HMap.toList subscriberMap
-            ) D.empty subscriptionMap
-        -- if no relay jobs, then retry
-        if null jobList
-            then retry
-            else pure jobList
-
-    -- when processing the list, send several deliveries in parallel
-    forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
-        runningJobs <- mapM async jobset
-        -- so far just dropping failed attempts, TODO: retry mechanism
-        results <- mapM waitCatch runningJobs
-        let
-            successfulResults = rights results
-            unsuccessfulResults = lefts results
-        unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
-        putStrLn $ "successfully relayed " <> show (length successfulResults)
-        pure ()
-
-
--- ======= statistics/measurement and logging =======
-
-data StatsEventType = PostPublishEvent
-    | RelayReceiveEvent
-    | RelayDeliveryEvent
-    | IncomingPostFetchEvent
-    deriving (Enum, Show, Eq)
-
--- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
-data StatsEvent = StatsEvent StatsEventType Int NodeID
-    deriving (Show, Eq)
-
-
--- | measured rates of relay performance
--- TODO: maybe include other metrics in here as well, like number of subscribers?
-data RelayStats = RelayStats
-    { relayReceiveRates :: RingMap NodeID Double
-    -- ^ rate of incoming posts in the responsibility of this relay
-    , relayDeliveryRates :: RingMap NodeID Double
-    -- ^ rate of relayed outgoing posts
-    , postFetchRate :: Double -- no need to differentiate between tags
-    -- ^ number of post-fetches delivered
-    , postPublishRate :: Double
-    -- ^ rate of initially publishing posts through this instance
-    }
-    deriving (Show, Eq)
-
-
-
-launchStatsThreads :: PostService d -> IO ()
-launchStatsThreads serv = do
-    -- create shared accumulator
-    sharedAccum <- newTVarIO emptyStats
-    concurrently_
-        (accumulateStatsThread sharedAccum $ statsQueue serv)
-        (evaluateStatsThread serv sharedAccum)
-
-
--- | Read stats events from queue and add them to a shared accumulator.
--- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result.
-accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
-accumulateStatsThread statsAccumulator statsQ = forever $ do
-    -- blocks until stats event arrives
-    event <- atomically $ readTQueue statsQ
-    -- add the event number to current accumulator
-    atomically $ modifyTVar' statsAccumulator $ statsAdder event
-
-
--- | add incoming stats events to accumulator value
-statsAdder :: StatsEvent -> RelayStats -> RelayStats
-statsAdder event stats = case event of
-    StatsEvent PostPublishEvent num _ ->
-        stats {postPublishRate = fromIntegral num + postPublishRate stats}
-    StatsEvent RelayReceiveEvent num key ->
-        stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
-    StatsEvent RelayDeliveryEvent num key ->
-        stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
-    StatsEvent IncomingPostFetchEvent num _ ->
-        stats {postFetchRate = fromIntegral num + postFetchRate stats}
-    where
-        sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
-            let toInsert = fromJust $ extractRingEntry newVal
-            in
-            case oldVal of
-                KeyEntry n -> KeyEntry (n + toInsert)
-                ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
-                ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
-                _ -> error "RingMap nested too deeply"
-            )
-
-
--- Periodically exchange the accumulated statistics with empty ones, evaluate them
--- and make them the current statistics of the service.
-evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
-evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
-  where
-    loop previousTs = do
-        threadDelay $ confStatsEvalDelay (serviceConf serv)
-        -- get and reset the stats accumulator
-        summedStats <- atomically $ do
-            stats <- readTVar statsAcc
-            writeTVar statsAcc emptyStats
-            pure stats
-        -- as the transaction might retry several times, current time needs to
-        -- be read afterwards
-        now <- getPOSIXTime
-        -- evaluate stats rate and replace server stats
-        -- persistently store in a TVar so it can be retrieved later by the DHT
-        let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
-            rateStats = evaluateStats timePassed summedStats
-        atomically $ writeTVar (loadStats serv) rateStats
-        -- and now what? write a log to file
-        -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-        -- later: current (reported) load, target load
-        subscriberSum <- sumSubscribers
-        TxtI.hPutStrLn (logFileHandle serv) $
-            format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
-            (realToFrac now :: Double)
-            (sum . relayReceiveRates $ rateStats)
-            (sum . relayDeliveryRates $ rateStats)
-            (postPublishRate rateStats)
-            (postFetchRate rateStats)
-            subscriberSum
-        loop now
-
-    sumSubscribers = do
-        tagMap <- readTVarIO $ subscribers serv
-        foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
-            subscriberMap <- readTVarIO subscriberMapSTM
-            pure $ subscriberSum + HMap.size subscriberMap
-            )
-            0 tagMap
-
-
--- | Evaluate the accumulated statistic events: Currently mostly calculates the event
--- rates by dividing by the collection time frame
-evaluateStats :: POSIXTime -> RelayStats -> RelayStats
-evaluateStats timeInterval summedStats =
-    -- first sum all event numbers, then divide by the number of seconds passed to
-    -- get rate per second
-    RelayStats
-        { relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
-        , relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
-        , postPublishRate = postPublishRate summedStats / intervalSeconds
-        , postFetchRate = postFetchRate summedStats / intervalSeconds
-        }
-  where
-    intervalSeconds = realToFrac timeInterval
-
-
-emptyStats :: RelayStats
-emptyStats = RelayStats
-    { relayReceiveRates = emptyRMap
-    , relayDeliveryRates = emptyRMap
-    , postFetchRate = 0
-    , postPublishRate = 0
-    }
diff --git a/src/Hash2Pub/PostService/API.hs b/src/Hash2Pub/PostService/API.hs
deleted file mode 100644
index 1484631..0000000
--- a/src/Hash2Pub/PostService/API.hs
+++ /dev/null
@@ -1,37 +0,0 @@
-{-# LANGUAGE DataKinds #-}
-{-# LANGUAGE FlexibleInstances #-}
-{-# LANGUAGE InstanceSigs #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE TypeOperators #-}
-module Hash2Pub.PostService.API where
-
-import Data.Text.Lazy (Text)
-
-import Servant
-
-type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-    -- delivery endpoint at responsible relay for delivering posts of $tag for distribution
-    :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text
-    -- endpoint for delivering the subscriptions and outstanding queue
-    :<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text
-    -- fetch endpoint for posts, full post ID is http://$domain/post/$postid
-    :<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text
-    -- endpoint for fetching multiple posts at once
-    :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-    -- delivery endpoint of newly published posts of the relay's instance
-    :<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text
-    -- delivery endpoint for posts of $tag at subscribing instance
-    :<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer
-    -- endpoint for subscribing the instance specified in
-    -- the Origin header to $hashtag.
-    -- Returns subscription lease time in seconds.
- :<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text - -- endpoint for unsubscribing the instance specified in - -- the Origin header to $hashtag - --- | needed for guiding type inference -exposedPostServiceAPI :: Proxy PostServiceAPI -exposedPostServiceAPI = Proxy - diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index a5af10c..86825a7 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,5 +1,7 @@ module Hash2Pub.ProtocolTypes where +import qualified Data.Map as Map +import Data.Maybe (mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX (POSIXTime) diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index a2fe3ae..e99f8b2 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -23,30 +23,7 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where a == b = getRingMap a == getRingMap b instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where - show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) - - -instance (Bounded k, Ord k) => Functor (RingMap k) where - -- | map a function over all payload values of a 'RingMap' - fmap f = RingMap . Map.map traversingF . getRingMap - where - traversingF (KeyEntry a) = KeyEntry (f a) - traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) - traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing - - -instance (Bounded k, Ord k) => Foldable (RingMap k) where - foldr f initVal = Map.foldr traversingFR initVal . getRingMap - where - traversingFR (KeyEntry a) acc = f a acc - traversingFR (ProxyEntry _ Nothing) acc = acc - traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc - foldl f initVal = Map.foldl traversingFL initVal . getRingMap - where - traversingFL acc (KeyEntry a) = f acc a - traversingFL acc (ProxyEntry _ Nothing) = acc - traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry - + show rmap = shows "RingMap " (show $ getRingMap rmap) -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. @@ -156,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k) rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards addRMapEntryWith :: (Bounded k, Ord k) - => (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value + => (RingEntry k a -> RingEntry k a -> RingEntry k a) -> k -- ^ key -> a -- ^ value -> RingMap k a @@ -230,7 +207,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k) -> Maybe i -- possible number limit -> [a] -> [a] -takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc +takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc -- length limit reached | remaining <= 0 = takeAcc takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =