Compare commits

...

5 commits

Author SHA1 Message Date
Trolli Schmittlauch 736815ea83 normalise hastag unicode representation of incoming posts 2020-07-27 21:49:49 +02:00
Trolli Schmittlauch daae9d0b38 process and enqueue incoming posts 2020-07-27 21:39:49 +02:00
Trolli Schmittlauch 04423171fd define data types for post and subscription storage 2020-07-27 13:20:15 +02:00
Trolli Schmittlauch 7878c67635 adjust rest of code to refactored RingMap 2020-07-27 00:37:31 +02:00
Trolli Schmittlauch 988144e9e7 further relax constrains on RingMap
key now needs to be explicitly given at insert, instead of
deriving it from the value. This makes it possible to store values where
a key cannot be extracted from (HasKeyID)

contributes to #62, #32, #41
2020-07-26 18:55:55 +02:00
6 changed files with 222 additions and 143 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms
ghc-options: -Wall

View file

@ -130,23 +130,25 @@ closestCachePredecessors remainingLookups lastID nCache
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the same as the node
-- itself then it falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
where
predecessorList = predecessors ownNs
-- add node itself to RingMap representation, to distinguish between
-- responsibility of own node and predecessor
predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessorList) :: RingMap NodeID RemoteNodeState
ownRemote = toRemoteNodeState ownNs
closestPredecessor = headMay predecessorList
isPossiblePredecessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool
isPossiblePredecessor = isInOwnResponsibilitySlice
isPossibleSuccessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool
isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
where
successorList = successors ownNs
successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successorList)
ownRemote = toRemoteNodeState ownNs
closestSuccessor = headMay successorList
-- cache operations
@ -169,7 +171,8 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now
newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache
newEntry = CacheEntry False ns timestamp'
newCache = addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal)
@ -202,7 +205,7 @@ addNodeAsVerifiedPure :: POSIXTime
-> RemoteNodeState
-> NodeCache
-> NodeCache
addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now)
addNodeAsVerifiedPure now node = addRMapEntry (getKeyID node) (CacheEntry True node now)

View file

@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@ -26,8 +27,7 @@ module Hash2Pub.FediChordTypes (
, CacheEntry(..)
, RingEntry(..)
, RingMap(..)
, HasKeyID
, getKeyID
, HasKeyID(..)
, rMapSize
, rMapLookup
, rMapLookupPred
@ -271,31 +271,31 @@ instance Typeable a => Show (TQueue a) where
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds}
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds}
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs}
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs}
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns}
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns}
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns}
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns}
instance HasKeyID RemoteNodeState NodeID where
instance HasKeyID NodeID RemoteNodeState where
getKeyID = getNid
instance HasKeyID a k => HasKeyID (CacheEntry a) k where
instance HasKeyID k a => HasKeyID k (CacheEntry a) where
getKeyID (CacheEntry _ obj _) = getKeyID obj
instance HasKeyID NodeID NodeID where
getKeyID = id
type NodeCacheEntry = CacheEntry RemoteNodeState
type NodeCache = RingMap NodeCacheEntry NodeID
type NodeCache = RingMap NodeID NodeCacheEntry
type LookupCacheEntry = CacheEntry (String, PortNumber)
type LookupCache = Map.Map NodeID LookupCacheEntry
@ -319,12 +319,15 @@ cacheLookup = rMapLookup
cacheLookupSucc :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupSucc = rMapLookupSucc
cacheLookupSucc key cache = snd <$> rMapLookupSucc key cache
cacheLookupPred :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupPred = rMapLookupPred
cacheLookupPred key cache = snd <$> rMapLookupPred key cache
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
@ -410,7 +413,7 @@ data FediChordConf = FediChordConf
class DHT d where
-- | lookup the responsible host handling a given key string,
-- possibly from a lookup cache
-- possiblggy from a lookup cache
lookupKey :: d -> String -> IO (Maybe (String, PortNumber))
-- | lookup the responsible host handling a given key string,
-- but force the DHT to do a fresh lookup instead of returning a cached result.

View file

@ -1,54 +1,100 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE InstanceSigs #-}
module Hash2Pub.PostService where
import Control.Concurrent
import qualified Data.ByteString.Lazy.UTF8 as BSU
import Data.Maybe (fromMaybe)
import Data.String (fromString)
import qualified Data.Text as Txt
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (forM_, forever)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC),
normalize)
import Data.Time.Clock.POSIX
import System.Random
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
import Hash2Pub.ServiceTypes
data PostService d = PostService
{ psPort :: Warp.Port
, psHost :: String
{ psPort :: Warp.Port
, psHost :: String
-- queues, other data structures
, baseDHT :: (DHT d) => d
, serviceThread :: ThreadId
, baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId
, subscribers :: TVar (RingMap NodeID TagSubscribers)
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
}
type Hashtag = Txt.Text
type PostID = Txt.Text
type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent)
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent)
instance DHT d => Service PostService d where
-- | initialise 'PostService' data structures and run server
runService dht host port = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
let
thisService = PostService {
psPort = port'
, psHost = host
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
}
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication
pure $ PostService {
psPort = port'
, psHost = host
, baseDHT = dht
, serviceThread = servThread
}
servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
getServicePort s = fromIntegral $ psPort s
-- | return a WAI application
postServiceApplication :: Application
postServiceApplication = serve exposedPostServiceAPI postServer
postServiceApplication :: PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
servicePort = 8081
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
@ -58,7 +104,7 @@ exposedPostServiceAPI = Proxy
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for delivering the subscriptions and outstanding queue
@ -77,37 +123,51 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Pos
-- the Origin header to $hashtag
postServer :: Server PostServiceAPI
postServer = relayInbox
:<|> subscriptionDelivery
:<|> postFetch
:<|> postMultiFetch
:<|> tagDelivery
:<|> tagSubscribe
:<|> tagUnsubscribe
postServer :: PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
:<|> postFetch service
:<|> postMultiFetch service
:<|> tagDelivery service
:<|> tagSubscribe service
:<|> tagUnsubscribe service
relayInbox :: Txt.Text -> Handler Txt.Text
relayInbox post = pure $ "Here be InboxDragons with " <> post
relayInbox :: PostService d -> Txt.Text -> Handler NoContent
relayInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (Txt.fromStrict . normalize NFC . Txt.toStrict . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
subscriptionDelivery :: Txt.Text -> Handler Txt.Text
subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList
postFetch :: Txt.Text -> Handler Txt.Text
postFetch postID = pure $ "Here be a post with dragon ID " <> postID
postMultiFetch :: Txt.Text -> Handler Txt.Text
postMultiFetch postIDs = pure $ "Here be multiple post dragons: "
subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: "
<> (Txt.unwords . Txt.lines $ postIDs)
tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe hashtag origin = pure 42
tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = pure 42
tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
-- | define how to convert all showable types to PlainText

View file

@ -5,36 +5,38 @@ module Hash2Pub.RingMap where
import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust, isNothing, mapMaybe)
import Data.Maybe (isJust, isNothing, mapMaybe)
-- | Class for all types that can be identified via a EpiChord key.
-- Used for restricting the types a 'RingMap' can store
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID a k where
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID k a where
getKeyID :: a -> k
keyValuePair :: a -> (k, a)
keyValuePair val = (getKeyID val, val)
-- | generic data structure for holding elements with a key and modular lookup
newtype RingMap a k = RingMap { getRingMap :: (HasKeyID a k, Bounded k, Ord k) => Map.Map k (RingEntry a k) }
newtype RingMap k a = RingMap { getRingMap :: (Bounded k, Ord k) => Map.Map k (RingEntry k a) }
instance (HasKeyID a k, Bounded k, Ord k) => Eq (RingMap a k) where
instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b
instance (HasKeyID a k, Bounded k, Ord k, Show k) => Show (RingMap a k) where
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows "RingMap " (show $ getRingMap rmap)
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
data RingEntry a k = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry a k))
data RingEntry k a = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry k a))
deriving (Show, Eq)
-- | as a compromise, only KeyEntry components are ordered by their key
-- while ProxyEntry components should never be tried to be ordered.
instance (HasKeyID a k, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry a k) where
instance (HasKeyID k a, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry k a) where
a `compare` b = compare (extractID a) (extractID b)
where
extractID :: (HasKeyID a k, Ord a, Bounded k, Ord k) => RingEntry a k -> k
extractID :: (HasKeyID k a, Ord a, Bounded k, Ord k) => RingEntry k a -> k
extractID (KeyEntry e) = getKeyID e
extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap"
@ -49,51 +51,51 @@ instance Enum ProxyDirection where
fromEnum Backwards = - 1
fromEnum Forwards = 1
-- | helper function for getting the a from a RingEntry a k
extractRingEntry :: (HasKeyID a k, Bounded k, Ord k) => RingEntry a k -> Maybe a
-- | helper function for getting the a from a RingEntry k a
extractRingEntry :: (Bounded k, Ord k) => RingEntry k a -> Maybe a
extractRingEntry (KeyEntry entry) = Just entry
extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
extractRingEntry _ = Nothing
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
-- linking the modular name space together by connecting @minBound@ and @maxBound@
emptyRMap :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k
emptyRMap :: (Bounded k, Ord k) => RingMap k a
emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))]
where
proxyEntry (from,to) = (from, ProxyEntry to Nothing)
-- | Maybe returns the entry stored at given key
rMapLookup :: (HasKeyID a k, Bounded k, Ord k)
rMapLookup :: (Bounded k, Ord k)
=> k -- ^lookup key
-> RingMap a k -- ^lookup cache
-> RingMap k a -- ^lookup cache
-> Maybe a
rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
rMapSize :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
rMapSize :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> i
rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - oneIfEntry rmap maxBound
where
innerMap = getRingMap rmap
oneIfEntry :: (HasKeyID a k, Integral i, Bounded k, Ord k) => RingMap a k -> k -> i
oneIfEntry :: (Integral i, Bounded k, Ord k) => RingMap k a -> k -> i
oneIfEntry rmap' nid
| isNothing (rMapLookup nid rmap') = 1
| otherwise = 0
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring
lookupWrapper :: (HasKeyID a k, Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
-> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
lookupWrapper :: (Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> ProxyDirection
-> k
-> RingMap a k
-> Maybe a
-> RingMap k a
-> Maybe (k, a)
lookupWrapper f fRepeat direction key rmap =
case f key $ getRingMap rmap of
-- the proxy entry found holds a
Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry
Just (foundKey, ProxyEntry _ (Just (KeyEntry entry))) -> Just (foundKey, entry)
-- proxy entry holds another proxy entry, this should not happen
Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
-- proxy entry without own entry is a pointer on where to continue
@ -106,10 +108,10 @@ lookupWrapper f fRepeat direction key rmap =
then lookupWrapper fRepeat fRepeat direction newKey rmap
else Nothing
-- normal entries are returned
Just (_, KeyEntry entry) -> Just entry
Just (foundKey, KeyEntry entry) -> Just (foundKey, entry)
Nothing -> Nothing
where
rMapNotEmpty :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> Bool
rMapNotEmpty :: (Bounded k, Ord k) => RingMap k a -> Bool
rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries
|| isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node
|| isJust (rMapLookup maxBound rmap')
@ -117,32 +119,34 @@ lookupWrapper f fRepeat direction key rmap =
-- | find the successor node to a given key on a modular EpiChord ring.
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
-- if existing.
rMapLookupSucc :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupSucc :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
-- | find the predecessor node to a given key on a modular EpiChord ring.
rMapLookupPred :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupPred :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (HasKeyID a k, Bounded k, Ord k)
=> (RingEntry a k -> RingEntry a k -> RingEntry a k)
-> a
-> RingMap a k
-> RingMap a k
addRMapEntryWith combineFunc entry = RingMap
. Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry)
addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a)
-> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntryWith combineFunc key entry = RingMap
. Map.insertWith combineFunc key (KeyEntry entry)
. getRingMap
addRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
=> a
-> RingMap a k
-> RingMap a k
addRMapEntry :: (Bounded k, Ord k)
=> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntry = addRMapEntryWith insertCombineFunction
where
insertCombineFunction newVal oldVal =
@ -151,30 +155,30 @@ addRMapEntry = addRMapEntryWith insertCombineFunction
KeyEntry _ -> newVal
addRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
-> RingMap a k
addRMapEntries entries rmap = foldr' addRMapEntry rmap entries
addRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
-> RingMap k a
addRMapEntries entries rmap = foldr' (\(k, v) rmap' -> addRMapEntry k v rmap') rmap entries
setRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
setRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
setRMapEntries entries = addRMapEntries entries emptyRMap
deleteRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
deleteRMapEntry :: (Bounded k, Ord k)
=> k
-> RingMap a k
-> RingMap a k
-> RingMap k a
-> RingMap k a
deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing
rMapToList :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> [a]
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
rMapFromList :: (HasKeyID a k, Bounded k, Ord k) => [a] -> RingMap a k
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
rMapFromList = setRMapEntries
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
@ -182,49 +186,52 @@ rMapFromList = setRMapEntries
-- Stops once i entries have been taken or an entry has been encountered twice
-- (meaning the ring has been traversed completely).
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
takeRMapEntries_ :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> (k -> RingMap a k -> Maybe a)
-> k
-> i
-> RingMap a k
-> [a]
takeRMapEntries_ :: (Integral i, Bounded k, Ord k)
=> (k -> RingMap k a -> Maybe (k, a)) -- ^ parameterisable getter function to determine lookup direction
-> k -- ^ starting key
-> i -- ^ number of maximum values to take
-> RingMap k a
-> [a] -- ^ values taken
-- TODO: might be more efficient with dlists
takeRMapEntries_ getterFunc startAt num rmap = reverse $
case getterFunc startAt rmap of
Nothing -> []
Just anEntry -> takeEntriesUntil rmap getterFunc (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry]
Just (foundKey, anEntry) -> takeEntriesUntil rmap getterFunc foundKey foundKey (num-1) [anEntry]
where
-- for some reason, just reusing the already-bound @rmap@ and @getterFunc@
-- variables leads to a type error, these need to be passed explicitly
takeEntriesUntil :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
-> (k -> RingMap a k -> Maybe a) -- getter function
takeEntriesUntil :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> (k -> RingMap k a -> Maybe (k, a)) -- getter function
-> k
-> k
-> i
-> [a]
-> [a]
takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc
-- length limit reached
| remaining <= 0 = takeAcc
| getKeyID (fromJust $ getterFunc' previousEntry rmap') == havingReached = takeAcc
| otherwise = let (Just gotEntry) = getterFunc' previousEntry rmap'
in takeEntriesUntil rmap' getterFunc' havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc)
--
| otherwise = case nextEntry of
Just (fKey, gotEntry)
| fKey == havingReached -> takeAcc
| otherwise -> takeEntriesUntil rmap' getterFunc' havingReached fKey (remaining - 1) (gotEntry:takeAcc)
Nothing -> takeAcc
where
nextEntry = getterFunc' previousEntry rmap'
takeRMapPredecessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeRMapPredecessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapPredecessors = takeRMapEntries_ rMapLookupPred
takeRMapSuccessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node

View file

@ -1,9 +1,15 @@
{-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where
import Hash2Pub.FediChord (DHT (..))
import Data.Hashable (Hashable (..))
import Hash2Pub.FediChord (DHT (..), NodeID (..))
class Service s d where
-- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i
instance Hashable NodeID where
hashWithSalt salt = hashWithSalt salt . getNodeID
hash = hash . getNodeID