flutter

Dart에서 stream 만들기

paulaner80 2019. 4. 4. 12:30
반응형


다트의 dart:async 라이브러리의 많은API중에 Stream과 Future라는 중요한 타입이 있습니다.

Future는 단일 계산을 나타내고 Stream은 결과의 시퀀스를 나타낸다. 스트림을 청취하고 결과를 받고 종료알림을 받습니다.


여기서는 스트림을 발생시키는 방법을 알아보겠습니다.


스트림은 몇가지 방법을 생성할 수 있습니다.

-. 기존스트림 변환하기

-.  async* 함수에서 만들기

-. StreamController로 만들기




기존 스트림 변환

스트림을 만드는 일반적인 케이스는 기존의 스트림의 이벤트를 기반으로 새 스트림을 만들려는 경우입니다. 예를 들어, UTF-8 입력을 디코딩하여 문자열 스트림으로 변환하려는 바이트 스트림이있을 수 있습니다. 가장 일반적인 방법은 원본 스트림에서 이벤트를 기다린 다음 새 이벤트를 출력하는 새 스트림을 만드는 것입니다.


일반적으로 스트림을 변환 할 때, map (), where (), expand () 및 take ()와 같은 Stream에서 제공하는 변환 메서드를 사용할 수 있습니다.
예를들어 매초마다 증가하는 카운터를 내보내는 counterStream이 있다고 가정합니다. 구현방법은 다음과 같습니다.

import 'dart:async';

main(List<String> args) {
var conterStream =
Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
conterStream.forEach(print);
}


스트림 이벤트를 변환하려면 스트림을 듣기 전에 스트림에서 map ()과 같은 변형 메서드를 호출 할 수 있습니다. 이 메서드는 새 스트림을 반환합니다.


main(List<String> args) {
var conterStream =
Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);

conterStream
.where((int x) => x.isEven) //짝수만 나오도록한다.
.map((int x) => x * 2) //값을 두배로 해준다.
.expand((var x) => [x, x]) //이벤트를 복제한다.
.take(5) //처음 5개 이벤트 후 멈춘다.
.forEach(print);
}



보통은 변형 함수만 있으면 됩니다. 그러나 변화을 더 많이 제어해야하는 경우 Stream의 transform () 메서드를 사용하여 StreamTransformer를 지정할 수 있습니다. 플랫폼 라이브러리는 일반적인 작업을 위해 많은 스트림 트랜스포머를 제공하고 있습니다. 다음 예제는 dart:convert 라이브러리가 제공하는 utf8.decoder 와  LineSplitter를 사용하는 코드 입니다. 


import 'dart:convert';
import 'dart:io';

main(List<String> args) async {
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(LineSplitter())
.toList() as List<String>;
}




async* 에서 만들기

스트림을 생성하는 방법중 하나는 비동기 제너레이터(async *) 함수를 사용하는 것입니다.  함수가 호출되면 스트림이 만들어집니다. 그리고 스트림이 청취되기 시작하면 함수의 본문이 시작됩니다. 함수가 리턴되면 스트림이 닫힙니다. 스트림이 닫힐 때까지 yeild나 yeild*를 통해서 스트림으로 이벤트가 내보내집니다. 


기본적인 예제 코드 입니다. 

main(List<String> args) {
Stream<String> timedCounter$ = timedCounter(Duration(seconds: 1), 10);
timedCounter$.forEach(print);
}

Stream<String> timedCounter(Duration interval, [int maxCount]) async* {
int i = 0;
int c = "a".codeUnitAt(0);
while (true) {
await Future.delayed(interval);
yield String.fromCharCode(c + i);
i++;
if (i == maxCount) break;
}
}


timedCounter 함수는 스트림을 반환합니다. 그 스트림이 청취되면 본문이 실행되기 시작합니다. 반복적으로 interval 만큼 지연시킨다음 다음
String.formCharCode(c+1)을 발생시킵니다. maxCount 파라미터가 없으면 영원히 계속되거나 청취자가 구독을 취소할 때 까지 계속 출력합니다. 


리스너가 취소하면 다음 본문이 yield 문에 도달하면 yield 문은 return문으로 사용됩니다. final 블럭이 있으면 실행되고 함수를 빠져나오게 됩니다.  함수가 종료하기 전에 값을 생성하려고하면 실패하고 리턴 값으로 작동합니다. 함수가 마지막으로 종료되면  cancel () 메서드에서 반환되는 future는 complete가 됩니다. 함수가 오류와 함께 종료되면 future는 error가 됩니다. 그렇지 않으면 null로 완료됩니다.



또 다른 유용한 예는 future의 시퀀스를 스트림으로 변환하는 것입니다.

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (var future in futures) {
var result = await future;
yield result;
}
}


이 함수의 파라미터는 Iterable<Future> 입니다. for문에서 future를 await한다음 결과를 발생시킵니다. 만안 future가 에러로 완료되면 스트림도 에러로 완료됩니다. 


아무 것도없는 스트림을 구축하는 async* 함수를 사용하는 경우는 거의 없습니다. 어딘가에서 데이터를 가져오는 것이 필요한데 가장 자주 사용되는 것은 다른 스트림으로부터입니다. 위와의 future 시퀀스 같이 데이터는 비동기 이벤트 소스로부터 올수 있습니다. 많은 경우에 async* 함수는 여러개의 데이터소스를 다루기에는 너무 간단합니다. 그래서 StreamController가 있는 것입니다.



StreamController 사용하기

만약 스트림이벤트가 프로그램의 다른 부분에서 온다면, 스트림을 만들기위해서 StreamController를 사용하세요.

StreamController은  이벤트를 스트림에 넣는 방법과 스트림을 제공합니다. 스트림은 리스너처리와  일시 중지에 필요한 모든 로직을 가지고 있습니다.


다음의 예제는 약간의 결함이 있지만 이전 예제의 timedCounter()를 구현하기 위한 StreamController의 사용법을 보여줍니다. 이 코드는 반환 할 스트림을 만든 다음 스트림이나 future가 아니고 타이머 이벤트를 기반으로 데이터를 제공합니다.


import 'dart:async';

// 이예제는 결함이 있습니다.
// 구독자를 갖기전에 시작합니다. 그리고 pause를 구현하지 않았습니다.
Stream<int> timedCounter(Duration interval, [int maxCounter]) {
var controller = StreamController<int>();

int counter = 0;

void tick(Timer timer) {
counter++;
controller.add(counter); //스트림에게 카운터 값을 보내라고 요청합니다.

if (maxCounter != null && counter >= maxCounter) {
timer.cancel();
controller.close(); //스트림을 닫고 청취자들에게 말해달라고 요청합니다.
}
}

Timer.periodic(interval, tick); //BAD: 구독자를 갖기전에 시작함.

return controller.stream;
}

main(List<String> args) {
var counterStream = timedCounter(Duration(seconds: 1), 5);
counterStream.listen(print);
}



timedCounter()의 구현은 두가지 문제점을 가지고 있습니다.

-. 구독자를 가지기 전에 이벤트 생산을 시작합니다.

-. 구족자가 pause를 요청해도 이벤트 생산을 계속합니다.

다음 섹션에서 StreamController를 생성할 때 onListen과 onPause 같은 콜백을 명시헤서 이 문제를 해결하는 것을 보여드리겠습니다.


구독 기다리기

스트림은 일을 시작하기전에 구독자를 기다리는 것이 규칙입니다. async*는 자동으로 이것을 합니다. 하지만 StreamController를 사용할 때는, 모든권한을 가지며 수행하지 않아도 이벤트를 추가할 수 있습니다. 스트림에 구독자가 없는 경우 StreamController는 이벤트를 버퍼링 하므로, 구독자를 얻지 못하면 메모리 누수가 발생할 수 있습니다. 


스트림을 사용하는 코드를 아래와 같이 변경하세요

void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
print("1---------------${DateTime.now().toString().substring(11, 19)}");
await Future.delayed(const Duration(seconds: 5));
print("--2---------------${DateTime.now().toString().substring(11, 19)}");

// 5초후 리스너 등록
await for (int n in counterStream) {
print("----3---------------${DateTime.now().toString().substring(11, 19)}");
print(n); //15번 매초마다 숫자 출력
}
}

main(List<String> args) {
// var counterStream = timedCounter(Duration(seconds: 1), 5);
// counterStream.listen(print);

listenAfterDelay();
}


출력

1)--12:01:49

2)--12:01:54

3)--12:01:54 : 1

3)--12:01:54 : 2

3)--12:01:54 : 3

3)--12:01:54 : 4

3)--12:01:54 : 5

3)--12:01:55 : 6

3)--12:01:56 : 7

3)--12:01:57 : 8

3)--12:01:58 : 9

3)--12:01:59 : 10

3)--12:02:00 : 11

3)--12:02:01 : 12

3)--12:02:02 : 13

3)--12:02:03 : 14

3)--12:02:04 : 15


이 코드를 실행하면, 스트림은 동작하지만 처음 5초동안은 아무것도 출력되지않습니다. 그리고 나서 리스너가 등록되면 StreamController에 의해 버퍼링된 처음 5개가 출력되고 이벤트들이 출력됩니다. 


구독에대한 알림을 받으려면 StreamController를 만들 때 onListen 인수를 지정하세요. onListen 콜백은 스트림이 첫 번째 구독자를 가져올 때 호출 됩니다. 만약 onCancel콜백을 지정하였다면, 컨트롤러가 마지막 구독자를 잃을 때 호출됩니다. 앞의 예제에서 Timer.periodic()은 다음섹션에서 볼수 있듯이 onListen 핸들러로 이동해야합니다. 


pause 상태 이행하기

리스너가 pause를 요청한다면 이벤트 발생을 피하세요. async* 함수는 구독자가 멈추었을 때 yield 문에서 자동으로 멈춤니다. 반면 StreamController는 멈춤 상태일 때 이벤트들을 버퍼링합니다.  만약 이벤트를 제공하는 코드가 일시정지를 고려하지 않는다면 버퍼사이즈는 무한히 커질 수 있습니다. 또한 리스너가 일시정지후 곧 듣기를 멈추면 버퍼 생성 작업이 낭비됩니다. 


일시정지 지원이 없다면 어떤 일이  일어나는지 보세요. 스트림을 사용하는 코드를 아래처럼 수정합니다.


void listenWithPause() {
var conuterStream = timedCounter(Duration(seconds: 1), 15);
StreamSubscription<int> subscription;

subscription = conuterStream.listen((int counter) {
print(counter);

//5초후 멈추고, 5초후
if (counter == 5) {
/*
void pause ([
Future resumeSignal
])

일시정지 중에는 어떤 이벤트도 발생하지 않습니다.
소스로 부터 이벤트들은 구독이 재시작될때까지 퍼링됩니다.
non-broadcast 스트림의 경우 기본소스에 대게 일시 중지에대한
기본정보가 제공되므로 구독이 제게될때까지 이벤트 생성을
중지할 수 있습니다.
Broadcast 스트림의 경우 중간의 이벤트가 중요하지 않다면
버퍼링하지 않기 위해서 구독을 취소하고나 이벤트가
필요할 때 청취를 다시 시작하는 것이 더 좋습니다.
resumeSignal 인자가 있으면 Future resumeSignal이 완료되면 스트림 구독은
resume을 호출해 일시정지를 취소할 것입니다. Future resumeSignal이 에러로
완료되면 스트림은 재게되고 Zone.handleUncaughtErrorr가 전달 됩니다.

재개할 때 resume을 호출 합니다.

pause가 호출 된 만큼 resume도 호출 됩니다. resume을 호출하는 것과
resumeSignal이 완료되는 것은 상호교환이 가능합니다.
resumeSignal을 전달하는 pause는 resume호출에 의해 종료될 수 있으며
resumeSignal의 완료는 또다른 puase가 종료 될 수 있습니다.

구독이 일시정지되지 않은 경우에도 resuem하거나 resumeSignal이
종료되어도 안전합니다. resume은 효과가 없습니다.
*/
subscription.pause(Future.delayed(Duration(seconds: 5)));
}
});
}



5초의 일시중지가 발생하면 그 시간동안 발생한 이벤트는 한번에 다 받습니다. 이는 스트림의 소스는 일지정지하지않고 스트림에 이벤트를 계속 추가하기 때문에 발생합니다. 따라서 스트림은 이벤트를 버퍼링하고 스트림이 resume될 때 버퍼를 비웁니다.



아래소스는 StreamController의 onListen, onPause, onResuem, onCance을 사용하여 pause를 구현한 예제입니다.

Stream<int> timedCounter(Duration interval, [int maxCount]) {
StreamController controller;
Timer timer;
int counter;

//void tick(Timer timer) => timer가 필요 없으므로 _
void tick(_) {
counter++;
controller.add(counter);
if (counter == maxCount) {
timer.cancel();
controller.close();
}
}

void startTimer() {
timer = Timer.periodic(interval, tick);
}

void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
}
}

controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);

//The stream that this controller is controlling.
return controller.stream;
}


이 코드를 listenWithPause 함수와 같이 실행하세요. 일시중지 된 동안 계산이 중지되고 나중에 다시 시작됩니다.


일시중지상태의 변화를 통보받기 위해서는 모든 리스너들 (onListen, onPause, onResuem, onCance)을 사용해야 합니다. 그렇게하면 onListen과 onCancel이 호출되면서 구독과 일시정지상태가 동시에 변경됩니다. 




마지막 힌트

aync* 함수를 사용하지 않고 스트림을 만들 때는 이런 것들을 명심하세요

 . 동기 큰트롤러를 사용할 때는 조심하세요. StreamController(sync:true)를 사용하여 스트림하를 만는 예를 보면... 일시정지되지 않은 비동기 컨틀롤러로 이벤트를 보내면 ( 예를 들어 EventSink로 정의된 add(), addError()또는 close()메소드들을 사용하여), 그 이벤튼 즉시 스트림의 모든 청취자들에게 보내집니다. 스트림 리스너의 코드들은 전체가 리턴될때까지 호출되지 않습니다.  그리고 잘못된 시간에 동기 컨트롤러를 사용하는 것은 실패할 수 있습니다. 동기 컨트롤러 사용은 피하세요.


. StreamController를 사용하면 listen 호출이 StreamSubscription을 반환하기 전에 onListen 콜백이 호출됩니다. onListen 콜백이 이미 존재하는 구독에 의존하지 않도록하십시오. 예를 들어, 다음 코드에서 subscription 변수가 유효한 값을 가지기 전에 onListen 이벤트가 발생하고 핸들러가 호출됩니다.

   subscription = stream.listen(handler);


StreamListroller에 의해 정의 된 onListen, onPause, onResume 및 onCancel 콜백은 스트림의 리스너 상태가 변경 될 때 스트림에 의해 호출되지만 이벤트 발생 또는 다른 상태 변경 핸들러 호출 중에는 결코 호출되지 않습니다. 이러한 경우 상태 변경 콜백은 이전 콜백이 완료 될 때까지 지연됩니다.


직접 Stream 인터페이스를 구현하지 마십시오. 이벤트, 콜백 간의 상호 작용을 쉽게 얻고 청취자를 미묘하게 잘못 추가하거나 제거 할 수 있습니다. 새 스트림의 수신 대기를 구현하려면 항상 StreamController의 기존 스트림을 사용하십시오.


Stream 클래스를 확장하고 청취 메소드와 추가 기능을 구현하여 더 많은 기능을 가진 Stream을 확장하는 클래스를 만들 수는 있지만 사용자가 고려해야하는 새로운 유형을 도입하기 때문에 일반적으로 권장하지 않습니다. 대신 스트림 (또는 그 이상)이 아닌 클래스 (종종 스트림)를 만들 수 있습니다.






https://www.dartlang.org/articles/libraries/creating-streams





'flutter' 카테고리의 다른 글

Widget, State, BuildContext 그리고 InheritedWidget  (1) 2019.04.16
플러터용 파이어 베이스 1부  (0) 2019.04.11
dart  (0) 2019.04.03
Dart mixin 이란?  (0) 2019.04.03
RxDart 관련 내용들  (0) 2019.03.29