Everything negative-pressure, challenges-is all an opportunity for me to rise
26 July 2020
这篇文章讨论以下Dart中的Stream。
在Dart中,如果我们向要返回多个值,可以使用Iterable
和 Stream
。
当每个值都需要经过耗时计算而得(比如每次计算耗时1s),如果使用Iterable
,可以这样:
1
2
3
4
5
6
Iterable<int> genSnc() sync* {
for (var i = 0; i < 3; i++) {
sleep(Duration(seconds: 1));
yield i;
}
}
但是这里的sleep
是占用CPU的 Blocking 操作,运行这段代码会阻塞主线程,这在Flutter中是不允许的。
于是我们想到可以使用异步计算替代同步计算,这时候我们就需要使用Stream
来替代Iterable
:
1
2
3
4
5
6
Stream<int> genAsync() async* {
for (var i = 0; i < 3; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
Stream
类似于Iterable
,是一个冷流,这一味这上述的 Asynchronous generator 只有当Stream
开始 listen
或 await for
的时候才会执行(Synchronous generator 只有当 Iterable
开始迭代的时候才会执行)。
当我们需要从Stream
获取值时,除了使用 Stream API,还可以使用async
和async for
(asynchronous for loop):
1
2
3
4
5
void forEachStream() async {
await for(var value in genAsync()) {
print(value);
}
}
该for循环直到Stream
emit一个值时一直处于wait状态;当Stream
emit一个值后,执行循环体并将value
设置为emit的值;然后这样循环执行直到Stream
处于closed状态。
for循环的break
或return
将打破for循环并取消订阅流。
除了使用Asynchronous generator, 还可以使用StreamController
来创建Stream
。StreamController
可以创建两种类型的Stream
:
Stream
:1
2
3
4
5
final controller = StreamController<int>();
final stream = controller.stream;
stream.listen(print);
stream.listen(print);
controller.add(10);
这将抛出异常:
1
2
3
4
5
6
7
8
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:493:9)
#3 main (file:///home/xianxueliang/IdeaProjects/dart_app/bin/dart_app.dart:9:10)
#4 _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:299:32)
#5 _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:168:12)
Stream
(broadcast stream):1
2
3
4
5
final controller = StreamController<int>.broadcast();
final stream = controller.stream;
stream.listen(print);
stream.listen(print);
controller.add(10);
这将输出:
1
2
10
10
当你创建一个 StreamController
的时候,有一个Named Parameter-sync
来控制StreamController
是Sync还是Async,Sync StreamController相比Async StreamController的唯一优势在于低延迟:当向Stream
添加一个event的时候,Async StreamController会将这个Event包装成为_DelayedEvent
,然后通过scheduleMicrotask
异步调度这个_DelayedEvent
将其转发给Subscriber,所以会有一定的延迟;和Async StreamController相反,Sync StreamController会立刻将event转发给Subscriber,延迟只因程序的执行而产生,所以可以视为无延迟。
Note:官方推荐使用 non-sync 版本。
和Iterable
一样,Stream
也需要一些中间操作来丰富自己-如Stream
中events的map
,filter
等等。当然,这些都已经存在于Stream
的核心API中,但是我们可能还需要类似 timestamp
(包装原始event,使其增加时间辍)等等这些拓展API,这时候需要我们自己拓展Stream
。我们借助Stream
的transform
函数来完成Source Stream向Target Stream的转化。这里以rxdart中的timestamp
拓展函数来演示:
首先我们定义自己的StreamTransformer
:
1
2
3
4
5
6
7
8
9
class TimestampStreamTransformer<S> extends StreamTransformerBase<S, Timestamped<S>> {
/// Constructs a [StreamTransformer] which emits events from the
/// source [Stream] as snapshots in the form of [Timestamped].
TimestampStreamTransformer();
@override
Stream<Timestamped<S>> bind(Stream<S> stream) =>
Stream.eventTransformed(stream, (sink) => _TimestampStreamSink<S>(sink));
}
当然,这里的bind
就是真正进行Stream
转换的地方,你可以使用其他方法来实现:如创建output stream,然后listen source stream,然后转发Source Stream的events给output stream,然后在转发的过程中实现wrap timestamp的目的。这也是库stream_transform的通用做法。使用官方APIStream.eventTransformed
当然更简单一些,当然目的都是为了实现event的transform。
然后创建Stream
的拓展类应用这个StreamTransformer
:
1
2
3
4
extension TimeStampExtension<T> on Stream<T> {
Stream<Timestamped<T>> timestamp() =>
transform(TimestampStreamTransformer<T>());
}
将Source Stream中的每个event通过convert
转换成多个元素序列然后emit给Target Stream:
1
2
3
4
5
6
7
8
final map = <String, Iterable<int>>{
'even': <int>[0, 2, 4, 6, 8, 10],
'odd': <int>[1, 3, 5, 7, 9]
};
final c = StreamController<String>(sync: true);
c.stream.expand((element) => map[element]).forEach(print);
map.keys.forEach(c.add);
这将会输出0~10所有的自然数:
1
2
3
4
5
6
7
8
9
10
11
0
2
4
6
8
10
1
3
5
7
9
将当前Stream
中的events通过管道发送给另一个StreamConsumer
1
2
3
4
5
6
7
8
9
10
final sourceController = StreamController<String>(sync: true);
final targetController = StreamController<String>(sync: true);
sourceController.stream.pipe(targetController);
// listen
targetController.stream.listen(print);
// send
sourceController.add('event from sourceController');
这将会在控制台上打印出:
1
event from sourceController
— Lenox Enjoy