system.reactive - Creating Multiple Timers with Reactive Extensions -
i've got simple class using poll directory new files. it's got location, time start monitoring location, , interval (in hours) when check again:
public class thing { public string name {get; set;} public uri uri { get; set;} public datetimeoffset starttime {get; set;} public double interval {get; set;} }
i new reactive extensions, think right tool job here. @ start time, , on every subsequent interval, want call web service heavy lifting - we'll use ever inventive public bool dowork(uri uri)
represent that.
edit: dowork call web service check new files , move them if necessary, execution should async. returns true if completed, false if not.
things complicated if have whole collection of these thing
s. can't wrap head around how create observable.timer()
each one, , have them call same method.
edit2: observable.timer(datetimeoffset, timespan) seems perfect create iobservable i'm trying here. thoughts?
do need there many timers? assume if have collection of 20 things, create 20 timers fire @ same point in time? on same thread/scheduler?
or perhaps want dowork
foreach thing @ every period?
i.e.
from thing in things x in observable.interval(thing.interval) select dowork(thing.uri)
vs.
observable.interval(interval) .select(_=> { foreach(var thing in things) { dowork(thing); } })
there many ways can work in future.
- you can use scheduler directly schedule work done in future.
- you can use observable.timer have sequence produces 1 value in specified time in future.
- you can use observable.interval have sequence produces many values each specified time period apart.
so introduces question. if have polling time 60seconds , work function takes 5 seconds; should next poll happen in 55 seconds or in 60 seconds? here 1 answer indicates want use rx sequence, other indicates want use periodic scheudling.
next question is, dowork return value? looks not*. in case think appropriate thing leverage periodic schedulers (assuming rx v2).
var things = new []{ new thing{name="google", uri = new uri("http://google.com"), starttime=datetimeoffset.now.addseconds(1), interval=3}, new thing{name="bing", uri = new uri("http://bing.com"), starttime=datetimeoffset.now.addseconds(1), interval=3} }; var scheduler = scheduler.default; var scheduledwork = new compositedisposable(); foreach (var thing in things) { scheduledwork.add( scheduler.scheduleperiodic(thing, timespan.fromseconds(thing.interval), t=>dowork(t.uri))); } //just showing can cancel i.e. clean resources. scheduler.schedule(timespan.fromseconds(10), ()=>scheduledwork.dispose());
this schedule each thing processed periodically (without drift), on it's own interval , provide cancellation.
we can upgrade query if like
var scheduledwork = thing in things select scheduler.scheduleperiodic(thing, timespan.fromseconds(thing.interval), t=>dowork(t.uri)); var work = new compositedisposable(scheduledwork);
the problem these query don't fulfil starttime
requirement. annoyingly ccheduler.scheduleperiodic
method not provide overload have start offset.
the observable.timer
operator provide this. internally leverage non-drifting scheduling features. reconstruct query observable.timer
can following.
var uristopoll = thing in things.toobservable() _ in observable.timer(thing.starttime, timespan.fromseconds(thing.interval)) select thing; var subscription = uristopoll.subscribe(t=>dowork(t.uri));
so have nice interface should avoid drift. however, think work here done in serial manner (if many dowork actions called concurrently).
*ideally try avoid side effect statements not 100% sure requirements.
edit appears calls dowork must in parallel, need bit more. ideally make dowork asnyc, if cant can fake till make it.
var polling = thing in things.toobservable() _ in observable.timer(thing.starttime, timespan.fromseconds(thing.interval)) result in observable.start(()=>dowork(thing.uri)) select result; var subscription = polling.subscribe(); //ignore bool results?
Comments
Post a Comment