define data types for post and subscription storage

This commit is contained in:
Trolli Schmittlauch 2020-07-27 13:20:15 +02:00
parent 7878c67635
commit 04423171fd
4 changed files with 29 additions and 5 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable
ghc-options: -Wall ghc-options: -Wall

View file

@ -413,7 +413,7 @@ data FediChordConf = FediChordConf
class DHT d where class DHT d where
-- | lookup the responsible host handling a given key string, -- | lookup the responsible host handling a given key string,
-- possibly from a lookup cache -- possiblggy from a lookup cache
lookupKey :: d -> String -> IO (Maybe (String, PortNumber)) lookupKey :: d -> String -> IO (Maybe (String, PortNumber))
-- | lookup the responsible host handling a given key string, -- | lookup the responsible host handling a given key string,
-- but force the DHT to do a fresh lookup instead of returning a cached result. -- but force the DHT to do a fresh lookup instead of returning a cached result.

View file

@ -9,16 +9,21 @@
module Hash2Pub.PostService where module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TVar
import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.String (fromString) import Data.String (fromString)
import qualified Data.Text as Txt import qualified Data.Text as Txt
import Data.Time.Clock.POSIX
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
import Hash2Pub.FediChord import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap import Hash2Pub.RingMap
import Hash2Pub.ServiceTypes import Hash2Pub.ServiceTypes
@ -29,6 +34,13 @@ data PostService d = PostService
-- queues, other data structures -- queues, other data structures
, baseDHT :: (DHT d) => d , baseDHT :: (DHT d) => d
, serviceThread :: ThreadId , serviceThread :: ThreadId
, subscribers :: TVar (RingMap NodeID TagSubscribers)
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory,
-- always return the same placeholder
} }
instance DHT d => Service PostService d where instance DHT d => Service PostService d where
@ -45,12 +57,18 @@ instance DHT d => Service PostService d where
} }
getServicePort s = fromIntegral $ psPort s getServicePort s = fromIntegral $ psPort s
type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent)
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent)
-- | return a WAI application -- | return a WAI application
postServiceApplication :: Application postServiceApplication :: Application
postServiceApplication = serve exposedPostServiceAPI postServer postServiceApplication = serve exposedPostServiceAPI postServer
servicePort = 8081
-- | needed for guiding type inference -- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI exposedPostServiceAPI :: Proxy PostServiceAPI

View file

@ -1,9 +1,15 @@
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where module Hash2Pub.ServiceTypes where
import Hash2Pub.FediChord (DHT (..)) import Data.Hashable (Hashable(..))
import Hash2Pub.FediChord (DHT (..), NodeID(..))
class Service s d where class Service s d where
-- | run the service -- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d) runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i getServicePort :: (Integral i) => s d -> i
instance Hashable NodeID where
hashWithSalt salt = hashWithSalt salt . getNodeID
hash = hash . getNodeID