/************************************************************************** * * * Copyright (C) 1997, Silicon Graphics, Inc. * * * * These coded instructions, statements, and computer programs contain * * unpublished proprietary information of Silicon Graphics, Inc., and * * are protected by Federal copyright law. They may not be disclosed * * to third parties or copied or duplicated in any form, in whole or * * in part, without the prior written consent of Silicon Graphics, Inc. * * * **************************************************************************/ /* * Client support. */ #include "rtmond.h" #include #include #include #include #include #define NZ(x) ((x) ? (x) : 1) static sem_t comlock; /* sema for global client list */ static client_common_t* master = NULL; /* master list of clients */ static int maxindbytes = 0; /* max over all active clients */ static void delete_client(daemon_info_t* dp, client_t* cp); void init_client(void) { if (sem_init(&comlock, 0, 1) != 0) Fatal(NULL, "Cannot initialize client list semaphore: %s", strerror(errno)); } /* * Add a new client. * * NB: clients_sem is assumed to be locked by the caller. */ static client_t* new_client(daemon_info_t* dp, client_common_t* com) { client_t* cp; IFTRACE(CLIENT)(dp, "New client at %s:%u, events %#llx, talking %s cookie %#llx%s", com->host, com->port, com->events, com->proto->name, com->cookie, !dp->isrunning ? " (start thread)" : ""); cp = (client_t*) malloc(sizeof (*cp)); if (cp != NULL) { cp->next = NULL; com->refs++; cp->com = com; cp->cookie = com->cookie; cp->events = 0; /* NB: set when started */ cp->proto = com->proto; cp->host = com->host; cp->port = com->port; cp->lastevt = 0; cp->lastkid = (int64_t) -1; memset(&cp->kids, 0, sizeof (cp->kids)); cp->kevents = 0; cp->kdrops = 0; (*com->proto->initClient)(dp, cp); /* do protocol-specific stuff */ if (io_client_init(dp, cp)) { /* setup async write support */ cp->next = (client_t*) dp->clients; dp->clients = cp; if (!dp->isrunning) /* event collection thread */ startMerge(dp); else { int v; sem_getvalue(&dp->clients_sem, &v); IFTRACE(THREAD)(dp, "Thread already running, clients_sem %d, clients %#x", v, dp->clients ); } } else delete_client(dp, cp), cp = NULL; } return (cp); } /* * Recalculate maxindbytes and update kernel * state if the value should change. */ static void calculate_maxindbytes(void) { client_common_t* com; int max = 0; sem_wait(&comlock); for (com = master; com; com = com->next) if (com->maxindbytes > max) max = com->maxindbytes; sem_post(&comlock); if (max != maxindbytes && setmaxindbytes(max)) maxindbytes = max; } /* * Delete/reclaim common state associated with a client * and terminate any active i/o thread associated with it. */ static void delete_client_common(daemon_info_t* dp, client_common_t* com) { client_common_t** cpp; IFTRACE(CLIENT)(dp, "Delete common state for %s:%u", com->host, com->port); sem_wait(&comlock); for (cpp = &master; *cpp; cpp = &(*cpp)->next) if (*cpp == com) { *cpp = com->next; /* remove from list */ break; } sem_post(&comlock); io_com_cleanup(dp, com); /* asynch write support */ if (com->fd != -1) (void) close(com->fd); /* close stream */ free((char*) com->host); if (com->maxindbytes == maxindbytes) calculate_maxindbytes(); free(com); } /* * Delete/reclaim per-CPU client state and possibly * the common state (on last reference). */ static void delete_client(daemon_info_t* dp, client_t* cp) { kidblock_t* kb; kidblock_t* next; uint32_t refs = atomicDec(&cp->com->refs); extern int iobufsiz; /* XXX */ IFTRACE(CLIENT)(dp, "Delete client %s:%u refs %u", cp->host, cp->port,refs); if ((trace & TRACE_PERF) || cp->kdrops != 0) Trace(dp, "Client %s:%u: %lu events %lu dropped (%u%%) %lu writes (%u%% push) for %llu bytes, %u push buffers for %u KB" , cp->host, cp->port , cp->kevents , cp->kdrops, 100*cp->kdrops / NZ(cp->kdrops+cp->kevents) , cp->writes, 100*cp->pushes / NZ(cp->writes) , cp->totbytes , cp->niobufs, (cp->niobufs*iobufsiz)/1024 ); for (kb = cp->kids.next; kb; kb = next) { next = kb->next; free(kb); } if (refs == 1) /* last ref of common client state */ delete_client_common(dp, cp->com); io_client_cleanup(dp, cp); free(cp); } /* * Purge all vestiges of a client given a pointer * to the common state block. */ void purge_client(client_common_t* com) { int cpu; for (cpu = getncpu()-1; cpu >= 0; cpu--) { daemon_info_t* dp = getdaemoninfo(cpu); client_t* cp; client_t** cpp; sem_wait(&dp->clients_sem); for (cpp = (client_t**) &dp->clients; cp = *cpp; cpp = &cp->next) if (cp->com == com) { *cpp = cp->next; new_cpu_mask(dp); delete_client(dp, cp); break; } sem_post(&dp->clients_sem); } } /* * Locate a client given a file descriptor. */ client_common_t* find_client(int fd) { client_common_t* com; sem_wait(&comlock); for (com = master; com && com->fd != fd; com = com->next) ; sem_post(&comlock); return (com); } /* * Create a new client's state. */ client_common_t* client_create(int fd, struct sockaddr* sockname) { client_common_t* com; const char* hostname; int port; uint64_t mask; if (sockname->sa_family == AF_INET) { struct sockaddr_in* sin = (struct sockaddr_in*) sockname; struct hostent* hp = gethostbyaddr(&sin->sin_addr, sizeof (struct in_addr), AF_INET); if (hp) { hostname = hp->h_name; mask = check_access(hostname, sin->sin_addr); } else { hostname = inet_ntoa(sin->sin_addr); mask = check_access(NULL, sin->sin_addr); } port = sin->sin_port; } else if (sockname->sa_family == AF_UNIX) { static uint clientnum = 0; /* for generating unique id */ struct in_addr in; in.s_addr = INADDR_LOOPBACK; /* use loopback addr for UNIX sockets */ mask = check_access("localhost", in); hostname = "", port = clientnum++; } else { Log(LOG_ERR, NULL, "Unknown socket address family %d", sockname->sa_family); return (NULL); } if (mask == 0) { Log(LOG_ERR, NULL, "Client %s:%u: Access denied", hostname, port); return (NULL); } com = (client_common_t*) malloc(sizeof (*com)); if (com) { memset(com, 0, sizeof (*com)); com->fd = fd; com->maxindbytes = PAR_DEFINDBYTES; com->evmask = mask; com->host = strdup(hostname); com->port = port; { int bs = 128*1024; if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bs, sizeof (bs)) < 0) Log(LOG_WARNING, NULL, "setsockopt(SO_SNDBUF:%u): %s", bs, strerror(errno)); } sem_wait(&comlock); com->next = master; master = com; sem_post(&comlock); } else Log(LOG_ERR, NULL, "Out of memory allocating common client info"); return (com); } /* * Establish client connection state in * response to client's "open" request. */ int client_open(client_common_t* com, uint64_t cookie) { client_common_t* c; /* * Verify that the cookie is unique. If not, then * someone is probably trying to use it to do something * they're not supposed to; reject the connection and * drop the other connection since they're probably * compromised (or collaborating). */ sem_wait(&comlock); for (c = master; c && c->cookie != cookie; c = c->next) ; sem_post(&comlock); if (c) { Log(LOG_NOTICE, NULL, "Client %s:%u rejected; attempt to reuse cookie %llx owned by %s:%u", com->host, com->port, cookie, c->host, c->port); return (EPERM); } else { com->cookie = cookie; return (0); } } /* * Set client-controllable parameters in the * connection state as specified by a "params" request. */ int client_setparams(client_common_t* com, rtmon_cmd_params_t* params) { int i, n, nonZero; protosw_t* proto; if (params->ncpus > getncpu()) params->ncpus = getncpu(); nonZero = 0; for (i = 0, n = params->ncpus; n > 0; n -= 64, i++) { if (n < 64) params->cpus[i] &= ((uint64_t)1<cpus[i] != 0); } if (!nonZero) { Log(LOG_WARNING, NULL, "Client %s:%u: no valid CPUs in the cpu mask", com->host, com->port); return (ENXIO); } proto = findProtocol(params->protocol); if (proto == NULL) { Log(LOG_WARNING, NULL, "Client %s:%u: unregistered protocol %d", com->host, com->port, params->protocol); return (ENXIO); } if (params->events == 0) { Log(LOG_WARNING, NULL, "Client %s:%u: invalid (zero) event mask %#llx", com->host, com->port, params->events); return (ENXIO); } if ((params->events & com->evmask) == 0) { Log(LOG_NOTICE, NULL, "Client %s:%u rejected;" " requested event mask %#llx disallowed (retricted to %#llx)", com->host, com->port, params->events, com->evmask); return (EPERM); } params->events &= com->evmask; com->proto = proto; com->maxindbytes = ntohs(params->maxindbytes); com->events = params->events; for (i = 0, n = params->ncpus; n > 0; n -= 64, i++) com->cpus[i] = params->cpus[i]; return (0); } /* * Apply the specified function to each * CPU enabled for event collection. */ static int foreachcpu(client_common_t* com, int (*f)(daemon_info_t*, client_common_t*)) { int i, n; for (i = 0, n = RTMOND_MAXCPU; n > 0; n -= 64, i++) { uint64_t cm = com->cpus[i]; if (cm != 0) { int cpu; for (cpu = 0; cpu < 64 && cm; cpu++) { uint64_t m = 1LL<host, com->port); sem_wait(&dp->clients_sem); for (cpp = (client_t**) &dp->clients; cp = *cpp; cpp = &cp->next) if (cp->com == com) { *cpp = cp->next; new_cpu_mask(dp); break; } sem_post(&dp->clients_sem); if (cp) { if ((trace & TRACE_PERF) || cp->kdrops != 0) Trace(dp, "Client %s:%u: %lu events %lu dropped %lu writes (%u%% push) for %llu bytes", cp->host, cp->port, cp->kevents, cp->kdrops, 100*cp->kdrops / NZ(cp->kdrops+cp->kevents), cp->writes, 100*cp->pushes / NZ(cp->writes), cp->totbytes); io_client_pause(dp, cp); if (com->refs == 1) /* only ref, kill push thread */ io_com_pause(dp, com); cp->events = 0; /* restored when resume'd */ cp->next = dp->paused; /* add to paused list */ dp->paused = cp; } else Log(LOG_WARNING, dp, "pause_client: %s:%u not found", com->host, com->port); return (TRUE); /* NB: always return OK */ } /* * Implement a "suspect" request by a client. */ int client_suspend(client_common_t* com) { (void) foreachcpu(com, c_suspend); return (0); } static int c_start(daemon_info_t* dp, client_common_t* com) { client_t* cp; sem_wait(&dp->clients_sem); cp = new_client(dp, com); sem_post(&dp->clients_sem); return (cp != NULL && dp->isrunning); } /* * Check for paused stream that should be restarted. */ static int c_resume(daemon_info_t* dp, client_common_t* com) { client_t* cp; client_t** cpp; for (cpp = &dp->paused; cp = *cpp; cpp = &cp->next) if (cp->com == com) { /* * There is already a stream to this host+port; if * it is paused, just restart it. If the client * went away but we had not cleaned up yet, then * clear the state. Otherwise, if a connection * exists and is not paused, then reject the request. */ IFTRACE(CLIENT)(dp, "Resume client at %s:%u", com->host, com->port); *cpp = cp->next; /* off paused list... */ cp->lastevt = 0; /* force time sync */ if (com->refs == 1) /* restart i/o thread */ io_com_resume(com); io_client_resume(dp, cp); sem_wait(&dp->clients_sem); cp->next = (client_t*) dp->clients; /* ...on active list */ dp->clients = cp; if (!dp->isrunning) /* kick off thread */ startMerge(dp); sem_post(&dp->clients_sem); return (dp->isrunning); } return (FALSE); } /* * Resume/start event collection in response to * a client's "resume" request. */ int client_resume(client_common_t* com) { if (com->refs == 0) { if (!io_com_init(com)) /* start push thread */ return (EAGAIN); /* * Initiate event dispatching for each cpu marked for * collection. Note that all cpus get the same event * mask; if the client wants different masks for different * processors, then they need to establish a separate * data connection for each mask. Also, a single descriptor * is shared between all the event threads, but no global * sorting is done by event timestamp (only on a per-cpu * basis); thus clients may/are likely to receive events * out of order if they receive events from multiple cpus. * To deal with this a client should buffer events on a * per-cpu basis and "flush" events based on the assumption * that events are ordered on a per-cpu basis. */ com->refs++; /* hold reference */ if (foreachcpu(com, c_start)) { (void) atomicDec(&com->refs); /* * Return before completing the work so the command * response can be sent before any event data is * sent to the client. This splitup in the work * is necessary because the control protocol shares * the same connection as the event data. */ return (-1); } else { Log(LOG_ERR, NULL, "Client %s:%u: could not setup new client", com->host, com->port); (void) atomicDec(&com->refs); purge_client(com); return (EAGAIN); } } else return (foreachcpu(com, c_resume) ? 0 : EAGAIN); } static int c_writeheader(daemon_info_t* dp, client_common_t* com) { client_t* cp; sem_wait(&dp->clients_sem); for (cp = (client_t*) dp->clients; cp && cp->com != com; cp = cp->next) ; sem_post(&dp->clients_sem); if (cp) (*com->proto->writeHeader)(dp, cp, com->events); return (TRUE); } static int c_setmask(daemon_info_t* dp, client_common_t* com) { sem_wait(&dp->clients_sem); new_cpu_mask(dp); sem_post(&dp->clients_sem); return (TRUE); } /* * Complete the work associated with starting up a * new client's event collection *after* the command * response has been sent to the client. */ void client_start(client_common_t* com) { (void) foreachcpu(com, c_writeheader); if (com->maxindbytes > maxindbytes && setmaxindbytes(com->maxindbytes)) maxindbytes = com->maxindbytes; (void) foreachcpu(com, c_setmask); }