1
0
Fork 0
mirror of https://github.com/documize/community.git synced 2025-07-19 13:19:43 +02:00

event handlers framework

This commit is contained in:
Harvey Kandola 2017-05-01 12:13:05 +01:00
parent 740c2ca189
commit bcabe494e3
15 changed files with 1091 additions and 5 deletions

View file

@ -110,14 +110,12 @@ func convertDocument(w http.ResponseWriter, r *http.Request, job, folderID strin
// All the commented-out code below should be in following function call
newDocument, err := processDocument(p, filename, job, folderID, fileResult)
if err != nil {
writeServerError(w, method, err)
return
}
json, err := json.Marshal(newDocument)
if err != nil {
writeJSONMarshalError(w, method, "conversion", err)
return

View file

@ -20,6 +20,7 @@ import (
"fmt"
"github.com/documize/community/core/api/request"
"github.com/documize/community/core/api/util"
"github.com/documize/community/core/event"
)
// GetSMTPConfig returns installation-wide SMTP settings
@ -157,6 +158,8 @@ func SaveLicense(w http.ResponseWriter, r *http.Request) {
request.ConfigSet("EDITION-LICENSE", js)
event.Handler().Publish(string(event.TypeSystemLicenseChange))
util.WriteSuccessEmptyJSON(w)
}

View file

@ -28,6 +28,7 @@ import (
"github.com/documize/community/core/api/request"
"github.com/documize/community/core/api/util"
api "github.com/documize/community/core/convapi"
"github.com/documize/community/core/event"
"github.com/documize/community/core/log"
"github.com/documize/community/core/utility"
@ -389,7 +390,6 @@ func StartDocumentFromSavedTemplate(w http.ResponseWriter, r *http.Request) {
d.Title = docTitle
err = p.AddDocument(d)
if err != nil {
log.IfErr(tx.Rollback())
writeGeneralSQLError(w, method, err)
@ -455,14 +455,14 @@ func StartDocumentFromSavedTemplate(w http.ResponseWriter, r *http.Request) {
log.IfErr(tx.Commit())
newDocument, err := p.GetDocument(documentID)
if err != nil {
writeServerError(w, method, err)
return
}
data, err := json.Marshal(newDocument)
event.Handler().Publish(string(event.TypeAddDocument), newDocument.Title)
data, err := json.Marshal(newDocument)
if err != nil {
writeJSONMarshalError(w, method, "document", err)
return

View file

@ -28,6 +28,7 @@ import (
"github.com/documize/community/core/log"
"github.com/documize/community/core/utility"
"github.com/documize/community/core/event"
"github.com/gorilla/mux"
"strconv"
)
@ -174,6 +175,14 @@ func AddUser(w http.ResponseWriter, r *http.Request) {
inviter, err := p.GetUser(p.Context.UserID)
log.IfErr(err)
if addUser {
event.Handler().Publish(string(event.TypeAddUser))
}
if addAccount {
event.Handler().Publish(string(event.TypeAddAccount))
}
// Prepare invitation email (that contains SSO link)
if addUser && addAccount {
size := len(requestedPassword)
@ -386,6 +395,8 @@ func DeleteUser(w http.ResponseWriter, r *http.Request) {
log.IfErr(tx.Commit())
event.Handler().Publish(string(event.TypeRemoveUser))
writeSuccessString(w, "{}")
}

View file

@ -86,6 +86,23 @@ func (p *Persister) GetAccountsByOrg() (t []entity.Account, err error) {
return
}
// CountOrgAccounts returns the numnber of active user accounts for specified organization.
func (p *Persister) CountOrgAccounts() (c int) {
row := Db.QueryRow("SELECT count(*) FROM account WHERE orgid=? AND active=1", p.Context.OrgID)
err := row.Scan(&c)
if err != nil && err != sql.ErrNoRows {
log.Error(p.Base.SQLSelectError("CountOrgAccounts", p.Context.OrgID), err)
return 0
}
if err == sql.ErrNoRows {
return 0
}
return
}
// UpdateAccount updates the database record for the given account to the given values.
func (p *Persister) UpdateAccount(account entity.Account) (err error) {
account.Revised = time.Now().UTC()

View file

@ -301,3 +301,20 @@ func (p *Persister) ForgotUserPassword(email, token string) (err error) {
return
}
// CountActiveUsers returns the number of active users in the system.
func CountActiveUsers() (c int) {
row := Db.QueryRow("SELECT count(*) FROM user u WHERE u.refid IN (SELECT userid FROM account WHERE active=1)")
err := row.Scan(&c)
if err != nil && err != sql.ErrNoRows {
log.Error("CountActiveUsers", err)
return 0
}
if err == sql.ErrNoRows {
return 0
}
return
}

22
core/event/LICENSE Executable file
View file

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2014 Alex Saskevich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

162
core/event/README.md Executable file
View file

@ -0,0 +1,162 @@
EventBus
======
[![GoDoc](https://godoc.org/github.com/asaskevich/EventBus?status.svg)](https://godoc.org/github.com/asaskevich/EventBus) [![Coverage Status](https://img.shields.io/coveralls/asaskevich/EventBus.svg)](https://coveralls.io/r/asaskevich/EventBus?branch=master) [![Build Status](https://travis-ci.org/asaskevich/EventBus.svg)](https://travis-ci.org/asaskevich/EventBus)
Package EventBus is the little and lightweight eventbus with async compatibility for GoLang.
#### Installation
Make sure that Go is installed on your computer.
Type the following command in your terminal:
go get github.com/asaskevich/EventBus
After it the package is ready to use.
#### Import package in your project
Add following line in your `*.go` file:
```go
import "github.com/asaskevich/EventBus"
```
If you unhappy to use long `EventBus`, you can do something like this:
```go
import (
evbus "github.com/asaskevich/EventBus"
)
```
#### Example
```go
func calculator(a int, b int) {
fmt.Printf("%d\n", a + b)
}
func main() {
bus := EventBus.New();
bus.Subscribe("main:calculator", calculator);
bus.Publish("main:calculator", 20, 40);
bus.Unsubscribe("main:calculator", calculator);
}
```
#### Implemented methods
* **New()**
* **Subscribe()**
* **SubscribeOnce()**
* **HasCallback()**
* **Unsubscribe()**
* **Publish()**
* **SubscribeAsync()**
* **SubscribeOnceAsync()**
* **WaitAsync()**
#### New()
New returns new EventBus with empty handlers.
```go
bus := EventBus.New();
```
#### Subscribe(topic string, fn interface{}) error
Subscribe to a topic. Returns error if `fn` is not a function.
```go
func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
```
#### SubscribeOnce(topic string, fn interface{}) error
Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.
```go
func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
```
#### Unsubscribe(topic string) error
Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.
```go
bus.Unsubscribe("topic:handler", HelloWord);
```
#### HasCallback(topic string) bool
Returns true if exists any callback subscribed to the topic.
#### Publish(topic string, args ...interface{})
Publish executes callback defined for a topic. Any addional argument will be tranfered to the callback.
```go
func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!");
```
#### SubscribeAsync(topic string, fn interface{}, transactional bool)
Subscribe to a topic with an asyncrhonous callback. Returns error if `fn` is not a function.
```go
func slowCalculator(a, b int) {
time.Sleep(3 * time.Second)
fmt.Printf("%d\n", a + b)
}
bus := EventBus.New()
bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)
bus.Publish("main:slow_calculator", 20, 60)
fmt.Println("start: do some stuff while waiting for a result")
fmt.Println("end: do some stuff while waiting for a result")
bus.WaitAsync() // wait for all async callbacks to complete
fmt.Println("do some stuff after waiting for result")
```
Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false)
#### SubscribeOnceAsync(topic string, args ...interface{})
SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously
#### WaitAsync()
WaitAsync waits for all async callbacks to complete.
#### Cross Process Events
Works with two rpc services:
- a client service to listen to remotely published events from a server
- a server service to listen to client subscriptions
server.go
```go
func main() {
server := NewServer(":2010", "/_server_bus_", New())
server.Start()
// ...
server.EventBus().Publish("main:calculator", 4, 6)
// ...
server.Stop()
}
```
client.go
```go
func main() {
client := NewClient(":2015", "/_client_bus_", New())
client.Start()
client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
// ...
client.Stop()
}
```
#### Notes
Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus).
Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus).
#### Support
If you do have a contribution for the package feel free to put up a Pull Request or open Issue.
#### Special thanks to [contributors](https://github.com/asaskevich/EventBus/graphs/contributors)
* [Brian Downs](https://github.com/briandowns)
* [Dominik Schulz](https://github.com/gittex)
* [bennAH](https://github.com/bennAH)
* [John Noble] (https://github.com/gaxunil)
* [Evan Borgstrom] (https://github.com/borgstrom)

122
core/event/client.go Executable file
View file

@ -0,0 +1,122 @@
package event
import (
"errors"
"fmt"
"net"
"net/http"
"net/rpc"
"sync"
)
const (
// PublishService - Client service method
PublishService = "ClientService.PushEvent"
)
// ClientArg - object containing event for client to publish locally
type ClientArg struct {
Args []interface{}
Topic string
}
// Client - object capable of subscribing to a remote event bus
type Client struct {
eventBus Bus
address string
path string
service *ClientService
}
// NewClient - create a client object with the address and server path
func NewClient(address, path string, eventBus Bus) *Client {
client := new(Client)
client.eventBus = eventBus
client.address = address
client.path = path
client.service = &ClientService{client, &sync.WaitGroup{}, false}
return client
}
// EventBus - returns the underlying event bus
func (client *Client) EventBus() Bus {
return client.eventBus
}
func (client *Client) doSubscribe(topic string, fn interface{}, serverAddr, serverPath string, subscribeType SubscribeType) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Server not found -", r)
}
}()
rpcClient, err := rpc.DialHTTPPath("tcp", serverAddr, serverPath)
defer rpcClient.Close()
if err != nil {
fmt.Errorf("dialing: %v", err)
}
args := &SubscribeArg{client.address, client.path, PublishService, subscribeType, topic}
reply := new(bool)
err = rpcClient.Call(RegisterService, args, reply)
if err != nil {
fmt.Errorf("Register error: %v", err)
}
if *reply {
client.eventBus.Subscribe(topic, fn)
}
}
//Subscribe subscribes to a topic in a remote event bus
func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string) {
client.doSubscribe(topic, fn, serverAddr, serverPath, Subscribe)
}
//SubscribeOnce subscribes once to a topic in a remote event bus
func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string) {
client.doSubscribe(topic, fn, serverAddr, serverPath, SubscribeOnce)
}
// Start - starts the client service to listen to remote events
func (client *Client) Start() error {
var err error
service := client.service
if !service.started {
server := rpc.NewServer()
server.Register(service)
server.HandleHTTP(client.path, "/debug"+client.path)
l, e := net.Listen("tcp", client.address)
if e != nil {
err = e
fmt.Errorf("listen error: %v", e)
}
service.wg.Add(1)
service.started = true
go http.Serve(l, nil)
} else {
err = errors.New("Client service already started")
}
return err
}
// Stop - signal for the service to stop serving
func (client *Client) Stop() {
service := client.service
if service.started {
service.wg.Done()
service.started = false
}
}
// ClientService - service object listening to events published in a remote event bus
type ClientService struct {
client *Client
wg *sync.WaitGroup
started bool
}
// PushEvent - exported service to listening to remote events
func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error {
service.client.eventBus.Publish(arg.Topic, arg.Args...)
*reply = true
return nil
}

207
core/event/event_bus.go Executable file
View file

@ -0,0 +1,207 @@
package event
import (
"fmt"
"reflect"
"sync"
)
//BusSubscriber defines subscription-related bus behavior
type BusSubscriber interface {
Subscribe(topic string, fn interface{}) error
SubscribeAsync(topic string, fn interface{}, transactional bool) error
SubscribeOnce(topic string, fn interface{}) error
SubscribeOnceAsync(topic string, fn interface{}) error
Unsubscribe(topic string, handler interface{}) error
}
//BusPublisher defines publishing-related bus behavior
type BusPublisher interface {
Publish(topic string, args ...interface{})
}
//BusController defines bus control behavior (checking handler's presence, synchronization)
type BusController interface {
HasCallback(topic string) bool
WaitAsync()
}
//Bus englobes global (subscribe, publish, control) bus behavior
type Bus interface {
BusController
BusSubscriber
BusPublisher
}
// EventBus - box for handlers and callbacks.
type EventBus struct {
handlers map[string][]*eventHandler
lock sync.Mutex // a lock for the map
wg sync.WaitGroup
}
type eventHandler struct {
callBack reflect.Value
flagOnce bool
async bool
transactional bool
sync.Mutex // lock for an event handler - useful for running async callbacks serially
}
// New returns new EventBus with empty handlers.
func New() Bus {
b := &EventBus{
make(map[string][]*eventHandler),
sync.Mutex{},
sync.WaitGroup{},
}
return Bus(b)
}
// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions
func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if !(reflect.TypeOf(fn).Kind() == reflect.Func) {
return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind())
}
bus.handlers[topic] = append(bus.handlers[topic], handler)
return nil
}
// Subscribe subscribes to a topic.
// Returns error if `fn` is not a function.
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, false, false, sync.Mutex{},
})
}
// SubscribeAsync subscribes to a topic with an asynchronous callback
// Transactional determines whether subsequent callbacks for a topic are
// run serially (true) or concurrently (false)
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), false, true, transactional, sync.Mutex{},
})
}
// SubscribeOnce subscribes to a topic once. Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, false, false, sync.Mutex{},
})
}
// SubscribeOnceAsync subscribes to a topic once with an asynchronous callback
// Handler will be removed after executing.
// Returns error if `fn` is not a function.
func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error {
return bus.doSubscribe(topic, fn, &eventHandler{
reflect.ValueOf(fn), true, true, false, sync.Mutex{},
})
}
// HasCallback returns true if exists any callback subscribed to the topic.
func (bus *EventBus) HasCallback(topic string) bool {
bus.lock.Lock()
defer bus.lock.Unlock()
_, ok := bus.handlers[topic]
if ok {
return len(bus.handlers[topic]) > 0
}
return false
}
// Unsubscribe removes callback defined for a topic.
// Returns error if there are no callbacks subscribed to the topic.
func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error {
bus.lock.Lock()
defer bus.lock.Unlock()
if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 {
bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler)))
return nil
}
return fmt.Errorf("topic %s doesn't exist", topic)
}
// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
// so make a copy and iterate the copied slice.
copyHandlers := make([]*eventHandler, 0, len(handlers))
copyHandlers = append(copyHandlers, handlers...)
for i, handler := range copyHandlers {
if handler.flagOnce {
bus.removeHandler(topic, i)
}
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
bus.wg.Add(1)
if handler.transactional {
handler.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
}
}
}
}
func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) {
passedArguments := bus.setUpPublish(topic, args...)
handler.callBack.Call(passedArguments)
}
func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) {
defer bus.wg.Done()
if handler.transactional {
defer handler.Unlock()
}
bus.doPublish(handler, topic, args...)
}
func (bus *EventBus) removeHandler(topic string, idx int) {
if _, ok := bus.handlers[topic]; !ok {
return
}
l := len(bus.handlers[topic])
if !(0 <= idx && idx < l) {
return
}
copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
bus.handlers[topic][l-1] = nil // or the zero value of T
bus.handlers[topic] = bus.handlers[topic][:l-1]
}
func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int {
if _, ok := bus.handlers[topic]; ok {
for idx, handler := range bus.handlers[topic] {
if handler.callBack == callback {
return idx
}
}
}
return -1
}
func (bus *EventBus) setUpPublish(topic string, args ...interface{}) []reflect.Value {
passedArguments := make([]reflect.Value, 0)
for _, arg := range args {
passedArguments = append(passedArguments, reflect.ValueOf(arg))
}
return passedArguments
}
// WaitAsync waits for all async callbacks to complete
func (bus *EventBus) WaitAsync() {
bus.wg.Wait()
}

156
core/event/event_bus_test.go Executable file
View file

@ -0,0 +1,156 @@
package event
import (
"testing"
"time"
)
func TestNew(t *testing.T) {
bus := New()
if bus == nil {
t.Log("New EventBus not created!")
t.Fail()
}
}
func TestHasCallback(t *testing.T) {
bus := New()
bus.Subscribe("topic", func() {})
if bus.HasCallback("topic_topic") {
t.Fail()
}
if !bus.HasCallback("topic") {
t.Fail()
}
}
func TestSubscribe(t *testing.T) {
bus := New()
if bus.Subscribe("topic", func() {}) != nil {
t.Fail()
}
if bus.Subscribe("topic", "String") == nil {
t.Fail()
}
}
func TestSubscribeOnce(t *testing.T) {
bus := New()
if bus.SubscribeOnce("topic", func() {}) != nil {
t.Fail()
}
if bus.SubscribeOnce("topic", "String") == nil {
t.Fail()
}
}
func TestSubscribeOnceAndManySubscribe(t *testing.T) {
bus := New()
event := "topic"
flag := 0
fn := func() { flag += 1 }
bus.SubscribeOnce(event, fn)
bus.Subscribe(event, fn)
bus.Subscribe(event, fn)
bus.Publish(event)
if flag != 3 {
t.Fail()
}
}
func TestUnsubscribe(t *testing.T) {
bus := New()
handler := func() {}
bus.Subscribe("topic", handler)
if bus.Unsubscribe("topic", handler) != nil {
t.Fail()
}
if bus.Unsubscribe("topic", handler) == nil {
t.Fail()
}
}
func TestPublish(t *testing.T) {
bus := New()
bus.Subscribe("topic", func(a int, b int) {
if a != b {
t.Fail()
}
})
bus.Publish("topic", 10, 10)
}
func TestSubcribeOnceAsync(t *testing.T) {
results := make([]int, 0)
bus := New()
bus.SubscribeOnceAsync("topic", func(a int, out *[]int) {
*out = append(*out, a)
})
bus.Publish("topic", 10, &results)
bus.Publish("topic", 10, &results)
bus.WaitAsync()
if len(results) != 1 {
t.Fail()
}
if bus.HasCallback("topic") {
t.Fail()
}
}
func TestSubscribeAsyncTransactional(t *testing.T) {
results := make([]int, 0)
bus := New()
bus.SubscribeAsync("topic", func(a int, out *[]int, dur string) {
sleep, _ := time.ParseDuration(dur)
time.Sleep(sleep)
*out = append(*out, a)
}, true)
bus.Publish("topic", 1, &results, "1s")
bus.Publish("topic", 2, &results, "0s")
bus.WaitAsync()
if len(results) != 2 {
t.Fail()
}
if results[0] != 1 || results[1] != 2 {
t.Fail()
}
}
func TestSubscribeAsync(t *testing.T) {
results := make(chan int)
bus := New()
bus.SubscribeAsync("topic", func(a int, out chan<- int) {
out <- a
}, false)
bus.Publish("topic", 1, results)
bus.Publish("topic", 2, results)
numResults := 0
go func() {
for _ = range results {
numResults++
}
}()
bus.WaitAsync()
time.Sleep(10 * time.Millisecond)
if numResults != 2 {
t.Fail()
}
}

76
core/event/network_bus.go Executable file
View file

@ -0,0 +1,76 @@
package event
import (
"errors"
"fmt"
"net"
"net/http"
"net/rpc"
"sync"
)
// NetworkBus - object capable of subscribing to remote event buses in addition to remote event
// busses subscribing to it's local event bus. Compoed of a server and cliet
type NetworkBus struct {
*Client
*Server
service *NetworkBusService
sharedBus Bus
address string
path string
}
// NewNetworkBus - returns a new network bus object at the server address and path
func NewNetworkBus(address, path string) *NetworkBus {
bus := new(NetworkBus)
bus.sharedBus = New()
bus.Server = NewServer(address, path, bus.sharedBus)
bus.Client = NewClient(address, path, bus.sharedBus)
bus.service = &NetworkBusService{&sync.WaitGroup{}, false}
bus.address = address
bus.path = path
return bus
}
// EventBus - returns wrapped event bus
func (networkBus *NetworkBus) EventBus() Bus {
return networkBus.sharedBus
}
// NetworkBusService - object capable of serving the network bus
type NetworkBusService struct {
wg *sync.WaitGroup
started bool
}
// Start - helper method to serve a network bus service
func (networkBus *NetworkBus) Start() error {
var err error
service := networkBus.service
clientService := networkBus.Client.service
serverService := networkBus.Server.service
if !service.started {
server := rpc.NewServer()
server.RegisterName("ServerService", serverService)
server.RegisterName("ClientService", clientService)
server.HandleHTTP(networkBus.path, "/debug"+networkBus.path)
l, e := net.Listen("tcp", networkBus.address)
if e != nil {
err = fmt.Errorf("listen error: %v", e)
}
service.wg.Add(1)
go http.Serve(l, nil)
} else {
err = errors.New("Server bus already started")
}
return err
}
// Stop - signal for the service to stop serving
func (networkBus *NetworkBus) Stop() {
service := networkBus.service
if service.started {
service.wg.Done()
service.started = false
}
}

112
core/event/network_bus_test.go Executable file
View file

@ -0,0 +1,112 @@
package event
import (
"testing"
)
func TestNewServer(t *testing.T) {
serverBus := NewServer(":2010", "/_server_bus_", New())
serverBus.Start()
if serverBus == nil || !serverBus.service.started {
t.Log("New server EventBus not created!")
t.Fail()
}
serverBus.Stop()
}
func TestNewClient(t *testing.T) {
clientBus := NewClient(":2015", "/_client_bus_", New())
clientBus.Start()
if clientBus == nil || !clientBus.service.started {
t.Log("New client EventBus not created!")
t.Fail()
}
clientBus.Stop()
}
func TestRegister(t *testing.T) {
serverPath := "/_server_bus_"
serverBus := NewServer(":2010", serverPath, New())
args := &SubscribeArg{serverBus.address, serverPath, PublishService, Subscribe, "topic"}
reply := new(bool)
serverBus.service.Register(args, reply)
if serverBus.eventBus.HasCallback("topic_topic") {
t.Fail()
}
if !serverBus.eventBus.HasCallback("topic") {
t.Fail()
}
}
func TestPushEvent(t *testing.T) {
clientBus := NewClient("localhost:2015", "/_client_bus_", New())
eventArgs := make([]interface{}, 1)
eventArgs[0] = 10
clientArg := &ClientArg{eventArgs, "topic"}
reply := new(bool)
fn := func(a int) {
if a != 10 {
t.Fail()
}
}
clientBus.eventBus.Subscribe("topic", fn)
clientBus.service.PushEvent(clientArg, reply)
if !(*reply) {
t.Fail()
}
}
func TestServerPublish(t *testing.T) {
serverBus := NewServer(":2020", "/_server_bus_b", New())
serverBus.Start()
fn := func(a int) {
if a != 10 {
t.Fail()
}
}
clientBus := NewClient(":2025", "/_client_bus_b", New())
clientBus.Start()
clientBus.Subscribe("topic", fn, ":2010", "/_server_bus_b")
serverBus.EventBus().Publish("topic", 10)
clientBus.Stop()
serverBus.Stop()
}
func TestNetworkBus(t *testing.T) {
networkBusA := NewNetworkBus(":2035", "/_net_bus_A")
networkBusA.Start()
networkBusB := NewNetworkBus(":2030", "/_net_bus_B")
networkBusB.Start()
fnA := func(a int) {
if a != 10 {
t.Fail()
}
}
networkBusA.Subscribe("topic-A", fnA, ":2030", "/_net_bus_B")
networkBusB.EventBus().Publish("topic-A", 10)
fnB := func(a int) {
if a != 20 {
t.Fail()
}
}
networkBusB.Subscribe("topic-B", fnB, ":2035", "/_net_bus_A")
networkBusA.EventBus().Publish("topic-B", 20)
networkBusA.Stop()
networkBusB.Stop()
}

30
core/event/new.go Normal file
View file

@ -0,0 +1,30 @@
package event
// eventBus contains pub/sub
var eventBus Bus
func init() {
eventBus = New()
}
// Handler returns the global instance of the event bus
func Handler() Bus {
return eventBus
}
// Type defines the format of event descriptors
type Type string
// Valid event types for publication and subscription
const (
// TypeAddAccount for when account for user is created
TypeAddAccount Type = "ACCOUNT_ADD"
// TypeAddUser for when user is created
TypeAddUser Type = "USER_ADD"
// TypeRemoveUser for when user is deleted
TypeRemoveUser Type = "USER_DELETE"
// TypeAddDocument for when document created
TypeAddDocument Type = "DOCUMENT_ADD"
// TypeSystemLicenseChange for when adin updates license
TypeSystemLicenseChange Type = "LICENSE_CHANGE"
)

153
core/event/server.go Executable file
View file

@ -0,0 +1,153 @@
package event
import (
"errors"
"fmt"
"net"
"net/http"
"net/rpc"
"sync"
)
// SubscribeType - how the client intends to subscribe
type SubscribeType int
const (
// Subscribe - subscribe to all events
Subscribe SubscribeType = iota
// SubscribeOnce - subscribe to only one event
SubscribeOnce
)
const (
// RegisterService - Server subscribe service method
RegisterService = "ServerService.Register"
)
// SubscribeArg - object to hold subscribe arguments from remote event handlers
type SubscribeArg struct {
ClientAddr string
ClientPath string
ServiceMethod string
SubscribeType SubscribeType
Topic string
}
// Server - object capable of being subscribed to by remote handlers
type Server struct {
eventBus Bus
address string
path string
subscribers map[string][]*SubscribeArg
service *ServerService
}
// NewServer - create a new Server at the address and path
func NewServer(address, path string, eventBus Bus) *Server {
server := new(Server)
server.eventBus = eventBus
server.address = address
server.path = path
server.subscribers = make(map[string][]*SubscribeArg)
server.service = &ServerService{server, &sync.WaitGroup{}, false}
return server
}
// EventBus - returns wrapped event bus
func (server *Server) EventBus() Bus {
return server.eventBus
}
func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) {
return func(args ...interface{}) {
client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath)
defer client.Close()
if connErr != nil {
fmt.Errorf("dialing: %v", connErr)
}
clientArg := new(ClientArg)
clientArg.Topic = subscribeArg.Topic
clientArg.Args = args
var reply bool
err := client.Call(subscribeArg.ServiceMethod, clientArg, &reply)
if err != nil {
fmt.Errorf("dialing: %v", err)
}
}
}
// HasClientSubscribed - True if a client subscribed to this server with the same topic
func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool {
if topicSubscribers, ok := server.subscribers[arg.Topic]; ok {
for _, topicSubscriber := range topicSubscribers {
if *topicSubscriber == *arg {
return true
}
}
}
return false
}
// Start - starts a service for remote clients to subscribe to events
func (server *Server) Start() error {
var err error
service := server.service
if !service.started {
rpcServer := rpc.NewServer()
rpcServer.Register(service)
rpcServer.HandleHTTP(server.path, "/debug"+server.path)
l, e := net.Listen("tcp", server.address)
if e != nil {
err = e
fmt.Errorf("listen error: %v", e)
}
service.started = true
service.wg.Add(1)
go http.Serve(l, nil)
} else {
err = errors.New("Server bus already started")
}
return err
}
// Stop - signal for the service to stop serving
func (server *Server) Stop() {
service := server.service
if service.started {
service.wg.Done()
service.started = false
}
}
// ServerService - service object to listen to remote subscriptions
type ServerService struct {
server *Server
wg *sync.WaitGroup
started bool
}
// Register - Registers a remote handler to this event bus
// for a remote subscribe - a given client address only needs to subscribe once
// event will be republished in local event bus
func (service *ServerService) Register(arg *SubscribeArg, success *bool) error {
subscribers := service.server.subscribers
if !service.server.HasClientSubscribed(arg) {
rpcCallback := service.server.rpcCallback(arg)
switch arg.SubscribeType {
case Subscribe:
service.server.eventBus.Subscribe(arg.Topic, rpcCallback)
case SubscribeOnce:
service.server.eventBus.SubscribeOnce(arg.Topic, rpcCallback)
}
var topicSubscribers []*SubscribeArg
if _, ok := subscribers[arg.Topic]; ok {
topicSubscribers = []*SubscribeArg{arg}
} else {
topicSubscribers = subscribers[arg.Topic]
topicSubscribers = append(topicSubscribers, arg)
}
subscribers[arg.Topic] = topicSubscribers
}
*success = true
return nil
}