在Dart中,如果我们向要返回多个值,可以使用IterableStream。 当每个值都需要经过耗时计算而得(比如每次计算耗时1s),如果使用Iterable,可以这样:

Iterable<int> genSnc() sync* {  
  for (var i = 0; i < 3; i++) {  
    sleep(Duration(seconds: 1));
    yield i;  
  }  
}

但是这里的sleep是占用CPU的 Blocking 操作,运行这段代码会阻塞主线程,这在Flutter中是不允许的。 于是我们想到可以使用异步计算替代同步计算,这时候我们就需要使用Stream来替代Iterable

Stream<int> genAsync() async* {  
  for (var i = 0; i < 3; i++) {  
    await Future.delayed(Duration(seconds: 1));
    yield i;  
  }  
}

Cold Stream Link to heading

Stream 类似于Iterable,是一个冷流,这一味这上述的 Asynchronous generator 只有当Stream 开始 listenawait for 的时候才会执行(Synchronous generator 只有当 Iterable 开始迭代的时候才会执行)。

await for Link to heading

当我们需要从Stream获取值时,除了使用 Stream API,还可以使用asyncasync forasynchronous for loop):

void forEachStream() async {  
  await for(var value in genAsync()) {  
    print(value);  
  }  
}

for循环直到Stream emit一个值时一直处于wait状态;当Stream emit一个值后,执行循环体并将value设置为emit的值;然后这样循环执行直到Stream 处于closed状态。 for循环breakreturn将打破for循环并取消订阅流。

如何创建Stream Link to heading

除了使用Asynchronous generator, 还可以使用StreamController来创建StreamStreamController可以创建两种类型的Stream

支持单次订阅的Stream Link to heading

final controller = StreamController<int>();  
final stream = controller.stream;  
stream.listen(print);  
stream.listen(print);  
controller.add(10);

这将抛出异常:

Unhandled exception:
Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:493:9)
#3      main (file:///home/xianxueliang/IdeaProjects/dart_app/bin/dart_app.dart:9:10)
#4      _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:299:32)
#5      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:168:12)

支持多次订阅的Streambroadcast stream Link to heading

final controller = StreamController<int>.broadcast();  
final stream = controller.stream;  
stream.listen(print);  
stream.listen(print);  
controller.add(10);

这将输出:

10
10

Async/Sync StreamController Link to heading

当你创建一个StreamController的时候,有一个Named Parameter-sync来控制StreamControllerSync还是AsyncSync StreamController相比Async StreamController的唯一优势在于低延迟:当向Stream添加一个event的时候,Async StreamController会将这个Event包装成为_DelayedEvent,然后通过scheduleMicrotask 异步调度这个_DelayedEvent将其转发给Subscriber,所以会有一定的延迟;和Async StreamController相反,Sync StreamController会立刻将event转发给Subscriber,延迟只因程序的执行而产生,所以可以视为无延迟。 Note:官方推荐使用 non-sync 版本。

StreamTransformer Link to heading

Iterable一样,Stream也需要一些中间操作来丰富自己-如Streameventsmapfilter等等。当然,这些都已经存在于Stream的核心API中,但是我们可能还需要类似 timestamp(包装原始event,使其增加时间辍)等等这些拓展API,这时候需要我们自己拓展Stream。我们借助Streamtransform函数来完成Source StreamTarget Stream的转化。这里以rxdart中的timestamp拓展函数来演示: 首先我们定义自己的StreamTransformer

class TimestampStreamTransformer<S> extends StreamTransformerBase<S, Timestamped<S>> {  
 /// Constructs a [StreamTransformer] which emits events from the  
 /// source [Stream] as snapshots in the form of [Timestamped]. 
 TimestampStreamTransformer();  
  
  @override  
  Stream<Timestamped<S>> bind(Stream<S> stream) =>  
      Stream.eventTransformed(stream, (sink) => _TimestampStreamSink<S>(sink));  
}

当然,这里的bind就是真正进行Stream转换的地方,你可以使用其他方法来实现:如创建output stream,然后listen source stream,然后转发Source Streameventsoutput stream,然后在转发的过程中实现wrap timestamp的目的。这也是库stream_transform的通用做法。使用官方APIStream.eventTransformed当然更简单一些,当然目的都是为了实现eventtransform

然后创建Stream的拓展类应用这个StreamTransformer

extension TimeStampExtension<T> on Stream<T> {  
  Stream<Timestamped<T>> timestamp() =>  
      transform(TimestampStreamTransformer<T>());  
}

部分官方API介绍 Link to heading

expand Link to heading

Source Stream中的每个event通过convert转换成多个元素序列然后emitTarget Stream

final map = <String, Iterable<int>>{  
  'even': <int>[0, 2, 4, 6, 8, 10],  
  'odd': <int>[1, 3, 5, 7, 9]  
};  
final c = StreamController<String>(sync: true);  
c.stream.expand((element) => map[element]).forEach(print);  
  
map.keys.forEach(c.add);

这将会输出0~10所有的自然数:

0
2
4
6
8
10
1
3
5
7
9

pipe Link to heading

将当前Stream中的events通过管道发送给另一个StreamConsumer

final sourceController = StreamController<String>(sync: true);  
final targetController = StreamController<String>(sync: true);  
  
sourceController.stream.pipe(targetController);  
  
// listen  
targetController.stream.listen(print);
  
// send  
sourceController.add('event from sourceController');

这将会在控制台上打印出:

event from sourceController

受欢迎的Stream拓展库 Link to heading