'添加反射'

This commit is contained in:
朱毅骏 2021-04-30 18:06:40 +08:00
commit f155a75deb
73 changed files with 4174 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/../../../../../:\GoPATH\src\study\.idea/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/study.iml" filepath="$PROJECT_DIR$/.idea/study.iml" />
</modules>
</component>
</project>

9
.idea/study.iml Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

30
AtomicCounter/main.go Normal file
View File

@ -0,0 +1,30 @@
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var ops uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
atomic.AddUint64(&ops, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("ops:", ops)
}

View File

@ -0,0 +1 @@
package FLAG从命令行获取参数

28
Ticker/main.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"fmt"
"time"
)
//打点器
func main() {
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
time.Sleep(2400 * time.Millisecond)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}

43
bursts/main.go Normal file
View File

@ -0,0 +1,43 @@
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}

34
chan/1/chan1.go Normal file
View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(1 * time.Second):
fmt.Println("timeout 1")
}
c2 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c2 <- "result 2"
}()
select {
case res := <-c2:
fmt.Println(res)
case <-time.After(3 * time.Second):
fmt.Println("timeout 2")
}
}

30
chan/2/chan2.go Normal file
View File

@ -0,0 +1,30 @@
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}

76
chan/3/chan3.go Normal file
View File

@ -0,0 +1,76 @@
package main
/*
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
void output(char *str) {
usleep(1000000);
printf("%s\n", str);
}
void aa() {
printf("你好");
}
int a(int b) {
return b==1?2:3;
}
*/
import "C"
import (
"net/http"
"unsafe"
"log"
_ "net/http/pprof"
"runtime"
"sync"
)
func init() {
go http.ListenAndServe(":9999", nil)
}
func main() {
for i := 0; i < 1000; i++ {
go func() {
str := "hello cgo"
//change to char*
cstr := C.CString(str)
C.output(cstr)
C.free(unsafe.Pointer(cstr))
C.aa()
println(C.a(0))
}()
}
killThreadService()
select {}
}
func sayhello(wr http.ResponseWriter, r *http.Request) {
KillOne()
}
func killThreadService() {
http.HandleFunc("/", sayhello)
err := http.ListenAndServe(":10003", nil)
if err != nil {
log.Fatal("ListenAndServe:", err)
}
}
// KillOne kills a thread
func KillOne() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runtime.LockOSThread()
return
}()
wg.Wait()
}

32
chan/chan.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
`fmt`
`time`
)
func worker(done chan bool) {
fmt.Print("working...")
time.Sleep(time.Second)
fmt.Println("done")
//发送一个值来通知我们已经完工啦。
done <- true
}
func main() {
//缓冲
msg := make(chan string, 2)
msg <- "adad"
msg <- "55"
println(<-msg)
msg <- "222"
println(<-msg)
println(<-msg)
done := make(chan bool, 1)
go worker(done)
<-done
}

35
chan/jobs/jobs.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
`fmt`
`time`
)
func main() {
jobs := make(chan int, 5)
done := make(chan bool)
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
time.Sleep(1*time.Second)
}
close(jobs)
fmt.Println("sent all jobs")
<-done
}

1
demo/hello/main.go Normal file
View File

@ -0,0 +1 @@
package hello

23
func/func.go Normal file
View File

@ -0,0 +1,23 @@
package main
import (
"fmt"
`time`
)
type student struct {
id int
name string
}
func Tostu(stu *student) {
fmt.Println(stu.name,stu.id);
}
func main() {
s2 := student{313, "张三"}
s1 := student{313, "张三"}
Tostu(&s1)
go func(stu *student) {
fmt.Println(stu.name,stu.id);
}(&s2)
time.Sleep(time.Second)
}

8
go.mod Normal file
View File

@ -0,0 +1,8 @@
module study
go 1.16
require (
github.com/smartystreets/goconvey v1.6.4
github.com/tidwall/gjson v1.6.8
)

19
go.sum Normal file
View File

@ -0,0 +1,19 @@
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/tidwall/gjson v1.6.8 h1:CTmXMClGYPAmln7652e69B7OLXfTi5ABcPPwjIWUv7w=
github.com/tidwall/gjson v1.6.8/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU=
github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=

37
hello.go Normal file
View File

@ -0,0 +1,37 @@
package main
import (
"fmt"
"github.com/tidwall/gjson"
)
func main() {
exampleJsonString := `{
"code":"000",
"data":{
"all_count":441353,
"lists":[
{
"id":441353,
"job_name":"经营日报-同步职位信息",
"job_recall_time":"2021-03-13 15:05:04",
"job_recall_content":"请求成功great",
"create_time":"2021-03-13 15:05:04"
},
{
"id":441352,
"job_name":"经营日报-Check张继学列表",
"job_recall_time":"2021-03-13 15:05:00",
"job_recall_content":"请求成功OK",
"create_time":"2021-03-13 15:05:00"
}
]
},
"msg":"获取列表成功",
"success":true
}`
jsonCode := gjson.Get(exampleJsonString, "code") //这个后面你可以继续跟想要的结果类型不加的话会是json字符串的类型
fmt.Println(jsonCode) // 结果 000
jsonOneJobName := gjson.Get(exampleJsonString, "data.lists.#.job_name").Array() //比如我这里就希望返回是一个切片类型
fmt.Println(jsonOneJobName) // 结果 [经营日报-同步职位信息 经营日报-Check张继学列表]
}

1
list/main.go Normal file
View File

@ -0,0 +1 @@
package list

60
mutex/main.go Normal file
View File

@ -0,0 +1,60 @@
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func main() {
var state = make(map[int]int)
var mutex = &sync.Mutex{}
var readOps uint64
var writeOps uint64
for r := 0; r < 100; r++ {
go func() {
total := 0
for {
key := rand.Intn(5)
mutex.Lock()
total += state[key]
mutex.Unlock()
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
for w := 0; w < 10; w++ {
go func() {
for {
key := rand.Intn(5)
val := rand.Intn(100)
mutex.Lock()
state[key] = val
mutex.Unlock()
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
time.Sleep(time.Second)
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
mutex.Lock()
fmt.Println("state:", state)
mutex.Unlock()
}

36
pools/main.go Normal file
View File

@ -0,0 +1,36 @@
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
//通道缓存五个
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
println(<-results)
}
}

26
timer/main.go Normal file
View File

@ -0,0 +1,26 @@
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 fired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
time.Sleep(2 * time.Second)
}

29
wait/main.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
}

1
内存分配/main.go Normal file
View File

@ -0,0 +1 @@
package 内存分配

35
匿名字段/main.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"fmt"
"sync"
"time"
)
type data struct {
sync.Mutex
buf [1024]byte
count int
}
func main() {
d := data{}
d.count =1
go func() {
for i:= 0;i<3;i++ {
d.Lock()
d.count++
fmt.Printf("第一个任务%d \n",d.count)
d.Unlock()
}
}()
go func() {
for i:= 0;i<3;i++ {
d.Lock()
d.count++
fmt.Printf("第二个任务%d \n",d.count)
d.Unlock()
}
}()
time.Sleep(time.Second *6)
}

27
匿名接口/main.go Normal file
View File

@ -0,0 +1,27 @@
package main
//结构体
type data struct {}
//方法data.string()
func (data) string() string{
return "data"
}
//ndoe 结构体
type node struct {
//定义匿名接口变量
data interface{
string() string
}
}
func main() {
var t interface{
string() string
} = data{}
n := node{
data: t,
}
print(n.data.string())
}

25
反射/值/main.go Normal file
View File

@ -0,0 +1,25 @@
package main
import (
"fmt"
"reflect"
)
//个type获取类型信息不同Value专注于对象实例数据读写
//接口变量恢复至对象 切是unaddressable的所以想要修改目标对象必须使用指针
//就算传入指针一样需要通过Elem获取目标对象。应为被接口储存的指针本身是不能寻址和进行设置操作的。
type User struct {
Name string
code int
}
func main() {
a := 100
va,vp := reflect.ValueOf(a),reflect.ValueOf(&a).Elem()
fmt.Println(va.CanAddr(),va.CanSet())
fmt.Println(vp.CanAddr(),vp.CanSet())
fmt.Println("---------------------")
//注意,不能对非导出字段直接进行设置操作,无论是当前包还是外包
}

35
反射/值/t1/main.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"fmt"
"reflect"
"unsafe"
)
type User struct {
Name string
code int
}
/**
Value.Pointer 和Value.Int等方法类似将Value.data储存的数据转换为指针目标必须是指针类型
然而Unsafe返回任何CanAddr Value.data地址(相当于&去地址操作),比如Elem后的Value以及字段成员地址
以结构体里的指针类型字段为例Pointer返回该字段所保存的地址
而UnsafeAddr返回该字段自身的地址(结构对象地址+偏移量)
*/
func main() {
p := new(User)
v := reflect.ValueOf(p).Elem()
name := v.FieldByName("Name")
code := v.FieldByName("code")
fmt.Printf("name : canaddr =%v,canset = %v \n",name.CanSet(),name.CanSet())
fmt.Printf("code : canaddr =%v,canset = %v \n",code.CanSet(),code.CanSet())
if name.CanSet() {
name.SetString("Tom")
}
if code.CanAddr() {
*(*int)(unsafe.Pointer(code.UnsafeAddr())) = 100
}
fmt.Printf("%+v\n",*p)
}

View File

@ -0,0 +1,47 @@
package main
import (
"fmt"
"reflect"
)
/**
使用Interface方法进行类型推断和转换
*/
func main() {
type user struct {
Name string
Age int
}
type person struct {
Name string
Age int
}
u := user{
"q.yuhen",
60,
}
v := reflect.ValueOf(&u)
if !v.CanInterface() {
println("CanInterface :fail.")
return
}
p,ok := v.Interface().(*person)
if !ok {
println("Interface :fail.")
} else {
p.Age++
fmt.Printf("%+v\n",u)
return
}
m,ok := v.Interface().(*user)
if !ok {
println("Interface :fail.")
return
}
m.Age++
fmt.Printf("%+v\n",u)
}

View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
"reflect"
"unsafe"
)
func main() {
c := make(chan int, 4)
v := reflect.ValueOf(c)
//直接使用Value.Int ,Bool等方法进行类型转换但失败是会引发panic且不支持ok-idiom
if v.TrySend(reflect.ValueOf(100)) {
fmt.Println(v.TryRecv())
}
//接口有两种nil状态这一直是一个潜在的麻烦。解决方法是使用isNil判断值是否为nil
var a interface{} = nil
var b interface{} = (*int)(nil)
fmt.Println("---------------------------")
fmt.Println(a == nil)
fmt.Println(b == nil, reflect.ValueOf(b).IsNil())
//也可以直接使用unsafe转换后直接判断iface.data是否为零值
fmt.Println("---------------------------")
iface := (*[2]uintptr)(unsafe.Pointer(&b))
fmt.Println(iface,iface[1]==0)
//Value里的某些方法并未实现ok-idom或者error所以得自行判断返回的是否为Zero Value 。
fmt.Println("---------------------------")
v2 := reflect.ValueOf(struct {
name string
}{})
println(v2.FieldByName("name").IsValid())
println(v2.FieldByName("xxx").IsValid())
}

View File

@ -0,0 +1,32 @@
package bench
import (
"reflect"
"testing"
)
type Data struct {
X int
}
var d = new(Data)
func set(x int) {
d.X = x
}
var v = reflect.ValueOf(d).Elem()
var f = v.FieldByName("X")
func rset(x int) {
f.Set(reflect.ValueOf(x))
}
func BenchmarkSet(b *testing.B) {
for i := 0; i < b.N; i++ {
set(100)
}
}
func BenchmarkRSet(b *testing.B) {
for i := 0; i < b.N; i++ {
rset(100)
}
}

View File

@ -0,0 +1,21 @@
package bench
import "reflect"
type Data struct {
X int
}
func (x *Data) Inc() {
x.X++
}
var d = new(Data)
var v = reflect.ValueOf(d).Elem()
var m = v.MethodByName("Inc")
func call() {
d.Inc()
}
func rCal() {
m.Call(nil)
}

View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
"reflect"
)
type X struct {
}
func (X) Format(s string,a ...interface{}) string {
return fmt.Sprintf(s,a...)
}
func main() {
var a X
v := reflect.ValueOf(&a)
m := v.MethodByName("Format")
out := m.Call([]reflect.Value{
reflect.ValueOf("%s = %d"),
reflect.ValueOf("x"),
reflect.ValueOf(100),
})
fmt.Println(out)
//对于变参用CallSlice方便一些
out = m.CallSlice([]reflect.Value{
reflect.ValueOf("%s = %d"),
reflect.ValueOf([]interface{}{"x",100}),
})
fmt.Println(out)
//无法调用非导出方法,升值无法获取有效地址
}

28
反射/方法/main.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"fmt"
"reflect"
)
type X struct {
}
func (X) Test(x,y int) (int,error) {
return x+y,fmt.Errorf("err:%d",x+y)
}
func main() {
var a X
v:=reflect.ValueOf(&a)
m := v.MethodByName("Test")
in := []reflect.Value {
reflect.ValueOf(1),
reflect.ValueOf(2),
}
out := m.Call(in)
for _,v := range out{
fmt.Println(v)
}
}

48
反射/构建/main.go Normal file
View File

@ -0,0 +1,48 @@
package main
import (
"reflect"
"strings"
)
/**
反射哭提供了内置函数make和new的对应插座其中最有意思的就是MakeFunc
可用它来实现通用末班适应不同数据类型
*/
func add(args []reflect.Value) (result []reflect.Value) {
if len(args) == 0 {
return nil
}
var ret reflect.Value
switch args[0].Kind() {
case reflect.Int:
n := 0
for _,a := range args{
n += int(a.Int())
}
ret = reflect.ValueOf(n)
case reflect.String:
ss := make([]string,0,len(args))
for _,s :=range args{
ss = append(ss,s.String())
}
ret = reflect.ValueOf(strings.Join(ss,""))
}
result = append(result,ret)
return
}
func makeAdd(fptr interface{}) {
fn := reflect.ValueOf(fptr).Elem()
v := reflect.MakeFunc(fn.Type(),add)
fn.Set(v)
}
//如果语言支持泛型就不需要这么麻烦了。。。。
func main() {
var intAdd func(x,y int) int
var strAdd func(a,b string) string
makeAdd(&intAdd)
makeAdd(&strAdd)
println(intAdd(100,200))
println(strAdd("hello, ","world"))
}

111
反射/类型/main.go Normal file
View File

@ -0,0 +1,111 @@
package main
import (
"fmt"
"net/http"
"reflect"
)
type X int
type Y int
type user struct {
name string
age int
}
type manager struct {
user
title string
}
type A int
type B struct {
A
}
func (A) av() {
}
func (*A) ap() {
}
func (B) bv() {
}
func (*B) bp() {
}
func main() {
var a,b X = 100,200
var c Y = 300
ta,tb,tc := reflect.TypeOf(a),reflect.TypeOf(b),reflect.TypeOf(c)
//类型判断
fmt.Println(ta==tb,ta==tc)
fmt.Println(ta.Kind() == tc.Kind())
fmt.Println("---------------------")
//直接够着一些基础复合类型
m := reflect.ArrayOf(10,reflect.TypeOf(byte(0)))
n := reflect.MapOf(reflect.TypeOf(""),reflect.TypeOf(byte(0)))
fmt.Println(m,n)
fmt.Println("---------------------")
//传入对象应该区分基础类型和指针类型,因为他们不属于同一类型
x :=100
tx,tp:=reflect.TypeOf(x),reflect.TypeOf(&x)
fmt.Println(tx,tp)
fmt.Println(tx.Kind(),tp.Kind())
//方法Elem()返回指针,数组,切片,字典(值)或者通道的基类型
fmt.Println(tx ==tp.Elem())
fmt.Println("---------------------")
fmt.Println(reflect.TypeOf(map[string]int{}).Elem())
fmt.Println(reflect.TypeOf([]int32{}).Elem())
fmt.Println("---------------------")
//只有获取结构体指针的基类型后,才能遍历他的字段
var mgr manager
t := reflect.TypeOf(&mgr)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
fmt.Println(f.Name,f.Type,f.Offset)
if f.Anonymous {
//输出匿名字段结构
for x:=0;x<f.Type.NumField();x++{
af := f.Type.Field(x)
fmt.Println(" ",af.Name,af.Type)
}
}
}
//对于匿名字段,可以用多级索引(按定义顺序)直接访问
//FieldByName 不支持多级名称,如有同名遮蔽,须通过匿名字段二次获取
fmt.Println("---------------------")
t1 := reflect.TypeOf(mgr)
name ,_ := t1.FieldByName("name")
fmt.Println(name.Name,name.Type)
age := t1.FieldByIndex([]int{0,1})
fmt.Println(age.Name,age.Type)
fmt.Println("---------------------")
var bb B
tt := reflect.TypeOf(&bb)
s := []reflect.Type{tt,tt.Elem()}
for _ ,t := range s {
fmt.Println(t,":")
for i := 0; i < tt.NumMethod(); i++ {
fmt.Println(" ",tt.Method(i))
}
}
//反射可以探知当前包或者外包IDE费到处结构成员
//相对于reflect而言当前包和外包都是“外包“。
fmt.Println("---------------------")
var ss http.Server
ttt := reflect.TypeOf(ss)
for i := 0; i < ttt.NumField(); i++ {
fmt.Println(ttt.Field(i).Name)
}
}

View File

@ -0,0 +1,40 @@
package main
import (
"fmt"
"reflect"
)
//可以用反射提取struct tag还能自动分解。其常用于ORM映射或数据格式验证
type user struct {
name string `field:"name" type:"varchar(50)"`
age int `field:"age" type:"int"`
}
type X int
func (X) String() string {
return ""
}
func main() {
var u user
t := reflect.TypeOf(u)
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
fmt.Printf("%s:%s %s\n", f.Name, f.Tag.Get("field"), f.Tag.Get("type"))
}
//辅助判断方法 ImplementsConvertibleToAssignableTo 都是运行期间进行动态调用和赋值所必须的
fmt.Println("---------------------")
var a X
t1 := reflect.TypeOf(a)
//Implements 不能直接使用类型作为参数
st := reflect.TypeOf((*fmt.Stringer)(nil)).Elem()
fmt.Println(t1.Implements(st))
it := reflect.TypeOf(0)
fmt.Println(t1.ConvertibleTo(it))
fmt.Println(t1.AssignableTo(st),t1.AssignableTo(it))
}

View File

@ -0,0 +1 @@
package 调用接口方法

View File

@ -0,0 +1,40 @@
package main
import (
"math"
"runtime"
"sync"
)
func count() {
x := 0
for i := 0; i < math.MaxUint32; i++ {
x += i
}
println(x)
}
//循环执行
func test(n int) {
for i := 0; i < n; i++ {
count()
}
}
//并发执行
func test2(n int) {
var wg sync.WaitGroup
wg.Add(n)
for i:= 0;i<n;i++ {
go func() {
count()
wg.Done()
}()
}
wg.Wait()
}
func main() {
n := runtime.GOMAXPROCS(0)
//test(n)
test2(n)
}

View File

@ -0,0 +1,28 @@
package main
import "runtime"
func main() {
exit := make(chan struct{})
go func() {
defer close(exit)
defer println("a")
func() {
defer func() {
println("b", recover() == nil)
}()
func() {
println("c")
//立即终止当前任务,运行时确保所有已经注册延迟调用被执行。
//该函数不会印象其他并发任务不会发生panic自然而无法捕获
runtime.Goexit()
println("c done.")
}()
println("b done.")
}()
println("a done.")
}()
<- exit
println("main exit.")
}

View File

@ -0,0 +1,24 @@
package main
import "runtime"
func main() {
runtime.GOMAXPROCS(1)
exit := make(chan struct{})
go func() {
defer close(exit)
go func() {
println("b")
}()
for i := 0; i < 4; i++ {
println("a:",i)
if i==2 {
//暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行
runtime.Gosched()
}
}
}()
<-exit
}

View File

@ -0,0 +1,25 @@
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
var gs [5]struct{
id int
result int
}
for i := 0; i < len(gs); i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
gs[id].id = id
gs[id].result = (id +1)*100
}(i)
}
wg.Wait()
fmt.Printf("%+v \n",gs)
}

View File

@ -0,0 +1,25 @@
package main
import (
"runtime"
"sync"
"time"
)
func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup
wg.Add(1)
time.Sleep(time.Second /10)
go func() {
wg.Wait()
println("main exit..")
}()
go func() {
time.Sleep(time.Second)
println("done.")
wg.Done()
}()
wg.Wait()
println("main exit ...")
}

View File

@ -0,0 +1,19 @@
package main
import "time"
func main() {
exit := make(chan string)
go func() {
time.Sleep(time.Second)
println("goroutline done.")
//close(exit)
exit <- "enen"
}()
println("main ...")
<-exit
println("main exit ...")
}

20
并发/通道/main.go Normal file
View File

@ -0,0 +1,20 @@
package main
func main() {
//结束事件通道
done := make(chan struct{})
//数据传输通道
c := make(chan string)
go func() {
//消息接收
s := <- c
println(s)
close(done)
}()
println("main ..")
//消息发送
c <- "hi"
//阻塞,知道有数据或者通道关闭
<- done
}

View File

@ -0,0 +1,33 @@
package main
import "sync"
/**
通道默认是双向的并不会区分发送和接收端但某些时候
我们可以限制收发操作的方向来获取更严谨的操作逻辑
尽管可用make创建单向通道但那没有任何意义
通常使用类型转换来获取单向通道并分别富裕操作双方
*/
func main() {
var wg sync.WaitGroup
wg.Add(2)
c:=make(chan int)
var send chan <- int = c
var recv <- chan int =c
go func() {
defer wg.Done()
for x:=range recv{
println(x)
}
}()
go func() {
defer wg.Done()
defer close(c)
for i := 0; i < 3; i++ {
send <- i
}
}()
wg.Wait()
}

View File

@ -0,0 +1,22 @@
package main
/**
不能在单向通道上逆向操作
*/
func main() {
c := make(chan int,2)
//单向通道
var send chan <- int = c
var recv <- chan int =c
//不能逆向啊哦做
//<- send
//recv <- 1
send <- 1
<-recv
//close不能用于接收端
//close(recv)
//无法将单向通道重新转换回去
//var b chan int
//b =(chan int)(recv)
//b = (chan int)(send)
}

View File

@ -0,0 +1,33 @@
package main
import (
"fmt"
"unsafe"
)
func main() {
//通道缓冲区三个
//同步模式必须有配对操作的goroutline出现否则会一直阻塞
//异步模式在缓冲区未满或者数据未读完之前不会阻塞
//多数时候,异步通道有助于提升性能,减少排队阻塞
c := make(chan int,3)
c <-1
c <-2
println(<-c)
println(<-c)
//缓冲区大小仅仅是内部属性不属于类型组成部分。另外通道变量本身就是指针可用相等操作符判断是否为同一对象或nil
//虽然可传递指针来避免数据复制,但需要额外注意数据安全
var a,b = make(chan int ,3),make(chan int)
var d chan bool
println(a ==b)
println(d ==nil)
fmt.Printf("%p,%d \n",a,unsafe.Sizeof(a))
a <- 1
a <- 2
//内置函数cap和len返回缓冲区大小和当前已缓冲区大小和当前已缓冲数量
//而对于同步通道都返回0据此可以判断通道是异步还是同步
println("a:",len(a),cap(a))
println("b:",len(b),cap(b))
}

View File

@ -0,0 +1,23 @@
package main
func main() {
//除了使用简单的发送和接收操作符外还可以用ok-idom 或range模式处理数据
done := make(chan struct{})
c := make(chan int)
go func() {
defer close(done)
for{
x,ok := <-c
if !ok {
return
}
println(x)
}
}()
c <- 1
c <- 2
c <- 3
<- done
}

View File

@ -0,0 +1,19 @@
package main
func main() {
done := make(chan struct{})
c := make(chan int)
//循环接收数据,range模式更加简洁
go func() {
defer close(done)
//循环获取消息直到通道关闭
for x := range c{
println(x)
}
}()
c <- 1
c <- 2
c <- 3
close(c)
<- done
}

View File

@ -0,0 +1,19 @@
package main
/**
对于close或nil通道发送或者接收操作都有相应规则
- 向已经关闭通道发送数据引发panic
- 从已关闭通道接收数据返回已缓冲数据或者零值
- 无论收发nil通道都会阻塞
- 重复关闭或关闭nil通道会引发panic错误
*/
func main() {
c := make(chan int,3)
c <- 10
c <- 20
close(c)
for i := 0; i < cap(c)+1; i++ {
x,ok:= <-c
println(i,":",ok,x)
}
}

View File

@ -0,0 +1,48 @@
package main
import "sync"
func main() {
var wg sync.WaitGroup
wg.Add(2)
a,b := make(chan int),make(chan int)
go func() {
defer wg.Done()
for {
var(
name string
x int
ok bool
)
select {
case x,ok= <-a:
name ="a"
case x,ok = <-b:
name ="b"
}
if !ok {
return
}
println(name,x)
}
}()
go func() {
defer wg.Done()
defer close(a)
defer close(b)
for i := 0; i < 10; i++ {
select {
case a<- i:
case b <- i*10:
}
}
}()
wg.Wait()
}

View File

@ -0,0 +1,56 @@
package main
import "sync"
/**
等全部通道消息处理结束(closed),可将已完成通道设置为nil
这样他就会被阻塞不在被select选中
*/
func main() {
var wg sync.WaitGroup
wg.Add(2)
a,b := make(chan int),make(chan int)
go func() {
defer wg.Done()
for {
select {
case x,ok := <-a:
if !ok {
a =nil
break
}
println("a",x)
case x,ok := <-b:
if !ok {
b =nil
break
}
println("b",x)
}
if a == nil && b== nil {
return
}
}
}()
go func() {
defer wg.Done()
defer close(a)
for i := 0; i < 3; i++ {
a <- i
}
}()
go func() {
defer wg.Done()
defer close(b)
for i := 0; i < 5; i++ {
b <- i *10
}
}()
wg.Wait()
}

View File

@ -0,0 +1,32 @@
package main
import (
"sync"
"time"
)
/**
一次性事件使用close效率更好没有多余的开销连续或多样性事务
可传递不同数据标志实现还可以使用sync.Cond实现单播或广播事件
*/
func main() {
var wg sync.WaitGroup
ready := make(chan struct{})
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
//运动员准备就绪
println(id,":ready.")
//等待发令
<- ready
println(id,":running...")
}(i)
}
time.Sleep(time.Second)
println("ready? Go!")
//砰
close(ready)
wg.Wait()
}

5
归并/main.go Normal file
View File

@ -0,0 +1,5 @@
package main
func main() {
}

58
接口/main.go Normal file
View File

@ -0,0 +1,58 @@
package main
import "fmt"
//定义接口
type Printer interface {
//打印方法
Print(interface{})
}
//定义函数为类型
type FuncCaller func(p interface{})
//实现Printer的Print方法
func (FuncCaller FuncCaller) Print(p interface{}) {
//调用funcCaller函数本体
FuncCaller(p)
}
type Cat interface {
CatchMouse()
}
type Dog interface {
Bark()
}
type CatDog struct {
Name string
}
func (catDog *CatDog)CatchMouse() {
fmt.Printf("%v caught the mouse and ate it!\n",catDog.Name)
}
func (catDog CatDog) Bark() {
fmt.Printf("%v barked loudly!\n",catDog.Name)
}
func main() {
var printer Printer
//将李明函数强制转为FuncCaller赋值给printer
printer = FuncCaller(func(p interface{}) {
fmt.Println(p)
})
printer.Print("Golang is Good!")
fmt.Println("-----------------")
catDog := &CatDog{
"Lucy",
}
var cat Cat
cat = catDog
cat.CatchMouse()
fmt.Println("-----------------")
var dog Dog
dog = catDog
dog.Bark()
fmt.Println("-----------------")
catDog.CatchMouse()
catDog.Bark()
fmt.Println("-----------------")
sayHello()
}

7
接口/sayHello.go Normal file
View File

@ -0,0 +1,7 @@
package main
import "fmt"
func sayHello() {
fmt.Println("Hello World")
}

69
方法集/main.go Normal file
View File

@ -0,0 +1,69 @@
package main
import (
"fmt"
"reflect"
"sync"
)
type S struct {
}
type T struct {
S
}
func (S) svVal() {
}
func (*S)sPtr() {
}
func (T) tVal() {
}
func (*T) tPtr() {
}
func methodSet(a interface{}) {
t := reflect.TypeOf(a)
for i,n := 0,t.NumMethod();i <n ;i++ {
m := t.Method(i)
fmt.Println(m.Name,m.Type)
}
}
type N int
func (n N) test() {
fmt.Printf("test.n: %p, %d \n",&n,n)
}
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(10)
m :=0
mu.Lock()
for i:= 0;i<200;i++ {
m++
}
mu.Unlock()
var t T
methodSet(t)
println("--------------------")
methodSet(&t)
var n N = 5
fmt.Printf("main.n: %p, %d \n",&n,n)
f1 := N.test
f1(n)
f2 := (*N).test
f2(&n)
N.test(n)
(*N).test(&n)
}

1
析构/main.go Normal file
View File

@ -0,0 +1 @@
package 析构

1
测试/main_test.go Normal file
View File

@ -0,0 +1 @@
package 测试

View File

@ -0,0 +1 @@
package 性能测试

View File

@ -0,0 +1 @@
package 测试表

View File

@ -0,0 +1 @@
package 类型定义和别名区别

60
类型转换/main.go Normal file
View File

@ -0,0 +1,60 @@
package main
import "fmt"
type data int
func (d data) String() string {
return fmt.Sprintf("data:%d",d)
}
type FuncString func() string
func (f FuncString) String() string {
return f()
}
/*
type z int
func init() {
var _ fmt.Stringer = z(0)
}
*/
func main() {
var d data = 15
var x interface{} =d
//转会为更具体的接口类型
if n,ok := x.(fmt.Stringer); ok{
fmt.Println(n)
}
//转换回原生类型
if d2,ok := x.(data);ok {
fmt.Println(d2)
}
//main.data is not error: missing method Error
//e := x.(error)
//fmt.Println(e)
//使用ok-idion模式即便转换失败也不会ying'fa
var m interface{} = func(x int) string {
return fmt.Sprintf("d:%d",x)
}
switch v:= m.(type){
case nil:
println("nil")
case *int:
println(*v)
case func(int) string:
println(v(100))
case fmt.Stringer:
println(v)
default:
println("unknown")
}
var t fmt.Stringer = FuncString(func() string {
return "hello world"
})
fmt.Println(t)
}

5
编译/test.go Normal file
View File

@ -0,0 +1,5 @@
package main
func main() {
println("hello world")
}

73
运行机制/main.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"go/ast"
"log"
"unsafe"
)
type itab struct {
inter *ast.InterfaceType
fun [1]uintptr
}
type iface struct {
//类型信息
tab *itab
//实际对象指针
data unsafe.Pointer
}
type data struct {
x int
}
type TestError struct {}
func (* TestError) Error() string {
return "error"
}
func test(x int) (int,error) {
//错误做法 err并不为nil
//var err *TestError
//if x < 0 {
// err = new(TestError)
// x =0
//} else {
// x +=100
//}
//return x,err
//正确做法 明确返回nil
if x < 0 {
return 0,new(TestError)
}
return x+100,nil
}
func main() {
d := data{100}
//将对象复制给接口 复制的对象不能修改
var t interface{} =d
println(t.(data).x)
//不能赋值 t.(data).x =100
var t1 interface{} = &d
println(t1.(*data).x)
t1.(*data).x = 30
println(t1.(*data).x)
//只有当接口变量内部的两个指针itab,data都为nil时接口才为nil
var a interface{} = nil
// data为nil但是b还包含类的信息
var b interface{} = (*int)(nil)
println(a==nil,b==nil) //true false
x,err :=test(100)
if err != nil {
log.Fatalln("err != nil")
}
println(x)
}

63
错误处理/main.go Normal file
View File

@ -0,0 +1,63 @@
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Promise struct {
wg sync.WaitGroup
res string
err error
}
func NewPromise(f func() (string, error)) *Promise {
p := &Promise{}
p.wg.Add(1)
go func() {
p.res, p.err = f()
p.wg.Done()
}()
return p
}
func (p *Promise) Then(r func(string), e func(error)) (*Promise){
go func() {
p.wg.Wait()
if p.err != nil {
e(p.err)
return
}
r(p.res)
}()
return p
}
func exampleTicker() (string, error) {
for i := 0; i < 3; i++ {
fmt.Println(i)
<-time.Tick(time.Second * 1)
}
rand.Seed(time.Now().UTC().UnixNano())
r:=rand.Intn(100)%2
fmt.Println(r)
if r != 0 {
return "hello, world", nil
} else {
return "", fmt.Errorf("error")
}
}
func main() {
doneChan := make(chan int)
var p = NewPromise(exampleTicker)
p.Then(func(result string) {
fmt.Println(result); doneChan <- 1 },
func(err error) {
fmt.Println(err); doneChan <-1
})
<-doneChan
}

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 fanliao
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,302 @@
[home]: github.com/fanliao/go-promise
go-promise is a Go promise and future library.
Inspired by [Futures and promises]()
## Installation
$ go get github.com/fanliao/go-promise
## Features
* Future and Promise
* ```NewPromise()```
* ```promise.Future```
* Promise and Future callbacks
* ```.OnSuccess(v interface{})```
* ```.OnFailure(v interface{})```
* ```.OnComplete(v interface{})```
* ```.OnCancel()```
* Get the result of future
* ```.Get() ```
* ```.GetOrTimeout()```
* ```.GetChan()```
* Set timeout for future
* ```.SetTimeout(ms)```
* Merge multiple promises
* ```WhenAll(func1, func2, func3, ...)```
* ```WhenAny(func1, func2, func3, ...)```
* ```WhenAnyMatched(func1, func2, func3, ...)```
* Pipe
* ```.Pipe(funcWithDone, funcWithFail)```
* Cancel the future
* ```.Cancel()```
* ```.IsCancelled()```
* Create future by function
* ```Start(func() (r interface{}, e error))```
* ```Start(func())```
* ```Start(func(canceller Canceller) (r interface{}, e error))```
* ```Start(func(canceller Canceller))```
* Immediate wrappers
* ```Wrap(interface{})```
* Chain API
* ```Start(taskDone).Done(done1).Fail(fail1).Always(alwaysForDone1).Pipe(f1, f2).Done(done2)```
## Quick start
### Promise and Future
```go
import "github.com/fanliao/go-promise"
import "net/http"
p := promise.NewPromise()
p.OnSuccess(func(v interface{}) {
...
}).OnFailure(func(v interface{}) {
...
}).OnComplete(func(v interface{}) {
...
})
go func(){
url := "http://example.com/"
resp, err := http.Get(url)
defer resp.Body.Close()
if err != nil {
p.Reject(err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
p.Reject(err)
}
p.Resolve(body)
}()
r, err := p.Get()
```
If you want to provide a read-only view, you can get a future variable:
```go
p.Future //cannot Resolve, Reject for a future
```
Can use Start function to submit a future task, it will return a future variable, so cannot Resolve or Reject the future outside of Start function:
```go
import "github.com/fanliao/go-promise"
import "net/http"
task := func()(r interface{}, err error){
url := "http://example.com/"
resp, err := http.Get(url)
defer resp.Body.Close()
if err != nil {
return nil, err
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
f := promise.Start(task).OnSuccess(func(v interface{}) {
...
}).OnFailure(func(v interface{}) {
...
}).OnComplete(func(v interface{}) {
...
})
r, err := f.Get()
```
### Get the result of future
Please note the process will be block until the future task is completed
```go
f := promise.Start(func() (r interface{}, err error) {
return "ok", nil
})
r, err := f.Get() //return "ok", nil
f := promise.Start(func() (r interface{}, err error) {
return nil, errors.New("fail")
})
r, err := f.Get() //return nil, errorString{"fail"}
```
Can wait until timeout
```go
f := promise.Start(func() (r interface{}, err error) {
time.Sleep(500 * time.Millisecond)
return "ok", nil
})
r, err, timeout := f.GetOrTimeout(100) //return nil, nil, true
```
### Merge multiple futures
Creates a future that will be completed when all of the supplied future are completed.
```go
task1 := func() (r interface{}, err error) {
return "ok1", nil
}
task2 := func() (r interface{}, err error) {
return "ok2", nil
}
f := promise.WhenAll(task1, task2)
r, err := f.Get() //return []interface{}{"ok1", "ok2"}
```
If any future is failure, the future returnd by WhenAll will be failure
```go
task1 := func() (r interface{}, err error) {
return "ok", nil
}
task2 := func() (r interface{}, err error) {
return nil, errors.New("fail2")
}
f := promise.WhenAll(task1, task2)
r, ok := f.Get() //return nil, *AggregateError
```
Creates a future that will be completed when any of the supplied tasks is completed.
```go
task1 := func() (r interface{}, err error) {
return "ok1", nil
}
task2 := func() (r interface{}, err error) {
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fail2")
}
f := promise.WhenAny(task1, task2)
r, err := f.Get() //return "ok1", nil
```
Also can add a predicate function by WhenAnyMatched, the future that will be completed when any of the supplied tasks is completed and match the predicate.
```go
task1 := func() (r interface{}, err error) {
time.Sleep(200 * time.Millisecond)
return "ok1", nil
}
task2 := func() (r interface{}, err error) {
return "ok2", nil
}
f := promise.WhenAnyMatched(func(v interface{}) bool{
return v == "ok1"
}, task1, task2)
r, err := f.Get() //return "ok1", nil
```
### Promise pipelining
```go
task1 := func() (r interface{}, err error) {
return 10, nil
}
task2 := func(v interface{}) (r interface{}, err error) {
return v.(int) * 2, nil
}
f := promise.Start(task1).Pipe(task2)
r, err := f.Get() //return 20
```
### Cancel the future or set timeout
If need cancel a future, can pass a canceller object to task function
```go
import "github.com/fanliao/go-promise"
import "net/http"
p := promise.NewPromise().EnableCanceller()
go func(canceller promise.Canceller){
for i < 50 {
if canceller.IsCancelled() {
return
}
time.Sleep(100 * time.Millisecond)
}
}(p.Canceller())
f.Cancel()
r, err := p.Get() //return nil, promise.CANCELLED
fmt.Println(p.Future.IsCancelled()) //true
```
Or can use Start to submit a future task which can be cancelled
```go
task := func(canceller promise.Canceller) (r interface{}, err error) {
for i < 50 {
if canceller.IsCancelled() {
return 0, nil
}
time.Sleep(100 * time.Millisecond)
}
return 1, nil
}
f := promise.Start(task1)
f.Cancel()
r, err := f.Get() //return nil, promise.CANCELLED
fmt.Println(f.IsCancelled()) //true
```
When call WhenAny() function, if a future is completed correctly, then will try to check if other futures enable cancel. If yes, will request cancelling all other futures.
You can also set timeout for a future
```go
task := func(canceller promise.Canceller) (r interface{}, err error) {
time.Sleep(300 * time.Millisecond)
if !canceller.IsCancelled(){
fmt.Println("Run done")
}
return
}
f := promise.Start(task).OnCancel(func() {
fmt.Println("Future is cancelled")
}).SetTimeout(100)
r, err := f.Get() //return nil, promise.CANCELLED
fmt.Println(f.IsCancelled()) //print true
```
## Document
* [GoDoc at godoc.org](http://godoc.org/github.com/fanliao/go-promise)
## License
go-promise is licensed under the MIT Licence, (http://www.apache.org/licenses/LICENSE-2.0.html).

View File

@ -0,0 +1,399 @@
/*
Package promise provides a complete promise and future implementation.
A quick start sample:
fu := Start(func()(resp interface{}, err error){
resp, err = http.Get("http://example.com/")
return
})
//do somthing...
resp, err := fu.Get()
*/
package promise
import (
"errors"
"fmt"
"sync/atomic"
"time"
"unsafe"
)
type callbackType int
const (
CALLBACK_DONE callbackType = iota
CALLBACK_FAIL
CALLBACK_ALWAYS
CALLBACK_CANCEL
)
//pipe presents a promise that will be chain call
type pipe struct {
pipeDoneTask, pipeFailTask func(v interface{}) *Future
pipePromise *Promise
}
//getPipe returns piped Future task function and pipe Promise by the status of current Promise.
func (this *pipe) getPipe(isResolved bool) (func(v interface{}) *Future, *Promise) {
if isResolved {
return this.pipeDoneTask, this.pipePromise
} else {
return this.pipeFailTask, this.pipePromise
}
}
//Canceller is used to check if the future is cancelled
//It usually be passed to the future task function
//for future task function can check if the future is cancelled.
type Canceller interface {
IsCancelled() bool
Cancel()
}
//canceller provides an implement of Canceller interface.
//It will be passed to future task function as paramter
type canceller struct {
f *Future
}
//Cancel sets Future task to CANCELLED status
func (this *canceller) Cancel() {
this.f.Cancel()
}
//IsCancelled returns true if Future task is cancelld, otherwise false.
func (this *canceller) IsCancelled() (r bool) {
return this.f.IsCancelled()
}
//futureVal stores the internal state of Future.
type futureVal struct {
dones, fails, always []func(v interface{})
cancels []func()
pipes []*pipe
r *PromiseResult
}
//Future provides a read-only view of promise,
//the value is set by using Resolve, Reject and Cancel methods of related Promise
type Future struct {
Id int //Id can be used as identity of Future
final chan struct{}
//val point to futureVal that stores status of future
//if need to change the status of future, must copy a new futureVal and modify it,
//then use CAS to put the pointer of new futureVal
val unsafe.Pointer
}
//Canceller returns a canceller object related to future.
func (this *Future) Canceller() Canceller {
return &canceller{this}
}
//IsCancelled returns true if the promise is cancelled, otherwise false
func (this *Future) IsCancelled() bool {
val := this.loadVal()
if val != nil && val.r != nil && val.r.Typ == RESULT_CANCELLED {
return true
} else {
return false
}
}
//SetTimeout sets the future task will be cancelled
//if future is not complete before time out
func (this *Future) SetTimeout(mm int) *Future {
if mm == 0 {
mm = 10
} else {
mm = mm * 1000 * 1000
}
go func() {
<-time.After((time.Duration)(mm) * time.Nanosecond)
this.Cancel()
}()
return this
}
//GetChan returns a channel than can be used to receive result of Promise
func (this *Future) GetChan() <-chan *PromiseResult {
c := make(chan *PromiseResult, 1)
this.OnComplete(func(v interface{}) {
c <- this.loadResult()
}).OnCancel(func() {
c <- this.loadResult()
})
return c
}
//Get will block current goroutines until the Future is resolved/rejected/cancelled.
//If Future is resolved, value and nil will be returned
//If Future is rejected, nil and error will be returned.
//If Future is cancelled, nil and CANCELLED error will be returned.
func (this *Future) Get() (val interface{}, err error) {
<-this.final
return getFutureReturnVal(this.loadResult())
}
//GetOrTimeout is similar to Get(), but GetOrTimeout will not block after timeout.
//If GetOrTimeout returns with a timeout, timeout value will be true in return values.
//The unit of paramter is millisecond.
func (this *Future) GetOrTimeout(mm uint) (val interface{}, err error, timout bool) {
if mm == 0 {
mm = 10
} else {
mm = mm * 1000 * 1000
}
select {
case <-time.After((time.Duration)(mm) * time.Nanosecond):
return nil, nil, true
case <-this.final:
r, err := getFutureReturnVal(this.loadResult())
return r, err, false
}
}
//Cancel sets the status of promise to RESULT_CANCELLED.
//If promise is cancelled, Get() will return nil and CANCELLED error.
//All callback functions will be not called if Promise is cancalled.
func (this *Future) Cancel() (e error) {
return this.setResult(&PromiseResult{CANCELLED, RESULT_CANCELLED})
}
//OnSuccess registers a callback function that will be called when Promise is resolved.
//If promise is already resolved, the callback will immediately called.
//The value of Promise will be paramter of Done callback function.
func (this *Future) OnSuccess(callback func(v interface{})) *Future {
this.addCallback(callback, CALLBACK_DONE)
return this
}
//OnFailure registers a callback function that will be called when Promise is rejected.
//If promise is already rejected, the callback will immediately called.
//The error of Promise will be paramter of Fail callback function.
func (this *Future) OnFailure(callback func(v interface{})) *Future {
this.addCallback(callback, CALLBACK_FAIL)
return this
}
//OnComplete register a callback function that will be called when Promise is rejected or resolved.
//If promise is already rejected or resolved, the callback will immediately called.
//According to the status of Promise, value or error will be paramter of Always callback function.
//Value is the paramter if Promise is resolved, or error is the paramter if Promise is rejected.
//Always callback will be not called if Promise be called.
func (this *Future) OnComplete(callback func(v interface{})) *Future {
this.addCallback(callback, CALLBACK_ALWAYS)
return this
}
//OnCancel registers a callback function that will be called when Promise is cancelled.
//If promise is already cancelled, the callback will immediately called.
func (this *Future) OnCancel(callback func()) *Future {
this.addCallback(callback, CALLBACK_CANCEL)
return this
}
//Pipe registers one or two functions that returns a Future, and returns a proxy of pipeline Future.
//First function will be called when Future is resolved, the returned Future will be as pipeline Future.
//Secondary function will be called when Futrue is rejected, the returned Future will be as pipeline Future.
func (this *Future) Pipe(callbacks ...interface{}) (result *Future, ok bool) {
if len(callbacks) == 0 ||
(len(callbacks) == 1 && callbacks[0] == nil) ||
(len(callbacks) > 1 && callbacks[0] == nil && callbacks[1] == nil) {
result = this
return
}
//ensure all callback functions match the spec "func(v interface{}) *Future"
cs := make([]func(v interface{}) *Future, len(callbacks), len(callbacks))
for i, callback := range callbacks {
if c, ok1 := callback.(func(v interface{}) *Future); ok1 {
cs[i] = c
} else if c, ok1 := callback.(func() *Future); ok1 {
cs[i] = func(v interface{}) *Future {
return c()
}
} else if c, ok1 := callback.(func(v interface{})); ok1 {
cs[i] = func(v interface{}) *Future {
return Start(func() {
c(v)
})
}
} else if c, ok1 := callback.(func(v interface{}) (r interface{}, err error)); ok1 {
cs[i] = func(v interface{}) *Future {
return Start(func() (r interface{}, err error) {
r, err = c(v)
return
})
}
} else if c, ok1 := callback.(func()); ok1 {
cs[i] = func(v interface{}) *Future {
return Start(func() {
c()
})
}
} else if c, ok1 := callback.(func() (r interface{}, err error)); ok1 {
cs[i] = func(v interface{}) *Future {
return Start(func() (r interface{}, err error) {
r, err = c()
return
})
}
} else {
ok = false
return
}
}
for {
v := this.loadVal()
r := v.r
if r != nil {
result = this
if r.Typ == RESULT_SUCCESS && cs[0] != nil {
result = (cs[0](r.Result))
} else if r.Typ == RESULT_FAILURE && len(cs) > 1 && cs[1] != nil {
result = (cs[1](r.Result))
}
} else {
newPipe := &pipe{}
newPipe.pipeDoneTask = cs[0]
if len(cs) > 1 {
newPipe.pipeFailTask = cs[1]
}
newPipe.pipePromise = NewPromise()
newVal := *v
newVal.pipes = append(newVal.pipes, newPipe)
//use CAS to ensure that the state of Future is not changed,
//if the state is changed, will retry CAS operation.
if atomic.CompareAndSwapPointer(&this.val, unsafe.Pointer(v), unsafe.Pointer(&newVal)) {
result = newPipe.pipePromise.Future
break
}
}
}
ok = true
return
}
//result uses Atomic load to return result of the Future
func (this *Future) loadResult() *PromiseResult {
val := this.loadVal()
return val.r
}
//val uses Atomic load to return state value of the Future
func (this *Future) loadVal() *futureVal {
r := atomic.LoadPointer(&this.val)
return (*futureVal)(r)
}
//setResult sets the value and final status of Promise, it will only be executed for once
func (this *Future) setResult(r *PromiseResult) (e error) { //r *PromiseResult) {
defer func() {
if err := getError(recover()); err != nil {
e = err
fmt.Println("\nerror in setResult():", err)
}
}()
e = errors.New("Cannot resolve/reject/cancel more than once")
for {
v := this.loadVal()
if v.r != nil {
return
}
newVal := *v
newVal.r = r
//Use CAS operation to ensure that the state of Promise isn't changed.
//If the state is changed, must get latest state and try to call CAS again.
//No ABA issue in this case because address of all objects are different.
if atomic.CompareAndSwapPointer(&this.val, unsafe.Pointer(v), unsafe.Pointer(&newVal)) {
//Close chEnd then all Get() and GetOrTimeout() will be unblocked
close(this.final)
//call callback functions and start the Promise pipeline
if len(v.dones) > 0 || len(v.fails) > 0 || len(v.always) > 0 || len(v.cancels) > 0 {
go func() {
execCallback(r, v.dones, v.fails, v.always, v.cancels)
}()
}
//start the pipeline
if len(v.pipes) > 0 {
go func() {
for _, pipe := range v.pipes {
pipeTask, pipePromise := pipe.getPipe(r.Typ == RESULT_SUCCESS)
startPipe(r, pipeTask, pipePromise)
}
}()
}
e = nil
break
}
}
return
}
//handleOneCallback registers a callback function
func (this *Future) addCallback(callback interface{}, t callbackType) {
if callback == nil {
return
}
if (t == CALLBACK_DONE) ||
(t == CALLBACK_FAIL) ||
(t == CALLBACK_ALWAYS) {
if _, ok := callback.(func(v interface{})); !ok {
panic(errors.New("Callback function spec must be func(v interface{})"))
}
} else if t == CALLBACK_CANCEL {
if _, ok := callback.(func()); !ok {
panic(errors.New("Callback function spec must be func()"))
}
}
for {
v := this.loadVal()
r := v.r
if r == nil {
newVal := *v
switch t {
case CALLBACK_DONE:
newVal.dones = append(newVal.dones, callback.(func(v interface{})))
case CALLBACK_FAIL:
newVal.fails = append(newVal.fails, callback.(func(v interface{})))
case CALLBACK_ALWAYS:
newVal.always = append(newVal.always, callback.(func(v interface{})))
case CALLBACK_CANCEL:
newVal.cancels = append(newVal.cancels, callback.(func()))
}
//use CAS to ensure that the state of Future is not changed,
//if the state is changed, will retry CAS operation.
if atomic.CompareAndSwapPointer(&this.val, unsafe.Pointer(v), unsafe.Pointer(&newVal)) {
break
}
} else {
if (t == CALLBACK_DONE && r.Typ == RESULT_SUCCESS) ||
(t == CALLBACK_FAIL && r.Typ == RESULT_FAILURE) ||
(t == CALLBACK_ALWAYS && r.Typ != RESULT_CANCELLED) {
callbackFunc := callback.(func(v interface{}))
callbackFunc(r.Result)
} else if t == CALLBACK_CANCEL && r.Typ == RESULT_CANCELLED {
callbackFunc := callback.(func())
callbackFunc()
}
break
}
}
}

View File

@ -0,0 +1,278 @@
package promise
import (
"sync/atomic"
)
type anyPromiseResult struct {
result interface{}
i int
}
//Start start a goroutines to execute task function
//and return a Future that presents the result.
//If option paramter is true, the act function will be sync called.
//Type of act can be any of below four types:
// func() (r interface{}, err error):
// if err returned by act != nil or panic error, then Future will be rejected with error,
// otherwise be resolved with r.
// func():
// if act panic error, then Future will be rejected, otherwise be resolved with nil.
// func(c promise.Canceller) (r interface{}, err error):
// if err returned by act != nil or panic error,
// then Future will be rejected with err, otherwise be resolved with r.
// We can check c.IsCancelled() to decide whether need to exit act function
// func(promise.Canceller):
// if act panic error, then Future will be rejected with error, otherwise be resolved with nil.
// We can check c.IsCancelled() to decide whether need to exit act function.
// error:
// Future will be rejected with error immediately
// other value:
// Future will be resolved with value immediately
func Start(act interface{}, syncs ...bool) *Future {
pr := NewPromise()
if f, ok := act.(*Future); ok {
return f
}
if action := getAct(pr, act); action != nil {
if syncs != nil && len(syncs) > 0 && !syncs[0] {
//sync call
r, err := action()
if pr.IsCancelled() {
pr.Cancel()
} else {
if err == nil {
pr.Resolve(r)
} else {
pr.Reject(err)
}
}
} else {
//async call
go func() {
r, err := action()
if pr.IsCancelled() {
pr.Cancel()
} else {
if err == nil {
pr.Resolve(r)
} else {
pr.Reject(err)
}
}
}()
}
}
return pr.Future
}
//Wrap return a Future that presents the wrapped value
func Wrap(value interface{}) *Future {
pr := NewPromise()
if e, ok := value.(error); !ok {
pr.Resolve(value)
} else {
pr.Reject(e)
}
return pr.Future
}
//WhenAny returns a Future.
//If any Future is resolved, this Future will be resolved and return result of resolved Future.
//Otherwise will rejected with results slice returned by all Futures
//Legit types of act are same with Start function
func WhenAny(acts ...interface{}) *Future {
return WhenAnyMatched(nil, acts...)
}
//WhenAnyMatched returns a Future.
//If any Future is resolved and match the predicate, this Future will be resolved and return result of resolved Future.
//If all Futures are cancelled, this Future will be cancelled.
//Otherwise will rejected with a NoMatchedError included results slice returned by all Futures
//Legit types of act are same with Start function
func WhenAnyMatched(predicate func(interface{}) bool, acts ...interface{}) *Future {
if predicate == nil {
predicate = func(v interface{}) bool { return true }
}
fs := make([]*Future, len(acts))
for i, act := range acts {
fs[i] = Start(act)
}
nf, rs := NewPromise(), make([]interface{}, len(fs))
if len(acts) == 0 {
nf.Resolve(nil)
}
chFails, chDones := make(chan anyPromiseResult), make(chan anyPromiseResult)
go func() {
for i, f := range fs {
k := i
f.OnSuccess(func(v interface{}) {
defer func() { _ = recover() }()
chDones <- anyPromiseResult{v, k}
}).OnFailure(func(v interface{}) {
defer func() { _ = recover() }()
chFails <- anyPromiseResult{v, k}
}).OnCancel(func() {
defer func() { _ = recover() }()
chFails <- anyPromiseResult{CANCELLED, k}
})
}
}()
if len(fs) == 1 {
select {
case r := <-chFails:
if _, ok := r.result.(CancelledError); ok {
nf.Cancel()
} else {
nf.Reject(newNoMatchedError1(r.result))
}
case r := <-chDones:
if predicate(r.result) {
nf.Resolve(r.result)
} else {
nf.Reject(newNoMatchedError1(r.result))
}
}
} else {
go func() {
defer func() {
if e := recover(); e != nil {
nf.Reject(newErrorWithStacks(e))
}
}()
j := 0
for {
select {
case r := <-chFails:
rs[r.i] = getError(r.result)
case r := <-chDones:
if predicate(r.result) {
//try to cancel other futures
for _, f := range fs {
f.Cancel()
}
//close the channel for avoid the send side be blocked
closeChan := func(c chan anyPromiseResult) {
defer func() { _ = recover() }()
close(c)
}
closeChan(chDones)
closeChan(chFails)
//Resolve the future and return result
nf.Resolve(r.result)
return
} else {
rs[r.i] = r.result
}
}
if j++; j == len(fs) {
m := 0
for _, r := range rs {
switch val := r.(type) {
case CancelledError:
default:
m++
_ = val
}
}
if m > 0 {
nf.Reject(newNoMatchedError(rs))
} else {
nf.Cancel()
}
break
}
}
}()
}
return nf.Future
}
//WhenAll receives function slice and returns a Future.
//If all Futures are resolved, this Future will be resolved and return results slice.
//Otherwise will rejected with results slice returned by all Futures
//Legit types of act are same with Start function
func WhenAll(acts ...interface{}) (fu *Future) {
pr := NewPromise()
fu = pr.Future
if len(acts) == 0 {
pr.Resolve([]interface{}{})
return
}
fs := make([]*Future, len(acts))
for i, act := range acts {
fs[i] = Start(act)
}
fu = whenAllFuture(fs...)
return
}
//WhenAll receives Futures slice and returns a Future.
//If all Futures are resolved, this Future will be resolved and return results slice.
//If any Future is cancelled, this Future will be cancelled.
//Otherwise will rejected with results slice returned by all Futures.
//Legit types of act are same with Start function
func whenAllFuture(fs ...*Future) *Future {
wf := NewPromise()
rs := make([]interface{}, len(fs))
if len(fs) == 0 {
wf.Resolve([]interface{}{})
} else {
n := int32(len(fs))
cancelOthers := func(j int) {
for k, f1 := range fs {
if k != j {
f1.Cancel()
}
}
}
go func() {
isCancelled := int32(0)
for i, f := range fs {
j := i
f.OnSuccess(func(v interface{}) {
rs[j] = v
if atomic.AddInt32(&n, -1) == 0 {
wf.Resolve(rs)
}
}).OnFailure(func(v interface{}) {
if atomic.CompareAndSwapInt32(&isCancelled, 0, 1) {
//try to cancel all futures
cancelOthers(j)
//errs := make([]error, 0, 1)
//errs = append(errs, v.(error))
e := newAggregateError1("Error appears in WhenAll:", v)
wf.Reject(e)
}
}).OnCancel(func() {
if atomic.CompareAndSwapInt32(&isCancelled, 0, 1) {
//try to cancel all futures
cancelOthers(j)
wf.Cancel()
}
})
}
}()
}
return wf.Future
}

View File

@ -0,0 +1,903 @@
package promise
import (
"errors"
"fmt"
c "github.com/smartystreets/goconvey/convey"
"reflect"
"strconv"
"testing"
"time"
)
const (
TASK_END = "task be end,"
CALL_DONE = "callback done,"
CALL_FAIL = "callback fail,"
CALL_ALWAYS = "callback always,"
WAIT_TASK = "wait task end,"
GET = "get task result,"
DONE_Pipe_END = "task Pipe done be end,"
FAIL_Pipe_END = "task Pipe fail be end,"
)
// errorLinq is a trivial implementation of error.
type myError struct {
val interface{}
}
func (e *myError) Error() string {
return fmt.Sprintf("%v", e.val)
}
func newMyError(v interface{}) *myError {
return &myError{v}
}
func TestResolveAndReject(t *testing.T) {
c.Convey("When Promise is resolved", t, func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Resolve("ok")
}()
c.Convey("Should return the argument of Resolve", func() {
r, err := p.Get()
c.So(r, c.ShouldEqual, "ok")
c.So(err, c.ShouldBeNil)
})
})
c.Convey("When Promise is rejected", t, func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Reject(errors.New("fail"))
}()
c.Convey("Should return error", func() {
r, err := p.Get()
c.So(err, c.ShouldNotBeNil)
c.So(r, c.ShouldEqual, nil)
})
})
}
func TestCancel(t *testing.T) {
c.Convey("When Promise is cancelled", t, func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Cancel()
}()
c.Convey("Should return CancelledError", func() {
r, err := p.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldEqual, CANCELLED)
c.So(p.IsCancelled(), c.ShouldBeTrue)
})
})
}
func TestGetOrTimeout(t *testing.T) {
timout := 50 * time.Millisecond
c.Convey("When Promise is unfinished", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Resolve("ok")
}()
c.Convey("timeout should be true", func() {
r, err, timeout := p.GetOrTimeout(10)
c.So(timeout, c.ShouldBeTrue)
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldBeNil)
})
c.Convey("When Promise is resolved, the argument of Resolve should be returned", func() {
r, err, timeout := p.GetOrTimeout(50)
c.So(timeout, c.ShouldBeFalse)
c.So(r, c.ShouldEqual, "ok")
c.So(err, c.ShouldBeNil)
})
})
c.Convey("When Promise is rejected", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Reject(errors.New("fail"))
}()
c.Convey("Should return error", func() {
r, err, timeout := p.GetOrTimeout(83)
c.So(timeout, c.ShouldBeFalse)
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
})
c.Convey("When Promise is cancelled", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Cancel()
}()
c.Convey("Should return CancelledError", func() {
r, err, timeout := p.GetOrTimeout(83)
c.So(timeout, c.ShouldBeFalse)
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldEqual, CANCELLED)
c.So(p.IsCancelled(), c.ShouldBeTrue)
})
})
}
func TestGetChan(t *testing.T) {
timout := 50 * time.Millisecond
c.Convey("When Promise is resolved", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Resolve("ok")
}()
c.Convey("Should receive the argument of Resolve from returned channel", func() {
fr, ok := <-p.GetChan()
c.So(fr.Result, c.ShouldEqual, "ok")
c.So(fr.Typ, c.ShouldEqual, RESULT_SUCCESS)
c.So(ok, c.ShouldBeTrue)
})
})
c.Convey("When Promise is rejected", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Reject(errors.New("fail"))
}()
c.Convey("Should receive error from returned channel", func() {
fr, ok := <-p.GetChan()
c.So(fr.Result, c.ShouldNotBeNil)
c.So(fr.Typ, c.ShouldEqual, RESULT_FAILURE)
c.So(ok, c.ShouldBeTrue)
})
})
c.Convey("When Promise is cancelled", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Cancel()
}()
c.Convey("Should receive CancelledError from returned channel", func() {
fr, ok := <-p.GetChan()
c.So(fr.Result, c.ShouldEqual, CANCELLED)
c.So(p.IsCancelled(), c.ShouldBeTrue)
c.So(fr.Typ, c.ShouldEqual, RESULT_CANCELLED)
c.So(ok, c.ShouldBeTrue)
})
c.Convey("Should receive CancelledError from returned channel at second time", func() {
fr, ok := <-p.GetChan()
c.So(fr.Result, c.ShouldEqual, CANCELLED)
c.So(p.IsCancelled(), c.ShouldBeTrue)
c.So(fr.Typ, c.ShouldEqual, RESULT_CANCELLED)
c.So(ok, c.ShouldBeTrue)
})
})
}
func TestFuture(t *testing.T) {
c.Convey("Future can receive return value and status but cannot change the status", t, func() {
var fu *Future
c.Convey("When Future is resolved", func() {
func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Resolve("ok")
}()
fu = p.Future
}()
r, err := fu.Get()
c.So(r, c.ShouldEqual, "ok")
c.So(err, c.ShouldBeNil)
})
c.Convey("When Future is rejected", func() {
func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Reject(errors.New("fail"))
}()
fu = p.Future
}()
r, err := fu.Get()
c.So(err, c.ShouldNotBeNil)
c.So(r, c.ShouldEqual, nil)
})
c.Convey("When Future is cancelled", func() {
func() {
p := NewPromise()
go func() {
time.Sleep(50 * time.Millisecond)
p.Cancel()
}()
fu = p.Future
}()
r, err := fu.Get()
c.So(err, c.ShouldNotBeNil)
c.So(r, c.ShouldEqual, nil)
})
})
}
func TestCallbacks(t *testing.T) {
timout := 50 * time.Millisecond
done, always, fail, cancel := false, false, false, false
p := NewPromise()
go func() {
<-time.After(timout)
p.Resolve("ok")
}()
c.Convey("When Promise is resolved", t, func() {
p.OnSuccess(func(v interface{}) {
done = true
c.Convey("The argument of Done should be 'ok'", t, func() {
c.So(v, c.ShouldEqual, "ok")
})
}).OnComplete(func(v interface{}) {
always = true
c.Convey("The argument of Always should be 'ok'", t, func() {
c.So(v, c.ShouldEqual, "ok")
})
}).OnFailure(func(v interface{}) {
fail = true
panic("Unexpected calling")
})
r, err := p.Get()
//The code after Get() and the callback will be concurrent run
//So sleep 52 ms to wait all callback be done
time.Sleep(52 * time.Millisecond)
c.Convey("Should call the Done and Always callbacks", func() {
c.So(r, c.ShouldEqual, "ok")
c.So(err, c.ShouldBeNil)
c.So(done, c.ShouldEqual, true)
c.So(always, c.ShouldEqual, true)
c.So(fail, c.ShouldEqual, false)
})
})
c.Convey("When adding the callback after Promise is resolved", t, func() {
done, always, fail := false, false, false
p.OnSuccess(func(v interface{}) {
done = true
c.Convey("The argument of Done should be 'ok'", func() {
c.So(v, c.ShouldEqual, "ok")
})
}).OnComplete(func(v interface{}) {
always = true
c.Convey("The argument of Always should be 'ok'", func() {
c.So(v, c.ShouldEqual, "ok")
})
}).OnFailure(func(v interface{}) {
fail = true
panic("Unexpected calling")
})
c.Convey("Should immediately run the Done and Always callbacks", func() {
c.So(done, c.ShouldEqual, true)
c.So(always, c.ShouldEqual, true)
c.So(fail, c.ShouldEqual, false)
})
})
var e *error = nil
done, always, fail = false, false, false
p = NewPromise()
go func() {
<-time.After(timout)
p.Reject(errors.New("fail"))
}()
c.Convey("When Promise is rejected", t, func() {
p.OnSuccess(func(v interface{}) {
done = true
panic("Unexpected calling")
}).OnComplete(func(v interface{}) {
always = true
c.Convey("The argument of Always should be error", t, func() {
c.So(v, c.ShouldImplement, e)
})
}).OnFailure(func(v interface{}) {
fail = true
c.Convey("The argument of Fail should be error", t, func() {
c.So(v, c.ShouldImplement, e)
})
})
r, err := p.Get()
time.Sleep(52 * time.Millisecond)
c.Convey("Should call the Fail and Always callbacks", func() {
c.So(r, c.ShouldEqual, nil)
c.So(err, c.ShouldNotBeNil)
c.So(done, c.ShouldEqual, false)
c.So(always, c.ShouldEqual, true)
c.So(fail, c.ShouldEqual, true)
})
})
c.Convey("When adding the callback after Promise is rejected", t, func() {
done, always, fail = false, false, false
p.OnSuccess(func(v interface{}) {
done = true
panic("Unexpected calling")
}).OnComplete(func(v interface{}) {
always = true
c.Convey("The argument of Always should be error", func() {
c.So(v, c.ShouldImplement, e)
})
}).OnFailure(func(v interface{}) {
fail = true
c.Convey("The argument of Fail should be error", func() {
c.So(v, c.ShouldImplement, e)
})
})
c.Convey("Should immediately run the Fail and Always callbacks", func() {
c.So(done, c.ShouldEqual, false)
c.So(always, c.ShouldEqual, true)
c.So(fail, c.ShouldEqual, true)
})
})
done, always, fail = false, false, false
p = NewPromise()
go func() {
<-time.After(timout)
p.Cancel()
}()
c.Convey("When Promise is cancelled", t, func() {
done, always, fail, cancel = false, false, false, false
p.OnSuccess(func(v interface{}) {
done = true
}).OnComplete(func(v interface{}) {
always = true
}).OnFailure(func(v interface{}) {
fail = true
}).OnCancel(func() {
cancel = true
})
r, err := p.Get()
time.Sleep(62 * time.Millisecond)
c.Convey("Only cancel callback be called", func() {
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
c.So(done, c.ShouldEqual, false)
c.So(always, c.ShouldEqual, false)
c.So(fail, c.ShouldEqual, false)
c.So(cancel, c.ShouldEqual, true)
})
})
c.Convey("When adding the callback after Promise is cancelled", t, func() {
done, always, fail, cancel = false, false, false, false
p.OnSuccess(func(v interface{}) {
done = true
}).OnComplete(func(v interface{}) {
always = true
}).OnFailure(func(v interface{}) {
fail = true
}).OnCancel(func() {
cancel = true
})
c.Convey("Should not call any callbacks", func() {
c.So(done, c.ShouldEqual, false)
c.So(always, c.ShouldEqual, false)
c.So(fail, c.ShouldEqual, false)
c.So(cancel, c.ShouldEqual, true)
})
})
}
func TestStart(t *testing.T) {
c.Convey("Test start func()", t, func() {
c.Convey("When task completed", func() {
f := Start(func() {})
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldBeNil)
})
c.Convey("When task panic error", func() {
f := Start(func() { panic("fail") })
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
})
c.Convey("Test start func()(interface{}, error)", t, func() {
c.Convey("When task completed", func() {
f := Start(func() (interface{}, error) {
time.Sleep(10)
return "ok", nil
})
r, err := f.Get()
c.So(r, c.ShouldEqual, "ok")
c.So(err, c.ShouldBeNil)
})
c.Convey("When task returned error", func() {
f := Start(func() (interface{}, error) {
time.Sleep(10)
return "fail", errors.New("fail")
})
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
c.Convey("When task panic error", func() {
f := Start(func() (interface{}, error) { panic("fail") })
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
})
c.Convey("Test start func(canceller Canceller)", t, func() {
c.Convey("When task completed", func() {
f := Start(func(canceller Canceller) {
time.Sleep(10)
})
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldBeNil)
})
c.Convey("When task be cancelled", func() {
f := Start(func(canceller Canceller) {
time.Sleep(10)
if canceller.IsCancelled() {
return
}
})
f.Cancel()
r, err := f.Get()
c.So(f.IsCancelled(), c.ShouldBeTrue)
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldEqual, CANCELLED)
c.So(f.IsCancelled(), c.ShouldBeTrue)
})
c.Convey("When task panic error", func() {
f := Start(func(canceller Canceller) { panic("fail") })
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
})
c.Convey("Test start func(canceller Canceller)(interface{}, error)", t, func() {
c.Convey("When task be cancenlled", func() {
task := func(canceller Canceller) (interface{}, error) {
i := 0
for i < 50 {
if canceller.IsCancelled() {
return nil, nil
}
time.Sleep(100 * time.Millisecond)
}
panic("exception")
}
f := Start(task)
f.Cancel()
r, err := f.Get()
c.So(f.IsCancelled(), c.ShouldBeTrue)
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldEqual, CANCELLED)
c.So(f.IsCancelled(), c.ShouldBeTrue)
})
c.Convey("When task panic error", func() {
f := Start(func(canceller Canceller) (interface{}, error) {
panic("fail")
})
r, err := f.Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldNotBeNil)
})
})
}
func TestPipe(t *testing.T) {
timout := 50 * time.Millisecond
taskDonePipe := func(v interface{}) *Future {
return Start(func() (interface{}, error) {
<-time.After(timout)
return v.(string) + "2", nil
})
}
taskFailPipe := func() (interface{}, error) {
<-time.After(timout)
return "fail2", nil
}
c.Convey("When task completed", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Resolve("ok")
}()
fu, ok := p.Pipe(taskDonePipe, taskFailPipe)
r, err := fu.Get()
c.Convey("the done callback will be called, the future returned by done callback will be returned as chain future", func() {
c.So(r, c.ShouldEqual, "ok2")
c.So(err, c.ShouldBeNil)
c.So(ok, c.ShouldEqual, true)
})
})
c.Convey("When task failed", t, func() {
p := NewPromise()
go func() {
<-time.After(timout)
p.Reject(errors.New("fail"))
}()
fu, ok := p.Pipe(taskDonePipe, taskFailPipe)
r, err := fu.Get()
c.Convey("the fail callback will be called, the future returned by fail callback will be returned as chain future", func() {
c.So(r, c.ShouldEqual, "fail2")
c.So(err, c.ShouldBeNil)
c.So(ok, c.ShouldEqual, true)
})
})
c.Convey("Test pipe twice", t, func() {
p := NewPromise()
pipeFuture1, ok1 := p.Pipe(taskDonePipe, taskFailPipe)
c.Convey("Calling Pipe succeed at first time", func() {
c.So(ok1, c.ShouldEqual, true)
})
pipeFuture2, ok2 := p.Pipe(taskDonePipe, taskFailPipe)
c.Convey("Calling Pipe succeed at second time", func() {
c.So(ok2, c.ShouldEqual, true)
})
p.Resolve("ok")
r, _ := pipeFuture1.Get()
c.Convey("Pipeline future 1 should return ok2", func() {
c.So(r, c.ShouldEqual, "ok2")
})
r2, _ := pipeFuture2.Get()
c.Convey("Pipeline future 2 should return ok2", func() {
c.So(r2, c.ShouldEqual, "ok2")
})
})
}
func TestWhenAny(t *testing.T) {
c.Convey("Test WhenAny", t, func() {
whenAnyTasks := func(t1 int, t2 int) *Future {
timeouts := []time.Duration{time.Duration(t1), time.Duration(t2)}
getTask := func(i int) func() (interface{}, error) {
return func() (interface{}, error) {
if timeouts[i] > 0 {
time.Sleep(timeouts[i] * time.Millisecond)
return "ok" + strconv.Itoa(i), nil
} else {
time.Sleep((-1 * timeouts[i]) * time.Millisecond)
return nil, newMyError("fail" + strconv.Itoa(i))
}
}
}
task0 := getTask(0)
task1 := getTask(1)
f := WhenAny(task0, task1)
return f
}
c.Convey("When all tasks completed, and task 1 be first to complete", func() {
r, err := whenAnyTasks(200, 250).Get()
c.So(r, c.ShouldEqual, "ok0")
c.So(err, c.ShouldBeNil)
})
c.Convey("When all tasks completed, and task 2 be first to complete", func() {
r, err := whenAnyTasks(280, 250).Get()
c.So(r, c.ShouldEqual, "ok1")
c.So(err, c.ShouldBeNil)
})
c.Convey("When all tasks failed", func() {
r, err := whenAnyTasks(-280, -250).Get()
errs := err.(*NoMatchedError).Results
c.So(r, c.ShouldBeNil)
c.So(errs[0].(*myError).val, c.ShouldEqual, "fail0")
c.So(errs[1].(*myError).val, c.ShouldEqual, "fail1")
})
c.Convey("When one task completed", func() {
r, err := whenAnyTasks(-280, 150).Get()
c.So(r, c.ShouldEqual, "ok1")
c.So(err, c.ShouldBeNil)
})
c.Convey("When no task be passed", func() {
r, err := WhenAny().Get()
c.So(r, c.ShouldBeNil)
c.So(err, c.ShouldBeNil)
})
})
c.Convey("Test WhenAny, and task can be cancelled", t, func() {
var c1, c2 bool
whenAnyCanCancelTasks := func(t1 int, t2 int) *Future {
timeouts := []time.Duration{time.Duration(t1), time.Duration(t2)}
getTask := func(i int) func(canceller Canceller) (interface{}, error) {
return func(canceller Canceller) (interface{}, error) {
for j := 0; j < 10; j++ {
if timeouts[i] > 0 {
time.Sleep(timeouts[i] * time.Millisecond)
} else {
time.Sleep((-1 * timeouts[i]) * time.Millisecond)
}
if canceller.IsCancelled() {
if i == 0 {
c1 = true
} else {
c2 = true
}
return nil, nil
}
}
if timeouts[i] > 0 {
return "ok" + strconv.Itoa(i), nil
} else {
return nil, newMyError("fail" + strconv.Itoa(i))
}
}
}
task0 := getTask(0)
task1 := getTask(1)
f := WhenAny(Start(task0), Start(task1))
return f
}
c.Convey("When task 1 is the first to complete, task 2 will be cancelled", func() {
r, err := whenAnyCanCancelTasks(10, 250).Get()
c.So(r, c.ShouldEqual, "ok0")
c.So(err, c.ShouldBeNil)
time.Sleep(1000 * time.Millisecond)
c.So(c2, c.ShouldEqual, true)
})
c.Convey("When task 2 is the first to complete, task 1 will be cancelled", func() {
r, err := whenAnyCanCancelTasks(200, 10).Get()
c.So(r, c.ShouldEqual, "ok1")
c.So(err, c.ShouldBeNil)
time.Sleep(1000 * time.Millisecond)
c.So(c1, c.ShouldEqual, true)
})
})
}
func TestWhenAnyTrue(t *testing.T) {
c1, c2 := false, false
startTwoCanCancelTask := func(t1 int, t2 int, predicate func(interface{}) bool) *Future {
timeouts := []time.Duration{time.Duration(t1), time.Duration(t2)}
getTask := func(i int) func(canceller Canceller) (interface{}, error) {
return func(canceller Canceller) (interface{}, error) {
for j := 0; j < 10; j++ {
if timeouts[i] > 0 {
time.Sleep(timeouts[i] * time.Millisecond)
} else {
time.Sleep((-1 * timeouts[i]) * time.Millisecond)
}
if canceller.IsCancelled() {
if i == 0 {
c1 = true
} else {
c2 = true
}
return nil, nil
}
}
if timeouts[i] > 0 {
return "ok" + strconv.Itoa(i), nil
} else {
return nil, newMyError("fail" + strconv.Itoa(i))
}
}
}
task0 := getTask(0)
task1 := getTask(1)
f := WhenAnyMatched(predicate, Start(task0), Start(task1))
return f
}
//第一个任务先完成,第二个后完成,并且设定条件为返回值==第一个的返回值
c.Convey("When the task1 is the first to complete, and predicate returns true", t, func() {
r, err := startTwoCanCancelTask(30, 250, func(v interface{}) bool {
return v.(string) == "ok0"
}).Get()
c.So(r, c.ShouldEqual, "ok0")
c.So(err, c.ShouldBeNil)
time.Sleep(1000 * time.Millisecond)
c.So(c2, c.ShouldEqual, true)
})
//第一个任务后完成,第二个先完成,并且设定条件为返回值==第二个的返回值
c.Convey("When the task2 is the first to complete, and predicate returns true", t, func() {
c1, c2 = false, false
r, err := startTwoCanCancelTask(230, 50, func(v interface{}) bool {
return v.(string) == "ok1"
}).Get()
c.So(r, c.ShouldEqual, "ok1")
c.So(err, c.ShouldBeNil)
time.Sleep(1000 * time.Millisecond)
c.So(c1, c.ShouldEqual, true)
})
//第一个任务后完成,第二个先完成,并且设定条件为返回值不等于任意一个任务的返回值
c.Convey("When the task2 is the first to complete, and predicate always returns false", t, func() {
c1, c2 = false, false
r, err := startTwoCanCancelTask(30, 250, func(v interface{}) bool {
return v.(string) == "ok11"
}).Get()
_, ok := err.(*NoMatchedError)
c.So(r, c.ShouldBeNil)
c.So(ok, c.ShouldBeTrue)
c.So(err, c.ShouldNotBeNil)
time.Sleep(1000 * time.Millisecond)
c.So(c1, c.ShouldEqual, false)
c.So(c2, c.ShouldEqual, false)
})
//c.Convey("When all tasks be cancelled", t, func() {
// getTask := func(canceller Canceller) (interface{}, error) {
// for {
// time.Sleep(50 * time.Millisecond)
// if canceller.IsCancellationRequested() {
// canceller.Cancel()
// return nil, nil
// }
// }
// }
// f1 := Start(getTask)
// f2 := Start(getTask)
// f3 := WhenAnyMatched(nil, f1, f2)
// f1.RequestCancel()
// f2.RequestCancel()
// r, _ := f3.Get()
// c.So(r, c.ShouldBeNil)
//})
}
func TestWhenAll(t *testing.T) {
startTwoTask := func(t1 int, t2 int) (f *Future) {
timeouts := []time.Duration{time.Duration(t1), time.Duration(t2)}
getTask := func(i int) func() (interface{}, error) {
return func() (interface{}, error) {
if timeouts[i] > 0 {
time.Sleep(timeouts[i] * time.Millisecond)
return "ok" + strconv.Itoa(i), nil
} else {
time.Sleep((-1 * timeouts[i]) * time.Millisecond)
return nil, newMyError("fail" + strconv.Itoa(i))
}
}
}
task0 := getTask(0)
task1 := getTask(1)
f = WhenAll(task0, task1)
return f
}
c.Convey("Test WhenAllFuture", t, func() {
whenTwoTask := func(t1 int, t2 int) *Future {
return startTwoTask(t1, t2)
}
c.Convey("When all tasks completed, and the task1 is the first to complete", func() {
r, err := whenTwoTask(200, 230).Get()
c.So(r, shouldSlicesReSame, []interface{}{"ok0", "ok1"})
c.So(err, c.ShouldBeNil)
})
c.Convey("When all tasks completed, and the task1 is the first to complete", func() {
r, err := whenTwoTask(230, 200).Get()
c.So(r, shouldSlicesReSame, []interface{}{"ok0", "ok1"})
c.So(err, c.ShouldBeNil)
})
c.Convey("When task1 failed, but task2 is completed", func() {
r, err := whenTwoTask(-250, 210).Get()
c.So(err.(*AggregateError).InnerErrs[0].(*myError).val, c.ShouldEqual, "fail0")
c.So(r, c.ShouldBeNil)
})
c.Convey("When all tasks failed", func() {
r, err := whenTwoTask(-250, -110).Get()
c.So(err.(*AggregateError).InnerErrs[0].(*myError).val, c.ShouldEqual, "fail1")
c.So(r, c.ShouldBeNil)
})
c.Convey("When no task be passed", func() {
r, err := whenAllFuture().Get()
c.So(r, shouldSlicesReSame, []interface{}{})
c.So(err, c.ShouldBeNil)
})
c.Convey("When all tasks be cancelled", func() {
getTask := func(canceller Canceller) (interface{}, error) {
for {
time.Sleep(50 * time.Millisecond)
if canceller.IsCancelled() {
return nil, nil
}
}
}
f1 := Start(getTask)
f2 := Start(getTask)
f3 := WhenAll(f1, f2)
f1.Cancel()
f2.Cancel()
r, _ := f3.Get()
c.So(r, c.ShouldBeNil)
})
})
}
func TestWrap(t *testing.T) {
c.Convey("Test Wrap a value", t, func() {
r, err := Wrap(10).Get()
c.So(r, c.ShouldEqual, 10)
c.So(err, c.ShouldBeNil)
})
}
func shouldSlicesReSame(actual interface{}, expected ...interface{}) string {
actualSlice, expectedSlice := reflect.ValueOf(actual), reflect.ValueOf(expected[0])
if actualSlice.Kind() != expectedSlice.Kind() {
return fmt.Sprintf("Expected1: '%v'\nActual: '%v'\n", expected[0], actual)
}
if actualSlice.Kind() != reflect.Slice {
return fmt.Sprintf("Expected2: '%v'\nActual: '%v'\n", expected[0], actual)
}
if actualSlice.Len() != expectedSlice.Len() {
return fmt.Sprintf("Expected3: '%v'\nActual: '%v'\n", expected[0], actual)
}
for i := 0; i < actualSlice.Len(); i++ {
if !reflect.DeepEqual(actualSlice.Index(i).Interface(), expectedSlice.Index(i).Interface()) {
return fmt.Sprintf("Expected4: '%v'\nActual: '%v'\n", expected[0], actual)
}
}
return ""
}

View File

@ -0,0 +1,116 @@
package promise
import (
"math/rand"
"unsafe"
)
var (
CANCELLED error = &CancelledError{}
)
//CancelledError present the Future object is cancelled.
type CancelledError struct {
}
func (e *CancelledError) Error() string {
return "Task be cancelled"
}
//resultType present the type of Future final status.
type resultType int
const (
RESULT_SUCCESS resultType = iota
RESULT_FAILURE
RESULT_CANCELLED
)
//PromiseResult presents the result of a promise.
//If Typ is RESULT_SUCCESS, Result field will present the returned value of Future task.
//If Typ is RESULT_FAILURE, Result field will present a related error .
//If Typ is RESULT_CANCELLED, Result field will be null.
type PromiseResult struct {
Result interface{} //result of the Promise
Typ resultType //success, failure, or cancelled?
}
//Promise presents an object that acts as a proxy for a result.
//that is initially unknown, usually because the computation of its
//value is yet incomplete (refer to wikipedia).
//You can use Resolve/Reject/Cancel to set the final result of Promise.
//Future can return a read-only placeholder view of result.
type Promise struct {
*Future
}
//Cancel sets the status of promise to RESULT_CANCELLED.
//If promise is cancelled, Get() will return nil and CANCELLED error.
//All callback functions will be not called if Promise is cancalled.
func (this *Promise) Cancel() (e error) {
return this.Future.Cancel()
}
//Resolve sets the value for promise, and the status will be changed to RESULT_SUCCESS.
//if promise is resolved, Get() will return the value and nil error.
func (this *Promise) Resolve(v interface{}) (e error) {
return this.setResult(&PromiseResult{v, RESULT_SUCCESS})
}
//Resolve sets the error for promise, and the status will be changed to RESULT_FAILURE.
//if promise is rejected, Get() will return nil and the related error value.
func (this *Promise) Reject(err error) (e error) {
return this.setResult(&PromiseResult{err, RESULT_FAILURE})
}
//OnSuccess registers a callback function that will be called when Promise is resolved.
//If promise is already resolved, the callback will immediately called.
//The value of Promise will be paramter of Done callback function.
func (this *Promise) OnSuccess(callback func(v interface{})) *Promise {
this.Future.OnSuccess(callback)
return this
}
//OnFailure registers a callback function that will be called when Promise is rejected.
//If promise is already rejected, the callback will immediately called.
//The error of Promise will be paramter of Fail callback function.
func (this *Promise) OnFailure(callback func(v interface{})) *Promise {
this.Future.OnFailure(callback)
return this
}
//OnComplete register a callback function that will be called when Promise is rejected or resolved.
//If promise is already rejected or resolved, the callback will immediately called.
//According to the status of Promise, value or error will be paramter of Always callback function.
//Value is the paramter if Promise is resolved, or error is the paramter if Promise is rejected.
//Always callback will be not called if Promise be called.
func (this *Promise) OnComplete(callback func(v interface{})) *Promise {
this.Future.OnComplete(callback)
return this
}
//OnCancel registers a callback function that will be called when Promise is cancelled.
//If promise is already cancelled, the callback will immediately called.
func (this *Promise) OnCancel(callback func()) *Promise {
this.Future.OnCancel(callback)
return this
}
//NewPromise is factory function for Promise
func NewPromise() *Promise {
val := &futureVal{
make([]func(v interface{}), 0, 8),
make([]func(v interface{}), 0, 8),
make([]func(v interface{}), 0, 4),
make([]func(), 0, 2),
make([]*pipe, 0, 4), nil,
}
f := &Promise{
&Future{
rand.Int(),
make(chan struct{}),
unsafe.Pointer(val),
},
}
return f
}

View File

@ -0,0 +1,251 @@
package promise
import (
"bytes"
"errors"
"fmt"
"runtime"
"strconv"
)
//NoMatchedError presents no future that returns matched result in WhenAnyTrue function.
type NoMatchedError struct {
Results []interface{}
}
func (e *NoMatchedError) Error() string {
return "No matched future"
}
func (e *NoMatchedError) HasError() bool {
for _, ie := range e.Results {
if _, ok1 := ie.(error); ok1 {
return true
}
}
return false
}
func newNoMatchedError(results []interface{}) *NoMatchedError {
return &NoMatchedError{results}
}
func newNoMatchedError1(e interface{}) *NoMatchedError {
return &NoMatchedError{[]interface{}{e}}
}
//AggregateError aggregate multi errors into an error
type AggregateError struct {
s string
InnerErrs []error
}
func (e *AggregateError) Error() string {
if e.InnerErrs == nil {
return e.s
} else {
buf := bytes.NewBufferString(e.s)
buf.WriteString("\n\n")
for i, ie := range e.InnerErrs {
if ie == nil {
continue
}
buf.WriteString("error appears in Future ")
buf.WriteString(strconv.Itoa(i))
buf.WriteString(": ")
buf.WriteString(ie.Error())
buf.WriteString("\n")
}
buf.WriteString("\n")
return buf.String()
}
}
func newAggregateError(s string, innerErrors []error) *AggregateError {
return &AggregateError{newErrorWithStacks(s).Error(), innerErrors}
}
func newAggregateError1(s string, e interface{}) *AggregateError {
return &AggregateError{newErrorWithStacks(s).Error(), []error{getError(e)}}
}
func newErrorWithStacks(i interface{}) (e error) {
err := getError(i)
buf := bytes.NewBufferString(err.Error())
buf.WriteString("\n")
pcs := make([]uintptr, 50)
num := runtime.Callers(2, pcs)
for _, v := range pcs[0:num] {
fun := runtime.FuncForPC(v)
file, line := fun.FileLine(v)
name := fun.Name()
//fmt.Println(name, file + ":", line)
writeStrings(buf, []string{name, " ", file, ":", strconv.Itoa(line), "\n"})
}
return errors.New(buf.String())
}
func getAct(pr *Promise, act interface{}) (f func() (r interface{}, err error)) {
var (
act1 func() (interface{}, error)
act2 func(Canceller) (interface{}, error)
)
canCancel := false
//convert the act to the function that has return value and error if act function haven't return value and error
switch v := act.(type) {
case func() (interface{}, error):
act1 = v
case func(Canceller) (interface{}, error):
canCancel = true
act2 = v
case func():
act1 = func() (interface{}, error) {
v()
return nil, nil
}
case func(Canceller):
canCancel = true
act2 = func(canceller Canceller) (interface{}, error) {
v(canceller)
return nil, nil
}
default:
if e, ok := v.(error); !ok {
pr.Resolve(v)
} else {
pr.Reject(e)
}
return nil
}
//If paramters of act function has a Canceller interface, the Future will can be cancelled.
var canceller Canceller = nil
if pr != nil && canCancel {
//pr.EnableCanceller()
canceller = pr.Canceller()
}
//return proxy function of act function
f = func() (r interface{}, err error) {
defer func() {
if e := recover(); e != nil {
err = newErrorWithStacks(e)
}
}()
if canCancel {
r, err = act2(canceller)
} else {
r, err = act1()
}
return
}
return
}
func startPipe(r *PromiseResult, pipeTask func(v interface{}) *Future, pipePromise *Promise) {
//处理链式异步任务
if pipeTask != nil {
f := pipeTask(r.Result)
f.OnSuccess(func(v interface{}) {
pipePromise.Resolve(v)
}).OnFailure(func(v interface{}) {
pipePromise.Reject(getError(v))
})
}
}
func getFutureReturnVal(r *PromiseResult) (interface{}, error) {
if r.Typ == RESULT_SUCCESS {
return r.Result, nil
} else if r.Typ == RESULT_FAILURE {
return nil, getError(r.Result)
} else {
return nil, getError(r.Result) //&CancelledError{}
}
}
//执行回调函数
func execCallback(r *PromiseResult,
dones []func(v interface{}),
fails []func(v interface{}),
always []func(v interface{}),
cancels []func()) {
if r.Typ == RESULT_CANCELLED {
for _, f := range cancels {
func() {
defer func() {
if e := recover(); e != nil {
err := newErrorWithStacks(e)
fmt.Println("error happens:\n ", err)
}
}()
f()
}()
}
return
}
var callbacks []func(v interface{})
if r.Typ == RESULT_SUCCESS {
callbacks = dones
} else {
callbacks = fails
}
forFs := func(s []func(v interface{})) {
forSlice(s, func(f func(v interface{})) { f(r.Result) })
}
forFs(callbacks)
forFs(always)
}
func forSlice(s []func(v interface{}), f func(func(v interface{}))) {
for _, e := range s {
func() {
defer func() {
if e := recover(); e != nil {
err := newErrorWithStacks(e)
fmt.Println("error happens:\n ", err)
}
}()
f(e)
}()
}
}
//Error handling struct and functions------------------------------
type stringer interface {
String() string
}
func getError(i interface{}) (e error) {
if i != nil {
switch v := i.(type) {
case error:
e = v
case string:
e = errors.New(v)
default:
if s, ok := i.(stringer); ok {
e = errors.New(s.String())
} else {
e = errors.New(fmt.Sprintf("%v", i))
}
}
}
return
}
func writeStrings(buf *bytes.Buffer, strings []string) {
for _, s := range strings {
buf.WriteString(s)
}
}