Compare commits

..

No commits in common. "mainline" and "data_migration" have entirely different histories.

13 changed files with 296 additions and 750 deletions

View file

@ -46,8 +46,8 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types
ghc-options: -Wall -Wpartial-fields -O2 ghc-options: -Wall
@ -58,7 +58,7 @@ library
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported. -- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils, Hash2Pub.PostService.API other-modules: Hash2Pub.Utils
-- LANGUAGE extensions used by modules in this package. -- LANGUAGE extensions used by modules in this package.
other-extensions: GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings other-extensions: GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings
@ -91,21 +91,7 @@ executable Hash2Pub
-- Base language which the package is written in. -- Base language which the package is written in.
default-language: Haskell2010 default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N ghc-options: -threaded
executable Experiment
-- experiment runner
import: deps
build-depends: Hash2Pub
main-is: Experiment.hs
hs-source-dirs: app
default-language: Haskell2010
ghc-options: -threaded
test-suite Hash2Pub-test test-suite Hash2Pub-test

View file

@ -1,7 +1,7 @@
# Hash2Pub # Hash2Pub
***This is heavily WIP and does not provide any useful functionality yet***. ***This is heavily WIP and does not provide any useful functionality yet***.
I aim for always having the `mainline` branch in a state where it builds and tests pass. I aim for always having the master branch at a state where it builds and tests pass.
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table. A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell. It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
@ -10,13 +10,8 @@ This is the practical implementation of the concept presented in the paper [Dece
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
For further questions and discussins, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
## Building ## Building
The project and its developent environment are built with [Nix](https://nixos.org/nix/). The project and its developent environment are built with [Nix](https://nixos.org/nix/).
The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once. The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development.
Be aware that these need to be build from source and can take a very long time to build.

View file

@ -1,51 +0,0 @@
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent
import Control.Monad (forM_)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import qualified Network.HTTP.Client as HTTP
import System.Environment (getArgs)
import Hash2Pub.PostService (Hashtag, clientPublishPost)
-- configuration constants
timelineFile = "../simulationData/inputs/generated/timeline_sample.csv"
main :: IO ()
main = do
-- read CLI parameters
speedupStr : _ <- getArgs
-- read and parse timeline schedule
-- relying on lazyness of HaskellIO, hoping it does not introduce too strong delays
postEvents <- parseSchedule <$> TxtI.readFile timelineFile
-- actually schedule and send the post events
executeSchedule (read speedupStr) postEvents
pure ()
parseSchedule :: Txt.Text
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
where
parseEntry [delayT, contactT, tag] =
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
parseEntry entry = error $ "invalid schedule input format: " <> show entry
executeSchedule :: Int -- ^ speedup factor
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
-> IO ()
executeSchedule speedup events = do
-- initialise HTTP manager
httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
_ <- forkIO $
clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
>>= either putStrLn (const $ pure ())
-- while threadDelay gives only minimum delay guarantees, let's hope the
-- additional delays are negligible
-- otherwise: evaluate usage of https://hackage.haskell.org/package/schedule-0.3.0.0/docs/Data-Schedule.html
threadDelay $ delay `div` speedup

View file

@ -45,36 +45,21 @@ main = do
readConfig :: IO (FediChordConf, ServiceConf) readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do readConfig = do
confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
-- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout
let let
speedup = read speedupString fConf = FediChordConf {
confBootstrapNodes' = case remainingArgs of confDomain = confDomainString
bootstrapHost : bootstrapPortString : _ -> , confIP = toHostAddress6 . read $ ipString
[(bootstrapHost, read bootstrapPortString)] , confDhtPort = read portString
_ -> [] , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
fConf = FediChordConf --, confStabiliseInterval = 60
{ confDomain = confDomainString , confBootstrapSamplingInterval = 180
, confIP = toHostAddress6 . read $ ipString , confMaxLookupCacheAge = 300
, confDhtPort = read portString }
, confBootstrapNodes = confBootstrapNodes' sConf = ServiceConf {
, confStabiliseInterval = 80 * 10^6 confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
, confBootstrapSamplingInterval = 180 * 10^6 `div` speedup , confServicePort = read servicePortString
, confMaxLookupCacheAge = 300 / fromIntegral speedup , confServiceHost = confDomainString
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup }
, confMaxNodeCacheAge = 600 / fromIntegral speedup
, confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
}
sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
, confServicePort = read servicePortString
, confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup
, confStatsEvalDelay = 120 * 10^6 `div` speedup
}
pure (fConf, sConf) pure (fConf, sConf)

View file

@ -1,18 +1,26 @@
{ {
compiler ? "ghc884" compiler ? "ghc865",
withHIE ? false
}: }:
let let
# pin all-hies for getting the language server
all-hies = fetchTarball {
url = "https://github.com/infinisil/all-hies/tarball/b8fb659620b99b4a393922abaa03a1695e2ca64d";
sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
};
pkgs = import ( pkgs = import (
builtins.fetchGit { builtins.fetchGit {
name = "nixpkgs-pinned"; name = "nixpkgs-pinned";
url = https://github.com/NixOS/nixpkgs/; url = https://github.com/NixOS/nixpkgs/;
ref = "refs/heads/release-20.09"; ref = "refs/heads/release-20.03";
rev = "e065200fc90175a8f6e50e76ef10a48786126e1c"; rev = "076c67fdea6d0529a568c7d0e0a72e6bc161ecf5";
}) { }) {
# Pass no config for purity # Pass no config for purity
config = {}; config = {};
overlays = []; overlays = [
(import all-hies {}).overlay
];
}; };
hp = pkgs.haskell.packages."${compiler}"; hp = pkgs.haskell.packages."${compiler}";
src = pkgs.nix-gitignore.gitignoreSource [] ./.; src = pkgs.nix-gitignore.gitignoreSource [] ./.;
@ -30,6 +38,7 @@ in
hlint hlint
stylish-haskell stylish-haskell
pkgs.python3Packages.asn1ate pkgs.python3Packages.asn1ate
]; ]
++ (if withHIE then [ hie ] else []);
}; };
} }

View file

@ -1 +1 @@
(import ./default.nix {}).shell (import ./default.nix {withHIE = true;}).shell

View file

@ -49,11 +49,9 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when) import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
import Data.Foldable (foldl', foldr', foldrM) import Data.Foldable (foldl', foldr')
import Data.Functor.Identity import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6, import Data.IP (IPv6, fromHostAddress6,
toHostAddress6) toHostAddress6)
@ -108,6 +106,9 @@ queryLocalCache ownState nCache lBestNodes targetID
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
where where
ownID = getNid ownState
preds = predecessors ownState
closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
@ -258,6 +259,7 @@ handleIncomingRequest :: Service s (RealNodeSTM s)
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
-> IO () -> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
putStrLn $ "handling incoming request: " <> show msgSet
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
-- add nodestate to cache -- add nodestate to cache
now <- getPOSIXTime now <- getPOSIXTime
@ -312,6 +314,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | execute a key ID lookup on local cache and respond with the result -- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do respondQueryID nsSTM msgSet = do
putStrLn "responding to a QueryID request"
-- this message cannot be split reasonably, so just -- this message cannot be split reasonably, so just
-- consider the first payload -- consider the first payload
let let
@ -432,9 +435,7 @@ respondJoin nsSTM msgSet = do
let let
aRequestPart = Set.elemAt 0 msgSet aRequestPart = Set.elemAt 0 msgSet
senderNS = sender aRequestPart senderNS = sender aRequestPart
-- if not joined yet, attract responsibility for responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS)
-- all keys to make bootstrapping possible
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility -- check whether the joining node falls into our responsibility
@ -488,11 +489,10 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu
requestJoin toJoinOn ownStateSTM = do requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState prn <- readTVarIO $ parentRealNode ownState
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information -- extract own state for getting request information
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
@ -517,28 +517,28 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty) ([], Set.empty, Set.empty)
responses responses
-- sort, slice and set the accumulated successors and predecessors -- sort, slice and set the accumulated successors and predecessors
-- the contacted node itself is a successor as well and, with few newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState writeTVar ownStateSTM newState
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do else if null (predecessors joinedState) && null (successors joinedState)
-- wait for migration data to be completely received then pure $ Left "join error: no predecessors or successors"
waitForMigrationFrom (nodeService prn) (getNid toJoinOn) -- successful join
pure $ Right ownStateSTM else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
) )
`catch` (\e -> pure . Left $ displayException (e :: IOException)) `catch` (\e -> pure . Left $ displayException (e :: IOException))
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: (MonadIO m, MonadError String m) requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
=> LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up -> NodeID -- ^ target key ID to look up
-> m RemoteNodeState -- ^ the node responsible for handling that key -> IO RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes -- 1. do a local lookup for the l closest nodes
-- 2. create l sockets -- 2. create l sockets
-- 3. send a message async concurrently to all l nodes -- 3. send a message async concurrently to all l nodes
@ -546,23 +546,23 @@ requestQueryID :: (MonadIO m, MonadError String m)
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures -- TODO: deal with lookup failures
requestQueryID ns targetID = do requestQueryID ns targetID = do
firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
-- TODO: make maxAttempts configurable -- TODO: make maxAttempts configurable
queryIdLookupLoop firstCacheSnapshot ns 50 targetID queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
-- return node itself as default fallback value against infinite recursion. -- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value -- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node -- FOUND can only be returned if targetID is owned by local node
case localResult of case localResult of
FOUND thisNode -> pure thisNode FOUND thisNode -> pure thisNode
FORWARD nodeSet -> do FORWARD nodeSet -> do
responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- liftIO getPOSIXTime now <- getPOSIXTime
-- check for a FOUND and return it -- check for a FOUND and return it
case responseEntries of case responseEntries of
FOUND foundNode -> pure foundNode FOUND foundNode -> pure foundNode
@ -585,10 +585,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling -- create connected sockets to all query targets and use them for request handling
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf -- ToDo: make attempts and timeout configurable
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
)) targets )) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them -- ToDo: exception handling, maybe log them
@ -596,10 +596,8 @@ sendQueryIdMessages targetID ns lParam targets = do
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
now <- getPOSIXTime now <- getPOSIXTime
-- collect cache entries from all responses -- collect cache entries from all responses
foldrM (\resp acc -> do foldM (\acc resp -> do
let let entrySet = case queryResult <$> payload resp of
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
Just (FORWARD resultset) -> resultset Just (FORWARD resultset) -> resultset
_ -> Set.empty _ -> Set.empty
@ -609,15 +607,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- return accumulated QueryResult -- return accumulated QueryResult
pure $ case acc of pure $ case acc of
-- once a FOUND as been encountered, return this as a result -- once a FOUND as been encountered, return this as a result
FOUND{} -> acc isFound@FOUND{} -> isFound
FORWARD accSet FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses ) (FORWARD Set.empty) responses
where
isFound FOUND{} = True
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo' -- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i lookupMessage :: Integral i
@ -636,9 +629,8 @@ requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to -> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -660,7 +652,8 @@ requestStabilise ns neighbour = do
) )
([],[]) respSet ([],[]) respSet
-- update successfully responded neighbour in cache -- update successfully responded neighbour in cache
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) now <- getPOSIXTime
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned" then Left "no neighbours returned"
else Right (responsePreds, responseSuccs) else Right (responsePreds, responseSuccs)
@ -675,14 +668,13 @@ requestLeave :: LocalNodeState s
-> RemoteNodeState -- target node -> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success -> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do requestLeave ns doMigration target = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf let leavePayload = LeaveRequestPayload {
leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns leaveSuccessors = successors ns
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -704,11 +696,10 @@ requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed -> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do requestPing ns target = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do (\sock -> do
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> resp <- sendRequestTo 5000 3 (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -744,9 +735,10 @@ requestPing ns target = do
) responses ) responses
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in milliseconds sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
@ -757,10 +749,11 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
let let
msgComplete = msgIncomplete randomID msgComplete = msgIncomplete randomID
requests = serialiseMessage sendMessageSize msgComplete requests = serialiseMessage sendMessageSize msgComplete
putStrLn $ "sending request message " <> show msgComplete
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout
_ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts pure $ Set.fromList recvdParts
@ -769,20 +762,19 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO () -> IO ()
sendAndAck responseQueue sock' remainingSends = do sendAndAck responseQueue sock remainingSends = do
sendMany sock' $ Map.elems remainingSends sendMany sock $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely -- if all requests have been acked/ responded to, return prematurely
recvLoop sock' responseQueue remainingSends Set.empty Nothing recvLoop responseQueue remainingSends Set.empty Nothing
recvLoop :: Socket recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers -> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known -> Maybe Integer -- ^ total number of response parts if already known
-> IO () -> IO ()
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as -- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used -- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock' 65535 response <- deserialiseMessage <$> recv sock 65535
case response of case response of
Right msg@Response{} -> do Right msg@Response{} -> do
atomically $ writeTBQueue responseQueue msg atomically $ writeTBQueue responseQueue msg
@ -790,12 +782,11 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends' newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
then pure () then pure ()
else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
-- drop errors and invalid messages -- drop errors and invalid messages
Right Request{} -> pure () -- expecting a response, not a request Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
@ -821,18 +812,6 @@ queueDeleteEntry :: NodeID
-> IO () -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry
@ -865,7 +844,7 @@ mkServerSocket ip port = do
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
sock <- socket AF_INET6 Datagram defaultProtocol sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1 setSocketOption sock IPv6Only 1
bind sock sockAddr `catch` (\e -> putStrLn $ "Caught exception while bind " <> show sock <> " " <> show sockAddr <> ": " <> show (e :: SomeException)) bind sock sockAddr
pure sock pure sock
-- | create a UDP datagram socket, connected to a destination. -- | create a UDP datagram socket, connected to a destination.
@ -881,6 +860,6 @@ mkSendSocket srcIp dest destPort = do
setSocketOption sendSock IPv6Only 1 setSocketOption sendSock IPv6Only 1
-- bind to the configured local IP to make sure that outgoing packets are sent from -- bind to the configured local IP to make sure that outgoing packets are sent from
-- this source address -- this source address
bind sendSock srcAddr `catch` (\e -> putStrLn $ "Caught exception while mkSendSocket bind " <> show sendSock <> " " <> show srcAddr <> ": " <> show (e :: SomeException)) bind sendSock srcAddr
connect sendSock destAddr connect sendSock destAddr
pure sendSock pure sendSock

View file

@ -166,7 +166,6 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
currentlyResponsible <- liftEither lookupResp currentlyResponsible <- liftEither lookupResp
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
liftIO $ putStrLn "send a bootstrap Join"
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult liftEither joinResult
@ -199,7 +198,7 @@ convergenceSampleThread nsSTM = forever $ do
-- unjoined node: try joining through all bootstrapping nodes -- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure () else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
threadDelay delaySecs threadDelay $ delaySecs * 10^6
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
@ -223,11 +222,10 @@ tryBootstrapJoining nsSTM = do
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node -- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
) )
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
@ -246,24 +244,26 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
) )
initCache resp initCache resp
currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure currentlyResponsible pure $ Right currentlyResponsible
-- | join a node to the DHT using the global node cache -- | join a node to the DHT using the global node cache
-- node's position. -- node's position.
fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) fediChordVserverJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState' => LocalNodeStateSTM s -- ^ the local 'NodeState'
-> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do fediChordVserverJoin nsSTM = do
ns <- liftIO $ readTVarIO nsSTM ns <- readTVarIO nsSTM
-- 1. get routed to the currently responsible node -- 1. get routed to the currently responsible node
currentlyResponsible <- requestQueryID ns $ getNid ns currentlyResponsible <- requestQueryID ns $ getNid ns
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM joinResult <- requestJoin currentlyResponsible nsSTM
liftEither joinResult case joinResult of
Left err -> pure . Left $ "Error joining on " <> err
Right joinedNS -> pure . Right $ joinedNS
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do fediChordVserverLeave ns = do
@ -311,23 +311,23 @@ joinOnNewEntriesThread nsSTM = loop
where where
loop = do loop = do
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
(lookupResult, parentNode) <- atomically $ do (lookupResult, cache) <- atomically $ do
cache <- readTVar $ nodeCacheSTM nsSnap cache <- readTVar $ nodeCacheSTM nsSnap
parentNode <- readTVar $ parentRealNode nsSnap
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
-- empty cache, block until cache changes and then retry -- empty cache, block until cache changes and then retry
(FORWARD s) | Set.null s -> retry (FORWARD s) | Set.null s -> retry
result -> pure (result, parentNode) result -> pure (result, cache)
case lookupResult of case lookupResult of
-- already joined -- already joined
FOUND _ -> FOUND _ ->
pure () pure ()
-- otherwise try joining -- otherwise try joining
FORWARD _ -> do FORWARD _ -> do
joinResult <- runExceptT $ fediChordVserverJoin nsSTM joinResult <- fediChordVserverJoin nsSTM
either either
-- on join failure, sleep and retry -- on join failure, sleep and retry
(const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop) -- TODO: make delay configurable
(const $ threadDelay (30 * 10^6) >> loop)
(const $ pure ()) (const $ pure ())
joinResult joinResult
@ -342,15 +342,20 @@ nodeCacheWriter nsSTM =
modifyTVar' (nodeCacheSTM ns) cacheModifier modifyTVar' (nodeCacheSTM ns) cacheModifier
-- TODO: make max entry age configurable
maxEntryAge :: POSIXTime
maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin"
-- get cache -- get cache
(ns, cache, maxEntryAge) <- atomically $ do (ns, cache) <- atomically $ do
ns <- readTVar nsSTM ns <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM ns cache <- readTVar $ nodeCacheSTM ns
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns) pure (ns, cache)
pure (ns, cache, maxEntryAge)
-- iterate entries: -- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating. -- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime now <- getPOSIXTime
@ -397,7 +402,8 @@ nodeCacheVerifyThread nsSTM = forever $ do
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
) )
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds putStrLn "cache verify run: end"
threadDelay $ 10^6 * round maxEntryAge `div` 20
-- | Checks the invariant of at least @jEntries@ per cache slice. -- | Checks the invariant of at least @jEntries@ per cache slice.
@ -463,6 +469,7 @@ stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do stabiliseThread nsSTM = forever $ do
oldNs <- readTVarIO nsSTM oldNs <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
-- iterate through the same snapshot, collect potential new neighbours -- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of -- and nodes to be deleted, and modify these changes only at the end of
@ -497,26 +504,18 @@ stabiliseThread nsSTM = forever $ do
-- try looking up additional neighbours if list too short -- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
either atomically $ do
(const $ pure ()) latestNs <- readTVar nsSTM
(\entry -> atomically $ do writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [entry] latestNs
)
nextEntry
) )
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
either atomically $ do
(const $ pure ()) latestNs <- readTVar nsSTM
(\entry -> atomically $ do writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [entry] latestNs
)
nextEntry
) )
newNs <- readTVarIO nsSTM newNs <- readTVarIO nsSTM
@ -541,8 +540,9 @@ stabiliseThread nsSTM = forever $ do
) )
newPredecessor newPredecessor
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs) putStrLn "stabilise run: end"
threadDelay stabiliseDelay -- TODO: make delay configurable
threadDelay (60 * 10^6)
where where
-- | send a stabilise request to the n-th neighbour -- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry -- (specified by the provided getter function) and on failure retry
@ -629,15 +629,19 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
POSIXTime POSIXTime
-- TODO: make purge age configurable
-- | periodically clean up old request parts
responsePurgeAge :: POSIXTime
responsePurgeAge = 60 -- seconds
requestMapPurge :: POSIXTime -> MVar RequestMap -> IO () requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge purgeAge mapVar = forever $ do requestMapPurge mapVar = forever $ do
rMapState <- takeMVar mapVar rMapState <- takeMVar mapVar
now <- getPOSIXTime now <- getPOSIXTime
putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
now - ts < purgeAge now - ts < responsePurgeAge
) rMapState ) rMapState
threadDelay $ (fromEnum purgeAge * 2) `div` 10^6 threadDelay $ round responsePurgeAge * 2 * 10^6
-- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- | Wait for messages, deserialise them, manage parts and acknowledgement status,
@ -652,13 +656,12 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- not change. -- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state. -- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
-- handling multipart messages: -- handling multipart messages:
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap) requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes -- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail -- both of them fail
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do concurrently_ (requestMapPurge requestMap) $ forever $ do
-- wait for incoming messages -- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg let aMsg = deserialiseMessage rawMsg
@ -713,7 +716,7 @@ fediMessageHandler sendQ recvQ nsSTM = do
instance DHT (RealNodeSTM s) where instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring) forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring
-- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState
isResponsibleFor nodeSTM key = do isResponsibleFor nodeSTM key = do
node <- readTVarIO nodeSTM node <- readTVarIO nodeSTM
@ -754,7 +757,7 @@ getKeyResponsibility nodeSTM lookupKey = do
-- new entry. -- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned. -- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM keyToLookup = do updateLookupCache nodeSTM lookupKey = do
(node, lookupSource) <- atomically $ do (node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM node <- readTVar nodeSTM
let firstVs = headMay (vservers node) let firstVs = headMay (vservers node)
@ -764,25 +767,18 @@ updateLookupCache nodeSTM keyToLookup = do
pure (node, lookupSource) pure (node, lookupSource)
maybe (do maybe (do
-- if no local node available, delete cache entry and return Nothing -- if no local node available, delete cache entry and return Nothing
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
pure Nothing pure Nothing
) )
(\n -> do (\n -> do
-- start a lookup from the node, update the cache with the lookup result and return it -- start a lookup from the node, update the cache with the lookup result and return it
-- TODO: better retry management, because having no vserver joined yet should newResponsible <- requestQueryID n lookupKey
-- be treated differently than other reasons for not getting a result. let newEntry = (getDomain newResponsible, getServicePort newResponsible)
newResponsible <- runExceptT $ requestQueryID n keyToLookup now <- getPOSIXTime
either -- atomic update against lost updates
(const $ pure Nothing) atomically $ modifyTVar' (lookupCacheSTM node) $
(\result -> do Map.insert lookupKey (CacheEntry False newEntry now)
let newEntry = (getDomain result, getServicePort result) pure $ Just newEntry
now <- getPOSIXTime
-- atomic update against lost updates
atomically $ modifyTVar' (lookupCacheSTM node) $
Map.insert keyToLookup (CacheEntry False newEntry now)
pure $ Just newEntry
)
newResponsible
) lookupSource ) lookupSource
@ -797,4 +793,4 @@ lookupCacheCleanup nodeSTM = do
now - ts < confMaxLookupCacheAge (nodeConfig node) now - ts < confMaxLookupCacheAge (nodeConfig node)
) )
) )
threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6 threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5)

View file

@ -411,22 +411,10 @@ data FediChordConf = FediChordConf
-- ^ listening port for the FediChord DHT -- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)] , confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes -- ^ list of potential bootstrapping nodes
, confStabiliseInterval :: Int
-- ^ pause between stabilise runs, in milliseconds
, confBootstrapSamplingInterval :: Int , confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in milliseconds -- ^ pause between sampling the own ID through bootstrap nodes, in seconds
, confMaxLookupCacheAge :: POSIXTime , confMaxLookupCacheAge :: POSIXTime
-- ^ maximum age of key lookup cache entries in seconds -- ^ maximum age of lookup cache entries in seconds
, confJoinAttemptsInterval :: Int
-- ^ interval between join attempts on newly learned nodes, in milliseconds
, confMaxNodeCacheAge :: POSIXTime
-- ^ maximum age of entries in the node cache, in milliseconds
, confResponsePurgeAge :: POSIXTime
-- ^ maximum age of message parts in response part cache, in seconds
, confRequestTimeout :: Int
-- ^ how long to wait until response has arrived, in milliseconds
, confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried
} }
deriving (Show, Eq) deriving (Show, Eq)
@ -457,12 +445,6 @@ data ServiceConf = ServiceConf
-- ^ listening port for service -- ^ listening port for service
, confServiceHost :: String , confServiceHost :: String
-- ^ hostname of service -- ^ hostname of service
, confLogfilePath :: String
-- ^ where to store the (measurement) log file
, confStatsEvalDelay :: Int
-- ^ delay between statistic rate measurement samplings, in microseconds
, confSpeedupFactor :: Int
-- While the speedup factor needs to be already included in all
} }
class DHT d where class DHT d where

View file

@ -10,43 +10,41 @@ module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (Exception (..), try) import Control.Concurrent.STM.TChan
import Control.Monad (foldM, forM, forM_, forever, unless, import Control.Concurrent.STM.TChan
void, when) import Control.Concurrent.STM.TQueue
import Control.Monad.IO.Class (liftIO) import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Data.Bifunctor import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D import qualified Data.HashMap.Strict as HMap
import Data.Either (lefts, rights) import qualified Data.HashSet as HSet
import qualified Data.HashMap.Strict as HMap import Data.Maybe (fromMaybe, isJust)
import qualified Data.HashSet as HSet import Data.String (fromString)
import Data.Maybe (fromJust, isJust) import qualified Data.Text.Lazy as Txt
import Data.String (fromString) import Data.Text.Normalize (NormalizationMode (NFC),
import Data.Text.Lazy (Text) normalize)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT import qualified Network.HTTP.Types as HTTPT
import System.IO
import System.Random import System.Random
import Text.Read (readEither) import Text.Read (readEither)
import Formatting (fixed, format, int, (%)) import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
import Servant.Client import Servant.Client
import Servant.Server
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap import Hash2Pub.RingMap
import Hash2Pub.Utils
import Debug.Trace
data PostService d = PostService data PostService d = PostService
{ serviceConf :: ServiceConf { serviceConf :: ServiceConf
@ -57,22 +55,19 @@ data PostService d = PostService
-- ^ for each tag store the subscribers + their queue -- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time -- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID , postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager , httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, logFileHandle :: Handle
} }
deriving (Typeable) deriving (Typeable)
type Hashtag = Text type Hashtag = Txt.Text
type PostID = Text type PostID = Txt.Text
type PostContent = Text type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a -- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts -- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
@ -90,38 +85,26 @@ instance DHT d => Service PostService d where
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty ownSubsVar <- newTVarIO HMap.empty
--ownPostVar <- newTVarIO HSet.empty ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
let let
thisService = PostService thisService = PostService {
{ serviceConf = conf serviceConf = conf
, baseDHT = dht , baseDHT = dht
, serviceThread = threadVar , serviceThread = threadVar
, subscribers = subscriberVar , subscribers = subscriberVar
, ownSubscriptions = ownSubsVar , ownSubscriptions = ownSubsVar
--, ownPosts = ownPostVar , ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue' , postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress' , migrationsInProgress = migrationsInProgress'
, httpMan = httpMan' , httpMan = httpMan'
, statsQueue = statsQueue' }
, loadStats = loadStats'
, logFileHandle = loggingFile
}
port' = fromIntegral (confServicePort conf) port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- log a start message, this also truncates existing files
TxtI.hPutStrLn loggingFile $ Txt.unlines
[ "# Starting mock relay implementation"
, "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
]
-- Run 'concurrently_' from another thread to be able to return the -- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'. -- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well. -- Terminating that parent thread will make all child threads terminate as well.
@ -129,11 +112,7 @@ instance DHT d => Service PostService d where
concurrently_ concurrently_
-- web server -- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService) (Warp.runSettings warpSettings $ postServiceApplication thisService)
$ concurrently (processIncomingPosts thisService)
-- background processing workers
(launchWorkerThreads thisService)
-- statistics/ measurements
(launchStatsThreads thisService)
-- update thread ID after fork -- update thread ID after fork
atomically $ writeTVar threadVar servThreadID atomically $ writeTVar threadVar servThreadID
pure thisService pure thisService
@ -159,13 +138,38 @@ postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
-- ========= constants =========== -- ========= constants ===========
placeholderPost :: Text placeholderPost :: Txt.Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
postServer :: DHT d => PostService d -> Server PostServiceAPI postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service postServer service = relayInbox service
:<|> subscriptionDelivery service :<|> subscriptionDelivery service
@ -177,27 +181,24 @@ postServer service = relayInbox service
:<|> tagUnsubscribe service :<|> tagUnsubscribe service
-- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do relayInbox serv tag posts = do
let let
-- skip checking whether the post actually contains the tag, just drop full post -- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts postIDs = head . Txt.splitOn "," <$> Txt.lines posts
-- if tag is not in own responsibility, return a 410 Gone -- if tag is not in own responsibility, return a 410 Gone
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag)
if responsible if responsible
then pure () then pure ()
else else
throwError $ err410 { errBody = "Relay is not responsible for this tag"} (throwError $ err410 { errBody = "Relay is not responsible for this tag"})
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe maybe
-- if noone subscribed to the tag, nothing needs to be done -- if noone subscribed to the tag, nothing needs to be done
(pure ()) (pure ())
-- otherwise enqueue posts into broadcast queue of the tag -- otherwise enqueue posts into broadcast queue of the tag
(\queue -> do (\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue) liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
) )
broadcastChan broadcastChan
pure NoContent pure NoContent
@ -208,8 +209,7 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException instance Exception UnhandledTagException
-- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do subscriptionDelivery serv senderID subList = do
let let
tagSubs = Txt.lines subList tagSubs = Txt.lines subList
@ -221,7 +221,7 @@ subscriptionDelivery serv senderID subList = do
-- not-handled tag occurs, this results in a single large transaction. -- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad. -- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag') responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag')
if responsible if responsible
then processTag (subscribers serv) tag' then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
@ -243,7 +243,7 @@ subscriptionDelivery serv senderID subList = do
Right _ -> pure "" Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility -- TODO: check and only accept tags in own (future?) responsibility
where where
processTag :: TVar RelayTags -> Text -> STM () processTag :: TVar RelayTags -> Txt.Text -> STM ()
processTag subscriberSTM tagData = do processTag subscriberSTM tagData = do
let let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData tag:subText:lease:posts:_ = Txt.splitOn "," tagData
@ -254,62 +254,57 @@ subscriptionDelivery serv senderID subList = do
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
-- | endpoint for fetching a post by its ID postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch :: PostService d -> Text -> Handler Text postFetch serv postID = do
postFetch serv _ = do postSet <- liftIO . readTVarIO . ownPosts $ serv
-- decision: for saving memory do not store published posts, just if HSet.member postID postSet
-- pretend there is a post for each requested ID -- decision: always return the same placeholder post
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant then pure placeholderPost
pure placeholderPost else throwError $ err404 { errBody = "No post found with this ID" }
-- | endpoint for fetching multiple posts of this instance by their IDs postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do postMultiFetch serv postIDs = do
let let idList = Txt.lines postIDs
idList = Txt.lines postIDs postSet <- liftIO . readTVarIO . ownPosts $ serv
-- decision: for saving memory do not store published posts, just -- look up existence of all given post IDs, fail if even one is missing
-- pretend there is a post for each requested ID foldM (\response postID ->
response = foldl (\response' _ -> if HSet.member postID postSet
placeholderPost <> "\n" <> response' then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList ) "" idList
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
-- | delivery endpoint: inbox for initially publishing a post at an instance postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do postInbox serv post = do
-- extract contained hashtags -- extract contained hashtags
let let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID -- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID -- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag -- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Text]) (\tag -> liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
) )
pure NoContent pure NoContent
-- | delivery endpoint: receive postIDs of a certain subscribed hashtag tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (hashtagToId hashtag) subscriptions) if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags else -- silently drop posts from unsubscribed tags
pure () pure ()
pure $ "Received a postID for tag " <> hashtag pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
-- | receive subscription requests to a handled hashtag
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
if not responsible if not responsible
-- GONE if not responsible -- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" } then throwError err410 { errBody = "not responsible for this tag" }
@ -323,14 +318,12 @@ tagSubscribe serv hashtag origin = do
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry -- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime pure $ round leaseTime
-- | receive and handle unsubscription requests regarding a handled tag tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
if not responsible if not responsible
-- GONE if not responsible -- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" } then throwError err410 { errBody = "not responsible for this tag" }
@ -348,15 +341,8 @@ tagUnsubscribe serv hashtag origin = do
clientAPI :: Proxy PostServiceAPI clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy clientAPI = Proxy
relayInboxClient
:<|> subscriptionDeliveryClient relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
:<|> postFetchClient
:<|> postMultiFetchClient
:<|> postInboxClient
:<|> tagDeliveryClient
:<|> tagSubscribeClient
:<|> tagUnsubscribeClient
= client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
@ -369,7 +355,7 @@ clientDeliverSubscriptions :: PostService d
-> (String, Int) -- ^ hostname and port of instance to deliver to -> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure -> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag interval -- collect tag intearval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts -- extract subscribers and posts
@ -399,7 +385,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
Right _ -> do Right _ -> do
atomically $ atomically $
modifyTVar' (subscribers serv) $ \tagMap -> modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags) foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
pure . Right $ () pure . Right $ ()
where where
channelGetAll :: TChan a -> STM [a] channelGetAll :: TChan a -> STM [a]
@ -410,8 +396,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time, -- | Subscribe the client to the given hashtag. On success it returns the given lease time.
-- but also records the subscription in its own data structure.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
@ -426,13 +411,9 @@ clientSubscribeTo serv tag = do
Left (FailureResponse _ fresp) Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False doSubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right lease -> do Right lease -> pure . Right $ lease
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease
) )
lookupResponse lookupResponse
@ -454,9 +435,7 @@ clientUnsubscribeFrom serv tag = do
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False doUnsubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right _ -> do Right _ -> pure . Right $ ()
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag)
pure . Right $ ()
) )
lookupResponse lookupResponse
@ -466,11 +445,11 @@ clientUnsubscribeFrom serv tag = do
-- the post to the responsible relays. -- the post to the responsible relays.
-- As the initial publishing isn't done by a specific relay (but *to* a specific relay -- As the initial publishing isn't done by a specific relay (but *to* a specific relay
-- instead), the function does *not* take a PostService as argument. -- instead), the function does *not* take a PostService as argument.
clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided
-> String -- ^ hostname -> String -- hostname
-> Int -- ^ port -> Int -- port
-> PostContent -- ^ post content -> PostContent -- post content
-> IO (Either String ()) -- ^ error or success -> IO (Either String ()) -- error or success
clientPublishPost httpman hostname port postC = do clientPublishPost httpman hostname port postC = do
resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port "")) resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port ""))
pure . bimap show (const ()) $ resp pure . bimap show (const ()) $ resp
@ -513,7 +492,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
broadcastChan <- newBroadcastTChan broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map -- otherwise use the existing subscriber map
@ -541,7 +520,7 @@ deleteSubscription tagMapSTM tag subscriber = do
-- if there are no subscriptions for the tag anymore, remove its -- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether -- data sttructure altogether
if HMap.null newSubMap if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
-- otherwise just remove the subscription of that node -- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap else writeTVar foundSubMapSTM newSubMap
@ -562,39 +541,12 @@ getTagBroadcastChannel serv tag = do
-- | look up the subscription data of a tag -- | look up the subscription data of a tag
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC and convert to lower case -- normalise the unicode representation of a string to NFC
normaliseTag :: Text -> Text normaliseTag :: Txt.Text -> Txt.Text
normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText -- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
@ -607,78 +559,28 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
-- ====== worker threads ====== -- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries = 10
launchWorkerThreads :: DHT d => PostService d -> IO ()
launchWorkerThreads serv = concurrently_
(processIncomingPosts serv)
$ concurrently_
(purgeSubscriptionsThread serv)
$ concurrently_
(fetchTagPosts serv)
(relayWorker serv)
-- | periodically remove expired subscription entries from relay subscribers
purgeSubscriptionsThread :: PostService d -> IO ()
purgeSubscriptionsThread serv = forever $ do
-- read config
now <- getPOSIXTime
let
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-- no need to atomically lock this, as newly incoming subscriptions do not
-- need to be purged
tagMap <- readTVarIO $ subscribers serv
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-- but each subscriberMap needs to be modified atomically
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
threadDelay $ fromEnum purgeInterval `div` 10^6
-- | process the pending relay inbox of incoming posts from the internal queue: -- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it -- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do processIncomingPosts serv = forever $ do
-- blocks until available -- blocks until available
deliveriesToProcess <- atomically $ do -- TODO: process multiple in parallel
readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv (tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
if null readResult lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
then retry case lookupRes of
else pure readResult -- no vserver active => wait and retry
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do Nothing -> threadDelay $ 10 * 10^6
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID Just (responsibleHost, responsiblePort) -> do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case lookupRes of case resp of
-- no vserver active => wait and retry Left err -> do
Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active") putStrLn $ "Error: " <> show err
Just (responsibleHost, responsiblePort) -> do -- 410 error indicates outdated responsibility mapping
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
case resp of -- TODO: keep track of maximum retries
Left err -> do _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
-- 410 error indicates outdated responsibility mapping atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post Right yay -> putStrLn $ "Yay! " <> show yay
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- report the count of published posts for statistics
atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
pure ()
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
@ -688,197 +590,18 @@ fetchTagPosts serv = forever $ do
-- TODO: batching, retry -- TODO: batching, retry
-- TODO: process multiple in parallel -- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of case resp of
Right response -> Right response ->
-- TODO error handling, retry if HTTPT.statusCode (HTTP.responseStatus response) == 200
--if HTTPT.statusCode (HTTP.responseStatus response) == 200 then
-- then -- success, TODO: statistics
-- -- success, TODO: statistics putStrLn "post fetch success"
-- else else
pure () -- TODO error handling, retry
pure ()
Left _ -> Left _ ->
-- TODO error handling, retry -- TODO error handling, retry
pure () pure ()
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults)
pure ()
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
| RelayReceiveEvent
| RelayDeliveryEvent
| IncomingPostFetchEvent
deriving (Enum, Show, Eq)
-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
deriving (Show, Eq)
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ number of post-fetches delivered
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
deriving (Show, Eq)
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from queue and add them to a shared accumulator.
-- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event number to current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | add incoming stats events to accumulator value
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
-- Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay $ confStatsEvalDelay (serviceConf serv)
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
-- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-- later: current (reported) load, target load
subscriberSum <- sumSubscribers
TxtI.hPutStrLn (logFileHandle serv) $
format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
(realToFrac now :: Double)
(sum . relayReceiveRates $ rateStats)
(sum . relayDeliveryRates $ rateStats)
(postPublishRate rateStats)
(postFetchRate rateStats)
subscriberSum
loop now
sumSubscribers = do
tagMap <- readTVarIO $ subscribers serv
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
subscriberMap <- readTVarIO subscriberMapSTM
pure $ subscriberSum + HMap.size subscriberMap
)
0 tagMap
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
-- rates by dividing through the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- first sum all event numbers, then divide through number of seconds passed to
-- get rate per second
RelayStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = realToFrac timeInterval
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}

View file

@ -1,37 +0,0 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Hash2Pub.PostService.API where
import Data.Text.Lazy (Text)
import Servant
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy

View file

@ -1,5 +1,7 @@
module Hash2Pub.ProtocolTypes where module Hash2Pub.ProtocolTypes where
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime) import Data.Time.Clock.POSIX (POSIXTime)

View file

@ -23,30 +23,7 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b a == b = getRingMap a == getRingMap b
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) show rmap = shows "RingMap " (show $ getRingMap rmap)
instance (Bounded k, Ord k) => Functor (RingMap k) where
-- | map a function over all payload values of a 'RingMap'
fmap f = RingMap . Map.map traversingF . getRingMap
where
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
instance (Bounded k, Ord k) => Foldable (RingMap k) where
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
where
traversingFR (KeyEntry a) acc = f a acc
traversingFR (ProxyEntry _ Nothing) acc = acc
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
where
traversingFL acc (KeyEntry a) = f acc a
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
-- | entry of a 'RingMap' that holds a value and can also -- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space. -- wrap around the lookup direction at the edges of the name space.
@ -156,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (Bounded k, Ord k) addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value => (RingEntry k a -> RingEntry k a -> RingEntry k a)
-> k -- ^ key -> k -- ^ key
-> a -- ^ value -> a -- ^ value
-> RingMap k a -> RingMap k a
@ -230,7 +207,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
-> Maybe i -- possible number limit -> Maybe i -- possible number limit
-> [a] -> [a]
-> [a] -> [a]
takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc
-- length limit reached -- length limit reached
| remaining <= 0 = takeAcc | remaining <= 0 = takeAcc
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc = takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =