merge implemented mock relay service API

closes #32 #41
This commit is contained in:
Trolli Schmittlauch 2020-08-14 11:29:50 +02:00
commit 0ecad38748
9 changed files with 799 additions and 246 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types
ghc-options: -Wall
@ -55,7 +55,7 @@ library
import: deps
-- Modules exported by the library.
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils

View file

@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6)
import System.Environment
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService (PostService (..))
main :: IO ()
main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
conf <- readConfig
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
@ -41,10 +43,11 @@ main = do
pure ()
readConfig :: IO FediChordConf
readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
pure $ FediChordConf {
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
let
fConf = FediChordConf {
confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
@ -53,3 +56,10 @@ readConfig = do
, confBootstrapSamplingInterval = 180
, confMaxLookupCacheAge = 300
}
sConf = ServiceConf {
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
, confServicePort = read servicePortString
, confServiceHost = confDomainString
}
pure (fConf, sConf)

View file

@ -34,6 +34,7 @@ module Hash2Pub.DHTProtocol
, ackRequest
, isPossibleSuccessor
, isPossiblePredecessor
, isInOwnResponsibilitySlice
, isJoined
, closestCachePredecessors
)
@ -92,7 +93,7 @@ import Debug.Trace (trace)
-- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined.
@ -130,23 +131,25 @@ closestCachePredecessors remainingLookups lastID nCache
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the same as the node
-- itself then it falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
where
predecessorList = predecessors ownNs
-- add node itself to RingMap representation, to distinguish between
-- responsibility of own node and predecessor
predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessorList) :: RingMap NodeID RemoteNodeState
ownRemote = toRemoteNodeState ownNs
closestPredecessor = headMay predecessorList
isPossiblePredecessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossiblePredecessor = isInOwnResponsibilitySlice
isPossibleSuccessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool
isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
where
successorList = successors ownNs
successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successorList)
ownRemote = toRemoteNodeState ownNs
closestSuccessor = headMay successorList
-- cache operations
@ -169,7 +172,8 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now
newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache
newEntry = CacheEntry False ns timestamp'
newCache = addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal)
@ -202,7 +206,7 @@ addNodeAsVerifiedPure :: POSIXTime
-> RemoteNodeState
-> NodeCache
-> NodeCache
addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now)
addNodeAsVerifiedPure now node = addRMapEntry (getKeyID node) (CacheEntry True node now)
@ -221,7 +225,7 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT
isJoined :: LocalNodeState -> Bool
isJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
@ -245,7 +249,7 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
@ -284,7 +288,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do
putStrLn "responding to a QueryID request"
-- this message cannot be split reasonably, so just
@ -325,7 +329,7 @@ respondQueryID nsSTM msgSet = do
-- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response.
-- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
-- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
@ -356,7 +360,7 @@ respondLeave nsSTM msgSet = do
pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list
respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
let
@ -378,7 +382,7 @@ respondStabilise nsSTM msgSet = do
-- | respond to Ping request by returning all active vserver NodeStates
respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet = do
-- TODO: respond with all active VS when implementing k-choices
nsSnap <- readTVarIO nsSTM
@ -397,7 +401,7 @@ respondPing nsSTM msgSet = do
-- this modifies node state, so locking and IO seems to be necessary.
-- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do
@ -448,8 +452,8 @@ respondJoin nsSTM msgSet = do
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM -- ^ joining NodeState
-> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState
@ -497,7 +501,7 @@ requestJoin toJoinOn ownStateSTM = do
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up
-> IO RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes
@ -512,7 +516,7 @@ requestQueryID ns targetID = do
queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState
queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
-- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
@ -538,7 +542,7 @@ queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
sendQueryIdMessages :: (Integral i)
=> NodeID -- ^ target key ID to look up
-> LocalNodeState -- ^ node state of the node doing the query
-> LocalNodeState s -- ^ node state of the node doing the query
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response
@ -576,7 +580,7 @@ sendQueryIdMessages targetID ns lParam targets = do
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
=> NodeID -- ^ target ID
-> LocalNodeState -- ^ sender node state
-> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter
-> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
@ -586,7 +590,7 @@ lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
-- return parsed neighbour lists
requestStabilise :: LocalNodeState -- ^ sending node
requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
@ -621,7 +625,7 @@ requestStabilise ns neighbour = do
) responses
requestPing :: LocalNodeState -- ^ sending node
requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
@ -720,7 +724,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueAddEntries entries ns = do
now <- getPOSIXTime
@ -730,14 +734,14 @@ queueAddEntries entries ns = do
-- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
=> c NodeID
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
-> LocalNodeState
-> LocalNodeState s
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete

View file

@ -78,7 +78,6 @@ import Data.Maybe (catMaybes, fromJust, fromMaybe,
isJust, isNothing, mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Network.Socket hiding (recv, recvFrom, send,
@ -95,24 +94,34 @@ import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf = do
fediChordInit :: (Service s (RealNodeSTM s))
=> FediChordConf
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-> IO (Socket, LocalNodeStateSTM s)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode {
vservers = []
, nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
, nodeService = undefined
}
realNodeSTM <- newTVarIO realNode
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-- initialise a single vserver
initialState <- nodeStateInit realNodeSTM
initialStateSTM <- newTVarIO initialState
-- add vserver to list at RealNode
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = initialStateSTM:vservers rn }
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
pure (serverSock, initialStateSTM)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: RealNodeSTM -> IO LocalNodeState
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
nodeStateInit realNodeSTM = do
realNode <- readTVarIO realNodeSTM
cacheSTM <- newTVarIO initCache
@ -125,7 +134,7 @@ nodeStateInit realNodeSTM = do
, ipAddr = confIP conf
, nid = genNodeID (confIP conf) (confDomain conf) $ fromInteger vsID
, dhtPort = toEnum $ confDhtPort conf
, servicePort = 0
, servicePort = getListeningPortFromService $ nodeService realNode
, vServerID = vsID
}
initialState = LocalNodeState {
@ -144,9 +153,9 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
@ -162,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined try joining instead.
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
@ -193,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
@ -210,7 +219,7 @@ tryBootstrapJoining nsSTM = do
-- | Look up a key just based on the responses of a single bootstrapping node.
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
@ -241,8 +250,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache
-- node's position.
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
fediChordJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordJoin nsSTM = do
ns <- readTVarIO nsSTM
@ -258,7 +267,7 @@ fediChordJoin nsSTM = do
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO ()
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do
@ -271,8 +280,7 @@ joinOnNewEntriesThread nsSTM = loop
result -> pure (result, cache)
case lookupResult of
-- already joined
FOUND _ -> do
print =<< readTVarIO nsSTM
FOUND _ ->
pure ()
-- otherwise try joining
FORWARD _ -> do
@ -288,7 +296,7 @@ joinOnNewEntriesThread nsSTM = loop
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM -> IO ()
nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
nodeCacheWriter nsSTM =
forever $ atomically $ do
ns <- readTVar nsSTM
@ -302,7 +310,7 @@ maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM -> IO ()
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin"
-- get cache
@ -363,7 +371,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
-- | Checks the invariant of at least @jEntries@ per cache slice.
-- If this invariant does not hold, the middle of the slice is returned for
-- making lookups to that ID
checkCacheSliceInvariants :: LocalNodeState
checkCacheSliceInvariants :: LocalNodeState s
-> NodeCache
-> [NodeID] -- ^ list of middle IDs of slices not
-- ^ fulfilling the invariant
@ -419,12 +427,11 @@ checkCacheSliceInvariants ns
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM -> IO ()
stabiliseThread :: LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
print ns
-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of
@ -482,8 +489,8 @@ stabiliseThread nsSTM = forever $ do
-- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node.
stabiliseClosestResponder :: LocalNodeState -- ^ own node
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
stabiliseClosestResponder :: LocalNodeState s -- ^ own node
-> (LocalNodeState s -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
-> Int -- ^ index of neighbour to query
-> [RemoteNodeState] -- ^ delete accumulator
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
@ -507,7 +514,7 @@ stabiliseThread nsSTM = forever $ do
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
checkReachability :: LocalNodeState -- ^ this node
checkReachability :: LocalNodeState s -- ^ this node
-> RemoteNodeState -- ^ node to Ping for reachability
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
checkReachability ns toCheck = do
@ -536,10 +543,10 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
putStrLn $ "launching threads, ns: " <> show ns
putStrLn "launching threads"
sendQ <- newTQueueIO
recvQ <- newTQueueIO
-- concurrently launch all handler threads, if one of them throws an exception
@ -581,7 +588,7 @@ requestMapPurge mapVar = forever $ do
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM -- ^ acting NodeState
-> LocalNodeStateSTM s -- ^ acting NodeState
-> IO ()
fediMessageHandler sendQ recvQ nsSTM = do
-- Read node state just once, assuming that all relevant data for this function does
@ -646,14 +653,33 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- ==== interface to service layer ====
instance DHT RealNodeSTM where
instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring
-- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState
isResponsibleFor nodeSTM key = do
node <- readTVarIO nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVarIO vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
isResponsibleForSTM nodeSTM key = do
node <- readTVar nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVar vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
-- | Returns the hostname and port of the host responsible for a key.
-- Information is provided from a cache, only on a cache miss a new DHT lookup
-- is triggered.
getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
getKeyResponsibility :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
getKeyResponsibility nodeSTM lookupKey = do
node <- readTVarIO nodeSTM
cache <- readTVarIO $ lookupCacheSTM node
@ -669,7 +695,7 @@ getKeyResponsibility nodeSTM lookupKey = do
-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the
-- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM lookupKey = do
(node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM
@ -696,7 +722,7 @@ updateLookupCache nodeSTM lookupKey = do
-- | Periodically clean the lookup cache from expired entries.
lookupCacheCleanup :: RealNodeSTM -> IO ()
lookupCacheCleanup :: RealNodeSTM s -> IO ()
lookupCacheCleanup nodeSTM = do
node <- readTVarIO nodeSTM
forever $ do

View file

@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@ -26,8 +27,7 @@ module Hash2Pub.FediChordTypes (
, CacheEntry(..)
, RingEntry(..)
, RingMap(..)
, HasKeyID
, getKeyID
, HasKeyID(..)
, rMapSize
, rMapLookup
, rMapLookupPred
@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes (
, bsAsIpAddr
, FediChordConf(..)
, DHT(..)
, Service(..)
, ServiceConf(..)
) where
import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
@ -144,8 +147,8 @@ a `localCompare` b
-- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
data RealNode = RealNode
{ vservers :: [LocalNodeStateSTM]
data RealNode s = RealNode
{ vservers :: [LocalNodeStateSTM s]
-- ^ references to all active versers
, nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start
@ -153,9 +156,10 @@ data RealNode = RealNode
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
, nodeService :: s (RealNodeSTM s)
}
type RealNodeSTM = TVar RealNode
type RealNodeSTM s = TVar (RealNode s)
-- | represents a node and all its important state
data RemoteNodeState = RemoteNodeState
@ -177,7 +181,7 @@ instance Ord RemoteNodeState where
a `compare` b = nid a `compare` nid b
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState
data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache
@ -196,13 +200,13 @@ data LocalNodeState = LocalNodeState
-- ^ number of parallel sent queries
, jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice
, parentRealNode :: RealNodeSTM
, parentRealNode :: RealNodeSTM s
-- ^ the parent node managing this vserver instance
}
deriving (Show, Eq)
-- | for concurrent access, LocalNodeState is wrapped in a TVar
type LocalNodeStateSTM = TVar LocalNodeState
type LocalNodeStateSTM s = TVar (LocalNodeState s)
-- | class for various NodeState representations, providing
-- getters and setters for common values
@ -239,14 +243,14 @@ instance NodeState RemoteNodeState where
toRemoteNodeState = id
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState s -> LocalNodeState s
propagateNodeStateSet_ func ns = let
newNs = func $ nodeState ns
in
ns {nodeState = newNs}
instance NodeState LocalNodeState where
instance NodeState (LocalNodeState s) where
getNid = getNid . nodeState
getDomain = getDomain . nodeState
getIpAddr = getIpAddr . nodeState
@ -268,34 +272,37 @@ instance Typeable a => Show (TVar a) where
instance Typeable a => Show (TQueue a) where
show x = show (typeOf x)
instance Typeable a => Show (TChan a) where
show x = show (typeOf x)
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds}
setPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds}
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs}
setSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs}
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns}
addPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns}
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns}
addSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns}
instance HasKeyID RemoteNodeState NodeID where
instance HasKeyID NodeID RemoteNodeState where
getKeyID = getNid
instance HasKeyID a k => HasKeyID (CacheEntry a) k where
instance HasKeyID k a => HasKeyID k (CacheEntry a) where
getKeyID (CacheEntry _ obj _) = getKeyID obj
instance HasKeyID NodeID NodeID where
getKeyID = id
type NodeCacheEntry = CacheEntry RemoteNodeState
type NodeCache = RingMap NodeCacheEntry NodeID
type NodeCache = RingMap NodeID NodeCacheEntry
type LookupCacheEntry = CacheEntry (String, PortNumber)
type LookupCache = Map.Map NodeID LookupCacheEntry
@ -319,12 +326,15 @@ cacheLookup = rMapLookup
cacheLookupSucc :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupSucc = rMapLookupSucc
cacheLookupSucc key cache = snd <$> rMapLookupSucc key cache
cacheLookupPred :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry
cacheLookupPred = rMapLookupPred
cacheLookupPred key cache = snd <$> rMapLookupPred key cache
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
@ -408,11 +418,33 @@ data FediChordConf = FediChordConf
}
deriving (Show, Eq)
-- ====== Service Types ============
class Service s d where
-- | run the service
runService :: ServiceConf -> d -> IO (s d)
getListeningPortFromService :: (Integral i) => s d -> i
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: POSIXTime
-- ^ subscription lease expiration in seconds
, confServicePort :: Int
-- ^ listening port for service
, confServiceHost :: String
-- ^ hostname of service
}
class DHT d where
-- | lookup the responsible host handling a given key string,
-- possibly from a lookup cache
-- possiblggy from a lookup cache
lookupKey :: d -> String -> IO (Maybe (String, PortNumber))
-- | lookup the responsible host handling a given key string,
-- but force the DHT to do a fresh lookup instead of returning a cached result.
-- Also invalidates old cache entries.
forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber))
isResponsibleFor :: d -> NodeID -> IO Bool
isResponsibleForSTM :: d -> NodeID -> STM Bool

View file

@ -1,117 +1,577 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE InstanceSigs #-}
module Hash2Pub.PostService where
import Control.Concurrent
import qualified Data.ByteString.Lazy.UTF8 as BSU
import Data.Maybe (fromMaybe)
import Data.String (fromString)
import qualified Data.Text as Txt
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC),
normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.Random
import Text.Read (readEither)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Servant.Client
import Servant.Server
import Hash2Pub.FediChord
import Hash2Pub.ServiceTypes
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
data PostService d = PostService
{ psPort :: Warp.Port
, psHost :: String
{ serviceConf :: ServiceConf
-- queues, other data structures
, baseDHT :: (DHT d) => d
, serviceThread :: ThreadId
, baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId
, subscribers :: TVar RelayTags
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
, httpMan :: HTTP.Manager
}
deriving (Typeable)
type Hashtag = Txt.Text
type PostID = Txt.Text
type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
-- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
instance DHT d => Service PostService d where
runService dht host port = do
-- | initialise 'PostService' data structures and run server
runService conf dht = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
let
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication
pure $ PostService {
psPort = port'
, psHost = host
, baseDHT = dht
, serviceThread = servThread
}
getServicePort s = fromIntegral $ psPort s
thisService = PostService {
serviceConf = conf
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, httpMan = httpMan'
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
servThreadID <- forkIO $
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
(processIncomingPosts thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
getListeningPortFromService = fromIntegral . confServicePort . serviceConf
-- | return a WAI application
postServiceApplication :: Application
postServiceApplication = serve exposedPostServiceAPI postServer
postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
servicePort = 8081
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
-- ========= constants ===========
placeholderPost :: Txt.Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for delivering the subscriptions and outstanding queue
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- ^ endpoint for subscribing the instance specified in
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ endpoint for unsubscribing the instance specified in
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
postServer :: Server PostServiceAPI
postServer = relayInbox
:<|> subscriptionDelivery
:<|> postFetch
:<|> postMultiFetch
:<|> tagDelivery
:<|> tagSubscribe
:<|> tagUnsubscribe
postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
:<|> postFetch service
:<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service
:<|> tagSubscribe service
:<|> tagUnsubscribe service
relayInbox :: Txt.Text -> Handler Txt.Text
relayInbox post = pure $ "Here be InboxDragons with " <> post
relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox serv tag posts = do
let
-- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
-- if tag is not in own responsibility, return a 410 Gone
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag)
if responsible
then pure ()
else
(throwError $ err410 { errBody = "Relay is not responsible for this tag"})
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe
-- if noone subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag
(\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue)
)
broadcastChan
pure NoContent
subscriptionDelivery :: Txt.Text -> Handler Txt.Text
subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList
-- exception to be thrown when a tag is not in the responsibility of a relay
newtype UnhandledTagException = UnhandledTagException String
deriving (Show, Typeable)
postFetch :: Txt.Text -> Handler Txt.Text
postFetch postID = pure $ "Here be a post with dragon ID " <> postID
instance Exception UnhandledTagException
postMultiFetch :: Txt.Text -> Handler Txt.Text
postMultiFetch postIDs = pure $ "Here be multiple post dragons: "
<> (Txt.unwords . Txt.lines $ postIDs)
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = do
let
tagSubs = Txt.lines subList
-- In favor of having the convenience of rolling back the transaction once a
-- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag')
if responsible
then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
pure $ Right ()
) (pure ()) tagSubs
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility
where
processTag :: TVar RelayTags -> Txt.Text -> STM ()
processTag subscriberSTM tagData = do
let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe hashtag origin = pure 42
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch serv postID = do
postSet <- liftIO . readTVarIO . ownPosts $ serv
if HSet.member postID postSet
-- decision: always return the same placeholder post
then pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" }
tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing
foldM (\response postID ->
if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList
postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
now <- liftIO getPOSIXTime
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
pure $ round leaseTime
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
pure "bye bye"
-- client/ request functions
clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
-- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d
-> Hashtag -- ^ fromTag
-> Hashtag -- ^ toTag
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
-- collect tag intearval
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
subscriberData <- foldM (\response (subSTM, _, tag) -> do
subMap <- readTVarIO subSTM
thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do
-- duplicate the pending queue to work on a copy, in case of a delivery error
pending <- atomically $ do
queueCopy <- cloneTChan subChan
channelGetAll queueCopy
if null pending
then pure tagResponse
else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n"
)
""
(HMap.toList subMap)
pure $ thisTagsData <> response
)
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err
Right _ -> do
atomically $
modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
pure . Right $ ()
where
channelGetAll :: TChan a -> STM [a]
channelGetAll chan = channelGetAll' chan []
channelGetAll' :: TChan a -> [a] -> STM [a]
channelGetAll' chan acc = do
haveRead <- tryReadTChan chan
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe lookupRes True
where
doSubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagSubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe newRes False
Left err -> pure . Left . show $ err
Right lease -> pure . Right $ lease
)
lookupResponse
-- | Unsubscribe the client from the given hashtag.
clientUnsubscribeFrom :: DHT d => PostService d -> Hashtag -> IO (Either String ())
clientUnsubscribeFrom serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe lookupRes True
where
doUnsubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagUnsubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False
Left err -> pure . Left . show $ err
Right _ -> pure . Right $ ()
)
lookupResponse
-- | publish a new post to the inbox of a specified relay instance. This
-- instance will then be the originating instance of the post and will forward
-- the post to the responsible relays.
-- As the initial publishing isn't done by a specific relay (but *to* a specific relay
-- instead), the function does *not* take a PostService as argument.
clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided
-> String -- hostname
-> Int -- port
-> PostContent -- post content
-> IO (Either String ()) -- error or success
clientPublishPost httpman hostname port postC = do
resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port ""))
pure . bimap show (const ()) $ resp
-- currently this is unused code
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
getClients hostname' port' httpMan = hoistClient clientAPI
(fmap (either (error . show) id)
. flip runClientM clientEnv
)
(client clientAPI)
where
clientEnv = mkClientEnv httpMan (BaseUrl Http hostname' port' "")
-- ======= data structure manipulations =========
-- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing.
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> STM ()
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it
subChan <- setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (writeTChan subChan)
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map
foundSubMap <- readTVar foundSubMapSTM
case HMap.lookup subscriber foundSubMap of
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just (tagOutChan, _) -> pure tagOutChan
-- | deletes a subscription from the passed subscriber map
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
deleteSubscription tagMapSTM tag subscriber = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
-- no subscribers to that tag, just return
Nothing -> pure ()
Just (foundSubMapSTM, _, _) -> do
foundSubMap <- readTVar foundSubMapSTM
let newSubMap = HMap.delete subscriber foundSubMap
-- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether
if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
-- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC
normaliseTag :: Txt.Text -> Txt.Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSU.fromString . show
mimeRender _ = BSUL.fromString . show
instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
mimeUnrender _ = readEither . BSUL.toString
-- ====== worker threads ======
-- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
-- TODO: process multiple in parallel
(tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
putStrLn $ "Error: " <> show err
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
Right yay -> putStrLn $ "Yay! " <> show yay
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
fetchTagPosts :: DHT d => PostService d -> IO ()
fetchTagPosts serv = forever $ do
-- blocks until available
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
if HTTPT.statusCode (HTTP.responseStatus response) == 200
then
-- success, TODO: statistics
putStrLn "post fetch success"
else
-- TODO error handling, retry
pure ()
Left _ ->
-- TODO error handling, retry
pure ()

View file

@ -5,36 +5,38 @@ module Hash2Pub.RingMap where
import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust, isNothing, mapMaybe)
import Data.Maybe (isJust, isNothing, mapMaybe)
-- | Class for all types that can be identified via a EpiChord key.
-- Used for restricting the types a 'RingMap' can store
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID a k where
class (Eq a, Show a, Bounded k, Ord k) => HasKeyID k a where
getKeyID :: a -> k
keyValuePair :: a -> (k, a)
keyValuePair val = (getKeyID val, val)
-- | generic data structure for holding elements with a key and modular lookup
newtype RingMap a k = RingMap { getRingMap :: (HasKeyID a k, Bounded k, Ord k) => Map.Map k (RingEntry a k) }
newtype RingMap k a = RingMap { getRingMap :: (Bounded k, Ord k) => Map.Map k (RingEntry k a) }
instance (HasKeyID a k, Bounded k, Ord k) => Eq (RingMap a k) where
instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b
instance (HasKeyID a k, Bounded k, Ord k, Show k) => Show (RingMap a k) where
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows "RingMap " (show $ getRingMap rmap)
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
data RingEntry a k = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry a k))
data RingEntry k a = KeyEntry a
| ProxyEntry (k, ProxyDirection) (Maybe (RingEntry k a))
deriving (Show, Eq)
-- | as a compromise, only KeyEntry components are ordered by their key
-- while ProxyEntry components should never be tried to be ordered.
instance (HasKeyID a k, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry a k) where
instance (HasKeyID k a, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry k a) where
a `compare` b = compare (extractID a) (extractID b)
where
extractID :: (HasKeyID a k, Ord a, Bounded k, Ord k) => RingEntry a k -> k
extractID :: (HasKeyID k a, Ord a, Bounded k, Ord k) => RingEntry k a -> k
extractID (KeyEntry e) = getKeyID e
extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap"
@ -49,51 +51,51 @@ instance Enum ProxyDirection where
fromEnum Backwards = - 1
fromEnum Forwards = 1
-- | helper function for getting the a from a RingEntry a k
extractRingEntry :: (HasKeyID a k, Bounded k, Ord k) => RingEntry a k -> Maybe a
-- | helper function for getting the a from a RingEntry k a
extractRingEntry :: (Bounded k, Ord k) => RingEntry k a -> Maybe a
extractRingEntry (KeyEntry entry) = Just entry
extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
extractRingEntry _ = Nothing
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
-- linking the modular name space together by connecting @minBound@ and @maxBound@
emptyRMap :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k
emptyRMap :: (Bounded k, Ord k) => RingMap k a
emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))]
where
proxyEntry (from,to) = (from, ProxyEntry to Nothing)
-- | Maybe returns the entry stored at given key
rMapLookup :: (HasKeyID a k, Bounded k, Ord k)
rMapLookup :: (Bounded k, Ord k)
=> k -- ^lookup key
-> RingMap a k -- ^lookup cache
-> RingMap k a -- ^lookup cache
-> Maybe a
rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
rMapSize :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
rMapSize :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> i
rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - oneIfEntry rmap maxBound
where
innerMap = getRingMap rmap
oneIfEntry :: (HasKeyID a k, Integral i, Bounded k, Ord k) => RingMap a k -> k -> i
oneIfEntry :: (Integral i, Bounded k, Ord k) => RingMap k a -> k -> i
oneIfEntry rmap' nid
| isNothing (rMapLookup nid rmap') = 1
| otherwise = 0
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring
lookupWrapper :: (HasKeyID a k, Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
-> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k))
lookupWrapper :: (Bounded k, Ord k, Num k)
=> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a))
-> ProxyDirection
-> k
-> RingMap a k
-> Maybe a
-> RingMap k a
-> Maybe (k, a)
lookupWrapper f fRepeat direction key rmap =
case f key $ getRingMap rmap of
-- the proxy entry found holds a
Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry
Just (foundKey, ProxyEntry _ (Just (KeyEntry entry))) -> Just (foundKey, entry)
-- proxy entry holds another proxy entry, this should not happen
Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
-- proxy entry without own entry is a pointer on where to continue
@ -106,10 +108,10 @@ lookupWrapper f fRepeat direction key rmap =
then lookupWrapper fRepeat fRepeat direction newKey rmap
else Nothing
-- normal entries are returned
Just (_, KeyEntry entry) -> Just entry
Just (foundKey, KeyEntry entry) -> Just (foundKey, entry)
Nothing -> Nothing
where
rMapNotEmpty :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> Bool
rMapNotEmpty :: (Bounded k, Ord k) => RingMap k a -> Bool
rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries
|| isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node
|| isJust (rMapLookup maxBound rmap')
@ -117,32 +119,34 @@ lookupWrapper f fRepeat direction key rmap =
-- | find the successor node to a given key on a modular EpiChord ring.
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
-- if existing.
rMapLookupSucc :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupSucc :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
-- | find the predecessor node to a given key on a modular EpiChord ring.
rMapLookupPred :: (HasKeyID a k, Bounded k, Ord k, Num k)
rMapLookupPred :: (Bounded k, Ord k, Num k)
=> k -- ^lookup key
-> RingMap a k -- ^ring cache
-> Maybe a
-> RingMap k a -- ^ring cache
-> Maybe (k, a)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (HasKeyID a k, Bounded k, Ord k)
=> (RingEntry a k -> RingEntry a k -> RingEntry a k)
-> a
-> RingMap a k
-> RingMap a k
addRMapEntryWith combineFunc entry = RingMap
. Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry)
addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a)
-> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntryWith combineFunc key entry = RingMap
. Map.insertWith combineFunc key (KeyEntry entry)
. getRingMap
addRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
=> a
-> RingMap a k
-> RingMap a k
addRMapEntry :: (Bounded k, Ord k)
=> k -- ^ key
-> a -- ^ value
-> RingMap k a
-> RingMap k a
addRMapEntry = addRMapEntryWith insertCombineFunction
where
insertCombineFunction newVal oldVal =
@ -151,30 +155,30 @@ addRMapEntry = addRMapEntryWith insertCombineFunction
KeyEntry _ -> newVal
addRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
-> RingMap a k
addRMapEntries entries rmap = foldr' addRMapEntry rmap entries
addRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
-> RingMap k a
addRMapEntries entries rmap = foldr' (\(k, v) rmap' -> addRMapEntry k v rmap') rmap entries
setRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k)
=> t a
-> RingMap a k
setRMapEntries :: (Foldable t, Bounded k, Ord k)
=> t (k, a)
-> RingMap k a
setRMapEntries entries = addRMapEntries entries emptyRMap
deleteRMapEntry :: (HasKeyID a k, Bounded k, Ord k)
deleteRMapEntry :: (Bounded k, Ord k)
=> k
-> RingMap a k
-> RingMap a k
-> RingMap k a
-> RingMap k a
deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier KeyEntry {} = Nothing
rMapToList :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> [a]
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
rMapFromList :: (HasKeyID a k, Bounded k, Ord k) => [a] -> RingMap a k
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
rMapFromList = setRMapEntries
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
@ -182,49 +186,64 @@ rMapFromList = setRMapEntries
-- Stops once i entries have been taken or an entry has been encountered twice
-- (meaning the ring has been traversed completely).
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
takeRMapEntries_ :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> (k -> RingMap a k -> Maybe a)
-> k
-> i
-> RingMap a k
-> [a]
takeRMapEntries_ :: (Integral i, Bounded k, Ord k)
=> (k -> RingMap k a -> Maybe (k, a)) -- ^ parameterisable getter function to determine lookup direction
-> k -- ^ starting key
-> i -- ^ number of maximum values to take
-> RingMap k a
-> [a] -- ^ values taken
-- TODO: might be more efficient with dlists
takeRMapEntries_ getterFunc startAt num rmap = reverse $
case getterFunc startAt rmap of
Nothing -> []
Just anEntry -> takeEntriesUntil rmap getterFunc (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry]
where
-- for some reason, just reusing the already-bound @rmap@ and @getterFunc@
-- variables leads to a type error, these need to be passed explicitly
takeEntriesUntil :: (HasKeyID a k, Integral i, Bounded k, Ord k)
=> RingMap a k
-> (k -> RingMap a k -> Maybe a) -- getter function
-> k
-> k
-> i
-> [a]
-> [a]
takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc
| remaining <= 0 = takeAcc
| getKeyID (fromJust $ getterFunc' previousEntry rmap') == havingReached = takeAcc
| otherwise = let (Just gotEntry) = getterFunc' previousEntry rmap'
in takeEntriesUntil rmap' getterFunc' havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc)
Just (foundKey, anEntry) -> takeEntriesUntil_ rmap getterFunc foundKey foundKey (Just $ num-1) [anEntry]
takeRMapPredecessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
=> RingMap k a
-> (k -> RingMap k a -> Maybe (k, a)) -- getter function
-> k -- limit value
-> k -- start value
-> Maybe i -- possible number limit
-> [a]
-> [a]
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc
-- length limit reached
| remaining <= 0 = takeAcc
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =
case nextEntry of
Just (fKey, gotEntry)
| fKey == havingReached -> takeAcc
| otherwise -> takeEntriesUntil_ rmap' getterFunc' havingReached fKey (fmap pred numLimit) (gotEntry:takeAcc)
Nothing -> takeAcc
where
nextEntry = getterFunc' previousEntry rmap'
takeRMapPredecessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapPredecessors = takeRMapEntries_ rMapLookupPred
takeRMapSuccessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k)
takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k)
=> k
-> i
-> RingMap a k
-> RingMap k a
-> [a]
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing []
takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []

View file

@ -1,9 +0,0 @@
{-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where
import Hash2Pub.FediChord (DHT (..))
class Service s d where
-- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i

View file

@ -1,4 +1,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where
import Control.Concurrent.STM.TVar
@ -292,12 +294,15 @@ exampleNodeState = RemoteNodeState {
, vServerID = 0
}
exampleLocalNode :: IO LocalNodeState
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = do
realNode <- newTVarIO $ RealNode {
vservers = []
, nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf
})
, nodeService = MockService
}
nodeStateInit realNode
exampleFediConf :: FediChordConf
@ -313,3 +318,9 @@ exampleVs :: (Integral i) => i
exampleVs = 4
exampleIp :: HostAddress6
exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e)
data MockService d = MockService
instance DHT d => Service MockService d where
runService _ _ = pure MockService
getListeningPortFromService = const 1337