/*
 * Source listing metadata (left over from the repository viewer):
 *   irix-657m-src/eoe/cmd/rtmon/rtmond/client.c
 *   2022-09-29 17:59:04 +03:00 -- 559 lines, 15 KiB, C
 */

/**************************************************************************
* *
* Copyright (C) 1997, Silicon Graphics, Inc. *
* *
* These coded instructions, statements, and computer programs contain *
* unpublished proprietary information of Silicon Graphics, Inc., and *
* are protected by Federal copyright law. They may not be disclosed *
* to third parties or copied or duplicated in any form, in whole or *
* in part, without the prior written consent of Silicon Graphics, Inc. *
* *
**************************************************************************/
/*
* Client support.
*/
#include "rtmond.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/par.h>
#include <netdb.h>
/*
 * NZ(x) yields x unless x is zero, in which case it yields 1.  It is
 * used below as a divisor guard when computing percentages.  Note that
 * the argument is evaluated twice.
 */
#define NZ(x) ((x) ? (x) : 1)
/* comlock is taken around every traversal or update of the master list */
static sem_t comlock; /* sema for global client list */
static client_common_t* master = NULL; /* master list of clients */
/* updated only after setmaxindbytes() reports success for the new value */
static int maxindbytes = 0; /* max over all active clients */
/* forward declaration: new_client() calls delete_client() on setup failure */
static void delete_client(daemon_info_t* dp, client_t* cp);
/*
 * One-time setup for this module: create the semaphore that guards
 * the master client list with an initial count of 1.  A failure is
 * reported through Fatal() together with the errno text.
 */
void
init_client(void)
{
    int rc = sem_init(&comlock, 0, 1);

    if (rc == 0)
        return;
    Fatal(NULL, "Cannot initialize client list semaphore: %s",
        strerror(errno));
}
/*
 * Add a new client to the per-CPU daemon dp.
 *
 * Allocates the per-CPU client state, takes a reference on the common
 * state com, copies the connection identity (cookie, protocol, host,
 * port) out of com, and runs the protocol-specific and asynchronous
 * write setup.  On success the client is pushed onto dp->clients and
 * the event collection thread is started if it is not yet running.
 *
 * Returns the new client, or NULL if the allocation or the i/o setup
 * failed (in the latter case the partially built client is released
 * through delete_client()).
 *
 * NB: clients_sem is assumed to be locked by the caller.
 */
static client_t*
new_client(daemon_info_t* dp, client_common_t* com)
{
    client_t* cp;

    IFTRACE(CLIENT)(dp,
        "New client at %s:%u, events %#llx, talking %s cookie %#llx%s",
        com->host, com->port, com->events, com->proto->name,
        com->cookie, !dp->isrunning ? " (start thread)" : "");

    cp = (client_t*) malloc(sizeof (*cp));
    if (cp == NULL)
        return (NULL);

    cp->next = NULL;
    com->refs++;
    cp->com = com;
    cp->cookie = com->cookie;
    cp->events = 0;                     /* NB: set when started */
    cp->proto = com->proto;
    cp->host = com->host;               /* shared with com, not a copy */
    cp->port = com->port;
    cp->lastevt = 0;
    cp->lastkid = (int64_t) -1;
    memset(&cp->kids, 0, sizeof (cp->kids));
    cp->kevents = 0;
    cp->kdrops = 0;
    (*com->proto->initClient)(dp, cp);  /* do protocol-specific stuff */

    if (!io_client_init(dp, cp)) {      /* setup async write support */
        /* cp was never linked onto dp->clients; just release it */
        delete_client(dp, cp);
        return (NULL);
    }

    cp->next = (client_t*) dp->clients;
    dp->clients = cp;
    if (!dp->isrunning) {               /* event collection thread */
        startMerge(dp);
    } else {
        int v = 0;                      /* stays 0 if sem_getvalue fails */

        sem_getvalue(&dp->clients_sem, &v);
        /*
         * The list head is a pointer, so it must be printed with %p;
         * passing it to an integer conversion is undefined where
         * pointers are wider than unsigned int.
         */
        IFTRACE(THREAD)(dp,
            "Thread already running, clients_sem %d, clients %p",
            v, (void*) dp->clients
        );
    }
    return (cp);
}
/*
* Recalculate maxindbytes and update kernel
* state if the value should change.
*/
static void
calculate_maxindbytes(void)
{
client_common_t* com;
int max = 0;
sem_wait(&comlock);
for (com = master; com; com = com->next)
if (com->maxindbytes > max)
max = com->maxindbytes;
sem_post(&comlock);
if (max != maxindbytes && setmaxindbytes(max))
maxindbytes = max;
}
/*
 * Release the common (per-connection) state of a client.
 *
 * The block is unlinked from the master list under comlock, the
 * asynchronous write support is torn down via io_com_cleanup(), the
 * descriptor is closed unless it is -1, and the host string and the
 * block itself are freed.  If this client's maxindbytes equals the
 * cached maximum, the maximum is recomputed from the remaining
 * entries before the block is freed.
 */
static void
delete_client_common(daemon_info_t* dp, client_common_t* com)
{
    client_common_t** link;

    IFTRACE(CLIENT)(dp, "Delete common state for %s:%u", com->host, com->port);

    sem_wait(&comlock);
    link = &master;
    while (*link != NULL && *link != com)
        link = &(*link)->next;
    if (*link != NULL)
        *link = com->next;              /* remove from list */
    sem_post(&comlock);

    io_com_cleanup(dp, com);            /* asynch write support */
    if (com->fd != -1)
        (void) close(com->fd);          /* close stream */
    free((char*) com->host);
    if (maxindbytes == com->maxindbytes)
        calculate_maxindbytes();        /* com is already off the list */
    free(com);
}
/*
 * Release the per-CPU state of a client.
 *
 * Drops this client's reference on the common state, optionally
 * emits a statistics trace (when performance tracing is enabled or
 * events were dropped), frees the chain of kid blocks hanging off
 * cp->kids, and finally releases the client's i/o state and the
 * client itself.  When the value returned by atomicDec() is 1 the
 * common state is deleted as well (the original code treats that
 * value as "this was the last reference").
 */
static void
delete_client(daemon_info_t* dp, client_t* cp)
{
    extern int iobufsiz;                /* XXX */
    uint32_t nrefs;
    kidblock_t* blk;

    nrefs = atomicDec(&cp->com->refs);
    IFTRACE(CLIENT)(dp, "Delete client %s:%u refs %u", cp->host, cp->port,nrefs);

    if (cp->kdrops != 0 || (trace & TRACE_PERF)) {
        Trace(dp,
            "Client %s:%u: %lu events %lu dropped (%u%%) %lu writes (%u%% push) for %llu bytes, %u push buffers for %u KB",
            cp->host, cp->port,
            cp->kevents,
            cp->kdrops, 100*cp->kdrops / NZ(cp->kdrops+cp->kevents),
            cp->writes, 100*cp->pushes / NZ(cp->writes),
            cp->totbytes,
            cp->niobufs, (cp->niobufs*iobufsiz)/1024);
    }

    /* free every dynamically chained kid block */
    blk = cp->kids.next;
    while (blk != NULL) {
        kidblock_t* following = blk->next;

        free(blk);
        blk = following;
    }

    if (nrefs == 1)                     /* last ref of common client state */
        delete_client_common(dp, cp->com);
    io_client_cleanup(dp, cp);
    free(cp);
}
/*
 * Remove every per-CPU client that refers to the common state com.
 *
 * CPUs are visited from the highest index down to 0.  For each one
 * the active client list is searched under clients_sem; the first
 * matching client is unlinked, the CPU's mask is recomputed with
 * new_cpu_mask(), and the client is handed to delete_client().
 */
void
purge_client(client_common_t* com)
{
    int cpu = getncpu();

    while (--cpu >= 0) {
        daemon_info_t* dp = getdaemoninfo(cpu);
        client_t** link;

        sem_wait(&dp->clients_sem);
        link = (client_t**) &dp->clients;
        while (*link != NULL && (*link)->com != com)
            link = &(*link)->next;
        if (*link != NULL) {
            client_t* victim = *link;

            *link = victim->next;       /* unlink before releasing */
            new_cpu_mask(dp);
            delete_client(dp, victim);
        }
        sem_post(&dp->clients_sem);
    }
}
/*
* Locate a client given a file descriptor.
*/
client_common_t*
find_client(int fd)
{
client_common_t* com;
sem_wait(&comlock);
for (com = master; com && com->fd != fd; com = com->next)
;
sem_post(&comlock);
return (com);
}
/*
 * Create the common state for a newly connected client.
 *
 * fd       - the connection's descriptor; stored in the new state.
 *            It is NOT closed here on any failure path.
 * sockname - peer address; AF_INET and AF_UNIX are supported.
 *
 * The peer's name and port are derived from sockname (UNIX-domain
 * peers get the name "<local>" and a locally generated number in
 * place of a port), and check_access() supplies the mask that is
 * stored as the client's permitted event mask.  A zero mask means
 * access is denied.
 *
 * Returns the new state, already linked onto the master list, or
 * NULL for an unknown address family, denied access, or memory
 * exhaustion.
 */
client_common_t*
client_create(int fd, struct sockaddr* sockname)
{
    client_common_t* com;
    const char* hostname;
    int port;
    uint64_t mask;

    if (sockname->sa_family == AF_INET) {
        struct sockaddr_in* sin = (struct sockaddr_in*) sockname;
        struct hostent* hp = gethostbyaddr(&sin->sin_addr, sizeof (struct in_addr), AF_INET);
        if (hp) {
            hostname = hp->h_name;
            mask = check_access(hostname, sin->sin_addr);
        } else {
            hostname = inet_ntoa(sin->sin_addr);
            mask = check_access(NULL, sin->sin_addr);
        }
        /* sin_port is kept in network byte order */
        port = ntohs(sin->sin_port);
    } else if (sockname->sa_family == AF_UNIX) {
        static uint clientnum = 0;      /* for generating unique id */
        struct in_addr in;

        /*
         * Use loopback addr for UNIX sockets.  INADDR_LOOPBACK is a
         * host-order constant while s_addr holds network order.
         */
        in.s_addr = htonl(INADDR_LOOPBACK);
        mask = check_access("localhost", in);
        hostname = "<local>", port = clientnum++;
    } else {
        Log(LOG_ERR, NULL, "Unknown socket address family %d",
            sockname->sa_family);
        return (NULL);
    }
    if (mask == 0) {
        Log(LOG_ERR, NULL, "Client %s:%u: Access denied", hostname, port);
        return (NULL);
    }

    com = (client_common_t*) malloc(sizeof (*com));
    if (com == NULL) {
        Log(LOG_ERR, NULL, "Out of memory allocating common client info");
        return (NULL);
    }
    memset(com, 0, sizeof (*com));
    com->host = strdup(hostname);
    if (com->host == NULL) {
        /* a NULL host would later be handed to %s in log messages */
        Log(LOG_ERR, NULL, "Out of memory allocating common client info");
        free(com);
        return (NULL);
    }
    com->fd = fd;
    com->maxindbytes = PAR_DEFINDBYTES;
    com->evmask = mask;
    com->port = port;
    {
        int bs = 128*1024;

        /* best effort: a failure here is only logged */
        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &bs, sizeof (bs)) < 0)
            Log(LOG_WARNING, NULL, "setsockopt(SO_SNDBUF:%u): %s",
                bs, strerror(errno));
    }

    sem_wait(&comlock);
    com->next = master;
    master = com;
    sem_post(&comlock);
    return (com);
}
/*
 * Handle a client's "open" request by recording its cookie.
 *
 * The cookie must not match the cookie of any entry currently on the
 * master list.  A duplicate suggests someone is trying to reuse it
 * for something they are not supposed to do, so the request is
 * rejected with EPERM and a notice naming both parties is logged.
 * (This routine itself only rejects the caller; it does not touch
 * the connection that already owns the cookie.)
 *
 * Returns 0 after storing the cookie, or EPERM on a duplicate.
 */
int
client_open(client_common_t* com, uint64_t cookie)
{
    client_common_t* owner;

    sem_wait(&comlock);
    owner = master;
    while (owner != NULL && owner->cookie != cookie)
        owner = owner->next;
    sem_post(&comlock);

    if (owner == NULL) {
        com->cookie = cookie;
        return (0);
    }

    Log(LOG_NOTICE, NULL,
        "Client %s:%u rejected; attempt to reuse cookie %llx owned by %s:%u",
        com->host, com->port, cookie, owner->host, owner->port);
    return (EPERM);
}
/*
 * Set client-controllable parameters in the connection state as
 * specified by a "params" request.
 *
 * params is sanitised in place: ncpus is capped at getncpu(), bits
 * beyond ncpus in the last used word of the cpu mask are cleared,
 * and the event mask is reduced to the events this client is allowed
 * to see (com->evmask).
 *
 * Returns 0 on success; ENXIO if no valid CPU remains in the mask,
 * the protocol is not registered, or the requested event mask is
 * zero; EPERM if none of the requested events is permitted.  On any
 * error com is left unmodified.
 */
int
client_setparams(client_common_t* com, rtmon_cmd_params_t* params)
{
    int i, n, nonZero;
    protosw_t* proto;

    if (params->ncpus > getncpu())
        params->ncpus = getncpu();

    /* each mask word covers 64 CPUs; trim the partial last word */
    nonZero = 0;
    for (i = 0, n = params->ncpus; n > 0; n -= 64, i++) {
        if (n < 64)
            params->cpus[i] &= ((uint64_t)1<<n)-1;
        nonZero += (params->cpus[i] != 0);
    }
    if (!nonZero) {
        Log(LOG_WARNING, NULL, "Client %s:%u: no valid CPUs in the cpu mask",
            com->host, com->port);
        return (ENXIO);
    }

    proto = findProtocol(params->protocol);
    if (proto == NULL) {
        Log(LOG_WARNING, NULL, "Client %s:%u: unregistered protocol %d",
            com->host, com->port, params->protocol);
        return (ENXIO);
    }

    if (params->events == 0) {
        Log(LOG_WARNING, NULL, "Client %s:%u: invalid (zero) event mask %#llx",
            com->host, com->port, params->events);
        return (ENXIO);
    }
    if ((params->events & com->evmask) == 0) {
        Log(LOG_NOTICE, NULL, "Client %s:%u rejected;"
            " requested event mask %#llx disallowed (restricted to %#llx)",
            com->host, com->port, params->events, com->evmask);
        return (EPERM);
    }
    params->events &= com->evmask;

    /* everything validated; commit to the connection state */
    com->proto = proto;
    com->maxindbytes = ntohs(params->maxindbytes);
    com->events = params->events;
    for (i = 0, n = params->ncpus; n > 0; n -= 64, i++)
        com->cpus[i] = params->cpus[i];
    return (0);
}
/*
 * Apply f to each CPU whose bit is set in com->cpus.
 *
 * The mask is scanned in 64-bit words covering RTMOND_MAXCPU CPUs;
 * bit b of word i stands for CPU 64*i+b.  f receives the daemon
 * info of that CPU together with com.
 *
 * Returns TRUE if f returned non-zero for every selected CPU, or
 * FALSE as soon as one call returns zero (remaining CPUs are then
 * not visited).
 */
static int
foreachcpu(client_common_t* com, int (*f)(daemon_info_t*, client_common_t*))
{
    int i, n;

    for (i = 0, n = RTMOND_MAXCPU; n > 0; n -= 64, i++) {
        uint64_t cm = com->cpus[i];
        int cpu;

        /* cm is drained bit by bit so the scan stops early */
        for (cpu = 0; cpu < 64 && cm != 0; cpu++) {
            /*
             * Shift an unsigned 64-bit one: a signed 1LL shifted by
             * 63 would overflow into the sign bit.
             */
            uint64_t m = (uint64_t) 1 << cpu;

            if ((cm & m) == 0)
                continue;
            if (!(*f)(getdaemoninfo(64*i+cpu), com))
                return (FALSE);
            cm &= ~m;
        }
    }
    return (TRUE);
}
/*
 * Pause transmission of events to a client on one CPU.
 *
 * The client is taken off the active list of the thread that
 * monitors the event queue and new_cpu_mask() is called to recompute
 * the CPU's mask (per the original note this may update the kernel's
 * tstamp event mask).  The client's i/o is paused, its event mask is
 * cleared until it is resumed, and it is placed on dp->paused.  When
 * the common state has only one reference the common i/o is paused
 * too.
 *
 * A client that is not on the active list is only logged.
 * Always returns TRUE so that foreachcpu() visits every CPU.
 */
static int
c_suspend(daemon_info_t* dp, client_common_t* com)
{
    client_t* cp;
    client_t** cpp;

    IFTRACE(CLIENT)(dp, "Pause client at %s:%u", com->host, com->port);

    sem_wait(&dp->clients_sem);
    for (cpp = (client_t**) &dp->clients; (cp = *cpp) != NULL; cpp = &cp->next)
        if (cp->com == com) {
            *cpp = cp->next;
            new_cpu_mask(dp);
            break;
        }
    sem_post(&dp->clients_sem);

    if (cp == NULL) {
        Log(LOG_WARNING, dp, "pause_client: %s:%u not found",
            com->host, com->port);
        return (TRUE);                  /* NB: always return OK */
    }

    if ((trace & TRACE_PERF) || cp->kdrops != 0)
        /*
         * One conversion per argument: the drop percentage needs its
         * own "(%u%%)", as in the matching trace in delete_client().
         */
        Trace(dp,
            "Client %s:%u: %lu events %lu dropped (%u%%) %lu writes (%u%% push) for %llu bytes",
            cp->host, cp->port, cp->kevents,
            cp->kdrops, 100*cp->kdrops / NZ(cp->kdrops+cp->kevents),
            cp->writes, 100*cp->pushes / NZ(cp->writes), cp->totbytes);
    io_client_pause(dp, cp);
    if (com->refs == 1)                 /* only ref, kill push thread */
        io_com_pause(dp, com);
    cp->events = 0;                     /* restored when resume'd */
    cp->next = dp->paused;              /* add to paused list */
    dp->paused = cp;
    return (TRUE);                      /* NB: always return OK */
}
/*
 * Implement a "suspend" request by a client: pause the client on
 * every CPU selected in its mask.  The per-CPU result is ignored
 * (c_suspend always reports success) and 0 is returned.
 */
int
client_suspend(client_common_t* com)
{
    foreachcpu(com, c_suspend);
    return 0;
}
/*
 * Per-CPU worker for starting a client: create the client on dp
 * while holding clients_sem (as new_client() requires).  Reports
 * success only if the client was created and dp is marked running.
 */
static int
c_start(daemon_info_t* dp, client_common_t* com)
{
    client_t* created;
    int ok = 0;

    sem_wait(&dp->clients_sem);
    created = new_client(dp, com);
    sem_post(&dp->clients_sem);

    if (created != NULL && dp->isrunning)
        ok = 1;
    return (ok);
}
/*
 * Per-CPU worker for restarting a paused stream.
 *
 * Searches dp->paused for the client that belongs to com.  If there
 * is none, FALSE is returned.  Otherwise the client is taken off the
 * paused list, its last-event stamp is reset to force a time sync,
 * its i/o is resumed (and the common i/o too when the common state
 * has a single reference), and it is put back on the active list
 * under clients_sem, starting the collection thread if necessary.
 * The result is then dp->isrunning.
 */
static int
c_resume(daemon_info_t* dp, client_common_t* com)
{
    client_t** link = &dp->paused;
    client_t* cp;

    while ((cp = *link) != NULL && cp->com != com)
        link = &cp->next;
    if (cp == NULL)
        return (FALSE);

    IFTRACE(CLIENT)(dp, "Resume client at %s:%u",
        com->host, com->port);
    *link = cp->next;                   /* off paused list... */
    cp->lastevt = 0;                    /* force time sync */
    if (com->refs == 1)                 /* restart i/o thread */
        io_com_resume(com);
    io_client_resume(dp, cp);

    sem_wait(&dp->clients_sem);
    cp->next = (client_t*) dp->clients; /* ...on active list */
    dp->clients = cp;
    if (!dp->isrunning)                 /* kick off thread */
        startMerge(dp);
    sem_post(&dp->clients_sem);

    return (dp->isrunning);
}
/*
 * Resume or start event collection in response to a client's
 * "resume" request.
 *
 * If the common state already has references, the paused streams
 * are restarted on each selected CPU; the result is 0 on success or
 * EAGAIN otherwise.
 *
 * With no references this is a first start: the push thread is
 * initialised and a client is created on each selected CPU.
 * Returns -1 when that succeeds (see the note in the body) and
 * EAGAIN when the push thread or any per-CPU client cannot be set
 * up; in the latter case whatever was created is purged again.
 */
int
client_resume(client_common_t* com)
{
    if (com->refs != 0)
        return (foreachcpu(com, c_resume) ? 0 : EAGAIN);

    if (!io_com_init(com))              /* start push thread */
        return (EAGAIN);

    /*
     * Start event dispatching on every cpu marked for collection.
     * All cpus share one event mask; a client that wants different
     * masks on different processors has to open a separate data
     * connection per mask.  All event threads also share a single
     * descriptor, and events are ordered by timestamp per cpu only,
     * never globally.  A client receiving from several cpus is
     * therefore likely to see events out of order and should buffer
     * them per cpu, flushing on the assumption that each cpu's
     * events arrive in order.
     */
    com->refs++;                        /* hold reference */
    if (!foreachcpu(com, c_start)) {
        Log(LOG_ERR, NULL, "Client %s:%u: could not setup new client",
            com->host, com->port);
        (void) atomicDec(&com->refs);
        purge_client(com);
        return (EAGAIN);
    }
    (void) atomicDec(&com->refs);

    /*
     * Return before the work is complete so that the command
     * response reaches the client ahead of any event data.  The
     * split is needed because the control protocol and the event
     * data share one connection.
     */
    return (-1);
}
/*
 * Per-CPU worker: find com's client on dp's active list (under
 * clients_sem) and, if present, invoke the protocol's writeHeader
 * hook for it with the connection's event mask.  Always returns
 * TRUE.
 */
static int
c_writeheader(daemon_info_t* dp, client_common_t* com)
{
    client_t* cp;

    sem_wait(&dp->clients_sem);
    cp = (client_t*) dp->clients;
    while (cp != NULL && cp->com != com)
        cp = cp->next;
    sem_post(&dp->clients_sem);

    if (cp != NULL)
        (*com->proto->writeHeader)(dp, cp, com->events);
    return (TRUE);
}
/*
 * Per-CPU worker: recompute dp's mask via new_cpu_mask() while
 * holding clients_sem.  com is not used; the parameter exists to
 * match the callback signature of foreachcpu().  Always returns
 * TRUE.
 */
static int
c_setmask(daemon_info_t* dp, client_common_t* com)
{
    sem_wait(&dp->clients_sem);
    new_cpu_mask(dp);
    sem_post(&dp->clients_sem);

    return TRUE;
}
/*
 * Finish starting a new client's event collection.  This runs
 * *after* the command response has been sent to the client.
 *
 * Writes the protocol header on each selected CPU, raises the cached
 * maxindbytes if this client asks for more (and setmaxindbytes()
 * accepts it), and then recomputes the mask of each selected CPU.
 */
void
client_start(client_common_t* com)
{
    (void) foreachcpu(com, c_writeheader);

    if (maxindbytes < com->maxindbytes) {
        if (setmaxindbytes(com->maxindbytes))
            maxindbytes = com->maxindbytes;
    }

    (void) foreachcpu(com, c_setmask);
}