From f7926ec9a4320cc708f816a964b237d3c3587a10 Mon Sep 17 00:00:00 2001 From: Ulises Jeremias Cornejo Fandos Date: Thu, 2 Dec 2021 06:15:07 -0300 Subject: [PATCH] vlib/context: add onecontext as submodule (#12549) --- cmd/tools/vtest-self.v | 1 + vlib/context/cancel.v | 3 +- vlib/context/deadline.v | 7 +- vlib/context/onecontext/README.md | 40 +++++ vlib/context/onecontext/onecontext.v | 146 ++++++++++++++++++ vlib/context/onecontext/onecontext_test.v | 175 ++++++++++++++++++++++ 6 files changed, 369 insertions(+), 3 deletions(-) create mode 100644 vlib/context/onecontext/README.md create mode 100644 vlib/context/onecontext/onecontext.v create mode 100644 vlib/context/onecontext/onecontext_test.v diff --git a/cmd/tools/vtest-self.v b/cmd/tools/vtest-self.v index fc654df05..65a9316b9 100644 --- a/cmd/tools/vtest-self.v +++ b/cmd/tools/vtest-self.v @@ -8,6 +8,7 @@ const github_job = os.getenv('GITHUB_JOB') const ( skip_test_files = [ + 'vlib/context/onecontext/onecontext_test.v', 'vlib/context/deadline_test.v' /* sometimes blocks */, 'vlib/mysql/mysql_orm_test.v' /* mysql not installed */, 'vlib/pg/pg_orm_test.v' /* pg not installed */, diff --git a/vlib/context/cancel.v b/vlib/context/cancel.v index 323df2e92..86fd9379c 100644 --- a/vlib/context/cancel.v +++ b/vlib/context/cancel.v @@ -51,9 +51,10 @@ mut: pub fn with_cancel(mut parent Context) (Context, CancelFn) { mut c := new_cancel_context(parent) propagate_cancel(mut parent, mut c) - return Context(c), fn [mut c] () { + cancel_fn := fn [mut c] () { c.cancel(true, canceled) } + return Context(c), CancelFn(cancel_fn) } // new_cancel_context returns an initialized CancelContext. diff --git a/vlib/context/deadline.v b/vlib/context/deadline.v index 10f6c68f5..e44810de7 100644 --- a/vlib/context/deadline.v +++ b/vlib/context/deadline.v @@ -44,9 +44,10 @@ pub fn with_deadline(mut parent Context, d time.Time) (Context, CancelFn) { dur := d - time.now() if dur.nanoseconds() <= 0 { ctx.cancel(true, deadline_exceeded) // deadline has already passed - return Context(ctx), fn [mut ctx] () { + cancel_fn := fn [mut ctx] () { ctx.cancel(true, canceled) } + return Context(ctx), CancelFn(cancel_fn) } if ctx.err() is none { @@ -55,9 +56,11 @@ pub fn with_deadline(mut parent Context, d time.Time) (Context, CancelFn) { ctx.cancel(true, deadline_exceeded) }(mut ctx, dur) } - return Context(ctx), fn [mut ctx] () { + + cancel_fn := fn [mut ctx] () { ctx.cancel(true, canceled) } + return Context(ctx), CancelFn(cancel_fn) } // with_timeout returns with_deadline(parent, time.now().add(timeout)). diff --git a/vlib/context/onecontext/README.md b/vlib/context/onecontext/README.md new file mode 100644 index 000000000..79b2bd195 --- /dev/null +++ b/vlib/context/onecontext/README.md @@ -0,0 +1,40 @@ +# onecontext + +A library to merge existing V contexts. + +## Overview + +Have you ever faced the situation where you have to merge multiple existing contexts? +If not, then you might, eventually. + +For example, we can face the situation where we are building an application +using a library that gives us a global context. +This context expires once the application is stopped. + +Meanwhile, we are exposing a service like this: + +```v ignore +fn (f Foo) get(ctx context.Context, bar Bar) ?Baz { + . . . +} +``` + +Here, we receive another context provided by the service. + +Then, in the `get` implementation, we want for example to query a database and +we must provide a context for that. + +Ideally, we would like to provide a merged context that would expire either: + +- When the application is stopped +- Or when the received service context expires + +This is exactly the purpose of this library. + +In our case, we can now merge the two contexts in a single one like this: + +```v ignore +ctx, cancel := onecontext.merge(ctx1, ctx2) +``` + +This returns a merged context that we can now propagate diff --git a/vlib/context/onecontext/onecontext.v b/vlib/context/onecontext/onecontext.v new file mode 100644 index 000000000..9de76069b --- /dev/null +++ b/vlib/context/onecontext/onecontext.v @@ -0,0 +1,146 @@ +module onecontext + +import context +import sync +import time + +// canceled is the error returned when the cancel function is called on a merged context +pub const canceled = error('canceled context') + +struct OneContext { +mut: + ctx context.Context + ctxs []context.Context + done chan int + err IError = none + err_mutex sync.Mutex + cancel_fn context.CancelFn + cancel_ctx context.Context +} + +// merge allows to merge multiple contexts +// it returns the merged context +pub fn merge(ctx context.Context, ctxs ...context.Context) (context.Context, context.CancelFn) { + mut background := context.background() + cancel_ctx, cancel := context.with_cancel(mut &background) + mut octx := &OneContext{ + done: chan int{cap: 3} + ctx: ctx + ctxs: ctxs + cancel_fn: cancel + cancel_ctx: cancel_ctx + } + go octx.run() + return context.Context(octx), context.CancelFn(cancel) +} + +pub fn (octx OneContext) deadline() ?time.Time { + mut min := time.Time{} + + if deadline := octx.ctx.deadline() { + min = deadline + } + + for ctx in octx.ctxs { + if deadline := ctx.deadline() { + if min.unix_time() == 0 || deadline < min { + min = deadline + } + } + } + + if min.unix_time() == 0 { + return none + } + + return min +} + +pub fn (octx OneContext) done() chan int { + return octx.done +} + +pub fn (mut octx OneContext) err() IError { + octx.err_mutex.@lock() + defer { + octx.err_mutex.unlock() + } + return octx.err +} + +pub fn (octx OneContext) value(key context.Key) ?context.Any { + if value := octx.ctx.value(key) { + return value + } + + for ctx in octx.ctxs { + if value := ctx.value(key) { + return value + } + } + + return none +} + +pub fn (mut octx OneContext) run() { + mut wrapped_ctx := &octx.ctx + if octx.ctxs.len == 1 { + mut first_ctx := &octx.ctxs[0] + octx.run_two_contexts(mut wrapped_ctx, mut first_ctx) + return + } + + octx.run_multiple_contexts(mut wrapped_ctx) + for mut ctx in octx.ctxs { + octx.run_multiple_contexts(mut &ctx) + } +} + +pub fn (octx OneContext) str() string { + return '' +} + +pub fn (mut octx OneContext) cancel(err IError) { + octx.cancel_fn() + octx.err_mutex.@lock() + octx.err = err + octx.err_mutex.unlock() + if !octx.done.closed { + octx.done <- 0 + octx.done.close() + } +} + +pub fn (mut octx OneContext) run_two_contexts(mut ctx1 context.Context, mut ctx2 context.Context) { + go fn (mut octx OneContext, mut ctx1 context.Context, mut ctx2 context.Context) { + octx_cancel_done := octx.cancel_ctx.done() + c1done := ctx1.done() + c2done := ctx2.done() + select { + _ := <-octx_cancel_done { + octx.cancel(onecontext.canceled) + } + _ := <-c1done { + octx.cancel(ctx1.err()) + } + _ := <-c2done { + octx.cancel(ctx1.err()) + } + } + }(mut &octx, mut &ctx1, mut &ctx2) +} + +pub fn (mut octx OneContext) run_multiple_contexts(mut ctx context.Context) { + go fn (mut octx OneContext, mut ctx context.Context) { + octx_cancel_done := octx.cancel_ctx.done() + cdone := ctx.done() + select { + _ := <-octx_cancel_done { + octx.cancel(onecontext.canceled) + } + _ := <-cdone { + octx.cancel(ctx.err()) + } + } + }(mut &octx, mut &ctx) +} diff --git a/vlib/context/onecontext/onecontext_test.v b/vlib/context/onecontext/onecontext_test.v new file mode 100644 index 000000000..4bee5fe1c --- /dev/null +++ b/vlib/context/onecontext/onecontext_test.v @@ -0,0 +1,175 @@ +module onecontext + +import context +import time + +fn eventually(ch chan int) bool { + mut background := context.background() + mut timeout, cancel := context.with_timeout(mut &background, 30 * time.millisecond) + defer { + cancel() + } + + tdone := timeout.done() + select { + _ := <-ch { + return true + } + _ := <-tdone { + return false + } + } + + return false +} + +struct Value { + val string +} + +fn test_merge_nomilan() { + mut background := context.background() + foo := &Value{ + val: 'foo' + } + mut value_ctx1 := context.with_value(background, 'foo', foo) + mut ctx1, cancel := context.with_cancel(mut &value_ctx1) + defer { + cancel() + } + + bar := &Value{ + val: 'bar' + } + mut value_ctx2 := context.with_value(background, 'bar', bar) + mut ctx2, _ := context.with_cancel(mut &value_ctx2) + + mut ctx, cancel2 := merge(ctx1, ctx2) + + if deadline := ctx.deadline() { + panic('this should never happen') + } + + val1 := ctx.value('foo') or { panic('wrong value access for key `foo`') } + match val1 { + Value { + assert foo == val1 + } + else { + assert false + } + } + + val2 := ctx.value('bar') or { panic('wrong value access for key `bar`') } + match val2 { + Value { + assert bar == val2 + } + else { + assert false + } + } + + if _ := ctx.value('baz') { + panic('this should never happen') + } + + assert !eventually(ctx.done()) + assert ctx.err() is none + + cancel2() + assert eventually(ctx.done()) + assert ctx.err() is Error +} + +fn test_merge_deadline_context_1() { + mut background := context.background() + mut ctx1, cancel := context.with_timeout(mut &background, time.second) + defer { + cancel() + } + ctx2 := context.background() + mut ctx, _ := merge(ctx1, ctx2) + + if deadline := ctx.deadline() { + assert deadline.unix_time() != 0 + } else { + panic('this should never happen') + } +} + +fn test_merge_deadline_context_2() { + mut background := context.background() + ctx1 := context.background() + mut ctx2, cancel := context.with_timeout(mut &background, time.second) + defer { + cancel() + } + mut ctx, _ := merge(ctx1, ctx2) + + if deadline := ctx.deadline() { + assert deadline.unix_time() != 0 + } else { + panic('this should never happen') + } +} + +fn test_merge_deadline_context_n() { + mut background := context.background() + ctx1 := context.background() + + mut ctxs := []context.Context{cap: 21} + for i in 0 .. 10 { + ctxs << context.background() + } + mut ctx_n, _ := context.with_timeout(mut &background, time.second) + ctxs << ctx_n + + for i in 0 .. 10 { + ctxs << context.background() + } + + mut ctx, cancel := merge(ctx1, ...ctxs) + + assert !eventually(ctx.done()) + assert ctx.err() is none + cancel() + assert eventually(ctx.done()) + assert ctx.err() is Error +} + +fn test_merge_deadline_none() { + ctx1 := context.background() + ctx2 := context.background() + + mut ctx, _ := merge(ctx1, ctx2) + + if _ := ctx.deadline() { + panic('this should never happen') + } +} + +fn test_merge_cancel_two() { + ctx1 := context.background() + ctx2 := context.background() + + mut ctx, cancel := merge(ctx1, ctx2) + cancel() + + assert eventually(ctx.done()) + assert ctx.err() is Error + assert ctx.err().str() == 'canceled context' +} + +fn test_merge_cancel_multiple() { + ctx1 := context.background() + ctx2 := context.background() + ctx3 := context.background() + + mut ctx, cancel := merge(ctx1, ctx2, ctx3) + cancel() + + assert eventually(ctx.done()) + assert ctx.err() is Error + assert ctx.err().str() == 'canceled context' +} -- 2.30.2