Compare commits

..

121 commits

Author SHA1 Message Date
Trolli Schmittlauch b46f66e2c0 update Readme with latest branch name and pointer to SocialHub 2021-08-16 20:12:53 +02:00
Trolli Schmittlauch ea14ff9b09 update ghc to 8.8.4, nixpkgs base to 20.09
- relaxes some version constraints as a dirty update quickfix
- removes hie integration as that project is abandoned; todo: switch to
  haskell-language-server instead
2021-01-01 14:33:03 +01:00
Trolli Schmittlauch 9d8df6d3d8 make the multithread-runtime use all cores by default 2020-10-02 02:36:02 +02:00
Trolli Schmittlauch d7355aa04d increase HTTP timeout for initial post publication to 60 seconds
After a while, experiments showed some publication events timing out.
Increasing the timeout just in case, although this is likely to be a mere symptom rather than the core fault.
2020-09-22 19:47:39 +02:00
Trolli Schmittlauch d8b2186016 make inclusion of HIE overlay conditional as well 2020-09-21 22:16:02 +02:00
Trolli Schmittlauch b48b7a7bba Merge branch 'measurement_logging' into mainline
closes #11, implements #60
2020-09-17 02:19:04 +02:00
Trolli Schmittlauch eee40ce4fb add log messages for failed relays as well 2020-09-17 02:17:57 +02:00
Trolli Schmittlauch 556b69d887 increase subscription lease to 1 (simulated) day for achieving higher subscriber numbers 2020-09-16 16:07:39 +02:00
Trolli Schmittlauch f5de7601bb do not store published posts for reducing memory consumption 2020-09-16 13:49:26 +02:00
Trolli Schmittlauch a2f268d374 improve logging: line buffering, time stamps
contributes to #60
2020-09-16 01:54:50 +02:00
Trolli Schmittlauch bb17b136d6 increase stabilise interval 2020-09-16 01:54:40 +02:00
Trolli Schmittlauch c036dea7f9 periodically purge expired subscriptions 2020-09-14 15:49:44 +02:00
Trolli Schmittlauch a0e7142a7d report number of subscriptions 2020-09-14 14:57:25 +02:00
Trolli Schmittlauch 3c28cde942 catch and print all Socket bind exceptions 2020-09-12 15:45:03 +02:00
Trolli Schmittlauch 1fc264a226 manage logging via file handle
reason: `appendFile` combined with lazy evaluation led to exhaustion of
open file descriptors, as the file is opened again for each write and,
due to lazy evaluation, kept open multiple times.
2020-09-12 12:37:41 +02:00
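For illustration only (not part of this changeset): a minimal sketch of the handle-based logging approach described above, combined with the line buffering mentioned in the "improve logging" commit. The log path and messages are made up; the point is that the file is opened once and the handle reused, instead of `appendFile` re-opening it per write.

import System.IO

-- Sketch: open the log file once, enable line buffering and reuse the handle
-- for every write, instead of calling `appendFile` per event.
withLogHandle :: FilePath -> (Handle -> IO a) -> IO a
withLogHandle path action = withFile path AppendMode $ \h -> do
    hSetBuffering h LineBuffering
    action h

main :: IO ()
main = withLogHandle "stats.log" $ \h -> do
    hPutStrLn h "node started"
    hPutStrLn h "posts published: 42"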
Trolli Schmittlauch da579a0756 decrease logging verbosity 2020-09-11 00:39:14 +02:00
Trolli Schmittlauch e12d8ef70a properly format stats log numbers: no e-notation 2020-09-11 00:39:14 +02:00
Trolli Schmittlauch 0f9727c05a log the post rates instead of the absolute sums 2020-09-10 22:41:06 +02:00
Trolli Schmittlauch 34ecdd66e1 make stats measurement delay configurable, take speedup into account 2020-09-10 21:23:33 +02:00
Trolli Schmittlauch 8f917130c4 tag normalisation includes lower case conversion 2020-09-10 13:14:48 +02:00
Trolli Schmittlauch 3ac89d301c bugfix: subscribe as default if not subscribed yet, when posting to a tag 2020-09-10 13:14:23 +02:00
Trolli Schmittlauch 3c76544afb launch background worker threads 2020-09-10 12:00:17 +02:00
Trolli Schmittlauch f8d30d0cc4 report post fetches to statistics 2020-09-09 19:55:34 +02:00
Trolli Schmittlauch 620e998876 report incoming relay posts to statistics 2020-09-09 19:25:48 +02:00
Trolli Schmittlauch 85d10f6773 report published posts to statistics 2020-09-09 18:50:55 +02:00
Trolli Schmittlauch e3a8912360 process incoming posts in parallel 2020-09-09 18:50:45 +02:00
Trolli Schmittlauch 12fcd13754 annotate the PostService server/ request-handler functions 2020-09-09 18:01:51 +02:00
Trolli Schmittlauch 72eca0f4fe log metrics to file
contributes to #60
2020-09-09 17:22:20 +02:00
Trolli Schmittlauch 0ffe9effc0 refactor relay processing to STM-retry instead of busy-wait 2020-09-09 14:24:34 +02:00
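A hedged illustration of the busy-wait to STM-retry change above, using a hypothetical list-based queue rather than the project's actual relay structures: instead of polling in a loop with `threadDelay`, the transaction blocks on `retry` until the shared variable changes.

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Sketch: block inside STM with `retry`; the transaction is suspended until
-- the TVar changes and is re-run then, so no CPU is burnt spinning.
takeNextPost :: TVar [a] -> STM a
takeNextPost queueVar = do
    queue <- readTVar queueVar
    case queue of
        []     -> retry                          -- nothing to relay yet: block, do not spin
        (p:ps) -> writeTVar queueVar ps >> pure p

relayWorker :: TVar [String] -> IO ()
relayWorker queueVar = do
    post <- atomically $ takeNextPost queueVar
    putStrLn ("relaying: " <> post)
    relayWorker queueVar

main :: IO ()
main = do
    queueVar <- newTVarIO []
    _ <- forkIO (relayWorker queueVar)
    atomically $ modifyTVar' queueVar ("hello #hash2pub" :)
    threadDelay 100000                           -- give the worker a moment before exiting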
Trolli Schmittlauch 2b39648a77 actually implement simple relaying of posts
was still missing for #41
2020-09-09 11:51:09 +02:00
Trolli Schmittlauch df479982fa make RingMap instance of Functor and Foldable 2020-09-08 08:46:36 +02:00
Trolli Schmittlauch c536994afe re-format Servant client pattern matching 2020-09-07 16:35:59 +02:00
Trolli Schmittlauch 5c338b9cd7 split up stats summing and evaluating, launch threads 2020-09-07 16:27:56 +02:00
Trolli Schmittlauch c823e6357a accumulate all statistic/ measurement events to a measurement summary
- RingMap can now be mapped over
2020-09-07 13:00:15 +02:00
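Illustrative sketch of what "RingMap can now be mapped over" enables for the measurement summary, using a simplified stand-in type rather than the project's actual RingMap: with Functor and Foldable instances, per-key event counts can be turned into rates and summed without unwrapping the map by hand.

{-# LANGUAGE DeriveFoldable, DeriveFunctor #-}
import qualified Data.Map.Strict as Map

-- Simplified stand-in for the project's RingMap.
newtype RingMap' k v = RingMap' (Map.Map k v)
    deriving (Functor, Foldable)

main :: IO ()
main = do
    let eventCounts = RingMap' (Map.fromList [(1 :: Int, 120 :: Double), (2, 30)])
        rates       = fmap (/ 60) eventCounts    -- counts over a 60 s window -> per-second rates
    print (sum rates)                            -- fold over all entries: 2.5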
Trolli Schmittlauch 6b166ac4ca fixup! Merge branch 'measurement_logging' into mainline 2020-09-07 10:32:58 +02:00
Trolli Schmittlauch 4d2d6faf1b Merge pull request 'Improve general readability' (#69) from Hecate/Hash2Pub:readability into mainline
brings several readability enhancements:

- Text is always unqualified
- The Servant API declaration is moved to its own file
- Redundant imports are removed
2020-09-05 16:27:23 +02:00
Hécate 7d833e064b Improve readability 2020-09-05 15:45:47 +02:00
Trolli Schmittlauch fa78c6fc43 clarify different nix-shell environments in readme 2020-09-05 15:01:46 +02:00
Hécate d3e5eac5c5 Unused imports and syntax error 2020-09-05 12:41:18 +02:00
Trolli Schmittlauch 60f5780742 Merge branch 'measurement_logging' into mainline
more values are parametrised through FediChordConfig
2020-09-05 12:31:40 +02:00
Trolli Schmittlauch c9b0e66110 scale request timeout with speedup and pass it directly to function 2020-09-05 12:30:31 +02:00
Trolli Schmittlauch 4f08d33d2e make all delays configurable and scale them according to a speedup factor 2020-09-05 12:30:31 +02:00
Trolli Schmittlauch 20050654bc make passing bootstrap information optional
reason: allow the first node to start without having to wait for a
timeout
part of #58
2020-09-03 11:20:38 +02:00
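A simplified sketch of the optional-bootstrap parsing (mirroring the readConfig change in the Main module diff further down; the argument positions are simplified here): if no bootstrap host/port pair is passed, the node starts with an empty bootstrap list instead of waiting for a timeout.

import System.Environment (getArgs)

main :: IO ()
main = do
    args <- getArgs
    -- treat a trailing host/port pair as optional bootstrap information
    let bootstrapNodes = case args of
            (bootstrapHost : bootstrapPortString : _) ->
                [(bootstrapHost, read bootstrapPortString :: Int)]
            _ -> []
    if null bootstrapNodes
        then putStrLn "no bootstrap nodes given, starting as the first node"
        else putStrLn ("bootstrapping via " <> show bootstrapNodes)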
Trolli Schmittlauch 59beb3441f instrumentation script executes the prepared schedule
- reads CSV schedule from file
- sends the given schedule of post events
- not thoroughly tested yet

implements #59
2020-09-02 21:37:01 +02:00
Trolli Schmittlauch 1aee41db88 enable compiler optimisation 2020-08-31 13:37:40 +02:00
Trolli Schmittlauch 3c1652d86d prototype instrumentation of periodically sending posts to a test setup of 3 nodes
- contributes to #59
2020-08-27 12:01:08 +02:00
Trolli Schmittlauch 1b5fc039b3 Merge branch 'fix_networking': fix some DHT joins and crashes
several major flaws in the basic DHT communication had been discovered,
see individual commits for details
2020-08-27 00:39:08 +02:00
Trolli Schmittlauch 1a962f1500 stylish run 2020-08-27 00:33:19 +02:00
Trolli Schmittlauch ab9d593a1b bugfix: fix wrong partial Response sender access
- replaces the improper access of the record field `sender`, which only
  exists in a Request, with `senderID` of a Response
- fixes the resulting exception-crash
- adds new function that enqueues a verification mark and timestamp bump
  of an existing cache entry
2020-08-27 00:27:36 +02:00
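For context, a minimal sketch of the pitfall behind this bugfix (with simplified String fields instead of the real RemoteNodeState/NodeID types): record selectors on a sum type are partial, and `-Wpartial-fields`, added to the ghc-options in the cabal diff below, warns about exactly this.

data FediChordMessage
    = Request  { requestID :: Integer, sender   :: String }
    | Response { requestID :: Integer, senderID :: String }

-- Safe accessor: match on the constructor instead of using the partial
-- selector `sender`, which crashes on a Response with
-- "No match in record selector sender".
messageOrigin :: FediChordMessage -> String
messageOrigin Request{sender = s}    = s
messageOrigin Response{senderID = s} = s

main :: IO ()
main = do
    putStrLn (messageOrigin (Request 0 "requesting-node"))
    putStrLn (messageOrigin (Response 1 "node-42"))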
Trolli Schmittlauch f1b15d5a9e bugfix: fix join by adding join node and waiting for it
- in addition to adding the neighbours of the join node, add the join node
  itself as a neighbour as well
- wait for migrations from the node
2020-08-26 17:43:32 +02:00
Trolli Schmittlauch fc8aa3e330 bugfix: properly process QueryID responses so FOUND is conserved
fixes proper discovery of announced responsibility via FOUND
2020-08-25 22:01:01 +02:00
Trolli Schmittlauch b23201a49c Make key lookups fail after request exhaustion instead of providing a default
Returning the own node as a default does not make sense in all contexts:
especially for bootstrap joining this can be harmful, so signalling
instead that the lookup failed makes distinguishing on a case-by-case
basis possible.

Also contributes to #57
2020-08-25 12:51:33 +02:00
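A simplified sketch of the signalling change (the real code threads this through queryIdLookupLoop, visible in the DHTProtocol diff below; `queryOnce` here is a hypothetical stand-in for a single lookup round): after exhausting all attempts, the failure is thrown instead of falling back to the querying node itself.

import Control.Monad.Except (MonadError, runExceptT, throwError)

-- Sketch: throw an error after the last attempt so callers such as the
-- bootstrap join can distinguish "lookup failed" from "found myself".
lookupLoop :: MonadError String m => Int -> (key -> m (Maybe node)) -> key -> m node
lookupLoop 0 _ _ = throwError "exhausted maximum lookup attempts"
lookupLoop attemptsLeft queryOnce key = do
    result <- queryOnce key
    maybe (lookupLoop (attemptsLeft - 1) queryOnce key) pure result

main :: IO ()
main = do
    outcome <- runExceptT $ lookupLoop 3 (const (pure Nothing)) ("sometag" :: String)
    print (outcome :: Either String Int)   -- Left "exhausted maximum lookup attempts"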
Trolli Schmittlauch 6c5e40f8ad fix wrong passing of arguments in receive-loop part checking 2020-08-24 15:28:06 +02:00
Trolli Schmittlauch 3bd4cb667d explicitly pass socket in send-receive-loop 2020-08-24 10:02:45 +02:00
Trolli Schmittlauch 4ba592d8a2 bugfix: DHT request timeout unit is milliseconds 2020-08-23 15:21:24 +02:00
Trolli Schmittlauch cd8ea07600 bugfix: make unjoined nodes consider all IDs to be their responsibility 2020-08-23 13:04:58 +02:00
Trolli Schmittlauch 2b418189a6 use hard-coded defaults for DHT request timeout and retries 2020-08-23 12:06:26 +02:00
Trolli Schmittlauch c3b1aad1c7 abstract away the hashtag -> NodeID conversion 2020-08-21 23:55:20 +02:00
Trolli Schmittlauch 75c1932ef6 send fetchable post URIs as ID 2020-08-21 23:47:42 +02:00
Trolli Schmittlauch 5511026c8d reduce logging verbosity 2020-08-21 14:40:29 +02:00
Trolli Schmittlauch f330ff1070 successful post publishing with MonadState and random relay selection 2020-08-21 12:31:50 +02:00
Trolli Schmittlauch 32734102cd improve documentation of clientPublishPost 2020-08-20 18:13:50 +02:00
Trolli Schmittlauch 70145bc544 bump nixpkgs revision 2020-08-20 15:59:04 +02:00
Trolli Schmittlauch 24088581fe bump nixpkgs revision 2020-08-20 15:58:35 +02:00
Trolli Schmittlauch 2548b6a507 automatically subscribe when publishing to a tag 2020-08-20 11:49:23 +02:00
Trolli Schmittlauch 2ee40a7f64 start working on the experiment runner #59 2020-08-19 15:49:39 +02:00
Trolli Schmittlauch b48251d29d Merge branch 'data_migration': closes #36
this simple implementation still contains minor design issues, for which
tickets have been filed though
2020-08-18 12:05:55 +02:00
Trolli Schmittlauch fce5ff9153 implement service data migration for stabilise 2020-08-18 00:19:21 +02:00
Trolli Schmittlauch 969f6d7fc2 fix tests 2020-08-17 13:39:22 +02:00
Trolli Schmittlauch 6982a0b245 indicate in LeaveRequest whether to expect a migration
this information is used to decide whether to await an incoming
migration in `respondLeave`
2020-08-17 12:36:02 +02:00
Trolli Schmittlauch b8cedada48 prevent threads not awaiting migration from blocking their response 2020-08-17 11:37:04 +02:00
Trolli Schmittlauch c49c1a89c9 wait for migration to complete on join
also clean up migration entry after success
2020-08-17 00:22:48 +02:00
Trolli Schmittlauch 414564705a possibility to wait for a migration to complete 2020-08-16 23:26:31 +02:00
Trolli Schmittlauch 581757965a trigger service data migration at join 2020-08-16 17:53:56 +02:00
Trolli Schmittlauch 470ce6f39a correct the slice of transferred tags at leave 2020-08-15 23:58:47 +02:00
Trolli Schmittlauch 4302452d18 implement vserver leave and trigger data transfer initiation
still unused though
contributes to #36
2020-08-15 22:56:16 +02:00
Trolli Schmittlauch d2e4359a21 rename join function to clarify that it joins just a single vserver 2020-08-15 17:37:14 +02:00
Trolli Schmittlauch 8db8907163 filter out spoofed requests for important operations like Join, Leave, Stabilise 2020-08-15 17:19:53 +02:00
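A simplified sketch of the spoofing check (the actual implementation, `dropSpoofedIDs`, appears in the DHTProtocol diff below; the ID derivation here is a dummy stand-in for the project's genNodeID): the sender's node ID is recomputed from the packet's actual source address and the request is only handled if it matches the claimed ID.

-- Hypothetical, simplified types; the real code uses HostAddress6 and NodeID.
type SourceAddr = String
type NodeID     = Int

-- Dummy stand-in for genNodeID (a hash over IP, domain and vserver number).
genNodeID' :: SourceAddr -> String -> Int -> NodeID
genNodeID' addr domain vs = sum (map fromEnum addr) * 31 + sum (map fromEnum domain) * 7 + vs

-- Only run the responder when the claimed sender ID matches the recomputed one.
checkSpoofed :: SourceAddr -> (NodeID, String, Int) -> IO a -> IO (Maybe a)
checkSpoofed addr (claimedID, domain, vs) responder
    | genNodeID' addr domain vs == claimedID = Just <$> responder
    | otherwise                              = pure Nothing

main :: IO ()
main = do
    let claimed = (genNodeID' "2001:db8::1" "node1.example" 0, "node1.example", 0)
    accepted <- checkSpoofed "2001:db8::1" claimed (pure "respond")
    rejected <- checkSpoofed "2001:db8::2" claimed (pure "respond")
    print (accepted, rejected)   -- (Just "respond", Nothing)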
Trolli Schmittlauch 5f7ca23f71 add missing leave request sending function 2020-08-14 22:59:28 +02:00
Trolli Schmittlauch 0ecad38748 merge implemented mock relay service API
closes #32 #41
2020-08-14 11:29:50 +02:00
Trolli Schmittlauch 4339cace20 function for initially publishing a post 2020-08-14 11:06:58 +02:00
Trolli Schmittlauch bf277c5a73 unsubscribe from tag 2020-08-13 23:50:33 +02:00
Trolli Schmittlauch 402378a78b signal and handle non-responsibility to subscriptions 2020-08-13 23:44:24 +02:00
Trolli Schmittlauch e646045ab2 include port in Origin header 2020-08-13 21:57:28 +02:00
Trolli Schmittlauch e9ae258dde subscribe to tag 2020-08-13 21:12:39 +02:00
Trolli Schmittlauch 375014812a use a shared HTTP manager for requests 2020-08-13 19:05:38 +02:00
Trolli Schmittlauch bdb00a32f3 add nix shell environment without HIE for smaller footprint 2020-08-13 13:09:48 +02:00
Trolli Schmittlauch dcd4a7b563 add nix shell environment without HIE for smaller footprint 2020-08-13 13:08:38 +02:00
Trolli Schmittlauch 580410e0b4 simple post fetch worker thread 2020-08-13 13:07:50 +02:00
Trolli Schmittlauch c1ce386b65 send prepared subscriptions and clean up on success 2020-08-12 15:23:10 +02:00
Trolli Schmittlauch 2e88a4079b extract and build subscriber payload for sending 2020-08-12 14:07:19 +02:00
Trolli Schmittlauch 1d808b6776 fix typo 2020-08-12 12:16:20 +02:00
Trolli Schmittlauch 1258f673da flush responsibility cache and retry in post queue delivery 2020-08-12 12:07:41 +02:00
Trolli Schmittlauch 96c1963a4f actually check own responsibility for tags before accepting posts 2020-08-11 00:16:10 +02:00
Trolli Schmittlauch 7036867ae0 implemented first Servant client query 2020-08-03 22:50:48 +02:00
Trolli Schmittlauch 20e51ecca4 define API client functions 2020-08-02 14:59:03 +02:00
Trolli Schmittlauch 8faa9dc016 fix test by providing a MockService 2020-08-01 18:58:49 +02:00
Trolli Schmittlauch 89706f688a server endpoint for tag unsubscription 2020-08-01 11:18:16 +02:00
Trolli Schmittlauch 7280f251b5 server endpoint for tag subscription 2020-08-01 11:00:38 +02:00
Trolli Schmittlauch 7d7fa3b52a fix haddock parsing 2020-07-31 17:49:52 +02:00
Trolli Schmittlauch 50044673a6 server endpoint for tag-post delivery 2020-07-31 17:46:33 +02:00
Trolli Schmittlauch e3c7faa80b properly initialise RealNode with service and vserver data, set up reference 2020-07-31 16:54:19 +02:00
Trolli Schmittlauch 5ffe1b074e add reference from RealNode to Service
This required making both RealNode(STM) and LocalNodeState(STM) parameterisable
polymorphic types
2020-07-30 02:19:52 +02:00
Trolli Schmittlauch 4bf8091143 fix type signature of fediChordInit 2020-07-30 01:30:42 +02:00
Trolli Schmittlauch 98ca0ff13e service config, integrate service launch into DHT launch
TODO: hold a reference from DHT to service
2020-07-30 01:23:03 +02:00
Trolli Schmittlauch da47f8062f add lease time to subscription entries 2020-07-29 23:06:07 +02:00
Trolli Schmittlauch ad52a017aa add relay inbox endpoint 2020-07-29 22:15:14 +02:00
Trolli Schmittlauch bd70e2dff0 implement multiple post fetch (with placeholder content) 2020-07-29 00:06:27 +02:00
Trolli Schmittlauch 63bc06a88e implement post fetch (with placeholder content) 2020-07-28 23:45:21 +02:00
Trolli Schmittlauch 970c94ff0d set up subscription data structures and transfer subscription endpoint 2020-07-28 23:17:26 +02:00
Trolli Schmittlauch 3b65757406 worker thread for processing incoming posts in background, started together with web server 2020-07-28 02:12:15 +02:00
Trolli Schmittlauch 736815ea83 normalise hashtag unicode representation of incoming posts 2020-07-27 21:49:49 +02:00
Trolli Schmittlauch daae9d0b38 process and enqueue incoming posts 2020-07-27 21:39:49 +02:00
Trolli Schmittlauch 04423171fd define data types for post and subscription storage 2020-07-27 13:20:15 +02:00
Trolli Schmittlauch 7878c67635 adjust rest of code to refactored RingMap 2020-07-27 00:37:31 +02:00
Trolli Schmittlauch 988144e9e7 further relax constraints on RingMap
The key now needs to be given explicitly at insert, instead of being
derived from the value. This makes it possible to store values from which
a key cannot be extracted (HasKeyID).

contributes to #62, #32, #41
2020-07-26 18:55:55 +02:00
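A simplified before/after sketch of the insert API change described above, using Data.Map as a stand-in for RingMap: the old style needed a HasKeyID constraint to derive the key from the value, the new style takes the key explicitly.

{-# LANGUAGE MultiParamTypeClasses #-}
import qualified Data.Map.Strict as Map

-- Stand-in for the project's HasKeyID constraint.
class HasKeyID k a where
    getKeyID :: a -> k

-- Before: the key is derived from the value, so only HasKeyID values can be stored.
addEntryOld :: (Ord k, HasKeyID k a) => a -> Map.Map k a -> Map.Map k a
addEntryOld v = Map.insert (getKeyID v) v

-- After: the key is supplied explicitly, so values without an extractable key work too.
addEntryNew :: Ord k => k -> a -> Map.Map k a -> Map.Map k a
addEntryNew = Map.insert

main :: IO ()
main = print (Map.toList (addEntryNew (42 :: Int) "post URI without an extractable key" Map.empty))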
Trolli Schmittlauch 6349e05033 enable HIE only in the shell environment, but not by default 2020-07-26 16:38:56 +02:00
Trolli Schmittlauch 473ccb631d add hie as language server to dev environment 2020-07-26 16:26:26 +02:00
Trolli Schmittlauch 91ac4ca7e1 Merge pull request 'refactorRingMap' (#63) from refactorRingMap into mainline
This PR likely enables too many LanguageExtensions and could possibly be simplified, see 63.

Merging this for now though as tests run through fine.
2020-07-26 00:07:53 +02:00
Trolli Schmittlauch 1dfa7a5b8e Merge commit 'd55c2f1f1b' into mainline 2020-07-26 00:02:36 +02:00
Trolli Schmittlauch da1b8f4b9d define typeclasses for interfacing between PostService and DHT 2020-07-25 23:54:27 +02:00
17 changed files with 1625 additions and 433 deletions


@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE {
LeaveRequestPayload ::= SEQUENCE {
successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState
-- ToDo: transfer of own data to newly responsible node
predecessors SEQUENCE OF NodeState,
doMigration BOOLEAN
}
LeaveResponsePayload ::= NULL -- just a confirmation


@ -46,8 +46,8 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers
ghc-options: -Wall
build-depends: base >=4, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=3.1, time, cmdargs ^>= 0.10, cryptonite, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
ghc-options: -Wall -Wpartial-fields -O2
@ -55,10 +55,10 @@ library
import: deps
-- Modules exported by the library.
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils
other-modules: Hash2Pub.Utils, Hash2Pub.PostService.API
-- LANGUAGE extensions used by modules in this package.
other-extensions: GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings
@ -91,6 +91,20 @@ executable Hash2Pub
-- Base language which the package is written in.
default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N
executable Experiment
-- experiment runner
import: deps
build-depends: Hash2Pub
main-is: Experiment.hs
hs-source-dirs: app
default-language: Haskell2010
ghc-options: -threaded


@ -1,7 +1,7 @@
# Hash2Pub
***This is heavily WIP and does not provide any useful functionality yet***.
I aim for always having the master branch at a state where it builds and tests pass.
I aim for always having the `mainline` branch in a state where it builds and tests pass.
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
@ -10,8 +10,13 @@ This is the practical implementation of the concept presented in the paper [Dece
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
For further questions and discussions, please refer to the **Hash2Pub topic in [SocialHub](https://socialhub.activitypub.rocks/c/software/hash2pub/48)**.
## Building
The project and its development environment are built with [Nix](https://nixos.org/nix/).
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once.
While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development.
Be aware that these need to be built from source and can take a very long time to build.

app/Experiment.hs Normal file

@ -0,0 +1,51 @@
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent
import Control.Monad (forM_)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import qualified Network.HTTP.Client as HTTP
import System.Environment (getArgs)
import Hash2Pub.PostService (Hashtag, clientPublishPost)
-- configuration constants
timelineFile = "../simulationData/inputs/generated/timeline_sample.csv"
main :: IO ()
main = do
-- read CLI parameters
speedupStr : _ <- getArgs
-- read and parse timeline schedule
-- relying on laziness of Haskell IO, hoping it does not introduce too strong delays
postEvents <- parseSchedule <$> TxtI.readFile timelineFile
-- actually schedule and send the post events
executeSchedule (read speedupStr) postEvents
pure ()
parseSchedule :: Txt.Text
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
where
parseEntry [delayT, contactT, tag] =
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
parseEntry entry = error $ "invalid schedule input format: " <> show entry
executeSchedule :: Int -- ^ speedup factor
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
-> IO ()
executeSchedule speedup events = do
-- initialise HTTP manager
httpMan <- HTTP.newManager $ HTTP.defaultManagerSettings { HTTP.managerResponseTimeout = HTTP.responseTimeoutMicro 60000000 }
forM_ events $ \(delay, tag, (pubHost, pubPort)) -> do
_ <- forkIO $
clientPublishPost httpMan pubHost pubPort ("foobar #" <> tag)
>>= either putStrLn (const $ pure ())
-- while threadDelay gives only minimum delay guarantees, let's hope the
-- additional delays are negligible
-- otherwise: evaluate usage of https://hackage.haskell.org/package/schedule-0.3.0.0/docs/Data-Schedule.html
threadDelay $ delay `div` speedup


@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6)
import System.Environment
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService (PostService (..))
main :: IO ()
main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
conf <- readConfig
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
@ -41,15 +43,38 @@ main = do
pure ()
readConfig :: IO FediChordConf
readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
pure $ FediChordConf {
confDomain = confDomainString
confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
-- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout
let
speedup = read speedupString
confBootstrapNodes' = case remainingArgs of
bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)]
_ -> []
fConf = FediChordConf
{ confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
--, confStabiliseInterval = 60
, confBootstrapSamplingInterval = 180
, confMaxLookupCacheAge = 300
, confBootstrapNodes = confBootstrapNodes'
, confStabiliseInterval = 80 * 10^6
, confBootstrapSamplingInterval = 180 * 10^6 `div` speedup
, confMaxLookupCacheAge = 300 / fromIntegral speedup
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup
, confMaxNodeCacheAge = 600 / fromIntegral speedup
, confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
}
sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
, confServicePort = read servicePortString
, confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup
, confStatsEvalDelay = 120 * 10^6 `div` speedup
}
pure (fConf, sConf)


@ -1,14 +1,19 @@
{ pkgs ? import (
builtins.fetchGit {
name = "nixpkgs-pinned";
url = https://github.com/NixOS/nixpkgs/;
ref = "refs/heads/release-20.03";
rev = "da7ddd822e32aeebea00e97ab5aeca9758250a40";
}) {},
compiler ? "ghc865"
{
compiler ? "ghc884"
}:
let
pkgs = import (
builtins.fetchGit {
name = "nixpkgs-pinned";
url = https://github.com/NixOS/nixpkgs/;
ref = "refs/heads/release-20.09";
rev = "e065200fc90175a8f6e50e76ef10a48786126e1c";
}) {
# Pass no config for purity
config = {};
overlays = [];
};
hp = pkgs.haskell.packages."${compiler}";
src = pkgs.nix-gitignore.gitignoreSource [] ./.;
drv = hp.callCabal2nix "Hash2Pub" "${src}/Hash2Pub.cabal" {};

shell-minimal.nix Normal file

@ -0,0 +1 @@
(import ./default.nix {withHIE = false;}).shell


@ -38,6 +38,7 @@ splitPayload numParts pl@LeaveRequestPayload{} = [
LeaveRequestPayload {
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
, leaveDoMigration = leaveDoMigration pl
} | thisPart <- [1..numParts] ]
splitPayload numParts pl@StabiliseResponsePayload{} = [
StabiliseResponsePayload {
@ -134,9 +135,8 @@ encodePayload payload'@LeaveRequestPayload{} =
<> [End Sequence
, Start Sequence]
<> concatMap encodeNodeState (leavePredecessors payload')
<> [End Sequence
, End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
<> [End Sequence]
<> [Boolean (leaveDoMigration payload'), End Sequence]
encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence
: Start Sequence
@ -144,8 +144,7 @@ encodePayload payload'@StabiliseResponsePayload{} =
<> [End Sequence
, Start Sequence]
<> concatMap encodeNodeState (stabilisePredecessors payload')
<> [End Sequence
, End Sequence]
<> [End Sequence, End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null]
encodePayload payload'@QueryIDResponsePayload{} =
let
@ -415,9 +414,11 @@ parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
doMigration <- parseBool
pure $ LeaveRequestPayload {
leaveSuccessors = succ'
, leavePredecessors = pred'
, leaveDoMigration = doMigration
}
parseLeaveResponse :: ParseASN1 ActionPayload


@ -19,6 +19,7 @@ module Hash2Pub.DHTProtocol
, sendQueryIdMessages
, requestQueryID
, requestJoin
, requestLeave
, requestPing
, requestStabilise
, lookupMessage
@ -34,21 +35,25 @@ module Hash2Pub.DHTProtocol
, ackRequest
, isPossibleSuccessor
, isPossiblePredecessor
, isInOwnResponsibilitySlice
, isJoined
, closestCachePredecessors
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_)
import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr')
import Data.Foldable (foldl', foldr', foldrM)
import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
@ -73,10 +78,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith,
Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
@ -92,7 +98,7 @@ import Debug.Trace (trace)
-- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined.
@ -102,9 +108,6 @@ queryLocalCache ownState nCache lBestNodes targetID
-- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
where
ownID = getNid ownState
preds = predecessors ownState
closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
@ -130,23 +133,25 @@ closestCachePredecessors remainingLookups lastID nCache
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the same as the node
-- itself then it falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
where
predecessorList = predecessors ownNs
-- add node itself to RingMap representation, to distinguish between
-- responsibility of own node and predecessor
predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessorList) :: RingMap NodeID RemoteNodeState
ownRemote = toRemoteNodeState ownNs
closestPredecessor = headMay predecessorList
isPossiblePredecessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossiblePredecessor = isInOwnResponsibilitySlice
isPossibleSuccessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
where
successorList = successors ownNs
successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successorList)
ownRemote = toRemoteNodeState ownNs
closestSuccessor = headMay successorList
-- cache operations
@ -169,7 +174,8 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now
newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache
newEntry = CacheEntry False ns timestamp'
newCache = addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal)
@ -202,7 +208,7 @@ addNodeAsVerifiedPure :: POSIXTime
-> RemoteNodeState
-> NodeCache
-> NodeCache
addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now)
addNodeAsVerifiedPure now node = addRMapEntry (getKeyID node) (CacheEntry True node now)
@ -221,7 +227,7 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT
isJoined :: LocalNodeState -> Bool
isJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
@ -245,19 +251,20 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
-> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
putStrLn $ "handling incoming request: " <> show msgSet
ns <- readTVarIO nsSTM
-- add nodestate to cache
now <- getPOSIXTime
case headMay . Set.elems $ msgSet of
Nothing -> pure ()
Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
maybe (pure ()) (
@ -265,17 +272,36 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
)
=<< (case action aPart of
Ping -> Just <$> respondPing nsSTM msgSet
Join -> Just <$> respondJoin nsSTM msgSet
Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
-- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined
Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
)
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
where
-- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: HostAddress6 -- msg source address
-> LocalNodeStateSTM s
-> Set.Set FediChordMessage -- message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- responder function to be invoked for valid requests
-> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs addr nsSTM' msgSet' responder =
let
aRequestPart = Set.elemAt 0 msgSet
senderNs = sender aRequestPart
givenSenderID = getNid senderNs
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
in
if recomputedID == givenSenderID
then Just <$> responder nsSTM' msgSet'
else pure Nothing
-- ....... response sending .......
@ -284,9 +310,8 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do
putStrLn "responding to a QueryID request"
-- this message cannot be split reasonably, so just
-- consider the first payload
let
@ -324,8 +349,7 @@ respondQueryID nsSTM msgSet = do
-- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response.
-- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
-- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
@ -334,16 +358,15 @@ respondLeave nsSTM msgSet = do
)
([],[]) msgSet
aRequestPart = Set.elemAt 0 msgSet
senderID = getNid . sender $ aRequestPart
leaveSenderID = getNid . sender $ aRequestPart
responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM
-- remove leaving node from successors, predecessors and NodeCache
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaveSenderID
writeTVar nsSTM $
-- add predecessors and successors of leaving node to own lists
setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
-- TODO: handle handover of key data
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
let leaveResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
@ -353,10 +376,14 @@ respondLeave nsSTM msgSet = do
, payload = Just LeaveResponsePayload
}
pure leaveResponse
-- if awaiting an incoming service data migration, collect the lock without blocking this thread
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode))
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list
respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
let
@ -378,7 +405,7 @@ respondStabilise nsSTM msgSet = do
-- | respond to Ping request by returning all active vserver NodeStates
respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet = do
-- TODO: respond with all active VS when implementing k-choices
nsSnap <- readTVarIO nsSTM
@ -395,18 +422,19 @@ respondPing nsSTM msgSet = do
}
pure $ serialiseMessage sendMessageSize pingResponse
-- this modifies node state, so locking and IO seems to be necessary.
-- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do
(dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
aRequestPart = Set.elemAt 0 msgSet
senderNS = sender aRequestPart
responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS)
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility
@ -430,33 +458,41 @@ respondJoin nsSTM msgSet = do
, payload = Just responsePayload
}
writeTVar nsSTM joinedNS
pure joinResponse
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload
else pure Response {
else pure (Nothing, Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Nothing
}
})
-- as DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM -- ^ joining NodeState
-> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState)
let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM
let
@ -481,25 +517,28 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty)
responses
-- sort, slice and set the accumulated successors and predecessors
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
-- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
pure $ if responses == Set.empty
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState)
then Left "join error: no predecessors or successors"
-- successful join
else Right ownStateSTM
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
pure $ Right ownStateSTM
)
`catch` (\e -> pure . Left $ displayException (e :: IOException))
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
requestQueryID :: (MonadIO m, MonadError String m)
=> LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up
-> IO RemoteNodeState -- ^ the node responsible for handling that key
-> m RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
@ -507,23 +546,23 @@ requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures
requestQueryID ns targetID = do
firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
-- TODO: make maxAttempts configurable
queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState
queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
-- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> pure thisNode
FORWARD nodeSet -> do
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- getPOSIXTime
responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- liftIO getPOSIXTime
-- check for a FOUND and return it
case responseEntries of
FOUND foundNode -> pure foundNode
@ -538,7 +577,7 @@ queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
sendQueryIdMessages :: (Integral i)
=> NodeID -- ^ target key ID to look up
-> LocalNodeState -- ^ node state of the node doing the query
-> LocalNodeState s -- ^ node state of the node doing the query
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response
@ -546,10 +585,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
-- ToDo: make attempts and timeout configurable
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them
@ -557,8 +596,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
now <- getPOSIXTime
-- collect cache entries from all responses
foldM (\acc resp -> do
let entrySet = case queryResult <$> payload resp of
foldrM (\resp acc -> do
let
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
Just (FORWARD resultset) -> resultset
_ -> Set.empty
@ -568,15 +609,20 @@ sendQueryIdMessages targetID ns lParam targets = do
-- return accumulated QueryResult
pure $ case acc of
-- once a FOUND as been encountered, return this as a result
isFound@FOUND{} -> isFound
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
FOUND{} -> acc
FORWARD accSet
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses
where
isFound FOUND{} = True
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
=> NodeID -- ^ target ID
-> LocalNodeState -- ^ sender node state
-> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter
-> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
@ -586,12 +632,13 @@ lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
-- return parsed neighbour lists
requestStabilise :: LocalNodeState -- ^ sending node
requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
@ -613,22 +660,55 @@ requestStabilise ns neighbour = do
)
([],[]) respSet
-- update successfully responded neighbour in cache
now <- getPOSIXTime
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
) responses
requestPing :: LocalNodeState -- ^ sending node
-- | Send a Leave request to the specified node.
-- Service data transfer needs to be done separately, as not all neighbours
-- that need to know about the leaving handle the new service data.
requestLeave :: LocalNodeState s
-> Bool -- whether to migrate service data
-> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns
, leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
}
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Leave
, payload = Just leavePayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either
-- forward IO error messages
(pure . Left)
-- empty payload, so no processing required
(const . pure . Right $ ())
responses
requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do
resp <- sendRequestTo 5000 3 (\rid ->
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
@ -664,10 +744,9 @@ requestPing ns target = do
) responses
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in seconds
sendRequestTo :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
@ -678,11 +757,10 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
let
msgComplete = msgIncomplete randomID
requests = serialiseMessage sendMessageSize msgComplete
putStrLn $ "sending request message " <> show msgComplete
-- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
_ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts
@ -691,19 +769,20 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO ()
sendAndAck responseQueue sock remainingSends = do
sendMany sock $ Map.elems remainingSends
sendAndAck responseQueue sock' remainingSends = do
sendMany sock' $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely
recvLoop responseQueue remainingSends Set.empty Nothing
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
recvLoop sock' responseQueue remainingSends Set.empty Nothing
recvLoop :: Socket
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known
-> IO ()
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock 65535
response <- deserialiseMessage <$> recv sock' 65535
case response of
Right msg@Response{} -> do
atomically $ writeTBQueue responseQueue msg
@ -711,16 +790,17 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
then pure ()
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
-- drop errors and invalid messages
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
Right Request{} -> pure () -- expecting a response, not a request
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueAddEntries entries ns = do
now <- getPOSIXTime
@ -730,17 +810,29 @@ queueAddEntries entries ns = do
-- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
=> c NodeID
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
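-- The queue helpers above all share the same pattern (sketch, not part of this
-- changeset): the cacheWriteQueue carries NodeCache -> NodeCache modifier
-- functions that 'nodeCacheWriter' below applies as the single writer.
queueCacheModifier :: (NodeCache -> NodeCache) -> LocalNodeState s -> IO ()
queueCacheModifier modifier ns = atomically . writeTQueue (cacheWriteQueue ns) $ modifier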
-- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry
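-- A possible shape of 'attempts' (sketch only; the actual definition continues
-- outside this hunk): retry the action until it yields a Just or the attempt
-- budget is exhausted.
attemptsSketch :: Int -> IO (Maybe a) -> IO (Maybe a)
attemptsSketch i action
    | i <= 0    = pure Nothing
    | otherwise = do
        result <- action
        maybe (attemptsSketch (i-1) action) (pure . Just) result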
@ -773,7 +865,7 @@ mkServerSocket ip port = do
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1
bind sock sockAddr
bind sock sockAddr `catch` (\e -> putStrLn $ "Caught exception while bind " <> show sock <> " " <> show sockAddr <> ": " <> show (e :: SomeException))
pure sock
-- | create a UDP datagram socket, connected to a destination.
@ -789,6 +881,6 @@ mkSendSocket srcIp dest destPort = do
setSocketOption sendSock IPv6Only 1
-- bind to the configured local IP to make sure that outgoing packets are sent from
-- this source address
bind sendSock srcAddr
bind sendSock srcAddr `catch` (\e -> putStrLn $ "Caught exception while mkSendSocket bind " <> show sendSock <> " " <> show srcAddr <> ": " <> show (e :: SomeException))
connect sendSock destAddr
pure sendSock

View file

@ -3,7 +3,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
{- |
Module : FediChord
Description : An opinionated implementation of the EpiChord DHT by Leong et al.
@ -40,7 +39,7 @@ module Hash2Pub.FediChord (
, bsAsIpAddr
, FediChordConf(..)
, fediChordInit
, fediChordJoin
, fediChordVserverJoin
, fediChordBootstrapJoin
, tryBootstrapJoining
, fediMainThreads
@ -78,7 +77,6 @@ import Data.Maybe (catMaybes, fromJust, fromMaybe,
isJust, isNothing, mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Network.Socket hiding (recv, recvFrom, send,
@ -95,24 +93,34 @@ import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf = do
fediChordInit :: (Service s (RealNodeSTM s))
=> FediChordConf
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-> IO (Socket, LocalNodeStateSTM s)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode {
vservers = []
, nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
, nodeService = undefined
}
realNodeSTM <- newTVarIO realNode
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-- initialise a single vserver
initialState <- nodeStateInit realNodeSTM
initialStateSTM <- newTVarIO initialState
-- add vserver to list at RealNode
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = initialStateSTM:vservers rn }
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
pure (serverSock, initialStateSTM)
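-- Sketch of the expected wiring from an executable module that imports both
-- Hash2Pub.FediChord and Hash2Pub.PostService (not part of this changeset;
-- the configuration values are assumed to come from the program arguments):
exampleStartup :: FediChordConf -> ServiceConf -> IO ()
exampleStartup dhtConf servConf = do
    (serverSock, nsSTM) <- fediChordInit dhtConf
        (runService servConf :: RealNodeSTM PostService -> IO (PostService (RealNodeSTM PostService)))
    -- hand the bound socket and the vserver reference to the DHT main threads
    fediMainThreads serverSock nsSTM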
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: RealNodeSTM -> IO LocalNodeState
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
nodeStateInit realNodeSTM = do
realNode <- readTVarIO realNodeSTM
cacheSTM <- newTVarIO initCache
@ -125,7 +133,7 @@ nodeStateInit realNodeSTM = do
, ipAddr = confIP conf
, nid = genNodeID (confIP conf) (confDomain conf) $ fromInteger vsID
, dhtPort = toEnum $ confDhtPort conf
, servicePort = 0
, servicePort = getListeningPortFromService $ nodeService realNode
, vServerID = vsID
}
initialState = LocalNodeState {
@ -144,9 +152,10 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
@ -157,12 +166,13 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
currentlyResponsible <- liftEither lookupResp
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
liftIO $ putStrLn "send a bootstrap Join"
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
-- Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined nodes try joining instead.
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
@ -189,11 +199,11 @@ convergenceSampleThread nsSTM = forever $ do
-- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
threadDelay $ delaySecs * 10^6
threadDelay delaySecs
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
@ -210,13 +220,14 @@ tryBootstrapJoining nsSTM = do
-- | Look up a key just based on the responses of a single bootstrapping node.
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
@ -235,60 +246,95 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
)
initCache resp
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure $ Right currentlyResponsible
currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure currentlyResponsible
-- | join a node to the DHT using the global node cache
-- for resolving the new node's position.
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordJoin nsSTM = do
ns <- readTVarIO nsSTM
fediChordVserverJoin nsSTM = do
ns <- liftIO $ readTVarIO nsSTM
-- 1. get routed to the currently responsible node
currentlyResponsible <- requestQueryID ns $ getNid ns
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
joinResult <- requestJoin currentlyResponsible nsSTM
case joinResult of
Left err -> pure . Left $ "Error joining on " <> err
Right joinedNS -> pure . Right $ joinedNS
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do
-- TODO: deal with failure of all successors, e.g. by invoking a stabilise
-- and looking up further successors. So far we just fail here.
_ <- migrateSuccessor
-- then send leave messages to all other neighbours
-- TODO: distinguish between sending-error causes on our side and on the
-- network/ target side. The latter cannot be fixed anyway, while the
-- former could be worked around
-- send a leave message to all neighbours
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False
where
sendUntilSuccess i = maybe
(pure $ Left "Exhausted all successors")
(\neighb -> do
leaveResponse <- requestLeave ns True neighb
case leaveResponse of
Left _ -> sendUntilSuccess (i+1)
-- return first successfully contacted neighbour,
-- so it can be contacted by the service layer for migration
Right _ -> pure $ Right neighb
)
$ atMay (successors ns) i
migrateSuccessor :: (MonadError String m, MonadIO m) => m ()
migrateSuccessor = do
-- send leave message to first responding successor
successorLeave <- liftIO $ sendUntilSuccess 0
-- trigger service data transfer for abandoned key space
migrateToNode <- liftEither successorLeave
let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns)
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
-- previously held data is the one between the immediate predecessor and
-- the own ID
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
liftEither migrationResult
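-- Usage sketch (hypothetical caller, not part of this changeset): the leave
-- procedure runs in MonadError, so a shutdown handler would unwrap it via
-- runExceptT before terminating the vserver.
gracefulShutdownSketch :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
gracefulShutdownSketch nsSTM = do
    ns <- readTVarIO nsSTM
    leaveResult <- runExceptT $ fediChordVserverLeave ns
    either (putStrLn . ("leave failed: " <>)) pure leaveResult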
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO ()
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do
nsSnap <- readTVarIO nsSTM
(lookupResult, cache) <- atomically $ do
(lookupResult, parentNode) <- atomically $ do
cache <- readTVar $ nodeCacheSTM nsSnap
parentNode <- readTVar $ parentRealNode nsSnap
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
-- empty cache, block until cache changes and then retry
(FORWARD s) | Set.null s -> retry
result -> pure (result, cache)
result -> pure (result, parentNode)
case lookupResult of
-- already joined
FOUND _ -> do
print =<< readTVarIO nsSTM
FOUND _ ->
pure ()
-- otherwise try joining
FORWARD _ -> do
joinResult <- fediChordJoin nsSTM
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
either
-- on join failure, sleep and retry
-- TODO: make delay configurable
(const $ threadDelay (30 * 10^6) >> loop)
(const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop)
(const $ pure ())
joinResult
emptyset = Set.empty -- because pattern matches don't accept qualified names
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM -> IO ()
nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
nodeCacheWriter nsSTM =
forever $ atomically $ do
ns <- readTVar nsSTM
@ -296,20 +342,15 @@ nodeCacheWriter nsSTM =
modifyTVar' (nodeCacheSTM ns) cacheModifier
-- TODO: make max entry age configurable
maxEntryAge :: POSIXTime
maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM -> IO ()
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin"
-- get cache
(ns, cache) <- atomically $ do
(ns, cache, maxEntryAge) <- atomically $ do
ns <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM ns
pure (ns, cache)
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
pure (ns, cache, maxEntryAge)
-- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime
@ -356,14 +397,13 @@ nodeCacheVerifyThread nsSTM = forever $ do
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
)
putStrLn "cache verify run: end"
threadDelay $ 10^6 * round maxEntryAge `div` 20
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds
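-- Sketch of a helper making the conversion above explicit (an assumption, not
-- part of the changeset): 'fromEnum' on a POSIXTime yields picoseconds, while
-- 'threadDelay' expects microseconds.
posixToMicros :: POSIXTime -> Int
posixToMicros t = fromEnum t `div` 10^6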
-- | Checks the invariant of at least @jEntries@ per cache slice.
-- If this invariant does not hold, the middle of the slice is returned for
-- making lookups to that ID
checkCacheSliceInvariants :: LocalNodeState
checkCacheSliceInvariants :: LocalNodeState s
-> NodeCache
-> [NodeID] -- ^ list of middle IDs of slices not
-- ^ fulfilling the invariant
@ -419,12 +459,10 @@ checkCacheSliceInvariants ns
-- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM -> IO ()
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM
oldNs <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
print ns
-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of
@ -433,8 +471,8 @@ stabiliseThread nsSTM = forever $ do
-- don't contact all neighbours unless the previous one failed/ returned a Left
predStabilise <- stabiliseClosestResponder ns predecessors 1 []
succStabilise <- stabiliseClosestResponder ns predecessors 1 []
predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
let
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
@ -459,31 +497,60 @@ stabiliseThread nsSTM = forever $ do
-- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
atomically $ do
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
writeTVar nsSTM $ addPredecessors [entry] latestNs
)
nextEntry
)
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
atomically $ do
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
writeTVar nsSTM $ addSuccessors [entry] latestNs
)
nextEntry
)
putStrLn "stabilise run: end"
-- TODO: make delay configurable
threadDelay (60 * 10^6)
newNs <- readTVarIO nsSTM
let
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
newPredecessor = headMay $ predecessors newNs
-- manage need for service data migration:
maybe (pure ()) (\newPredecessor' ->
when (
isJust newPredecessor
&& oldPredecessor /= newPredecessor'
-- case: predecessor has changed in some way => own responsibility has changed in some way
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-- If this is due to a node leaving without transferring its data, try getting it from a redundant copy
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-- TODO: deal with migration failure, e.g retry
pure ()
)
newPredecessor
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay stabiliseDelay
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retr
-- (specified by the provided getter function) and on failure retry
-- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node.
stabiliseClosestResponder :: LocalNodeState -- ^ own node
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
stabiliseClosestResponder :: LocalNodeState s -- ^ own node
-> (LocalNodeState s -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
-> Int -- ^ index of neighbour to query
-> [RemoteNodeState] -- ^ delete accumulator
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
@ -507,7 +574,7 @@ stabiliseThread nsSTM = forever $ do
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
checkReachability :: LocalNodeState -- ^ this node
checkReachability :: LocalNodeState s -- ^ this node
-> RemoteNodeState -- ^ node to Ping for reachability
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
checkReachability ns toCheck = do
@ -536,10 +603,10 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
putStrLn $ "launching threads, ns: " <> show ns
putStrLn "launching threads"
sendQ <- newTQueueIO
recvQ <- newTQueueIO
-- concurrently launch all handler threads, if one of them throws an exception
@ -562,38 +629,36 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
POSIXTime
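-- Sketch (not part of this changeset) of how an incoming message part could be
-- folded into the request map keyed by (sender address, request ID), as
-- described in 'fediMessageHandler' below; the 'requestID' accessor is assumed
-- here and the bookkeeping of the final-part flag is omitted.
collectPartSketch :: SockAddr -> FediChordMessage -> POSIXTime -> RequestMap -> RequestMap
collectPartSketch sourceAddr msg now =
    Map.insertWith
        -- an entry already exists: add this part to its set, keep the timestamp
        (\_ (RequestMapEntry parts total ts) -> RequestMapEntry (Set.insert msg parts) total ts)
        (sourceAddr, requestID msg)
        -- first part seen for this request: create a fresh entry
        (RequestMapEntry (Set.singleton msg) Nothing now)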
-- TODO: make purge age configurable
-- | periodically clean up old request parts
responsePurgeAge :: POSIXTime
responsePurgeAge = 60 -- seconds
requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do
requestMapPurge :: POSIXTime -> MVar RequestMap -> IO ()
requestMapPurge purgeAge mapVar = forever $ do
rMapState <- takeMVar mapVar
now <- getPOSIXTime
putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
now - ts < responsePurgeAge
putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
now - ts < purgeAge
) rMapState
threadDelay $ round responsePurgeAge * 2 * 10^6
threadDelay $ (fromEnum purgeAge * 2) `div` 10^6
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM -- ^ acting NodeState
-> LocalNodeStateSTM s -- ^ acting NodeState
-> IO ()
fediMessageHandler sendQ recvQ nsSTM = do
-- Read node state just once, assuming that all relevant data for this function does
-- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
-- handling multipart messages:
-- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID); value: timestamp + set of message parts; handle them all once the size of the set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail
concurrently_ (requestMapPurge requestMap) $ forever $ do
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
-- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg
@ -646,14 +711,33 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- ==== interface to service layer ====
instance DHT RealNodeSTM where
instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring)
-- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState
isResponsibleFor nodeSTM key = do
node <- readTVarIO nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVarIO vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
isResponsibleForSTM nodeSTM key = do
node <- readTVar nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVar vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
-- | Returns the hostname and port of the host responsible for a key.
-- Information is provided from a cache, only on a cache miss a new DHT lookup
-- is triggered.
getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
getKeyResponsibility :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
getKeyResponsibility nodeSTM lookupKey = do
node <- readTVarIO nodeSTM
cache <- readTVarIO $ lookupCacheSTM node
@ -669,8 +753,8 @@ getKeyResponsibility nodeSTM lookupKey = do
-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the
-- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM lookupKey = do
updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM keyToLookup = do
(node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM
let firstVs = headMay (vservers node)
@ -680,23 +764,30 @@ updateLookupCache nodeSTM lookupKey = do
pure (node, lookupSource)
maybe (do
-- if no local node available, delete cache entry and return Nothing
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
pure Nothing
)
(\n -> do
-- start a lookup from the node, update the cache with the lookup result and return it
newResponsible <- requestQueryID n lookupKey
let newEntry = (getDomain newResponsible, getServicePort newResponsible)
-- TODO: better retry management, because having no vserver joined yet should
-- be treated differently than other reasons for not getting a result.
newResponsible <- runExceptT $ requestQueryID n keyToLookup
either
(const $ pure Nothing)
(\result -> do
let newEntry = (getDomain result, getServicePort result)
now <- getPOSIXTime
-- atomic update against lost updates
atomically $ modifyTVar' (lookupCacheSTM node) $
Map.insert lookupKey (CacheEntry False newEntry now)
Map.insert keyToLookup (CacheEntry False newEntry now)
pure $ Just newEntry
)
newResponsible
) lookupSource
-- | Periodically clean the lookup cache from expired entries.
lookupCacheCleanup :: RealNodeSTM -> IO ()
lookupCacheCleanup :: RealNodeSTM s -> IO ()
lookupCacheCleanup nodeSTM = do
node <- readTVarIO nodeSTM
forever $ do
@ -706,4 +797,4 @@ lookupCacheCleanup nodeSTM = do
now - ts < confMaxLookupCacheAge (nodeConfig node)
)
)
threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5)
threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6

View file

@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@ -26,8 +27,7 @@ module Hash2Pub.FediChordTypes (
, CacheEntry(..)
, RingEntry(..)
, RingMap(..)
, HasKeyID
, getKeyID
, HasKeyID(..)
, rMapSize
, rMapLookup
, rMapLookupPred
@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes (
, bsAsIpAddr
, FediChordConf(..)
, DHT(..)
, Service(..)
, ServiceConf(..)
) where
import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
@ -144,8 +147,8 @@ a `localCompare` b
-- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
data RealNode = RealNode
{ vservers :: [LocalNodeStateSTM]
data RealNode s = RealNode
{ vservers :: [LocalNodeStateSTM s]
-- ^ references to all active vservers
, nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start
@ -153,9 +156,10 @@ data RealNode = RealNode
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
, nodeService :: s (RealNodeSTM s)
}
type RealNodeSTM = TVar RealNode
type RealNodeSTM s = TVar (RealNode s)
-- | represents a node and all its important state
data RemoteNodeState = RemoteNodeState
@ -177,7 +181,7 @@ instance Ord RemoteNodeState where
a `compare` b = nid a `compare` nid b
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState
data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache
@ -196,13 +200,13 @@ data LocalNodeState = LocalNodeState
-- ^ number of parallel sent queries
, jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice
, parentRealNode :: RealNodeSTM
, parentRealNode :: RealNodeSTM s
-- ^ the parent node managing this vserver instance
}
deriving (Show, Eq)
-- | for concurrent access, LocalNodeState is wrapped in a TVar
type LocalNodeStateSTM = TVar LocalNodeState
type LocalNodeStateSTM s = TVar (LocalNodeState s)
-- | class for various NodeState representations, providing
-- getters and setters for common values
@ -239,14 +243,14 @@ instance NodeState RemoteNodeState where
toRemoteNodeState = id
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState s -> LocalNodeState s
propagateNodeStateSet_ func ns = let
newNs = func $ nodeState ns
in
ns {nodeState = newNs}
instance NodeState LocalNodeState where
instance NodeState (LocalNodeState s) where
getNid = getNid . nodeState
getDomain = getDomain . nodeState
getIpAddr = getIpAddr . nodeState
@ -268,34 +272,37 @@ instance Typeable a => Show (TVar a) where
instance Typeable a => Show (TQueue a) where
show x = show (typeOf x)
instance Typeable a => Show (TChan a) where
show x = show (typeOf x)
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds}
setPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds}
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs}
setSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs}
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns}
addPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns}
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns}
addSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns}
instance HasKeyID RemoteNodeState NodeID where
instance HasKeyID NodeID RemoteNodeState where
getKeyID = getNid
instance HasKeyID a k => HasKeyID (CacheEntry a) k where
instance HasKeyID k a => HasKeyID k (CacheEntry a) where
getKeyID (CacheEntry _ obj _) = getKeyID obj
instance HasKeyID NodeID NodeID where
getKeyID = id
type NodeCacheEntry = CacheEntry RemoteNodeState
type NodeCache = RingMap NodeCacheEntry NodeID
type NodeCache = RingMap NodeID NodeCacheEntry
type LookupCacheEntry = CacheEntry (String, PortNumber)
type LookupCache = Map.Map NodeID LookupCacheEntry
@ -319,12 +326,15 @@ cacheLookup = rMapLookup
cacheLookupSucc :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupSucc = rMapLookupSucc
cacheLookupSucc key cache = snd <$> rMapLookupSucc key cache
cacheLookupPred :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupPred = rMapLookupPred
cacheLookupPred key cache = snd <$> rMapLookupPred key cache
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
@ -401,18 +411,67 @@ data FediChordConf = FediChordConf
-- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes
, confStabiliseInterval :: Int
-- ^ pause between stabilise runs, in microseconds (passed directly to 'threadDelay')
, confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
-- ^ pause between sampling the own ID through bootstrap nodes, in microseconds
, confMaxLookupCacheAge :: POSIXTime
-- ^ maximum age of lookup cache entries in seconds
-- ^ maximum age of key lookup cache entries in seconds
, confJoinAttemptsInterval :: Int
-- ^ interval between join attempts on newly learned nodes, in microseconds
, confMaxNodeCacheAge :: POSIXTime
-- ^ maximum age of entries in the node cache, in seconds
, confResponsePurgeAge :: POSIXTime
-- ^ maximum age of message parts in response part cache, in seconds
, confRequestTimeout :: Int
-- ^ how long to wait until response has arrived, in milliseconds
, confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried
}
deriving (Show, Eq)
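-- Illustrative record update for the newly added tuning knobs (the values here
-- are made up, not defaults from this changeset):
withTunedRetries :: FediChordConf -> FediChordConf
withTunedRetries conf = conf
    { confRequestTimeout = 5000   -- wait up to 5000 ms for a response
    , confRequestRetries = 2      -- re-send a timed-out request twice
    }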
-- ====== Service Types ============
class Service s d where
-- | run the service
runService :: ServiceConf -> d -> IO (s d)
getListeningPortFromService :: (Integral i) => s d -> i
-- | trigger a service data migration of data between the two given keys
migrateData :: s d
-> NodeID -- ^ source/ sender node ID
-> NodeID -- ^ start key
-> NodeID -- ^ end key
-> (String, Int) -- ^ hostname and port of target service
-> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed, may block forever
waitForMigrationFrom :: s d -> NodeID -> IO ()
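-- Minimal sketch of what an implementation of this class has to provide
-- ('NoService' is purely illustrative and not part of the codebase):
newtype NoService d = NoService d

instance Service NoService d where
    runService _ dht = pure (NoService dht)
    getListeningPortFromService _ = 0
    migrateData _ _ _ _ _ = pure (Right ())
    waitForMigrationFrom _ _ = pure ()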
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: POSIXTime
-- ^ subscription lease expiration in seconds
, confServicePort :: Int
-- ^ listening port for service
, confServiceHost :: String
-- ^ hostname of service
, confLogfilePath :: String
-- ^ where to store the (measurement) log file
, confStatsEvalDelay :: Int
-- ^ delay between statistic rate measurement samplings, in microseconds
, confSpeedupFactor :: Int
-- While the speedup factor needs to be already included in all
}
class DHT d where
-- | lookup the responsible host handling a given key string,
-- possibly from a lookup cache
-- possibly from a lookup cache
lookupKey :: d -> String -> IO (Maybe (String, PortNumber))
-- | lookup the responsible host handling a given key string,
-- but force the DHT to do a fresh lookup instead of returning a cached result.
-- Also invalidates old cache entries.
forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber))
isResponsibleFor :: d -> NodeID -> IO Bool
isResponsibleForSTM :: d -> NodeID -> STM Bool
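-- Usage sketch of the class (hypothetical helper, not part of the codebase):
-- consult the cached lookup first and only force a fresh DHT lookup on a miss.
resolveKeySketch :: DHT d => d -> String -> IO (Maybe (String, PortNumber))
resolveKeySketch dht keystring = do
    cached <- lookupKey dht keystring
    maybe (forceLookupKey dht keystring) (pure . Just) cached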

View file

@ -1,117 +1,884 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE InstanceSigs #-}
module Hash2Pub.PostService where
import Control.Concurrent
import qualified Data.ByteString.Lazy.UTF8 as BSU
import Data.Maybe (fromMaybe)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, unless,
void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D
import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust)
import Data.String (fromString)
import qualified Data.Text as Txt
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.IO
import System.Random
import Text.Read (readEither)
import Formatting (fixed, format, int, (%))
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Servant.Client
import Hash2Pub.FediChord
import Hash2Pub.ServiceTypes
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap
import Hash2Pub.Utils
import Debug.Trace
data PostService d = PostService
{ psPort :: Warp.Port
, psHost :: String
{ serviceConf :: ServiceConf
-- queues, other data structures
, baseDHT :: (DHT d) => d
, serviceThread :: ThreadId
, serviceThread :: TVar ThreadId
, subscribers :: TVar RelayTags
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, logFileHandle :: Handle
}
deriving (Typeable)
type Hashtag = Text
type PostID = Text
type PostContent = Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
-- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
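-- Sketch of the per-tag lookup used by the handlers below
-- ('lookupTagSubscriptions' itself is defined outside this excerpt); the
-- RingMap is keyed by the tag's NodeID via 'hashtagToId':
lookupTagSubscriptionsSketch :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
lookupTagSubscriptionsSketch tag = rMapLookup (hashtagToId tag)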
instance DHT d => Service PostService d where
runService dht host port = do
-- | initialise 'PostService' data structures and run server
runService conf dht = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
--ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
let
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication
pure $ PostService {
psPort = port'
, psHost = host
thisService = PostService
{ serviceConf = conf
, baseDHT = dht
, serviceThread = servThread
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
--, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
, statsQueue = statsQueue'
, loadStats = loadStats'
, logFileHandle = loggingFile
}
getServicePort s = fromIntegral $ psPort s
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- log a start message, this also truncates existing files
TxtI.hPutStrLn loggingFile $ Txt.unlines
[ "# Starting mock relay implementation"
, "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
]
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
servThreadID <- forkIO $
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
$ concurrently
-- background processing workers
(launchWorkerThreads thisService)
-- statistics/ measurements
(launchStatsThreads thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
getListeningPortFromService = fromIntegral . confServicePort . serviceConf
migrateData = clientDeliverSubscriptions
waitForMigrationFrom serv fromID = do
migrationSynchroniser <- atomically $ do
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
maybe
-- decision: this function blocks until it gets an incoming migration from given ID
retry
pure
syncPoint
-- block until migration finished
takeMVar migrationSynchroniser
-- | return a WAI application
postServiceApplication :: Application
postServiceApplication = serve exposedPostServiceAPI postServer
servicePort = 8081
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- ========= constants ===========
placeholderPost :: Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- ^ endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
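-- Illustrative routes derived from the API above (host names and IDs are made
-- up): a post with ID 4711 published on example.org is fetched via
-- GET http://example.org/post/4711, and an instance subscribes to #foo on a
-- relay via GET http://relay.example/tags/foo/subscribe with an Origin header.
examplePostFetchPath :: Text
examplePostFetchPath = "/post/4711"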
postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
:<|> postFetch service
:<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service
:<|> tagSubscribe service
:<|> tagUnsubscribe service
postServer :: Server PostServiceAPI
postServer = relayInbox
:<|> subscriptionDelivery
:<|> postFetch
:<|> postMultiFetch
:<|> tagDelivery
:<|> tagSubscribe
:<|> tagUnsubscribe
-- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do
let
-- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
-- if tag is not in own responsibility, return a 410 Gone
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag)
if responsible
then pure ()
else
throwError $ err410 { errBody = "Relay is not responsible for this tag"}
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe
-- if no one subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag
(\queue -> do
liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
)
broadcastChan
pure NoContent
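-- Illustrative request body for the relay inbox endpoint above (made-up IDs):
-- one post reference per line; only the ID before the first comma is used.
exampleRelayInboxBody :: Text
exampleRelayInboxBody = "4711,ignored metadata\n4712,more ignored metadata"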
-- exception to be thrown when a tag is not in the responsibility of a relay
newtype UnhandledTagException = UnhandledTagException String
deriving (Show, Typeable)
instance Exception UnhandledTagException
-- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
-- signal that the migration is in progress
syncMVar <- liftIO newEmptyMVar
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.insert (fromInteger senderID) syncMVar
-- For the convenience of being able to roll back the whole transaction once an
-- unhandled tag occurs, this is done as a single large transaction.
-- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag')
if responsible
then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
pure $ Right ()
) (pure ()) tagSubs
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocks?
liftIO $ putMVar syncMVar () -- wakes up waiting thread
-- allow response to be completed independently from waiting thread
_ <- liftIO . forkIO $ do
putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility
where
processTag :: TVar RelayTags -> Text -> STM ()
processTag subscriberSTM tagData = do
let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
relayInbox :: Txt.Text -> Handler Txt.Text
relayInbox post = pure $ "Here be InboxDragons with " <> post
-- | endpoint for fetching a post by its ID
postFetch :: PostService d -> Text -> Handler Text
postFetch serv _ = do
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
pure placeholderPost
subscriptionDelivery :: Txt.Text -> Handler Txt.Text
subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList
postFetch :: Txt.Text -> Handler Txt.Text
postFetch postID = pure $ "Here be a post with dragon ID " <> postID
-- | endpoint for fetching multiple posts of this instance by their IDs
postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do
let
idList = Txt.lines postIDs
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
response = foldl (\response' _ ->
placeholderPost <> "\n" <> response'
) "" idList
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
postMultiFetch :: Txt.Text -> Handler Txt.Text
postMultiFetch postIDs = pure $ "Here be multiple post dragons: "
<> (Txt.unwords . Txt.lines $ postIDs)
tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
-- | delivery endpoint: inbox for initially publishing a post at an instance
postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe hashtag origin = pure 42
tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
-- | delivery endpoint: receive postIDs of a certain subscribed hashtag
tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (hashtagToId hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
-- | receive subscription requests to a handled hashtag
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
now <- liftIO getPOSIXTime
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime
-- | receive and handle unsubscription requests regarding a handled tag
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
pure "bye bye"
-- client/ request functions
clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy
relayInboxClient
:<|> subscriptionDeliveryClient
:<|> postFetchClient
:<|> postMultiFetchClient
:<|> postInboxClient
:<|> tagDeliveryClient
:<|> tagSubscribeClient
:<|> tagUnsubscribeClient
= client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
-- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transferred subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d
-> NodeID -- ^ sender node ID
-> NodeID -- ^ fromTag
-> NodeID -- ^ toTag
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts
-- no need to extract this as a single atomic operation, as newly incoming posts are supposed to be rejected anyway because the tags have already been re-positioned on the DHT
subscriberData <- foldM (\response (subSTM, _, tag) -> do
subMap <- readTVarIO subSTM
thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do
-- duplicate the pending queue to work on a copy, in case of a delivery error
pending <- atomically $ do
queueCopy <- cloneTChan subChan
channelGetAll queueCopy
if null pending
then pure tagResponse
else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n"
)
""
(HMap.toList subMap)
pure $ thisTagsData <> response
)
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err
Right _ -> do
atomically $
modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags)
pure . Right $ ()
where
channelGetAll :: TChan a -> STM [a]
channelGetAll chan = channelGetAll' chan []
channelGetAll' :: TChan a -> [a] -> STM [a]
channelGetAll' chan acc = do
haveRead <- tryReadTChan chan
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time,
-- but also records the subscription in its own data structure.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe lookupRes True
where
doSubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagSubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False
Left err -> pure . Left . show $ err
Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease
)
lookupResponse
-- | Unsubscribe the client from the given hashtag.
clientUnsubscribeFrom :: DHT d => PostService d -> Hashtag -> IO (Either String ())
clientUnsubscribeFrom serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe lookupRes True
where
doUnsubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagUnsubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False
Left err -> pure . Left . show $ err
Right _ -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag)
pure . Right $ ()
)
lookupResponse
-- | publish a new post to the inbox of a specified relay instance. This
-- instance will then be the originating instance of the post and will forward
-- the post to the responsible relays.
-- As the initial publishing isn't done by a specific relay (but *to* a specific relay
-- instead), the function does *not* take a PostService as argument.
clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided
-> String -- ^ hostname
-> Int -- ^ port
-> PostContent -- ^ post content
-> IO (Either String ()) -- ^ error or success
clientPublishPost httpman hostname port postC = do
resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port ""))
pure . bimap show (const ()) $ resp
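-- Hedged usage sketch: publish a post through a known relay instance, sharing one
-- HTTP manager across calls. Host name and port are illustrative values only.
examplePublish :: IO ()
examplePublish = do
    httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
    res <- clientPublishPost httpMan' "relay.example.org" 3342 "Hello fediverse #catCafe"
    either putStrLn (const $ putStrLn "post accepted") res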
-- currently this is unused code
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
getClients hostname' port' httpMan = hoistClient clientAPI
(fmap (either (error . show) id)
. flip runClientM clientEnv
)
(client clientAPI)
where
clientEnv = mkClientEnv httpMan (BaseUrl Http hostname' port' "")
-- ======= data structure manipulations =========
-- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing.
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> STM ()
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it
subChan <- setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (writeTChan subChan)
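-- Minimal sketch, assuming a tag map and lease expiry time are already at hand:
-- atomically enqueue two (made-up) pending post IDs for an example subscriber.
exampleEnqueue :: TVar RelayTags -> POSIXTime -> IO ()
exampleEnqueue tagMapSTM lease = atomically $
    enqueueSubscription tagMapSTM (normaliseTag "catCafe") ("sub.example.org", 3342) ["12345", "12346"] lease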
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
Nothing -> do
-- if the tag doesn't exist yet (no hash collision), just initialise a
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map
foundSubMap <- readTVar foundSubMapSTM
case HMap.lookup subscriber foundSubMap of
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just (tagOutChan, _) -> pure tagOutChan
-- | deletes a subscription from the passed subscriber map
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
deleteSubscription tagMapSTM tag subscriber = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
-- no subscribers to that tag, just return
Nothing -> pure ()
Just (foundSubMapSTM, _, _) -> do
foundSubMap <- readTVar foundSubMapSTM
let newSubMap = HMap.delete subscriber foundSubMap
-- if there are no subscriptions for the tag anymore, remove its
-- data structure altogether
if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap
-- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- | normalise the Unicode representation of a string to NFC and convert it to lower case
normaliseTag :: Text -> Text
normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
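-- Example property (follows from 'normaliseTag' and 'hashtagToId'): tags differing
-- only in case map to the same DHT key, e.g.
-- hashtagToId (normaliseTag "CatCafe") == hashtagToId (normaliseTag "catcafe")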
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSUL.fromString . show
instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
mimeUnrender _ = readEither . BSUL.toString
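-- These generic instances rely on 'show'/'read' round-tripping, e.g. for Integer:
-- mimeUnrender (Proxy :: Proxy PlainText) (mimeRender (Proxy :: Proxy PlainText) (42 :: Integer)) == Right 42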
-- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries :: Int
numParallelDeliveries = 10
launchWorkerThreads :: DHT d => PostService d -> IO ()
launchWorkerThreads serv = concurrently_
(processIncomingPosts serv)
$ concurrently_
(purgeSubscriptionsThread serv)
$ concurrently_
(fetchTagPosts serv)
(relayWorker serv)
-- | periodically remove expired subscription entries from relay subscribers
purgeSubscriptionsThread :: PostService d -> IO ()
purgeSubscriptionsThread serv = forever $ do
-- read config
now <- getPOSIXTime
let
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-- no need to atomically lock this, as newly incoming subscriptions do not
-- need to be purged
tagMap <- readTVarIO $ subscribers serv
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-- but each subscriberMap needs to be modified atomically
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
threadDelay $ fromEnum purgeInterval `div` 10^6
-- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
deliveriesToProcess <- atomically $ do
readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
if null readResult
then retry
else pure readResult
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or if the subscription expires within 5 minutes, (re)subscribe to the tag
when (maybe True (\subLease -> subLease - now < 300) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- report the count of published posts for statistics
atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
pure ()
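-- Hedged sketch of how a post enters this worker: the publish inbox handler is
-- expected to enqueue (tag, post ID, post content) triples like this (example values only).
exampleQueuePost :: PostService d -> IO ()
exampleQueuePost serv = atomically $
    writeTQueue (relayInQueue serv) (normaliseTag "catCafe", "12345", "Hello #catCafe")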
-- | process the pending fetch jobs of delivered post IDs: fetch each delivered post from its URI-based post ID
fetchTagPosts :: DHT d => PostService d -> IO ()
fetchTagPosts serv = forever $ do
-- blocks until available
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
-- TODO error handling, retry
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
-- then
-- -- success, TODO: statistics
-- else
pure ()
Left _ ->
-- TODO error handling, retry
pure ()
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults)
pure ()
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
| RelayReceiveEvent
| RelayDeliveryEvent
| IncomingPostFetchEvent
deriving (Enum, Show, Eq)
-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
deriving (Show, Eq)
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ rate of post fetches served by this instance
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
deriving (Show, Eq)
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from queue and add them to a shared accumulator.
-- Instead of letting events pile up in the queue and consume memory linearly, they are folded into the accumulator immediately.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event number to current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | add incoming stats events to accumulator value
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
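-- Worked example: folding a single publish event into the empty accumulator only
-- increments the publish counter ('someNodeId' stands for an arbitrary NodeID):
-- statsAdder (StatsEvent PostPublishEvent 3 someNodeId) emptyStats
--   == emptyStats { postPublishRate = 3.0 }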
-- | Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay $ confStatsEvalDelay (serviceConf serv)
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
-- finally, write a log line to the stats file
-- format: timestamp;total relayReceiveRates;total relayDeliveryRates;postPublishRate;postFetchRate;subscriberSum
-- later: current (reported) load, target load
subscriberSum <- sumSubscribers
TxtI.hPutStrLn (logFileHandle serv) $
format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
(realToFrac now :: Double)
(sum . relayReceiveRates $ rateStats)
(sum . relayDeliveryRates $ rateStats)
(postPublishRate rateStats)
(postFetchRate rateStats)
subscriberSum
loop now
sumSubscribers = do
tagMap <- readTVarIO $ subscribers serv
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
subscriberMap <- readTVarIO subscriberMapSTM
pure $ subscriberSum + HMap.size subscriberMap
)
0 tagMap
-- | Evaluate the accumulated statistic events: currently this mostly calculates the event
-- rates by dividing the counts by the length of the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- first sum all event numbers, then divide by the number of seconds passed to
-- get the rate per second
RelayStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = realToFrac timeInterval
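-- Worked example: 120 accumulated publish events over a 60 second window yield a
-- rate of 2 posts per second (the speedup factor is already folded into the
-- interval by 'evaluateStatsThread'):
-- postPublishRate (evaluateStats 60 (emptyStats { postPublishRate = 120 })) == 2.0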
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}

View file

@ -0,0 +1,37 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Hash2Pub.PostService.API where
import Data.Text.Lazy (Text)
import Servant
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header from $hashtag
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
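-- A hedged sketch (the names on the left are assumptions mirroring the endpoint order above):
-- servant-client can derive all client functions used by the PostService from this proxy, e.g.
-- relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient
--     :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient
--     :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client exposedPostServiceAPI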

View file

@ -1,7 +1,5 @@
module Hash2Pub.ProtocolTypes where
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime)
@ -55,6 +53,7 @@ data ActionPayload = QueryIDRequestPayload
| LeaveRequestPayload
{ leaveSuccessors :: [RemoteNodeState]
, leavePredecessors :: [RemoteNodeState]
, leaveDoMigration :: Bool
}
| StabiliseRequestPayload
| PingRequestPayload

View file

@ -5,36 +5,61 @@ module Hash2Pub.RingMap where
import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust, isNothing, mapMaybe)
import Data.Maybe (isJust, isNothing, mapMaybe)
-- | Class for all types that can be identified via a EpiChord key.
-- Used for restricting the types a 'RingMap' can store
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID a k where
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID k a where
getKeyID :: a -> k
keyValuePair :: a -> (k, a)
keyValuePair val = (getKeyID val, val)
-- | generic data structure for holding elements with a key and modular lookup
newtype RingMap a k = RingMap { getRingMap :: (HasKeyID a k, Bounded k, Ord k) => Map.Map k (RingEntry a k) }
newtype RingMap k a = RingMap { getRingMap :: (Bounded k, Ord k) => Map.Map k (RingEntry k a) }
instance (HasKeyID a k, Bounded k, Ord k) => Eq (RingMap a k) where
instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b
instance (HasKeyID a k, Bounded k, Ord k, Show k) => Show (RingMap a k) where
show rmap = shows "RingMap " (show $ getRingMap rmap)
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
instance (Bounded k, Ord k) => Functor (RingMap k) where
-- | map a function over all payload values of a 'RingMap'
fmap f = RingMap . Map.map traversingF . getRingMap
where
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
instance (Bounded k, Ord k) => Foldable (RingMap k) where
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
where
traversingFR (KeyEntry a) acc = f a acc
traversingFR (ProxyEntry _ Nothing) acc = acc
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
where
traversingFL acc (KeyEntry a) = f acc a
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
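-- With these instances, callers can map and fold over all KeyEntry payloads directly, e.g.
-- sum ((* 2) <$> rMapFromList [(3 :: Word8, 1.0), (7, 2.0)]) == 6.0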
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
data RingEntry a k = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry a k))
data RingEntry k a = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry k a))
deriving (Show, Eq)
-- | as a compromise, only KeyEntry components are ordered by their key,
-- while ProxyEntry components must never be ordered.
instance (HasKeyID a k, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry a k) where
instance (HasKeyID k a, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry k a) where
a `compare` b = compare (extractID a) (extractID b)
where
extractID :: (HasKeyID a k, Ord a, Bounded k, Ord k) => RingEntry a k -> k
extractID :: (HasKeyID k a, Ord a, Bounded k, Ord k) => RingEntry k a -> k
extractID (KeyEntry e) = getKeyID e
extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap"
@ -49,51 +74,51 @@ instance Enum ProxyDirection where
fromEnum Backwards = - 1
fromEnum Forwards = 1
-- | helper function for getting the a from a RingEntry a k
extractRingEntry :: (HasKeyID a k, Bounded k, Ord k) => RingEntry a k -> Maybe a
-- | helper function for getting the a from a RingEntry k a
extractRingEntry :: (Bounded k, Ord k) => RingEntry k a -> Maybe a
extractRingEntry (KeyEntry entry) = Just entry
extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
extractRingEntry _ = Nothing
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
-- linking the modular name space together by connecting @minBound@ and @maxBound@
emptyRMap :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k
emptyRMap :: (Bounded k, Ord k) => RingMap k a
emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))]
where
proxyEntry (from,to) = (from, ProxyEntry to Nothing)
-- | Maybe returns the entry stored at given key
rMapLookup :: (HasKeyID a k, Bounded k, Ord k)
rMapLookup :: (Bounded k, Ord k)
=> k -- ^lookup key
-> RingMap a k -- ^lookup cache
-> RingMap k a -- ^lookup cache
-> Maybe a
rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
rMapSize :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
rMapSize :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> i
rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - oneIfEntry rmap maxBound
where
innerMap = getRingMap rmap
oneIfEntry :: (HasKeyID a k, Integral i, Bounded k, Ord k) => RingMap a k -> k -> i
oneIfEntry :: (Integral i, Bounded k, Ord k) => RingMap k a -> k -> i
oneIfEntry rmap' nid
| isNothing (rMapLookup nid rmap') = 1
| otherwise = 0
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring
lookupWrapper :: (HasKeyID a k, Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
-> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
lookupWrapper :: (Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> ProxyDirection
-> k
-> RingMap a k
-> Maybe a
-> RingMap k a
-> Maybe (k, a)
lookupWrapper f fRepeat direction key rmap =
case f key $ getRingMap rmap of
-- the proxy entry found holds a
Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry
Just (foundKey, ProxyEntry _ (Just (KeyEntry entry))) -> Just (foundKey, entry)
-- proxy entry holds another proxy entry, this should not happen
Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
-- proxy entry without own entry is a pointer to where to continue
@ -106,10 +131,10 @@ lookupWrapper f fRepeat direction key rmap =
then lookupWrapper fRepeat fRepeat direction newKey rmap
else Nothing
-- normal entries are returned
Just (_, KeyEntry entry) -> Just entry
Just (foundKey, KeyEntry entry) -> Just (foundKey, entry)
Nothing -> Nothing
where
rMapNotEmpty :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> Bool
rMapNotEmpty :: (Bounded k, Ord k) => RingMap k a -> Bool
rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries
|| isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node
|| isJust (rMapLookup maxBound rmap')
@ -117,32 +142,34 @@ lookupWrapper f fRepeat direction key rmap =
-- | find the successor node to a given key on a modular EpiChord ring.
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
-- if existing.
rMapLookupSucc :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupSucc :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
-- | find the predecessor node to a given key on a modular EpiChord ring.
rMapLookupPred :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupPred :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (HasKeyID a k, Bounded k, Ord k)
=> (RingEntry a k -> RingEntry a k -> RingEntry a k)
-> a
-> RingMap a k
-> RingMap a k
addRMapEntryWith combineFunc entry = RingMap
. Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry)
addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ combining function @f new_value old_value@
-> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntryWith combineFunc key entry = RingMap
. Map.insertWith combineFunc key (KeyEntry entry)
. getRingMap
addRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
=> a
-> RingMap a k
-> RingMap a k
addRMapEntry :: (Bounded k, Ord k)
=> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntry = addRMapEntryWith insertCombineFunction
where
insertCombineFunction newVal oldVal =
@ -151,30 +178,30 @@ addRMapEntry = addRMapEntryWith insertCombineFunction
KeyEntry _ -> newVal
addRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
-> RingMap a k
addRMapEntries entries rmap = foldr' addRMapEntry rmap entries
addRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
-> RingMap k a
addRMapEntries entries rmap = foldr' (\(k, v) rmap' -> addRMapEntry k v rmap') rmap entries
setRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
setRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
setRMapEntries entries = addRMapEntries entries emptyRMap
deleteRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
deleteRMapEntry :: (Bounded k, Ord k)
=> k
-> RingMap a k
-> RingMap a k
-> RingMap k a
-> RingMap k a
deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing
rMapToList :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> [a]
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
rMapFromList :: (HasKeyID a k, Bounded k, Ord k) => [a] -> RingMap a k
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
rMapFromList = setRMapEntries
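-- Example: successor lookup on a small ring now also returns the key of the found entry:
-- rMapLookupSucc 5 (rMapFromList [(3 :: Word8, "a"), (7, "b")]) == Just (7, "b")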
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
@ -182,49 +209,64 @@ rMapFromList = setRMapEntries
-- Stops once i entries have been taken or an entry has been encountered twice
-- (meaning the ring has been traversed completely).
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
takeRMapEntries_ :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> (k -> RingMap a k -> Maybe a)
-> k
-> i
-> RingMap a k
-> [a]
takeRMapEntries_ :: (Integral i, Bounded k, Ord k)
=> (k -> RingMap k a -> Maybe (k, a)) -- ^ parameterisable getter function to determine lookup direction
-> k -- ^ starting key
-> i -- ^ number of maximum values to take
-> RingMap k a
-> [a] -- ^ values taken
-- TODO: might be more efficient with dlists
takeRMapEntries_ getterFunc startAt num rmap = reverse $
case getterFunc startAt rmap of
Nothing -> []
Just anEntry -> takeEntriesUntil rmap getterFunc (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry]
where
-- for some reason, just reusing the already-bound @rmap@ and @getterFunc@
-- variables leads to a type error, so these need to be passed explicitly
takeEntriesUntil :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
-> (k -> RingMap a k -> Maybe a) -- getter function
-> k
-> k
-> i
-> [a]
-> [a]
takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc
| remaining <= 0 = takeAcc
| getKeyID (fromJust $ getterFunc' previousEntry rmap') == havingReached = takeAcc
| otherwise = let (Just gotEntry) = getterFunc' previousEntry rmap'
in takeEntriesUntil rmap' getterFunc' havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc)
Just (foundKey, anEntry) -> takeEntriesUntil_ rmap getterFunc foundKey foundKey (Just $ num-1) [anEntry]
takeRMapPredecessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> (k -> RingMap k a -> Maybe (k, a)) -- getter function
-> k -- limit value
-> k -- start value
-> Maybe i -- possible number limit
-> [a]
-> [a]
takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc
-- length limit reached
| remaining <= 0 = takeAcc
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =
case nextEntry of
Just (fKey, gotEntry)
| fKey == havingReached -> takeAcc
| otherwise -> takeEntriesUntil_ rmap' getterFunc' havingReached fKey (fmap pred numLimit) (gotEntry:takeAcc)
Nothing -> takeAcc
where
nextEntry = getterFunc' previousEntry rmap'
takeRMapPredecessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapPredecessors = takeRMapEntries_ rMapLookupPred
takeRMapSuccessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing []
takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []

View file

@ -1,9 +0,0 @@
{-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where
import Hash2Pub.FediChord (DHT (..))
class Service s d where
-- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i

View file

@ -1,3 +1,5 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where
@ -187,6 +189,7 @@ spec = do
lReqPayload = LeaveRequestPayload {
leaveSuccessors = someNodes
, leavePredecessors = someNodes
, leaveDoMigration = True
}
stabReqPayload = StabiliseRequestPayload
pingReqPayload = PingRequestPayload
@ -292,12 +295,15 @@ exampleNodeState = RemoteNodeState {
, vServerID = 0
}
exampleLocalNode :: IO LocalNodeState
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = do
realNode <- newTVarIO $ RealNode {
vservers = []
, nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf
})
, nodeService = MockService
}
nodeStateInit realNode
exampleFediConf :: FediChordConf
@ -313,3 +319,9 @@ exampleVs :: (Integral i) => i
exampleVs = 4
exampleIp :: HostAddress6
exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e)
data MockService d = MockService
instance DHT d => Service MockService d where
runService _ _ = pure MockService
getListeningPortFromService = const 1337