From bcabe494e307628826a919f6ff958e50c8395779 Mon Sep 17 00:00:00 2001 From: Harvey Kandola Date: Mon, 1 May 2017 12:13:05 +0100 Subject: [PATCH] event handlers framework --- core/api/endpoint/conversion_endpoint.go | 2 - core/api/endpoint/global_endpoint.go | 3 + core/api/endpoint/templates_endpoint.go | 6 +- core/api/endpoint/user_endpoint.go | 11 ++ core/api/request/account.go | 17 ++ core/api/request/user.go | 17 ++ core/event/LICENSE | 22 +++ core/event/README.md | 162 ++++++++++++++++++ core/event/client.go | 122 +++++++++++++ core/event/event_bus.go | 207 +++++++++++++++++++++++ core/event/event_bus_test.go | 156 +++++++++++++++++ core/event/network_bus.go | 76 +++++++++ core/event/network_bus_test.go | 112 ++++++++++++ core/event/new.go | 30 ++++ core/event/server.go | 153 +++++++++++++++++ 15 files changed, 1091 insertions(+), 5 deletions(-) create mode 100755 core/event/LICENSE create mode 100755 core/event/README.md create mode 100755 core/event/client.go create mode 100755 core/event/event_bus.go create mode 100755 core/event/event_bus_test.go create mode 100755 core/event/network_bus.go create mode 100755 core/event/network_bus_test.go create mode 100644 core/event/new.go create mode 100755 core/event/server.go diff --git a/core/api/endpoint/conversion_endpoint.go b/core/api/endpoint/conversion_endpoint.go index 455b95f5..c3d006b1 100644 --- a/core/api/endpoint/conversion_endpoint.go +++ b/core/api/endpoint/conversion_endpoint.go @@ -110,14 +110,12 @@ func convertDocument(w http.ResponseWriter, r *http.Request, job, folderID strin // All the commented-out code below should be in following function call newDocument, err := processDocument(p, filename, job, folderID, fileResult) - if err != nil { writeServerError(w, method, err) return } json, err := json.Marshal(newDocument) - if err != nil { writeJSONMarshalError(w, method, "conversion", err) return diff --git a/core/api/endpoint/global_endpoint.go b/core/api/endpoint/global_endpoint.go index a610be82..4de462b1 100644 --- a/core/api/endpoint/global_endpoint.go +++ b/core/api/endpoint/global_endpoint.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/documize/community/core/api/request" "github.com/documize/community/core/api/util" + "github.com/documize/community/core/event" ) // GetSMTPConfig returns installation-wide SMTP settings @@ -157,6 +158,8 @@ func SaveLicense(w http.ResponseWriter, r *http.Request) { request.ConfigSet("EDITION-LICENSE", js) + event.Handler().Publish(string(event.TypeSystemLicenseChange)) + util.WriteSuccessEmptyJSON(w) } diff --git a/core/api/endpoint/templates_endpoint.go b/core/api/endpoint/templates_endpoint.go index f5e69f5f..a94bffe2 100644 --- a/core/api/endpoint/templates_endpoint.go +++ b/core/api/endpoint/templates_endpoint.go @@ -28,6 +28,7 @@ import ( "github.com/documize/community/core/api/request" "github.com/documize/community/core/api/util" api "github.com/documize/community/core/convapi" + "github.com/documize/community/core/event" "github.com/documize/community/core/log" "github.com/documize/community/core/utility" @@ -389,7 +390,6 @@ func StartDocumentFromSavedTemplate(w http.ResponseWriter, r *http.Request) { d.Title = docTitle err = p.AddDocument(d) - if err != nil { log.IfErr(tx.Rollback()) writeGeneralSQLError(w, method, err) @@ -455,14 +455,14 @@ func StartDocumentFromSavedTemplate(w http.ResponseWriter, r *http.Request) { log.IfErr(tx.Commit()) newDocument, err := p.GetDocument(documentID) - if err != nil { writeServerError(w, method, err) return } - data, err := json.Marshal(newDocument) + event.Handler().Publish(string(event.TypeAddDocument), newDocument.Title) + data, err := json.Marshal(newDocument) if err != nil { writeJSONMarshalError(w, method, "document", err) return diff --git a/core/api/endpoint/user_endpoint.go b/core/api/endpoint/user_endpoint.go index a870d0ae..c1e29863 100644 --- a/core/api/endpoint/user_endpoint.go +++ b/core/api/endpoint/user_endpoint.go @@ -28,6 +28,7 @@ import ( "github.com/documize/community/core/log" "github.com/documize/community/core/utility" + "github.com/documize/community/core/event" "github.com/gorilla/mux" "strconv" ) @@ -174,6 +175,14 @@ func AddUser(w http.ResponseWriter, r *http.Request) { inviter, err := p.GetUser(p.Context.UserID) log.IfErr(err) + if addUser { + event.Handler().Publish(string(event.TypeAddUser)) + } + + if addAccount { + event.Handler().Publish(string(event.TypeAddAccount)) + } + // Prepare invitation email (that contains SSO link) if addUser && addAccount { size := len(requestedPassword) @@ -386,6 +395,8 @@ func DeleteUser(w http.ResponseWriter, r *http.Request) { log.IfErr(tx.Commit()) + event.Handler().Publish(string(event.TypeRemoveUser)) + writeSuccessString(w, "{}") } diff --git a/core/api/request/account.go b/core/api/request/account.go index c4f91861..968d6e29 100644 --- a/core/api/request/account.go +++ b/core/api/request/account.go @@ -86,6 +86,23 @@ func (p *Persister) GetAccountsByOrg() (t []entity.Account, err error) { return } +// CountOrgAccounts returns the numnber of active user accounts for specified organization. +func (p *Persister) CountOrgAccounts() (c int) { + row := Db.QueryRow("SELECT count(*) FROM account WHERE orgid=? AND active=1", p.Context.OrgID) + + err := row.Scan(&c) + if err != nil && err != sql.ErrNoRows { + log.Error(p.Base.SQLSelectError("CountOrgAccounts", p.Context.OrgID), err) + return 0 + } + + if err == sql.ErrNoRows { + return 0 + } + + return +} + // UpdateAccount updates the database record for the given account to the given values. func (p *Persister) UpdateAccount(account entity.Account) (err error) { account.Revised = time.Now().UTC() diff --git a/core/api/request/user.go b/core/api/request/user.go index f34f8fff..1a2afc51 100644 --- a/core/api/request/user.go +++ b/core/api/request/user.go @@ -301,3 +301,20 @@ func (p *Persister) ForgotUserPassword(email, token string) (err error) { return } + +// CountActiveUsers returns the number of active users in the system. +func CountActiveUsers() (c int) { + row := Db.QueryRow("SELECT count(*) FROM user u WHERE u.refid IN (SELECT userid FROM account WHERE active=1)") + + err := row.Scan(&c) + if err != nil && err != sql.ErrNoRows { + log.Error("CountActiveUsers", err) + return 0 + } + + if err == sql.ErrNoRows { + return 0 + } + + return +} diff --git a/core/event/LICENSE b/core/event/LICENSE new file mode 100755 index 00000000..9603a6f0 --- /dev/null +++ b/core/event/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2014 Alex Saskevich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/core/event/README.md b/core/event/README.md new file mode 100755 index 00000000..acfb10e6 --- /dev/null +++ b/core/event/README.md @@ -0,0 +1,162 @@ +EventBus +====== + +[![GoDoc](https://godoc.org/github.com/asaskevich/EventBus?status.svg)](https://godoc.org/github.com/asaskevich/EventBus) [![Coverage Status](https://img.shields.io/coveralls/asaskevich/EventBus.svg)](https://coveralls.io/r/asaskevich/EventBus?branch=master) [![Build Status](https://travis-ci.org/asaskevich/EventBus.svg)](https://travis-ci.org/asaskevich/EventBus) + +Package EventBus is the little and lightweight eventbus with async compatibility for GoLang. + +#### Installation +Make sure that Go is installed on your computer. +Type the following command in your terminal: + + go get github.com/asaskevich/EventBus + +After it the package is ready to use. + +#### Import package in your project +Add following line in your `*.go` file: +```go +import "github.com/asaskevich/EventBus" +``` +If you unhappy to use long `EventBus`, you can do something like this: +```go +import ( + evbus "github.com/asaskevich/EventBus" +) +``` + +#### Example +```go +func calculator(a int, b int) { + fmt.Printf("%d\n", a + b) +} + +func main() { + bus := EventBus.New(); + bus.Subscribe("main:calculator", calculator); + bus.Publish("main:calculator", 20, 40); + bus.Unsubscribe("main:calculator", calculator); +} +``` + +#### Implemented methods +* **New()** +* **Subscribe()** +* **SubscribeOnce()** +* **HasCallback()** +* **Unsubscribe()** +* **Publish()** +* **SubscribeAsync()** +* **SubscribeOnceAsync()** +* **WaitAsync()** + +#### New() +New returns new EventBus with empty handlers. +```go +bus := EventBus.New(); +``` + +#### Subscribe(topic string, fn interface{}) error +Subscribe to a topic. Returns error if `fn` is not a function. +```go +func Handler() { ... } +... +bus.Subscribe("topic:handler", Handler) +``` + +#### SubscribeOnce(topic string, fn interface{}) error +Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function. +```go +func HelloWorld() { ... } +... +bus.SubscribeOnce("topic:handler", HelloWorld) +``` + +#### Unsubscribe(topic string) error +Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic. +```go +bus.Unsubscribe("topic:handler", HelloWord); +``` + +#### HasCallback(topic string) bool +Returns true if exists any callback subscribed to the topic. + +#### Publish(topic string, args ...interface{}) +Publish executes callback defined for a topic. Any addional argument will be tranfered to the callback. +```go +func Handler(str string) { ... } +... +bus.Subscribe("topic:handler", Handler) +... +bus.Publish("topic:handler", "Hello, World!"); +``` + +#### SubscribeAsync(topic string, fn interface{}, transactional bool) +Subscribe to a topic with an asyncrhonous callback. Returns error if `fn` is not a function. +```go +func slowCalculator(a, b int) { + time.Sleep(3 * time.Second) + fmt.Printf("%d\n", a + b) +} + +bus := EventBus.New() +bus.SubscribeAsync("main:slow_calculator", slowCalculator, false) + +bus.Publish("main:slow_calculator", 20, 60) + +fmt.Println("start: do some stuff while waiting for a result") +fmt.Println("end: do some stuff while waiting for a result") + +bus.WaitAsync() // wait for all async callbacks to complete + +fmt.Println("do some stuff after waiting for result") +``` +Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false) + +#### SubscribeOnceAsync(topic string, args ...interface{}) +SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously + +#### WaitAsync() +WaitAsync waits for all async callbacks to complete. + +#### Cross Process Events +Works with two rpc services: +- a client service to listen to remotely published events from a server +- a server service to listen to client subscriptions + +server.go +```go +func main() { + server := NewServer(":2010", "/_server_bus_", New()) + server.Start() + // ... + server.EventBus().Publish("main:calculator", 4, 6) + // ... + server.Stop() +} +``` + +client.go +```go +func main() { + client := NewClient(":2015", "/_client_bus_", New()) + client.Start() + client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_") + // ... + client.Stop() +} +``` + +#### Notes +Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus). +Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus). + +#### Support +If you do have a contribution for the package feel free to put up a Pull Request or open Issue. + +#### Special thanks to [contributors](https://github.com/asaskevich/EventBus/graphs/contributors) +* [Brian Downs](https://github.com/briandowns) +* [Dominik Schulz](https://github.com/gittex) +* [bennAH](https://github.com/bennAH) +* [John Noble] (https://github.com/gaxunil) +* [Evan Borgstrom] (https://github.com/borgstrom) diff --git a/core/event/client.go b/core/event/client.go new file mode 100755 index 00000000..c5e41193 --- /dev/null +++ b/core/event/client.go @@ -0,0 +1,122 @@ +package event + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +const ( + // PublishService - Client service method + PublishService = "ClientService.PushEvent" +) + +// ClientArg - object containing event for client to publish locally +type ClientArg struct { + Args []interface{} + Topic string +} + +// Client - object capable of subscribing to a remote event bus +type Client struct { + eventBus Bus + address string + path string + service *ClientService +} + +// NewClient - create a client object with the address and server path +func NewClient(address, path string, eventBus Bus) *Client { + client := new(Client) + client.eventBus = eventBus + client.address = address + client.path = path + client.service = &ClientService{client, &sync.WaitGroup{}, false} + return client +} + +// EventBus - returns the underlying event bus +func (client *Client) EventBus() Bus { + return client.eventBus +} + +func (client *Client) doSubscribe(topic string, fn interface{}, serverAddr, serverPath string, subscribeType SubscribeType) { + defer func() { + if r := recover(); r != nil { + fmt.Println("Server not found -", r) + } + }() + + rpcClient, err := rpc.DialHTTPPath("tcp", serverAddr, serverPath) + defer rpcClient.Close() + if err != nil { + fmt.Errorf("dialing: %v", err) + } + args := &SubscribeArg{client.address, client.path, PublishService, subscribeType, topic} + reply := new(bool) + err = rpcClient.Call(RegisterService, args, reply) + if err != nil { + fmt.Errorf("Register error: %v", err) + } + if *reply { + client.eventBus.Subscribe(topic, fn) + } +} + +//Subscribe subscribes to a topic in a remote event bus +func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string) { + client.doSubscribe(topic, fn, serverAddr, serverPath, Subscribe) +} + +//SubscribeOnce subscribes once to a topic in a remote event bus +func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string) { + client.doSubscribe(topic, fn, serverAddr, serverPath, SubscribeOnce) +} + +// Start - starts the client service to listen to remote events +func (client *Client) Start() error { + var err error + service := client.service + if !service.started { + server := rpc.NewServer() + server.Register(service) + server.HandleHTTP(client.path, "/debug"+client.path) + l, e := net.Listen("tcp", client.address) + if e != nil { + err = e + fmt.Errorf("listen error: %v", e) + } + service.wg.Add(1) + service.started = true + go http.Serve(l, nil) + } else { + err = errors.New("Client service already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (client *Client) Stop() { + service := client.service + if service.started { + service.wg.Done() + service.started = false + } +} + +// ClientService - service object listening to events published in a remote event bus +type ClientService struct { + client *Client + wg *sync.WaitGroup + started bool +} + +// PushEvent - exported service to listening to remote events +func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error { + service.client.eventBus.Publish(arg.Topic, arg.Args...) + *reply = true + return nil +} diff --git a/core/event/event_bus.go b/core/event/event_bus.go new file mode 100755 index 00000000..472f3484 --- /dev/null +++ b/core/event/event_bus.go @@ -0,0 +1,207 @@ +package event + +import ( + "fmt" + "reflect" + "sync" +) + +//BusSubscriber defines subscription-related bus behavior +type BusSubscriber interface { + Subscribe(topic string, fn interface{}) error + SubscribeAsync(topic string, fn interface{}, transactional bool) error + SubscribeOnce(topic string, fn interface{}) error + SubscribeOnceAsync(topic string, fn interface{}) error + Unsubscribe(topic string, handler interface{}) error +} + +//BusPublisher defines publishing-related bus behavior +type BusPublisher interface { + Publish(topic string, args ...interface{}) +} + +//BusController defines bus control behavior (checking handler's presence, synchronization) +type BusController interface { + HasCallback(topic string) bool + WaitAsync() +} + +//Bus englobes global (subscribe, publish, control) bus behavior +type Bus interface { + BusController + BusSubscriber + BusPublisher +} + +// EventBus - box for handlers and callbacks. +type EventBus struct { + handlers map[string][]*eventHandler + lock sync.Mutex // a lock for the map + wg sync.WaitGroup +} + +type eventHandler struct { + callBack reflect.Value + flagOnce bool + async bool + transactional bool + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +// New returns new EventBus with empty handlers. +func New() Bus { + b := &EventBus{ + make(map[string][]*eventHandler), + sync.Mutex{}, + sync.WaitGroup{}, + } + return Bus(b) +} + +// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions +func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error { + bus.lock.Lock() + defer bus.lock.Unlock() + if !(reflect.TypeOf(fn).Kind() == reflect.Func) { + return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind()) + } + bus.handlers[topic] = append(bus.handlers[topic], handler) + return nil +} + +// Subscribe subscribes to a topic. +// Returns error if `fn` is not a function. +func (bus *EventBus) Subscribe(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), false, false, false, sync.Mutex{}, + }) +} + +// SubscribeAsync subscribes to a topic with an asynchronous callback +// Transactional determines whether subsequent callbacks for a topic are +// run serially (true) or concurrently (false) +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, + }) +} + +// SubscribeOnce subscribes to a topic once. Handler will be removed after executing. +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), true, false, false, sync.Mutex{}, + }) +} + +// SubscribeOnceAsync subscribes to a topic once with an asynchronous callback +// Handler will be removed after executing. +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), true, true, false, sync.Mutex{}, + }) +} + +// HasCallback returns true if exists any callback subscribed to the topic. +func (bus *EventBus) HasCallback(topic string) bool { + bus.lock.Lock() + defer bus.lock.Unlock() + _, ok := bus.handlers[topic] + if ok { + return len(bus.handlers[topic]) > 0 + } + return false +} + +// Unsubscribe removes callback defined for a topic. +// Returns error if there are no callbacks subscribed to the topic. +func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { + bus.lock.Lock() + defer bus.lock.Unlock() + if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 { + bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler))) + return nil + } + return fmt.Errorf("topic %s doesn't exist", topic) +} + +// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback. +func (bus *EventBus) Publish(topic string, args ...interface{}) { + bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish + defer bus.lock.Unlock() + if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) { + // Handlers slice may be changed by removeHandler and Unsubscribe during iteration, + // so make a copy and iterate the copied slice. + copyHandlers := make([]*eventHandler, 0, len(handlers)) + copyHandlers = append(copyHandlers, handlers...) + for i, handler := range copyHandlers { + if handler.flagOnce { + bus.removeHandler(topic, i) + } + if !handler.async { + bus.doPublish(handler, topic, args...) + } else { + bus.wg.Add(1) + if handler.transactional { + handler.Lock() + } + go bus.doPublishAsync(handler, topic, args...) + } + } + } +} + +func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) { + passedArguments := bus.setUpPublish(topic, args...) + handler.callBack.Call(passedArguments) +} + +func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) { + defer bus.wg.Done() + if handler.transactional { + defer handler.Unlock() + } + bus.doPublish(handler, topic, args...) +} + +func (bus *EventBus) removeHandler(topic string, idx int) { + if _, ok := bus.handlers[topic]; !ok { + return + } + l := len(bus.handlers[topic]) + + if !(0 <= idx && idx < l) { + return + } + + copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:]) + bus.handlers[topic][l-1] = nil // or the zero value of T + bus.handlers[topic] = bus.handlers[topic][:l-1] +} + +func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int { + if _, ok := bus.handlers[topic]; ok { + for idx, handler := range bus.handlers[topic] { + if handler.callBack == callback { + return idx + } + } + } + return -1 +} + +func (bus *EventBus) setUpPublish(topic string, args ...interface{}) []reflect.Value { + + passedArguments := make([]reflect.Value, 0) + for _, arg := range args { + passedArguments = append(passedArguments, reflect.ValueOf(arg)) + } + return passedArguments +} + +// WaitAsync waits for all async callbacks to complete +func (bus *EventBus) WaitAsync() { + bus.wg.Wait() +} diff --git a/core/event/event_bus_test.go b/core/event/event_bus_test.go new file mode 100755 index 00000000..ab1af017 --- /dev/null +++ b/core/event/event_bus_test.go @@ -0,0 +1,156 @@ +package event + +import ( + "testing" + "time" +) + +func TestNew(t *testing.T) { + bus := New() + if bus == nil { + t.Log("New EventBus not created!") + t.Fail() + } +} + +func TestHasCallback(t *testing.T) { + bus := New() + bus.Subscribe("topic", func() {}) + if bus.HasCallback("topic_topic") { + t.Fail() + } + if !bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSubscribe(t *testing.T) { + bus := New() + if bus.Subscribe("topic", func() {}) != nil { + t.Fail() + } + if bus.Subscribe("topic", "String") == nil { + t.Fail() + } +} + +func TestSubscribeOnce(t *testing.T) { + bus := New() + if bus.SubscribeOnce("topic", func() {}) != nil { + t.Fail() + } + if bus.SubscribeOnce("topic", "String") == nil { + t.Fail() + } +} + +func TestSubscribeOnceAndManySubscribe(t *testing.T) { + bus := New() + event := "topic" + flag := 0 + fn := func() { flag += 1 } + bus.SubscribeOnce(event, fn) + bus.Subscribe(event, fn) + bus.Subscribe(event, fn) + bus.Publish(event) + + if flag != 3 { + t.Fail() + } +} + +func TestUnsubscribe(t *testing.T) { + bus := New() + handler := func() {} + bus.Subscribe("topic", handler) + if bus.Unsubscribe("topic", handler) != nil { + t.Fail() + } + if bus.Unsubscribe("topic", handler) == nil { + t.Fail() + } +} + +func TestPublish(t *testing.T) { + bus := New() + bus.Subscribe("topic", func(a int, b int) { + if a != b { + t.Fail() + } + }) + bus.Publish("topic", 10, 10) +} + +func TestSubcribeOnceAsync(t *testing.T) { + results := make([]int, 0) + + bus := New() + bus.SubscribeOnceAsync("topic", func(a int, out *[]int) { + *out = append(*out, a) + }) + + bus.Publish("topic", 10, &results) + bus.Publish("topic", 10, &results) + + bus.WaitAsync() + + if len(results) != 1 { + t.Fail() + } + + if bus.HasCallback("topic") { + t.Fail() + } +} + +func TestSubscribeAsyncTransactional(t *testing.T) { + results := make([]int, 0) + + bus := New() + bus.SubscribeAsync("topic", func(a int, out *[]int, dur string) { + sleep, _ := time.ParseDuration(dur) + time.Sleep(sleep) + *out = append(*out, a) + }, true) + + bus.Publish("topic", 1, &results, "1s") + bus.Publish("topic", 2, &results, "0s") + + bus.WaitAsync() + + if len(results) != 2 { + t.Fail() + } + + if results[0] != 1 || results[1] != 2 { + t.Fail() + } +} + +func TestSubscribeAsync(t *testing.T) { + results := make(chan int) + + bus := New() + bus.SubscribeAsync("topic", func(a int, out chan<- int) { + out <- a + }, false) + + bus.Publish("topic", 1, results) + bus.Publish("topic", 2, results) + + numResults := 0 + + go func() { + for _ = range results { + numResults++ + } + }() + + bus.WaitAsync() + + time.Sleep(10 * time.Millisecond) + + if numResults != 2 { + t.Fail() + } +} diff --git a/core/event/network_bus.go b/core/event/network_bus.go new file mode 100755 index 00000000..2a77c9b5 --- /dev/null +++ b/core/event/network_bus.go @@ -0,0 +1,76 @@ +package event + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +// NetworkBus - object capable of subscribing to remote event buses in addition to remote event +// busses subscribing to it's local event bus. Compoed of a server and cliet +type NetworkBus struct { + *Client + *Server + service *NetworkBusService + sharedBus Bus + address string + path string +} + +// NewNetworkBus - returns a new network bus object at the server address and path +func NewNetworkBus(address, path string) *NetworkBus { + bus := new(NetworkBus) + bus.sharedBus = New() + bus.Server = NewServer(address, path, bus.sharedBus) + bus.Client = NewClient(address, path, bus.sharedBus) + bus.service = &NetworkBusService{&sync.WaitGroup{}, false} + bus.address = address + bus.path = path + return bus +} + +// EventBus - returns wrapped event bus +func (networkBus *NetworkBus) EventBus() Bus { + return networkBus.sharedBus +} + +// NetworkBusService - object capable of serving the network bus +type NetworkBusService struct { + wg *sync.WaitGroup + started bool +} + +// Start - helper method to serve a network bus service +func (networkBus *NetworkBus) Start() error { + var err error + service := networkBus.service + clientService := networkBus.Client.service + serverService := networkBus.Server.service + if !service.started { + server := rpc.NewServer() + server.RegisterName("ServerService", serverService) + server.RegisterName("ClientService", clientService) + server.HandleHTTP(networkBus.path, "/debug"+networkBus.path) + l, e := net.Listen("tcp", networkBus.address) + if e != nil { + err = fmt.Errorf("listen error: %v", e) + } + service.wg.Add(1) + go http.Serve(l, nil) + } else { + err = errors.New("Server bus already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (networkBus *NetworkBus) Stop() { + service := networkBus.service + if service.started { + service.wg.Done() + service.started = false + } +} diff --git a/core/event/network_bus_test.go b/core/event/network_bus_test.go new file mode 100755 index 00000000..e0a484e1 --- /dev/null +++ b/core/event/network_bus_test.go @@ -0,0 +1,112 @@ +package event + +import ( + "testing" +) + +func TestNewServer(t *testing.T) { + serverBus := NewServer(":2010", "/_server_bus_", New()) + serverBus.Start() + if serverBus == nil || !serverBus.service.started { + t.Log("New server EventBus not created!") + t.Fail() + } + serverBus.Stop() +} + +func TestNewClient(t *testing.T) { + clientBus := NewClient(":2015", "/_client_bus_", New()) + clientBus.Start() + if clientBus == nil || !clientBus.service.started { + t.Log("New client EventBus not created!") + t.Fail() + } + clientBus.Stop() +} + +func TestRegister(t *testing.T) { + serverPath := "/_server_bus_" + serverBus := NewServer(":2010", serverPath, New()) + + args := &SubscribeArg{serverBus.address, serverPath, PublishService, Subscribe, "topic"} + reply := new(bool) + + serverBus.service.Register(args, reply) + + if serverBus.eventBus.HasCallback("topic_topic") { + t.Fail() + } + if !serverBus.eventBus.HasCallback("topic") { + t.Fail() + } +} + +func TestPushEvent(t *testing.T) { + clientBus := NewClient("localhost:2015", "/_client_bus_", New()) + + eventArgs := make([]interface{}, 1) + eventArgs[0] = 10 + + clientArg := &ClientArg{eventArgs, "topic"} + reply := new(bool) + + fn := func(a int) { + if a != 10 { + t.Fail() + } + } + + clientBus.eventBus.Subscribe("topic", fn) + clientBus.service.PushEvent(clientArg, reply) + if !(*reply) { + t.Fail() + } +} + +func TestServerPublish(t *testing.T) { + serverBus := NewServer(":2020", "/_server_bus_b", New()) + serverBus.Start() + + fn := func(a int) { + if a != 10 { + t.Fail() + } + } + + clientBus := NewClient(":2025", "/_client_bus_b", New()) + clientBus.Start() + + clientBus.Subscribe("topic", fn, ":2010", "/_server_bus_b") + + serverBus.EventBus().Publish("topic", 10) + + clientBus.Stop() + serverBus.Stop() +} + +func TestNetworkBus(t *testing.T) { + networkBusA := NewNetworkBus(":2035", "/_net_bus_A") + networkBusA.Start() + + networkBusB := NewNetworkBus(":2030", "/_net_bus_B") + networkBusB.Start() + + fnA := func(a int) { + if a != 10 { + t.Fail() + } + } + networkBusA.Subscribe("topic-A", fnA, ":2030", "/_net_bus_B") + networkBusB.EventBus().Publish("topic-A", 10) + + fnB := func(a int) { + if a != 20 { + t.Fail() + } + } + networkBusB.Subscribe("topic-B", fnB, ":2035", "/_net_bus_A") + networkBusA.EventBus().Publish("topic-B", 20) + + networkBusA.Stop() + networkBusB.Stop() +} diff --git a/core/event/new.go b/core/event/new.go new file mode 100644 index 00000000..c5c86a9f --- /dev/null +++ b/core/event/new.go @@ -0,0 +1,30 @@ +package event + +// eventBus contains pub/sub +var eventBus Bus + +func init() { + eventBus = New() +} + +// Handler returns the global instance of the event bus +func Handler() Bus { + return eventBus +} + +// Type defines the format of event descriptors +type Type string + +// Valid event types for publication and subscription +const ( + // TypeAddAccount for when account for user is created + TypeAddAccount Type = "ACCOUNT_ADD" + // TypeAddUser for when user is created + TypeAddUser Type = "USER_ADD" + // TypeRemoveUser for when user is deleted + TypeRemoveUser Type = "USER_DELETE" + // TypeAddDocument for when document created + TypeAddDocument Type = "DOCUMENT_ADD" + // TypeSystemLicenseChange for when adin updates license + TypeSystemLicenseChange Type = "LICENSE_CHANGE" +) diff --git a/core/event/server.go b/core/event/server.go new file mode 100755 index 00000000..b8515b10 --- /dev/null +++ b/core/event/server.go @@ -0,0 +1,153 @@ +package event + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +// SubscribeType - how the client intends to subscribe +type SubscribeType int + +const ( + // Subscribe - subscribe to all events + Subscribe SubscribeType = iota + // SubscribeOnce - subscribe to only one event + SubscribeOnce +) + +const ( + // RegisterService - Server subscribe service method + RegisterService = "ServerService.Register" +) + +// SubscribeArg - object to hold subscribe arguments from remote event handlers +type SubscribeArg struct { + ClientAddr string + ClientPath string + ServiceMethod string + SubscribeType SubscribeType + Topic string +} + +// Server - object capable of being subscribed to by remote handlers +type Server struct { + eventBus Bus + address string + path string + subscribers map[string][]*SubscribeArg + service *ServerService +} + +// NewServer - create a new Server at the address and path +func NewServer(address, path string, eventBus Bus) *Server { + server := new(Server) + server.eventBus = eventBus + server.address = address + server.path = path + server.subscribers = make(map[string][]*SubscribeArg) + server.service = &ServerService{server, &sync.WaitGroup{}, false} + return server +} + +// EventBus - returns wrapped event bus +func (server *Server) EventBus() Bus { + return server.eventBus +} + +func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) { + return func(args ...interface{}) { + client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath) + defer client.Close() + if connErr != nil { + fmt.Errorf("dialing: %v", connErr) + } + clientArg := new(ClientArg) + clientArg.Topic = subscribeArg.Topic + clientArg.Args = args + var reply bool + err := client.Call(subscribeArg.ServiceMethod, clientArg, &reply) + if err != nil { + fmt.Errorf("dialing: %v", err) + } + } +} + +// HasClientSubscribed - True if a client subscribed to this server with the same topic +func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool { + if topicSubscribers, ok := server.subscribers[arg.Topic]; ok { + for _, topicSubscriber := range topicSubscribers { + if *topicSubscriber == *arg { + return true + } + } + } + return false +} + +// Start - starts a service for remote clients to subscribe to events +func (server *Server) Start() error { + var err error + service := server.service + if !service.started { + rpcServer := rpc.NewServer() + rpcServer.Register(service) + rpcServer.HandleHTTP(server.path, "/debug"+server.path) + l, e := net.Listen("tcp", server.address) + if e != nil { + err = e + fmt.Errorf("listen error: %v", e) + } + service.started = true + service.wg.Add(1) + go http.Serve(l, nil) + } else { + err = errors.New("Server bus already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (server *Server) Stop() { + service := server.service + if service.started { + service.wg.Done() + service.started = false + } +} + +// ServerService - service object to listen to remote subscriptions +type ServerService struct { + server *Server + wg *sync.WaitGroup + started bool +} + +// Register - Registers a remote handler to this event bus +// for a remote subscribe - a given client address only needs to subscribe once +// event will be republished in local event bus +func (service *ServerService) Register(arg *SubscribeArg, success *bool) error { + subscribers := service.server.subscribers + if !service.server.HasClientSubscribed(arg) { + rpcCallback := service.server.rpcCallback(arg) + switch arg.SubscribeType { + case Subscribe: + service.server.eventBus.Subscribe(arg.Topic, rpcCallback) + case SubscribeOnce: + service.server.eventBus.SubscribeOnce(arg.Topic, rpcCallback) + } + var topicSubscribers []*SubscribeArg + if _, ok := subscribers[arg.Topic]; ok { + topicSubscribers = []*SubscribeArg{arg} + } else { + topicSubscribers = subscribers[arg.Topic] + topicSubscribers = append(topicSubscribers, arg) + } + subscribers[arg.Topic] = topicSubscribers + } + *success = true + return nil +}