6. Estymätön ja asynkroninen siirräntä
Sisältö:
6 Estymätön ja asynkroninen siirräntä
Tarvitaan tavallisimmin tietoliikennesovelluksissa ja monen asiakkaan palvelijoissa. Palvelija ei voi jäädä odottamaan, että:
- tiedoston avaus lopulta onnistuisi (open())
- nimetyn putken toista päätä ei avattu
- tiedosto jollakin käytössä
- saataisiin syötteitä joltakin asiakkaalta (read()) putkesta, pistokkeesta, näppäimistöltä
- tulisi tilaa kirjoittamista varten (write()), koska esim putki voi olla täynnä
- joku vapauttaisi tiedostolukituksen (fcntl()) cmd = F_SETLKW (tai pakottava lukitus)
- tietyt ioctl()-kutsut valmistuisivat
- prosessien välinen tiedonsiirto valmistuisi
Jos tietää etukäteen, mikä asiakas (= tiedostokuvaaja) on valmis tiedonsiirtoon, voi käyttää estyviä siirrännän systeemikutsuja (esim. vuorot pelissä). Jos aktiivisuusjärjestys on satunnainen, voi sovelluksen toteuttaa kahdella tavalla:
- luo kullekin asiakkaalle oma prosessi, joka käyttää estyviä systeemikutsuja.
- tee kaikille yhteinen prosessi, joka käyttää estymättömiä systeemikutsuja.
6.1 Estymätön siirräntä
Kun asetetaan open()- tai fcntl()-kutsussa käyttötavaksi O_NONBLOCK, ei siirrännän systeemikutsut read() ja write() jätä prosessia odottamaan. Jos kutsu ei voi saattaa toimintoa loppuun asti, se palauttaa arvon -1 ja errno == EAGAIN. Prosessin tarkkailtavana voi olla monta kuvaajaa. Mistä tiedetään mitä voi lukea / kirjoittaa ?
- yritetään lukea tai kirjoittaa vuorotellen kutakin kuvaajaa eli aktiivinen tarkkailu.
- kysellään systeemikutsuilla select(2) tai poll(2) mitä kuvaajia voi käsitellä odottamatta tai herätys, kun on käsiteltävää. Käsitellään vain niitä kuvaajia, jotka saatu funktion paluussa.
#include < sys/types.h > #include < sys/times.h > #include < unistd.h > struct timeval { long tv_sec; sekunnit long tv_usec; mikrosekunnit }; int select(int maxfdp1, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *tvptr); Palauttaa: käsittelyä odottavien lkm, 0, jos timeout maxfdp1 suurin seuraavissa bittikartoissa (fd_set) käyttämämme kuvaajan numero + 1 readfds bittikartta kuvaajista, joista ollaan kiinnostuneita lukemaan writefds bittikartta kuvaajista, joihin ollaan kiinnostuneita kirjoittamaan exceptfds bittikartta kuvaajista, joihin liittyvät poikkeustilanteet halutaan käsitellä tvptr kuinka kauan maksimissaan odotetaan jos tvptr==NULL niin jää odottamaan, kunnes käsiteltävää muuten jos tv_sec==0, tv_usec==0 niin ei jää odottamaan muuten odota kork. tv_sec + tv_usec Palauttaa 0, jos timeout montako kuvaajaa odottaa käsittelyä käsittelyä odottavien kuvaajien numerot asetettu em. bittikarttoihin
Kun select() palauttaa kuvaajan numeron bittikartassa, ei ko. kuvaajaan kirjoittaminen tai lukeminen jätä prosessia odottamaan. O_NONBLOCK ei vaikuta funktion select() toimintaan, joten asetettava ajastus 0, jos halutaan sillekin estymätön suoritus. Signaali voi katkaista funktion select() suorituksen, jolloin funktio palauttaa -1 ja errno == EINTR. fd_set-bittimaskien käsittelyyn on makrot:
FD_ZERO(fd_set *fdset); FD_SET(int fd, fd_set *fdset); FD_CLR(int fd, fd_set *fdset); FD_ISSET(int fd, fd_set *fdset); FD_ZERO alustaa bittikartan tyhjäksi FD_SET asettaa bitin FD_CLR kaataa bitin FD_ISSET palauttaa TRUE, jos bitti asetettu fd_set rset; int fd, fd2; #define MAX(x,y) (((x)>(y))?(X):(y)) FD_ZERO(&rset); FD_SET(fd,&rset); FD_SET(fd2,&rset); while (select(MAX(fd,fd2),&rset, NULL, NULL, &tvptr)) { if (FD_ISSET(fd,&rset)) { ... } if (FD_ISSET(fd2,&rset)) { ... } }
Aseta estymätön siirräntä päälle tai pois:
#define ON 1 #define OFF 0 void Nonblock(int fd, int on) { int flags; if (( flags = fcntl(fd, F_GETFL, 0)) < 0) perror("Nonblock: fcntl-GET"); if (on) flags |= O_NONBLOCK; else flags &= ~O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) perror("Nonblock: fcntl-SET"); } /* Siirrä tavu fdin virrasta fdout virtaan: */ #define FULL 1 #define EMPTY 0 void Pass(inr fdin, int fdout, int *n, char *ch, int eof) { if (*n == FULL) { switch (write(fdout, ch, 1)) { case 1: *n = EMPTY; break; case -1: if (errno != EAGAIN) perror("Pass: write"); break; } } if (*n == EMPTY) { switch (read(fdin, ch, 1)) { case 0: *eof = TRUE; break; case 1: *n = FULL; break; case -1: if (errno != EAGAIN) perror("Pass: read"); break; } } } /* Välitä dataa kaksisuuntaisesti */ void BiPass(int fdin1, int fdout1, int fdin2, int fdout2) { int n1 = EMPTY, n2 = EMPTY; char ch1, ch2; int eof1 = FALSE, eof2 = FALSE; Nonblock(fdin1,ON); Nonblock(fdin2,ON); for ( ; ; ) { Pass(fdin1,fdout1,&n1,&ch1,&eof1); if (eof1) close(fdout1); Pass(fdin2,fdout2,&n2,&ch2,&eof2); if (eof2) close(fdout2); if (eof1 && eof2) break; if ((n1==EMPTY) && (n2==EMPTY)) sleep(1); } Nonblock(fdin1,OFF); Nonblock(fdin2,OFF); }
Sama välittäjäprosessi kuin edellä, toteutus funktiota select() käyttäen:
/* Siirrä tavu fdin virrasta fdout virtaan:*/ #define SIZE 512 void Pass(int fdin, int fdout, int eof) { int r_cnt, w_cnt, i = 0; char buf[SIZE]; switch (r_cnt = read(fdin, buf, SIZE) { case -1: perror("Pass: read"); break; case 0: *eof = TRUE; return; default: break; } while (i < r_cnt) { if (w_cnt = write(fdout, &buf[i], r_cnt - i)) < 0) perror("Pass: write"); i += w_cnt; } } /* Välitä dataa kaksisuuntaisesti */ void BiPass(int fdin1, int fdout1, int fdin2, int fdout2) { int eof1 = FALSE, eof2 = FALSE; fd_set rdset, wrset; FD_ZERO(&wrset); for ( ; ; ) { FD_ZERO(&wrset); if (! eof1) FD_SET(fdin1,&rdset); if (! eof2) FD_SET(fdin2,&rdset); if (select(FD_SETSIZE, &rdset, &wrset,NULL, NULL) < 0) perror("BiPass: select"); if (FD_ISSET(fdin1,&rdset) { Pass(fdin1, fdout1, &eof1) if (eof1) close(fdout1); } if (FD_ISSET(fdin2,&rdset) { Pass(fdin2, fdout2, &eof2) if (eof2) close(fdout2); } if (eof1 && eof2) break; } }
6.2 Asynkroninen siirräntä Otetaan käyttöön funktiolla fcntl() asettamalla tiedostokuvaajalle O_ASYNC. Prosessille signaali, kun käsiteltävää:
- SIGPOLL (SVR4)
- SIGIO (BSD)
- SIGURG (BSD) (vain pistokkeille (OOB-data))
Jos useita asynkronisia siirtokanavia, ei tiedetä mikä kuvaaja aiheutti signaalin lähettämisen. Yritettävä lukea / kirjoittaa kaikkia vuorotellen estymättömiä kutsuja käyttäen tai käytettävä funktiota select() tai poll(). Alkutoimet:
aseta SIGPOLL-käsittelijä: sigaction() tai sigset() aseta siirräntä asynkroniseksi: ioctl(2), request=I_SETSIG, arg=milloin halutaan SIGPOLL Rajoitus: toimii vain ns. tietovirroille (streams devices), esim. pääte I/O, s-putki, nimetty s- putki
/* Kirjastot ja globaalit esittelyt: */ #include < stdio.h > #include < unistd.h > #include < sys/types.h > #include < stropts.h > #include < errno.h > #include < signal.h > #include < setjmp.h > #define ON 1 #define OFF 0 sigset_t ennen_set; struct sigaction ennen_action; sigjmp_buf lakkautus; /* Salli tai kiellä SIGPOLL: */ void Aseta_Asyncmode(int fd, int on) { int ret; if (on) ret = ioctl(fd, I_SETSIG, S_RDNORM | S_HANGUP); else ret = ioctl(fd, I_SETSIG, 0); if (ret != 0) perror("Asyncmode: ioctl, I_SETSIG"); } /* Tarkistus onko luettavaa: */ int Luettavaa(int fd) { int tavuja, sanomia; if ((sanomia = ioctl(fd, I_NREAD, &tavuja)) < 0) perror("Luettavaa: ioctl, I_NREAD"); return (sanomia > 0); } /* Varsinainen laskurutiini: */ void Laskenta(float x) { printf("lasketaan %.2f\n",x); while (1) { /* tähän laskentarutiinit */ } } /* "Alkuperäinen" signaalinkäsittelijä: */ void Kerro_Sigpoll(int signo) { psignal(signo,"Sattuipa signaali"); } /* Uusi signaalinkäsittelijämme: */ void Lopeta_Laskenta(int signo) { int vanha_errno; vanha_errno = errno; /* Kutsu aiempaa käsittelijäämme Kerro_Sigpoll */ if ((!sigismember(&ennen_set, SIGPOLL)) && (ennen_action.sa_handler != SIG_IGN) && (ennen_action.sa_handler != SIG_DFL) ) ennen_action.sa_handler(signo); /* Onko näppäilty? */ if (Luettavaa(STDIN_FILENO)) siglongjmp(lakkautus,1); errno = vanha_errno; } /* Laskennan katkaisu: */ void LaskeKunnes(float x) { sigset_t sigpoll_set; struct sigaction sigpoll_action; sigemptyset(&sigpoll_set); /* uusi SIGPOLL käsittelijä */ sigaddset(&sigpoll_set,SIGPOLL); sigprocmask(SIG_BLOCK,&sigpoll_set, &ennen_set); sigpoll_action.sa_handler = Lopeta_Laskenta; sigpoll_action.sa_mask = sigpoll_set; sigpoll_action.sa_flags = 0; sigaction(SIGPOLL, &sigpoll_action, &ennen_action); Aseta_Asyncmode(STDIN_FILENO, ON); if (sigsetjmp(lakkautus,1) == 0) { sigprocmask(SIG_UNBLOCK, &sigpoll_set, NULL); if (Luettavaa(STDIN_FILENO)) raise(SIGPOLL); Laskenta(x); sigprocmask(SIG_BLOCK, &sigpoll_set, NULL); } else /* siglongjump tuo tänne */ printf("Laskenta %.2f keskeytyi.\n",x); Aseta_Asyncmode(STDIN_FILENO,OFF); sigaction(SIGIO,&ennen_action, NULL); sigprocmask(SIG_SETMASK, &ennen_set, NULL); } /* Ja vihdoin funktio main(): */ int main(int argc, char *argv[]) { float x; sigset_t sigpoll_set; struct sigaction sigpoll_action; printf("Olen prosessi %d\n", getpid()); sigemptyset(&sigpoll_set); sigaddset(&sigpoll_set, SIGPOLL); sigpoll_action.sa_handler = Kerro_Sigpoll; sigpoll_action.sa_mask = sigpoll_set; sigpoll_action.sa_flags = 0; sigaction(SIGPOLL, &sigpoll_action, NULL); /* komentoriviargumentti blokkaa signaalin SIGPOLL */ if (argc > 1) sigprocmask(SIG_BLOCK, &sigpoll_set, NULL); printf("Anna x: "); while (scanf("%f",&x) != EOF) { if (x == 0.0) break; LaskeKunnes(x); } exit(0); }
Jan Lindström (Jan.Lindstrom@cs.Helsinki.FI)