A streaming library with a superpower: FS2 and functional programming

Scala has a very special streaming library called FS2 (Functional Streams for Scala). This library embodies all the advantages of functional programming (FP). By understanding its design goals, you will be exposed to the core ideas that make FP so appealing.

FS2 has one central type: Stream[Effect,Output]

From this type you can tell that it is a Stream and that it emits values of type Output.

The obvious question here is: what is Effect? What is the link between Effect and Output? And what advantages does FS2 have over other streaming libraries?

Overview

I will start with an overview of the problems FS2 solves. Then I compare List and Stream with several code examples. After that, I will focus on how to use Stream with a DB or any other IO. That is where FS2 shines and where the Effect type is used. Once you understand what Effect is, the advantages of functional programming should be evident to you.

At the end of this post you will have answers to the following questions:

  • What problems can I solve with FS2?
  • What can I do with Stream that List cannot?
  • How can I feed data from an API / file / DB into a Stream?
  • What is this Effect type and how does it relate to functional programming?

Note: The code is in Scala and should be understandable even without prior knowledge of the syntax.

What problems can I solve with FS2?

  1. Streaming I/O: incrementally loading big data sets that would not fit in memory and operating on them without blowing your heap.
  2. Control flow (not covered): moving data from one or several DBs / files / APIs to others in a nice declarative way.
  3. Concurrency (not covered): running different streams in parallel and making them communicate with each other. For example, loading data from multiple files and processing them concurrently as opposed to sequentially. You can do some advanced stuff here: streams can communicate with each other during the processing stage and not only at the end.

List vs Stream

List is the best known and most used data structure. To get a feel for how it differs from an FS2 Stream, we will go through a few use cases. We will see how Stream can solve problems that List cannot.

Your data is too big and does not fit in memory

Let's say you have a very big file (40 GB), fahrenheit.txt. The file has a temperature on each line and you want to convert it to celsius.txt.

Loading a big file using List

    import scala.io.Source

    val list = Source.fromFile("testdata/fahrenheit.txt").getLines.toList

    java.lang.OutOfMemoryError: Java heap space
      java.util.Arrays.copyOfRange(Arrays.java:3664)
      java.lang.String.<init>(String.java:207)
      java.io.BufferedReader.readLine(BufferedReader.java:356)
      java.io.BufferedReader.readLine(BufferedReader.java:389)

List fails miserably because, of course, the file is too big to fit in memory. If you are curious, you can go check the full solution using Stream here, but do that later, read on :)

When List won't do ... Stream to the rescue!

Let's say I succeeded in reading my file and I want to write it back. I would like to preserve the line structure, so I need to insert a newline character \n after each temperature.

I can use the intersperse combinator to do that:

    import fs2._

    Stream(1,2,3,4).intersperse("\n").toList
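To tie this back to the temperature example, here is a small sketch on a pure in-memory stream. The sample values and the integer conversion are assumptions made for illustration; they are not taken from the original file.

```scala
import fs2.Stream

// Hypothetical sample standing in for a few lines of fahrenheit.txt.
val fahrenheit = Stream(32, 212, 50)

// Integer arithmetic keeps the illustration simple; real data would need Double.
def toCelsius(f: Int): Int = (f - 32) * 5 / 9

// Convert, render as text, then place "\n" between consecutive elements.
val celsiusLines: List[String] =
  fahrenheit.map(toCelsius).map(_.toString).intersperse("\n").toList

// Joining the pieces gives the text that would be written to celsius.txt.
val fileContent: String = celsiusLines.mkString
```

Note that intersperse places the separator between elements, so there is no trailing newline after the last temperature.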

Another nice one is zipWithNext:

    scala> Stream(1,2,3,4).zipWithNext.toList
    res1: List[(Int, Option[Int])] = List((1,Some(2)), (2,Some(3)), (3,Some(4)), (4,None))

It pairs each element with the one that follows it, which is very useful if you want to remove consecutive duplicates.
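One possible way to remove consecutive duplicates with zipWithNext is sketched below. The helper name dropConsecutiveDuplicates is made up for this example and is not part of FS2.

```scala
import fs2.{Pure, Stream}

// Hypothetical helper: keep an element only when the next element differs
// from it, or when it is the last element (next is None).
def dropConsecutiveDuplicates[F[_], A](in: Stream[F, A]): Stream[F, A] =
  in.zipWithNext.collect {
    case (current, next) if !next.contains(current) => current
  }

// The type parameters are spelled out so the result is visibly a Pure stream.
val deduplicated: List[Int] =
  dropConsecutiveDuplicates[Pure, Int](Stream(1, 1, 2, 2, 2, 3, 1)).toList
```

Only runs of equal neighbours are collapsed; the final 1 survives because it is not adjacent to the first two.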

These are only a few of the many very useful combinators; here is the full list.

Obviously Stream can do a lot of things that List cannot, but the best feature is coming in the next section. It's all about how to use Stream in the real world with DBs / files / APIs ...

How can I feed data from an API / file / DB into a Stream?

Let's just say for now that this is our program:

    scala> Stream(1,2,3)
    res2: fs2.Stream[fs2.Pure,Int] = Stream(..)

What does this Pure mean? Here is the scaladoc from the source code:

    /**
      * Indicates that a stream evaluates no effects.
      *
      * A `Stream[Pure,O]` can be safely converted to a `Stream[F,O]` for all `F`.
      */
    type Pure[A] <: Nothing

It means no effects, OK ... but what is an effect? And more specifically, what is the effect of our program Stream(1,2,3)?

This program has literally no effect on the world. Its only effect will be to make your CPU work and consume some power! It does not affect the world around you.

By affecting the world, I mean that it consumes a meaningful resource such as a file or a database, or that it produces something: writing a file, uploading data somewhere, printing to your terminal, and so on.
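The scaladoc's claim that a Pure stream can be converted to any effect can be sketched as follows. This is a minimal example that assumes fs2 3.x on cats-effect 3: the runtime import and the parentheses on unsafeRunSync() belong to those versions, while the REPL output shown in this post appears to come from an older release. IO and unsafeRunSync are explained later in the post.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.{Pure, Stream}

val pureStream: Stream[Pure, Int] = Stream(1, 2, 3)

// A Pure stream can be turned into a List directly: there is nothing to run.
val viaList: List[Int] = pureStream.toList

// covary lifts the same stream into an effect type without changing its elements.
val inIO: Stream[IO, Int] = pureStream.covary[IO]

// An effectful stream has to be compiled and run to get the values out.
val viaIO: List[Int] = inIO.compile.toList.unsafeRunSync()
```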

How do I turn a Pure stream into something useful?

Let's say I want to load user IDs from a DB. I am given this function: it calls the DB and returns the userId as a Long.

    import scala.concurrent.Future

    def loadUserIdByName(userName: String): Future[Long] = ???

It returns a Future, which indicates that this call is asynchronous and that the value will be available at some point in the future. It wraps the value returned by the DB.

I have this Pure stream:

    scala> val names = Stream("bob", "alice", "joe")
    names: fs2.Stream[fs2.Pure,String] = Stream(..)

How do I get a Stream of IDs?

The naive approach would be to use the map function; it should run the function for each value in the Stream.

    scala> val userIdsFromDB = names.map(loadUserIdByName)
    userIdsFromDB: fs2.Stream[fs2.Pure,scala.concurrent.Future[Long]] = Stream(..)

I still got back a Pure! I gave the Stream a function that affects the world and I still got a Pure, not cool ... It would have been neat if FS2 had detected automatically that the loadUserIdByName function has an effect on the world and returned something that is NOT Pure, but it does not work like that. You have to use a special combinator instead of map: you have to use evalMap.

    scala> val userIdsFromDB = names.evalMap(loadUserIdByName)
    userIdsFromDB: fs2.Stream[scala.concurrent.Future,Long] = Stream(..)

No more Pure! We got Future instead, yay! What just happened?

It took:

  • loadUserIdByName: Future[Long]
  • Stream[Pure, String]

And switched the types of the stream to:

  • Stream[Future, Long]

It separated the Future and isolated it! The left side, which was the Effect type parameter, is now the concrete Future type.
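The difference between the two combinators shows up directly in the types. In the sketch below the in-memory Map is a hypothetical stand-in for the DB, and the explicit type annotations are only there to make the compiler confirm what each call returns.

```scala
import scala.concurrent.Future
import fs2.{Pure, Stream}

// Hypothetical stand-in for the DB call: an in-memory lookup, not a real query.
def loadUserIdByName(userName: String): Future[Long] =
  Future.successful(Map("bob" -> 1L, "alice" -> 2L, "joe" -> 3L)(userName))

val names: Stream[Pure, String] = Stream("bob", "alice", "joe")

// map: the Future ends up in the output position and the stream stays Pure.
val mapped: Stream[Pure, Future[Long]] = names.map(loadUserIdByName)

// evalMap: the Future moves to the effect position and the output is a plain Long.
val evaluated: Stream[Future, Long] = names.evalMap(loadUserIdByName)
```

With map you end up holding a stream of Futures that you would have to unwrap yourself; with evalMap the stream itself knows that producing each Long requires an effect.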

Neat trick, but how does it help me?

You just witnessed true separation of concerns. You can continue to operate on the stream with all the nice List-like combinators, and you don't have to worry about whether the DB is down or slow, or about anything else related to the network (effect) concerns.

It all works until I want to use toList to get the values back:

    scala> userIdsFromDB.toList
    <console>:18: error: value toList is not a member of fs2.Stream[scala.concurrent.Future,Long]
           userIdsFromDB.toList
                         ^

What???!!! I could have sworn that I used toList before and it worked. How can it say that toList is no longer a member of fs2.Stream[Future,Long]? It is as if this function was removed the moment I started using an effectful stream. That's impressive! But how do I get my values back?

    scala> userIdsFromDB.compile
    res5: fs2.Stream.ToEffect[scala.concurrent.Future,Long] = fs2.Stream$ToEffect@…

First we use compile to tell the Stream to combine all the effects into one. Effectively, it folds all the calls to loadUserIdByName into one big Future. The framework needs this step, and it will soon become apparent why.

Now toList should work

    scala> userIdsFromDB.compile.toList
    <console>:18: error: could not find implicit value for parameter F: cats.effect.Sync[scala.concurrent.Future]
           userIdsFromDB.compile.toList
                                 ^

What?! The compiler is still complaining. That's because Future is not a good Effect type: it breaks the philosophy of separation of concerns, as explained in the next, very important section.

IMPORTANT: The ONE thing to take away from this post

A key point here is that the DB has not been called at this point. Nothing has really happened: the full program below does not produce anything.

    def loadUserIdByName(userName: String): Future[Long] = ???

    Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile

Separating program description from evaluation

Yes it might be surprising but the major theme in FP is separating the

  • Description of your program: a good example is the program we just wrote, it’s a pure description of the problem “I give you names and a DB, give me back IDs”

And the

  • Execution of your program: running the actual code and asking it to go to the DB

One more time our program has literally no effect on the world besides making your computer warm, exactly like our Pure stream.
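To make the split between description and execution visible, here is a small sketch that counts how often the "DB" is hit. It assumes fs2 3.x on cats-effect 3, and the fake loadUserIdByName (returning the length of the name) is purely hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

val dbCalls = new AtomicInteger(0)

// Hypothetical fake DB: counts each call and returns the name length as the "id".
def loadUserIdByName(userName: String): IO[Long] =
  IO { dbCalls.incrementAndGet(); userName.length.toLong }

// Description only: building this value does not call loadUserIdByName's body.
val program: IO[List[Long]] =
  Stream("bob", "alice", "joe").evalMap(loadUserIdByName).compile.toList

val callsBeforeRun: Int = dbCalls.get() // nothing has run yet

// Execution: only now are the three "DB calls" made.
val ids: List[Long] = program.unsafeRunSync()
val callsAfterRun: Int = dbCalls.get()
```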

Code that does not have an effect is called pure and that’s what all Functional Programming is about: writing programs with functions that are pure. Bravo, you now know what FP is all about.

Why would you want to write code this way? Simple: to achieve separation of concerns between the IO parts and the rest of our code.

Now let’s fix our program and take care of this Future problem.

As we said, Future is a bad Effect type: it goes against the separation of concerns principle. Indeed, Future is eager in Scala: the moment you create one, it starts to execute on some thread. You have no control over when it runs, so the description of the program can no longer be kept apart from its execution. FS2 is well aware of that and does not let you compile. To fix this we have to use a type called IO that wraps our bad Future.
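The eagerness of Future, compared with IO, can be seen in a few lines. This is a sketch assuming cats-effect 3; the counters exist only to observe when each body runs.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)

implicit val ec: ExecutionContext = ExecutionContext.global

val futureRuns = new AtomicInteger(0)
val ioRuns = new AtomicInteger(0)

// Creating the Future already schedules its body on the execution context.
val eager: Future[Int] = Future(futureRuns.incrementAndGet())

// Creating the IO only builds a description; its body has not run.
val described: IO[Int] = IO(ioRuns.incrementAndGet())

val futureResult: Int = Await.result(eager, 5.seconds)
val ioRunsBefore: Int = ioRuns.get()

// Each explicit run executes the description again.
val first: Int = described.unsafeRunSync()
val second: Int = described.unsafeRunSync()
```

The Future ran once without being asked to, while the IO ran exactly as many times as it was explicitly executed.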

That brings us to the last part: what is this IO type, and how do I finally get my list of userIds back?

    scala> import cats.effect.IO
    import cats.effect.IO

    scala> Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name)))).compile.toList
    res8: cats.effect.IO[List[Long]] = IO$2104439279

It now gives us back a List but still, we didn't get our IDs back, so one last thing must be missing.

What does IO really mean?

IO comes from the cats-effect library. First let's finish our program and finally get the IDs back from the DB.

    scala> val userIds = Stream("bob", "alice", "joe").evalMap(name => IO.fromFuture(IO(loadUserIdByName(name))))

    scala> userIds.compile.toList.unsafeRunSync
    scala.NotImplementedError: an implementation is missing

The proof that it’s doing something is the fact that it’s failing.

    def loadUserIdByName(userName: String): Future[Long] = ???

When ??? is called you will get this exception. It means the function was executed (as opposed to before, when we made the point that nothing was really happening). When we implement this function it will go to the DB and load the IDs, and it will have an effect on the world (network / file system).

IO[Long] is a description of how to get a value of type Long, and it most certainly involves doing some I/O, i.e. going to the network, loading a file, ...

It’s the How and not the What. It describes how to get the value from the network. If you want to execute this description, you can use unsafeRunSync (or other functions prefixed unsafe). You can guess why they are called this way: indeed a call to a DB is inherently unsafe as it could fail if, for example, your Internet connection is out.
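Putting the last few steps together, a runnable version of the program might look like the sketch below. It assumes fs2 3.x on cats-effect 3 (older versions differ, for example IO.fromFuture may need extra implicits and no runtime import is required). The pattern-matching loadUserIdByName is a hypothetical stand-in for a real DB, and attempt is one possible way to turn a failure into a value instead of an exception.

```scala
import scala.concurrent.Future
import cats.effect.IO
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumed version)
import fs2.Stream

// Hypothetical stand-in for the DB: two known users, everything else fails.
def loadUserIdByName(userName: String): Future[Long] =
  userName match {
    case "bob"   => Future.successful(1L)
    case "alice" => Future.successful(2L)
    case other   => Future.failed(new NoSuchElementException(s"unknown user: $other"))
  }

// Still only a description: the Future is created inside IO, so it is not started here.
def userIds(names: String*): IO[List[Long]] =
  Stream
    .emits(names)
    .evalMap(name => IO.fromFuture(IO(loadUserIdByName(name))))
    .compile
    .toList

// Running the description performs the lookups.
val found: List[Long] = userIds("bob", "alice").unsafeRunSync()

// attempt captures a failed lookup as a Left instead of throwing.
val failed: Either[Throwable, List[Long]] =
  userIds("bob", "joe").attempt.unsafeRunSync()
```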

Recap

Let’s take a last look at Stream[Effect,Output].

Output is the type that the stream emits (could be a stream of String, Long or whatever type you defined).

Effect is the way (the recipe) to produce the Output (e.g. go to the DB and give me an id of type Long).

It's important to understand that these types are separated to make things easier: breaking a problem down into subproblems allows you to reason about the subproblems independently. You can then solve them and combine their solutions.

The link between these 2 types is the following:

In order for the Stream to emit an element of type

  • Output

It needs to evaluate a type

  • Effect

A special type that encodes an effectful action as a value of type IO. This IO value allows the separation of 2 concerns:

  • Description: IO is a simple immutable value. It's a recipe to get a value of type A by doing some kind of IO (network/filesystem/…)
  • Execution: in order for IO to do something, you need to execute/run it using io.unsafeRunSync

Putting it all together

Stream[IO,Long] says:

This is a Stream that emits values of type Long and, in order to do so, it needs to run an effectful function that produces IO[Long] for each value.

That’s a lot of details packed in this very short type. The more details you get about how things happen the fewer errors you make.
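With these pieces in place, the Fahrenheit-to-Celsius conversion from the beginning of the post can be sketched as a Stream[IO, ...]. This is a rough sketch under stated assumptions, not the solution the post links to: it assumes fs2 3.1 or later on cats-effect 3 (the file API looks different in older releases), and the function names convert and fahrenheitToCelsius are made up for the example.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global // needed only to run the program
import fs2.{Stream, text}
import fs2.io.file.{Files, Path}

def fahrenheitToCelsius(f: Double): Double = (f - 32.0) * (5.0 / 9.0)

// Describes the whole conversion. No file is opened until the stream is run,
// and the input is processed chunk by chunk rather than loaded at once.
def convert(in: Path, out: Path): Stream[IO, Unit] =
  Files[IO]
    .readAll(in)                       // bytes of the input file
    .through(text.utf8.decode)         // bytes -> text
    .through(text.lines)               // text -> one element per line
    .filter(_.trim.nonEmpty)           // skip blank lines
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")                 // restore the line structure
    .through(text.utf8.encode)         // text -> bytes
    .through(Files[IO].writeAll(out))  // write to the output file

// Still a description; calling program.unsafeRunSync() would perform the file IO.
val program: IO[Unit] =
  convert(Path("testdata/fahrenheit.txt"), Path("testdata/celsius.txt")).compile.drain
```

The type says it all: the IO in Stream[IO, Unit] tells the reader that running this stream touches the outside world, and nothing happens until program is executed.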

Takeaways

  • Stream is a supercharged version of List.
  • Stream(1,2,3) is of type Stream[Pure, Int]. The second type, Int, is the type of all values that this stream will emit.
  • Pure means no effect on the world. It just makes your CPU work and consumes some power, but besides that it does not affect the world around you.
  • Use evalMap instead of map when you want to apply a function that has an effect, like loadUserIdByName, to a Stream.
  • Stream[IO, Long] separates the concerns of What and How by letting you work only with the values and not worry about how to get them (loading from the DB).
  • Separating program description from evaluation is a key aspect of FP.
  • All the programs you write with Stream will do nothing until you use unsafeRunSync. Before that your code is effectively pure.
  • IO[Long] is an effect type that tells you: you will get Long values from IO (could be a file, the network, the console ...). It's a description and not a wrapper!
  • Future does not abide by this philosophy and thus is not compatible with FS2. You have to use the IO type instead.

FS2 videos

  • Hands on screencast by Michael Pilquist: //www.youtube.com/watch?v=B1wb4fIdtn4
  • Talk by Fabio Labella //www.youtube.com/watch?v=x3GLwl1FxcA