Compare commits

..

49 commits

Author SHA1 Message Date
Trolli Schmittlauch b46f66e2c0 update Readme with latest branch name and pointer to SocialHub 2021-08-16 20:12:53 +02:00
Trolli Schmittlauch ea14ff9b09 update ghc to 8.8.4, nixpkgs base to 20.09
- relaxes some version constraints as a dirty update quickfix
- removes HIE integration as that project is abandoned; TODO: switch to
  haskell-language-server instead
2021-01-01 14:33:03 +01:00
Trolli Schmittlauch 9d8df6d3d8 make the multithread-runtime use all cores by default 2020-10-02 02:36:02 +02:00
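(The cabal diff below adds `-with-rtsopts=-N`: without an explicit core count, the GHC runtime then defaults to one capability per available core, and `-rtsopts` still allows overriding this at launch.)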
Trolli Schmittlauch d7355aa04d increase HTTP timeout for initial post publication to 60 seconds
In experiments, some publication events timed out after a while.
Increasing the timeout just in case, although it is likely a mere symptom rather than the core fault.
2020-09-22 19:47:39 +02:00
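The change surfaces in the Experiment runner diff below as an http-client manager setting; a minimal standalone sketch of the same adjustment (function name illustrative, the 60 s value is the one from the commit):

import qualified Network.HTTP.Client as HTTP

-- build a manager whose requests may take up to 60 seconds before timing out
mkPatientManager :: IO HTTP.Manager
mkPatientManager = HTTP.newManager HTTP.defaultManagerSettings
    { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro (60 * 10^6) }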
Trolli Schmittlauch d8b2186016 make inclusion of HIE overlay conditional as well 2020-09-21 22:16:02 +02:00
Trolli Schmittlauch b48b7a7bba Merge branch 'measurement_logging' into mainline
closes #11, implements #60
2020-09-17 02:19:04 +02:00
Trolli Schmittlauch eee40ce4fb add log messages for failed relays as well 2020-09-17 02:17:57 +02:00
Trolli Schmittlauch 556b69d887 increase subscription lease to 1 (simulated) day for achieving higher subscriber numbers 2020-09-16 16:07:39 +02:00
Trolli Schmittlauch f5de7601bb do not store published posts for reducing memory consumption 2020-09-16 13:49:26 +02:00
Trolli Schmittlauch a2f268d374 improve logging: line buffering, time stamps
contributes to #60
2020-09-16 01:54:50 +02:00
Trolli Schmittlauch bb17b136d6 increase stabilise interval 2020-09-16 01:54:40 +02:00
Trolli Schmittlauch c036dea7f9 periodically purge expired subscriptions 2020-09-14 15:49:44 +02:00
Trolli Schmittlauch a0e7142a7d report number of subscriptions 2020-09-14 14:57:25 +02:00
Trolli Schmittlauch 3c28cde942 catch and print all Socket bind exceptions 2020-09-12 15:45:03 +02:00
Trolli Schmittlauch 1fc264a226 manage logging via file handle
reason: `appendFile` combined with lazy evaluation led to exhaustion of
open file descriptors, as the file is opened again for each write and,
due to lazy evaluation, kept open multiple times.
2020-09-12 12:37:41 +02:00
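A minimal sketch of the handle-based pattern described above (file name illustrative): open the log once, enable line buffering, and reuse the single handle for all writes instead of calling `appendFile` per entry:

import System.IO

main :: IO ()
main = do
    logHandle <- openFile "measurement.log" AppendMode
    hSetBuffering logHandle LineBuffering              -- flush once per line
    mapM_ (hPutStrLn logHandle) ["event 1", "event 2"] -- one FD for all writes
    hClose logHandle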
Trolli Schmittlauch da579a0756 decrease logging verbosity 2020-09-11 00:39:14 +02:00
Trolli Schmittlauch e12d8ef70a properly format stats log numbers: no e-notation 2020-09-11 00:39:14 +02:00
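The PostService diff below imports the formatting package for this; a small sketch of how fixed-point rendering avoids e-notation (value illustrative):

import qualified Data.Text.Lazy as Txt
import Formatting (fixed, format)

-- renders "0.000150" instead of the Show output "1.5e-4"
formattedRate :: Txt.Text
formattedRate = format (fixed 6) (1.5e-4 :: Double)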
Trolli Schmittlauch 0f9727c05a log the post rates instead of the absolute sums 2020-09-10 22:41:06 +02:00
Trolli Schmittlauch 34ecdd66e1 make stats measurement delay configurable, take speedup into account 2020-09-10 21:23:33 +02:00
Trolli Schmittlauch 8f917130c4 tag normalisation includes lower case conversion 2020-09-10 13:14:48 +02:00
Trolli Schmittlauch 3ac89d301c bugfix: subscribe as default if not subscribed yet, when posting to a tag 2020-09-10 13:14:23 +02:00
Trolli Schmittlauch 3c76544afb launch background worker threads 2020-09-10 12:00:17 +02:00
Trolli Schmittlauch f8d30d0cc4 report post fetches to statistics 2020-09-09 19:55:34 +02:00
Trolli Schmittlauch 620e998876 report incoming relay posts to statistics 2020-09-09 19:25:48 +02:00
Trolli Schmittlauch 85d10f6773 report published posts to statistics 2020-09-09 18:50:55 +02:00
Trolli Schmittlauch e3a8912360 process incoming posts in parallel 2020-09-09 18:50:45 +02:00
Trolli Schmittlauch 12fcd13754 annotate the PostService server/ request-handler functions 2020-09-09 18:01:51 +02:00
Trolli Schmittlauch 72eca0f4fe log metrics to file
contributes to #60
2020-09-09 17:22:20 +02:00
Trolli Schmittlauch 0ffe9effc0 refactor relay processing to STM-retry instead of busy-wait 2020-09-09 14:24:34 +02:00
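A minimal sketch of the pattern (helper name illustrative): the transaction calls STM's `retry`, which blocks until one of the variables it read changes, instead of polling in a busy loop:

import Control.Concurrent.STM

-- block until at least one element is available, then drain the queue
takeBatch :: TQueue a -> STM [a]
takeBatch q = do
    xs <- flushTQueue q
    if null xs then retry else pure xs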
Trolli Schmittlauch 2b39648a77 actually implement simple relaying of posts
was still missing for #41
2020-09-09 11:51:09 +02:00
Trolli Schmittlauch df479982fa make RingMap instance of Functor and Foldable 2020-09-08 08:46:36 +02:00
Trolli Schmittlauch c536994afe re-format Servant client pattern matching 2020-09-07 16:35:59 +02:00
Trolli Schmittlauch 5c338b9cd7 split up stats summing and evaluating, launch threads 2020-09-07 16:27:56 +02:00
Trolli Schmittlauch c823e6357a accumulate all statistic/ measurement events to a measurement summary
- RingMap can now be mapped over
2020-09-07 13:00:15 +02:00
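A hypothetical sketch of what the mappable RingMap enables, assuming the instances behave like those of a keyed container (helper names illustrative, types as in the stats code further down):

import Hash2Pub.FediChordTypes (NodeID)
import Hash2Pub.RingMap

-- Foldable: sum all per-tag measurements directly
totalRate :: RingMap NodeID Double -> Double
totalRate = sum

-- Functor: rescale every entry in place
scaleRates :: Double -> RingMap NodeID Double -> RingMap NodeID Double
scaleRates factor = fmap (* factor)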
Trolli Schmittlauch 6b166ac4ca fixup! Merge branch 'measurement_logging' into mainline 2020-09-07 10:32:58 +02:00
Trolli Schmittlauch 4d2d6faf1b Merge pull request 'Improve general readability' (#69) from Hecate/Hash2Pub:readability into mainline
brings several readability enhancements:

- Text is always unqualified
- The Servant API declaration is moved to its own file
- Redundant imports are removed
2020-09-05 16:27:23 +02:00
Hécate 7d833e064b Improve readability 2020-09-05 15:45:47 +02:00
Trolli Schmittlauch fa78c6fc43 clarify different nix-shell environments in readme 2020-09-05 15:01:46 +02:00
Hécate d3e5eac5c5 Unused imports and syntax error 2020-09-05 12:41:18 +02:00
Trolli Schmittlauch 60f5780742 Merge branch 'measurement_logging' into mainline
more values are parametrised through FediChordConfig
2020-09-05 12:31:40 +02:00
Trolli Schmittlauch c9b0e66110 scale request timeout with speedup and pass it directly to function 2020-09-05 12:30:31 +02:00
Trolli Schmittlauch 4f08d33d2e make all delays configurable and scale them according to a speedup factor 2020-09-05 12:30:31 +02:00
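Worked example with values from the config diff below, assuming a speedup factor of 10: confBootstrapSamplingInterval = 180 * 10^6 `div` 10 yields 18 * 10^6 µs, so sampling happens every 18 s of real time instead of every 3 min; the POSIXTime-typed ages scale the same way, e.g. a lookup cache age of 300 / 10 = 30 s.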
Trolli Schmittlauch 20050654bc make passing bootstrap information optional
reason: allow the first node to start without having to wait for a
timeout
part of #58
2020-09-03 11:20:38 +02:00
Trolli Schmittlauch 59beb3441f instrumentation script executes the prepared schedule
- reads CSV schedule from file
- sends the given schedule of post events
- not thoroughly tested yet

implements #59
2020-09-02 21:37:01 +02:00
Trolli Schmittlauch 1aee41db88 enable compiler optimisation 2020-08-31 13:37:40 +02:00
Trolli Schmittlauch 3c1652d86d prototype instrumentation of periodically sending posts to a test setup of 3 nodes
- contributes to #59
2020-08-27 12:01:08 +02:00
Trolli Schmittlauch 1b5fc039b3 Merge branch 'fix_networking': fix some DHT joins and crashes
several major flaws in the basic DHT communication had been discovered,
see individual commits for details
2020-08-27 00:39:08 +02:00
Trolli Schmittlauch 1a962f1500 stylish run 2020-08-27 00:33:19 +02:00
Trolli Schmittlauch 70145bc544 bump nixpkgs revision 2020-08-20 15:59:04 +02:00
13 changed files with 610 additions and 264 deletions

View file

@ -46,8 +46,8 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
ghc-options: -Wall -Wpartial-fields
build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
ghc-options: -Wall -Wpartial-fields -O2
@ -58,7 +58,7 @@ library
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils
other-modules: Hash2Pub.Utils, Hash2Pub.PostService.API
-- LANGUAGE extensions used by modules in this package.
other-extensions: GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings
@ -91,7 +91,7 @@ executable Hash2Pub
-- Base language which the package is written in.
default-language: Haskell2010
ghc-options: -threaded
ghc-options: -threaded -rtsopts -with-rtsopts=-N
executable Experiment
-- experiment runner

View file

@ -1,7 +1,7 @@
# Hash2Pub
***This is heavily WIP and does not provide any useful functionality yet***.
I aim for always having the master branch at a state where it builds and tests pass.
I aim for always having the `mainline` branch in a state where it builds and tests pass.
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
@ -10,8 +10,13 @@ This is the practical implementation of the concept presented in the paper [Dece
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
For further questions and discussions, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
## Building
The project and its development environment are built with [Nix](https://nixos.org/nix/).
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once.
While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development.
Be aware that these need to be built from source, which can take a very long time.

View file

@ -1,44 +1,51 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import System.Random
import Control.Concurrent
import Control.Monad (forM_)
import Control.Monad.State.Class
import Control.Monad.State.Strict (evalStateT)
import Control.Monad.IO.Class
import qualified Network.HTTP.Client as HTTP
import Control.Concurrent
import Control.Monad (forM_)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import qualified Network.HTTP.Client as HTTP
import System.Environment (getArgs)
import Hash2Pub.PostService (clientPublishPost, Hashtag)
import Hash2Pub.PostService (Hashtag, clientPublishPost)
-- placeholder post data definition
tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ]
knownRelays :: [(String, Int)]
knownRelays =
[ ("animalliberation.social", 3342)
, ("hostux.social", 3343)
, ("social.diskseven.com", 3344)
, ("social.imirhil.fr", 3345)
]
-- configuration constants
timelineFile = "../simulationData/inputs/generated/timeline_sample.csv"
main :: IO ()
main = do
-- initialise HTTP manager
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
-- initialise RNG
let initRGen = mkStdGen 12
-- cycle through tags and post to a random instance
evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen
-- wait for a specified time
-- read CLI parameters
speedupStr : _ <- getArgs
-- read and parse timeline schedule
-- relying on the laziness of Haskell I/O, hoping it does not introduce too long delays
postEvents <- parseSchedule <$> TxtI.readFile timelineFile
-- actually schedule and send the post events
executeSchedule (read speedupStr) postEvents
pure ()
publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m ()
publishPostRandom httpman tag = do
index <- state $ randomR (0, length knownRelays - 1)
let (pubHost, pubPort) = knownRelays !! index
_ <- liftIO . forkIO $ do
postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag)
either putStrLn (const $ pure ()) postResult
liftIO $ threadDelay 500
parseSchedule :: Txt.Text
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
where
parseEntry [delayT, contactT, tag] =
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
parseEntry entry = error $ "invalid schedule input format: " <> show entry
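-- assumed example of one schedule line accepted by parseEntry above
-- (delay in µs; contact tuple in Haskell read syntax; hashtag):
--   500000;("relay.example.org",3342);JustSomeTag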
executeSchedule :: Int -- ^ speedup factor
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
-> IO ()
executeSchedule speedup events = do
-- initialise HTTP manager
httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
_ <- forkIO $
clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
>>= either putStrLn (const $ pure ())
-- while threadDelay gives only minimum delay guarantees, let's hope the
-- additional delays are negligible
-- otherwise: evaluate usage of https://hackage.haskell.org/package/schedule-0.3.0.0/docs/Data-Schedule.html
threadDelay $ delay `div` speedup

View file

@ -45,21 +45,36 @@ main = do
readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
-- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout
let
fConf = FediChordConf {
confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
--, confStabiliseInterval = 60
, confBootstrapSamplingInterval = 180
, confMaxLookupCacheAge = 300
}
sConf = ServiceConf {
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
, confServicePort = read servicePortString
, confServiceHost = confDomainString
}
speedup = read speedupString
confBootstrapNodes' = case remainingArgs of
bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)]
_ -> []
fConf = FediChordConf
{ confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
, confBootstrapNodes = confBootstrapNodes'
, confStabiliseInterval = 80 * 10^6
, confBootstrapSamplingInterval = 180 * 10^6 `div` speedup
, confMaxLookupCacheAge = 300 / fromIntegral speedup
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup
, confMaxNodeCacheAge = 600 / fromIntegral speedup
, confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
}
sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
, confServicePort = read servicePortString
, confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup
, confStatsEvalDelay = 120 * 10^6 `div` speedup
}
pure (fConf, sConf)
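-- assumed invocation after this change (bootstrap arguments now optional):
--   Hash2Pub <domain> <IPv6> <dhtPort> <servicePort> <speedup> [<bootstrapHost> <bootstrapPort>]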

View file

@ -1,26 +1,18 @@
{
compiler ? "ghc865",
withHIE ? false
compiler ? "ghc884"
}:
let
# pin all-hies for getting the language server
all-hies = fetchTarball {
url = "https://github.com/infinisil/all-hies/tarball/b8fb659620b99b4a393922abaa03a1695e2ca64d";
sha256 = "sha256:0br6wsqpfk1lzz90f7zw439w1ir2p54268qilw9l2pk6yz7ganfx";
};
pkgs = import (
builtins.fetchGit {
name = "nixpkgs-pinned";
url = https://github.com/NixOS/nixpkgs/;
ref = "refs/heads/release-20.03";
rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa";
ref = "refs/heads/release-20.09";
rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
}) {
# Pass no config for purity
config = {};
overlays = [
(import all-hies {}).overlay
];
overlays = [];
};
hp = pkgs.haskell.packages."${compiler}";
src = pkgs.nix-gitignore.gitignoreSource [] ./.;
@ -38,7 +30,6 @@ in
hlint
stylish-haskell
pkgs.python3Packages.asn1ate
]
++ (if withHIE then [ hie ] else []);
];
};
}

View file

@ -1 +1 @@
(import ./default.nix {withHIE = true;}).shell
(import ./default.nix {}).shell

View file

@ -49,8 +49,8 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Except (MonadError(..), runExceptT)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr', foldrM)
@ -488,10 +488,11 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu
requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState)
let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information
responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM
let
@ -516,7 +517,7 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty)
responses
-- sort, slice and set the accumulated successors and predecessors
-- the contacted node itself is a successor as well and, with few
-- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState
@ -584,10 +585,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
-- ToDo: make attempts and timeout configurable
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (lookupMessage targetID ns Nothing)
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them
@ -596,7 +597,7 @@ sendQueryIdMessages targetID ns lParam targets = do
now <- getPOSIXTime
-- collect cache entries from all responses
foldrM (\resp acc -> do
let
let
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
@ -609,14 +610,14 @@ sendQueryIdMessages targetID ns lParam targets = do
pure $ case acc of
-- once a FOUND has been encountered, return this as a result
FOUND{} -> acc
FORWARD accSet
FORWARD accSet
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses
where
isFound FOUND{} = True
isFound _ = False
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
@ -635,8 +636,9 @@ requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid ->
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
@ -673,13 +675,14 @@ requestLeave :: LocalNodeState s
-> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let leavePayload = LeaveRequestPayload {
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns
, leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
}
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid ->
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
@ -701,10 +704,11 @@ requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do
resp <- sendRequestTo (\rid ->
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
@ -740,22 +744,14 @@ requestPing ns target = do
) responses
-- | 'sendRequestToWithParams' with default timeout and retries already specified.
-- Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo = sendRequestToWithParams 5000 3
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
sendRequestTo :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID
randomID <- randomRIO (0, 2^32-1)
let
@ -764,7 +760,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
_ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts
@ -869,7 +865,7 @@ mkServerSocket ip port = do
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1
bind sock sockAddr
bind sock sockAddr `catch` (\e -> putStrLn $ "Caught exception while bind " <> show sock <> " " <> show sockAddr <> ": " <> show (e :: SomeException))
pure sock
-- | create a UDP datagram socket, connected to a destination.
@ -885,6 +881,6 @@ mkSendSocket srcIp dest destPort = do
setSocketOption sendSock IPv6Only 1
-- bind to the configured local IP to make sure that outgoing packets are sent from
-- this source address
bind sendSock srcAddr
bind sendSock srcAddr `catch` (\e -> putStrLn $ "Caught exception while mkSendSocket bind " <> show sendSock <> " " <> show srcAddr <> ": " <> show (e :: SomeException))
connect sendSock destAddr
pure sendSock

View file

@ -199,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do
-- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
threadDelay $ delaySecs * 10^6
threadDelay delaySecs
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
@ -223,10 +223,11 @@ tryBootstrapJoining nsSTM = do
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo (lookupMessage targetID ns Nothing)
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
@ -310,12 +311,13 @@ joinOnNewEntriesThread nsSTM = loop
where
loop = do
nsSnap <- readTVarIO nsSTM
(lookupResult, cache) <- atomically $ do
(lookupResult, parentNode) <- atomically $ do
cache <- readTVar $ nodeCacheSTM nsSnap
parentNode <- readTVar $ parentRealNode nsSnap
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
-- empty cache, block until cache changes and then retry
(FORWARD s) | Set.null s -> retry
result -> pure (result, cache)
result -> pure (result, parentNode)
case lookupResult of
-- already joined
FOUND _ ->
@ -325,8 +327,7 @@ joinOnNewEntriesThread nsSTM = loop
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
either
-- on join failure, sleep and retry
-- TODO: make delay configurable
(const $ threadDelay (30 * 10^6) >> loop)
(const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop)
(const $ pure ())
joinResult
@ -341,20 +342,15 @@ nodeCacheWriter nsSTM =
modifyTVar' (nodeCacheSTM ns) cacheModifier
-- TODO: make max entry age configurable
maxEntryAge :: POSIXTime
maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin"
-- get cache
(ns, cache) <- atomically $ do
(ns, cache, maxEntryAge) <- atomically $ do
ns <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM ns
pure (ns, cache)
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
pure (ns, cache, maxEntryAge)
-- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime
@ -401,8 +397,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
)
putStrLn "cache verify run: end"
threadDelay $ 10^6 * round maxEntryAge `div` 20
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds
-- | Checks the invariant of at least @jEntries@ per cache slice.
@ -468,7 +463,6 @@ stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
oldNs <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of
@ -504,7 +498,7 @@ stabiliseThread nsSTM = forever $ do
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
either
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
@ -547,9 +541,8 @@ stabiliseThread nsSTM = forever $ do
)
newPredecessor
putStrLn "stabilise run: end"
-- TODO: make delay configurable
threadDelay (60 * 10^6)
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay stabiliseDelay
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry
@ -636,19 +629,15 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
POSIXTime
-- TODO: make purge age configurable
-- | periodically clean up old request parts
responsePurgeAge :: POSIXTime
responsePurgeAge = 60 -- seconds
requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do
requestMapPurge :: POSIXTime -> MVar RequestMap -> IO ()
requestMapPurge purgeAge mapVar = forever $ do
rMapState <- takeMVar mapVar
now <- getPOSIXTime
putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
now - ts < responsePurgeAge
now - ts < purgeAge
) rMapState
threadDelay $ round responsePurgeAge * 2 * 10^6
threadDelay $ (fromEnum purgeAge * 2) `div` 10^6
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
@ -663,12 +652,13 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
-- handling multipart messages:
-- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail
concurrently_ (requestMapPurge requestMap) $ forever $ do
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
-- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg
@ -782,7 +772,7 @@ updateLookupCache nodeSTM keyToLookup = do
-- TODO: better retry management, because having no vserver joined yet should
-- be treated differently than other reasons for not getting a result.
newResponsible <- runExceptT $ requestQueryID n keyToLookup
either
either
(const $ pure Nothing)
(\result -> do
let newEntry = (getDomain result, getServicePort result)
@ -807,4 +797,4 @@ lookupCacheCleanup nodeSTM = do
now - ts < confMaxLookupCacheAge (nodeConfig node)
)
)
threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5)
threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6

View file

@ -411,10 +411,22 @@ data FediChordConf = FediChordConf
-- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes
, confStabiliseInterval :: Int
-- ^ pause between stabilise runs, in microseconds
, confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
-- ^ pause between sampling the own ID through bootstrap nodes, in microseconds
, confMaxLookupCacheAge :: POSIXTime
-- ^ maximum age of lookup cache entries in seconds
-- ^ maximum age of key lookup cache entries in seconds
, confJoinAttemptsInterval :: Int
-- ^ interval between join attempts on newly learned nodes, in microseconds
, confMaxNodeCacheAge :: POSIXTime
-- ^ maximum age of entries in the node cache, in seconds
, confResponsePurgeAge :: POSIXTime
-- ^ maximum age of message parts in response part cache, in seconds
, confRequestTimeout :: Int
-- ^ how long to wait until response has arrived, in milliseconds
, confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried
}
deriving (Show, Eq)
@ -445,6 +457,12 @@ data ServiceConf = ServiceConf
-- ^ listening port for service
, confServiceHost :: String
-- ^ hostname of service
, confLogfilePath :: String
-- ^ where to store the (measurement) log file
, confStatsEvalDelay :: Int
-- ^ delay between statistic rate measurement samplings, in microseconds
, confSpeedupFactor :: Int
-- While the speedup factor needs to be already included in all
}
class DHT d where

View file

@ -11,33 +11,42 @@ module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, when, void)
import Control.Monad.IO.Class (liftIO)
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, unless,
void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC),
normalize)
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D
import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust)
import Data.String (fromString)
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.IO
import System.Random
import Text.Read (readEither)
import Text.Read (readEither)
import qualified Network.Wai.Handler.Warp as Warp
import Formatting (fixed, format, int, (%))
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Servant.Client
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap
import Hash2Pub.Utils
import Debug.Trace
data PostService d = PostService
{ serviceConf :: ServiceConf
@ -48,19 +57,22 @@ data PostService d = PostService
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, logFileHandle :: Handle
}
deriving (Typeable)
type Hashtag = Txt.Text
type PostID = Txt.Text
type PostContent = Txt.Text
type Hashtag = Text
type PostID = Text
type PostContent = Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
@ -78,26 +90,38 @@ instance DHT d => Service PostService d where
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty
--ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
let
thisService = PostService {
serviceConf = conf
thisService = PostService
{ serviceConf = conf
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar
--, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
}
, statsQueue = statsQueue'
, loadStats = loadStats'
, logFileHandle = loggingFile
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- log a start message, this also truncates existing files
TxtI.hPutStrLn loggingFile $ Txt.unlines
[ "# Starting mock relay implementation"
, "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
]
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
@ -105,7 +129,11 @@ instance DHT d => Service PostService d where
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
(processIncomingPosts thisService)
$ concurrently
-- background processing workers
(launchWorkerThreads thisService)
-- statistics/ measurements
(launchStatsThreads thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
@ -131,38 +159,13 @@ postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
-- ========= constants ===========
placeholderPost :: Txt.Text
placeholderPost :: Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
@ -174,7 +177,8 @@ postServer service = relayInbox service
:<|> tagUnsubscribe service
relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
-- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do
let
-- skip checking whether the post actually contains the tag, just drop full post
@ -190,8 +194,10 @@ relayInbox serv tag posts = do
-- if no one subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag
(\queue ->
(\queue -> do
liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
)
broadcastChan
pure NoContent
@ -202,7 +208,8 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
-- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
@ -236,7 +243,7 @@ subscriptionDelivery serv senderID subList = do
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility
where
processTag :: TVar RelayTags -> Txt.Text -> STM ()
processTag :: TVar RelayTags -> Text -> STM ()
processTag subscriberSTM tagData = do
let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
@ -247,44 +254,47 @@ subscriptionDelivery serv senderID subList = do
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch serv postID = do
postSet <- liftIO . readTVarIO . ownPosts $ serv
if HSet.member postID postSet
-- decision: always return the same placeholder post
then pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" }
-- | endpoint for fetching a post by its ID
postFetch :: PostService d -> Text -> Handler Text
postFetch serv _ = do
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
pure placeholderPost
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
-- | endpoint for fetching multiple posts of this instance by their IDs
postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing
foldM (\response postID ->
if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
let
idList = Txt.lines postIDs
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
response = foldl (\response' _ ->
placeholderPost <> "\n" <> response'
) "" idList
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
postInbox :: PostService d -> Txt.Text -> Handler NoContent
-- | delivery endpoint: inbox for initially publishing a post at an instance
postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
liftIO $ forM_ (containedTags :: [Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
-- | delivery endpoint: receive postIDs of a certain subscribed hashtag
tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
@ -295,7 +305,9 @@ tagDelivery serv hashtag posts = do
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
-- | receive subscription requests to a handled hashtag
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
@ -311,10 +323,12 @@ tagSubscribe serv hashtag origin = do
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
-- | receive and handle unsubscription requests regarding a handled tag
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
@ -334,8 +348,15 @@ tagUnsubscribe serv hashtag origin = do
clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
relayInboxClient
:<|> subscriptionDeliveryClient
:<|> postFetchClient
:<|> postMultiFetchClient
:<|> postInboxClient
:<|> tagDeliveryClient
:<|> tagSubscribeClient
:<|> tagUnsubscribeClient
= client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
@ -405,10 +426,12 @@ clientSubscribeTo serv tag = do
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False
Left err -> pure . Left . show $ err
Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease
)
lookupResponse
@ -542,15 +565,37 @@ lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- normalise the unicode representation of a string to NFC
normaliseTag :: Txt.Text -> Txt.Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- normalise the unicode representation of a string to NFC and convert to lower case
normaliseTag :: Text -> Text
normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
@ -562,36 +607,78 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
-- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries = 10
launchWorkerThreads :: DHT d => PostService d -> IO ()
launchWorkerThreads serv = concurrently_
(processIncomingPosts serv)
$ concurrently_
(purgeSubscriptionsThread serv)
$ concurrently_
(fetchTagPosts serv)
(relayWorker serv)
-- | periodically remove expired subscription entries from relay subscribers
purgeSubscriptionsThread :: PostService d -> IO ()
purgeSubscriptionsThread serv = forever $ do
-- read config
now <- getPOSIXTime
let
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-- no need to atomically lock this, as newly incoming subscriptions do not
-- need to be purged
tagMap <- readTVarIO $ subscribers serv
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-- but each subscriberMap needs to be modified atomically
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
threadDelay $ fromEnum purgeInterval `div` 10^6
-- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
-- TODO: process multiple in parallel
(tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
putStrLn $ "Error: " <> show err
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
Right _ -> do
-- TODO: stats
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
void $ clientSubscribeTo serv tag
deliveriesToProcess <- atomically $ do
readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
if null readResult
then retry
else pure readResult
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- report the count of published posts for statistics
atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
pure ()
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
@ -601,18 +688,197 @@ fetchTagPosts serv = forever $ do
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
if HTTPT.statusCode (HTTP.responseStatus response) == 200
then
-- success, TODO: statistics
putStrLn "post fetch success"
else
-- TODO error handling, retry
pure ()
-- TODO error handling, retry
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
-- then
-- -- success, TODO: statistics
-- else
pure ()
Left _ ->
-- TODO error handling, retry
pure ()
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults)
pure ()
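relayWorker drains up to 500 queued posts per subscriber channel via readUpToTChan, which is not shown in this hunk; a minimal sketch of such a bounded, non-blocking drain (name and signature assumed from the call site):
-- read up to maxN values from a TChan without blocking, stopping early
-- when the channel is empty
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan maxN chan
    | maxN <= 0 = pure []
    | otherwise = do
        headResult <- tryReadTChan chan
        case headResult of
            Nothing  -> pure []
            Just val -> (val :) <$> readUpToTChan (maxN - 1) chan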
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
| RelayReceiveEvent
| RelayDeliveryEvent
| IncomingPostFetchEvent
deriving (Enum, Show, Eq)
-- | Represents a measurement event of a 'StatsEventType', with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
deriving (Show, Eq)
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ rate of delivered post fetches
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
deriving (Show, Eq)
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from the queue and add them to a shared accumulator.
-- Instead of letting events pile up in the queue (allocating memory linear in their number), fold each one into the accumulator immediately.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event number to current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | add incoming stats events to accumulator value
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
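For illustration, two receive events for the same key accumulate into a single 'RingMap' entry (a doctest-style sketch: someId stands for an arbitrary NodeID, rMapLookup for the plain RingMap lookup):
-- >>> let stats = statsAdder (StatsEvent RelayReceiveEvent 3 someId)
-- >>>           $ statsAdder (StatsEvent RelayReceiveEvent 2 someId) emptyStats
-- >>> rMapLookup someId (relayReceiveRates stats)
-- Just 5.0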
-- | Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay $ confStatsEvalDelay (serviceConf serv)
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
-- write the evaluated rates to the log file
-- format: timestamp;sum of relayReceiveRates;sum of relayDeliveryRates;postPublishRate;postFetchRate;subscriberSum
-- later: current (reported) load, target load
subscriberSum <- sumSubscribers
TxtI.hPutStrLn (logFileHandle serv) $
format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
(realToFrac now :: Double)
(sum . relayReceiveRates $ rateStats)
(sum . relayDeliveryRates $ rateStats)
(postPublishRate rateStats)
(postFetchRate rateStats)
subscriberSum
loop now
sumSubscribers = do
tagMap <- readTVarIO $ subscribers serv
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
subscriberMap <- readTVarIO subscriberMapSTM
pure $ subscriberSum + HMap.size subscriberMap
)
0 tagMap
-- | Evaluate the accumulated statistic events: currently this mostly means calculating
-- the event rates by dividing the counts by the length of the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- the event counts were already summed during accumulation, so divide them
-- by the number of seconds passed to get the rate per second
RelayStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = realToFrac timeInterval
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}
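As a quick sanity check of the rate computation (a doctest-style sketch): 50 accumulated publish events over a 10-second window evaluate to 5 posts per second.
-- >>> postPublishRate (evaluateStats 10 (emptyStats { postPublishRate = 50 }))
-- 5.0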


@ -0,0 +1,37 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Hash2Pub.PostService.API where
import Data.Text.Lazy (Text)
import Servant
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header from $hashtag
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
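For context, the client functions used in the service code above (relayInboxClient, tagDeliveryClient, ...) can be derived from this API type with servant-client; a sketch, with the exact set of names assumed from the call sites:
import Servant.Client

relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient
    :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient
    :<|> tagSubscribeClient :<|> tagUnsubscribeClient
        = client exposedPostServiceAPI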


@ -1,7 +1,5 @@
module Hash2Pub.ProtocolTypes where
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime)


@ -23,7 +23,30 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows "RingMap " (show $ getRingMap rmap)
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
instance (Bounded k, Ord k) => Functor (RingMap k) where
-- | map a function over all payload values of a 'RingMap'
fmap f = RingMap . Map.map traversingF . getRingMap
where
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
instance (Bounded k, Ord k) => Foldable (RingMap k) where
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
where
traversingFR (KeyEntry a) acc = f a acc
traversingFR (ProxyEntry _ Nothing) acc = acc
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
where
traversingFL acc (KeyEntry a) = f acc a
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
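These instances let the statistics code treat per-key rates generically, scaling them with fmap and totalling them with sum as done in evaluateStats; a hypothetical helper to illustrate:
-- halve every per-key rate, then total them; only the Functor and
-- Foldable interfaces of RingMap are used
halveAndTotal :: (Bounded k, Ord k) => RingMap k Double -> Double
halveAndTotal rates = sum ((/ 2) <$> rates)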
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
@ -133,7 +156,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value old_value
-> k -- ^ key
-> a -- ^ value
-> RingMap k a
@ -207,7 +230,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
-> Maybe i -- possible number limit
-> [a]
-> [a]
takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc
-- length limit reached
| remaining <= 0 = takeAcc
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =