From 988144e9e7f9f9c22e4f43d5fdcac603d750a217 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 26 Jul 2020 18:55:23 +0200 Subject: [PATCH 1/5] further relax constrains on RingMap key now needs to be explicitly given at insert, instead of deriving it from the value. This makes it possible to store values where a key cannot be extracted from (HasKeyID) contributes to #62, #32, #41 --- src/Hash2Pub/RingMap.hs | 155 +++++++++++++++++++++------------------- 1 file changed, 80 insertions(+), 75 deletions(-) diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 529a68b..9c7f63b 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -10,31 +10,31 @@ import Data.Maybe (fromJust, isJust, isNothing, mapMaybe) -- | Class for all types that can be identified via a EpiChord key. -- Used for restricting the types a 'RingMap' can store -class (Eq a, Show a, Bounded k, Ord k) => HasKeyID a k where +class (Eq a, Show a, Bounded k, Ord k) => HasKeyID k a where getKeyID :: a -> k -- | generic data structure for holding elements with a key and modular lookup -newtype RingMap a k = RingMap { getRingMap :: (HasKeyID a k, Bounded k, Ord k) => Map.Map k (RingEntry a k) } +newtype RingMap k a = RingMap { getRingMap :: (Bounded k, Ord k) => Map.Map k (RingEntry k a) } -instance (HasKeyID a k, Bounded k, Ord k) => Eq (RingMap a k) where +instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where a == b = getRingMap a == getRingMap b -instance (HasKeyID a k, Bounded k, Ord k, Show k) => Show (RingMap a k) where +instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where show rmap = shows "RingMap " (show $ getRingMap rmap) -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. -data RingEntry a k = KeyEntry a - | ProxyEntry (k, ProxyDirection) (Maybe (RingEntry a k)) +data RingEntry k a = KeyEntry a + | ProxyEntry (k, ProxyDirection) (Maybe (RingEntry k a)) deriving (Show, Eq) -- | as a compromise, only KeyEntry components are ordered by their key -- while ProxyEntry components should never be tried to be ordered. -instance (HasKeyID a k, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry a k) where +instance (HasKeyID k a, Eq k, Ord a, Bounded k, Ord k) => Ord (RingEntry k a) where a `compare` b = compare (extractID a) (extractID b) where - extractID :: (HasKeyID a k, Ord a, Bounded k, Ord k) => RingEntry a k -> k + extractID :: (HasKeyID k a, Ord a, Bounded k, Ord k) => RingEntry k a -> k extractID (KeyEntry e) = getKeyID e extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap" @@ -49,51 +49,51 @@ instance Enum ProxyDirection where fromEnum Backwards = - 1 fromEnum Forwards = 1 --- | helper function for getting the a from a RingEntry a k -extractRingEntry :: (HasKeyID a k, Bounded k, Ord k) => RingEntry a k -> Maybe a +-- | helper function for getting the a from a RingEntry k a +extractRingEntry :: (Bounded k, Ord k) => RingEntry k a -> Maybe a extractRingEntry (KeyEntry entry) = Just entry extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry extractRingEntry _ = Nothing -- | An empty 'RingMap' needs to be initialised with 2 proxy entries, -- linking the modular name space together by connecting @minBound@ and @maxBound@ -emptyRMap :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k +emptyRMap :: (Bounded k, Ord k) => RingMap k a emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] where proxyEntry (from,to) = (from, ProxyEntry to Nothing) -- | Maybe returns the entry stored at given key -rMapLookup :: (HasKeyID a k, Bounded k, Ord k) +rMapLookup :: (Bounded k, Ord k) => k -- ^lookup key - -> RingMap a k -- ^lookup cache + -> RingMap k a -- ^lookup cache -> Maybe a rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap) -- | returns number of present 'KeyEntry' in a properly initialised 'RingMap' -rMapSize :: (HasKeyID a k, Integral i, Bounded k, Ord k) - => RingMap a k +rMapSize :: (Integral i, Bounded k, Ord k) + => RingMap k a -> i rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - oneIfEntry rmap maxBound where innerMap = getRingMap rmap - oneIfEntry :: (HasKeyID a k, Integral i, Bounded k, Ord k) => RingMap a k -> k -> i + oneIfEntry :: (Integral i, Bounded k, Ord k) => RingMap k a -> k -> i oneIfEntry rmap' nid | isNothing (rMapLookup nid rmap') = 1 | otherwise = 0 -- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ -- to simulate a modular ring -lookupWrapper :: (HasKeyID a k, Bounded k, Ord k, Num k) - => (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k)) - -> (k -> Map.Map k (RingEntry a k) -> Maybe (k, RingEntry a k)) +lookupWrapper :: (Bounded k, Ord k, Num k) + => (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a)) + -> (k -> Map.Map k (RingEntry k a) -> Maybe (k, RingEntry k a)) -> ProxyDirection -> k - -> RingMap a k - -> Maybe a + -> RingMap k a + -> Maybe (k, a) lookupWrapper f fRepeat direction key rmap = case f key $ getRingMap rmap of -- the proxy entry found holds a - Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry + Just (foundKey, ProxyEntry _ (Just (KeyEntry entry))) -> Just (foundKey, entry) -- proxy entry holds another proxy entry, this should not happen Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing -- proxy entry without own entry is a pointer on where to continue @@ -106,10 +106,10 @@ lookupWrapper f fRepeat direction key rmap = then lookupWrapper fRepeat fRepeat direction newKey rmap else Nothing -- normal entries are returned - Just (_, KeyEntry entry) -> Just entry + Just (foundKey, KeyEntry entry) -> Just (foundKey, entry) Nothing -> Nothing where - rMapNotEmpty :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> Bool + rMapNotEmpty :: (Bounded k, Ord k) => RingMap k a -> Bool rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries || isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node || isJust (rMapLookup maxBound rmap') @@ -117,32 +117,34 @@ lookupWrapper f fRepeat direction key rmap = -- | find the successor node to a given key on a modular EpiChord ring. -- Note: The EpiChord definition of "successor" includes the node at the key itself, -- if existing. -rMapLookupSucc :: (HasKeyID a k, Bounded k, Ord k, Num k) +rMapLookupSucc :: (Bounded k, Ord k, Num k) => k -- ^lookup key - -> RingMap a k -- ^ring cache - -> Maybe a + -> RingMap k a -- ^ring cache + -> Maybe (k, a) rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards -- | find the predecessor node to a given key on a modular EpiChord ring. -rMapLookupPred :: (HasKeyID a k, Bounded k, Ord k, Num k) +rMapLookupPred :: (Bounded k, Ord k, Num k) => k -- ^lookup key - -> RingMap a k -- ^ring cache - -> Maybe a + -> RingMap k a -- ^ring cache + -> Maybe (k, a) rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards -addRMapEntryWith :: (HasKeyID a k, Bounded k, Ord k) - => (RingEntry a k -> RingEntry a k -> RingEntry a k) - -> a - -> RingMap a k - -> RingMap a k -addRMapEntryWith combineFunc entry = RingMap - . Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry) +addRMapEntryWith :: (Bounded k, Ord k) + => (RingEntry k a -> RingEntry k a -> RingEntry k a) + -> k -- ^ key + -> a -- ^ value + -> RingMap k a + -> RingMap k a +addRMapEntryWith combineFunc key entry = RingMap + . Map.insertWith combineFunc key (KeyEntry entry) . getRingMap -addRMapEntry :: (HasKeyID a k, Bounded k, Ord k) - => a - -> RingMap a k - -> RingMap a k +addRMapEntry :: (Bounded k, Ord k) + => k -- ^ key + -> a -- ^ value + -> RingMap k a + -> RingMap k a addRMapEntry = addRMapEntryWith insertCombineFunction where insertCombineFunction newVal oldVal = @@ -151,30 +153,30 @@ addRMapEntry = addRMapEntryWith insertCombineFunction KeyEntry _ -> newVal -addRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k) - => t a - -> RingMap a k - -> RingMap a k -addRMapEntries entries rmap = foldr' addRMapEntry rmap entries +addRMapEntries :: (Foldable t, Bounded k, Ord k) + => t (k, a) + -> RingMap k a + -> RingMap k a +addRMapEntries entries rmap = foldr' (\(k, v) rmap' -> addRMapEntry k v rmap') rmap entries -setRMapEntries :: (Foldable t, HasKeyID a k, Bounded k, Ord k) - => t a - -> RingMap a k +setRMapEntries :: (Foldable t, Bounded k, Ord k) + => t (k, a) + -> RingMap k a setRMapEntries entries = addRMapEntries entries emptyRMap -deleteRMapEntry :: (HasKeyID a k, Bounded k, Ord k) +deleteRMapEntry :: (Bounded k, Ord k) => k - -> RingMap a k - -> RingMap a k + -> RingMap k a + -> RingMap k a deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap where modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing) modifier KeyEntry {} = Nothing -rMapToList :: (HasKeyID a k, Bounded k, Ord k) => RingMap a k -> [a] +rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a] rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap -rMapFromList :: (HasKeyID a k, Bounded k, Ord k) => [a] -> RingMap a k +rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a rMapFromList = setRMapEntries -- | takes up to i entries from a 'RingMap' by calling a getter function on a @@ -182,49 +184,52 @@ rMapFromList = setRMapEntries -- Stops once i entries have been taken or an entry has been encountered twice -- (meaning the ring has been traversed completely). -- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'. -takeRMapEntries_ :: (HasKeyID a k, Integral i, Bounded k, Ord k) - => (k -> RingMap a k -> Maybe a) - -> k - -> i - -> RingMap a k - -> [a] +takeRMapEntries_ :: (Integral i, Bounded k, Ord k) + => (k -> RingMap k a -> Maybe (k, a)) -- ^ parameterisable getter function to determine lookup direction + -> k -- ^ starting key + -> i -- ^ number of maximum values to take + -> RingMap k a + -> [a] -- ^ values taken -- TODO: might be more efficient with dlists takeRMapEntries_ getterFunc startAt num rmap = reverse $ case getterFunc startAt rmap of Nothing -> [] - Just anEntry -> takeEntriesUntil rmap getterFunc (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry] + Just (foundKey, anEntry) -> takeEntriesUntil rmap getterFunc foundKey foundKey (num-1) [anEntry] where -- for some reason, just reusing the already-bound @rmap@ and @getterFunc@ -- variables leads to a type error, these need to be passed explicitly - takeEntriesUntil :: (HasKeyID a k, Integral i, Bounded k, Ord k) - => RingMap a k - -> (k -> RingMap a k -> Maybe a) -- getter function + takeEntriesUntil :: (Integral i, Bounded k, Ord k) + => RingMap k a + -> (k -> RingMap k a -> Maybe (k, a)) -- getter function -> k -> k -> i -> [a] -> [a] takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc + -- length limit reached | remaining <= 0 = takeAcc - | getKeyID (fromJust $ getterFunc' previousEntry rmap') == havingReached = takeAcc - | otherwise = let (Just gotEntry) = getterFunc' previousEntry rmap' - in takeEntriesUntil rmap' getterFunc' havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc) + -- + | otherwise = case nextEntry of + Just (fKey, gotEntry) + | fKey == havingReached -> takeAcc + | otherwise -> takeEntriesUntil rmap' getterFunc' havingReached fKey (remaining - 1) (gotEntry:takeAcc) + Nothing -> takeAcc + where + nextEntry = getterFunc' previousEntry rmap' -takeRMapPredecessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k) + +takeRMapPredecessors :: (Integral i, Bounded k, Ord k, Num k) => k -> i - -> RingMap a k + -> RingMap k a -> [a] takeRMapPredecessors = takeRMapEntries_ rMapLookupPred -takeRMapSuccessors :: (HasKeyID a k, Integral i, Bounded k, Ord k, Num k) +takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k) => k -> i - -> RingMap a k + -> RingMap k a -> [a] takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc --- clean up cache entries: once now - entry > maxAge --- transfer difference now - entry to other node - - From 7878c67635d2e11ef2bcf81783b1ffe7e19cd8ca Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 27 Jul 2020 00:37:17 +0200 Subject: [PATCH 2/5] adjust rest of code to refactored RingMap --- src/Hash2Pub/DHTProtocol.hs | 21 ++++++++++++--------- src/Hash2Pub/FediChordTypes.hs | 25 ++++++++++++++----------- src/Hash2Pub/PostService.hs | 4 +++- src/Hash2Pub/RingMap.hs | 4 +++- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index d69d94c..546c10f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -130,23 +130,25 @@ closestCachePredecessors remainingLookups lastID nCache -- Looks up the successor of the lookup key on a 'RingMap' representation of the -- predecessor list with the node itself added. If the result is the same as the node -- itself then it falls into the responsibility interval. -isInOwnResponsibilitySlice :: HasKeyID a NodeID => a -> LocalNodeState -> Bool -isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs) +isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState -> Bool +isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs) where predecessorList = predecessors ownNs -- add node itself to RingMap representation, to distinguish between -- responsibility of own node and predecessor - predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList + predecessorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> predecessorList) :: RingMap NodeID RemoteNodeState + ownRemote = toRemoteNodeState ownNs closestPredecessor = headMay predecessorList -isPossiblePredecessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool +isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool isPossiblePredecessor = isInOwnResponsibilitySlice -isPossibleSuccessor :: HasKeyID a NodeID => a -> LocalNodeState -> Bool -isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs) +isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool +isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs) where successorList = successors ownNs - successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList + successorRMap = addRMapEntry (getKeyID ownRemote) ownRemote $ rMapFromList (keyValuePair <$> successorList) + ownRemote = toRemoteNodeState ownNs closestSuccessor = headMay successorList -- cache operations @@ -169,7 +171,8 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache = let -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity timestamp' = if ts <= now then ts else now - newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache + newEntry = CacheEntry False ns timestamp' + newCache = addRMapEntryWith insertCombineFunction (getKeyID newEntry) newEntry cache insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal = case oldVal of ProxyEntry n _ -> ProxyEntry n (Just newVal) @@ -202,7 +205,7 @@ addNodeAsVerifiedPure :: POSIXTime -> RemoteNodeState -> NodeCache -> NodeCache -addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now) +addNodeAsVerifiedPure now node = addRMapEntry (getKeyID node) (CacheEntry True node now) diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 7652f4f..6e0bef6 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -26,8 +27,7 @@ module Hash2Pub.FediChordTypes ( , CacheEntry(..) , RingEntry(..) , RingMap(..) - , HasKeyID - , getKeyID + , HasKeyID(..) , rMapSize , rMapLookup , rMapLookupPred @@ -271,31 +271,31 @@ instance Typeable a => Show (TQueue a) where -- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds} +setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds} -- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs} +setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs} -- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns} +addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns} -- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState -addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns} +addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns} -instance HasKeyID RemoteNodeState NodeID where +instance HasKeyID NodeID RemoteNodeState where getKeyID = getNid -instance HasKeyID a k => HasKeyID (CacheEntry a) k where +instance HasKeyID k a => HasKeyID k (CacheEntry a) where getKeyID (CacheEntry _ obj _) = getKeyID obj instance HasKeyID NodeID NodeID where getKeyID = id type NodeCacheEntry = CacheEntry RemoteNodeState -type NodeCache = RingMap NodeCacheEntry NodeID +type NodeCache = RingMap NodeID NodeCacheEntry type LookupCacheEntry = CacheEntry (String, PortNumber) type LookupCache = Map.Map NodeID LookupCacheEntry @@ -319,12 +319,15 @@ cacheLookup = rMapLookup cacheLookupSucc :: NodeID -- ^lookup key -> NodeCache -- ^ring cache -> Maybe NodeCacheEntry -cacheLookupSucc = rMapLookupSucc +cacheLookupSucc key cache = snd <$> rMapLookupSucc key cache cacheLookupPred :: NodeID -- ^lookup key -> NodeCache -- ^ring cache -> Maybe NodeCacheEntry -cacheLookupPred = rMapLookupPred +cacheLookupPred key cache = snd <$> rMapLookupPred key cache + +-- clean up cache entries: once now - entry > maxAge +-- transfer difference now - entry to other node -- | return the @NodeState@ data from a cache entry without checking its validation status cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index e8b325b..21a7238 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -1,15 +1,16 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TypeOperators #-} -{-# LANGUAGE InstanceSigs #-} module Hash2Pub.PostService where import Control.Concurrent import qualified Data.ByteString.Lazy.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap import Data.Maybe (fromMaybe) import Data.String (fromString) import qualified Data.Text as Txt @@ -18,6 +19,7 @@ import qualified Network.Wai.Handler.Warp as Warp import Servant import Hash2Pub.FediChord +import Hash2Pub.RingMap import Hash2Pub.ServiceTypes diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 9c7f63b..016f9f1 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -5,13 +5,15 @@ module Hash2Pub.RingMap where import Data.Foldable (foldr') import qualified Data.Map.Strict as Map -import Data.Maybe (fromJust, isJust, isNothing, mapMaybe) +import Data.Maybe (isJust, isNothing, mapMaybe) -- | Class for all types that can be identified via a EpiChord key. -- Used for restricting the types a 'RingMap' can store class (Eq a, Show a, Bounded k, Ord k) => HasKeyID k a where getKeyID :: a -> k + keyValuePair :: a -> (k, a) + keyValuePair val = (getKeyID val, val) -- | generic data structure for holding elements with a key and modular lookup From 04423171fdbc307b9c0d05d9f3ec16f6453ec5f9 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 27 Jul 2020 13:20:15 +0200 Subject: [PATCH 3/5] define data types for post and subscription storage --- Hash2Pub.cabal | 2 +- src/Hash2Pub/FediChordTypes.hs | 2 +- src/Hash2Pub/PostService.hs | 22 ++++++++++++++++++++-- src/Hash2Pub/ServiceTypes.hs | 8 +++++++- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index ebc9c7e..3ca520e 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable ghc-options: -Wall diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 6e0bef6..d764b71 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -413,7 +413,7 @@ data FediChordConf = FediChordConf class DHT d where -- | lookup the responsible host handling a given key string, - -- possibly from a lookup cache + -- possiblggy from a lookup cache lookupKey :: d -> String -> IO (Maybe (String, PortNumber)) -- | lookup the responsible host handling a given key string, -- but force the DHT to do a fresh lookup instead of returning a cached result. diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 21a7238..bc1dc23 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -9,16 +9,21 @@ module Hash2Pub.PostService where import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TVar import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet import Data.Maybe (fromMaybe) import Data.String (fromString) import qualified Data.Text as Txt +import Data.Time.Clock.POSIX import qualified Network.Wai.Handler.Warp as Warp import Servant -import Hash2Pub.FediChord +import Hash2Pub.FediChordTypes import Hash2Pub.RingMap import Hash2Pub.ServiceTypes @@ -29,6 +34,13 @@ data PostService d = PostService -- queues, other data structures , baseDHT :: (DHT d) => d , serviceThread :: ThreadId + , subscribers :: TVar (RingMap NodeID TagSubscribers) + -- ^ for each tag store the subscribers + their queue + , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) + -- ^ tags subscribed by the own node have an assigned lease time + , ownPosts :: TVar (HSet.HashSet Txt.Text) + -- ^ just store the existence of posts for saving memory, + -- always return the same placeholder } instance DHT d => Service PostService d where @@ -45,12 +57,18 @@ instance DHT d => Service PostService d where } getServicePort s = fromIntegral $ psPort s +type PostContent = Txt.Text +-- | For each handled tag, store its subscribers and provide a +-- broadcast 'TChan' for enqueuing posts +type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent) +-- | each subscriber is identified by its contact data "hostname" "port" +-- and holds a TChan duplicated from the broadcast TChan of the tag +type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent) -- | return a WAI application postServiceApplication :: Application postServiceApplication = serve exposedPostServiceAPI postServer -servicePort = 8081 -- | needed for guiding type inference exposedPostServiceAPI :: Proxy PostServiceAPI diff --git a/src/Hash2Pub/ServiceTypes.hs b/src/Hash2Pub/ServiceTypes.hs index ab06052..430dc74 100644 --- a/src/Hash2Pub/ServiceTypes.hs +++ b/src/Hash2Pub/ServiceTypes.hs @@ -1,9 +1,15 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Hash2Pub.ServiceTypes where -import Hash2Pub.FediChord (DHT (..)) +import Data.Hashable (Hashable(..)) + +import Hash2Pub.FediChord (DHT (..), NodeID(..)) class Service s d where -- | run the service runService :: (Integral i) => d -> String -> i -> IO (s d) getServicePort :: (Integral i) => s d -> i + +instance Hashable NodeID where + hashWithSalt salt = hashWithSalt salt . getNodeID + hash = hash . getNodeID From daae9d0b38182985963f896018a46c2435e78a80 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 27 Jul 2020 21:39:33 +0200 Subject: [PATCH 4/5] process and enqueue incoming posts --- src/Hash2Pub/PostService.hs | 146 ++++++++++++++++++++++------------- src/Hash2Pub/ServiceTypes.hs | 4 +- 2 files changed, 94 insertions(+), 56 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index bc1dc23..fc3e5e8 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -11,16 +11,20 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar -import qualified Data.ByteString.Lazy.UTF8 as BSU -import qualified Data.HashMap.Strict as HMap -import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe) -import Data.String (fromString) -import qualified Data.Text as Txt -import Data.Time.Clock.POSIX +import Control.Monad (forM_, forever) +import Control.Monad.IO.Class (liftIO) +import qualified Data.ByteString.Lazy.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet +import Data.Maybe (fromMaybe) +import Data.String (fromString) +import qualified Data.Text.Lazy as Txt +import Data.Time.Clock.POSIX +import System.Random -import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.Wai.Handler.Warp as Warp import Servant import Hash2Pub.FediChordTypes @@ -29,34 +33,23 @@ import Hash2Pub.ServiceTypes data PostService d = PostService - { psPort :: Warp.Port - , psHost :: String + { psPort :: Warp.Port + , psHost :: String -- queues, other data structures - , baseDHT :: (DHT d) => d - , serviceThread :: ThreadId - , subscribers :: TVar (RingMap NodeID TagSubscribers) + , baseDHT :: (DHT d) => d + , serviceThread :: TVar ThreadId + , subscribers :: TVar (RingMap NodeID TagSubscribers) -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Txt.Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, - -- always return the same placeholder + , relayInQueue :: TQueue (Hashtag, PostID, PostContent) + -- ^ Queue for processing incoming posts of own instance asynchronously } -instance DHT d => Service PostService d where - runService dht host port = do - let - port' = fromIntegral port - warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings - servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication - pure $ PostService { - psPort = port' - , psHost = host - , baseDHT = dht - , serviceThread = servThread - } - getServicePort s = fromIntegral $ psPort s - +type Hashtag = Txt.Text +type PostID = Txt.Text type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts @@ -65,9 +58,40 @@ type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent) -- and holds a TChan duplicated from the broadcast TChan of the tag type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent) + +instance DHT d => Service PostService d where + -- | initialise 'PostService' data structures and run server + runService dht host port = do + -- create necessary TVars + threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder + subscriberVar <- newTVarIO emptyRMap + ownSubsVar <- newTVarIO HMap.empty + ownPostVar <- newTVarIO HSet.empty + relayInQueue' <- newTQueueIO + let + thisService = PostService { + psPort = port' + , psHost = host + , baseDHT = dht + , serviceThread = threadVar + , subscribers = subscriberVar + , ownSubscriptions = ownSubsVar + , ownPosts = ownPostVar + , relayInQueue = relayInQueue' + } + port' = fromIntegral port + warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings + servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService + -- update thread ID after fork + atomically $ writeTVar threadVar servThreadID + pure thisService + + getServicePort s = fromIntegral $ psPort s + + -- | return a WAI application -postServiceApplication :: Application -postServiceApplication = serve exposedPostServiceAPI postServer +postServiceApplication :: PostService d -> Application +postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv -- | needed for guiding type inference @@ -78,7 +102,7 @@ exposedPostServiceAPI = Proxy -- ========= HTTP API and handlers ============= -type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text +type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text -- ^ endpoint for delivering the subscriptions and outstanding queue @@ -97,37 +121,51 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Pos -- the Origin header to $hashtag -postServer :: Server PostServiceAPI -postServer = relayInbox - :<|> subscriptionDelivery - :<|> postFetch - :<|> postMultiFetch - :<|> tagDelivery - :<|> tagSubscribe - :<|> tagUnsubscribe +postServer :: PostService d -> Server PostServiceAPI +postServer service = relayInbox service + :<|> subscriptionDelivery service + :<|> postFetch service + :<|> postMultiFetch service + :<|> tagDelivery service + :<|> tagSubscribe service + :<|> tagUnsubscribe service -relayInbox :: Txt.Text -> Handler Txt.Text -relayInbox post = pure $ "Here be InboxDragons with " <> post +relayInbox :: PostService d -> Txt.Text -> Handler NoContent +relayInbox serv post = do + -- extract contained hashtags + let + containedTags = fmap Txt.tail . filter ((==) '#' . Txt.head) . Txt.words $ post + -- generate post ID + postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) + -- add ID to own posts + liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) + -- enqueue a relay job for each tag + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> + atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) + ) + pure NoContent -subscriptionDelivery :: Txt.Text -> Handler Txt.Text -subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList -postFetch :: Txt.Text -> Handler Txt.Text -postFetch postID = pure $ "Here be a post with dragon ID " <> postID -postMultiFetch :: Txt.Text -> Handler Txt.Text -postMultiFetch postIDs = pure $ "Here be multiple post dragons: " +subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text +subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList + +postFetch :: PostService d -> Txt.Text -> Handler Txt.Text +postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID + +postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text +postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: " <> (Txt.unwords . Txt.lines $ postIDs) -tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text -tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts +tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text +tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts -tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer -tagSubscribe hashtag origin = pure 42 +tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer +tagSubscribe serv hashtag origin = pure 42 -tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text -tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag +tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text +tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag -- | define how to convert all showable types to PlainText diff --git a/src/Hash2Pub/ServiceTypes.hs b/src/Hash2Pub/ServiceTypes.hs index 430dc74..5e2b37c 100644 --- a/src/Hash2Pub/ServiceTypes.hs +++ b/src/Hash2Pub/ServiceTypes.hs @@ -1,9 +1,9 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Hash2Pub.ServiceTypes where -import Data.Hashable (Hashable(..)) +import Data.Hashable (Hashable (..)) -import Hash2Pub.FediChord (DHT (..), NodeID(..)) +import Hash2Pub.FediChord (DHT (..), NodeID (..)) class Service s d where -- | run the service From 736815ea831bc7fdecd0f9680f3218b70684b6ce Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 27 Jul 2020 21:49:42 +0200 Subject: [PATCH 5/5] normalise hastag unicode representation of incoming posts --- Hash2Pub.cabal | 2 +- src/Hash2Pub/PostService.hs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 3ca520e..56441ad 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms ghc-options: -Wall diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index fc3e5e8..e44c8c6 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -21,6 +21,8 @@ import qualified Data.HashSet as HSet import Data.Maybe (fromMaybe) import Data.String (fromString) import qualified Data.Text.Lazy as Txt +import Data.Text.Normalize (NormalizationMode (NFC), + normalize) import Data.Time.Clock.POSIX import System.Random @@ -135,7 +137,7 @@ relayInbox :: PostService d -> Txt.Text -> Handler NoContent relayInbox serv post = do -- extract contained hashtags let - containedTags = fmap Txt.tail . filter ((==) '#' . Txt.head) . Txt.words $ post + containedTags = fmap (Txt.fromStrict . normalize NFC . Txt.toStrict . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post -- generate post ID postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) -- add ID to own posts