#include "assert.h" #include "errno.h" #include "tkm.h" #include "unistd.h" #include "memory.h" #include "stdlib.h" #include "stdio.h" #include "strings.h" #include "getopt.h" #include "ulocks.h" #include "mutex.h" #include "time.h" #include "signal.h" #include "sys/types.h" #include "sys/prctl.h" #include "mbox.h" #include "syncv.h" #define STKSIZE (1024*1024*1) static void sig(int); static void node(int, int); #define TEST3_LOOKUP 1 /* c->s RPC */ #define TEST3_GETVALUE 2 /* c->s RPC */ #define TEST3_REVOKE 3 /* server -> client 1-way */ #define TEST3_RVALUE 5 /* c->s IPC/RPC */ union test3_req { struct lookup_req { unsigned long id; } lookup_req; struct getvalue_req { tk_set_t obtain; tk_set_t toreturn; tk_disp_t why; void *handle; unsigned long value; } getvalue_req; struct revoke_req { tk_set_t torevoke; tk_set_t eh; tk_disp_t which; void *handle; unsigned long id; /* used for debugging */ unsigned long value; } revoke_req; }; union test3_res { struct lookup_res { tk_set_t existance; void *handle; } lookup_res; struct getvalue_res { tk_set_t granted; tk_set_t already; unsigned long value; unsigned long id; } getvalue_res; }; /* * Service info */ #define MAXSERVICES 1 struct service_info { int server_node; /* node that will act as server */ int service_id; int cmpl; /* client multi-programming level */ void (*wait)(mesg_t *, int); void (*start)(void *, size_t); void *sdata; /* private server data */ void **cdata; /* private client data / node */ } services[MAXSERVICES]; #define S1_CP(n) (services[SERVICE1_ID].cdata[n])/* get private data for node */ #define S1_SP() (services[SERVICE1_ID].sdata)/* get server data */ #define SERVICE1_ID 0 /* are we the service1 server? */ #define S1S() (services[SERVICE1_ID].server_node) #define AM_S1S(n) (n == services[SERVICE1_ID].server_node) /* * tokens */ #define EXIST_CLASS 0 #define EXIST_READ TK_MAKE(EXIST_CLASS, TK_READ) #define DATA_CLASS 1 #define DATA_READ TK_MAKE(DATA_CLASS, TK_READ) #define SDATA_READ TK_MAKE_SINGLETON(DATA_CLASS, TK_READ) #define DATA_UPDATE TK_MAKE(DATA_CLASS, TK_WRITE) #define SDATA_UPDATE TK_MAKE_SINGLETON(DATA_CLASS, TK_WRITE) #define DATA_SET TK_ADD_SET(DATA_READ, DATA_UPDATE) #define NTOKENS 2 /* * server side * data per node */ struct server { usema_t *upd; ulock_t lckvalue; struct sdata *data; /* list of active sdata's */ }; /* server side distributed object */ struct sdata { TKS_DECL(ss, NTOKENS); unsigned long id; /* object id */ unsigned long value; /* object value */ struct sdata *next; }; /* * client side * data per node */ struct client { tks_ch_t h; /* handle (contains node we're on) */ int node; /* node we're on (for debug) */ int ndone; /* # threads done */ ulock_t *upd; sv_t wait; /* wait for in-progress objects */ struct cdata *data; /* list of active cdata's */ }; /* client side of distributed object */ struct cdata { TKC_DECL(cs, NTOKENS); unsigned long id; /* object value */ unsigned long value; /* object value */ tks_ch_t h; /* client handle ... should be global */ void *handle; /* server handle */ int flags; int ref; ulock_t *upd; struct cdata *next; }; #define CL_INPROGRESS 1 /* * htonode - convert a handle to a node number */ #define htonode(h) (h) #define nodetoh(n) (n) int nloops = 10; pid_t ppid; unsigned long ndoneclients; unsigned long amlogging; int verbose; /* * the following arena is used for all locks and semaphores for all nodes * and all services ... */ usptr_t *usptr; char *Cmd; int nodes; int dumplog = 0; int maxobjects = 100; static void service1_wait(mesg_t *, int); static void service1_start(void *, size_t); static void service1_test(int node); int main(int argc, char **argv) { int c, i; int cmpl; unsigned long totclients; int nwaiters; int usedebuglocks = 0; setlinebuf(stdout); setlinebuf(stderr); Cmd = strdup(argv[0]); ppid = getpid(); prctl(PR_COREPID, 0, 1); cmpl = 3; nodes = 4; while ((c = getopt(argc, argv, "o:dTN:vLm:n:")) != EOF) switch (c) { case 'd': usedebuglocks = 1; break; case 'T': __tk_tracenow = 1; break; case 'v': verbose++; break; case 'L': dumplog++; break; case 'm': /* node multi-programming level */ cmpl = atoi(optarg); break; case 'o': maxobjects = atoi(optarg); break; case 'N': nodes = atoi(optarg); break; case 'n': nloops = atoi(optarg); break; default: fprintf(stderr, "test3:illegal option %c\n", c); fprintf(stderr, "Usage:test3 [options]\n"); fprintf(stderr, "\t-n #\t# loops\n"); fprintf(stderr, "\t-N #\t# nodes\n"); fprintf(stderr, "\t-o #\t# objects\n"); fprintf(stderr, "\t-m #\tclient multi-programming level\n"); fprintf(stderr, "\t-v\tverbose\n"); exit(1); } printf("test3: nodes %d loops %d client-multiprogramming %d objects %d\n", nodes, nloops, cmpl, maxobjects); /* * setup services */ for (i = 0; i < MAXSERVICES; i++) services[i].service_id = -1; services[0].server_node = 0; services[0].service_id = SERVICE1_ID; services[0].cmpl = cmpl; services[0].wait = service1_wait; services[0].start = service1_start; services[0].cdata = (void **)malloc(nodes * sizeof(void *)); /* * XXX we really need more why?? * 1) each service has 'cmpl' client threads per node * 2) Each service server needs a thread per potential client thread * that is not on its node * 3) each client not on the server needs a 'wait' thread. * XX each client thread could be calling the server but * the server could also send out some RPCs that must be answered. */ totclients = nodes * cmpl; nwaiters = (nodes+1) * cmpl; nwaiters *= 2; nwaiters += (nodes - 1); usconfig(CONF_INITUSERS, nwaiters + totclients + 1); usconfig(CONF_INITSIZE, (32*1024) * nodes); sigset(SIGTERM, sig); sigset(SIGABRT, sig); sigset(SIGUSR1, sig); sigset(SIGUSR2, sig); sigset(SIGURG, sig); prctl(PR_SETEXITSIG, SIGTERM); /* initialize RPC */ initmbox("/usr/tmp/test3mbox"); /* call tkc_init to get arenas started up in correct sequence */ tkc_init(); tks_init(); /* * alloc an area for locks - note that all nodes 'share' this * but that should be OK */ if (usedebuglocks) usconfig(CONF_LOCKTYPE, US_DEBUGPLUS); usconfig(CONF_ARENATYPE, US_SHAREDONLY); if ((usptr = usinit("/usr/tmp/test3")) == NULL) abort(); /* create 'nodes' */ for (i = 0; i < nodes; i++) node(i, nodes); while (ndoneclients != totclients) sginap(100); if (dumplog) tk_printlog(stdout, 1000, dumplog > 2 ? TK_LOG_ALL : (dumplog > 1 ? TK_LOG_TS : 0), NULL); return 0; } static void sig(int s) { static FILE *f = NULL; if (s != SIGTERM && f == NULL) f = fopen("test3.LOG", "w"); if (s == SIGABRT) { /* make sure only one process logs */ sighold(SIGTERM); if (f == NULL) f = stderr; fprintf(f, "\n\n======ABORT=============\n\n"); if (test_and_set(&amlogging, 1) == 0) tk_printlog(f, -1, TK_LOG_ALL, NULL); fflush(f); sigset(SIGABRT, SIG_DFL); abort(); } else if (s == SIGUSR1 || s == SIGUSR2) { if (f == NULL) f = stdout; if (amlogging) return; fprintf(f, "\n\n===SIGUSR===============\n\n"); tk_printlog(f, -1, s == SIGUSR2 ? TK_LOG_ALL : 0, NULL); fflush(f); return; } else if (s == SIGURG) { char criteria[128]; fprintf(stdout, "Criteria?:"); fflush(stdout); if (fgets(criteria, sizeof(criteria), stdin) == NULL) criteria[0] = '\0'; if (f == NULL) f = stdout; fprintf(f, "\n\n===SIGURG===============\n\n"); tk_printlog(f, -1, TK_LOG_ALL, criteria); fflush(f); return; } sighold(SIGTERM); exit(0); } /* * Node */ /* * do all waits for ootb messages - both client and server */ /* ARGSUSED */ static void node_wait(void *a, size_t sz) { int node = (int)(ptrdiff_t)a; struct mbox *mbox; auto mesg_t *m; mbox = ntombox(node); for (;;) { readmbox(mbox, &m); (services[m->service_id].wait)(m, node); if (m->flags & MESG_RPC) replymbox(mbox, m); else freemesg(m); } } static void node(int mynode, int nodes) { int i, j; struct mbox *mb; pid_t spid; mb = allocmbox(); setmbox(mynode, mb); for (i = 0; i < MAXSERVICES; i++) { if (services[i].service_id < 0) continue; /* start up wait threads - * if we are the server then we need 'cmpl' server threads * per client node AND client wait threads. * if we are a client we need 'cmpl' client wait threads * but no server wait threads .. */ if (services[i].server_node == mynode) { int ns; ns = (nodes+1) * services[SERVICE1_ID].cmpl; for (j = 0; j < ns; j++) { if ((spid = sprocsp(node_wait, PR_SALL, (void *)(ptrdiff_t)mynode, NULL, STKSIZE)) < 0) { perror("sproc"); exit(1); } if (verbose) printf("%s:started up server wait thread on node %d pid %d\n", Cmd, mynode, spid); } } else { int ns; ns = services[SERVICE1_ID].cmpl; for (j = 0; j < ns; j++) { if ((spid = sprocsp(node_wait, PR_SALL, (void *)(ptrdiff_t)mynode, NULL, STKSIZE)) < 0) { perror("sproc"); exit(1); } if (verbose) printf("%s:started up client wait thread on node %d pid %d\n", Cmd, mynode, spid); } } /* * start up 'threads' that do testing */ if ((spid = sprocsp(services[i].start, PR_SALL, (void *)(ptrdiff_t)mynode, NULL, STKSIZE)) < 0) { perror("sproc"); exit(1); } if (verbose) printf("%s:started up service %d on node %d pid %d\n", Cmd, i, mynode, spid); } } /* * token callout modules */ static void client_obtain(void *, tk_set_t, tk_set_t, tk_disp_t, tk_set_t *); static void client_return(tkc_state_t, void *, tk_set_t, tk_set_t, tk_disp_t); static void server_recall(void *o, tks_ch_t h, tk_set_t r, tk_disp_t); static void server_recalled(void *, tk_set_t, tk_set_t); static void server_idle(void *, tk_set_t); tks_ifstate_t svriface = { server_recall, server_recalled, server_idle }; tkc_ifstate_t iface = { client_obtain, client_return }; /* * stubs and server implementation modules */ static int invoke_lookup(tks_ch_t, unsigned long, tk_set_t *, void **); static int invoke_getvalue(tks_ch_t, tk_set_t, tk_set_t, tk_disp_t, tk_set_t *, tk_set_t *, void *, unsigned long *, unsigned long *); static int server_lookup(tks_ch_t h, unsigned long, tk_set_t *, void **); static int server_getvalue(tks_ch_t, tk_set_t, tk_set_t, tk_disp_t, tk_set_t *, tk_set_t *, void *, unsigned long *, unsigned long *); static void server_return(tks_ch_t, unsigned long, void *, tk_set_t, tk_set_t, tk_disp_t); static int getlock(struct cdata *, int); static void putlock(struct cdata *, int); static struct cdata *getid(struct client *cp, unsigned long id); static void releaseid(struct client *, struct cdata *); /* ARGSUSED */ static void service1_launch(void *a, size_t sz) { service1_test((int)(ptrdiff_t)a); /* NOTREACHED */ } /* * called once per node - sets up minimal state needed */ /* ARGSUSED */ void service1_start(void *a, size_t sz) { int mynode = (int)(ptrdiff_t)a; int i; pid_t spid; struct server sd; struct client cd; /* this is what we're manipulating */ /* init a client data object */ cd.h = nodetoh(mynode); /* handle is just node number */ cd.node = mynode; cd.upd = usnewlock(usptr); sv_create(&cd.wait); cd.data = NULL; cd.ndone = 0; /* place local data in 'fake' private memory */ S1_CP(mynode) = &cd; if (AM_S1S(mynode)) { /* init server side */ sd.upd = usnewsema(usptr, 1); sd.data = NULL; /* place local data in 'fake' private memory */ S1_SP() = &sd; } /* * each client / server is multi-threaded */ for (i = 1; i < services[SERVICE1_ID].cmpl; i++) { if ((spid = sprocsp(service1_launch, PR_SALL, (void *)(ptrdiff_t)mynode, NULL, STKSIZE)) < 0) { perror("sproc"); exit(1); } if (verbose) printf("%s:started up client on node %d pid %d\n", Cmd, mynode, spid); } service1_test(mynode); /* NOTREACHED */ } /* * called by all threads - they all simulate accessing a variety of * like-objects * each thread selects an object and a mode (read/write), gets access * to it, plays with it a while and releases it. Upon last us * the object is flushed back to the server */ static void service1_test(int node) { struct client *cp; struct cdata *cd; int i, j, rw; unsigned long id, nvalue; auto unsigned int seed; seed = getpid(); cp = S1_CP(node); for (j = 0; j < nloops; j++) { id = (unsigned long)(rand_r(&seed) % maxobjects); rw = (rand_r(&seed) % 2) == 0? TK_READ : TK_WRITE; if ((cd = getid(cp, id)) == NULL) { /* some form of error */ if (verbose) printf("test3:client %d failed to get object %lu\n", node, id); sginap(4); continue; } if (verbose > 1) printf("test3:client %d has object %d\n", node, id); /* play a bit */ for (i = 0; i < 10; i++) { rw = (rand_r(&seed) % 2) == 0? TK_READ : TK_WRITE; if (getlock(cd, rw)) { /* error - server dead */ assert(0); } assert(cd->value); if (rw == TK_WRITE) { do { nvalue = rand_r(&seed); } while (nvalue == cd->value || nvalue == 0); cd->value = nvalue; } sginap(rand_r(&seed) % 5); putlock(cd, rw); } /* release object */ releaseid(cp, cd); } ussetlock(cp->upd); cp->ndone++; usunsetlock(cp->upd); test_then_add(&ndoneclients, 1); for (;;) pause(); } static struct cdata * getid(struct client *cp, unsigned long id) { struct cdata *cd, **pcd; tk_set_t granted; int rv; /* grab lock protecting client chain */ retry: ussetlock(cp->upd); for (cd = cp->data; cd; cd = cd->next) { if (cd->id == id) { /* have one already */ ussetlock(cd->upd); if (cd->flags & CL_INPROGRESS) { usunsetlock(cd->upd); sv_wait(&cp->wait, cp->upd); goto retry; } assert(cd->ref > 0); cd->ref++; usunsetlock(cd->upd); usunsetlock(cp->upd); return cd; } } /* none there - create one, put onlist and mark INPROGRESS */ cd = malloc(sizeof(*cd)); cd->handle = NULL; cd->id = id; cd->flags = CL_INPROGRESS; cd->ref = 1; cd->h = cp->h; cd->upd = usnewlock(usptr); cd->next = cp->data; cp->data = cd; usunsetlock(cp->upd); /* look up on server */ if (AM_S1S(cp->node)) { rv = server_lookup(cp->h, id, &granted, &cd->handle); } else { rv = invoke_lookup(cp->h, id, &granted, &cd->handle); } assert(rv == 0); /* * if rv != 0 then the server id dead * if granted is TK_NULLSET then server didn't have what we wanted */ if (rv == 0 && granted != TK_NULLSET) { /* we now have existance token */ tkc_create("client", cd->cs, (void *)cd, &iface, NTOKENS, granted, (void *)id); ussetlock(cd->upd); assert(cd->flags & CL_INPROGRESS); cd->flags &= ~CL_INPROGRESS; usunsetlock(cd->upd); ussetlock(cp->upd); sv_broadcast(&cp->wait); usunsetlock(cp->upd); } else { /* some form of error */ ussetlock(cp->upd); for (pcd = &cp->data; *pcd; pcd = &(*pcd)->next) { if (*pcd == cd) { *pcd = cd->next; break; } } usfreelock(cd->upd, usptr); free(cd); sv_broadcast(&cp->wait); usunsetlock(cp->upd); cd = NULL; } return cd; } /* * releaseid - client all done with object * called either from client app code or client_recall */ static void releaseid(struct client *cp, struct cdata *cd) { tk_set_t toreturn; tk_disp_t dofret; struct cdata **pcd; ussetlock(cd->upd); assert(cd->ref > 0); if (--cd->ref > 0) { usunsetlock(cd->upd); return; } /* * ref count 0 - mark as INPROGRESS, return to server, then * delete from client */ cd->flags |= CL_INPROGRESS; usunsetlock(cd->upd); tkc_release(cd->cs, EXIST_READ); /* * Must pass TK_WAIT in since otherwise we can get the * following race: * thread gets WRITE token and then releases it and it's ref on * the object (the WRITE token gets CACHED). * a revoke comes in from the server, the WRITE token goes into * the RETURNING state. * another client thread drops its ref count, the ref count goes * to 0 and it calls tkc_returning. If it doesn't wait * for the WRITE token to finish returning, this client * could come back from tkc_returning, send an RPC to the * server. get the response and call tkc_destroy - all before * the thread that is in client_revoke() gets back and runs * and calls tkc_return ... */ tkc_returning(cd->cs, TK_ADD_SET(EXIST_READ, DATA_SET), &toreturn, &dofret, TK_WAIT); assert(TK_SUB_SET(toreturn, TK_ADD_SET(EXIST_READ, DATA_SET)) == TK_NULLSET); /* * send back tokens and data * Turns out that the revoke callout is just the thing we need * It also calls tkc_return(); * Note that the token server will not return to us until any * outstanding revokes it has (that would come into client_recall()) * have completed. Thus once we return there can be no more outstanding * messages for this object. */ client_return(NULL, cd, toreturn, TK_NULLSET, dofret); /* delete from client list */ ussetlock(cp->upd); for (pcd = &cp->data; *pcd; pcd = &(*pcd)->next) { if (*pcd == cd) { *pcd = cd->next; break; } } sv_broadcast(&cp->wait); usunsetlock(cp->upd); usfreelock(cd->upd, usptr); tkc_destroy(cd->cs); free(cd); } /* * Can get the lock for either read or update */ static int getlock(struct cdata *cd, int type) { int rv; if (type == TK_READ) rv = tkc_acquire1(cd->cs, SDATA_READ); else rv = tkc_acquire1(cd->cs, SDATA_UPDATE); if (rv == 0) ussetlock(cd->upd); return rv; } static void putlock(struct cdata *cd, int type) { usunsetlock(cd->upd); if (type == TK_READ) tkc_release1(cd->cs, SDATA_READ); else tkc_release1(cd->cs, SDATA_UPDATE); } /*===================================================================*/ /* * client message ops - these correspond 1-1 with server routines * that are called from _wait */ static int invoke_lookup(tks_ch_t h, unsigned long id, tk_set_t *granted, void **handle) { mesg_t *m; union test3_req *req; union test3_res *res; int error; m = getmesg(); m->op = TEST3_LOOKUP; m->service_id = SERVICE1_ID; m->handle = h; req = (union test3_req *)&m->request; req->lookup_req.id = id; error = callmbox(ntombox(S1S()), m); if (error == 0) { res = (union test3_res *)&m->response; *granted = res->lookup_res.existance; *handle = res->lookup_res.handle; } freemesg(m); return error; } static int invoke_getvalue(tks_ch_t h, tk_set_t obtain, tk_set_t toreturn, tk_disp_t why, tk_set_t *granted, tk_set_t *already, void *handle, unsigned long *value, unsigned long *id) { mesg_t *m; union test3_req *req; union test3_res *res; int error; m = getmesg(); m->op = TEST3_GETVALUE; m->service_id = SERVICE1_ID; m->handle = h; req = (union test3_req *)&m->request; req->getvalue_req.obtain = obtain; req->getvalue_req.toreturn = toreturn; req->getvalue_req.why = why; req->getvalue_req.handle = handle; req->getvalue_req.value = *value; error = callmbox(ntombox(S1S()), m); if (error == 0) { res = (union test3_res *)&m->response; *granted = res->getvalue_res.granted; *already = res->getvalue_res.already; *value = res->getvalue_res.value; *id = res->getvalue_res.id; } freemesg(m); return error; } /*=================================================================*/ /* * server routines - invoked from _wait and from local clients */ static int server_lookup(tks_ch_t h, unsigned long id, tk_set_t *granted, void **handle) { auto tk_set_t already; struct server *sp; struct sdata *sd; assert(id < maxobjects); sp = (struct server *)S1_SP(); if (sp == NULL) { /* race .. return equiv of ENOENT */ *granted = TK_NULLSET; return 0; } uspsema(sp->upd); for (sd = sp->data; sd; sd = sd->next) { if (sd->id == id) break; } if (sd) { /* got one already */ tks_obtain(sd->ss, h, EXIST_READ, granted, NULL, &already); } else { /* create a new one */ sd = malloc(sizeof(*sd)); sd->id = id; sd->value = 1; sd->next = sp->data; sp->data = sd; tks_create("server", sd->ss, (void *)sd, &svriface, NTOKENS, (void *)id); /* all set - respond with existance token */ tks_obtain(sd->ss, h, EXIST_READ, granted, NULL, &already); } *handle = sd; usvsema(sp->upd); tks_notify_idle(sd->ss, EXIST_READ); assert(already == TK_NULLSET); assert(*granted == EXIST_READ); return 0; } /* * respond to GETVALUE request from client */ static int server_getvalue(tks_ch_t h, tk_set_t obtain, tk_set_t toreturn, tk_disp_t why, tk_set_t *granted, tk_set_t *already, void *handle, unsigned long *value, unsigned long *id) { struct sdata *sd = (struct sdata *)handle; if (toreturn != TK_NULLSET) { if (TK_IS_IN_SET(toreturn, DATA_UPDATE)) { /* shouldn't need a lock since we have WRITE token */ sd->value = *value; } assert(!TK_IS_IN_SET(toreturn, EXIST_READ)); tks_return(sd->ss, h, toreturn, TK_NULLSET, TK_NULLSET, why); } tks_obtain(sd->ss, h, obtain, granted, NULL, already); /* always give back value */ *value = sd->value; *id = sd->id; return 0; } /* * server function to handle a return from client - either via a recall * or a revoke or a client-initiated return. */ static void server_return(tks_ch_t h, unsigned long value, void *handle, tk_set_t ok, tk_set_t eh, tk_disp_t which) { struct sdata *sd = (struct sdata *)handle; if (TK_IS_IN_SET(ok, DATA_UPDATE)) { /* shouldn't need lock - no one else can touch */ sd->value = value; } /* this could trigger the tks_idle callback */ tks_return(sd->ss, h, ok, TK_NULLSET, eh, which); return; } /* * callout function - called when a token has been recalled and can now be * reclaimed/destroyed */ /* ARGSUSED */ static void server_recalled(void *o, tk_set_t r, tk_set_t refused) { assert(0); } static void server_idle(void *o, tk_set_t idle) { struct sdata *sd = (struct sdata *)o; struct sdata **psd; struct server *sp; auto tk_set_t st; int found; sp = (struct server *)S1_SP(); assert(idle == EXIST_READ); uspsema(sp->upd); tks_getstate(sd->ss, &st); if (TK_GET_LEVEL(st, EXIST_CLASS)) { /* someone grabbed it again */ usvsema(sp->upd); return; } assert(st == TK_NULLSET); /* remove from list on server */ for (psd = &sp->data, found = 0; *psd; psd = &(*psd)->next) { if (*psd == sd) { *psd = sd->next; found = 1; break; } } assert(found); usvsema(sp->upd); tks_destroy(sd->ss); free(sd); } /* * token callout for obtaining tokens */ /* ARGSUSED */ static void client_obtain(void *o, tk_set_t obtain, tk_set_t toreturn, tk_disp_t why, tk_set_t *refused) { struct cdata *cd = (struct cdata *)o; tk_set_t granted, already; auto unsigned long value, id; int error; if (AM_S1S(cd->h)) { error = server_getvalue(cd->h, obtain, toreturn, why, &granted, &already, cd->handle, &value, &id); } else { error = invoke_getvalue(cd->h, obtain, toreturn, why, &granted, &already, cd->handle, &value, &id); } assert(error == 0); assert(id == cd->id); *refused = TK_SUB_SET(obtain, TK_ADD_SET(granted, already)); assert(*refused == TK_NULLSET); assert(already == TK_NULLSET); if (TK_IS_IN_SET(granted, DATA_SET)) cd->value = value; } /* * client_return - callout from token client module */ /* ARGSUSED */ static void client_return( tkc_state_t ci, void *o, tk_set_t revoke, tk_set_t eh, tk_disp_t which) { mesg_t *m; union test3_req *req; struct cdata *cd = (struct cdata *)o; /* REFERENCED */ int error = 0; if (AM_S1S(cd->h)) { /* we are server */ server_return(cd->h, cd->value, cd->handle, revoke, eh, which); } else { m = getmesg(); m->op = TEST3_RVALUE; m->service_id = SERVICE1_ID; m->handle = cd->h; req = (union test3_req *)&m->request; req->revoke_req.torevoke = revoke; req->revoke_req.eh = eh; req->revoke_req.which = which; req->revoke_req.value = cd->value; req->revoke_req.handle = cd->handle; if (TK_IS_ANY_CLIENT(which)) { error = callmbox(ntombox(S1S()), m); freemesg(m); } else { error = writembox(ntombox(S1S()), m); } } /* inform client token module that all is done */ tkc_returned(cd->cs, revoke, TK_NULLSET); } /* * client_recall - called for a server invoked REVOKE or RECALL * Note that because of the token server protocol, it won't respond * to a client_initiated return until all server initiated revokes are * completed, we know that the handle presented here is valid. */ static void client_recall( struct client *cp, void *handle, unsigned long id, tk_set_t toreturn, tk_disp_t which) { struct cdata *cd; /* assert that handle is still valid */ ussetlock(cp->upd); for (cd = cp->data; cd; cd = cd->next) { if (cd->handle == handle) break; } assert(cd); assert(cd->id == id); usunsetlock(cp->upd); assert(!TK_IS_IN_SET(toreturn, EXIST_READ)); tkc_recall(cd->cs, toreturn, which); #if OLD ussetlock(cd->upd); usunsetlock(cp->upd); /* * as soon as we release this lock another client thread could * release this object. We don't want it to go away until * we're done with it */ assert(cd->ref > 0); cd->ref++; usunsetlock(cd->upd); assert(!TK_IS_IN_SET(toreturn, EXIST_READ)); (void) tkc_recall(cd->cs, toreturn, which); assert(cd->ref > 0); releaseid(cp, cd); #endif } /* * get client handle from within token module - used for TRACING only */ tks_ch_t getch_t(void *o) { struct cdata *cd = (struct cdata *)o; assert(cd); return(cd->h); } /* * callout function that is called from the token server module when a * token (set) needs to be revoked. It sends a message to the appropriate * client. This sends a 1 way message */ /* ARGSUSED */ static void server_recall(void *o, tks_ch_t h, tk_set_t r, tk_disp_t which) { mesg_t *m; union test3_req *req; struct sdata *sd = (struct sdata *)o; if (AM_S1S(htonode(h))) { struct client *cp; cp = (struct client *)S1_CP(htonode(h)); client_recall(cp, sd, sd->id, r, which); return; } m = getmesg(); m->op = TEST3_REVOKE; m->service_id = SERVICE1_ID; m->handle = h; req = (union test3_req *)&m->request; req->revoke_req.torevoke = r; req->revoke_req.which = which; req->revoke_req.handle = sd; req->revoke_req.id = sd->id; (void)writembox(ntombox(htonode(h)), m); } /* * this thread handles any out-of-the-blue messages * Both client and server. * 'node' is which node we are. */ static void service1_wait(mesg_t *m, int node) { struct client *cp; /* client side data */ union test3_req *req; union test3_res *res; req = (union test3_req *)&m->request; res = (union test3_res *)&m->response; switch (m->op) { /* client messages */ case TEST3_REVOKE: assert(!AM_S1S(node)); cp = (struct client *)S1_CP(node); client_recall(cp, req->revoke_req.handle, req->revoke_req.id, req->revoke_req.torevoke, req->revoke_req.which); break; /* * server messages */ case TEST3_RVALUE: { assert(AM_S1S(node)); server_return(m->handle, req->revoke_req.value, req->revoke_req.handle, req->revoke_req.torevoke, req->revoke_req.eh, req->revoke_req.which); break; } case TEST3_GETVALUE: { auto unsigned long value; assert(AM_S1S(node)); value = req->getvalue_req.value; server_getvalue(m->handle, req->getvalue_req.obtain, req->getvalue_req.toreturn, req->getvalue_req.why, &res->getvalue_res.granted, &res->getvalue_res.already, req->getvalue_req.handle, &value, &res->getvalue_res.id); res->getvalue_res.value = value; break; } case TEST3_LOOKUP: assert(AM_S1S(node)); server_lookup(m->handle, req->lookup_req.id, &res->lookup_res.existance, &res->lookup_res.handle); break; default: fprintf(stderr, "service1-wait:Unknown msg op:%d\n", m->op); abort(); break; } return; }